1546 lines
68 KiB
Python
1546 lines
68 KiB
Python
"""
|
||
kis_ws.py — KIS WebSocket 실시간 체결가 캐시 (H0STCNT0)
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
역할: check_sell_signals() 의 inquire_price() REST 폴링을 대체.
|
||
보유 종목 코드를 구독해두면 KIS가 체결마다 push → 즉시 가격 캐시 갱신.
|
||
|
||
WebSocket vs REST:
|
||
REST : 요청마다 0.5s 딜레이(API 제한) + 응답 대기 → 보유 3종목 ≈ 3초 낭비
|
||
WebSocket: 연결 1회 + push 수신 → 체결 즉시 캐시 갱신, API 카운트 무관
|
||
|
||
KIS 공식 스펙:
|
||
TR_ID : H0STCNT0 (국내주식 실시간체결가)
|
||
실전 URL : ws://ops.koreainvestment.com:21000 (KIS_WS_URL_REAL, env/DB 변경 가능)
|
||
모의 URL : ws://ops.koreainvestment.com:31000 (KIS_WS_URL_MOCK, env/DB 변경 가능)
|
||
세션 구독 한도: 최대 41종목 (MAX_STOCKS=3~4 수준에서 문제 없음)
|
||
|
||
KIS 2026-02-24 경고 준수 (무한 재연결 차단 정책):
|
||
- 재연결 횟수 제한: 1시간 내 MAX_RECONNECTS_PER_HOUR 회 초과 시 자동 대기
|
||
- 최대 총 재연결 횟수: MAX_RECONNECT_ATTEMPTS 회 초과 시 WebSocket 종료 → REST fallback
|
||
- 지수 백오프: 5초 → 10초 → 20초 ... 최대 300초
|
||
|
||
모의투자(VTS):
|
||
KIS 모의투자는 WebSocket을 지원하지 않는 경우가 많음.
|
||
연결 실패 시 is_active=False → check_sell_signals()가 REST로 자동 fallback.
|
||
KIS_WS_MOCK_ENABLED=true (env/DB) 로 강제 활성화 가능.
|
||
|
||
설치 필요:
|
||
pip install websocket-client
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import queue
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Dict, Optional, Set
|
||
|
||
import requests
|
||
|
||
logger = logging.getLogger("KISWebSocket")
|
||
|
||
|
||
# ------------------------------------------------------------------
|
||
# 모듈 수준에서 get_env 함수를 참조 (kis_long_ver1 공용 함수 재사용)
|
||
# 이 파일이 단독으로도 동작할 수 있도록 fallback import 포함
|
||
# ------------------------------------------------------------------
|
||
try:
|
||
from kis_long_ver1 import get_env_from_db, get_env_int, get_env_bool
|
||
except ImportError:
|
||
try:
|
||
from kis_short_ver2 import get_env_from_db, get_env_int, get_env_bool
|
||
except ImportError:
|
||
# 모듈 임포트 전 단계에서는 기본값만 사용
|
||
def get_env_from_db(key, default=""): # type: ignore[misc]
|
||
return default
|
||
|
||
def get_env_int(key, default): # type: ignore[misc]
|
||
return default
|
||
|
||
def get_env_bool(key, default=False): # type: ignore[misc]
|
||
return default
|
||
|
||
|
||
class KISWebSocketPriceCache:
|
||
"""
|
||
KIS H0STCNT0 실시간 체결가 WebSocket 수신기.
|
||
|
||
사용법:
|
||
ws_cache = KISWebSocketPriceCache(app_key, app_secret, is_mock=False)
|
||
ok = ws_cache.start() # 백그라운드 스레드 시작
|
||
ws_cache.subscribe("005930") # 종목 구독
|
||
data = ws_cache.get_price("005930") # inquire_price 호환 dict 반환
|
||
ws_cache.stop()
|
||
|
||
get_price() 반환 값이 None 이면 → REST inquire_price() 로 fallback
|
||
"""
|
||
|
||
# H0STCNT0 데이터 필드 인덱스 ('^' 구분)
|
||
# KIS H0STCNT0 국내주식 실시간체결 전문 순서 (0-based)
|
||
IDX_CODE = 0 # MKSC_SHRN_ISCD: 유가증권 단축 종목코드
|
||
IDX_TIME = 1 # STCK_CNTG_HOUR: 체결 시간
|
||
IDX_PRICE = 2 # STCK_PRPR: 주식 현재가 (체결가)
|
||
IDX_SIGN = 3 # PRDY_VRSS_SIGN: 전일 대비 부호
|
||
IDX_CHANGE = 4 # PRDY_VRSS: 전일 대비
|
||
IDX_CHGPCT = 5 # PRDY_CTRT: 전일 대비율
|
||
# 당일 시고저 (매수 체크 시 REST inquire_price 대체용 → API 과부하 방지)
|
||
IDX_OPEN = 7 # STCK_OPRC: 주식 시가 (당일)
|
||
IDX_HIGH = 8 # STCK_HGPR: 주식 고가 (당일)
|
||
IDX_LOW = 9 # STCK_LWPR: 주식 저가 (당일)
|
||
IDX_VOLUME = 11 # ACML_VOL: 누적 거래량 (참고용)
|
||
|
||
# 재연결 정책 (KIS 2026-02-24 경고 준수)
|
||
MAX_RECONNECT_ATTEMPTS = 10 # 총 재연결 최대 횟수 (STABLE_CONN_RESET_SEC 이상 안정 연결 후 끊기면 초기화)
|
||
MAX_RECONNECTS_PER_HOUR = 6 # 1시간 내 재연결 허용 횟수
|
||
RECONNECT_BASE_DELAY_SEC = 5.0 # 초기 재연결 대기(초)
|
||
RECONNECT_MAX_DELAY_SEC = 300.0 # 최대 재연결 대기(초)
|
||
# 이 시간(초) 이상 안정적으로 연결이 유지됐다가 끊기면 카운터를 초기화.
|
||
# 예: 5분 이상 정상 운영 후 네트워크 일시 장애 → '버스트 차단'이 아닌 '정상 재연결'로 간주.
|
||
STABLE_CONN_RESET_SEC = 300.0 # 5분
|
||
|
||
# Approval key 유효시간 (23시간, KIS REST 토큰과 별개)
|
||
APPROVAL_KEY_CACHE_SEC = 82800
|
||
|
||
def __init__(self, app_key: str, app_secret: str, is_mock: bool = True):
|
||
self.app_key = app_key
|
||
self.app_secret = app_secret
|
||
self.is_mock = is_mock
|
||
|
||
# WebSocket URL (env/DB 로 재정의 가능 → 연결 실패 시 사용자가 수정)
|
||
_default_real = "ws://ops.koreainvestment.com:21000"
|
||
_default_mock = "ws://ops.koreainvestment.com:31000"
|
||
self._ws_url = (
|
||
get_env_from_db("KIS_WS_URL_MOCK", _default_mock)
|
||
if is_mock
|
||
else get_env_from_db("KIS_WS_URL_REAL", _default_real)
|
||
)
|
||
self._base_url = (
|
||
"https://openapivts.koreainvestment.com:29443"
|
||
if is_mock
|
||
else "https://openapi.koreainvestment.com:9443"
|
||
)
|
||
|
||
# ── 가격 캐시 ──────────────────────────────────────────────
|
||
# { code: {"data": dict, "ts": float} }
|
||
self._cache: Dict[str, Dict] = {}
|
||
self._cache_lock = threading.Lock()
|
||
|
||
# ── 구독 목록 ──────────────────────────────────────────────
|
||
self._subscribed: Set[str] = set()
|
||
self._sub_lock = threading.Lock()
|
||
|
||
# ── 영구 구독 목록 (홀딩 관심종목 등) ──────────────────────
|
||
# unsubscribe() 호출에도 해제되지 않는 고정 구독 코드
|
||
self._permanent_codes: Set[str] = set()
|
||
self._load_permanent_watchlist()
|
||
|
||
# ── WebSocket 관련 ─────────────────────────────────────────
|
||
self._ws = None # 현재 WebSocketApp 인스턴스
|
||
self._ws_thread: Optional[threading.Thread] = None
|
||
self._running = False
|
||
self._connected = False
|
||
|
||
# ── approval_key (REST 토큰과 별개, WebSocket 전용 인증) ───
|
||
self._approval_key: Optional[str] = None
|
||
self._approval_key_ts: float = 0.0
|
||
|
||
# ── 재연결 관리 ────────────────────────────────────────────
|
||
self._reconnect_count = 0
|
||
self._reconnect_times: list = [] # 최근 재연결 타임스탬프 목록
|
||
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
|
||
self._last_connect_time: float = 0.0 # 마지막 연결 성공 시각 (안정 연결 판단용)
|
||
|
||
# ── CandleAggregator (스캘핑봇 연동 시 외부에서 주입) ─────
|
||
# attach_candle_aggregator(agg) 로 연결, None이면 봉 집계 비활성
|
||
self._candle_agg: Optional["CandleAggregator"] = None
|
||
|
||
# ── WS 연결 성공 시 갭보정 콜백 ───────────────────────────
|
||
# set_on_connected_callback(fn) 으로 등록.
|
||
# 연결 성공(_on_open) 후 별도 스레드에서 호출됨.
|
||
# 장 시간일 때만 실행 (새벽 재연결 시 API 빈 응답 방지)
|
||
self._on_connected_callback: Optional[callable] = None
|
||
|
||
# ── websocket-client 사용 가능 여부 ───────────────────────
|
||
try:
|
||
import websocket as _ws_lib
|
||
self._ws_lib = _ws_lib
|
||
self._available = True
|
||
except ImportError:
|
||
self._ws_lib = None
|
||
self._available = False
|
||
logger.warning(
|
||
"⚠️ websocket-client 미설치 → WebSocket 실시간 가격 비활성.\n"
|
||
" pip install websocket-client 를 실행하세요."
|
||
)
|
||
|
||
# ==================================================================
|
||
# Public API
|
||
# ==================================================================
|
||
|
||
def attach_candle_aggregator(self, agg: "CandleAggregator") -> None:
|
||
"""
|
||
CandleAggregator 를 연결합니다.
|
||
연결 후부터 틱 수신 시 자동으로 agg.on_tick() 이 호출됩니다.
|
||
kis_scalping_ver1.py 에서 ws_cache.attach_candle_aggregator(agg) 로 호출.
|
||
"""
|
||
self._candle_agg = agg
|
||
logger.info("✅ CandleAggregator 연결 완료 (봉 집계 활성화)")
|
||
|
||
def set_on_connected_callback(self, fn) -> None:
|
||
"""
|
||
WS 연결 성공(_on_open) 시 호출할 콜백 등록.
|
||
ScalpingBotV1._fill_all_gaps 를 넘겨서 매 연결 시 갭보정 자동 실행.
|
||
장 시간일 때만 콜백을 실행 (새벽 자동재연결 시 API 빈 응답 방지).
|
||
"""
|
||
self._on_connected_callback = fn
|
||
|
||
def start(self, force_cleanup: bool = True) -> bool:
|
||
"""
|
||
WebSocket 수신 백그라운드 스레드를 시작합니다.
|
||
성공 시 True, 사용 불가(모의/패키지 미설치/키 없음) 시 False.
|
||
False 를 반환해도 봇은 REST fallback으로 정상 동작합니다.
|
||
"""
|
||
if not self._available:
|
||
return False
|
||
|
||
# 모의투자는 기본 비활성 (KIS 모의 서버 WebSocket 미지원 가능성)
|
||
# KIS_WS_MOCK_ENABLED=true 로 강제 활성화 가능
|
||
if self.is_mock and not get_env_bool("KIS_WS_MOCK_ENABLED", False):
|
||
logger.info("ℹ️ 모의투자: WebSocket 기본 비활성 (KIS_WS_MOCK_ENABLED=true 로 활성 가능)")
|
||
return False
|
||
|
||
if self._running:
|
||
return True
|
||
|
||
# ── [CRITICAL] 비정상 종료 후 재시작 대비: 이전 세션 강제 정리 ──────────────
|
||
# force_cleanup=True 시:
|
||
# 1. approval_key 새로 발급 (이전 세션 무효화)
|
||
# 2. _subscribed 세트 비우기 (메모리 정리)
|
||
# 3. _cache 비우기 (오래된 데이터 제거)
|
||
# KIS 서버는 연결이 끊기면 5~10 분 내 자동 구독 해제되므로,
|
||
# 새 approval_key 로 새 세션을 열면 이전 구독은 자동 소멸.
|
||
if force_cleanup:
|
||
logger.info("🧹 WebSocket 세션 초기화 (비정상 종료 대비)")
|
||
self._approval_key = None # 이전 approval_key 무효화
|
||
self._approval_key_ts = 0.0
|
||
with self._sub_lock:
|
||
self._subscribed.clear() # 구독 목록 초기화
|
||
with self._cache_lock:
|
||
self._cache.clear() # 캐시 초기화
|
||
logger.info("✅ WebSocket 세션 초기화 완료 (구독/캐시 리셋)")
|
||
|
||
# approval_key 발급 (연결 전 확인)
|
||
if not self._get_approval_key():
|
||
logger.warning("⚠️ WebSocket approval_key 발급 실패 → REST fallback 모드")
|
||
return False
|
||
|
||
self._running = True
|
||
self._ws_thread = threading.Thread(
|
||
target=self._run_ws_loop, daemon=True, name="KIS-WS-H0STCNT0"
|
||
)
|
||
self._ws_thread.start()
|
||
logger.info(
|
||
"✅ KIS WebSocket 수신 스레드 시작 (H0STCNT0 | url=%s)", self._ws_url
|
||
)
|
||
return True
|
||
|
||
def stop(self, clear_subscriptions: bool = False) -> None:
|
||
"""
|
||
WebSocket 수신 중단 및 스레드 종료.
|
||
|
||
Args:
|
||
clear_subscriptions: True 시 모든 종목 구독 해제 후 종료
|
||
(봇 정상 종료 시 KIS 서버 정리용)
|
||
"""
|
||
# 옵션: 모든 구독 해제 (KIS 서버 정리용)
|
||
if clear_subscriptions and self._connected and self._ws:
|
||
with self._sub_lock:
|
||
codes = list(self._subscribed)
|
||
for code in codes:
|
||
self._send_sub_msg(code, subscribe=False)
|
||
with self._sub_lock:
|
||
self._subscribed.discard(code)
|
||
with self._cache_lock:
|
||
self._cache.pop(code, None)
|
||
logger.info("📡 WebSocket 구독 해제: %s", code)
|
||
logger.info("✅ 모든 WebSocket 구독 정리 완료 (%d종목)", len(codes))
|
||
|
||
self._running = False
|
||
self._connected = False
|
||
if self._ws:
|
||
try:
|
||
self._ws.close()
|
||
except Exception:
|
||
pass
|
||
if self._ws_thread and self._ws_thread.is_alive():
|
||
self._ws_thread.join(timeout=5)
|
||
logger.info("🛑 KIS WebSocket 종료")
|
||
|
||
# KIS WebSocket 세션 당 구독 가능 최대 종목 수
|
||
# 초과 시 서버에서 오류 반환 또는 계정 일시 차단 가능 (KIS 공지 준수)
|
||
MAX_SUBSCRIPTIONS = 41
|
||
|
||
def _load_permanent_watchlist(self) -> None:
|
||
"""
|
||
long_term_watchlist.json 에 있는 홀딩 관심종목을 영구 구독 목록으로 로드.
|
||
WS 연결 시 자동으로 subscribe(), unsubscribe() 호출 시에도 해제하지 않음.
|
||
"""
|
||
try:
|
||
wl_path = Path(__file__).resolve().parent / "long_term_watchlist.json"
|
||
if not wl_path.exists():
|
||
return
|
||
items = json.loads(wl_path.read_text(encoding="utf-8")).get("items", [])
|
||
codes = [i["code"] for i in items if i.get("code")]
|
||
self._permanent_codes = set(codes)
|
||
if codes:
|
||
logger.info(
|
||
"📌 홀딩 영구 구독 목록 로드: %s (%d종목)",
|
||
", ".join(codes), len(codes),
|
||
)
|
||
except Exception as e:
|
||
logger.warning("영구 구독 목록 로드 실패: %s", e)
|
||
|
||
def subscribe(self, code: str) -> None:
|
||
"""
|
||
실시간 체결가 구독 등록.
|
||
이미 연결 중이면 즉시 구독 메시지 전송, 연결 전이면 연결 성공 시 일괄 등록.
|
||
KIS 세션 한도(MAX_SUBSCRIPTIONS=41) 초과 시 등록 거부 후 경고 로그 출력.
|
||
"""
|
||
code = (code or "").strip()
|
||
if not code:
|
||
return
|
||
with self._sub_lock:
|
||
if code in self._subscribed:
|
||
return # 이미 구독 중 → 중복 전송 방지
|
||
if len(self._subscribed) >= self.MAX_SUBSCRIPTIONS:
|
||
logger.warning(
|
||
"⚠️ WebSocket 구독 한도 초과(%d/%d) → %s 구독 거부 "
|
||
"(KIS 세션 한도 준수: 불필요 종목 구독해제 후 재시도)",
|
||
len(self._subscribed), self.MAX_SUBSCRIPTIONS, code,
|
||
)
|
||
return
|
||
self._subscribed.add(code)
|
||
if self._connected and self._ws:
|
||
self._send_sub_msg(code, subscribe=True)
|
||
logger.info("📡 WebSocket 구독 추가: %s (%d/%d)", code, len(self._subscribed), self.MAX_SUBSCRIPTIONS)
|
||
|
||
def unsubscribe(self, code: str) -> None:
|
||
"""
|
||
실시간 체결가 구독 해제 및 캐시 삭제.
|
||
단, long_term_watchlist.json 의 영구 구독 종목은 해제하지 않음.
|
||
"""
|
||
code = (code or "").strip()
|
||
if code in self._permanent_codes:
|
||
logger.debug("📌 영구 구독 종목 해제 요청 무시: %s (홀딩 관심종목)", code)
|
||
return
|
||
with self._sub_lock:
|
||
self._subscribed.discard(code)
|
||
with self._cache_lock:
|
||
self._cache.pop(code, None)
|
||
if self._connected and self._ws:
|
||
self._send_sub_msg(code, subscribe=False)
|
||
logger.info("📡 WebSocket 구독 해제: %s", code)
|
||
|
||
def get_price(self, code: str, max_age_sec: float = 5.0) -> Optional[Dict]:
|
||
"""
|
||
WebSocket 캐시에서 실시간 가격 + 당일 시고저를 꺼냅니다.
|
||
max_age_sec 초 이내 수신된 체결 틱만 유효하게 취급합니다.
|
||
|
||
반환 형식: {
|
||
"stck_prpr": "73900", # 현재가 (체결가, REST inquire_price와 동일)
|
||
"prdy_vrss": "200", # 전일 대비
|
||
"prdy_ctrt": "0.27", # 전일 대비율
|
||
"stck_oprc": "73000", # 당일 시가 (REST와 동일)
|
||
"stck_hgpr": "74500", # 당일 고가 (REST와 동일)
|
||
"stck_lwpr": "72800", # 당일 저가 (REST와 동일)
|
||
}
|
||
반환 None: WebSocket 미연결 / 데이터 없음 / max_age_sec 초 초과 → REST fallback 권장
|
||
|
||
※ WS 캐시 정확도: H0STCNT0 체결 틱마다 KIS 서버가 시고저를 함께 전송
|
||
→ 장중 REST inquire_price와 동일하며 지연이 없음 (REST 왕복 100~300ms 생략)
|
||
"""
|
||
if not self._available or not self._connected:
|
||
return None
|
||
with self._cache_lock:
|
||
entry = self._cache.get(code)
|
||
if not entry:
|
||
return None
|
||
age = time.time() - entry.get("ts", 0)
|
||
if age > max_age_sec:
|
||
# 데이터가 너무 오래됨 → REST로 재확인
|
||
return None
|
||
return entry.get("data")
|
||
|
||
@property
|
||
def is_active(self) -> bool:
|
||
"""WebSocket이 연결되어 실시간 데이터를 수신 중이면 True."""
|
||
return bool(self._available and self._connected and self._running)
|
||
|
||
# ==================================================================
|
||
# 내부 메서드
|
||
# ==================================================================
|
||
|
||
def _get_approval_key(self) -> Optional[str]:
|
||
"""
|
||
WebSocket 전용 approval_key 발급.
|
||
REST 액세스 토큰과 완전히 별개 (endpoint: /oauth2/Approval).
|
||
"""
|
||
now = time.time()
|
||
if self._approval_key and (now - self._approval_key_ts) < self.APPROVAL_KEY_CACHE_SEC:
|
||
return self._approval_key
|
||
try:
|
||
url = f"{self._base_url}/oauth2/Approval"
|
||
body = {
|
||
"grant_type": "client_credentials",
|
||
"appkey": self.app_key,
|
||
"secretkey": self.app_secret,
|
||
}
|
||
r = requests.post(url, json=body, timeout=10)
|
||
data = r.json()
|
||
key = data.get("approval_key")
|
||
if key:
|
||
self._approval_key = key
|
||
self._approval_key_ts = now
|
||
logger.info(
|
||
"✅ WebSocket approval_key 발급 완료 (앞8자: %s…)", key[:8]
|
||
)
|
||
return key
|
||
logger.error("❌ WebSocket approval_key 발급 실패: %s", data)
|
||
except Exception as e:
|
||
logger.error("❌ WebSocket approval_key 요청 예외: %s", e)
|
||
return None
|
||
|
||
def _build_sub_payload(self, code: str, subscribe: bool) -> str:
|
||
"""구독(tr_type=1) / 해제(tr_type=2) JSON 메시지 생성."""
|
||
return json.dumps({
|
||
"header": {
|
||
"approval_key": self._approval_key or "",
|
||
"custtype": "P",
|
||
"tr_type": "1" if subscribe else "2",
|
||
"content-type": "utf-8",
|
||
},
|
||
"body": {
|
||
"input": {
|
||
"tr_id": "H0STCNT0",
|
||
"tr_key": code,
|
||
}
|
||
},
|
||
})
|
||
|
||
def _send_sub_msg(self, code: str, subscribe: bool = True) -> None:
|
||
"""WebSocket으로 구독/해제 메시지 전송. 실패 시 조용히 무시."""
|
||
if not self._ws:
|
||
return
|
||
try:
|
||
self._ws.send(self._build_sub_payload(code, subscribe))
|
||
except Exception as e:
|
||
logger.debug("구독 메시지 전송 실패(%s): %s", code, e)
|
||
|
||
def _parse_realtime_msg(self, raw: str) -> None:
|
||
"""
|
||
H0STCNT0 실시간 체결가 메시지 파싱 및 캐시 갱신.
|
||
|
||
정상 포맷: "0|H0STCNT0|001|005930^082317^73900^5^200^0.27^..."
|
||
parts[0]: 암호화구분 (0=평문, 1=암호화)
|
||
parts[1]: TR_ID
|
||
parts[2]: 건수
|
||
parts[3]: 데이터 ('^' 구분)
|
||
|
||
KIS PINGPONG: 메시지가 "PINGPONG" 문자열 → 동일하게 echoing.
|
||
JSON 응답(구독 확인/에러): {"header":{...},"body":{...}} → 무시.
|
||
"""
|
||
if not raw:
|
||
return
|
||
|
||
# ── KIS Application-Level PINGPONG ─────────────────────────
|
||
if raw.strip() == "PINGPONG":
|
||
if self._ws:
|
||
try:
|
||
self._ws.send("PINGPONG")
|
||
except Exception:
|
||
pass
|
||
return
|
||
|
||
# ── 구독 응답(JSON) 무시 ────────────────────────────────────
|
||
if raw.startswith("{"):
|
||
try:
|
||
j = json.loads(raw)
|
||
header = j.get("header", {})
|
||
if header.get("tr_id") == "H0STCNT0":
|
||
body = j.get("body", {})
|
||
rt = body.get("rt_cd", "")
|
||
msg = body.get("msg1", "")
|
||
logger.debug("H0STCNT0 구독 응답: rt_cd=%s msg=%s", rt, msg)
|
||
except Exception:
|
||
pass
|
||
return
|
||
|
||
# ── 실시간 데이터 파싱 ──────────────────────────────────────
|
||
parts = raw.split("|")
|
||
if len(parts) < 4:
|
||
return
|
||
# parts[0]=암호화구분, parts[1]=TR_ID, parts[2]=건수, parts[3]=데이터
|
||
if parts[1] != "H0STCNT0":
|
||
return
|
||
|
||
# 암호화된 데이터는 아직 미지원 (평문만 처리)
|
||
if parts[0] == "1":
|
||
logger.debug("H0STCNT0 암호화 데이터 수신 (처리 스킵) → REST fallback 권장")
|
||
return
|
||
|
||
# 한 메시지에 여러 건이 포함될 수 있음 (parts[2] = 건수)
|
||
# 단순히 parts[3] 전체를 파싱 (단건 기준)
|
||
fields = parts[3].split("^")
|
||
if len(fields) <= max(self.IDX_PRICE, self.IDX_CHGPCT):
|
||
return
|
||
|
||
try:
|
||
code = fields[self.IDX_CODE].strip()
|
||
price = float(fields[self.IDX_PRICE])
|
||
if not code or price <= 0:
|
||
return
|
||
|
||
chg_raw = fields[self.IDX_CHANGE] if len(fields) > self.IDX_CHANGE else "0"
|
||
pct_raw = fields[self.IDX_CHGPCT] if len(fields) > self.IDX_CHGPCT else "0.00"
|
||
# 당일 시고저: REST inquire_price 대체용 (매수 체크 시 API 과부하 방지)
|
||
open_raw = fields[self.IDX_OPEN] if len(fields) > self.IDX_OPEN else "0"
|
||
high_raw = fields[self.IDX_HIGH] if len(fields) > self.IDX_HIGH else "0"
|
||
low_raw = fields[self.IDX_LOW] if len(fields) > self.IDX_LOW else "0"
|
||
|
||
# inquire_price output 딕셔너리와 키 이름을 맞춤
|
||
# stck_oprc/hgpr/lwpr 도 함께 저장 → kis_short_ver2 매수 체크에서 REST 없이 활용
|
||
data_compat = {
|
||
"stck_prpr": str(int(price)), # 현재가 (int 문자열)
|
||
"prdy_vrss": chg_raw, # 전일 대비
|
||
"prdy_ctrt": pct_raw, # 전일 대비율
|
||
"stck_oprc": open_raw, # 당일 시가
|
||
"stck_hgpr": high_raw, # 당일 고가
|
||
"stck_lwpr": low_raw, # 당일 저가
|
||
}
|
||
|
||
with self._cache_lock:
|
||
self._cache[code] = {"data": data_compat, "ts": time.time()}
|
||
|
||
# ── CandleAggregator 연동: 틱 → 봉 집계 (스캘핑봇 전용, 연결 시 활성화)
|
||
if self._candle_agg is not None:
|
||
tick_time = fields[self.IDX_TIME].strip() if len(fields) > self.IDX_TIME else ""
|
||
vol_raw = fields[self.IDX_VOLUME].strip() if len(fields) > self.IDX_VOLUME else "0"
|
||
try:
|
||
tick_vol = int(vol_raw)
|
||
except ValueError:
|
||
tick_vol = 0
|
||
self._candle_agg.on_tick(code, price, tick_vol, tick_time)
|
||
|
||
logger.debug("H0STCNT0 수신: %s → %s원", code, int(price))
|
||
|
||
except (ValueError, IndexError) as e:
|
||
logger.debug("H0STCNT0 파싱 오류: %s | raw=%s", e, raw[:80])
|
||
|
||
# ==================================================================
|
||
# WebSocket 루프 (백그라운드 스레드)
|
||
# ==================================================================
|
||
|
||
def _is_market_hours(self) -> bool:
|
||
"""
|
||
KIS WebSocket 서비스 시간 여부.
|
||
- 실전투자: 08:25~ (장전 호가 포함)
|
||
- 모의투자: 09:00~ (모의 WS 서버가 09:00 이전 연결 즉시 거부)
|
||
False 반환 시 재연결 시도 없이 다음 장 시작까지 대기.
|
||
"""
|
||
import datetime as _dt
|
||
now = _dt.datetime.now()
|
||
if now.weekday() >= 5: # 토·일
|
||
return False
|
||
# 모의투자는 09:00, 실전투자는 08:25 부터 서버 응답
|
||
open_h, open_m = (9, 0) if self.is_mock else (8, 25)
|
||
return _dt.time(open_h, open_m) <= now.time() <= _dt.time(16, 5)
|
||
|
||
def _seconds_until_market_open(self) -> float:
|
||
"""다음 장 시작까지 남은 초. 최소 60초 반환.
|
||
- 실전투자: 08:25 기준
|
||
- 모의투자: 09:00 기준
|
||
"""
|
||
import datetime as _dt
|
||
now = _dt.datetime.now()
|
||
open_h, open_m = (9, 0) if self.is_mock else (8, 25)
|
||
target = now.replace(hour=open_h, minute=open_m, second=0, microsecond=0)
|
||
if now.time() >= _dt.time(16, 5):
|
||
# 오늘 장 마감 → 내일 장 시작
|
||
target += _dt.timedelta(days=1)
|
||
# 주말 건너뛰기
|
||
while target.weekday() >= 5:
|
||
target += _dt.timedelta(days=1)
|
||
return max(60.0, (target - now).total_seconds())
|
||
|
||
def _run_ws_loop(self) -> None:
|
||
"""
|
||
재연결 정책을 포함한 WebSocket 메인 루프.
|
||
run_forever() 가 반환(연결 끊김)되면 지수 백오프 후 재연결 시도.
|
||
KIS 정책: 1시간 내 MAX_RECONNECTS_PER_HOUR 회 초과 시 강제 대기.
|
||
장외 시간(16:05~08:25, 주말): 재연결 없이 다음 장까지 대기.
|
||
"""
|
||
# 즉시 끊김(장외 서버 거부) 감지: INSTANT_DROP_SEC 이내 끊김이 N회 연속이면 장외 슬립
|
||
INSTANT_DROP_SEC = 3.0 # 연결 후 이 초 이내에 끊기면 "즉시 종료"로 판정
|
||
INSTANT_DROP_MAX = 3 # 연속 N회 즉시 종료 → 장외 슬립으로 전환
|
||
_instant_drop_streak = 0 # 연속 즉시 종료 카운터
|
||
|
||
while self._running:
|
||
now = time.time()
|
||
|
||
# ── 장외 시간이면 다음 장 시작까지 대기 ─────────────────────
|
||
if not self._is_market_hours():
|
||
wait_sec = self._seconds_until_market_open()
|
||
logger.info(
|
||
"🌙 장외 시간 — WebSocket 재연결 중지, 다음 장 시작까지 %.0f분 대기",
|
||
wait_sec / 60,
|
||
)
|
||
# 재연결 카운터·백오프 초기화 (장 시작 시 깨끗하게 재접속)
|
||
self._reconnect_count = 0
|
||
self._reconnect_times = []
|
||
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
|
||
_instant_drop_streak = 0
|
||
# 60초 단위로 쪼개서 슬립 (stop() 신호 빠르게 감지)
|
||
for _ in range(int(wait_sec // 60)):
|
||
if not self._running:
|
||
return
|
||
time.sleep(60)
|
||
time.sleep(wait_sec % 60)
|
||
continue
|
||
|
||
# ── 연속 즉시 종료 감지 → 장외 서버 거부로 간주 ─────────────
|
||
# 연결 직후 INSTANT_DROP_SEC 이내에 끊기는 패턴이 N회 반복되면
|
||
# 서버가 장외이거나 앱키가 차단된 것으로 간주하고 긴 슬립 진입
|
||
if _instant_drop_streak >= INSTANT_DROP_MAX:
|
||
wait_sec = self._seconds_until_market_open()
|
||
logger.warning(
|
||
"⚠️ WebSocket 연속 즉시 끊김 %d회 감지 (장외 서버 거부 추정) "
|
||
"→ %.0f분 대기 후 재시도 (KIS 차단 방지)",
|
||
_instant_drop_streak, wait_sec / 60,
|
||
)
|
||
self._reconnect_count = 0
|
||
self._reconnect_times = []
|
||
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
|
||
_instant_drop_streak = 0
|
||
for _ in range(int(wait_sec // 60)):
|
||
if not self._running:
|
||
return
|
||
time.sleep(60)
|
||
time.sleep(wait_sec % 60)
|
||
continue
|
||
|
||
# ── 안정 연결 후 끊김이면 재연결 카운터 초기화 ───────────
|
||
# STABLE_CONN_RESET_SEC(5분) 이상 정상 운영 후 끊겼다면
|
||
# 이전 재연결 이력을 소거하여 'KIS 정상 케이스'로 처리.
|
||
# (수개월 운영 중 산발적 네트워크 단절에 의해 10회 소진 → 영구 비활성화 방지)
|
||
if (
|
||
self._last_connect_time > 0
|
||
and (now - self._last_connect_time) > self.STABLE_CONN_RESET_SEC
|
||
and self._reconnect_count > 0
|
||
):
|
||
logger.info(
|
||
"♻️ WebSocket 안정 운영(%.0f분) 후 끊김 → 재연결 카운터 초기화 (%d→0)",
|
||
(now - self._last_connect_time) / 60,
|
||
self._reconnect_count,
|
||
)
|
||
self._reconnect_count = 0
|
||
self._reconnect_times = []
|
||
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
|
||
|
||
# ── 1시간 내 재연결 횟수 점검 ────────────────────────────
|
||
self._reconnect_times = [t for t in self._reconnect_times if now - t < 3600]
|
||
|
||
if len(self._reconnect_times) >= self.MAX_RECONNECTS_PER_HOUR:
|
||
# 가장 오래된 재연결로부터 1시간이 지날 때까지 대기
|
||
wait = max(10, 3600 - (now - self._reconnect_times[0]) + 10)
|
||
logger.warning(
|
||
"⛔ 1시간 내 WebSocket 재연결 %d회 초과 → %.0f초(%.0f분) 강제 대기 (KIS 차단 방지)",
|
||
self.MAX_RECONNECTS_PER_HOUR, wait, wait / 60,
|
||
)
|
||
time.sleep(wait)
|
||
continue
|
||
|
||
# ── 총 재연결 횟수 초과 → REST fallback 전환 ─────────────
|
||
if self._reconnect_count >= self.MAX_RECONNECT_ATTEMPTS:
|
||
logger.error(
|
||
"❌ WebSocket 최대 재연결 %d회 초과 → WebSocket 종료, REST fallback 전환",
|
||
self.MAX_RECONNECT_ATTEMPTS,
|
||
)
|
||
self._running = False
|
||
break
|
||
|
||
# ── approval_key 갱신 ─────────────────────────────────────
|
||
approval_key = self._get_approval_key()
|
||
if not approval_key:
|
||
logger.warning(
|
||
"WebSocket approval_key 발급 불가 → %ds 대기 후 재시도",
|
||
int(self._reconnect_delay),
|
||
)
|
||
time.sleep(self._reconnect_delay)
|
||
self._reconnect_delay = min(
|
||
self._reconnect_delay * 2, self.RECONNECT_MAX_DELAY_SEC
|
||
)
|
||
continue
|
||
|
||
# ── 재연결 카운트 및 대기 ─────────────────────────────────
|
||
self._reconnect_count += 1
|
||
if self._reconnect_count > 1:
|
||
self._reconnect_times.append(time.time())
|
||
logger.info(
|
||
"🔄 WebSocket 재연결 시도 %d/%d (대기 %ds 완료)",
|
||
self._reconnect_count,
|
||
self.MAX_RECONNECT_ATTEMPTS,
|
||
int(self._reconnect_delay),
|
||
)
|
||
time.sleep(self._reconnect_delay)
|
||
self._reconnect_delay = min(
|
||
self._reconnect_delay * 2, self.RECONNECT_MAX_DELAY_SEC
|
||
)
|
||
|
||
# ── WebSocketApp 생성 및 실행 ─────────────────────────────
|
||
_conn_start = time.time() # 연결 시작 시각 (즉시 종료 감지용)
|
||
try:
|
||
ws_app = self._ws_lib.WebSocketApp(
|
||
self._ws_url,
|
||
on_open = self._on_open,
|
||
on_message = self._on_message,
|
||
on_error = self._on_error,
|
||
on_close = self._on_close,
|
||
)
|
||
self._ws = ws_app
|
||
# ping_interval: WebSocket 프로토콜 레벨 PING (연결 유지)
|
||
# KIS Application-Level PINGPONG은 _parse_realtime_msg 에서 처리
|
||
ws_app.run_forever(ping_interval=20, ping_timeout=10)
|
||
except Exception as e:
|
||
logger.error("WebSocket run_forever 예외: %s", e)
|
||
|
||
if not self._running:
|
||
break
|
||
|
||
# ── 즉시 종료 여부 판정 (장외 서버 거부 감지) ────────────────
|
||
_conn_duration = time.time() - _conn_start
|
||
if _conn_duration < INSTANT_DROP_SEC:
|
||
_instant_drop_streak += 1
|
||
logger.debug(
|
||
"⚡ WebSocket 즉시 종료 감지 (%.1f초, streak=%d/%d)",
|
||
_conn_duration, _instant_drop_streak, INSTANT_DROP_MAX,
|
||
)
|
||
else:
|
||
# 어느 정도 연결 유지됐다면 즉시 종료 streak 초기화
|
||
_instant_drop_streak = 0
|
||
|
||
logger.info("KIS WebSocket 루프 종료 (is_active=False, REST fallback 전환)")
|
||
|
||
# ------------------------------------------------------------------
|
||
# WebSocket 콜백
|
||
# ------------------------------------------------------------------
|
||
|
||
def _on_open(self, ws) -> None:
|
||
"""연결 성공: 등록된 모든 종목 구독 요청 전송."""
|
||
self._connected = True
|
||
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC # 대기 시간 리셋
|
||
self._last_connect_time = time.time() # 안정 연결 판단용 타임스탬프 기록
|
||
|
||
logger.info("✅ KIS WebSocket 연결 성공 (H0STCNT0 | url=%s)", self._ws_url)
|
||
|
||
# 영구 구독 목록(홀딩 관심종목) 먼저 구독 등록
|
||
for code in sorted(self._permanent_codes):
|
||
with self._sub_lock:
|
||
if code not in self._subscribed:
|
||
if len(self._subscribed) < self.MAX_SUBSCRIPTIONS:
|
||
self._subscribed.add(code)
|
||
else:
|
||
logger.warning("⚠️ 구독 한도로 영구구독 추가 불가: %s", code)
|
||
continue
|
||
self._send_sub_msg(code, subscribe=True)
|
||
if self._permanent_codes:
|
||
logger.info("📌 영구 구독 등록 완료: %d종목", len(self._permanent_codes))
|
||
|
||
# 재연결 시에도 구독 목록 재등록
|
||
with self._sub_lock:
|
||
codes = set(self._subscribed)
|
||
for code in sorted(codes):
|
||
self._send_sub_msg(code, subscribe=True)
|
||
if codes:
|
||
logger.info("📡 WebSocket 구독 일괄 등록: %s", ", ".join(sorted(codes)))
|
||
|
||
# ── 연결 성공 시 갭보정 콜백 (장 시간일 때만) ───────────────
|
||
# 봇 시작 후 처음 WS가 안정 연결되는 시점(9:00 이후)에
|
||
# REST 분봉 데이터로 DB를 채워 '봉부족' 문제 해소.
|
||
# 새벽 재연결 시에는 is_market_hours() False → 콜백 스킵.
|
||
if self._on_connected_callback and self._is_market_hours():
|
||
try:
|
||
cb_thread = threading.Thread(
|
||
target=self._on_connected_callback,
|
||
name="WS-GapFill",
|
||
daemon=True,
|
||
)
|
||
cb_thread.start()
|
||
except Exception as e:
|
||
logger.debug("갭보정 콜백 실행 실패: %s", e)
|
||
|
||
def _on_message(self, ws, message: str) -> None:
|
||
self._parse_realtime_msg(message)
|
||
|
||
def _on_error(self, ws, error) -> None:
|
||
self._connected = False
|
||
logger.warning("⚠️ KIS WebSocket 오류: %s", error)
|
||
|
||
def _on_close(self, ws, close_status_code, close_msg) -> None:
|
||
self._connected = False
|
||
logger.info(
|
||
"🔌 KIS WebSocket 연결 종료 (code=%s msg=%s)",
|
||
close_status_code, close_msg or "",
|
||
)
|
||
|
||
|
||
# ======================================================================
|
||
# ======================================================================
|
||
# CandleAggregator — WebSocket 틱 → OHLCV 봉 실시간 집계기
|
||
# ======================================================================
|
||
class CandleAggregator:
|
||
"""
|
||
KISWebSocketPriceCache 에서 수신한 틱을 N분봉으로 집계합니다.
|
||
|
||
Two-Track 아키텍처 (Gemini/퀀트 펌 방식)
|
||
─────────────────────────────────────────
|
||
[트랙 1 — 매매 두뇌 (논블로킹)]
|
||
WebSocket 틱 → on_tick() → RAM에서 OHLCV 즉시 갱신
|
||
→ 매수/매도 판단은 get_latest_confirmed() 등 메모리 접근만 사용
|
||
→ DB 대기 시간 0ms, 타점 놓침 없음
|
||
|
||
[트랙 2 — 기록원 스레드 (백그라운드)]
|
||
봉 확정 시 dict를 Queue에 put_nowait() (논블로킹, 0.000001초)
|
||
→ 백그라운드 스레드(_db_writer)가 BATCH_SIZE개 or FLUSH_INTERVAL초마다
|
||
DB에 executemany() 한 방에 묶어서 INSERT
|
||
→ 백테스트용 봉 데이터 완전 보존, 매매 루프 블로킹 없음
|
||
|
||
진행 중 봉(is_confirmed=0) 처리
|
||
─────────────────────────────────
|
||
- RAM의 _current 버퍼에만 존재 → DB에 절대 쓰지 않음
|
||
- 매수 루프는 get_current_candle() 로 즉시 메모리 접근
|
||
- 봉 확정(분 바뀜) 순간에만 Queue → DB 기록
|
||
|
||
RSI 계산
|
||
────────
|
||
- 확정 봉 close 리스트(_closes, 최대 MAX_CLOSE_BUFFER개) RAM에 유지
|
||
- RSI(2/3/5) 계산은 순수 Python 연산, DB 조회 없음
|
||
- 봉 수 부족 시 RSI=None → 매수 루프에서 신호 무시
|
||
|
||
스레드 안전성
|
||
─────────────
|
||
- _lock : on_tick / fill_gap 간 경합 방지 (RAM 버퍼 보호)
|
||
- Queue : thread-safe, put_nowait 는 lock 불필요
|
||
- _db_writer : 독립 daemon 스레드 (봇 종료 시 자동 소멸)
|
||
"""
|
||
|
||
MAX_CLOSE_BUFFER = 200 # RSI 계산용 close 보관 최대 개수
|
||
BATCH_SIZE = 50 # 이 개수 이상 쌓이면 즉시 배치 플러시
|
||
FLUSH_INTERVAL = 2.0 # 초 — BATCH_SIZE 미달이라도 이 주기로 플러시
|
||
|
||
def __init__(self, db=None, timeframes: list = None):
|
||
"""
|
||
Args:
|
||
db : TradeDB 인스턴스. None이면 DB 쓰기 비활성(순수 RAM 모드).
|
||
timeframes: 집계할 봉 단위 리스트 (기본 [1, 3])
|
||
"""
|
||
self.db = db
|
||
self.timeframes: list = timeframes if timeframes else [1, 3]
|
||
self._lock = threading.Lock()
|
||
|
||
# ── 트랙 1: RAM 버퍼 ─────────────────────────────────────
|
||
# 진행 중인 봉: { (code, tf): {candle_time, open, high, low, close, volume} }
|
||
self._current: Dict[tuple, dict] = {}
|
||
# RSI 계산용 확정 봉 close 가격 히스토리: { (code, tf): [c1, c2, ...] }
|
||
self._closes: Dict[tuple, list] = {}
|
||
# 최근 확정 봉 보관 (get_latest_confirmed / get_candles 용): { (code, tf): [dict, ...] }
|
||
self._confirmed: Dict[tuple, list] = {}
|
||
|
||
# ── 트랙 2: 비동기 DB 배치 큐 ────────────────────────────
|
||
# 봉 확정 시 dict를 여기 넣으면 _db_writer 가 배치로 INSERT
|
||
self._write_queue: queue.Queue = queue.Queue(maxsize=10000)
|
||
self._db_writer_thread: Optional[threading.Thread] = None
|
||
if self.db is not None:
|
||
self._start_db_writer()
|
||
|
||
logger.info("✅ CandleAggregator 초기화 완료 (timeframes=%s, db=%s)",
|
||
self.timeframes, "활성" if self.db else "비활성(RAM 전용)")
|
||
|
||
# ------------------------------------------------------------------
|
||
# 트랙 2: 백그라운드 DB 배치 기록원
|
||
# ------------------------------------------------------------------
|
||
|
||
def _start_db_writer(self) -> None:
|
||
"""백그라운드 DB 배치 기록 스레드를 시작합니다."""
|
||
self._db_writer_thread = threading.Thread(
|
||
target=self._db_writer_loop,
|
||
name="CandleDBWriter",
|
||
daemon=True, # 봇 종료 시 자동 소멸
|
||
)
|
||
self._db_writer_thread.start()
|
||
logger.info("✅ CandleAggregator DB 기록원 스레드 시작 (배치=%d, 주기=%.1fs)",
|
||
self.BATCH_SIZE, self.FLUSH_INTERVAL)
|
||
|
||
def _db_writer_loop(self) -> None:
|
||
"""
|
||
배치 기록 루프.
|
||
- Queue에서 BATCH_SIZE개 모이면 즉시 배치 INSERT
|
||
- BATCH_SIZE 미달이라도 FLUSH_INTERVAL초마다 플러시
|
||
"""
|
||
batch: list = []
|
||
last_flush = time.time()
|
||
|
||
while True:
|
||
try:
|
||
# FLUSH_INTERVAL 내에서 최대한 많이 모아 배치 구성
|
||
timeout = max(0.1, self.FLUSH_INTERVAL - (time.time() - last_flush))
|
||
item = self._write_queue.get(timeout=timeout)
|
||
if item is None:
|
||
# None = 종료 신호
|
||
break
|
||
batch.append(item)
|
||
self._write_queue.task_done()
|
||
except queue.Empty:
|
||
pass
|
||
|
||
now = time.time()
|
||
should_flush = len(batch) >= self.BATCH_SIZE or \
|
||
(batch and (now - last_flush) >= self.FLUSH_INTERVAL)
|
||
|
||
if should_flush and batch:
|
||
self._flush_batch(batch)
|
||
batch = []
|
||
last_flush = now
|
||
|
||
# 루프 종료 시 남은 배치 처리
|
||
if batch:
|
||
self._flush_batch(batch)
|
||
logger.info("CandleDBWriter 스레드 종료")
|
||
|
||
def _flush_batch(self, batch: list) -> None:
|
||
"""
|
||
배치 리스트를 DB에 한 번의 executemany 로 INSERT.
|
||
각 item: {code, tf, candle_time, open, high, low, close, volume,
|
||
is_confirmed, source, rsi_2, rsi_3, rsi_5}
|
||
"""
|
||
if not self.db or not batch:
|
||
return
|
||
try:
|
||
import datetime as _dt
|
||
now_str = _dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
rows = []
|
||
for item in batch:
|
||
rows.append((
|
||
item["code"],
|
||
item["tf"],
|
||
item["candle_time"],
|
||
item["open"],
|
||
item["high"],
|
||
item["low"],
|
||
item["close"],
|
||
item.get("volume", 0),
|
||
item.get("rsi_2"),
|
||
item.get("rsi_3"),
|
||
item.get("rsi_5"),
|
||
item.get("is_confirmed", 1),
|
||
item.get("source", "ws"),
|
||
now_str,
|
||
))
|
||
# ws_candles 테이블이 존재하면 배치 INSERT (없으면 조용히 skip)
|
||
self.db.conn.execute(
|
||
"""
|
||
INSERT INTO ws_candles
|
||
(code, timeframe, candle_time, `open`, high, low, close,
|
||
volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at)
|
||
VALUES
|
||
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE
|
||
`open`=VALUES(`open`), high=VALUES(high), low=VALUES(low),
|
||
close=VALUES(close), volume=VALUES(volume),
|
||
rsi_2=VALUES(rsi_2), rsi_3=VALUES(rsi_3), rsi_5=VALUES(rsi_5),
|
||
is_confirmed=VALUES(is_confirmed), updated_at=VALUES(updated_at)
|
||
""",
|
||
rows[0], # 단건 execute (pymysql executemany 는 첫 번째 인수만 받음)
|
||
) if len(rows) == 1 else self._executemany_batch(rows)
|
||
logger.debug("💾 [배치저장] %d봉 → DB", len(rows))
|
||
except Exception as e:
|
||
logger.debug("_flush_batch 실패 (무시): %s", e)
|
||
|
||
def _executemany_batch(self, rows: list) -> None:
|
||
"""여러 봉을 executemany 로 한 번에 INSERT."""
|
||
sql = """
|
||
INSERT INTO ws_candles
|
||
(code, timeframe, candle_time, `open`, high, low, close,
|
||
volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at)
|
||
VALUES
|
||
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE
|
||
`open`=VALUES(`open`), high=VALUES(high), low=VALUES(low),
|
||
close=VALUES(close), volume=VALUES(volume),
|
||
rsi_2=VALUES(rsi_2), rsi_3=VALUES(rsi_3), rsi_5=VALUES(rsi_5),
|
||
is_confirmed=VALUES(is_confirmed), updated_at=VALUES(updated_at)
|
||
"""
|
||
# pymysql executemany: cursor.executemany(sql, list_of_tuples)
|
||
import pymysql
|
||
with self.db.conn._lock:
|
||
self.db.conn._ensure_connected()
|
||
cur = self.db.conn._conn.cursor()
|
||
cur.executemany(sql, rows)
|
||
|
||
def stop(self) -> None:
|
||
"""기록원 스레드를 안전하게 종료합니다."""
|
||
if self._db_writer_thread and self._db_writer_thread.is_alive():
|
||
self._write_queue.put(None) # 종료 신호
|
||
self._db_writer_thread.join(timeout=5)
|
||
|
||
# ------------------------------------------------------------------
|
||
# 시각 유틸
|
||
# ------------------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def _candle_key(tick_time_str: str, timeframe: int) -> str:
|
||
"""
|
||
틱 시각(HHMMSS)과 timeframe(분)으로 봉 시작 시각 키를 반환.
|
||
날짜는 오늘 날짜를 사용 (장 중에만 동작하므로 안전).
|
||
반환 형식: YYYYMMDDHHMM (분 단위, 초 버림)
|
||
|
||
예) tick_time="091523", timeframe=3 → "202603020915" (09:15봉)
|
||
"""
|
||
import datetime as _dt
|
||
try:
|
||
hh = int(tick_time_str[0:2]) if len(tick_time_str) >= 6 else 0
|
||
mm = int(tick_time_str[2:4]) if len(tick_time_str) >= 6 else 0
|
||
# timeframe 단위로 내림: 3분봉이면 09:17 → 09:15
|
||
floored_mm = (mm // timeframe) * timeframe
|
||
today = _dt.date.today().strftime("%Y%m%d")
|
||
return f"{today}{hh:02d}{floored_mm:02d}"
|
||
except Exception:
|
||
import datetime as _dt
|
||
return _dt.datetime.now().strftime("%Y%m%d%H%M")
|
||
|
||
# ------------------------------------------------------------------
|
||
# RSI 계산 (확정 봉 close 리스트 기반)
|
||
# ------------------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def _calc_rsi(closes: list, period: int) -> Optional[float]:
|
||
"""
|
||
close 가격 리스트에서 RSI(period) 계산.
|
||
- Wilder's smoothed moving average (단순 rolling mean 사용)
|
||
- 데이터 부족 시 None 반환
|
||
"""
|
||
if len(closes) < period + 1:
|
||
return None
|
||
recent = closes[-(period + 1):]
|
||
gains, losses = [], []
|
||
for i in range(1, len(recent)):
|
||
delta = recent[i] - recent[i - 1]
|
||
gains.append(max(delta, 0))
|
||
losses.append(max(-delta, 0))
|
||
avg_gain = sum(gains) / period if period > 0 else 0
|
||
avg_loss = sum(losses) / period if period > 0 else 0
|
||
if avg_loss == 0:
|
||
return 100.0
|
||
rs = avg_gain / avg_loss
|
||
return round(100 - (100 / (1 + rs)), 2)
|
||
|
||
def _compute_rsi_set(self, closes: list) -> tuple:
|
||
"""RSI 2/3/5 를 한 번에 계산해서 (rsi2, rsi3, rsi5) 반환."""
|
||
return (
|
||
self._calc_rsi(closes, 2),
|
||
self._calc_rsi(closes, 3),
|
||
self._calc_rsi(closes, 5),
|
||
)
|
||
|
||
# ------------------------------------------------------------------
|
||
# 핵심: 틱 수신 처리
|
||
# ------------------------------------------------------------------
|
||
|
||
def on_tick(self, code: str, price: float, volume: int, tick_time: str) -> None:
|
||
"""
|
||
KISWebSocketPriceCache._parse_realtime_msg 에서 틱마다 호출됨.
|
||
모든 timeframe 에 대해 봉 갱신/확정 처리.
|
||
"""
|
||
if price <= 0:
|
||
return
|
||
with self._lock:
|
||
for tf in self.timeframes:
|
||
self._process_tick(code, price, volume, tick_time, tf)
|
||
|
||
def _process_tick(self, code: str, price: float, volume: int,
|
||
tick_time: str, tf: int) -> None:
|
||
"""
|
||
단일 timeframe 에 대한 틱 처리 (lock 내부에서 호출).
|
||
|
||
[트랙 1] RAM 갱신만 수행, DB 호출 없음 → 블로킹 0ms
|
||
[트랙 2] 봉 확정 순간에만 Queue.put_nowait() → 기록원이 비동기 배치 저장
|
||
"""
|
||
key = (code, tf)
|
||
new_ctime = self._candle_key(tick_time, tf)
|
||
|
||
if key not in self._current:
|
||
# 첫 틱: 새 봉 시작 (RAM만)
|
||
self._current[key] = {
|
||
"candle_time": new_ctime,
|
||
"open": price, "high": price, "low": price,
|
||
"close": price, "volume": volume,
|
||
}
|
||
return
|
||
|
||
cur = self._current[key]
|
||
|
||
if new_ctime != cur["candle_time"]:
|
||
# ── 봉 확정 ────────────────────────────────────────────
|
||
closes = self._closes.setdefault(key, [])
|
||
closes.append(cur["close"])
|
||
if len(closes) > self.MAX_CLOSE_BUFFER:
|
||
closes.pop(0)
|
||
|
||
rsi2, rsi3, rsi5 = self._compute_rsi_set(closes)
|
||
|
||
confirmed_candle = {
|
||
"code": code,
|
||
"tf": tf,
|
||
"candle_time": cur["candle_time"],
|
||
"open": cur["open"],
|
||
"high": cur["high"],
|
||
"low": cur["low"],
|
||
"close": cur["close"],
|
||
"volume": cur["volume"],
|
||
"rsi_2": rsi2,
|
||
"rsi_3": rsi3,
|
||
"rsi_5": rsi5,
|
||
"is_confirmed": 1,
|
||
"source": "ws",
|
||
}
|
||
|
||
# [트랙 1] 확정 봉을 RAM _confirmed 버퍼에 보관 (매수 루프 직접 참조용)
|
||
buf = self._confirmed.setdefault(key, [])
|
||
buf.append(confirmed_candle)
|
||
if len(buf) > self.MAX_CLOSE_BUFFER:
|
||
buf.pop(0)
|
||
|
||
# [트랙 2] Queue에 던지고 즉시 반환 (논블로킹) → 기록원이 배치 저장
|
||
try:
|
||
self._write_queue.put_nowait(confirmed_candle)
|
||
except queue.Full:
|
||
logger.warning("⚠️ CandleAggregator 쓰기 Queue 가득참 — 봉 1개 DROP (코드: %s)", code)
|
||
|
||
logger.debug(
|
||
"🕯 [봉확정] %s %dM %s C=%.0f RSI3=%s",
|
||
code, tf, cur["candle_time"], cur["close"],
|
||
f"{rsi3:.1f}" if rsi3 is not None else "N/A",
|
||
)
|
||
|
||
# ── 새 봉 시작 (RAM만) ──────────────────────────────────
|
||
self._current[key] = {
|
||
"candle_time": new_ctime,
|
||
"open": price, "high": price, "low": price,
|
||
"close": price, "volume": volume,
|
||
}
|
||
else:
|
||
# 같은 봉: OHLCV 갱신 (RAM만, DB 쓰기 없음)
|
||
cur["high"] = max(cur["high"], price)
|
||
cur["low"] = min(cur["low"], price)
|
||
cur["close"] = price
|
||
cur["volume"] = volume # 누적 거래량(ACML_VOL) 그대로 덮어씀
|
||
|
||
# ------------------------------------------------------------------
|
||
# 재접속 갭 보정: REST get_minute_chart 로 빈 봉 채우기
|
||
# ------------------------------------------------------------------
|
||
|
||
def fill_gap_from_rest(self, code: str, tf: int, rest_df) -> int:
|
||
"""
|
||
WS 재접속 후 빠진 봉 구간을 REST 분봉 데이터로 채움.
|
||
|
||
[트랙 1] close 가격을 _closes / _confirmed 에 넣어 RSI 웜업
|
||
[트랙 2] 봉 dict를 Queue 에 넣어 기록원이 배치로 DB 저장 (source='rest')
|
||
|
||
Args:
|
||
code : 종목코드
|
||
tf : timeframe (분)
|
||
rest_df : get_minute_chart 반환 DataFrame (오래된→최신 순 정렬 필요)
|
||
컬럼: time, open, high, low, close, volume
|
||
|
||
Returns:
|
||
채워진 봉 수
|
||
"""
|
||
if rest_df is None or rest_df.empty:
|
||
return 0
|
||
|
||
with self._lock:
|
||
key = (code, tf)
|
||
closes = self._closes.setdefault(key, [])
|
||
conf_buf = self._confirmed.setdefault(key, [])
|
||
inserted = 0
|
||
|
||
for _, row in rest_df.iterrows():
|
||
ctime = str(row.get("time", ""))[:12] # YYYYMMDDHHMM 12자리
|
||
if not ctime or len(ctime) < 12:
|
||
continue
|
||
close = float(row.get("close", 0))
|
||
if close <= 0:
|
||
continue
|
||
|
||
# [트랙 1] RAM 웜업
|
||
closes.append(close)
|
||
if len(closes) > self.MAX_CLOSE_BUFFER:
|
||
closes.pop(0)
|
||
|
||
rsi2, rsi3, rsi5 = self._compute_rsi_set(closes)
|
||
|
||
candle = {
|
||
"code": code,
|
||
"tf": tf,
|
||
"candle_time": ctime,
|
||
"open": float(row.get("open", close)),
|
||
"high": float(row.get("high", close)),
|
||
"low": float(row.get("low", close)),
|
||
"close": close,
|
||
"volume": int(row.get("volume", 0)),
|
||
"rsi_2": rsi2,
|
||
"rsi_3": rsi3,
|
||
"rsi_5": rsi5,
|
||
"is_confirmed": 1,
|
||
"source": "rest",
|
||
}
|
||
conf_buf.append(candle)
|
||
if len(conf_buf) > self.MAX_CLOSE_BUFFER:
|
||
conf_buf.pop(0)
|
||
|
||
# [트랙 2] Queue에 넣어 기록원이 배치로 DB 저장
|
||
try:
|
||
self._write_queue.put_nowait(candle)
|
||
except queue.Full:
|
||
pass
|
||
inserted += 1
|
||
|
||
if inserted:
|
||
logger.info("🔧 [갭보정] %s %dM → REST %d봉 RAM 적재 + DB 큐 등록", code, tf, inserted)
|
||
return inserted
|
||
|
||
# ------------------------------------------------------------------
|
||
# [트랙 1] RAM 버퍼 조회 — 매수/매도 루프에서 직접 호출 (DB 조회 없음)
|
||
# ------------------------------------------------------------------
|
||
|
||
def get_latest_confirmed(self, code: str, tf: int) -> Optional[dict]:
|
||
"""
|
||
가장 최근 확정된 봉(완성된 마지막 봉)을 반환.
|
||
None이면 아직 봉이 확정되지 않음 (장 초반 등).
|
||
"""
|
||
with self._lock:
|
||
buf = self._confirmed.get((code, tf))
|
||
return buf[-1] if buf else None
|
||
|
||
def get_prev_confirmed(self, code: str, tf: int) -> Optional[dict]:
|
||
"""직전 확정봉 (최신에서 2번째). 패턴 확인용 (현재봉 - 1)."""
|
||
with self._lock:
|
||
buf = self._confirmed.get((code, tf))
|
||
return buf[-2] if buf and len(buf) >= 2 else None
|
||
|
||
def get_candles(self, code: str, tf: int, n: int = 10) -> list:
|
||
"""최근 n개 확정 봉 리스트 반환 (오래된→최신 순)."""
|
||
with self._lock:
|
||
buf = self._confirmed.get((code, tf), [])
|
||
return list(buf[-n:])
|
||
|
||
def get_confirmed_count(self, code: str, tf: int) -> int:
|
||
"""확정된 봉 수 (RSI 안정화 여부 확인용)."""
|
||
with self._lock:
|
||
return len(self._confirmed.get((code, tf), []))
|
||
|
||
def get_current_candle(self, code: str, tf: int) -> Optional[dict]:
|
||
"""
|
||
현재 진행 중인 봉(미확정, is_confirmed=0) 반환.
|
||
RSI는 포함되지 않음 (확정 봉 기준으로만 계산).
|
||
"""
|
||
with self._lock:
|
||
return dict(self._current.get((code, tf), {})) or None
|
||
|
||
def get_rsi(self, code: str, tf: int, period: int = 3) -> Optional[float]:
|
||
"""
|
||
최신 확정 봉의 RSI(period) 값 반환.
|
||
period: 2, 3, 5 중 하나 (스캘핑 단타용 초단기 RSI)
|
||
"""
|
||
candle = self.get_latest_confirmed(code, tf)
|
||
if candle is None:
|
||
return None
|
||
return candle.get(f"rsi_{period}")
|
||
|
||
def remove_code(self, code: str) -> None:
|
||
"""
|
||
유니버스에서 빠진 종목의 RAM 버퍼를 정리합니다.
|
||
_sync_subscriptions()에서 구독 해제 시 같이 호출.
|
||
등록된 모든 timeframe 의 _confirmed / _closes / _current 를 삭제.
|
||
"""
|
||
with self._lock:
|
||
for tf in list(self.timeframes):
|
||
key = (code, tf)
|
||
self._confirmed.pop(key, None)
|
||
self._closes.pop(key, None)
|
||
self._current.pop(key, None)
|
||
logger.debug("🗑️ CandleAggregator RAM 정리: %s", code)
|
||
|
||
|
||
# ======================================================================
|
||
# 키움 REST API 공통 유틸 — 봇 시작 시 WS 갭보정용
|
||
# KIS get_minute_chart() 는 당일봉만 제공하지만,
|
||
# 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월) + 페이지네이션 지원.
|
||
# ======================================================================
|
||
|
||
# ──────────────────────────────────────────────────────────────────────
|
||
# 키움 토큰 매니저 싱글톤 풀
|
||
#
|
||
# kiwoom_rest_api/auth/token.py 의 TokenManager 와 동일한 설계:
|
||
# - _is_access_token_valid(): expires_dt 기반 정확한 만료 판별 (30s 버퍼)
|
||
# - _request_new_token(): 만료 시에만 발급 (au10001 rate limit 방지)
|
||
#
|
||
# 차이점: DB에서 읽은 app_key/secret 직접 주입 (환경변수 불필요)
|
||
# 싱글톤 풀: 캐시 키(domain:appkey앞8자) → 인스턴스 재사용
|
||
# 종목×timeframe마다 새 인스턴스를 만들지 않음
|
||
# ──────────────────────────────────────────────────────────────────────
|
||
import threading as _threading
|
||
from datetime import datetime as _datetime, timedelta as _timedelta
|
||
|
||
_kiwoom_token_lock = _threading.Lock()
|
||
_kiwoom_managers: Dict[str, "KiwoomTokenManager"] = {} # 싱글톤 풀
|
||
|
||
|
||
class KiwoomTokenManager:
|
||
"""
|
||
kiwoom_rest_api/auth/token.py TokenManager 와 동일한 로직 — DB 키 직접 주입 버전.
|
||
|
||
- get_token(): 유효한 토큰이면 바로 반환, 만료 시에만 재발급 (au10001 방지)
|
||
- expires_dt 를 정확히 파싱 (하드코딩 23h 아님)
|
||
- 30초 버퍼: 경계 케이스 방지 (TokenManager 와 동일)
|
||
- 스레드 안전: 인스턴스당 Lock 보유
|
||
"""
|
||
|
||
def __init__(self, app_key: str, app_secret: str, is_mock: bool = False):
|
||
self._app_key = app_key
|
||
self._app_secret = app_secret
|
||
self._is_mock = is_mock
|
||
self._domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
|
||
self._mode_str = "모의" if is_mock else "실전"
|
||
self._token: Optional[str] = None
|
||
self._expiry: Optional[_datetime] = None
|
||
self._lock = _threading.Lock()
|
||
|
||
def _is_valid(self) -> bool:
|
||
"""만료 30초 전까지 유효 (TokenManager._is_access_token_valid 와 동일)"""
|
||
if not self._token or not self._expiry:
|
||
return False
|
||
return _datetime.now() < self._expiry - _timedelta(seconds=30)
|
||
|
||
def _request_new_token(self) -> None:
|
||
"""토큰 신규 발급 — 만료 시에만 호출됨"""
|
||
resp = requests.post(
|
||
f"https://{self._domain}/oauth2/token",
|
||
json={
|
||
"grant_type": "client_credentials",
|
||
"appkey": self._app_key,
|
||
"secretkey": self._app_secret,
|
||
},
|
||
timeout=10,
|
||
)
|
||
data = resp.json()
|
||
token = (data.get("token") or data.get("access_token") or "").strip()
|
||
if not token:
|
||
raise RuntimeError(f"키움 토큰 발급 실패 [{self._mode_str}]: {data}")
|
||
|
||
# expires_dt 정확히 파싱 (TokenManager._update_token_info 와 동일 로직)
|
||
exp_s = data.get("expires_dt", "")
|
||
try:
|
||
self._expiry = _datetime.strptime(str(exp_s), "%Y%m%d%H%M%S")
|
||
except Exception:
|
||
# expires_in 도 없으면 24h 기본 (보수적 fallback)
|
||
exp_in = data.get("expires_in", 86400)
|
||
self._expiry = _datetime.now() + _timedelta(seconds=int(exp_in))
|
||
|
||
self._token = token
|
||
logger.info(
|
||
"✅ 키움 토큰 발급 완료 [%s] (앞8자: %s…, 만료: %s)",
|
||
self._mode_str, token[:8], self._expiry.strftime("%Y-%m-%d %H:%M:%S"),
|
||
)
|
||
|
||
def get_token(self) -> Optional[str]:
|
||
"""
|
||
유효한 토큰 반환. 만료 시에만 재발급.
|
||
kiwoom_rest_api TokenManager.get_token() 호환 인터페이스.
|
||
"""
|
||
with self._lock:
|
||
if not self._is_valid():
|
||
try:
|
||
self._request_new_token()
|
||
except Exception as e:
|
||
logger.warning("⚠️ 키움 토큰 발급 예외 [%s]: %s", self._mode_str, e)
|
||
return None
|
||
return self._token
|
||
|
||
|
||
def _get_kiwoom_token_cached(
|
||
kiwoom_key: str,
|
||
kiwoom_secret: str,
|
||
is_mock: bool,
|
||
) -> Optional[str]:
|
||
"""
|
||
KiwoomTokenManager 싱글톤 풀에서 인스턴스를 꺼내 토큰 반환.
|
||
동일 키 조합은 같은 인스턴스를 재사용 → au10001 rate limit 방지.
|
||
"""
|
||
domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
|
||
cache_key = f"{domain}:{kiwoom_key[:8]}"
|
||
|
||
with _kiwoom_token_lock:
|
||
if cache_key not in _kiwoom_managers:
|
||
_kiwoom_managers[cache_key] = KiwoomTokenManager(
|
||
kiwoom_key, kiwoom_secret, is_mock=is_mock
|
||
)
|
||
mgr = _kiwoom_managers[cache_key]
|
||
|
||
return mgr.get_token()
|
||
|
||
|
||
def _get_kiwoom_creds(db) -> tuple:
|
||
"""
|
||
DB env_config 최신 행에서 키움 앱키/시크릿 반환.
|
||
KIS_MOCK 설정에 따라 MOCK / REAL 키를 자동 선택.
|
||
|
||
Returns:
|
||
(app_key, app_secret, is_mock) — 키 없으면 (None, None, False)
|
||
"""
|
||
try:
|
||
row = db.conn.execute(
|
||
"SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
|
||
).fetchone()
|
||
if not row:
|
||
return None, None, False
|
||
r = dict(row)
|
||
is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes")
|
||
if is_mock:
|
||
key = str(r.get("KIWOOM_APP_KEY_MOCK", "") or "").strip()
|
||
secret = str(r.get("KIWOOM_APP_SECRET_MOCK", "") or "").strip()
|
||
else:
|
||
key = str(r.get("KIWOOM_APP_KEY_REAL", "") or "").strip()
|
||
secret = str(r.get("KIWOOM_APP_SECRET_REAL", "") or "").strip()
|
||
# 레거시 필드 폴백 (KIWOOM_APP_KEY)
|
||
if not key or not secret:
|
||
key = str(r.get("KIWOOM_APP_KEY", "") or "").strip()
|
||
secret = str(r.get("KIWOOM_APP_SECRET", "") or "").strip()
|
||
if not key or not secret:
|
||
return None, None, is_mock
|
||
return key, secret, is_mock
|
||
except Exception as e:
|
||
logger.debug("키움 크레덴셜 조회 실패: %s", e)
|
||
return None, None, False
|
||
|
||
|
||
def get_kiwoom_candles_df(
|
||
code: str,
|
||
tf_min: int,
|
||
kiwoom_key: str,
|
||
kiwoom_secret: str,
|
||
is_mock: bool = False,
|
||
n: int = 120,
|
||
) -> "object": # pd.DataFrame
|
||
"""
|
||
키움 REST API (ka10080 — 주식분봉차트조회) 로 분봉 조회.
|
||
fill_gap_from_rest() 호환 DataFrame 반환.
|
||
|
||
Args:
|
||
code : 종목코드 (6자리)
|
||
tf_min : 봉 단위 분 (1 / 3 / 5 / 10 / 15 / 30 / 45 / 60)
|
||
kiwoom_key : 키움 앱키
|
||
kiwoom_secret: 키움 앱시크릿
|
||
is_mock : True = 모의투자 도메인 사용
|
||
n : 최대 수집 봉 수 (기본 120, 60분봉 기준 약 17 영업일)
|
||
|
||
Returns:
|
||
pd.DataFrame — 컬럼: time(YYYYMMDDHHMM), open, high, low, close, volume
|
||
오래된→최신 순 (fill_gap_from_rest 기대 순서)
|
||
빈 DataFrame (오류 시)
|
||
"""
|
||
try:
|
||
import pandas as pd
|
||
except ImportError:
|
||
logger.error("pandas 미설치 → 키움 갭보정 불가")
|
||
return None # type: ignore[return-value]
|
||
|
||
domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
|
||
|
||
# ── 1. 키움 OAuth 토큰 (캐시 우선 — 23시간 재사용, au10001 rate limit 방지) ──
|
||
token = _get_kiwoom_token_cached(kiwoom_key, kiwoom_secret, is_mock)
|
||
if not token:
|
||
return pd.DataFrame()
|
||
|
||
# ── 2. ka10080 분봉 차트 조회 (페이지네이션) ──────────────────────
|
||
# 키움은 최신→과거 순으로 반환, 한 페이지에 최대 900봉
|
||
base_url = f"https://{domain}/api/dostk/chart"
|
||
headers = {
|
||
"content-type": "application/json;charset=UTF-8",
|
||
"appkey": kiwoom_key,
|
||
"appsecret": kiwoom_secret,
|
||
"authorization": f"Bearer {token}",
|
||
"api-id": "ka10080",
|
||
"cont-yn": "N",
|
||
"next-key": "",
|
||
}
|
||
body = {
|
||
"stk_cd": code,
|
||
"tic_scope": str(tf_min), # "1" / "3" / "15" / "60" etc.
|
||
"upd_stkpc_tp": "1", # 수정주가 반영
|
||
}
|
||
|
||
rows: list = []
|
||
while len(rows) < n:
|
||
try:
|
||
resp = requests.post(base_url, json=body, headers=headers, timeout=15)
|
||
data = resp.json()
|
||
except Exception as e:
|
||
logger.warning("⚠️ 키움 ka10080 조회 실패 (%s %dM): %s", code, tf_min, e)
|
||
break
|
||
|
||
records = data.get("stk_min_pole_chart_qry") or []
|
||
if not records:
|
||
break
|
||
|
||
for rec in records:
|
||
raw_dt = str(rec.get("cntr_tm", ""))
|
||
if len(raw_dt) < 12:
|
||
continue
|
||
try:
|
||
rows.append({
|
||
# cntr_tm = YYYYMMDDHHMMSS → 12자리로 잘라 YYYYMMDDHHMM
|
||
"time": raw_dt[:12],
|
||
"open": abs(float(rec.get("open_pric", 0) or 0)),
|
||
"high": abs(float(rec.get("high_pric", 0) or 0)),
|
||
"low": abs(float(rec.get("low_pric", 0) or 0)),
|
||
"close": abs(float(rec.get("cur_prc", 0) or 0)),
|
||
"volume": abs(int(float(rec.get("trde_qty", 0) or 0))),
|
||
})
|
||
except (ValueError, TypeError):
|
||
continue
|
||
if len(rows) >= n:
|
||
break
|
||
|
||
# 연속 조회 여부 확인 (응답 헤더)
|
||
cont_yn = str(resp.headers.get("cont-yn", "N")).upper()
|
||
next_key = str(resp.headers.get("next-key", "")).strip()
|
||
if cont_yn != "Y" or not next_key or len(rows) >= n:
|
||
break
|
||
headers["cont-yn"] = cont_yn
|
||
headers["next-key"] = next_key
|
||
time.sleep(0.35) # 키움 API 레이트리밋
|
||
|
||
if not rows:
|
||
logger.debug("키움 ka10080 반환 행 없음 (%s %dM)", code, tf_min)
|
||
return pd.DataFrame()
|
||
|
||
# 키움은 최신→과거 순 → fill_gap_from_rest 는 오래된→최신 순 필요 → 역순
|
||
df = pd.DataFrame(rows[:n][::-1])
|
||
logger.info("✅ 키움 %dM 갭보정 데이터: %s %d봉", tf_min, code, len(df))
|
||
return df
|
||
|
||
|