Files
kis_bot/kis_token_manager.py
2026-03-17 12:33:30 +09:00

398 lines
16 KiB
Python

#!/usr/bin/env python3
"""
kis_token_manager.py — KIS 실전/모의 토큰 통합 자동 관리
==========================================================
문제: 모의 모드로만 봇을 돌리면 실전 토큰이 갱신되지 않음.
→ 실전 토큰 만료 → WebSocket/홀딩봇 REST 호출 실패 → 거래 없음
해결: 어느 모드로 봇을 실행해도 실전+모의 토큰 모두 유효하게 유지.
파일 잠금(lockfile) 으로 동시에 여러 봇이 실행해도 중복 발급 방지.
KIS 제한:
- 하루 1회 원칙 (23시간 내 중복 발급 시 제한 가능)
- 이 모듈은 만료 1시간 전까지 기존 토큰 재사용 → 하루 1회만 발급
사용 예시 (각 봇 시작부):
from kis_token_manager import ensure_both_tokens
ensure_both_tokens() # 실전+모의 토큰 자동 갱신 (만료된 것만)
CLI:
python3 kis_token_manager.py # 상태 확인
python3 kis_token_manager.py --refresh # 강제 갱신 시도
"""
import json
import logging
import os
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import requests
logger = logging.getLogger(__name__)
ROOT = Path(__file__).parent
CACHE_MOCK = ROOT / ".kis_token_cache_mock.json"
CACHE_REAL = ROOT / ".kis_token_cache_real.json"
LOCK_FILE = ROOT / ".kis_token_manager.lock"
EXPIRE_MARGIN_H = 1 # 파일 캐시 상태 조회용 (get_token_status, ensure_token) 버퍼
EXPIRE_MARGIN_M = 10 # KisTokenManager.get_token() 선제 갱신 버퍼 (10분)
LOCK_TIMEOUT_S = 60 # 잠금 최대 대기 시간(초)
# ─────────────────────────────────────────────────────────────────────────────
# 내부 유틸
# ─────────────────────────────────────────────────────────────────────────────
def _load_env() -> dict:
"""DB env_config 테이블에서 KIS 키 로드"""
try:
from database import TradeDB
db = TradeDB()
row = db.conn.execute(
"SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
).fetchone()
db.close()
return dict(row) if row else {}
except Exception as e:
logger.warning(f"DB env 로드 실패: {e}")
return {}
def _parse_expired(s: str):
"""만료 시간 문자열 → datetime. 실패 시 None."""
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S"):
try:
return datetime.strptime(str(s).strip(), fmt)
except Exception:
pass
return None
def get_token_status(is_mock: bool) -> dict:
"""
캐시 파일 상태 반환.
반환: {"valid": bool, "token": str, "expires": str, "expires_in_h": float}
"""
cache_path = CACHE_MOCK if is_mock else CACHE_REAL
mode = "모의" if is_mock else "실전"
if not cache_path.exists():
return {"valid": False, "token": "", "expires": "파일없음", "expires_in_h": -999}
try:
cache = json.loads(cache_path.read_text(encoding="utf-8"))
token = cache.get("access_token", "")
expired_s = cache.get("access_token_token_expired", "")
exp_dt = _parse_expired(expired_s)
if not token or exp_dt is None:
return {"valid": False, "token": "", "expires": expired_s, "expires_in_h": -999}
now = datetime.now()
expires_in = (exp_dt - now).total_seconds() / 3600
valid = now < exp_dt - timedelta(hours=EXPIRE_MARGIN_H)
return {
"valid": valid,
"token": token[:12] + "",
"expires": expired_s,
"expires_in_h": round(expires_in, 1),
}
except Exception as e:
return {"valid": False, "token": "", "expires": f"읽기오류: {e}", "expires_in_h": -999}
def _acquire_lock() -> bool:
"""파일 잠금 획득 (중복 발급 방지). 성공 시 True."""
deadline = time.time() + LOCK_TIMEOUT_S
while time.time() < deadline:
try:
# O_CREAT | O_EXCL: 파일이 없을 때만 생성 (원자적)
fd = os.open(str(LOCK_FILE), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.write(fd, str(os.getpid()).encode())
os.close(fd)
return True
except FileExistsError:
# 기존 잠금이 5분 이상 됐으면 강제 해제 (좀비 잠금)
try:
mtime = LOCK_FILE.stat().st_mtime
if time.time() - mtime > 300:
LOCK_FILE.unlink(missing_ok=True)
continue
except Exception:
pass
time.sleep(2)
except Exception as e:
logger.warning(f"잠금 획득 실패: {e}")
return False
logger.warning("잠금 획득 타임아웃 → 갱신 건너뜀")
return False
def _release_lock():
"""파일 잠금 해제"""
try:
LOCK_FILE.unlink(missing_ok=True)
except Exception:
pass
def _issue_token(app_key: str, app_secret: str, is_mock: bool) -> bool:
"""
KIS /oauth2/tokenP 엔드포인트로 새 토큰 발급 후 캐시 저장.
성공 시 True, 실패 시 False.
※ KIS 정책: 23시간 내 중복 발급 시 서비스 제한 가능.
이 함수는 만료 1시간 전까지 기존 토큰을 재사용하므로 하루 1회만 호출됨.
"""
base_url = (
"https://openapivts.koreainvestment.com:29443"
if is_mock else
"https://openapi.koreainvestment.com:9443"
)
cache_path = CACHE_MOCK if is_mock else CACHE_REAL
mode = "모의" if is_mock else "실전"
try:
resp = requests.post(
f"{base_url}/oauth2/tokenP",
json={"grant_type": "client_credentials",
"appkey": app_key, "appsecret": app_secret},
timeout=15,
)
data = resp.json()
token = data.get("access_token", "")
exp = data.get("access_token_token_expired", "")
if not token:
err = data.get("message") or data.get("error_description") or str(data)
logger.error(f"{mode} 토큰 발급 실패: {err}")
# EGW00133: 1분당 1회 제한
if "EGW00133" in str(data):
logger.warning("⚠️ KIS 분당 발급 제한. 1분 후 재시도하세요.")
return False
cache_path.write_text(
json.dumps({
"access_token": token,
"access_token_token_expired": exp,
"mock": is_mock,
}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
logger.info(f"{mode} 토큰 발급 완료 | 만료: {exp} | 앞12자: {token[:12]}")
return True
except Exception as e:
logger.error(f"{mode} 토큰 발급 예외: {e}")
return False
# ─────────────────────────────────────────────────────────────────────────────
# 공개 API
# ─────────────────────────────────────────────────────────────────────────────
def ensure_token(is_mock: bool, env: dict = None) -> bool:
"""
단일 모드(실전/모의) 토큰 유효성 확인 + 필요 시 자동 갱신.
이미 유효한 경우 → 발급 없이 True 반환 (23시간 정책 자동 준수).
"""
status = get_token_status(is_mock)
mode = "모의" if is_mock else "실전"
if status["valid"]:
logger.info(
f"🔑 {mode} 토큰 유효 ({status['expires_in_h']:.1f}h 남음) → 재사용"
)
return True
logger.info(f"{mode} 토큰 만료/없음 → 갱신 시도 (남은시간: {status['expires_in_h']:.1f}h)")
if env is None:
env = _load_env()
key_suffix = "MOCK" if is_mock else "REAL"
app_key = str(env.get(f"KIS_APP_KEY_{key_suffix}", "") or "").strip()
app_secret = str(env.get(f"KIS_APP_SECRET_{key_suffix}", "") or "").strip()
if not app_key or not app_secret:
logger.warning(f"⚠️ {mode} 키 없음 (KIS_APP_KEY_{key_suffix}) → 발급 불가")
return False
if not _acquire_lock():
return False
try:
# 잠금 획득 후 다시 확인 (다른 프로세스가 갱신했을 수 있음)
status = get_token_status(is_mock)
if status["valid"]:
logger.info(f"🔑 {mode} 토큰 이미 갱신됨 (다른 프로세스) → 재사용")
return True
return _issue_token(app_key, app_secret, is_mock)
finally:
_release_lock()
def ensure_both_tokens(env: dict = None) -> dict:
"""
실전 + 모의 토큰 모두 유효하게 유지.
어느 모드로 봇을 실행하든 두 토큰을 동시에 관리.
만료된 것만 갱신 → KIS 23시간 정책 자동 준수.
반환: {"real": True/False, "mock": True/False}
"""
if env is None:
env = _load_env()
results = {}
# 실전 먼저 (더 중요, 잠금 경쟁 최소화)
for is_mock in [False, True]:
mode = "mock" if is_mock else "real"
results[mode] = ensure_token(is_mock, env)
ok_str = " | ".join(
f"{'모의' if k == 'mock' else '실전'}={'' if v else ''}"
for k, v in results.items()
)
logger.info(f"🔄 토큰 상태: {ok_str}")
return results
# ─────────────────────────────────────────────────────────────────────────────
# KisTokenManager — KiwoomTokenManager 와 동일한 get_token() 인터페이스
#
# 동작 원리:
# 1. 메모리 캐시 (_token, _expiry) — 파일 읽기 최소화
# 2. JSON 파일 캐시 — 크로스 프로세스 공유 + 재시작 후 토큰 유지
# 3. 만료 10분 전 선제 갱신 — API 에러(EGW00123) 없이 자동 교체
#
# KIS 에는 토큰 상태 조회 API 가 없음 → 발급 시 받은 expires_dt 를
# 서버 현재시간과 비교해서 판단 (JSON 파일에 저장된 날짜 사용)
#
# 봇에서 사용:
# from kis_token_manager import KisTokenManager
# token = KisTokenManager.instance(is_mock).get_token() # 매 API 호출 전
# ─────────────────────────────────────────────────────────────────────────────
_km_instances: dict = {} # is_mock(bool) → KisTokenManager 싱글톤
_km_instances_lock = threading.Lock()
class KisTokenManager:
"""
KIS 토큰 싱글톤 매니저.
- get_token(): 유효하면 바로 반환, 만료 10분 전부터 선제 갱신
- kiwoom_rest_api TokenManager / KiwoomTokenManager 와 동일한 인터페이스
"""
@classmethod
def instance(cls, is_mock: bool) -> "KisTokenManager":
"""프로세스당 실전/모의 각 1개 싱글톤 반환"""
with _km_instances_lock:
if is_mock not in _km_instances:
_km_instances[is_mock] = cls(is_mock)
return _km_instances[is_mock]
def __init__(self, is_mock: bool):
self._is_mock = is_mock
self._mode_str = "모의" if is_mock else "실전"
self._cache_path = CACHE_MOCK if is_mock else CACHE_REAL
self._lock = threading.Lock()
self._token: Optional[str] = None
self._expiry: Optional[datetime] = None
self._load_from_file() # 재시작 후에도 기존 토큰 재사용
# ── 내부 ──────────────────────────────────────────────────────
def _load_from_file(self) -> None:
"""JSON 파일 → 메모리 로드"""
if not self._cache_path.exists():
return
try:
data = json.loads(self._cache_path.read_text(encoding="utf-8"))
token = data.get("access_token", "")
exp_dt = _parse_expired(data.get("access_token_token_expired", ""))
if token and exp_dt:
self._token = token
self._expiry = exp_dt
except Exception as e:
logger.debug("KIS 파일 캐시 로드 실패 [%s]: %s", self._mode_str, e)
def _is_valid(self) -> bool:
"""
만료 10분 전부터 유효하지 않다고 판단 → 선제 갱신 트리거.
KIS에는 토큰 상태 조회 API 없음 → JSON 파일의 expires_dt 와 서버시간 비교.
"""
if not self._token or not self._expiry:
return False
return datetime.now() < self._expiry - timedelta(minutes=EXPIRE_MARGIN_M)
# ── 공개 ──────────────────────────────────────────────────────
def get_token(self) -> Optional[str]:
"""
유효한 토큰 반환. 만료 10분 전 자동 갱신.
매수/매도 API 호출 직전마다 호출해도 안전 (갱신은 만료 임박 시만).
흐름:
1. 메모리 캐시 유효 → 즉시 반환 (파일 읽기 없음)
2. 메모리 만료 → 파일 재로드 (다른 프로세스가 갱신했을 수 있음)
3. 파일도 만료 → ensure_token() 으로 신규 발급 + 파일/메모리 갱신
"""
with self._lock:
if self._is_valid():
return self._token
# 다른 프로세스가 갱신했을 수 있으니 파일 재로드
self._load_from_file()
if self._is_valid():
logger.info("🔑 %s 토큰 파일 갱신 감지 → 메모리 갱신", self._mode_str)
return self._token
# 파일도 만료 → 잠금 기반 신규 발급
ok = ensure_token(self._is_mock)
if ok:
with self._lock:
self._load_from_file()
logger.info(
"🔄 %s 토큰 선제 갱신 완료 (만료 10분 전) → 앞8자: %s",
self._mode_str,
(self._token[:8] if self._token else "N/A"),
)
return self._token if ok else None
def status(self) -> dict:
"""현재 상태 반환 (get_token_status 래퍼, 남은 시간 포함)"""
return get_token_status(self._is_mock)
# ─────────────────────────────────────────────────────────────────────────────
# CLI: 상태 확인 / 강제 갱신
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import argparse
import logging as _lg
_lg.basicConfig(
level=_lg.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
parser = argparse.ArgumentParser(description="KIS 토큰 관리자")
parser.add_argument("--refresh", action="store_true", help="만료 여부와 관계없이 강제 갱신 시도")
args = parser.parse_args()
print("\n" + "=" * 55)
print(" KIS 토큰 상태")
print("=" * 55)
for is_mock, label in [(False, "실전"), (True, "모의")]:
s = get_token_status(is_mock)
flag = "✅ 유효" if s["valid"] else "❌ 만료/없음"
print(f" {label:4s} | {flag} | 만료: {s['expires']} ({s['expires_in_h']:+.1f}h)")
print("=" * 55)
if args.refresh:
print("\n강제 갱신 시도 중...")
# 캐시 무효화 후 재발급
for path in [CACHE_REAL, CACHE_MOCK]:
if path.exists():
path.unlink()
print(f" 캐시 삭제: {path.name}")
result = ensure_both_tokens()
print("\n갱신 결과:", result, "\n")