Files
kis_bot/kis_ws.py
2026-03-17 12:33:30 +09:00

1546 lines
68 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
kis_ws.py — KIS WebSocket 실시간 체결가 캐시 (H0STCNT0)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
역할: check_sell_signals() 의 inquire_price() REST 폴링을 대체.
보유 종목 코드를 구독해두면 KIS가 체결마다 push → 즉시 가격 캐시 갱신.
WebSocket vs REST:
REST : 요청마다 0.5s 딜레이(API 제한) + 응답 대기 → 보유 3종목 ≈ 3초 낭비
WebSocket: 연결 1회 + push 수신 → 체결 즉시 캐시 갱신, API 카운트 무관
KIS 공식 스펙:
TR_ID : H0STCNT0 (국내주식 실시간체결가)
실전 URL : ws://ops.koreainvestment.com:21000 (KIS_WS_URL_REAL, env/DB 변경 가능)
모의 URL : ws://ops.koreainvestment.com:31000 (KIS_WS_URL_MOCK, env/DB 변경 가능)
세션 구독 한도: 최대 41종목 (MAX_STOCKS=3~4 수준에서 문제 없음)
KIS 2026-02-24 경고 준수 (무한 재연결 차단 정책):
- 재연결 횟수 제한: 1시간 내 MAX_RECONNECTS_PER_HOUR 회 초과 시 자동 대기
- 최대 총 재연결 횟수: MAX_RECONNECT_ATTEMPTS 회 초과 시 WebSocket 종료 → REST fallback
- 지수 백오프: 5초 → 10초 → 20초 ... 최대 300초
모의투자(VTS):
KIS 모의투자는 WebSocket을 지원하지 않는 경우가 많음.
연결 실패 시 is_active=False → check_sell_signals()가 REST로 자동 fallback.
KIS_WS_MOCK_ENABLED=true (env/DB) 로 강제 활성화 가능.
설치 필요:
pip install websocket-client
"""
from __future__ import annotations
import json
import logging
import queue
import threading
import time
from pathlib import Path
from typing import Dict, Optional, Set
import requests
logger = logging.getLogger("KISWebSocket")
# ------------------------------------------------------------------
# 모듈 수준에서 get_env 함수를 참조 (kis_long_ver1 공용 함수 재사용)
# 이 파일이 단독으로도 동작할 수 있도록 fallback import 포함
# ------------------------------------------------------------------
try:
from kis_long_ver1 import get_env_from_db, get_env_int, get_env_bool
except ImportError:
try:
from kis_short_ver2 import get_env_from_db, get_env_int, get_env_bool
except ImportError:
# 모듈 임포트 전 단계에서는 기본값만 사용
def get_env_from_db(key, default=""): # type: ignore[misc]
return default
def get_env_int(key, default): # type: ignore[misc]
return default
def get_env_bool(key, default=False): # type: ignore[misc]
return default
class KISWebSocketPriceCache:
"""
KIS H0STCNT0 실시간 체결가 WebSocket 수신기.
사용법:
ws_cache = KISWebSocketPriceCache(app_key, app_secret, is_mock=False)
ok = ws_cache.start() # 백그라운드 스레드 시작
ws_cache.subscribe("005930") # 종목 구독
data = ws_cache.get_price("005930") # inquire_price 호환 dict 반환
ws_cache.stop()
get_price() 반환 값이 None 이면 → REST inquire_price() 로 fallback
"""
# H0STCNT0 데이터 필드 인덱스 ('^' 구분)
# KIS H0STCNT0 국내주식 실시간체결 전문 순서 (0-based)
IDX_CODE = 0 # MKSC_SHRN_ISCD: 유가증권 단축 종목코드
IDX_TIME = 1 # STCK_CNTG_HOUR: 체결 시간
IDX_PRICE = 2 # STCK_PRPR: 주식 현재가 (체결가)
IDX_SIGN = 3 # PRDY_VRSS_SIGN: 전일 대비 부호
IDX_CHANGE = 4 # PRDY_VRSS: 전일 대비
IDX_CHGPCT = 5 # PRDY_CTRT: 전일 대비율
# 당일 시고저 (매수 체크 시 REST inquire_price 대체용 → API 과부하 방지)
IDX_OPEN = 7 # STCK_OPRC: 주식 시가 (당일)
IDX_HIGH = 8 # STCK_HGPR: 주식 고가 (당일)
IDX_LOW = 9 # STCK_LWPR: 주식 저가 (당일)
IDX_VOLUME = 11 # ACML_VOL: 누적 거래량 (참고용)
# 재연결 정책 (KIS 2026-02-24 경고 준수)
MAX_RECONNECT_ATTEMPTS = 10 # 총 재연결 최대 횟수 (STABLE_CONN_RESET_SEC 이상 안정 연결 후 끊기면 초기화)
MAX_RECONNECTS_PER_HOUR = 6 # 1시간 내 재연결 허용 횟수
RECONNECT_BASE_DELAY_SEC = 5.0 # 초기 재연결 대기(초)
RECONNECT_MAX_DELAY_SEC = 300.0 # 최대 재연결 대기(초)
# 이 시간(초) 이상 안정적으로 연결이 유지됐다가 끊기면 카운터를 초기화.
# 예: 5분 이상 정상 운영 후 네트워크 일시 장애 → '버스트 차단'이 아닌 '정상 재연결'로 간주.
STABLE_CONN_RESET_SEC = 300.0 # 5분
# Approval key 유효시간 (23시간, KIS REST 토큰과 별개)
APPROVAL_KEY_CACHE_SEC = 82800
def __init__(self, app_key: str, app_secret: str, is_mock: bool = True):
self.app_key = app_key
self.app_secret = app_secret
self.is_mock = is_mock
# WebSocket URL (env/DB 로 재정의 가능 → 연결 실패 시 사용자가 수정)
_default_real = "ws://ops.koreainvestment.com:21000"
_default_mock = "ws://ops.koreainvestment.com:31000"
self._ws_url = (
get_env_from_db("KIS_WS_URL_MOCK", _default_mock)
if is_mock
else get_env_from_db("KIS_WS_URL_REAL", _default_real)
)
self._base_url = (
"https://openapivts.koreainvestment.com:29443"
if is_mock
else "https://openapi.koreainvestment.com:9443"
)
# ── 가격 캐시 ──────────────────────────────────────────────
# { code: {"data": dict, "ts": float} }
self._cache: Dict[str, Dict] = {}
self._cache_lock = threading.Lock()
# ── 구독 목록 ──────────────────────────────────────────────
self._subscribed: Set[str] = set()
self._sub_lock = threading.Lock()
# ── 영구 구독 목록 (홀딩 관심종목 등) ──────────────────────
# unsubscribe() 호출에도 해제되지 않는 고정 구독 코드
self._permanent_codes: Set[str] = set()
self._load_permanent_watchlist()
# ── WebSocket 관련 ─────────────────────────────────────────
self._ws = None # 현재 WebSocketApp 인스턴스
self._ws_thread: Optional[threading.Thread] = None
self._running = False
self._connected = False
# ── approval_key (REST 토큰과 별개, WebSocket 전용 인증) ───
self._approval_key: Optional[str] = None
self._approval_key_ts: float = 0.0
# ── 재연결 관리 ────────────────────────────────────────────
self._reconnect_count = 0
self._reconnect_times: list = [] # 최근 재연결 타임스탬프 목록
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
self._last_connect_time: float = 0.0 # 마지막 연결 성공 시각 (안정 연결 판단용)
# ── CandleAggregator (스캘핑봇 연동 시 외부에서 주입) ─────
# attach_candle_aggregator(agg) 로 연결, None이면 봉 집계 비활성
self._candle_agg: Optional["CandleAggregator"] = None
# ── WS 연결 성공 시 갭보정 콜백 ───────────────────────────
# set_on_connected_callback(fn) 으로 등록.
# 연결 성공(_on_open) 후 별도 스레드에서 호출됨.
# 장 시간일 때만 실행 (새벽 재연결 시 API 빈 응답 방지)
self._on_connected_callback: Optional[callable] = None
# ── websocket-client 사용 가능 여부 ───────────────────────
try:
import websocket as _ws_lib
self._ws_lib = _ws_lib
self._available = True
except ImportError:
self._ws_lib = None
self._available = False
logger.warning(
"⚠️ websocket-client 미설치 → WebSocket 실시간 가격 비활성.\n"
" pip install websocket-client 를 실행하세요."
)
# ==================================================================
# Public API
# ==================================================================
def attach_candle_aggregator(self, agg: "CandleAggregator") -> None:
"""
CandleAggregator 를 연결합니다.
연결 후부터 틱 수신 시 자동으로 agg.on_tick() 이 호출됩니다.
kis_scalping_ver1.py 에서 ws_cache.attach_candle_aggregator(agg) 로 호출.
"""
self._candle_agg = agg
logger.info("✅ CandleAggregator 연결 완료 (봉 집계 활성화)")
def set_on_connected_callback(self, fn) -> None:
"""
WS 연결 성공(_on_open) 시 호출할 콜백 등록.
ScalpingBotV1._fill_all_gaps 를 넘겨서 매 연결 시 갭보정 자동 실행.
장 시간일 때만 콜백을 실행 (새벽 자동재연결 시 API 빈 응답 방지).
"""
self._on_connected_callback = fn
def start(self, force_cleanup: bool = True) -> bool:
"""
WebSocket 수신 백그라운드 스레드를 시작합니다.
성공 시 True, 사용 불가(모의/패키지 미설치/키 없음) 시 False.
False 를 반환해도 봇은 REST fallback으로 정상 동작합니다.
"""
if not self._available:
return False
# 모의투자는 기본 비활성 (KIS 모의 서버 WebSocket 미지원 가능성)
# KIS_WS_MOCK_ENABLED=true 로 강제 활성화 가능
if self.is_mock and not get_env_bool("KIS_WS_MOCK_ENABLED", False):
logger.info(" 모의투자: WebSocket 기본 비활성 (KIS_WS_MOCK_ENABLED=true 로 활성 가능)")
return False
if self._running:
return True
# ── [CRITICAL] 비정상 종료 후 재시작 대비: 이전 세션 강제 정리 ──────────────
# force_cleanup=True 시:
# 1. approval_key 새로 발급 (이전 세션 무효화)
# 2. _subscribed 세트 비우기 (메모리 정리)
# 3. _cache 비우기 (오래된 데이터 제거)
# KIS 서버는 연결이 끊기면 5~10 분 내 자동 구독 해제되므로,
# 새 approval_key 로 새 세션을 열면 이전 구독은 자동 소멸.
if force_cleanup:
logger.info("🧹 WebSocket 세션 초기화 (비정상 종료 대비)")
self._approval_key = None # 이전 approval_key 무효화
self._approval_key_ts = 0.0
with self._sub_lock:
self._subscribed.clear() # 구독 목록 초기화
with self._cache_lock:
self._cache.clear() # 캐시 초기화
logger.info("✅ WebSocket 세션 초기화 완료 (구독/캐시 리셋)")
# approval_key 발급 (연결 전 확인)
if not self._get_approval_key():
logger.warning("⚠️ WebSocket approval_key 발급 실패 → REST fallback 모드")
return False
self._running = True
self._ws_thread = threading.Thread(
target=self._run_ws_loop, daemon=True, name="KIS-WS-H0STCNT0"
)
self._ws_thread.start()
logger.info(
"✅ KIS WebSocket 수신 스레드 시작 (H0STCNT0 | url=%s)", self._ws_url
)
return True
def stop(self, clear_subscriptions: bool = False) -> None:
"""
WebSocket 수신 중단 및 스레드 종료.
Args:
clear_subscriptions: True 시 모든 종목 구독 해제 후 종료
(봇 정상 종료 시 KIS 서버 정리용)
"""
# 옵션: 모든 구독 해제 (KIS 서버 정리용)
if clear_subscriptions and self._connected and self._ws:
with self._sub_lock:
codes = list(self._subscribed)
for code in codes:
self._send_sub_msg(code, subscribe=False)
with self._sub_lock:
self._subscribed.discard(code)
with self._cache_lock:
self._cache.pop(code, None)
logger.info("📡 WebSocket 구독 해제: %s", code)
logger.info("✅ 모든 WebSocket 구독 정리 완료 (%d종목)", len(codes))
self._running = False
self._connected = False
if self._ws:
try:
self._ws.close()
except Exception:
pass
if self._ws_thread and self._ws_thread.is_alive():
self._ws_thread.join(timeout=5)
logger.info("🛑 KIS WebSocket 종료")
# KIS WebSocket 세션 당 구독 가능 최대 종목 수
# 초과 시 서버에서 오류 반환 또는 계정 일시 차단 가능 (KIS 공지 준수)
MAX_SUBSCRIPTIONS = 41
def _load_permanent_watchlist(self) -> None:
"""
long_term_watchlist.json 에 있는 홀딩 관심종목을 영구 구독 목록으로 로드.
WS 연결 시 자동으로 subscribe(), unsubscribe() 호출 시에도 해제하지 않음.
"""
try:
wl_path = Path(__file__).resolve().parent / "long_term_watchlist.json"
if not wl_path.exists():
return
items = json.loads(wl_path.read_text(encoding="utf-8")).get("items", [])
codes = [i["code"] for i in items if i.get("code")]
self._permanent_codes = set(codes)
if codes:
logger.info(
"📌 홀딩 영구 구독 목록 로드: %s (%d종목)",
", ".join(codes), len(codes),
)
except Exception as e:
logger.warning("영구 구독 목록 로드 실패: %s", e)
def subscribe(self, code: str) -> None:
"""
실시간 체결가 구독 등록.
이미 연결 중이면 즉시 구독 메시지 전송, 연결 전이면 연결 성공 시 일괄 등록.
KIS 세션 한도(MAX_SUBSCRIPTIONS=41) 초과 시 등록 거부 후 경고 로그 출력.
"""
code = (code or "").strip()
if not code:
return
with self._sub_lock:
if code in self._subscribed:
return # 이미 구독 중 → 중복 전송 방지
if len(self._subscribed) >= self.MAX_SUBSCRIPTIONS:
logger.warning(
"⚠️ WebSocket 구독 한도 초과(%d/%d) → %s 구독 거부 "
"(KIS 세션 한도 준수: 불필요 종목 구독해제 후 재시도)",
len(self._subscribed), self.MAX_SUBSCRIPTIONS, code,
)
return
self._subscribed.add(code)
if self._connected and self._ws:
self._send_sub_msg(code, subscribe=True)
logger.info("📡 WebSocket 구독 추가: %s (%d/%d)", code, len(self._subscribed), self.MAX_SUBSCRIPTIONS)
def unsubscribe(self, code: str) -> None:
"""
실시간 체결가 구독 해제 및 캐시 삭제.
단, long_term_watchlist.json 의 영구 구독 종목은 해제하지 않음.
"""
code = (code or "").strip()
if code in self._permanent_codes:
logger.debug("📌 영구 구독 종목 해제 요청 무시: %s (홀딩 관심종목)", code)
return
with self._sub_lock:
self._subscribed.discard(code)
with self._cache_lock:
self._cache.pop(code, None)
if self._connected and self._ws:
self._send_sub_msg(code, subscribe=False)
logger.info("📡 WebSocket 구독 해제: %s", code)
def get_price(self, code: str, max_age_sec: float = 5.0) -> Optional[Dict]:
"""
WebSocket 캐시에서 실시간 가격 + 당일 시고저를 꺼냅니다.
max_age_sec 초 이내 수신된 체결 틱만 유효하게 취급합니다.
반환 형식: {
"stck_prpr": "73900", # 현재가 (체결가, REST inquire_price와 동일)
"prdy_vrss": "200", # 전일 대비
"prdy_ctrt": "0.27", # 전일 대비율
"stck_oprc": "73000", # 당일 시가 (REST와 동일)
"stck_hgpr": "74500", # 당일 고가 (REST와 동일)
"stck_lwpr": "72800", # 당일 저가 (REST와 동일)
}
반환 None: WebSocket 미연결 / 데이터 없음 / max_age_sec 초 초과 → REST fallback 권장
※ WS 캐시 정확도: H0STCNT0 체결 틱마다 KIS 서버가 시고저를 함께 전송
→ 장중 REST inquire_price와 동일하며 지연이 없음 (REST 왕복 100~300ms 생략)
"""
if not self._available or not self._connected:
return None
with self._cache_lock:
entry = self._cache.get(code)
if not entry:
return None
age = time.time() - entry.get("ts", 0)
if age > max_age_sec:
# 데이터가 너무 오래됨 → REST로 재확인
return None
return entry.get("data")
@property
def is_active(self) -> bool:
"""WebSocket이 연결되어 실시간 데이터를 수신 중이면 True."""
return bool(self._available and self._connected and self._running)
# ==================================================================
# 내부 메서드
# ==================================================================
def _get_approval_key(self) -> Optional[str]:
"""
WebSocket 전용 approval_key 발급.
REST 액세스 토큰과 완전히 별개 (endpoint: /oauth2/Approval).
"""
now = time.time()
if self._approval_key and (now - self._approval_key_ts) < self.APPROVAL_KEY_CACHE_SEC:
return self._approval_key
try:
url = f"{self._base_url}/oauth2/Approval"
body = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"secretkey": self.app_secret,
}
r = requests.post(url, json=body, timeout=10)
data = r.json()
key = data.get("approval_key")
if key:
self._approval_key = key
self._approval_key_ts = now
logger.info(
"✅ WebSocket approval_key 발급 완료 (앞8자: %s…)", key[:8]
)
return key
logger.error("❌ WebSocket approval_key 발급 실패: %s", data)
except Exception as e:
logger.error("❌ WebSocket approval_key 요청 예외: %s", e)
return None
def _build_sub_payload(self, code: str, subscribe: bool) -> str:
"""구독(tr_type=1) / 해제(tr_type=2) JSON 메시지 생성."""
return json.dumps({
"header": {
"approval_key": self._approval_key or "",
"custtype": "P",
"tr_type": "1" if subscribe else "2",
"content-type": "utf-8",
},
"body": {
"input": {
"tr_id": "H0STCNT0",
"tr_key": code,
}
},
})
def _send_sub_msg(self, code: str, subscribe: bool = True) -> None:
"""WebSocket으로 구독/해제 메시지 전송. 실패 시 조용히 무시."""
if not self._ws:
return
try:
self._ws.send(self._build_sub_payload(code, subscribe))
except Exception as e:
logger.debug("구독 메시지 전송 실패(%s): %s", code, e)
def _parse_realtime_msg(self, raw: str) -> None:
"""
H0STCNT0 실시간 체결가 메시지 파싱 및 캐시 갱신.
정상 포맷: "0|H0STCNT0|001|005930^082317^73900^5^200^0.27^..."
parts[0]: 암호화구분 (0=평문, 1=암호화)
parts[1]: TR_ID
parts[2]: 건수
parts[3]: 데이터 ('^' 구분)
KIS PINGPONG: 메시지가 "PINGPONG" 문자열 → 동일하게 echoing.
JSON 응답(구독 확인/에러): {"header":{...},"body":{...}} → 무시.
"""
if not raw:
return
# ── KIS Application-Level PINGPONG ─────────────────────────
if raw.strip() == "PINGPONG":
if self._ws:
try:
self._ws.send("PINGPONG")
except Exception:
pass
return
# ── 구독 응답(JSON) 무시 ────────────────────────────────────
if raw.startswith("{"):
try:
j = json.loads(raw)
header = j.get("header", {})
if header.get("tr_id") == "H0STCNT0":
body = j.get("body", {})
rt = body.get("rt_cd", "")
msg = body.get("msg1", "")
logger.debug("H0STCNT0 구독 응답: rt_cd=%s msg=%s", rt, msg)
except Exception:
pass
return
# ── 실시간 데이터 파싱 ──────────────────────────────────────
parts = raw.split("|")
if len(parts) < 4:
return
# parts[0]=암호화구분, parts[1]=TR_ID, parts[2]=건수, parts[3]=데이터
if parts[1] != "H0STCNT0":
return
# 암호화된 데이터는 아직 미지원 (평문만 처리)
if parts[0] == "1":
logger.debug("H0STCNT0 암호화 데이터 수신 (처리 스킵) → REST fallback 권장")
return
# 한 메시지에 여러 건이 포함될 수 있음 (parts[2] = 건수)
# 단순히 parts[3] 전체를 파싱 (단건 기준)
fields = parts[3].split("^")
if len(fields) <= max(self.IDX_PRICE, self.IDX_CHGPCT):
return
try:
code = fields[self.IDX_CODE].strip()
price = float(fields[self.IDX_PRICE])
if not code or price <= 0:
return
chg_raw = fields[self.IDX_CHANGE] if len(fields) > self.IDX_CHANGE else "0"
pct_raw = fields[self.IDX_CHGPCT] if len(fields) > self.IDX_CHGPCT else "0.00"
# 당일 시고저: REST inquire_price 대체용 (매수 체크 시 API 과부하 방지)
open_raw = fields[self.IDX_OPEN] if len(fields) > self.IDX_OPEN else "0"
high_raw = fields[self.IDX_HIGH] if len(fields) > self.IDX_HIGH else "0"
low_raw = fields[self.IDX_LOW] if len(fields) > self.IDX_LOW else "0"
# inquire_price output 딕셔너리와 키 이름을 맞춤
# stck_oprc/hgpr/lwpr 도 함께 저장 → kis_short_ver2 매수 체크에서 REST 없이 활용
data_compat = {
"stck_prpr": str(int(price)), # 현재가 (int 문자열)
"prdy_vrss": chg_raw, # 전일 대비
"prdy_ctrt": pct_raw, # 전일 대비율
"stck_oprc": open_raw, # 당일 시가
"stck_hgpr": high_raw, # 당일 고가
"stck_lwpr": low_raw, # 당일 저가
}
with self._cache_lock:
self._cache[code] = {"data": data_compat, "ts": time.time()}
# ── CandleAggregator 연동: 틱 → 봉 집계 (스캘핑봇 전용, 연결 시 활성화)
if self._candle_agg is not None:
tick_time = fields[self.IDX_TIME].strip() if len(fields) > self.IDX_TIME else ""
vol_raw = fields[self.IDX_VOLUME].strip() if len(fields) > self.IDX_VOLUME else "0"
try:
tick_vol = int(vol_raw)
except ValueError:
tick_vol = 0
self._candle_agg.on_tick(code, price, tick_vol, tick_time)
logger.debug("H0STCNT0 수신: %s%s", code, int(price))
except (ValueError, IndexError) as e:
logger.debug("H0STCNT0 파싱 오류: %s | raw=%s", e, raw[:80])
# ==================================================================
# WebSocket 루프 (백그라운드 스레드)
# ==================================================================
def _is_market_hours(self) -> bool:
"""
KIS WebSocket 서비스 시간 여부.
- 실전투자: 08:25~ (장전 호가 포함)
- 모의투자: 09:00~ (모의 WS 서버가 09:00 이전 연결 즉시 거부)
False 반환 시 재연결 시도 없이 다음 장 시작까지 대기.
"""
import datetime as _dt
now = _dt.datetime.now()
if now.weekday() >= 5: # 토·일
return False
# 모의투자는 09:00, 실전투자는 08:25 부터 서버 응답
open_h, open_m = (9, 0) if self.is_mock else (8, 25)
return _dt.time(open_h, open_m) <= now.time() <= _dt.time(16, 5)
def _seconds_until_market_open(self) -> float:
"""다음 장 시작까지 남은 초. 최소 60초 반환.
- 실전투자: 08:25 기준
- 모의투자: 09:00 기준
"""
import datetime as _dt
now = _dt.datetime.now()
open_h, open_m = (9, 0) if self.is_mock else (8, 25)
target = now.replace(hour=open_h, minute=open_m, second=0, microsecond=0)
if now.time() >= _dt.time(16, 5):
# 오늘 장 마감 → 내일 장 시작
target += _dt.timedelta(days=1)
# 주말 건너뛰기
while target.weekday() >= 5:
target += _dt.timedelta(days=1)
return max(60.0, (target - now).total_seconds())
def _run_ws_loop(self) -> None:
"""
재연결 정책을 포함한 WebSocket 메인 루프.
run_forever() 가 반환(연결 끊김)되면 지수 백오프 후 재연결 시도.
KIS 정책: 1시간 내 MAX_RECONNECTS_PER_HOUR 회 초과 시 강제 대기.
장외 시간(16:05~08:25, 주말): 재연결 없이 다음 장까지 대기.
"""
# 즉시 끊김(장외 서버 거부) 감지: INSTANT_DROP_SEC 이내 끊김이 N회 연속이면 장외 슬립
INSTANT_DROP_SEC = 3.0 # 연결 후 이 초 이내에 끊기면 "즉시 종료"로 판정
INSTANT_DROP_MAX = 3 # 연속 N회 즉시 종료 → 장외 슬립으로 전환
_instant_drop_streak = 0 # 연속 즉시 종료 카운터
while self._running:
now = time.time()
# ── 장외 시간이면 다음 장 시작까지 대기 ─────────────────────
if not self._is_market_hours():
wait_sec = self._seconds_until_market_open()
logger.info(
"🌙 장외 시간 — WebSocket 재연결 중지, 다음 장 시작까지 %.0f분 대기",
wait_sec / 60,
)
# 재연결 카운터·백오프 초기화 (장 시작 시 깨끗하게 재접속)
self._reconnect_count = 0
self._reconnect_times = []
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
_instant_drop_streak = 0
# 60초 단위로 쪼개서 슬립 (stop() 신호 빠르게 감지)
for _ in range(int(wait_sec // 60)):
if not self._running:
return
time.sleep(60)
time.sleep(wait_sec % 60)
continue
# ── 연속 즉시 종료 감지 → 장외 서버 거부로 간주 ─────────────
# 연결 직후 INSTANT_DROP_SEC 이내에 끊기는 패턴이 N회 반복되면
# 서버가 장외이거나 앱키가 차단된 것으로 간주하고 긴 슬립 진입
if _instant_drop_streak >= INSTANT_DROP_MAX:
wait_sec = self._seconds_until_market_open()
logger.warning(
"⚠️ WebSocket 연속 즉시 끊김 %d회 감지 (장외 서버 거부 추정) "
"%.0f분 대기 후 재시도 (KIS 차단 방지)",
_instant_drop_streak, wait_sec / 60,
)
self._reconnect_count = 0
self._reconnect_times = []
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
_instant_drop_streak = 0
for _ in range(int(wait_sec // 60)):
if not self._running:
return
time.sleep(60)
time.sleep(wait_sec % 60)
continue
# ── 안정 연결 후 끊김이면 재연결 카운터 초기화 ───────────
# STABLE_CONN_RESET_SEC(5분) 이상 정상 운영 후 끊겼다면
# 이전 재연결 이력을 소거하여 'KIS 정상 케이스'로 처리.
# (수개월 운영 중 산발적 네트워크 단절에 의해 10회 소진 → 영구 비활성화 방지)
if (
self._last_connect_time > 0
and (now - self._last_connect_time) > self.STABLE_CONN_RESET_SEC
and self._reconnect_count > 0
):
logger.info(
"♻️ WebSocket 안정 운영(%.0f분) 후 끊김 → 재연결 카운터 초기화 (%d→0)",
(now - self._last_connect_time) / 60,
self._reconnect_count,
)
self._reconnect_count = 0
self._reconnect_times = []
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC
# ── 1시간 내 재연결 횟수 점검 ────────────────────────────
self._reconnect_times = [t for t in self._reconnect_times if now - t < 3600]
if len(self._reconnect_times) >= self.MAX_RECONNECTS_PER_HOUR:
# 가장 오래된 재연결로부터 1시간이 지날 때까지 대기
wait = max(10, 3600 - (now - self._reconnect_times[0]) + 10)
logger.warning(
"⛔ 1시간 내 WebSocket 재연결 %d회 초과 → %.0f초(%.0f분) 강제 대기 (KIS 차단 방지)",
self.MAX_RECONNECTS_PER_HOUR, wait, wait / 60,
)
time.sleep(wait)
continue
# ── 총 재연결 횟수 초과 → REST fallback 전환 ─────────────
if self._reconnect_count >= self.MAX_RECONNECT_ATTEMPTS:
logger.error(
"❌ WebSocket 최대 재연결 %d회 초과 → WebSocket 종료, REST fallback 전환",
self.MAX_RECONNECT_ATTEMPTS,
)
self._running = False
break
# ── approval_key 갱신 ─────────────────────────────────────
approval_key = self._get_approval_key()
if not approval_key:
logger.warning(
"WebSocket approval_key 발급 불가 → %ds 대기 후 재시도",
int(self._reconnect_delay),
)
time.sleep(self._reconnect_delay)
self._reconnect_delay = min(
self._reconnect_delay * 2, self.RECONNECT_MAX_DELAY_SEC
)
continue
# ── 재연결 카운트 및 대기 ─────────────────────────────────
self._reconnect_count += 1
if self._reconnect_count > 1:
self._reconnect_times.append(time.time())
logger.info(
"🔄 WebSocket 재연결 시도 %d/%d (대기 %ds 완료)",
self._reconnect_count,
self.MAX_RECONNECT_ATTEMPTS,
int(self._reconnect_delay),
)
time.sleep(self._reconnect_delay)
self._reconnect_delay = min(
self._reconnect_delay * 2, self.RECONNECT_MAX_DELAY_SEC
)
# ── WebSocketApp 생성 및 실행 ─────────────────────────────
_conn_start = time.time() # 연결 시작 시각 (즉시 종료 감지용)
try:
ws_app = self._ws_lib.WebSocketApp(
self._ws_url,
on_open = self._on_open,
on_message = self._on_message,
on_error = self._on_error,
on_close = self._on_close,
)
self._ws = ws_app
# ping_interval: WebSocket 프로토콜 레벨 PING (연결 유지)
# KIS Application-Level PINGPONG은 _parse_realtime_msg 에서 처리
ws_app.run_forever(ping_interval=20, ping_timeout=10)
except Exception as e:
logger.error("WebSocket run_forever 예외: %s", e)
if not self._running:
break
# ── 즉시 종료 여부 판정 (장외 서버 거부 감지) ────────────────
_conn_duration = time.time() - _conn_start
if _conn_duration < INSTANT_DROP_SEC:
_instant_drop_streak += 1
logger.debug(
"⚡ WebSocket 즉시 종료 감지 (%.1f초, streak=%d/%d)",
_conn_duration, _instant_drop_streak, INSTANT_DROP_MAX,
)
else:
# 어느 정도 연결 유지됐다면 즉시 종료 streak 초기화
_instant_drop_streak = 0
logger.info("KIS WebSocket 루프 종료 (is_active=False, REST fallback 전환)")
# ------------------------------------------------------------------
# WebSocket 콜백
# ------------------------------------------------------------------
def _on_open(self, ws) -> None:
"""연결 성공: 등록된 모든 종목 구독 요청 전송."""
self._connected = True
self._reconnect_delay = self.RECONNECT_BASE_DELAY_SEC # 대기 시간 리셋
self._last_connect_time = time.time() # 안정 연결 판단용 타임스탬프 기록
logger.info("✅ KIS WebSocket 연결 성공 (H0STCNT0 | url=%s)", self._ws_url)
# 영구 구독 목록(홀딩 관심종목) 먼저 구독 등록
for code in sorted(self._permanent_codes):
with self._sub_lock:
if code not in self._subscribed:
if len(self._subscribed) < self.MAX_SUBSCRIPTIONS:
self._subscribed.add(code)
else:
logger.warning("⚠️ 구독 한도로 영구구독 추가 불가: %s", code)
continue
self._send_sub_msg(code, subscribe=True)
if self._permanent_codes:
logger.info("📌 영구 구독 등록 완료: %d종목", len(self._permanent_codes))
# 재연결 시에도 구독 목록 재등록
with self._sub_lock:
codes = set(self._subscribed)
for code in sorted(codes):
self._send_sub_msg(code, subscribe=True)
if codes:
logger.info("📡 WebSocket 구독 일괄 등록: %s", ", ".join(sorted(codes)))
# ── 연결 성공 시 갭보정 콜백 (장 시간일 때만) ───────────────
# 봇 시작 후 처음 WS가 안정 연결되는 시점(9:00 이후)에
# REST 분봉 데이터로 DB를 채워 '봉부족' 문제 해소.
# 새벽 재연결 시에는 is_market_hours() False → 콜백 스킵.
if self._on_connected_callback and self._is_market_hours():
try:
cb_thread = threading.Thread(
target=self._on_connected_callback,
name="WS-GapFill",
daemon=True,
)
cb_thread.start()
except Exception as e:
logger.debug("갭보정 콜백 실행 실패: %s", e)
def _on_message(self, ws, message: str) -> None:
self._parse_realtime_msg(message)
def _on_error(self, ws, error) -> None:
self._connected = False
logger.warning("⚠️ KIS WebSocket 오류: %s", error)
def _on_close(self, ws, close_status_code, close_msg) -> None:
self._connected = False
logger.info(
"🔌 KIS WebSocket 연결 종료 (code=%s msg=%s)",
close_status_code, close_msg or "",
)
# ======================================================================
# ======================================================================
# CandleAggregator — WebSocket 틱 → OHLCV 봉 실시간 집계기
# ======================================================================
class CandleAggregator:
"""
KISWebSocketPriceCache 에서 수신한 틱을 N분봉으로 집계합니다.
Two-Track 아키텍처 (Gemini/퀀트 펌 방식)
─────────────────────────────────────────
[트랙 1 — 매매 두뇌 (논블로킹)]
WebSocket 틱 → on_tick() → RAM에서 OHLCV 즉시 갱신
→ 매수/매도 판단은 get_latest_confirmed() 등 메모리 접근만 사용
→ DB 대기 시간 0ms, 타점 놓침 없음
[트랙 2 — 기록원 스레드 (백그라운드)]
봉 확정 시 dict를 Queue에 put_nowait() (논블로킹, 0.000001초)
→ 백그라운드 스레드(_db_writer)가 BATCH_SIZE개 or FLUSH_INTERVAL초마다
DB에 executemany() 한 방에 묶어서 INSERT
→ 백테스트용 봉 데이터 완전 보존, 매매 루프 블로킹 없음
진행 중 봉(is_confirmed=0) 처리
─────────────────────────────────
- RAM의 _current 버퍼에만 존재 → DB에 절대 쓰지 않음
- 매수 루프는 get_current_candle() 로 즉시 메모리 접근
- 봉 확정(분 바뀜) 순간에만 Queue → DB 기록
RSI 계산
────────
- 확정 봉 close 리스트(_closes, 최대 MAX_CLOSE_BUFFER개) RAM에 유지
- RSI(2/3/5) 계산은 순수 Python 연산, DB 조회 없음
- 봉 수 부족 시 RSI=None → 매수 루프에서 신호 무시
스레드 안전성
─────────────
- _lock : on_tick / fill_gap 간 경합 방지 (RAM 버퍼 보호)
- Queue : thread-safe, put_nowait 는 lock 불필요
- _db_writer : 독립 daemon 스레드 (봇 종료 시 자동 소멸)
"""
MAX_CLOSE_BUFFER = 200 # RSI 계산용 close 보관 최대 개수
BATCH_SIZE = 50 # 이 개수 이상 쌓이면 즉시 배치 플러시
FLUSH_INTERVAL = 2.0 # 초 — BATCH_SIZE 미달이라도 이 주기로 플러시
def __init__(self, db=None, timeframes: list = None):
"""
Args:
db : TradeDB 인스턴스. None이면 DB 쓰기 비활성(순수 RAM 모드).
timeframes: 집계할 봉 단위 리스트 (기본 [1, 3])
"""
self.db = db
self.timeframes: list = timeframes if timeframes else [1, 3]
self._lock = threading.Lock()
# ── 트랙 1: RAM 버퍼 ─────────────────────────────────────
# 진행 중인 봉: { (code, tf): {candle_time, open, high, low, close, volume} }
self._current: Dict[tuple, dict] = {}
# RSI 계산용 확정 봉 close 가격 히스토리: { (code, tf): [c1, c2, ...] }
self._closes: Dict[tuple, list] = {}
# 최근 확정 봉 보관 (get_latest_confirmed / get_candles 용): { (code, tf): [dict, ...] }
self._confirmed: Dict[tuple, list] = {}
# ── 트랙 2: 비동기 DB 배치 큐 ────────────────────────────
# 봉 확정 시 dict를 여기 넣으면 _db_writer 가 배치로 INSERT
self._write_queue: queue.Queue = queue.Queue(maxsize=10000)
self._db_writer_thread: Optional[threading.Thread] = None
if self.db is not None:
self._start_db_writer()
logger.info("✅ CandleAggregator 초기화 완료 (timeframes=%s, db=%s)",
self.timeframes, "활성" if self.db else "비활성(RAM 전용)")
# ------------------------------------------------------------------
# 트랙 2: 백그라운드 DB 배치 기록원
# ------------------------------------------------------------------
def _start_db_writer(self) -> None:
"""백그라운드 DB 배치 기록 스레드를 시작합니다."""
self._db_writer_thread = threading.Thread(
target=self._db_writer_loop,
name="CandleDBWriter",
daemon=True, # 봇 종료 시 자동 소멸
)
self._db_writer_thread.start()
logger.info("✅ CandleAggregator DB 기록원 스레드 시작 (배치=%d, 주기=%.1fs)",
self.BATCH_SIZE, self.FLUSH_INTERVAL)
def _db_writer_loop(self) -> None:
"""
배치 기록 루프.
- Queue에서 BATCH_SIZE개 모이면 즉시 배치 INSERT
- BATCH_SIZE 미달이라도 FLUSH_INTERVAL초마다 플러시
"""
batch: list = []
last_flush = time.time()
while True:
try:
# FLUSH_INTERVAL 내에서 최대한 많이 모아 배치 구성
timeout = max(0.1, self.FLUSH_INTERVAL - (time.time() - last_flush))
item = self._write_queue.get(timeout=timeout)
if item is None:
# None = 종료 신호
break
batch.append(item)
self._write_queue.task_done()
except queue.Empty:
pass
now = time.time()
should_flush = len(batch) >= self.BATCH_SIZE or \
(batch and (now - last_flush) >= self.FLUSH_INTERVAL)
if should_flush and batch:
self._flush_batch(batch)
batch = []
last_flush = now
# 루프 종료 시 남은 배치 처리
if batch:
self._flush_batch(batch)
logger.info("CandleDBWriter 스레드 종료")
def _flush_batch(self, batch: list) -> None:
"""
배치 리스트를 DB에 한 번의 executemany 로 INSERT.
각 item: {code, tf, candle_time, open, high, low, close, volume,
is_confirmed, source, rsi_2, rsi_3, rsi_5}
"""
if not self.db or not batch:
return
try:
import datetime as _dt
now_str = _dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
rows = []
for item in batch:
rows.append((
item["code"],
item["tf"],
item["candle_time"],
item["open"],
item["high"],
item["low"],
item["close"],
item.get("volume", 0),
item.get("rsi_2"),
item.get("rsi_3"),
item.get("rsi_5"),
item.get("is_confirmed", 1),
item.get("source", "ws"),
now_str,
))
# ws_candles 테이블이 존재하면 배치 INSERT (없으면 조용히 skip)
self.db.conn.execute(
"""
INSERT INTO ws_candles
(code, timeframe, candle_time, `open`, high, low, close,
volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
`open`=VALUES(`open`), high=VALUES(high), low=VALUES(low),
close=VALUES(close), volume=VALUES(volume),
rsi_2=VALUES(rsi_2), rsi_3=VALUES(rsi_3), rsi_5=VALUES(rsi_5),
is_confirmed=VALUES(is_confirmed), updated_at=VALUES(updated_at)
""",
rows[0], # 단건 execute (pymysql executemany 는 첫 번째 인수만 받음)
) if len(rows) == 1 else self._executemany_batch(rows)
logger.debug("💾 [배치저장] %d봉 → DB", len(rows))
except Exception as e:
logger.debug("_flush_batch 실패 (무시): %s", e)
def _executemany_batch(self, rows: list) -> None:
"""여러 봉을 executemany 로 한 번에 INSERT."""
sql = """
INSERT INTO ws_candles
(code, timeframe, candle_time, `open`, high, low, close,
volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
`open`=VALUES(`open`), high=VALUES(high), low=VALUES(low),
close=VALUES(close), volume=VALUES(volume),
rsi_2=VALUES(rsi_2), rsi_3=VALUES(rsi_3), rsi_5=VALUES(rsi_5),
is_confirmed=VALUES(is_confirmed), updated_at=VALUES(updated_at)
"""
# pymysql executemany: cursor.executemany(sql, list_of_tuples)
import pymysql
with self.db.conn._lock:
self.db.conn._ensure_connected()
cur = self.db.conn._conn.cursor()
cur.executemany(sql, rows)
def stop(self) -> None:
"""기록원 스레드를 안전하게 종료합니다."""
if self._db_writer_thread and self._db_writer_thread.is_alive():
self._write_queue.put(None) # 종료 신호
self._db_writer_thread.join(timeout=5)
# ------------------------------------------------------------------
# 시각 유틸
# ------------------------------------------------------------------
@staticmethod
def _candle_key(tick_time_str: str, timeframe: int) -> str:
"""
틱 시각(HHMMSS)과 timeframe(분)으로 봉 시작 시각 키를 반환.
날짜는 오늘 날짜를 사용 (장 중에만 동작하므로 안전).
반환 형식: YYYYMMDDHHMM (분 단위, 초 버림)
예) tick_time="091523", timeframe=3 → "202603020915" (09:15봉)
"""
import datetime as _dt
try:
hh = int(tick_time_str[0:2]) if len(tick_time_str) >= 6 else 0
mm = int(tick_time_str[2:4]) if len(tick_time_str) >= 6 else 0
# timeframe 단위로 내림: 3분봉이면 09:17 → 09:15
floored_mm = (mm // timeframe) * timeframe
today = _dt.date.today().strftime("%Y%m%d")
return f"{today}{hh:02d}{floored_mm:02d}"
except Exception:
import datetime as _dt
return _dt.datetime.now().strftime("%Y%m%d%H%M")
# ------------------------------------------------------------------
# RSI 계산 (확정 봉 close 리스트 기반)
# ------------------------------------------------------------------
@staticmethod
def _calc_rsi(closes: list, period: int) -> Optional[float]:
"""
close 가격 리스트에서 RSI(period) 계산.
- Wilder's smoothed moving average (단순 rolling mean 사용)
- 데이터 부족 시 None 반환
"""
if len(closes) < period + 1:
return None
recent = closes[-(period + 1):]
gains, losses = [], []
for i in range(1, len(recent)):
delta = recent[i] - recent[i - 1]
gains.append(max(delta, 0))
losses.append(max(-delta, 0))
avg_gain = sum(gains) / period if period > 0 else 0
avg_loss = sum(losses) / period if period > 0 else 0
if avg_loss == 0:
return 100.0
rs = avg_gain / avg_loss
return round(100 - (100 / (1 + rs)), 2)
def _compute_rsi_set(self, closes: list) -> tuple:
"""RSI 2/3/5 를 한 번에 계산해서 (rsi2, rsi3, rsi5) 반환."""
return (
self._calc_rsi(closes, 2),
self._calc_rsi(closes, 3),
self._calc_rsi(closes, 5),
)
# ------------------------------------------------------------------
# 핵심: 틱 수신 처리
# ------------------------------------------------------------------
def on_tick(self, code: str, price: float, volume: int, tick_time: str) -> None:
"""
KISWebSocketPriceCache._parse_realtime_msg 에서 틱마다 호출됨.
모든 timeframe 에 대해 봉 갱신/확정 처리.
"""
if price <= 0:
return
with self._lock:
for tf in self.timeframes:
self._process_tick(code, price, volume, tick_time, tf)
def _process_tick(self, code: str, price: float, volume: int,
tick_time: str, tf: int) -> None:
"""
단일 timeframe 에 대한 틱 처리 (lock 내부에서 호출).
[트랙 1] RAM 갱신만 수행, DB 호출 없음 → 블로킹 0ms
[트랙 2] 봉 확정 순간에만 Queue.put_nowait() → 기록원이 비동기 배치 저장
"""
key = (code, tf)
new_ctime = self._candle_key(tick_time, tf)
if key not in self._current:
# 첫 틱: 새 봉 시작 (RAM만)
self._current[key] = {
"candle_time": new_ctime,
"open": price, "high": price, "low": price,
"close": price, "volume": volume,
}
return
cur = self._current[key]
if new_ctime != cur["candle_time"]:
# ── 봉 확정 ────────────────────────────────────────────
closes = self._closes.setdefault(key, [])
closes.append(cur["close"])
if len(closes) > self.MAX_CLOSE_BUFFER:
closes.pop(0)
rsi2, rsi3, rsi5 = self._compute_rsi_set(closes)
confirmed_candle = {
"code": code,
"tf": tf,
"candle_time": cur["candle_time"],
"open": cur["open"],
"high": cur["high"],
"low": cur["low"],
"close": cur["close"],
"volume": cur["volume"],
"rsi_2": rsi2,
"rsi_3": rsi3,
"rsi_5": rsi5,
"is_confirmed": 1,
"source": "ws",
}
# [트랙 1] 확정 봉을 RAM _confirmed 버퍼에 보관 (매수 루프 직접 참조용)
buf = self._confirmed.setdefault(key, [])
buf.append(confirmed_candle)
if len(buf) > self.MAX_CLOSE_BUFFER:
buf.pop(0)
# [트랙 2] Queue에 던지고 즉시 반환 (논블로킹) → 기록원이 배치 저장
try:
self._write_queue.put_nowait(confirmed_candle)
except queue.Full:
logger.warning("⚠️ CandleAggregator 쓰기 Queue 가득참 — 봉 1개 DROP (코드: %s)", code)
logger.debug(
"🕯 [봉확정] %s %dM %s C=%.0f RSI3=%s",
code, tf, cur["candle_time"], cur["close"],
f"{rsi3:.1f}" if rsi3 is not None else "N/A",
)
# ── 새 봉 시작 (RAM만) ──────────────────────────────────
self._current[key] = {
"candle_time": new_ctime,
"open": price, "high": price, "low": price,
"close": price, "volume": volume,
}
else:
# 같은 봉: OHLCV 갱신 (RAM만, DB 쓰기 없음)
cur["high"] = max(cur["high"], price)
cur["low"] = min(cur["low"], price)
cur["close"] = price
cur["volume"] = volume # 누적 거래량(ACML_VOL) 그대로 덮어씀
# ------------------------------------------------------------------
# 재접속 갭 보정: REST get_minute_chart 로 빈 봉 채우기
# ------------------------------------------------------------------
def fill_gap_from_rest(self, code: str, tf: int, rest_df) -> int:
"""
WS 재접속 후 빠진 봉 구간을 REST 분봉 데이터로 채움.
[트랙 1] close 가격을 _closes / _confirmed 에 넣어 RSI 웜업
[트랙 2] 봉 dict를 Queue 에 넣어 기록원이 배치로 DB 저장 (source='rest')
Args:
code : 종목코드
tf : timeframe (분)
rest_df : get_minute_chart 반환 DataFrame (오래된→최신 순 정렬 필요)
컬럼: time, open, high, low, close, volume
Returns:
채워진 봉 수
"""
if rest_df is None or rest_df.empty:
return 0
with self._lock:
key = (code, tf)
closes = self._closes.setdefault(key, [])
conf_buf = self._confirmed.setdefault(key, [])
inserted = 0
for _, row in rest_df.iterrows():
ctime = str(row.get("time", ""))[:12] # YYYYMMDDHHMM 12자리
if not ctime or len(ctime) < 12:
continue
close = float(row.get("close", 0))
if close <= 0:
continue
# [트랙 1] RAM 웜업
closes.append(close)
if len(closes) > self.MAX_CLOSE_BUFFER:
closes.pop(0)
rsi2, rsi3, rsi5 = self._compute_rsi_set(closes)
candle = {
"code": code,
"tf": tf,
"candle_time": ctime,
"open": float(row.get("open", close)),
"high": float(row.get("high", close)),
"low": float(row.get("low", close)),
"close": close,
"volume": int(row.get("volume", 0)),
"rsi_2": rsi2,
"rsi_3": rsi3,
"rsi_5": rsi5,
"is_confirmed": 1,
"source": "rest",
}
conf_buf.append(candle)
if len(conf_buf) > self.MAX_CLOSE_BUFFER:
conf_buf.pop(0)
# [트랙 2] Queue에 넣어 기록원이 배치로 DB 저장
try:
self._write_queue.put_nowait(candle)
except queue.Full:
pass
inserted += 1
if inserted:
logger.info("🔧 [갭보정] %s %dM → REST %d봉 RAM 적재 + DB 큐 등록", code, tf, inserted)
return inserted
# ------------------------------------------------------------------
# [트랙 1] RAM 버퍼 조회 — 매수/매도 루프에서 직접 호출 (DB 조회 없음)
# ------------------------------------------------------------------
def get_latest_confirmed(self, code: str, tf: int) -> Optional[dict]:
"""
가장 최근 확정된 봉(완성된 마지막 봉)을 반환.
None이면 아직 봉이 확정되지 않음 (장 초반 등).
"""
with self._lock:
buf = self._confirmed.get((code, tf))
return buf[-1] if buf else None
def get_prev_confirmed(self, code: str, tf: int) -> Optional[dict]:
"""직전 확정봉 (최신에서 2번째). 패턴 확인용 (현재봉 - 1)."""
with self._lock:
buf = self._confirmed.get((code, tf))
return buf[-2] if buf and len(buf) >= 2 else None
def get_candles(self, code: str, tf: int, n: int = 10) -> list:
"""최근 n개 확정 봉 리스트 반환 (오래된→최신 순)."""
with self._lock:
buf = self._confirmed.get((code, tf), [])
return list(buf[-n:])
def get_confirmed_count(self, code: str, tf: int) -> int:
"""확정된 봉 수 (RSI 안정화 여부 확인용)."""
with self._lock:
return len(self._confirmed.get((code, tf), []))
def get_current_candle(self, code: str, tf: int) -> Optional[dict]:
"""
현재 진행 중인 봉(미확정, is_confirmed=0) 반환.
RSI는 포함되지 않음 (확정 봉 기준으로만 계산).
"""
with self._lock:
return dict(self._current.get((code, tf), {})) or None
def get_rsi(self, code: str, tf: int, period: int = 3) -> Optional[float]:
"""
최신 확정 봉의 RSI(period) 값 반환.
period: 2, 3, 5 중 하나 (스캘핑 단타용 초단기 RSI)
"""
candle = self.get_latest_confirmed(code, tf)
if candle is None:
return None
return candle.get(f"rsi_{period}")
def remove_code(self, code: str) -> None:
"""
유니버스에서 빠진 종목의 RAM 버퍼를 정리합니다.
_sync_subscriptions()에서 구독 해제 시 같이 호출.
등록된 모든 timeframe 의 _confirmed / _closes / _current 를 삭제.
"""
with self._lock:
for tf in list(self.timeframes):
key = (code, tf)
self._confirmed.pop(key, None)
self._closes.pop(key, None)
self._current.pop(key, None)
logger.debug("🗑️ CandleAggregator RAM 정리: %s", code)
# ======================================================================
# 키움 REST API 공통 유틸 — 봇 시작 시 WS 갭보정용
# KIS get_minute_chart() 는 당일봉만 제공하지만,
# 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월) + 페이지네이션 지원.
# ======================================================================
# ──────────────────────────────────────────────────────────────────────
# 키움 토큰 매니저 싱글톤 풀
#
# kiwoom_rest_api/auth/token.py 의 TokenManager 와 동일한 설계:
# - _is_access_token_valid(): expires_dt 기반 정확한 만료 판별 (30s 버퍼)
# - _request_new_token(): 만료 시에만 발급 (au10001 rate limit 방지)
#
# 차이점: DB에서 읽은 app_key/secret 직접 주입 (환경변수 불필요)
# 싱글톤 풀: 캐시 키(domain:appkey앞8자) → 인스턴스 재사용
# 종목×timeframe마다 새 인스턴스를 만들지 않음
# ──────────────────────────────────────────────────────────────────────
import threading as _threading
from datetime import datetime as _datetime, timedelta as _timedelta
_kiwoom_token_lock = _threading.Lock()
_kiwoom_managers: Dict[str, "KiwoomTokenManager"] = {} # 싱글톤 풀
class KiwoomTokenManager:
"""
kiwoom_rest_api/auth/token.py TokenManager 와 동일한 로직 — DB 키 직접 주입 버전.
- get_token(): 유효한 토큰이면 바로 반환, 만료 시에만 재발급 (au10001 방지)
- expires_dt 를 정확히 파싱 (하드코딩 23h 아님)
- 30초 버퍼: 경계 케이스 방지 (TokenManager 와 동일)
- 스레드 안전: 인스턴스당 Lock 보유
"""
def __init__(self, app_key: str, app_secret: str, is_mock: bool = False):
self._app_key = app_key
self._app_secret = app_secret
self._is_mock = is_mock
self._domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
self._mode_str = "모의" if is_mock else "실전"
self._token: Optional[str] = None
self._expiry: Optional[_datetime] = None
self._lock = _threading.Lock()
def _is_valid(self) -> bool:
"""만료 30초 전까지 유효 (TokenManager._is_access_token_valid 와 동일)"""
if not self._token or not self._expiry:
return False
return _datetime.now() < self._expiry - _timedelta(seconds=30)
def _request_new_token(self) -> None:
"""토큰 신규 발급 — 만료 시에만 호출됨"""
resp = requests.post(
f"https://{self._domain}/oauth2/token",
json={
"grant_type": "client_credentials",
"appkey": self._app_key,
"secretkey": self._app_secret,
},
timeout=10,
)
data = resp.json()
token = (data.get("token") or data.get("access_token") or "").strip()
if not token:
raise RuntimeError(f"키움 토큰 발급 실패 [{self._mode_str}]: {data}")
# expires_dt 정확히 파싱 (TokenManager._update_token_info 와 동일 로직)
exp_s = data.get("expires_dt", "")
try:
self._expiry = _datetime.strptime(str(exp_s), "%Y%m%d%H%M%S")
except Exception:
# expires_in 도 없으면 24h 기본 (보수적 fallback)
exp_in = data.get("expires_in", 86400)
self._expiry = _datetime.now() + _timedelta(seconds=int(exp_in))
self._token = token
logger.info(
"✅ 키움 토큰 발급 완료 [%s] (앞8자: %s…, 만료: %s)",
self._mode_str, token[:8], self._expiry.strftime("%Y-%m-%d %H:%M:%S"),
)
def get_token(self) -> Optional[str]:
"""
유효한 토큰 반환. 만료 시에만 재발급.
kiwoom_rest_api TokenManager.get_token() 호환 인터페이스.
"""
with self._lock:
if not self._is_valid():
try:
self._request_new_token()
except Exception as e:
logger.warning("⚠️ 키움 토큰 발급 예외 [%s]: %s", self._mode_str, e)
return None
return self._token
def _get_kiwoom_token_cached(
kiwoom_key: str,
kiwoom_secret: str,
is_mock: bool,
) -> Optional[str]:
"""
KiwoomTokenManager 싱글톤 풀에서 인스턴스를 꺼내 토큰 반환.
동일 키 조합은 같은 인스턴스를 재사용 → au10001 rate limit 방지.
"""
domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
cache_key = f"{domain}:{kiwoom_key[:8]}"
with _kiwoom_token_lock:
if cache_key not in _kiwoom_managers:
_kiwoom_managers[cache_key] = KiwoomTokenManager(
kiwoom_key, kiwoom_secret, is_mock=is_mock
)
mgr = _kiwoom_managers[cache_key]
return mgr.get_token()
def _get_kiwoom_creds(db) -> tuple:
"""
DB env_config 최신 행에서 키움 앱키/시크릿 반환.
KIS_MOCK 설정에 따라 MOCK / REAL 키를 자동 선택.
Returns:
(app_key, app_secret, is_mock) — 키 없으면 (None, None, False)
"""
try:
row = db.conn.execute(
"SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
).fetchone()
if not row:
return None, None, False
r = dict(row)
is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes")
if is_mock:
key = str(r.get("KIWOOM_APP_KEY_MOCK", "") or "").strip()
secret = str(r.get("KIWOOM_APP_SECRET_MOCK", "") or "").strip()
else:
key = str(r.get("KIWOOM_APP_KEY_REAL", "") or "").strip()
secret = str(r.get("KIWOOM_APP_SECRET_REAL", "") or "").strip()
# 레거시 필드 폴백 (KIWOOM_APP_KEY)
if not key or not secret:
key = str(r.get("KIWOOM_APP_KEY", "") or "").strip()
secret = str(r.get("KIWOOM_APP_SECRET", "") or "").strip()
if not key or not secret:
return None, None, is_mock
return key, secret, is_mock
except Exception as e:
logger.debug("키움 크레덴셜 조회 실패: %s", e)
return None, None, False
def get_kiwoom_candles_df(
code: str,
tf_min: int,
kiwoom_key: str,
kiwoom_secret: str,
is_mock: bool = False,
n: int = 120,
) -> "object": # pd.DataFrame
"""
키움 REST API (ka10080 — 주식분봉차트조회) 로 분봉 조회.
fill_gap_from_rest() 호환 DataFrame 반환.
Args:
code : 종목코드 (6자리)
tf_min : 봉 단위 분 (1 / 3 / 5 / 10 / 15 / 30 / 45 / 60)
kiwoom_key : 키움 앱키
kiwoom_secret: 키움 앱시크릿
is_mock : True = 모의투자 도메인 사용
n : 최대 수집 봉 수 (기본 120, 60분봉 기준 약 17 영업일)
Returns:
pd.DataFrame — 컬럼: time(YYYYMMDDHHMM), open, high, low, close, volume
오래된→최신 순 (fill_gap_from_rest 기대 순서)
빈 DataFrame (오류 시)
"""
try:
import pandas as pd
except ImportError:
logger.error("pandas 미설치 → 키움 갭보정 불가")
return None # type: ignore[return-value]
domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
# ── 1. 키움 OAuth 토큰 (캐시 우선 — 23시간 재사용, au10001 rate limit 방지) ──
token = _get_kiwoom_token_cached(kiwoom_key, kiwoom_secret, is_mock)
if not token:
return pd.DataFrame()
# ── 2. ka10080 분봉 차트 조회 (페이지네이션) ──────────────────────
# 키움은 최신→과거 순으로 반환, 한 페이지에 최대 900봉
base_url = f"https://{domain}/api/dostk/chart"
headers = {
"content-type": "application/json;charset=UTF-8",
"appkey": kiwoom_key,
"appsecret": kiwoom_secret,
"authorization": f"Bearer {token}",
"api-id": "ka10080",
"cont-yn": "N",
"next-key": "",
}
body = {
"stk_cd": code,
"tic_scope": str(tf_min), # "1" / "3" / "15" / "60" etc.
"upd_stkpc_tp": "1", # 수정주가 반영
}
rows: list = []
while len(rows) < n:
try:
resp = requests.post(base_url, json=body, headers=headers, timeout=15)
data = resp.json()
except Exception as e:
logger.warning("⚠️ 키움 ka10080 조회 실패 (%s %dM): %s", code, tf_min, e)
break
records = data.get("stk_min_pole_chart_qry") or []
if not records:
break
for rec in records:
raw_dt = str(rec.get("cntr_tm", ""))
if len(raw_dt) < 12:
continue
try:
rows.append({
# cntr_tm = YYYYMMDDHHMMSS → 12자리로 잘라 YYYYMMDDHHMM
"time": raw_dt[:12],
"open": abs(float(rec.get("open_pric", 0) or 0)),
"high": abs(float(rec.get("high_pric", 0) or 0)),
"low": abs(float(rec.get("low_pric", 0) or 0)),
"close": abs(float(rec.get("cur_prc", 0) or 0)),
"volume": abs(int(float(rec.get("trde_qty", 0) or 0))),
})
except (ValueError, TypeError):
continue
if len(rows) >= n:
break
# 연속 조회 여부 확인 (응답 헤더)
cont_yn = str(resp.headers.get("cont-yn", "N")).upper()
next_key = str(resp.headers.get("next-key", "")).strip()
if cont_yn != "Y" or not next_key or len(rows) >= n:
break
headers["cont-yn"] = cont_yn
headers["next-key"] = next_key
time.sleep(0.35) # 키움 API 레이트리밋
if not rows:
logger.debug("키움 ka10080 반환 행 없음 (%s %dM)", code, tf_min)
return pd.DataFrame()
# 키움은 최신→과거 순 → fill_gap_from_rest 는 오래된→최신 순 필요 → 역순
df = pd.DataFrame(rows[:n][::-1])
logger.info("✅ 키움 %dM 갭보정 데이터: %s %d", tf_min, code, len(df))
return df