프로그래밍/파이썬
[파이썬] eBest Xing api 실시간조회, 테스트 및 실행 화면 (8)
말춘이
2020. 2. 22. 15:48
반응형
[파이썬] eBest Xing api 실시간조회, 멀티스레드 설계 (1)
[파이썬] eBest Xing api 실시간조회, 스레드 설계 구현 (2)
[파이썬] eBest Xing api 실시간조회, ThreadJob 구현 (3)
[파이썬] eBest Xing api 실시간조회, ConnectionManager 구현 (4)
[파이썬] eBest Xing api 실시간조회, QueryThreadJob 구현 (5)
[파이썬] eBest Xing api 실시간조회, NWS 뉴스 요청 구현 (6)
[파이썬] eBest Xing api 실시간조회, S3_ 코스피체결 구현 (7)
[파이썬] eBest Xing api 실시간조회, 테스트 및 실행 화면 (8)
지금까지 구현된 모듈을 테스트해보겠다. ConnectionManager, NWS, S3_ 스레드들을 초기화하고 명령어를 넣는 작업을 해준다.
import time
from name_yalsooni.crawler_py.ebest.manager.connection import ConnectionManagerFactory
from name_yalsooni.crawler_py.ebest.realtime.query.nws import NWS
from name_yalsooni.crawler_py.ebest.realtime.query.s3 import S3
from name_yalsooni.crawler_py.ebest.util import Log
class RunnerRealTime:
cm = None
def execute(self):
Log.write("Process Start Up..")
Log.write("-- ConnectionManager Start --")
self.cm = ConnectionManagerFactory.get_instance()
self.cm.start()
Log.write("-- [NWS] NEWS Start --")
nws_event = NWS()
nws_event.start()
Log.write("-- [S3_] KOSPI Start --")
s3_event = S3()
s3_event.start()
Log.write("-- Push Command --")
# 뉴스 요청
nws_command = dict()
nws_command[nws_event.CM_NWCODE] = "NWS001"
nws_event.call(nws_command)
# GS건설 체결정보 요청
s3_command = dict()
s3_command[s3_event.CM_SHCODE] = "006360"
s3_event.call(s3_command)
# 10초 지연 후 APPEND 명령어
time.sleep(10)
# 현대건설 체결정보 요청
s3_command[s3_event.CM_SHCODE] = "000720"
s3_event.append_call(s3_command)
while True:
time.sleep(5)
def main():
RunnerRealTime().execute()
if __name__ == "__main__":
main()
- 실행 동영상
반응형