프로그래밍/파이썬

[파이썬] eBest Xing api 실시간조회, QueryThreadJob 구현 (5)

말춘이 2020. 2. 22. 15:34
반응형

 

 

GitHub - malchooni/EBestAPI_Python: 파이썬 학습목적의 이베스트 API 구현

파이썬 학습목적의 이베스트 API 구현. Contribute to malchooni/EBestAPI_Python development by creating an account on GitHub.

github.com

 

 

[파이썬] eBest Xing api 실시간조회, 멀티스레드 설계 (1)

[파이썬] eBest Xing api 실시간조회, 스레드 설계 구현 (2)

[파이썬] eBest Xing api 실시간조회, ThreadJob 구현 (3)

[파이썬] eBest Xing api 실시간조회, ConnectionManager 구현 (4)

[파이썬] eBest Xing api 실시간조회, QueryThreadJob 구현 (5)

[파이썬] eBest Xing api 실시간조회, NWS 뉴스 요청 구현 (6)

[파이썬] eBest Xing api 실시간조회, S3_ 코스피체결 구현 (7)

[파이썬] eBest Xing api 실시간조회, 테스트 및 실행 화면 (8)

 

 

  QueryThreadJob은 모든 요청 쿼리들의 상위 클래스이다.
  공통부분을 상위 클래스에서 정의, 구현함으로써 불필요한 중복 코드를 쉽게 제거할 수 있다. 또한 요청 쿼리에 대한 표준을 구축함으로써 생산성이 향상될 것이다. 이 클래스를 상속받는 요청 쿼리들은 각각의 스레드로 별도 동작한다. 뉴스정보와 코스피 체결 클래스를 구현할 것인데 이 클래스를 상속받아 구현하게 된다.

 

from abc import *
 
from name_yalsooni.crawler_py.ebest.definition import ThreadJob
from name_yalsooni.crawler_py.ebest.manager.connection import ConnectionManagerFactory
from name_yalsooni.crawler_py.ebest.util import Log
from name_yalsooni.crawler_py.ebest.xasession import XASessionEventHandler
 
 
class EventHandler:
    event = None
 
 
class QueryThreadJob(ThreadJob):
    __metaclass__ = ABCMeta
 
    _OP_CALL = "CALL"
 
    _event = None
    _thread_name = None
 
    _operation_dict = None
    _connection_manager = None
 
    @abstractmethod
    def _operation_init(self):
        pass
 
    @abstractmethod
    def _operation_call(self):
        pass
 
    def __init__(self, thread_name, command_queue_timeout):
        ThreadJob.__init__(self, thread_name, command_queue_timeout)
        self._thread_name = thread_name
        self._operation_dict = dict()
 
    def _init(self):
        Log.write(self._thread_name + " init..")
        self._connection_manager = ConnectionManagerFactory.get_instance()
 
        #기본 오퍼레이션 등록
        self._operation_dict[self._OP_CALL] = self._operation_call
        self._operation_init()
 
    def _execute(self, command):
        if not XASessionEventHandler.login_flag:
            self._connection_manager.login_call()
 
        try:
            operation = self._operation_dict[command[self.CM_COMMAND]]
            operation(command)
        except KeyError:
            Log.write("Not found operation key : " + command[self.CM_COMMAND])
 
    def _shutdown(self):
        self._event.UnadviseRealData()
 
    # push call command
    def call(self, command):
        command[self.CM_COMMAND] = self._OP_CALL
        self._push_command(command)

 

- 25, 29 Line 
  _operation_init, _operation_call 추상화 메소드를 선언한다.
- 42 Line 
  _operation_dict 딕셔너리에 오퍼레이션 객체를 보관한다.
- 50 Line 
  _operation_dict에서 명령어로 오퍼레이션 객체를 꺼낸다.
  오퍼레이션 객체가 존재 하지 않으면 예외가 발생된다.

 

  이제 QueryThreadJob클래스를 상속받아 구현하는 뉴스정보와 코스피 체결정보 클래스를 구현해보겠다.

 

 

buycycle.name

'Buycycle'은 증권사 API를 HTTP Json으로 요청 및 응답 받을 수 있습니다. 요청 받은 Json 메시지를 증권사 API 양식에 맞게 변환해 주는 자바 기반의 오픈 소스 입니다. HTTP RESTful을 제공함으로써 사용자

buycycle.name

반응형