""" kis_ws.py — KIS WebSocket 실시간 체결가 캐시 (H0STCNT0) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 역할: check_sell_signals() 의 inquire_price() REST 폴링을 대체. 보유 종목 코드를 구독해두면 KIS가 체결마다 push → 즉시 가격 캐시 갱신. WebSocket vs REST: REST : 요청마다 0.5s 딜레이(API 제한) + 응답 대기 → 보유 3종목 ≈ 3초 낭비 WebSocket: 연결 1회 + push 수신 → 체결 즉시 캐시 갱신, API 카운트 무관 KIS 공식 스펙: TR_ID : H0STCNT0 (국내주식 실시간체결가) 실전 URL : ws://ops.koreainvestment.com:21000 (KIS_WS_URL_REAL, env/DB 변경 가능) 모의 URL : ws://ops.koreainvestment.com:31000 (KIS_WS_URL_MOCK, env/DB 변경 가능) 세션 구독 한도: 최대 41종목 (MAX_STOCKS=3~4 수준에서 문제 없음) KIS 2026-02-24 경고 준수 (무한 재연결 차단 정책): - 재연결 횟수 제한: 1시간 내 MAX_RECONNECTS_PER_HOUR 회 초과 시 자동 대기 - 최대 총 재연결 횟수: MAX_RECONNECT_ATTEMPTS 회 초과 시 WebSocket 종료 → REST fallback - 지수 백오프: 5초 → 10초 → 20초 ... 최대 300초 모의투자(VTS): KIS 모의투자는 WebSocket을 지원하지 않는 경우가 많음. 연결 실패 시 is_active=False → check_sell_signals()가 REST로 자동 fallback. KIS_WS_MOCK_ENABLED=true (env/DB) 로 강제 활성화 가능. 설치 필요: pip install websocket-client """ from __future__ import annotations import json import logging import queue import threading import time from pathlib import Path from typing import Dict, Optional, Set import requests logger = logging.getLogger("KISWebSocket") # ------------------------------------------------------------------ # 모듈 수준에서 get_env 함수를 참조 (kis_long_ver1 공용 함수 재사용) # 이 파일이 단독으로도 동작할 수 있도록 fallback import 포함 # ------------------------------------------------------------------ try: from kis_long_ver1 import get_env_from_db, get_env_int, get_env_bool except ImportError: try: from kis_short_ver2 import get_env_from_db, get_env_int, get_env_bool except ImportError: # 모듈 임포트 전 단계에서는 기본값만 사용 def get_env_from_db(key, default=""): # type: ignore[misc] return default def get_env_int(key, default): # type: ignore[misc] return default def get_env_bool(key, default=False): # type: ignore[misc] return default class KISWebSocketPriceCache: """ KIS H0STCNT0 실시간 체결가 WebSocket 수신기. 사용법: ws_cache = KISWebSocketPriceCache(app_key, app_secret, is_mock=False) ok = ws_cache.start() # 백그라운드 스레드 시작 ws_cache.subscribe("005930") # 종목 구독 data = ws_cache.get_price("005930") # inquire_price 호환 dict 반환 ws_cache.stop() get_price() 반환 값이 None 이면 → REST inquire_price() 로 fallback """ # H0STCNT0 데이터 필드 인덱스 ('^' 구분) # KIS H0STCNT0 국내주식 실시간체결 전문 순서 (0-based) IDX_CODE = 0 # MKSC_SHRN_ISCD: 유가증권 단축 종목코드 IDX_TIME = 1 # STCK_CNTG_HOUR: 체결 시간 IDX_PRICE = 2 # STCK_PRPR: 주식 현재가 (체결가) IDX_SIGN = 3 # PRDY_VRSS_SIGN: 전일 대비 부호 IDX_CHANGE = 4 # PRDY_VRSS: 전일 대비 IDX_CHGPCT = 5 # PRDY_CTRT: 전일 대비율 # 당일 시고저 (매수 체크 시 REST inquire_price 대체용 → API 과부하 방지) IDX_OPEN = 7 # STCK_OPRC: 주식 시가 (당일) IDX_HIGH = 8 # STCK_HGPR: 주식 고가 (당일) IDX_LOW = 9 # STCK_LWPR: 주식 저가 (당일) IDX_VOLUME = 11 # ACML_VOL: 누적 거래량 (참고용) # 재연결 정책 (KIS 2026-02-24 경고 준수) MAX_RECONNECT_ATTEMPTS = 10 # 총 재연결 최대 횟수 (STABLE_CONN_RESET_SEC 이상 안정 연결 후 끊기면 초기화) MAX_RECONNECTS_PER_HOUR = 6 # 1시간 내 재연결 허용 횟수 RECONNECT_BASE_DELAY_SEC = 5.0 # 초기 재연결 대기(초) RECONNECT_MAX_DELAY_SEC = 300.0 # 최대 재연결 대기(초) # 이 시간(초) 이상 안정적으로 연결이 유지됐다가 끊기면 카운터를 초기화. # 예: 5분 이상 정상 운영 후 네트워크 일시 장애 → '버스트 차단'이 아닌 '정상 재연결'로 간주. STABLE_CONN_RESET_SEC = 300.0 # 5분 # Approval key 유효시간 (23시간, KIS REST 토큰과 별개) APPROVAL_KEY_CACHE_SEC = 82800 def __init__(self, app_key: str, app_secret: str, is_mock: bool = True): self.app_key = app_key self.app_secret = app_secret self.is_mock = is_mock # WebSocket URL (env/DB 로 재정의 가능 → 연결 실패 시 사용자가 수정) _default_real = "ws://ops.koreainvestment.com:21000" _default_mock = "ws://ops.koreainvestment.com:31000" self._ws_url = ( get_env_from_db("KIS_WS_URL_MOCK", _default_mock) if is_mock else get_env_from_db("KIS_WS_URL_REAL", _default_real) ) self._base_url = ( "https://openapivts.koreainvestment.com:29443" if is_mock else "https://openapi.koreainvestment.com:9443" ) # ── 가격 캐시 ────────────────────────────────────────────── # { code: {"data": dict, "ts": float} } self._cache: Dict[str, Dict] = {} self._cache_lock = threading.Lock() # ── 구독 목록 ────────────────────────────────────────────── self._subscribed: Set[str] = set() self._sub_lock = threading.Lock() # ── 영구 구독 목록 (홀딩 관심종목 등) ────────────────────── # unsubscribe() 호출에도 해제되지 않는 고정 구독 코드 self._permanent_codes: Set[str] = set() self._load_permanent_watchlist() # ── WebSocket 관련 ───────────────────────────────────────── self._ws = None # 현재 WebSocketApp 인스턴스 self._ws_thread: Optional[threading.Thread] = None self._running = False self._connected = False # ── approval_key (REST 토큰과 별개, WebSocket 전용 인증) ─── self._approval_key: Optional[str] = None self._approval_key_ts: float = 0.0 # ── 재연결 관리 ──────────────────────────────────────────── self._reconnect_count = 0 self._reconnect_times: list = [] # 최근 재연결 타임스탬프 목록 self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC self._last_connect_time: float = 0.0 # 마지막 연결 성공 시각 (안정 연결 판단용) # ── CandleAggregator (스캘핑봇 연동 시 외부에서 주입) ───── # attach_candle_aggregator(agg) 로 연결, None이면 봉 집계 비활성 self._candle_agg: Optional["CandleAggregator"] = None # ── WS 연결 성공 시 갭보정 콜백 ─────────────────────────── # set_on_connected_callback(fn) 으로 등록. # 연결 성공(_on_open) 후 별도 스레드에서 호출됨. # 장 시간일 때만 실행 (새벽 재연결 시 API 빈 응답 방지) self._on_connected_callback: Optional[callable] = None # ── websocket-client 사용 가능 여부 ─────────────────────── try: import websocket as _ws_lib self._ws_lib = _ws_lib self._available = True except ImportError: self._ws_lib = None self._available = False logger.warning( "⚠️ websocket-client 미설치 → WebSocket 실시간 가격 비활성.\n" " pip install websocket-client 를 실행하세요." ) # ================================================================== # Public API # ================================================================== def attach_candle_aggregator(self, agg: "CandleAggregator") -> None: """ CandleAggregator 를 연결합니다. 연결 후부터 틱 수신 시 자동으로 agg.on_tick() 이 호출됩니다. kis_scalping_ver1.py 에서 ws_cache.attach_candle_aggregator(agg) 로 호출. """ self._candle_agg = agg logger.info("✅ CandleAggregator 연결 완료 (봉 집계 활성화)") def set_on_connected_callback(self, fn) -> None: """ WS 연결 성공(_on_open) 시 호출할 콜백 등록. ScalpingBotV1._fill_all_gaps 를 넘겨서 매 연결 시 갭보정 자동 실행. 장 시간일 때만 콜백을 실행 (새벽 자동재연결 시 API 빈 응답 방지). """ self._on_connected_callback = fn def start(self, force_cleanup: bool = True) -> bool: """ WebSocket 수신 백그라운드 스레드를 시작합니다. 성공 시 True, 사용 불가(모의/패키지 미설치/키 없음) 시 False. False 를 반환해도 봇은 REST fallback으로 정상 동작합니다. """ if not self._available: return False # 모의투자는 기본 비활성 (KIS 모의 서버 WebSocket 미지원 가능성) # KIS_WS_MOCK_ENABLED=true 로 강제 활성화 가능 if self.is_mock and not get_env_bool("KIS_WS_MOCK_ENABLED", False): logger.info("ℹ️ 모의투자: WebSocket 기본 비활성 (KIS_WS_MOCK_ENABLED=true 로 활성 가능)") return False if self._running: return True # ── [CRITICAL] 비정상 종료 후 재시작 대비: 이전 세션 강제 정리 ────────────── # force_cleanup=True 시: # 1. approval_key 새로 발급 (이전 세션 무효화) # 2. _subscribed 세트 비우기 (메모리 정리) # 3. _cache 비우기 (오래된 데이터 제거) # KIS 서버는 연결이 끊기면 5~10 분 내 자동 구독 해제되므로, # 새 approval_key 로 새 세션을 열면 이전 구독은 자동 소멸. if force_cleanup: logger.info("🧹 WebSocket 세션 초기화 (비정상 종료 대비)") self._approval_key = None # 이전 approval_key 무효화 self._approval_key_ts = 0.0 with self._sub_lock: self._subscribed.clear() # 구독 목록 초기화 with self._cache_lock: self._cache.clear() # 캐시 초기화 logger.info("✅ WebSocket 세션 초기화 완료 (구독/캐시 리셋)") # approval_key 발급 (연결 전 확인) if not self._get_approval_key(): logger.warning("⚠️ WebSocket approval_key 발급 실패 → REST fallback 모드") return False self._running = True self._ws_thread = threading.Thread( target=self._run_ws_loop, daemon=True, name="KIS-WS-H0STCNT0" ) self._ws_thread.start() logger.info( "✅ KIS WebSocket 수신 스레드 시작 (H0STCNT0 | url=%s)", self._ws_url ) return True def stop(self, clear_subscriptions: bool = False) -> None: """ WebSocket 수신 중단 및 스레드 종료. Args: clear_subscriptions: True 시 모든 종목 구독 해제 후 종료 (봇 정상 종료 시 KIS 서버 정리용) """ # 옵션: 모든 구독 해제 (KIS 서버 정리용) if clear_subscriptions and self._connected and self._ws: with self._sub_lock: codes = list(self._subscribed) for code in codes: self._send_sub_msg(code, subscribe=False) with self._sub_lock: self._subscribed.discard(code) with self._cache_lock: self._cache.pop(code, None) logger.info("📡 WebSocket 구독 해제: %s", code) logger.info("✅ 모든 WebSocket 구독 정리 완료 (%d종목)", len(codes)) self._running = False self._connected = False if self._ws: try: self._ws.close() except Exception: pass if self._ws_thread and self._ws_thread.is_alive(): self._ws_thread.join(timeout=5) logger.info("🛑 KIS WebSocket 종료") # KIS WebSocket 세션 당 구독 가능 최대 종목 수 # 초과 시 서버에서 오류 반환 또는 계정 일시 차단 가능 (KIS 공지 준수) MAX_SUBSCRIPTIONS = 41 def _load_permanent_watchlist(self) -> None: """ long_term_watchlist.json 에 있는 홀딩 관심종목을 영구 구독 목록으로 로드. WS 연결 시 자동으로 subscribe(), unsubscribe() 호출 시에도 해제하지 않음. """ try: wl_path = Path(__file__).resolve().parent / "long_term_watchlist.json" if not wl_path.exists(): return items = json.loads(wl_path.read_text(encoding="utf-8")).get("items", []) codes = [i["code"] for i in items if i.get("code")] self._permanent_codes = set(codes) if codes: logger.info( "📌 홀딩 영구 구독 목록 로드: %s (%d종목)", ", ".join(codes), len(codes), ) except Exception as e: logger.warning("영구 구독 목록 로드 실패: %s", e) def subscribe(self, code: str) -> None: """ 실시간 체결가 구독 등록. 이미 연결 중이면 즉시 구독 메시지 전송, 연결 전이면 연결 성공 시 일괄 등록. KIS 세션 한도(MAX_SUBSCRIPTIONS=41) 초과 시 등록 거부 후 경고 로그 출력. """ code = (code or "").strip() if not code: return with self._sub_lock: if code in self._subscribed: return # 이미 구독 중 → 중복 전송 방지 if len(self._subscribed) >= self.MAX_SUBSCRIPTIONS: logger.warning( "⚠️ WebSocket 구독 한도 초과(%d/%d) → %s 구독 거부 " "(KIS 세션 한도 준수: 불필요 종목 구독해제 후 재시도)", len(self._subscribed), self.MAX_SUBSCRIPTIONS, code, ) return self._subscribed.add(code) if self._connected and self._ws: self._send_sub_msg(code, subscribe=True) logger.info("📡 WebSocket 구독 추가: %s (%d/%d)", code, len(self._subscribed), self.MAX_SUBSCRIPTIONS) def unsubscribe(self, code: str) -> None: """ 실시간 체결가 구독 해제 및 캐시 삭제. 단, long_term_watchlist.json 의 영구 구독 종목은 해제하지 않음. """ code = (code or "").strip() if code in self._permanent_codes: logger.debug("📌 영구 구독 종목 해제 요청 무시: %s (홀딩 관심종목)", code) return with self._sub_lock: self._subscribed.discard(code) with self._cache_lock: self._cache.pop(code, None) if self._connected and self._ws: self._send_sub_msg(code, subscribe=False) logger.info("📡 WebSocket 구독 해제: %s", code) def get_price(self, code: str, max_age_sec: float = 5.0) -> Optional[Dict]: """ WebSocket 캐시에서 실시간 가격 + 당일 시고저를 꺼냅니다. max_age_sec 초 이내 수신된 체결 틱만 유효하게 취급합니다. 반환 형식: { "stck_prpr": "73900", # 현재가 (체결가, REST inquire_price와 동일) "prdy_vrss": "200", # 전일 대비 "prdy_ctrt": "0.27", # 전일 대비율 "stck_oprc": "73000", # 당일 시가 (REST와 동일) "stck_hgpr": "74500", # 당일 고가 (REST와 동일) "stck_lwpr": "72800", # 당일 저가 (REST와 동일) } 반환 None: WebSocket 미연결 / 데이터 없음 / max_age_sec 초 초과 → REST fallback 권장 ※ WS 캐시 정확도: H0STCNT0 체결 틱마다 KIS 서버가 시고저를 함께 전송 → 장중 REST inquire_price와 동일하며 지연이 없음 (REST 왕복 100~300ms 생략) """ if not self._available or not self._connected: return None with self._cache_lock: entry = self._cache.get(code) if not entry: return None age = time.time() - entry.get("ts", 0) if age > max_age_sec: # 데이터가 너무 오래됨 → REST로 재확인 return None return entry.get("data") @property def is_active(self) -> bool: """WebSocket이 연결되어 실시간 데이터를 수신 중이면 True.""" return bool(self._available and self._connected and self._running) # ================================================================== # 내부 메서드 # ================================================================== def _get_approval_key(self) -> Optional[str]: """ WebSocket 전용 approval_key 발급. REST 액세스 토큰과 완전히 별개 (endpoint: /oauth2/Approval). """ now = time.time() if self._approval_key and (now - self._approval_key_ts) < self.APPROVAL_KEY_CACHE_SEC: return self._approval_key try: url = f"{self._base_url}/oauth2/Approval" body = { "grant_type": "client_credentials", "appkey": self.app_key, "secretkey": self.app_secret, } r = requests.post(url, json=body, timeout=10) data = r.json() key = data.get("approval_key") if key: self._approval_key = key self._approval_key_ts = now logger.info( "✅ WebSocket approval_key 발급 완료 (앞8자: %s…)", key[:8] ) return key logger.error("❌ WebSocket approval_key 발급 실패: %s", data) except Exception as e: logger.error("❌ WebSocket approval_key 요청 예외: %s", e) return None def _build_sub_payload(self, code: str, subscribe: bool) -> str: """구독(tr_type=1) / 해제(tr_type=2) JSON 메시지 생성.""" return json.dumps({ "header": { "approval_key": self._approval_key or "", "custtype": "P", "tr_type": "1" if subscribe else "2", "content-type": "utf-8", }, "body": { "input": { "tr_id": "H0STCNT0", "tr_key": code, } }, }) def _send_sub_msg(self, code: str, subscribe: bool = True) -> None: """WebSocket으로 구독/해제 메시지 전송. 실패 시 조용히 무시.""" if not self._ws: return try: self._ws.send(self._build_sub_payload(code, subscribe)) except Exception as e: logger.debug("구독 메시지 전송 실패(%s): %s", code, e) def _parse_realtime_msg(self, raw: str) -> None: """ H0STCNT0 실시간 체결가 메시지 파싱 및 캐시 갱신. 정상 포맷: "0|H0STCNT0|001|005930^082317^73900^5^200^0.27^..." parts[0]: 암호화구분 (0=평문, 1=암호화) parts[1]: TR_ID parts[2]: 건수 parts[3]: 데이터 ('^' 구분) KIS PINGPONG: 메시지가 "PINGPONG" 문자열 → 동일하게 echoing. JSON 응답(구독 확인/에러): {"header":{...},"body":{...}} → 무시. """ if not raw: return # ── KIS Application-Level PINGPONG ───────────────────────── if raw.strip() == "PINGPONG": if self._ws: try: self._ws.send("PINGPONG") except Exception: pass return # ── 구독 응답(JSON) 무시 ──────────────────────────────────── if raw.startswith("{"): try: j = json.loads(raw) header = j.get("header", {}) if header.get("tr_id") == "H0STCNT0": body = j.get("body", {}) rt = body.get("rt_cd", "") msg = body.get("msg1", "") logger.debug("H0STCNT0 구독 응답: rt_cd=%s msg=%s", rt, msg) except Exception: pass return # ── 실시간 데이터 파싱 ────────────────────────────────────── parts = raw.split("|") if len(parts) < 4: return # parts[0]=암호화구분, parts[1]=TR_ID, parts[2]=건수, parts[3]=데이터 if parts[1] != "H0STCNT0": return # 암호화된 데이터는 아직 미지원 (평문만 처리) if parts[0] == "1": logger.debug("H0STCNT0 암호화 데이터 수신 (처리 스킵) → REST fallback 권장") return # 한 메시지에 여러 건이 포함될 수 있음 (parts[2] = 건수) # 단순히 parts[3] 전체를 파싱 (단건 기준) fields = parts[3].split("^") if len(fields) <= max(self.IDX_PRICE, self.IDX_CHGPCT): return try: code = fields[self.IDX_CODE].strip() price = float(fields[self.IDX_PRICE]) if not code or price <= 0: return chg_raw = fields[self.IDX_CHANGE] if len(fields) > self.IDX_CHANGE else "0" pct_raw = fields[self.IDX_CHGPCT] if len(fields) > self.IDX_CHGPCT else "0.00" # 당일 시고저: REST inquire_price 대체용 (매수 체크 시 API 과부하 방지) open_raw = fields[self.IDX_OPEN] if len(fields) > self.IDX_OPEN else "0" high_raw = fields[self.IDX_HIGH] if len(fields) > self.IDX_HIGH else "0" low_raw = fields[self.IDX_LOW] if len(fields) > self.IDX_LOW else "0" # inquire_price output 딕셔너리와 키 이름을 맞춤 # stck_oprc/hgpr/lwpr 도 함께 저장 → kis_short_ver2 매수 체크에서 REST 없이 활용 data_compat = { "stck_prpr": str(int(price)), # 현재가 (int 문자열) "prdy_vrss": chg_raw, # 전일 대비 "prdy_ctrt": pct_raw, # 전일 대비율 "stck_oprc": open_raw, # 당일 시가 "stck_hgpr": high_raw, # 당일 고가 "stck_lwpr": low_raw, # 당일 저가 } with self._cache_lock: self._cache[code] = {"data": data_compat, "ts": time.time()} # ── CandleAggregator 연동: 틱 → 봉 집계 (스캘핑봇 전용, 연결 시 활성화) if self._candle_agg is not None: tick_time = fields[self.IDX_TIME].strip() if len(fields) > self.IDX_TIME else "" vol_raw = fields[self.IDX_VOLUME].strip() if len(fields) > self.IDX_VOLUME else "0" try: tick_vol = int(vol_raw) except ValueError: tick_vol = 0 self._candle_agg.on_tick(code, price, tick_vol, tick_time) logger.debug("H0STCNT0 수신: %s → %s원", code, int(price)) except (ValueError, IndexError) as e: logger.debug("H0STCNT0 파싱 오류: %s | raw=%s", e, raw[:80]) # ================================================================== # WebSocket 루프 (백그라운드 스레드) # ================================================================== def _is_market_hours(self) -> bool: """ KIS WebSocket 서비스 시간 여부. - 실전투자: 08:25~ (장전 호가 포함) - 모의투자: 09:00~ (모의 WS 서버가 09:00 이전 연결 즉시 거부) False 반환 시 재연결 시도 없이 다음 장 시작까지 대기. """ import datetime as _dt now = _dt.datetime.now() if now.weekday() >= 5: # 토·일 return False # 모의투자는 09:00, 실전투자는 08:25 부터 서버 응답 open_h, open_m = (9, 0) if self.is_mock else (8, 25) return _dt.time(open_h, open_m) <= now.time() <= _dt.time(16, 5) def _seconds_until_market_open(self) -> float: """다음 장 시작까지 남은 초. 최소 60초 반환. - 실전투자: 08:25 기준 - 모의투자: 09:00 기준 """ import datetime as _dt now = _dt.datetime.now() open_h, open_m = (9, 0) if self.is_mock else (8, 25) target = now.replace(hour=open_h, minute=open_m, second=0, microsecond=0) if now.time() >= _dt.time(16, 5): # 오늘 장 마감 → 내일 장 시작 target += _dt.timedelta(days=1) # 주말 건너뛰기 while target.weekday() >= 5: target += _dt.timedelta(days=1) return max(60.0, (target - now).total_seconds()) def _run_ws_loop(self) -> None: """ 재연결 정책을 포함한 WebSocket 메인 루프. run_forever() 가 반환(연결 끊김)되면 지수 백오프 후 재연결 시도. KIS 정책: 1시간 내 MAX_RECONNECTS_PER_HOUR 회 초과 시 강제 대기. 장외 시간(16:05~08:25, 주말): 재연결 없이 다음 장까지 대기. """ # 즉시 끊김(장외 서버 거부) 감지: INSTANT_DROP_SEC 이내 끊김이 N회 연속이면 장외 슬립 INSTANT_DROP_SEC = 3.0 # 연결 후 이 초 이내에 끊기면 "즉시 종료"로 판정 INSTANT_DROP_MAX = 3 # 연속 N회 즉시 종료 → 장외 슬립으로 전환 _instant_drop_streak = 0 # 연속 즉시 종료 카운터 while self._running: now = time.time() # ── 장외 시간이면 다음 장 시작까지 대기 ───────────────────── if not self._is_market_hours(): wait_sec = self._seconds_until_market_open() logger.info( "🌙 장외 시간 — WebSocket 재연결 중지, 다음 장 시작까지 %.0f분 대기", wait_sec / 60, ) # 재연결 카운터·백오프 초기화 (장 시작 시 깨끗하게 재접속) self._reconnect_count = 0 self._reconnect_times = [] self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC _instant_drop_streak = 0 # 60초 단위로 쪼개서 슬립 (stop() 신호 빠르게 감지) for _ in range(int(wait_sec // 60)): if not self._running: return time.sleep(60) time.sleep(wait_sec % 60) continue # ── 연속 즉시 종료 감지 → 장외 서버 거부로 간주 ───────────── # 연결 직후 INSTANT_DROP_SEC 이내에 끊기는 패턴이 N회 반복되면 # 서버가 장외이거나 앱키가 차단된 것으로 간주하고 긴 슬립 진입 if _instant_drop_streak >= INSTANT_DROP_MAX: wait_sec = self._seconds_until_market_open() logger.warning( "⚠️ WebSocket 연속 즉시 끊김 %d회 감지 (장외 서버 거부 추정) " "→ %.0f분 대기 후 재시도 (KIS 차단 방지)", _instant_drop_streak, wait_sec / 60, ) self._reconnect_count = 0 self._reconnect_times = [] self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC _instant_drop_streak = 0 for _ in range(int(wait_sec // 60)): if not self._running: return time.sleep(60) time.sleep(wait_sec % 60) continue # ── 안정 연결 후 끊김이면 재연결 카운터 초기화 ─────────── # STABLE_CONN_RESET_SEC(5분) 이상 정상 운영 후 끊겼다면 # 이전 재연결 이력을 소거하여 'KIS 정상 케이스'로 처리. # (수개월 운영 중 산발적 네트워크 단절에 의해 10회 소진 → 영구 비활성화 방지) if ( self._last_connect_time > 0 and (now - self._last_connect_time) > self.STABLE_CONN_RESET_SEC and self._reconnect_count > 0 ): logger.info( "♻️ WebSocket 안정 운영(%.0f분) 후 끊김 → 재연결 카운터 초기화 (%d→0)", (now - self._last_connect_time) / 60, self._reconnect_count, ) self._reconnect_count = 0 self._reconnect_times = [] self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC # ── 1시간 내 재연결 횟수 점검 ──────────────────────────── self._reconnect_times = [t for t in self._reconnect_times if now - t < 3600] if len(self._reconnect_times) >= self.MAX_RECONNECTS_PER_HOUR: # 가장 오래된 재연결로부터 1시간이 지날 때까지 대기 wait = max(10, 3600 - (now - self._reconnect_times[0]) + 10) logger.warning( "⛔ 1시간 내 WebSocket 재연결 %d회 초과 → %.0f초(%.0f분) 강제 대기 (KIS 차단 방지)", self.MAX_RECONNECTS_PER_HOUR, wait, wait / 60, ) time.sleep(wait) continue # ── 총 재연결 횟수 초과 → REST fallback 전환 ───────────── if self._reconnect_count >= self.MAX_RECONNECT_ATTEMPTS: logger.error( "❌ WebSocket 최대 재연결 %d회 초과 → WebSocket 종료, REST fallback 전환", self.MAX_RECONNECT_ATTEMPTS, ) self._running = False break # ── approval_key 갱신 ───────────────────────────────────── approval_key = self._get_approval_key() if not approval_key: logger.warning( "WebSocket approval_key 발급 불가 → %ds 대기 후 재시도", int(self._reconnect_delay), ) time.sleep(self._reconnect_delay) self._reconnect_delay = min( self._reconnect_delay * 2, self.RECONNECT_MAX_DELAY_SEC ) continue # ── 재연결 카운트 및 대기 ───────────────────────────────── self._reconnect_count += 1 if self._reconnect_count > 1: self._reconnect_times.append(time.time()) logger.info( "🔄 WebSocket 재연결 시도 %d/%d (대기 %ds 완료)", self._reconnect_count, self.MAX_RECONNECT_ATTEMPTS, int(self._reconnect_delay), ) time.sleep(self._reconnect_delay) self._reconnect_delay = min( self._reconnect_delay * 2, self.RECONNECT_MAX_DELAY_SEC ) # ── WebSocketApp 생성 및 실행 ───────────────────────────── _conn_start = time.time() # 연결 시작 시각 (즉시 종료 감지용) try: ws_app = self._ws_lib.WebSocketApp( self._ws_url, on_open = self._on_open, on_message = self._on_message, on_error = self._on_error, on_close = self._on_close, ) self._ws = ws_app # ping_interval: WebSocket 프로토콜 레벨 PING (연결 유지) # KIS Application-Level PINGPONG은 _parse_realtime_msg 에서 처리 ws_app.run_forever(ping_interval=20, ping_timeout=10) except Exception as e: logger.error("WebSocket run_forever 예외: %s", e) if not self._running: break # ── 즉시 종료 여부 판정 (장외 서버 거부 감지) ──────────────── _conn_duration = time.time() - _conn_start if _conn_duration < INSTANT_DROP_SEC: _instant_drop_streak += 1 logger.debug( "⚡ WebSocket 즉시 종료 감지 (%.1f초, streak=%d/%d)", _conn_duration, _instant_drop_streak, INSTANT_DROP_MAX, ) else: # 어느 정도 연결 유지됐다면 즉시 종료 streak 초기화 _instant_drop_streak = 0 logger.info("KIS WebSocket 루프 종료 (is_active=False, REST fallback 전환)") # ------------------------------------------------------------------ # WebSocket 콜백 # ------------------------------------------------------------------ def _on_open(self, ws) -> None: """연결 성공: 등록된 모든 종목 구독 요청 전송.""" self._connected = True self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC # 대기 시간 리셋 self._last_connect_time = time.time() # 안정 연결 판단용 타임스탬프 기록 logger.info("✅ KIS WebSocket 연결 성공 (H0STCNT0 | url=%s)", self._ws_url) # 영구 구독 목록(홀딩 관심종목) 먼저 구독 등록 for code in sorted(self._permanent_codes): with self._sub_lock: if code not in self._subscribed: if len(self._subscribed) < self.MAX_SUBSCRIPTIONS: self._subscribed.add(code) else: logger.warning("⚠️ 구독 한도로 영구구독 추가 불가: %s", code) continue self._send_sub_msg(code, subscribe=True) if self._permanent_codes: logger.info("📌 영구 구독 등록 완료: %d종목", len(self._permanent_codes)) # 재연결 시에도 구독 목록 재등록 with self._sub_lock: codes = set(self._subscribed) for code in sorted(codes): self._send_sub_msg(code, subscribe=True) if codes: logger.info("📡 WebSocket 구독 일괄 등록: %s", ", ".join(sorted(codes))) # ── 연결 성공 시 갭보정 콜백 (장 시간일 때만) ─────────────── # 봇 시작 후 처음 WS가 안정 연결되는 시점(9:00 이후)에 # REST 분봉 데이터로 DB를 채워 '봉부족' 문제 해소. # 새벽 재연결 시에는 is_market_hours() False → 콜백 스킵. if self._on_connected_callback and self._is_market_hours(): try: cb_thread = threading.Thread( target=self._on_connected_callback, name="WS-GapFill", daemon=True, ) cb_thread.start() except Exception as e: logger.debug("갭보정 콜백 실행 실패: %s", e) def _on_message(self, ws, message: str) -> None: self._parse_realtime_msg(message) def _on_error(self, ws, error) -> None: self._connected = False logger.warning("⚠️ KIS WebSocket 오류: %s", error) def _on_close(self, ws, close_status_code, close_msg) -> None: self._connected = False logger.info( "🔌 KIS WebSocket 연결 종료 (code=%s msg=%s)", close_status_code, close_msg or "", ) # ====================================================================== # ====================================================================== # CandleAggregator — WebSocket 틱 → OHLCV 봉 실시간 집계기 # ====================================================================== class CandleAggregator: """ KISWebSocketPriceCache 에서 수신한 틱을 N분봉으로 집계합니다. Two-Track 아키텍처 (Gemini/퀀트 펌 방식) ───────────────────────────────────────── [트랙 1 — 매매 두뇌 (논블로킹)] WebSocket 틱 → on_tick() → RAM에서 OHLCV 즉시 갱신 → 매수/매도 판단은 get_latest_confirmed() 등 메모리 접근만 사용 → DB 대기 시간 0ms, 타점 놓침 없음 [트랙 2 — 기록원 스레드 (백그라운드)] 봉 확정 시 dict를 Queue에 put_nowait() (논블로킹, 0.000001초) → 백그라운드 스레드(_db_writer)가 BATCH_SIZE개 or FLUSH_INTERVAL초마다 DB에 executemany() 한 방에 묶어서 INSERT → 백테스트용 봉 데이터 완전 보존, 매매 루프 블로킹 없음 진행 중 봉(is_confirmed=0) 처리 ───────────────────────────────── - RAM의 _current 버퍼에만 존재 → DB에 절대 쓰지 않음 - 매수 루프는 get_current_candle() 로 즉시 메모리 접근 - 봉 확정(분 바뀜) 순간에만 Queue → DB 기록 RSI 계산 ──────── - 확정 봉 close 리스트(_closes, 최대 MAX_CLOSE_BUFFER개) RAM에 유지 - RSI(2/3/5) 계산은 순수 Python 연산, DB 조회 없음 - 봉 수 부족 시 RSI=None → 매수 루프에서 신호 무시 스레드 안전성 ───────────── - _lock : on_tick / fill_gap 간 경합 방지 (RAM 버퍼 보호) - Queue : thread-safe, put_nowait 는 lock 불필요 - _db_writer : 독립 daemon 스레드 (봇 종료 시 자동 소멸) """ MAX_CLOSE_BUFFER = 200 # RSI 계산용 close 보관 최대 개수 BATCH_SIZE = 50 # 이 개수 이상 쌓이면 즉시 배치 플러시 FLUSH_INTERVAL = 2.0 # 초 — BATCH_SIZE 미달이라도 이 주기로 플러시 def __init__(self, db=None, timeframes: list = None): """ Args: db : TradeDB 인스턴스. None이면 DB 쓰기 비활성(순수 RAM 모드). timeframes: 집계할 봉 단위 리스트 (기본 [1, 3]) """ self.db = db self.timeframes: list = timeframes if timeframes else [1, 3] self._lock = threading.Lock() # ── 트랙 1: RAM 버퍼 ───────────────────────────────────── # 진행 중인 봉: { (code, tf): {candle_time, open, high, low, close, volume} } self._current: Dict[tuple, dict] = {} # RSI 계산용 확정 봉 close 가격 히스토리: { (code, tf): [c1, c2, ...] } self._closes: Dict[tuple, list] = {} # 최근 확정 봉 보관 (get_latest_confirmed / get_candles 용): { (code, tf): [dict, ...] } self._confirmed: Dict[tuple, list] = {} # ── 트랙 2: 비동기 DB 배치 큐 ──────────────────────────── # 봉 확정 시 dict를 여기 넣으면 _db_writer 가 배치로 INSERT self._write_queue: queue.Queue = queue.Queue(maxsize=10000) self._db_writer_thread: Optional[threading.Thread] = None if self.db is not None: self._start_db_writer() logger.info("✅ CandleAggregator 초기화 완료 (timeframes=%s, db=%s)", self.timeframes, "활성" if self.db else "비활성(RAM 전용)") # ------------------------------------------------------------------ # 트랙 2: 백그라운드 DB 배치 기록원 # ------------------------------------------------------------------ def _start_db_writer(self) -> None: """백그라운드 DB 배치 기록 스레드를 시작합니다.""" self._db_writer_thread = threading.Thread( target=self._db_writer_loop, name="CandleDBWriter", daemon=True, # 봇 종료 시 자동 소멸 ) self._db_writer_thread.start() logger.info("✅ CandleAggregator DB 기록원 스레드 시작 (배치=%d, 주기=%.1fs)", self.BATCH_SIZE, self.FLUSH_INTERVAL) def _db_writer_loop(self) -> None: """ 배치 기록 루프. - Queue에서 BATCH_SIZE개 모이면 즉시 배치 INSERT - BATCH_SIZE 미달이라도 FLUSH_INTERVAL초마다 플러시 """ batch: list = [] last_flush = time.time() while True: try: # FLUSH_INTERVAL 내에서 최대한 많이 모아 배치 구성 timeout = max(0.1, self.FLUSH_INTERVAL - (time.time() - last_flush)) item = self._write_queue.get(timeout=timeout) if item is None: # None = 종료 신호 break batch.append(item) self._write_queue.task_done() except queue.Empty: pass now = time.time() should_flush = len(batch) >= self.BATCH_SIZE or \ (batch and (now - last_flush) >= self.FLUSH_INTERVAL) if should_flush and batch: self._flush_batch(batch) batch = [] last_flush = now # 루프 종료 시 남은 배치 처리 if batch: self._flush_batch(batch) logger.info("CandleDBWriter 스레드 종료") def _flush_batch(self, batch: list) -> None: """ 배치 리스트를 DB에 한 번의 executemany 로 INSERT. 각 item: {code, tf, candle_time, open, high, low, close, volume, is_confirmed, source, rsi_2, rsi_3, rsi_5} """ if not self.db or not batch: return try: import datetime as _dt now_str = _dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") rows = [] for item in batch: rows.append(( item["code"], item["tf"], item["candle_time"], item["open"], item["high"], item["low"], item["close"], item.get("volume", 0), item.get("rsi_2"), item.get("rsi_3"), item.get("rsi_5"), item.get("is_confirmed", 1), item.get("source", "ws"), now_str, )) # ws_candles 테이블이 존재하면 배치 INSERT (없으면 조용히 skip) self.db.conn.execute( """ INSERT INTO ws_candles (code, timeframe, candle_time, `open`, high, low, close, volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE `open`=VALUES(`open`), high=VALUES(high), low=VALUES(low), close=VALUES(close), volume=VALUES(volume), rsi_2=VALUES(rsi_2), rsi_3=VALUES(rsi_3), rsi_5=VALUES(rsi_5), is_confirmed=VALUES(is_confirmed), updated_at=VALUES(updated_at) """, rows[0], # 단건 execute (pymysql executemany 는 첫 번째 인수만 받음) ) if len(rows) == 1 else self._executemany_batch(rows) logger.debug("💾 [배치저장] %d봉 → DB", len(rows)) except Exception as e: logger.debug("_flush_batch 실패 (무시): %s", e) def _executemany_batch(self, rows: list) -> None: """여러 봉을 executemany 로 한 번에 INSERT.""" sql = """ INSERT INTO ws_candles (code, timeframe, candle_time, `open`, high, low, close, volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE `open`=VALUES(`open`), high=VALUES(high), low=VALUES(low), close=VALUES(close), volume=VALUES(volume), rsi_2=VALUES(rsi_2), rsi_3=VALUES(rsi_3), rsi_5=VALUES(rsi_5), is_confirmed=VALUES(is_confirmed), updated_at=VALUES(updated_at) """ # pymysql executemany: cursor.executemany(sql, list_of_tuples) import pymysql with self.db.conn._lock: self.db.conn._ensure_connected() cur = self.db.conn._conn.cursor() cur.executemany(sql, rows) def stop(self) -> None: """기록원 스레드를 안전하게 종료합니다.""" if self._db_writer_thread and self._db_writer_thread.is_alive(): self._write_queue.put(None) # 종료 신호 self._db_writer_thread.join(timeout=5) # ------------------------------------------------------------------ # 시각 유틸 # ------------------------------------------------------------------ @staticmethod def _candle_key(tick_time_str: str, timeframe: int) -> str: """ 틱 시각(HHMMSS)과 timeframe(분)으로 봉 시작 시각 키를 반환. 날짜는 오늘 날짜를 사용 (장 중에만 동작하므로 안전). 반환 형식: YYYYMMDDHHMM (분 단위, 초 버림) 예) tick_time="091523", timeframe=3 → "202603020915" (09:15봉) """ import datetime as _dt try: hh = int(tick_time_str[0:2]) if len(tick_time_str) >= 6 else 0 mm = int(tick_time_str[2:4]) if len(tick_time_str) >= 6 else 0 # timeframe 단위로 내림: 3분봉이면 09:17 → 09:15 floored_mm = (mm // timeframe) * timeframe today = _dt.date.today().strftime("%Y%m%d") return f"{today}{hh:02d}{floored_mm:02d}" except Exception: import datetime as _dt return _dt.datetime.now().strftime("%Y%m%d%H%M") # ------------------------------------------------------------------ # RSI 계산 (확정 봉 close 리스트 기반) # ------------------------------------------------------------------ @staticmethod def _calc_rsi(closes: list, period: int) -> Optional[float]: """ close 가격 리스트에서 RSI(period) 계산. - Wilder's smoothed moving average (단순 rolling mean 사용) - 데이터 부족 시 None 반환 """ if len(closes) < period + 1: return None recent = closes[-(period + 1):] gains, losses = [], [] for i in range(1, len(recent)): delta = recent[i] - recent[i - 1] gains.append(max(delta, 0)) losses.append(max(-delta, 0)) avg_gain = sum(gains) / period if period > 0 else 0 avg_loss = sum(losses) / period if period > 0 else 0 if avg_loss == 0: return 100.0 rs = avg_gain / avg_loss return round(100 - (100 / (1 + rs)), 2) def _compute_rsi_set(self, closes: list) -> tuple: """RSI 2/3/5 를 한 번에 계산해서 (rsi2, rsi3, rsi5) 반환.""" return ( self._calc_rsi(closes, 2), self._calc_rsi(closes, 3), self._calc_rsi(closes, 5), ) # ------------------------------------------------------------------ # 핵심: 틱 수신 처리 # ------------------------------------------------------------------ def on_tick(self, code: str, price: float, volume: int, tick_time: str) -> None: """ KISWebSocketPriceCache._parse_realtime_msg 에서 틱마다 호출됨. 모든 timeframe 에 대해 봉 갱신/확정 처리. """ if price <= 0: return with self._lock: for tf in self.timeframes: self._process_tick(code, price, volume, tick_time, tf) def _process_tick(self, code: str, price: float, volume: int, tick_time: str, tf: int) -> None: """ 단일 timeframe 에 대한 틱 처리 (lock 내부에서 호출). [트랙 1] RAM 갱신만 수행, DB 호출 없음 → 블로킹 0ms [트랙 2] 봉 확정 순간에만 Queue.put_nowait() → 기록원이 비동기 배치 저장 """ key = (code, tf) new_ctime = self._candle_key(tick_time, tf) if key not in self._current: # 첫 틱: 새 봉 시작 (RAM만) self._current[key] = { "candle_time": new_ctime, "open": price, "high": price, "low": price, "close": price, "volume": volume, } return cur = self._current[key] if new_ctime != cur["candle_time"]: # ── 봉 확정 ──────────────────────────────────────────── closes = self._closes.setdefault(key, []) closes.append(cur["close"]) if len(closes) > self.MAX_CLOSE_BUFFER: closes.pop(0) rsi2, rsi3, rsi5 = self._compute_rsi_set(closes) confirmed_candle = { "code": code, "tf": tf, "candle_time": cur["candle_time"], "open": cur["open"], "high": cur["high"], "low": cur["low"], "close": cur["close"], "volume": cur["volume"], "rsi_2": rsi2, "rsi_3": rsi3, "rsi_5": rsi5, "is_confirmed": 1, "source": "ws", } # [트랙 1] 확정 봉을 RAM _confirmed 버퍼에 보관 (매수 루프 직접 참조용) buf = self._confirmed.setdefault(key, []) buf.append(confirmed_candle) if len(buf) > self.MAX_CLOSE_BUFFER: buf.pop(0) # [트랙 2] Queue에 던지고 즉시 반환 (논블로킹) → 기록원이 배치 저장 try: self._write_queue.put_nowait(confirmed_candle) except queue.Full: logger.warning("⚠️ CandleAggregator 쓰기 Queue 가득참 — 봉 1개 DROP (코드: %s)", code) logger.debug( "🕯 [봉확정] %s %dM %s C=%.0f RSI3=%s", code, tf, cur["candle_time"], cur["close"], f"{rsi3:.1f}" if rsi3 is not None else "N/A", ) # ── 새 봉 시작 (RAM만) ────────────────────────────────── self._current[key] = { "candle_time": new_ctime, "open": price, "high": price, "low": price, "close": price, "volume": volume, } else: # 같은 봉: OHLCV 갱신 (RAM만, DB 쓰기 없음) cur["high"] = max(cur["high"], price) cur["low"] = min(cur["low"], price) cur["close"] = price cur["volume"] = volume # 누적 거래량(ACML_VOL) 그대로 덮어씀 # ------------------------------------------------------------------ # 재접속 갭 보정: REST get_minute_chart 로 빈 봉 채우기 # ------------------------------------------------------------------ def fill_gap_from_rest(self, code: str, tf: int, rest_df) -> int: """ WS 재접속 후 빠진 봉 구간을 REST 분봉 데이터로 채움. [트랙 1] close 가격을 _closes / _confirmed 에 넣어 RSI 웜업 [트랙 2] 봉 dict를 Queue 에 넣어 기록원이 배치로 DB 저장 (source='rest') Args: code : 종목코드 tf : timeframe (분) rest_df : get_minute_chart 반환 DataFrame (오래된→최신 순 정렬 필요) 컬럼: time, open, high, low, close, volume Returns: 채워진 봉 수 """ if rest_df is None or rest_df.empty: return 0 with self._lock: key = (code, tf) closes = self._closes.setdefault(key, []) conf_buf = self._confirmed.setdefault(key, []) inserted = 0 for _, row in rest_df.iterrows(): ctime = str(row.get("time", ""))[:12] # YYYYMMDDHHMM 12자리 if not ctime or len(ctime) < 12: continue close = float(row.get("close", 0)) if close <= 0: continue # [트랙 1] RAM 웜업 closes.append(close) if len(closes) > self.MAX_CLOSE_BUFFER: closes.pop(0) rsi2, rsi3, rsi5 = self._compute_rsi_set(closes) candle = { "code": code, "tf": tf, "candle_time": ctime, "open": float(row.get("open", close)), "high": float(row.get("high", close)), "low": float(row.get("low", close)), "close": close, "volume": int(row.get("volume", 0)), "rsi_2": rsi2, "rsi_3": rsi3, "rsi_5": rsi5, "is_confirmed": 1, "source": "rest", } conf_buf.append(candle) if len(conf_buf) > self.MAX_CLOSE_BUFFER: conf_buf.pop(0) # [트랙 2] Queue에 넣어 기록원이 배치로 DB 저장 try: self._write_queue.put_nowait(candle) except queue.Full: pass inserted += 1 if inserted: logger.info("🔧 [갭보정] %s %dM → REST %d봉 RAM 적재 + DB 큐 등록", code, tf, inserted) return inserted # ------------------------------------------------------------------ # [트랙 1] RAM 버퍼 조회 — 매수/매도 루프에서 직접 호출 (DB 조회 없음) # ------------------------------------------------------------------ def get_latest_confirmed(self, code: str, tf: int) -> Optional[dict]: """ 가장 최근 확정된 봉(완성된 마지막 봉)을 반환. None이면 아직 봉이 확정되지 않음 (장 초반 등). """ with self._lock: buf = self._confirmed.get((code, tf)) return buf[-1] if buf else None def get_prev_confirmed(self, code: str, tf: int) -> Optional[dict]: """직전 확정봉 (최신에서 2번째). 패턴 확인용 (현재봉 - 1).""" with self._lock: buf = self._confirmed.get((code, tf)) return buf[-2] if buf and len(buf) >= 2 else None def get_candles(self, code: str, tf: int, n: int = 10) -> list: """최근 n개 확정 봉 리스트 반환 (오래된→최신 순).""" with self._lock: buf = self._confirmed.get((code, tf), []) return list(buf[-n:]) def get_confirmed_count(self, code: str, tf: int) -> int: """확정된 봉 수 (RSI 안정화 여부 확인용).""" with self._lock: return len(self._confirmed.get((code, tf), [])) def get_current_candle(self, code: str, tf: int) -> Optional[dict]: """ 현재 진행 중인 봉(미확정, is_confirmed=0) 반환. RSI는 포함되지 않음 (확정 봉 기준으로만 계산). """ with self._lock: return dict(self._current.get((code, tf), {})) or None def get_rsi(self, code: str, tf: int, period: int = 3) -> Optional[float]: """ 최신 확정 봉의 RSI(period) 값 반환. period: 2, 3, 5 중 하나 (스캘핑 단타용 초단기 RSI) """ candle = self.get_latest_confirmed(code, tf) if candle is None: return None return candle.get(f"rsi_{period}") def remove_code(self, code: str) -> None: """ 유니버스에서 빠진 종목의 RAM 버퍼를 정리합니다. _sync_subscriptions()에서 구독 해제 시 같이 호출. 등록된 모든 timeframe 의 _confirmed / _closes / _current 를 삭제. """ with self._lock: for tf in list(self.timeframes): key = (code, tf) self._confirmed.pop(key, None) self._closes.pop(key, None) self._current.pop(key, None) logger.debug("🗑️ CandleAggregator RAM 정리: %s", code) # ====================================================================== # 키움 REST API 공통 유틸 — 봇 시작 시 WS 갭보정용 # KIS get_minute_chart() 는 당일봉만 제공하지만, # 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월) + 페이지네이션 지원. # ====================================================================== # ────────────────────────────────────────────────────────────────────── # 키움 토큰 매니저 싱글톤 풀 # # kiwoom_rest_api/auth/token.py 의 TokenManager 와 동일한 설계: # - _is_access_token_valid(): expires_dt 기반 정확한 만료 판별 (30s 버퍼) # - _request_new_token(): 만료 시에만 발급 (au10001 rate limit 방지) # # 차이점: DB에서 읽은 app_key/secret 직접 주입 (환경변수 불필요) # 싱글톤 풀: 캐시 키(domain:appkey앞8자) → 인스턴스 재사용 # 종목×timeframe마다 새 인스턴스를 만들지 않음 # ────────────────────────────────────────────────────────────────────── import threading as _threading from datetime import datetime as _datetime, timedelta as _timedelta _kiwoom_token_lock = _threading.Lock() _kiwoom_managers: Dict[str, "KiwoomTokenManager"] = {} # 싱글톤 풀 class KiwoomTokenManager: """ kiwoom_rest_api/auth/token.py TokenManager 와 동일한 로직 — DB 키 직접 주입 버전. - get_token(): 유효한 토큰이면 바로 반환, 만료 시에만 재발급 (au10001 방지) - expires_dt 를 정확히 파싱 (하드코딩 23h 아님) - 30초 버퍼: 경계 케이스 방지 (TokenManager 와 동일) - 스레드 안전: 인스턴스당 Lock 보유 """ def __init__(self, app_key: str, app_secret: str, is_mock: bool = False): self._app_key = app_key self._app_secret = app_secret self._is_mock = is_mock self._domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com" self._mode_str = "모의" if is_mock else "실전" self._token: Optional[str] = None self._expiry: Optional[_datetime] = None self._lock = _threading.Lock() def _is_valid(self) -> bool: """만료 30초 전까지 유효 (TokenManager._is_access_token_valid 와 동일)""" if not self._token or not self._expiry: return False return _datetime.now() < self._expiry - _timedelta(seconds=30) def _request_new_token(self) -> None: """토큰 신규 발급 — 만료 시에만 호출됨""" resp = requests.post( f"https://{self._domain}/oauth2/token", json={ "grant_type": "client_credentials", "appkey": self._app_key, "secretkey": self._app_secret, }, timeout=10, ) data = resp.json() token = (data.get("token") or data.get("access_token") or "").strip() if not token: raise RuntimeError(f"키움 토큰 발급 실패 [{self._mode_str}]: {data}") # expires_dt 정확히 파싱 (TokenManager._update_token_info 와 동일 로직) exp_s = data.get("expires_dt", "") try: self._expiry = _datetime.strptime(str(exp_s), "%Y%m%d%H%M%S") except Exception: # expires_in 도 없으면 24h 기본 (보수적 fallback) exp_in = data.get("expires_in", 86400) self._expiry = _datetime.now() + _timedelta(seconds=int(exp_in)) self._token = token logger.info( "✅ 키움 토큰 발급 완료 [%s] (앞8자: %s…, 만료: %s)", self._mode_str, token[:8], self._expiry.strftime("%Y-%m-%d %H:%M:%S"), ) def get_token(self) -> Optional[str]: """ 유효한 토큰 반환. 만료 시에만 재발급. kiwoom_rest_api TokenManager.get_token() 호환 인터페이스. """ with self._lock: if not self._is_valid(): try: self._request_new_token() except Exception as e: logger.warning("⚠️ 키움 토큰 발급 예외 [%s]: %s", self._mode_str, e) return None return self._token def _get_kiwoom_token_cached( kiwoom_key: str, kiwoom_secret: str, is_mock: bool, ) -> Optional[str]: """ KiwoomTokenManager 싱글톤 풀에서 인스턴스를 꺼내 토큰 반환. 동일 키 조합은 같은 인스턴스를 재사용 → au10001 rate limit 방지. """ domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com" cache_key = f"{domain}:{kiwoom_key[:8]}" with _kiwoom_token_lock: if cache_key not in _kiwoom_managers: _kiwoom_managers[cache_key] = KiwoomTokenManager( kiwoom_key, kiwoom_secret, is_mock=is_mock ) mgr = _kiwoom_managers[cache_key] return mgr.get_token() def _get_kiwoom_creds(db) -> tuple: """ DB env_config 최신 행에서 키움 앱키/시크릿 반환. KIS_MOCK 설정에 따라 MOCK / REAL 키를 자동 선택. Returns: (app_key, app_secret, is_mock) — 키 없으면 (None, None, False) """ try: row = db.conn.execute( "SELECT * FROM env_config ORDER BY id DESC LIMIT 1" ).fetchone() if not row: return None, None, False r = dict(row) is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes") if is_mock: key = str(r.get("KIWOOM_APP_KEY_MOCK", "") or "").strip() secret = str(r.get("KIWOOM_APP_SECRET_MOCK", "") or "").strip() else: key = str(r.get("KIWOOM_APP_KEY_REAL", "") or "").strip() secret = str(r.get("KIWOOM_APP_SECRET_REAL", "") or "").strip() # 레거시 필드 폴백 (KIWOOM_APP_KEY) if not key or not secret: key = str(r.get("KIWOOM_APP_KEY", "") or "").strip() secret = str(r.get("KIWOOM_APP_SECRET", "") or "").strip() if not key or not secret: return None, None, is_mock return key, secret, is_mock except Exception as e: logger.debug("키움 크레덴셜 조회 실패: %s", e) return None, None, False def get_kiwoom_candles_df( code: str, tf_min: int, kiwoom_key: str, kiwoom_secret: str, is_mock: bool = False, n: int = 120, ) -> "object": # pd.DataFrame """ 키움 REST API (ka10080 — 주식분봉차트조회) 로 분봉 조회. fill_gap_from_rest() 호환 DataFrame 반환. Args: code : 종목코드 (6자리) tf_min : 봉 단위 분 (1 / 3 / 5 / 10 / 15 / 30 / 45 / 60) kiwoom_key : 키움 앱키 kiwoom_secret: 키움 앱시크릿 is_mock : True = 모의투자 도메인 사용 n : 최대 수집 봉 수 (기본 120, 60분봉 기준 약 17 영업일) Returns: pd.DataFrame — 컬럼: time(YYYYMMDDHHMM), open, high, low, close, volume 오래된→최신 순 (fill_gap_from_rest 기대 순서) 빈 DataFrame (오류 시) """ try: import pandas as pd except ImportError: logger.error("pandas 미설치 → 키움 갭보정 불가") return None # type: ignore[return-value] domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com" # ── 1. 키움 OAuth 토큰 (캐시 우선 — 23시간 재사용, au10001 rate limit 방지) ── token = _get_kiwoom_token_cached(kiwoom_key, kiwoom_secret, is_mock) if not token: return pd.DataFrame() # ── 2. ka10080 분봉 차트 조회 (페이지네이션) ────────────────────── # 키움은 최신→과거 순으로 반환, 한 페이지에 최대 900봉 base_url = f"https://{domain}/api/dostk/chart" headers = { "content-type": "application/json;charset=UTF-8", "appkey": kiwoom_key, "appsecret": kiwoom_secret, "authorization": f"Bearer {token}", "api-id": "ka10080", "cont-yn": "N", "next-key": "", } body = { "stk_cd": code, "tic_scope": str(tf_min), # "1" / "3" / "15" / "60" etc. "upd_stkpc_tp": "1", # 수정주가 반영 } rows: list = [] while len(rows) < n: try: resp = requests.post(base_url, json=body, headers=headers, timeout=15) data = resp.json() except Exception as e: logger.warning("⚠️ 키움 ka10080 조회 실패 (%s %dM): %s", code, tf_min, e) break records = data.get("stk_min_pole_chart_qry") or [] if not records: break for rec in records: raw_dt = str(rec.get("cntr_tm", "")) if len(raw_dt) < 12: continue try: rows.append({ # cntr_tm = YYYYMMDDHHMMSS → 12자리로 잘라 YYYYMMDDHHMM "time": raw_dt[:12], "open": abs(float(rec.get("open_pric", 0) or 0)), "high": abs(float(rec.get("high_pric", 0) or 0)), "low": abs(float(rec.get("low_pric", 0) or 0)), "close": abs(float(rec.get("cur_prc", 0) or 0)), "volume": abs(int(float(rec.get("trde_qty", 0) or 0))), }) except (ValueError, TypeError): continue if len(rows) >= n: break # 연속 조회 여부 확인 (응답 헤더) cont_yn = str(resp.headers.get("cont-yn", "N")).upper() next_key = str(resp.headers.get("next-key", "")).strip() if cont_yn != "Y" or not next_key or len(rows) >= n: break headers["cont-yn"] = cont_yn headers["next-key"] = next_key time.sleep(0.35) # 키움 API 레이트리밋 if not rows: logger.debug("키움 ka10080 반환 행 없음 (%s %dM)", code, tf_min) return pd.DataFrame() # 키움은 최신→과거 순 → fill_gap_from_rest 는 오래된→최신 순 필요 → 역순 df = pd.DataFrame(rows[:n][::-1]) logger.info("✅ 키움 %dM 갭보정 데이터: %s %d봉", tf_min, code, len(df)) return df