Files
kis_bot/kiwoom_rest_api/websocket.py
2026-03-17 12:33:30 +09:00

321 lines
12 KiB
Python

import asyncio
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Union
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException
from .config import get_ws_url, WS_TIMEOUT
logger = logging.getLogger(__name__)
class WebSocketError(Exception):
"""Custom exception for WebSocket errors"""
def __init__(self, message: str, error_data: dict = None):
self.message = message
self.error_data = error_data or {}
super().__init__(message)
class RealTimeData:
"""실시간 데이터를 담는 클래스"""
def __init__(self, data: Dict[str, Any]):
self.raw_data = data
self.trnm = data.get('trnm')
self.return_code = data.get('return_code')
self.return_msg = data.get('return_msg')
self.data = data.get('data', [])
class WebSocketClient:
"""키움증권 실시간 웹소켓 클라이언트"""
def __init__(
self,
access_token: str,
ws_url: Optional[str] = None,
auto_reconnect: bool = True,
reconnect_interval: int = 5,
ping_interval: int = 30
):
"""
웹소켓 클라이언트 초기화
Args:
access_token: 액세스 토큰
ws_url: 웹소켓 URL (None이면 설정에서 자동 선택)
auto_reconnect: 자동 재연결 여부
reconnect_interval: 재연결 간격 (초)
ping_interval: PING 간격 (초)
"""
self.access_token = access_token
self.ws_url = ws_url or get_ws_url()
self.auto_reconnect = auto_reconnect
self.reconnect_interval = reconnect_interval
self.ping_interval = ping_interval
self.websocket: Optional[websockets.WebSocketServerProtocol] = None
self.connected = False
self.keep_running = True
self.is_logged_in = False
# 콜백 함수들
self.on_connect: Optional[Callable] = None
self.on_disconnect: Optional[Callable] = None
self.on_login: Optional[Callable] = None
self.on_data: Optional[Callable[[RealTimeData], None]] = None
self.on_error: Optional[Callable[[Exception], None]] = None
# 태스크들
self._receive_task: Optional[asyncio.Task] = None
self._ping_task: Optional[asyncio.Task] = None
async def connect(self) -> None:
"""웹소켓 서버에 연결"""
try:
logger.info(f"웹소켓 서버에 연결 중: {self.ws_url}")
self.websocket = await websockets.connect(
self.ws_url,
ping_interval=self.ping_interval,
ping_timeout=WS_TIMEOUT
)
self.connected = True
logger.info("웹소켓 서버 연결 성공")
if self.on_connect:
await self.on_connect()
except Exception as e:
logger.error(f"웹소켓 연결 실패: {e}")
self.connected = False
if self.on_error:
await self.on_error(e)
raise WebSocketError(f"연결 실패: {e}")
async def login(self) -> None:
"""웹소켓 서버에 로그인"""
if not self.connected:
await self.connect()
login_data = {
'trnm': 'LOGIN',
'token': self.access_token
}
await self.send(login_data)
logger.info("로그인 요청 전송")
async def send(self, message: Union[Dict[str, Any], str]) -> None:
"""메시지 전송"""
if not self.connected:
if self.auto_reconnect:
await self.connect()
else:
raise WebSocketError("웹소켓이 연결되지 않았습니다")
try:
if isinstance(message, dict):
message_str = json.dumps(message)
else:
message_str = message
await self.websocket.send(message_str)
logger.debug(f"메시지 전송: {message_str}")
except Exception as e:
logger.error(f"메시지 전송 실패: {e}")
if self.on_error:
await self.on_error(e)
raise WebSocketError(f"메시지 전송 실패: {e}")
async def register_realtime(
self,
group_no: str = "1",
type_list: List[str] = None,
item_list: List[str] = None,
refresh: str = "1"
) -> None:
"""
실시간 데이터 등록
Args:
group_no: 그룹 번호
type_list: 실시간 항목 리스트 (예: ['04', '0A', '0B'])
item_list: 종목코드 리스트 (빈 리스트면 전체)
refresh: 기존등록유지여부 (0: 기존유지안함, 1: 기존유지)
"""
if type_list is None:
type_list = ['04'] # 기본값: 잔고
if item_list is None:
item_list = [''] # 빈 문자열은 전체 종목
register_data = {
'trnm': 'REG',
'grp_no': group_no,
'refresh': refresh,
'data': [{
'item': item_list,
'type': type_list
}]
}
await self.send(register_data)
logger.info(f"실시간 데이터 등록: {type_list}")
async def unregister_realtime(self, group_no: str = "1") -> None:
"""실시간 데이터 해지"""
unregister_data = {
'trnm': 'REMOVE',
'grp_no': group_no
}
await self.send(unregister_data)
logger.info("실시간 데이터 해지")
async def _handle_message(self, message: str) -> None:
"""메시지 처리"""
try:
data = json.loads(message)
realtime_data = RealTimeData(data)
trnm = realtime_data.trnm
if trnm == 'LOGIN':
if realtime_data.return_code == 0:
self.is_logged_in = True
logger.info("로그인 성공")
if self.on_login:
await self.on_login()
else:
error_msg = realtime_data.return_msg or "로그인 실패"
logger.error(f"로그인 실패: {error_msg}")
if self.on_error:
await self.on_error(WebSocketError(error_msg))
elif trnm == 'PING':
# PING에 PONG으로 응답
await self.send(data)
logger.debug("PING-PONG 응답")
elif trnm == 'REAL':
# 실시간 데이터 수신
logger.debug(f"실시간 데이터 수신: {data}")
if self.on_data:
await self.on_data(realtime_data)
else:
# 기타 응답
logger.debug(f"기타 응답 수신: {data}")
if self.on_data:
await self.on_data(realtime_data)
except json.JSONDecodeError as e:
logger.error(f"JSON 파싱 오류: {e}")
if self.on_error:
await self.on_error(e)
except Exception as e:
logger.error(f"메시지 처리 오류: {e}")
if self.on_error:
await self.on_error(e)
async def _receive_messages(self) -> None:
"""메시지 수신 루프"""
while self.keep_running:
try:
if not self.connected or not self.websocket:
break
message = await self.websocket.recv()
await self._handle_message(message)
except ConnectionClosed:
logger.warning("웹소켓 연결이 종료되었습니다")
self.connected = False
self.is_logged_in = False
if self.on_disconnect:
await self.on_disconnect()
if self.auto_reconnect and self.keep_running:
logger.info(f"{self.reconnect_interval}초 후 재연결을 시도합니다")
await asyncio.sleep(self.reconnect_interval)
try:
await self.connect()
await self.login()
except Exception as e:
logger.error(f"재연결 실패: {e}")
else:
break
except Exception as e:
logger.error(f"메시지 수신 오류: {e}")
if self.on_error:
await self.on_error(e)
async def _ping_loop(self) -> None:
"""PING 루프 (연결 유지)"""
while self.keep_running and self.connected:
try:
await asyncio.sleep(self.ping_interval)
if self.connected and self.websocket:
await self.websocket.ping()
logger.debug("PING 전송")
except Exception as e:
logger.error(f"PING 오류: {e}")
async def start(self) -> None:
"""웹소켓 클라이언트 시작"""
try:
await self.connect()
await self.login()
# 수신 및 PING 태스크 시작
self._receive_task = asyncio.create_task(self._receive_messages())
self._ping_task = asyncio.create_task(self._ping_loop())
logger.info("웹소켓 클라이언트 시작됨")
except Exception as e:
logger.error(f"웹소켓 클라이언트 시작 실패: {e}")
raise
async def stop(self) -> None:
"""웹소켓 클라이언트 중지"""
logger.info("웹소켓 클라이언트 중지 중...")
self.keep_running = False
# 태스크들 취소
if self._receive_task:
self._receive_task.cancel()
if self._ping_task:
self._ping_task.cancel()
# 웹소켓 연결 종료
if self.websocket:
await self.websocket.close()
self.connected = False
self.is_logged_in = False
logger.info("웹소켓 클라이언트 중지됨")
async def run_forever(self) -> None:
"""무한 루프로 실행"""
try:
await self.start()
# 태스크들이 완료될 때까지 대기
if self._receive_task:
await self._receive_task
except asyncio.CancelledError:
logger.info("웹소켓 클라이언트가 취소되었습니다")
except Exception as e:
logger.error(f"웹소켓 클라이언트 실행 오류: {e}")
finally:
await self.stop()
def run_sync(self) -> None:
"""동기적으로 실행 (새로운 이벤트 루프에서)"""
try:
asyncio.run(self.run_forever())
except KeyboardInterrupt:
logger.info("사용자에 의해 중단되었습니다")
except Exception as e:
logger.error(f"동기 실행 오류: {e}")
raise