398 lines
16 KiB
Python
398 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
kis_token_manager.py — KIS 실전/모의 토큰 통합 자동 관리
|
|
==========================================================
|
|
문제: 모의 모드로만 봇을 돌리면 실전 토큰이 갱신되지 않음.
|
|
→ 실전 토큰 만료 → WebSocket/홀딩봇 REST 호출 실패 → 거래 없음
|
|
|
|
해결: 어느 모드로 봇을 실행해도 실전+모의 토큰 모두 유효하게 유지.
|
|
파일 잠금(lockfile) 으로 동시에 여러 봇이 실행해도 중복 발급 방지.
|
|
|
|
KIS 제한:
|
|
- 하루 1회 원칙 (23시간 내 중복 발급 시 제한 가능)
|
|
- 이 모듈은 만료 1시간 전까지 기존 토큰 재사용 → 하루 1회만 발급
|
|
|
|
사용 예시 (각 봇 시작부):
|
|
from kis_token_manager import ensure_both_tokens
|
|
ensure_both_tokens() # 실전+모의 토큰 자동 갱신 (만료된 것만)
|
|
|
|
CLI:
|
|
python3 kis_token_manager.py # 상태 확인
|
|
python3 kis_token_manager.py --refresh # 강제 갱신 시도
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
ROOT = Path(__file__).parent
|
|
CACHE_MOCK = ROOT / ".kis_token_cache_mock.json"
|
|
CACHE_REAL = ROOT / ".kis_token_cache_real.json"
|
|
LOCK_FILE = ROOT / ".kis_token_manager.lock"
|
|
EXPIRE_MARGIN_H = 1 # 파일 캐시 상태 조회용 (get_token_status, ensure_token) 버퍼
|
|
EXPIRE_MARGIN_M = 10 # KisTokenManager.get_token() 선제 갱신 버퍼 (10분)
|
|
LOCK_TIMEOUT_S = 60 # 잠금 최대 대기 시간(초)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 내부 유틸
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
def _load_env() -> dict:
|
|
"""DB env_config 테이블에서 KIS 키 로드"""
|
|
try:
|
|
from database import TradeDB
|
|
db = TradeDB()
|
|
row = db.conn.execute(
|
|
"SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
|
|
).fetchone()
|
|
db.close()
|
|
return dict(row) if row else {}
|
|
except Exception as e:
|
|
logger.warning(f"DB env 로드 실패: {e}")
|
|
return {}
|
|
|
|
|
|
def _parse_expired(s: str):
|
|
"""만료 시간 문자열 → datetime. 실패 시 None."""
|
|
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S"):
|
|
try:
|
|
return datetime.strptime(str(s).strip(), fmt)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def get_token_status(is_mock: bool) -> dict:
|
|
"""
|
|
캐시 파일 상태 반환.
|
|
반환: {"valid": bool, "token": str, "expires": str, "expires_in_h": float}
|
|
"""
|
|
cache_path = CACHE_MOCK if is_mock else CACHE_REAL
|
|
mode = "모의" if is_mock else "실전"
|
|
if not cache_path.exists():
|
|
return {"valid": False, "token": "", "expires": "파일없음", "expires_in_h": -999}
|
|
try:
|
|
cache = json.loads(cache_path.read_text(encoding="utf-8"))
|
|
token = cache.get("access_token", "")
|
|
expired_s = cache.get("access_token_token_expired", "")
|
|
exp_dt = _parse_expired(expired_s)
|
|
if not token or exp_dt is None:
|
|
return {"valid": False, "token": "", "expires": expired_s, "expires_in_h": -999}
|
|
now = datetime.now()
|
|
expires_in = (exp_dt - now).total_seconds() / 3600
|
|
valid = now < exp_dt - timedelta(hours=EXPIRE_MARGIN_H)
|
|
return {
|
|
"valid": valid,
|
|
"token": token[:12] + "…",
|
|
"expires": expired_s,
|
|
"expires_in_h": round(expires_in, 1),
|
|
}
|
|
except Exception as e:
|
|
return {"valid": False, "token": "", "expires": f"읽기오류: {e}", "expires_in_h": -999}
|
|
|
|
|
|
def _acquire_lock() -> bool:
|
|
"""파일 잠금 획득 (중복 발급 방지). 성공 시 True."""
|
|
deadline = time.time() + LOCK_TIMEOUT_S
|
|
while time.time() < deadline:
|
|
try:
|
|
# O_CREAT | O_EXCL: 파일이 없을 때만 생성 (원자적)
|
|
fd = os.open(str(LOCK_FILE), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
|
|
os.write(fd, str(os.getpid()).encode())
|
|
os.close(fd)
|
|
return True
|
|
except FileExistsError:
|
|
# 기존 잠금이 5분 이상 됐으면 강제 해제 (좀비 잠금)
|
|
try:
|
|
mtime = LOCK_FILE.stat().st_mtime
|
|
if time.time() - mtime > 300:
|
|
LOCK_FILE.unlink(missing_ok=True)
|
|
continue
|
|
except Exception:
|
|
pass
|
|
time.sleep(2)
|
|
except Exception as e:
|
|
logger.warning(f"잠금 획득 실패: {e}")
|
|
return False
|
|
logger.warning("잠금 획득 타임아웃 → 갱신 건너뜀")
|
|
return False
|
|
|
|
|
|
def _release_lock():
|
|
"""파일 잠금 해제"""
|
|
try:
|
|
LOCK_FILE.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _issue_token(app_key: str, app_secret: str, is_mock: bool) -> bool:
|
|
"""
|
|
KIS /oauth2/tokenP 엔드포인트로 새 토큰 발급 후 캐시 저장.
|
|
성공 시 True, 실패 시 False.
|
|
|
|
※ KIS 정책: 23시간 내 중복 발급 시 서비스 제한 가능.
|
|
이 함수는 만료 1시간 전까지 기존 토큰을 재사용하므로 하루 1회만 호출됨.
|
|
"""
|
|
base_url = (
|
|
"https://openapivts.koreainvestment.com:29443"
|
|
if is_mock else
|
|
"https://openapi.koreainvestment.com:9443"
|
|
)
|
|
cache_path = CACHE_MOCK if is_mock else CACHE_REAL
|
|
mode = "모의" if is_mock else "실전"
|
|
|
|
try:
|
|
resp = requests.post(
|
|
f"{base_url}/oauth2/tokenP",
|
|
json={"grant_type": "client_credentials",
|
|
"appkey": app_key, "appsecret": app_secret},
|
|
timeout=15,
|
|
)
|
|
data = resp.json()
|
|
token = data.get("access_token", "")
|
|
exp = data.get("access_token_token_expired", "")
|
|
|
|
if not token:
|
|
err = data.get("message") or data.get("error_description") or str(data)
|
|
logger.error(f"❌ {mode} 토큰 발급 실패: {err}")
|
|
# EGW00133: 1분당 1회 제한
|
|
if "EGW00133" in str(data):
|
|
logger.warning("⚠️ KIS 분당 발급 제한. 1분 후 재시도하세요.")
|
|
return False
|
|
|
|
cache_path.write_text(
|
|
json.dumps({
|
|
"access_token": token,
|
|
"access_token_token_expired": exp,
|
|
"mock": is_mock,
|
|
}, ensure_ascii=False, indent=2),
|
|
encoding="utf-8",
|
|
)
|
|
logger.info(f"✅ {mode} 토큰 발급 완료 | 만료: {exp} | 앞12자: {token[:12]}…")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ {mode} 토큰 발급 예외: {e}")
|
|
return False
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# 공개 API
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
def ensure_token(is_mock: bool, env: dict = None) -> bool:
|
|
"""
|
|
단일 모드(실전/모의) 토큰 유효성 확인 + 필요 시 자동 갱신.
|
|
이미 유효한 경우 → 발급 없이 True 반환 (23시간 정책 자동 준수).
|
|
"""
|
|
status = get_token_status(is_mock)
|
|
mode = "모의" if is_mock else "실전"
|
|
|
|
if status["valid"]:
|
|
logger.info(
|
|
f"🔑 {mode} 토큰 유효 ({status['expires_in_h']:.1f}h 남음) → 재사용"
|
|
)
|
|
return True
|
|
|
|
logger.info(f"⏰ {mode} 토큰 만료/없음 → 갱신 시도 (남은시간: {status['expires_in_h']:.1f}h)")
|
|
|
|
if env is None:
|
|
env = _load_env()
|
|
|
|
key_suffix = "MOCK" if is_mock else "REAL"
|
|
app_key = str(env.get(f"KIS_APP_KEY_{key_suffix}", "") or "").strip()
|
|
app_secret = str(env.get(f"KIS_APP_SECRET_{key_suffix}", "") or "").strip()
|
|
|
|
if not app_key or not app_secret:
|
|
logger.warning(f"⚠️ {mode} 키 없음 (KIS_APP_KEY_{key_suffix}) → 발급 불가")
|
|
return False
|
|
|
|
if not _acquire_lock():
|
|
return False
|
|
try:
|
|
# 잠금 획득 후 다시 확인 (다른 프로세스가 갱신했을 수 있음)
|
|
status = get_token_status(is_mock)
|
|
if status["valid"]:
|
|
logger.info(f"🔑 {mode} 토큰 이미 갱신됨 (다른 프로세스) → 재사용")
|
|
return True
|
|
return _issue_token(app_key, app_secret, is_mock)
|
|
finally:
|
|
_release_lock()
|
|
|
|
|
|
def ensure_both_tokens(env: dict = None) -> dict:
|
|
"""
|
|
실전 + 모의 토큰 모두 유효하게 유지.
|
|
|
|
어느 모드로 봇을 실행하든 두 토큰을 동시에 관리.
|
|
만료된 것만 갱신 → KIS 23시간 정책 자동 준수.
|
|
|
|
반환: {"real": True/False, "mock": True/False}
|
|
"""
|
|
if env is None:
|
|
env = _load_env()
|
|
|
|
results = {}
|
|
# 실전 먼저 (더 중요, 잠금 경쟁 최소화)
|
|
for is_mock in [False, True]:
|
|
mode = "mock" if is_mock else "real"
|
|
results[mode] = ensure_token(is_mock, env)
|
|
|
|
ok_str = " | ".join(
|
|
f"{'모의' if k == 'mock' else '실전'}={'✅' if v else '❌'}"
|
|
for k, v in results.items()
|
|
)
|
|
logger.info(f"🔄 토큰 상태: {ok_str}")
|
|
return results
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# KisTokenManager — KiwoomTokenManager 와 동일한 get_token() 인터페이스
|
|
#
|
|
# 동작 원리:
|
|
# 1. 메모리 캐시 (_token, _expiry) — 파일 읽기 최소화
|
|
# 2. JSON 파일 캐시 — 크로스 프로세스 공유 + 재시작 후 토큰 유지
|
|
# 3. 만료 10분 전 선제 갱신 — API 에러(EGW00123) 없이 자동 교체
|
|
#
|
|
# KIS 에는 토큰 상태 조회 API 가 없음 → 발급 시 받은 expires_dt 를
|
|
# 서버 현재시간과 비교해서 판단 (JSON 파일에 저장된 날짜 사용)
|
|
#
|
|
# 봇에서 사용:
|
|
# from kis_token_manager import KisTokenManager
|
|
# token = KisTokenManager.instance(is_mock).get_token() # 매 API 호출 전
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
_km_instances: dict = {} # is_mock(bool) → KisTokenManager 싱글톤
|
|
_km_instances_lock = threading.Lock()
|
|
|
|
|
|
class KisTokenManager:
|
|
"""
|
|
KIS 토큰 싱글톤 매니저.
|
|
- get_token(): 유효하면 바로 반환, 만료 10분 전부터 선제 갱신
|
|
- kiwoom_rest_api TokenManager / KiwoomTokenManager 와 동일한 인터페이스
|
|
"""
|
|
|
|
@classmethod
|
|
def instance(cls, is_mock: bool) -> "KisTokenManager":
|
|
"""프로세스당 실전/모의 각 1개 싱글톤 반환"""
|
|
with _km_instances_lock:
|
|
if is_mock not in _km_instances:
|
|
_km_instances[is_mock] = cls(is_mock)
|
|
return _km_instances[is_mock]
|
|
|
|
def __init__(self, is_mock: bool):
|
|
self._is_mock = is_mock
|
|
self._mode_str = "모의" if is_mock else "실전"
|
|
self._cache_path = CACHE_MOCK if is_mock else CACHE_REAL
|
|
self._lock = threading.Lock()
|
|
self._token: Optional[str] = None
|
|
self._expiry: Optional[datetime] = None
|
|
self._load_from_file() # 재시작 후에도 기존 토큰 재사용
|
|
|
|
# ── 내부 ──────────────────────────────────────────────────────
|
|
def _load_from_file(self) -> None:
|
|
"""JSON 파일 → 메모리 로드"""
|
|
if not self._cache_path.exists():
|
|
return
|
|
try:
|
|
data = json.loads(self._cache_path.read_text(encoding="utf-8"))
|
|
token = data.get("access_token", "")
|
|
exp_dt = _parse_expired(data.get("access_token_token_expired", ""))
|
|
if token and exp_dt:
|
|
self._token = token
|
|
self._expiry = exp_dt
|
|
except Exception as e:
|
|
logger.debug("KIS 파일 캐시 로드 실패 [%s]: %s", self._mode_str, e)
|
|
|
|
def _is_valid(self) -> bool:
|
|
"""
|
|
만료 10분 전부터 유효하지 않다고 판단 → 선제 갱신 트리거.
|
|
KIS에는 토큰 상태 조회 API 없음 → JSON 파일의 expires_dt 와 서버시간 비교.
|
|
"""
|
|
if not self._token or not self._expiry:
|
|
return False
|
|
return datetime.now() < self._expiry - timedelta(minutes=EXPIRE_MARGIN_M)
|
|
|
|
# ── 공개 ──────────────────────────────────────────────────────
|
|
def get_token(self) -> Optional[str]:
|
|
"""
|
|
유효한 토큰 반환. 만료 10분 전 자동 갱신.
|
|
매수/매도 API 호출 직전마다 호출해도 안전 (갱신은 만료 임박 시만).
|
|
|
|
흐름:
|
|
1. 메모리 캐시 유효 → 즉시 반환 (파일 읽기 없음)
|
|
2. 메모리 만료 → 파일 재로드 (다른 프로세스가 갱신했을 수 있음)
|
|
3. 파일도 만료 → ensure_token() 으로 신규 발급 + 파일/메모리 갱신
|
|
"""
|
|
with self._lock:
|
|
if self._is_valid():
|
|
return self._token
|
|
# 다른 프로세스가 갱신했을 수 있으니 파일 재로드
|
|
self._load_from_file()
|
|
if self._is_valid():
|
|
logger.info("🔑 %s 토큰 파일 갱신 감지 → 메모리 갱신", self._mode_str)
|
|
return self._token
|
|
|
|
# 파일도 만료 → 잠금 기반 신규 발급
|
|
ok = ensure_token(self._is_mock)
|
|
if ok:
|
|
with self._lock:
|
|
self._load_from_file()
|
|
logger.info(
|
|
"🔄 %s 토큰 선제 갱신 완료 (만료 10분 전) → 앞8자: %s…",
|
|
self._mode_str,
|
|
(self._token[:8] if self._token else "N/A"),
|
|
)
|
|
return self._token if ok else None
|
|
|
|
def status(self) -> dict:
|
|
"""현재 상태 반환 (get_token_status 래퍼, 남은 시간 포함)"""
|
|
return get_token_status(self._is_mock)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# CLI: 상태 확인 / 강제 갱신
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
import logging as _lg
|
|
|
|
_lg.basicConfig(
|
|
level=_lg.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
datefmt="%H:%M:%S",
|
|
)
|
|
|
|
parser = argparse.ArgumentParser(description="KIS 토큰 관리자")
|
|
parser.add_argument("--refresh", action="store_true", help="만료 여부와 관계없이 강제 갱신 시도")
|
|
args = parser.parse_args()
|
|
|
|
print("\n" + "=" * 55)
|
|
print(" KIS 토큰 상태")
|
|
print("=" * 55)
|
|
for is_mock, label in [(False, "실전"), (True, "모의")]:
|
|
s = get_token_status(is_mock)
|
|
flag = "✅ 유효" if s["valid"] else "❌ 만료/없음"
|
|
print(f" {label:4s} | {flag} | 만료: {s['expires']} ({s['expires_in_h']:+.1f}h)")
|
|
print("=" * 55)
|
|
|
|
if args.refresh:
|
|
print("\n강제 갱신 시도 중...")
|
|
# 캐시 무효화 후 재발급
|
|
for path in [CACHE_REAL, CACHE_MOCK]:
|
|
if path.exists():
|
|
path.unlink()
|
|
print(f" 캐시 삭제: {path.name}")
|
|
|
|
result = ensure_both_tokens()
|
|
print("\n갱신 결과:", result, "\n")
|