-
[파이썬] eBest Xing api 실시간조회, 테스트 및 실행 화면 (8)프로그래밍/파이썬 2020. 2. 22. 15:48반응형
[파이썬] eBest Xing api 실시간조회, 멀티스레드 설계 (1)
[파이썬] eBest Xing api 실시간조회, 스레드 설계 구현 (2)
[파이썬] eBest Xing api 실시간조회, ThreadJob 구현 (3)
[파이썬] eBest Xing api 실시간조회, ConnectionManager 구현 (4)
[파이썬] eBest Xing api 실시간조회, QueryThreadJob 구현 (5)
[파이썬] eBest Xing api 실시간조회, NWS 뉴스 요청 구현 (6)
[파이썬] eBest Xing api 실시간조회, S3_ 코스피체결 구현 (7)
[파이썬] eBest Xing api 실시간조회, 테스트 및 실행 화면 (8)
지금까지 구현된 모듈을 테스트해보겠다. ConnectionManager, NWS, S3_ 스레드들을 초기화하고 명령어를 넣는 작업을 해준다.
import time from name_yalsooni.crawler_py.ebest.manager.connection import ConnectionManagerFactory from name_yalsooni.crawler_py.ebest.realtime.query.nws import NWS from name_yalsooni.crawler_py.ebest.realtime.query.s3 import S3 from name_yalsooni.crawler_py.ebest.util import Log class RunnerRealTime: cm = None def execute(self): Log.write("Process Start Up..") Log.write("-- ConnectionManager Start --") self.cm = ConnectionManagerFactory.get_instance() self.cm.start() Log.write("-- [NWS] NEWS Start --") nws_event = NWS() nws_event.start() Log.write("-- [S3_] KOSPI Start --") s3_event = S3() s3_event.start() Log.write("-- Push Command --") # 뉴스 요청 nws_command = dict() nws_command[nws_event.CM_NWCODE] = "NWS001" nws_event.call(nws_command) # GS건설 체결정보 요청 s3_command = dict() s3_command[s3_event.CM_SHCODE] = "006360" s3_event.call(s3_command) # 10초 지연 후 APPEND 명령어 time.sleep(10) # 현대건설 체결정보 요청 s3_command[s3_event.CM_SHCODE] = "000720" s3_event.append_call(s3_command) while True: time.sleep(5) def main(): RunnerRealTime().execute() if __name__ == "__main__": main()
- 실행 동영상
반응형'프로그래밍 > 파이썬' 카테고리의 다른 글
[파이썬] eBest Xing api 실시간조회, 유틸 클래스 (0) 2020.02.22 [파이썬] eBest Xing api 실시간조회, S3_ 코스피체결 구현 (7) (4) 2020.02.22 [파이썬] eBest Xing api 실시간조회, NWS 뉴스 요청 구현 (6) (0) 2020.02.22 [파이썬] eBest Xing api 실시간조회, QueryThreadJob 구현 (5) (2) 2020.02.22 [파이썬] eBest Xing api 실시간조회, ConnectionManager 구현 (4) (0) 2020.02.22