[파이썬] eBest Xing api 실시간조회, 테스트 및 실행 화면 (8)
GitHub - malchooni/EBestAPI_Python: 파이썬 학습목적의 이베스트 API 구현
파이썬 학습목적의 이베스트 API 구현. Contribute to malchooni/EBestAPI_Python development by creating an account on GitHub.
github.com
[파이썬] eBest Xing api 실시간조회, 멀티스레드 설계 (1)
[파이썬] eBest Xing api 실시간조회, 스레드 설계 구현 (2)
[파이썬] eBest Xing api 실시간조회, ThreadJob 구현 (3)
[파이썬] eBest Xing api 실시간조회, ConnectionManager 구현 (4)
[파이썬] eBest Xing api 실시간조회, QueryThreadJob 구현 (5)
[파이썬] eBest Xing api 실시간조회, NWS 뉴스 요청 구현 (6)
[파이썬] eBest Xing api 실시간조회, S3_ 코스피체결 구현 (7)
[파이썬] eBest Xing api 실시간조회, 테스트 및 실행 화면 (8)
지금까지 구현된 모듈을 테스트해보겠다. ConnectionManager, NWS, S3_ 스레드들을 초기화하고 명령어를 넣는 작업을 해준다.
import time
from name_yalsooni.crawler_py.ebest.manager.connection import ConnectionManagerFactory
from name_yalsooni.crawler_py.ebest.realtime.query.nws import NWS
from name_yalsooni.crawler_py.ebest.realtime.query.s3 import S3
from name_yalsooni.crawler_py.ebest.util import Log
class RunnerRealTime:
cm = None
def execute(self):
Log.write("Process Start Up..")
Log.write("-- ConnectionManager Start --")
self.cm = ConnectionManagerFactory.get_instance()
self.cm.start()
Log.write("-- [NWS] NEWS Start --")
nws_event = NWS()
nws_event.start()
Log.write("-- [S3_] KOSPI Start --")
s3_event = S3()
s3_event.start()
Log.write("-- Push Command --")
# 뉴스 요청
nws_command = dict()
nws_command[nws_event.CM_NWCODE] = "NWS001"
nws_event.call(nws_command)
# GS건설 체결정보 요청
s3_command = dict()
s3_command[s3_event.CM_SHCODE] = "006360"
s3_event.call(s3_command)
# 10초 지연 후 APPEND 명령어
time.sleep(10)
# 현대건설 체결정보 요청
s3_command[s3_event.CM_SHCODE] = "000720"
s3_event.append_call(s3_command)
while True:
time.sleep(5)
def main():
RunnerRealTime().execute()
if __name__ == "__main__":
main()
- 실행 동영상
buycycle.name
'Buycycle'은 증권사 API를 HTTP Json으로 요청 및 응답 받을 수 있습니다. 요청 받은 Json 메시지를 증권사 API 양식에 맞게 변환해 주는 자바 기반의 오픈 소스 입니다. HTTP RESTful을 제공함으로써 사용자
buycycle.name