#!/usr/bin/env python3 """ kis_token_manager.py — KIS 실전/모의 토큰 통합 자동 관리 ========================================================== 문제: 모의 모드로만 봇을 돌리면 실전 토큰이 갱신되지 않음. → 실전 토큰 만료 → WebSocket/홀딩봇 REST 호출 실패 → 거래 없음 해결: 어느 모드로 봇을 실행해도 실전+모의 토큰 모두 유효하게 유지. 파일 잠금(lockfile) 으로 동시에 여러 봇이 실행해도 중복 발급 방지. KIS 제한: - 하루 1회 원칙 (23시간 내 중복 발급 시 제한 가능) - 이 모듈은 만료 1시간 전까지 기존 토큰 재사용 → 하루 1회만 발급 사용 예시 (각 봇 시작부): from kis_token_manager import ensure_both_tokens ensure_both_tokens() # 실전+모의 토큰 자동 갱신 (만료된 것만) CLI: python3 kis_token_manager.py # 상태 확인 python3 kis_token_manager.py --refresh # 강제 갱신 시도 """ import json import logging import os import threading import time from datetime import datetime, timedelta from pathlib import Path from typing import Optional import requests logger = logging.getLogger(__name__) ROOT = Path(__file__).parent CACHE_MOCK = ROOT / ".kis_token_cache_mock.json" CACHE_REAL = ROOT / ".kis_token_cache_real.json" LOCK_FILE = ROOT / ".kis_token_manager.lock" EXPIRE_MARGIN_H = 1 # 파일 캐시 상태 조회용 (get_token_status, ensure_token) 버퍼 EXPIRE_MARGIN_M = 10 # KisTokenManager.get_token() 선제 갱신 버퍼 (10분) LOCK_TIMEOUT_S = 60 # 잠금 최대 대기 시간(초) # ───────────────────────────────────────────────────────────────────────────── # 내부 유틸 # ───────────────────────────────────────────────────────────────────────────── def _load_env() -> dict: """DB env_config 테이블에서 KIS 키 로드""" try: from database import TradeDB db = TradeDB() row = db.conn.execute( "SELECT * FROM env_config ORDER BY id DESC LIMIT 1" ).fetchone() db.close() return dict(row) if row else {} except Exception as e: logger.warning(f"DB env 로드 실패: {e}") return {} def _parse_expired(s: str): """만료 시간 문자열 → datetime. 실패 시 None.""" for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S"): try: return datetime.strptime(str(s).strip(), fmt) except Exception: pass return None def get_token_status(is_mock: bool) -> dict: """ 캐시 파일 상태 반환. 반환: {"valid": bool, "token": str, "expires": str, "expires_in_h": float} """ cache_path = CACHE_MOCK if is_mock else CACHE_REAL mode = "모의" if is_mock else "실전" if not cache_path.exists(): return {"valid": False, "token": "", "expires": "파일없음", "expires_in_h": -999} try: cache = json.loads(cache_path.read_text(encoding="utf-8")) token = cache.get("access_token", "") expired_s = cache.get("access_token_token_expired", "") exp_dt = _parse_expired(expired_s) if not token or exp_dt is None: return {"valid": False, "token": "", "expires": expired_s, "expires_in_h": -999} now = datetime.now() expires_in = (exp_dt - now).total_seconds() / 3600 valid = now < exp_dt - timedelta(hours=EXPIRE_MARGIN_H) return { "valid": valid, "token": token[:12] + "…", "expires": expired_s, "expires_in_h": round(expires_in, 1), } except Exception as e: return {"valid": False, "token": "", "expires": f"읽기오류: {e}", "expires_in_h": -999} def _acquire_lock() -> bool: """파일 잠금 획득 (중복 발급 방지). 성공 시 True.""" deadline = time.time() + LOCK_TIMEOUT_S while time.time() < deadline: try: # O_CREAT | O_EXCL: 파일이 없을 때만 생성 (원자적) fd = os.open(str(LOCK_FILE), os.O_CREAT | os.O_EXCL | os.O_WRONLY) os.write(fd, str(os.getpid()).encode()) os.close(fd) return True except FileExistsError: # 기존 잠금이 5분 이상 됐으면 강제 해제 (좀비 잠금) try: mtime = LOCK_FILE.stat().st_mtime if time.time() - mtime > 300: LOCK_FILE.unlink(missing_ok=True) continue except Exception: pass time.sleep(2) except Exception as e: logger.warning(f"잠금 획득 실패: {e}") return False logger.warning("잠금 획득 타임아웃 → 갱신 건너뜀") return False def _release_lock(): """파일 잠금 해제""" try: LOCK_FILE.unlink(missing_ok=True) except Exception: pass def _issue_token(app_key: str, app_secret: str, is_mock: bool) -> bool: """ KIS /oauth2/tokenP 엔드포인트로 새 토큰 발급 후 캐시 저장. 성공 시 True, 실패 시 False. ※ KIS 정책: 23시간 내 중복 발급 시 서비스 제한 가능. 이 함수는 만료 1시간 전까지 기존 토큰을 재사용하므로 하루 1회만 호출됨. """ base_url = ( "https://openapivts.koreainvestment.com:29443" if is_mock else "https://openapi.koreainvestment.com:9443" ) cache_path = CACHE_MOCK if is_mock else CACHE_REAL mode = "모의" if is_mock else "실전" try: resp = requests.post( f"{base_url}/oauth2/tokenP", json={"grant_type": "client_credentials", "appkey": app_key, "appsecret": app_secret}, timeout=15, ) data = resp.json() token = data.get("access_token", "") exp = data.get("access_token_token_expired", "") if not token: err = data.get("message") or data.get("error_description") or str(data) logger.error(f"❌ {mode} 토큰 발급 실패: {err}") # EGW00133: 1분당 1회 제한 if "EGW00133" in str(data): logger.warning("⚠️ KIS 분당 발급 제한. 1분 후 재시도하세요.") return False cache_path.write_text( json.dumps({ "access_token": token, "access_token_token_expired": exp, "mock": is_mock, }, ensure_ascii=False, indent=2), encoding="utf-8", ) logger.info(f"✅ {mode} 토큰 발급 완료 | 만료: {exp} | 앞12자: {token[:12]}…") return True except Exception as e: logger.error(f"❌ {mode} 토큰 발급 예외: {e}") return False # ───────────────────────────────────────────────────────────────────────────── # 공개 API # ───────────────────────────────────────────────────────────────────────────── def ensure_token(is_mock: bool, env: dict = None) -> bool: """ 단일 모드(실전/모의) 토큰 유효성 확인 + 필요 시 자동 갱신. 이미 유효한 경우 → 발급 없이 True 반환 (23시간 정책 자동 준수). """ status = get_token_status(is_mock) mode = "모의" if is_mock else "실전" if status["valid"]: logger.info( f"🔑 {mode} 토큰 유효 ({status['expires_in_h']:.1f}h 남음) → 재사용" ) return True logger.info(f"⏰ {mode} 토큰 만료/없음 → 갱신 시도 (남은시간: {status['expires_in_h']:.1f}h)") if env is None: env = _load_env() key_suffix = "MOCK" if is_mock else "REAL" app_key = str(env.get(f"KIS_APP_KEY_{key_suffix}", "") or "").strip() app_secret = str(env.get(f"KIS_APP_SECRET_{key_suffix}", "") or "").strip() if not app_key or not app_secret: logger.warning(f"⚠️ {mode} 키 없음 (KIS_APP_KEY_{key_suffix}) → 발급 불가") return False if not _acquire_lock(): return False try: # 잠금 획득 후 다시 확인 (다른 프로세스가 갱신했을 수 있음) status = get_token_status(is_mock) if status["valid"]: logger.info(f"🔑 {mode} 토큰 이미 갱신됨 (다른 프로세스) → 재사용") return True return _issue_token(app_key, app_secret, is_mock) finally: _release_lock() def ensure_both_tokens(env: dict = None) -> dict: """ 실전 + 모의 토큰 모두 유효하게 유지. 어느 모드로 봇을 실행하든 두 토큰을 동시에 관리. 만료된 것만 갱신 → KIS 23시간 정책 자동 준수. 반환: {"real": True/False, "mock": True/False} """ if env is None: env = _load_env() results = {} # 실전 먼저 (더 중요, 잠금 경쟁 최소화) for is_mock in [False, True]: mode = "mock" if is_mock else "real" results[mode] = ensure_token(is_mock, env) ok_str = " | ".join( f"{'모의' if k == 'mock' else '실전'}={'✅' if v else '❌'}" for k, v in results.items() ) logger.info(f"🔄 토큰 상태: {ok_str}") return results # ───────────────────────────────────────────────────────────────────────────── # KisTokenManager — KiwoomTokenManager 와 동일한 get_token() 인터페이스 # # 동작 원리: # 1. 메모리 캐시 (_token, _expiry) — 파일 읽기 최소화 # 2. JSON 파일 캐시 — 크로스 프로세스 공유 + 재시작 후 토큰 유지 # 3. 만료 10분 전 선제 갱신 — API 에러(EGW00123) 없이 자동 교체 # # KIS 에는 토큰 상태 조회 API 가 없음 → 발급 시 받은 expires_dt 를 # 서버 현재시간과 비교해서 판단 (JSON 파일에 저장된 날짜 사용) # # 봇에서 사용: # from kis_token_manager import KisTokenManager # token = KisTokenManager.instance(is_mock).get_token() # 매 API 호출 전 # ───────────────────────────────────────────────────────────────────────────── _km_instances: dict = {} # is_mock(bool) → KisTokenManager 싱글톤 _km_instances_lock = threading.Lock() class KisTokenManager: """ KIS 토큰 싱글톤 매니저. - get_token(): 유효하면 바로 반환, 만료 10분 전부터 선제 갱신 - kiwoom_rest_api TokenManager / KiwoomTokenManager 와 동일한 인터페이스 """ @classmethod def instance(cls, is_mock: bool) -> "KisTokenManager": """프로세스당 실전/모의 각 1개 싱글톤 반환""" with _km_instances_lock: if is_mock not in _km_instances: _km_instances[is_mock] = cls(is_mock) return _km_instances[is_mock] def __init__(self, is_mock: bool): self._is_mock = is_mock self._mode_str = "모의" if is_mock else "실전" self._cache_path = CACHE_MOCK if is_mock else CACHE_REAL self._lock = threading.Lock() self._token: Optional[str] = None self._expiry: Optional[datetime] = None self._load_from_file() # 재시작 후에도 기존 토큰 재사용 # ── 내부 ────────────────────────────────────────────────────── def _load_from_file(self) -> None: """JSON 파일 → 메모리 로드""" if not self._cache_path.exists(): return try: data = json.loads(self._cache_path.read_text(encoding="utf-8")) token = data.get("access_token", "") exp_dt = _parse_expired(data.get("access_token_token_expired", "")) if token and exp_dt: self._token = token self._expiry = exp_dt except Exception as e: logger.debug("KIS 파일 캐시 로드 실패 [%s]: %s", self._mode_str, e) def _is_valid(self) -> bool: """ 만료 10분 전부터 유효하지 않다고 판단 → 선제 갱신 트리거. KIS에는 토큰 상태 조회 API 없음 → JSON 파일의 expires_dt 와 서버시간 비교. """ if not self._token or not self._expiry: return False return datetime.now() < self._expiry - timedelta(minutes=EXPIRE_MARGIN_M) # ── 공개 ────────────────────────────────────────────────────── def get_token(self) -> Optional[str]: """ 유효한 토큰 반환. 만료 10분 전 자동 갱신. 매수/매도 API 호출 직전마다 호출해도 안전 (갱신은 만료 임박 시만). 흐름: 1. 메모리 캐시 유효 → 즉시 반환 (파일 읽기 없음) 2. 메모리 만료 → 파일 재로드 (다른 프로세스가 갱신했을 수 있음) 3. 파일도 만료 → ensure_token() 으로 신규 발급 + 파일/메모리 갱신 """ with self._lock: if self._is_valid(): return self._token # 다른 프로세스가 갱신했을 수 있으니 파일 재로드 self._load_from_file() if self._is_valid(): logger.info("🔑 %s 토큰 파일 갱신 감지 → 메모리 갱신", self._mode_str) return self._token # 파일도 만료 → 잠금 기반 신규 발급 ok = ensure_token(self._is_mock) if ok: with self._lock: self._load_from_file() logger.info( "🔄 %s 토큰 선제 갱신 완료 (만료 10분 전) → 앞8자: %s…", self._mode_str, (self._token[:8] if self._token else "N/A"), ) return self._token if ok else None def status(self) -> dict: """현재 상태 반환 (get_token_status 래퍼, 남은 시간 포함)""" return get_token_status(self._is_mock) # ───────────────────────────────────────────────────────────────────────────── # CLI: 상태 확인 / 강제 갱신 # ───────────────────────────────────────────────────────────────────────────── if __name__ == "__main__": import argparse import logging as _lg _lg.basicConfig( level=_lg.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) parser = argparse.ArgumentParser(description="KIS 토큰 관리자") parser.add_argument("--refresh", action="store_true", help="만료 여부와 관계없이 강제 갱신 시도") args = parser.parse_args() print("\n" + "=" * 55) print(" KIS 토큰 상태") print("=" * 55) for is_mock, label in [(False, "실전"), (True, "모의")]: s = get_token_status(is_mock) flag = "✅ 유효" if s["valid"] else "❌ 만료/없음" print(f" {label:4s} | {flag} | 만료: {s['expires']} ({s['expires_in_h']:+.1f}h)") print("=" * 55) if args.refresh: print("\n강제 갱신 시도 중...") # 캐시 무효화 후 재발급 for path in [CACHE_REAL, CACHE_MOCK]: if path.exists(): path.unlink() print(f" 캐시 삭제: {path.name}") result = ensure_both_tokens() print("\n갱신 결과:", result, "\n")