Files
kis_bot/database.py
2026-02-22 21:42:41 +09:00

941 lines
40 KiB
Python

"""
트레이딩 봇 데이터베이스 관리 모듈
- SQLite 기반 데이터 무결성 보장 (JSON 대비 재시작 안전성 향상)
- 활성 트레이딩 관리 (active_trades)
- 매매 히스토리 관리 (trade_history)
"""
import json
import sqlite3
import datetime
import logging
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger("TradeDB")
# env_config 테이블 컬럼 (키 하나당 컬럼 하나, 추가/삭제 시 여기와 CREATE TABLE만 수정)
ENV_CONFIG_KEYS = (
"STOP_LOSS_PCT", "SHOULDER_CUT_PCT", "STOP_ATR_MULTIPLIER_TAIL", "TARGET_ATR_MULTIPLIER_TAIL",
"MAX_POSITION_PCT", "USE_SLOT_CAP", "SLOT_CAP_PCT", "MAX_STOCKS",
"USE_KELLY", "RISK_PCT_PER_TRADE", "MIN_POSITION_AMOUNT",
"USE_RISK_CHECK", "DAILY_STOP_LOSS_PCT", "CONSECUTIVE_LOSS_LIMIT",
"USE_BAN_SYSTEM", "BAN_HOURS", "USE_STOCK_FILTER", "RSI_OVERHEAT_THRESHOLD",
"MIN_RECOVERY_RATIO", "MAX_RECOVERY_RATIO",
"USE_TWAP", "TWAP_MIN_SPLIT", "TWAP_MAX_SPLIT", "TWAP_MIN_DELAY", "TWAP_MAX_DELAY",
"USE_ML_SIGNAL", "ML_MIN_PROBABILITY", "USE_NEWS_ANALYSIS", "NEWS_ANALYSIS_HOUR", "NEWS_MAX_COUNT",
"USE_QUICK_PROFIT_PROTECTION", "HIGH_PRICE_CHASE_THRESHOLD", "MAX_DAILY_CHANGE_PCT",
"MA20_MAX_ABOVE_PCT", "VOLUME_AVG_MULTIPLIER", "CANDLE_OPEN_PRICE_BUFFER",
"INTRADAY_INVESTOR_NET_BUY_THRESHOLD", "SIZE_CLASS_LARGE_MIN", "SIZE_CLASS_MID_MIN",
"USE_RANDOM_SPLIT", "FORCE_MARKET_OPEN", "TOTAL_DEPOSIT",
# POP/LOCK·금액 손절 관련 추가 키
"ROUND_TRIP_COST_PCT", "POP_NET_PCT", "LOCK_NET_PCT", "MAX_LOSS_PER_TRADE_KRW",
# 한투 API 관련 키 추가 (실전/모의 계좌 분리)
"KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL",
"KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK",
"KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", # 실전 계좌 (KIS_MOCK=false 시 사용)
"KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK", # 모의 계좌 (KIS_MOCK=true 시 사용)
"KIS_MOCK",
# 단타 봇 전용 키
"TAKE_PROFIT_PCT", "MIN_DROP_RATE", "MIN_RECOVERY_RATIO_SHORT",
# 늘림목 봇 전용 키
"MAX_PER", "MAX_PEG", "MIN_GROWTH_PCT", "DCA_INTERVALS", "DCA_AMOUNTS",
# Mattermost 및 AI 리포트 관련 키
"MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY",
# 봇별 Mattermost 채널 구분용 키
"KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL",
)
class TradeDB:
"""
트레이딩 봇용 SQLite 데이터베이스 관리 클래스
"""
def __init__(self, db_path="quant_bot.db"):
"""
Args:
db_path: SQLite DB 파일 경로 (기본: quant_bot.db)
"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row # 딕셔너리처럼 접근 가능
self._create_tables()
logger.info(f"✅ TradeDB 초기화 완료: {db_path}")
def _create_tables(self):
"""DB 테이블 생성 (없을 경우)"""
with self.conn:
# 1. 활성 트레이딩 테이블 (현재 보유 중이거나 매수 중인 종목)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS active_trades (
code TEXT PRIMARY KEY, -- 종목코드
name TEXT NOT NULL, -- 종목명
strategy TEXT, -- 매매 전략 (TAIL_CATCH_3M 등)
-- [가격 정보]
avg_buy_price REAL NOT NULL, -- 평단가
current_price REAL, -- 현재가 (업데이트용)
stop_price REAL, -- 손절가
target_price REAL, -- 목표가
max_price REAL, -- 최고가 (트레일링 스탑용)
atr_entry REAL, -- 진입 시 ATR 변동성
-- [수량 및 진행 상태 (분할매수용)]
target_qty INTEGER NOT NULL, -- 목표 매수 수량
current_qty INTEGER NOT NULL,-- 현재 체결 수량
total_invested REAL, -- 총 투입 금액 (수수료 제외)
-- [상태 관리]
status TEXT NOT NULL, -- BUYING(매수중), HOLDING(보유중), SELLING(매도중)
buy_date TEXT NOT NULL, -- 첫 매수 시작 시간
updated_at TEXT NOT NULL, -- 마지막 업데이트 시간
size_class TEXT -- 대/중/소형 (매수 시점)
)
""")
# 2. 매매 기록 테이블 (손익 분석 & 켈리 공식용)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS trade_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL,
name TEXT NOT NULL,
strategy TEXT,
buy_price REAL NOT NULL, -- 평단가
sell_price REAL NOT NULL, -- 매도가
qty INTEGER NOT NULL, -- 수량
profit_rate REAL NOT NULL, -- 수익률 (%)
realized_pnl REAL NOT NULL, -- 실현 손익금 (원)
hold_minutes INTEGER, -- 보유 시간 (분)
buy_date TEXT, -- 매수 시작 시간
sell_date TEXT NOT NULL, -- 매도 완료 시간
sell_reason TEXT, -- 매도 사유
env_snapshot TEXT, -- 매도 시점 env (JSON)
size_class TEXT -- 대/중/소형 (변동성 구간)
)
""")
# 3. 일일 손익 요약 테이블 (대시보드용)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS daily_summary (
date TEXT PRIMARY KEY, -- 날짜 (YYYY-MM-DD)
start_asset REAL, -- 시작 자산
end_asset REAL, -- 종료 자산
total_trades INTEGER, -- 총 매매 횟수
win_trades INTEGER, -- 익절 횟수
total_pnl REAL, -- 총 손익
win_rate REAL -- 승률 (%)
)
""")
# 4. 주문·체결 보강 테이블 (kt00007 / ka10076)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS order_execution_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL, -- 'kt00007' or 'ka10076'
ord_no TEXT,
stk_cd TEXT,
stk_nm TEXT,
trde_tp TEXT, -- 매매구분
ord_qty TEXT,
ord_uv TEXT,
cntr_qty TEXT,
cntr_uv TEXT,
ord_tm TEXT,
cnfm_tm TEXT,
sell_tp TEXT,
ord_dt TEXT,
raw_json TEXT,
fetched_at TEXT NOT NULL
)
""")
# 5. 매수 체결 이력 (일일 한도용 - '산 시점' 날짜 기준 누적)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS buy_execution_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL,
name TEXT NOT NULL,
strategy TEXT NOT NULL,
buy_date TEXT NOT NULL, -- YYYY-MM-DD (산 날짜 = 한도 기준일)
executed_at TEXT NOT NULL, -- 체결 시각
amount REAL NOT NULL, -- 매수 금액 (주가×수량)
qty INTEGER NOT NULL
)
""")
# 6. 매수 후보군 테이블 (target_universe 대체)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS target_candidates (
code TEXT PRIMARY KEY, -- 종목코드
name TEXT NOT NULL, -- 종목명
score REAL NOT NULL, -- 개미털기 점수 (높을수록 좋음)
price REAL NOT NULL, -- 현재가
scan_time TEXT NOT NULL, -- 스캔 시간
updated_at TEXT NOT NULL -- 마지막 업데이트
)
""")
# 7. env 설정 전용 테이블 (관리자용, INSERT만 / 최신 1건 = 현재 설정, 키당 컬럼)
cols = ", ".join([f'"{k}" TEXT' for k in ENV_CONFIG_KEYS])
self.conn.execute(f"""
CREATE TABLE IF NOT EXISTS env_config (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
{cols}
)
""")
self._migrate_add_columns()
self._migrate_env_config_to_columns()
logger.info("📊 DB 테이블 생성/확인 완료")
def _migrate_add_columns(self):
"""기존 DB에 누락된 컬럼 추가 (한 번만)"""
try:
cur = self.conn.execute("PRAGMA table_info(trade_history)")
cols = [row[1] for row in cur.fetchall()]
if "env_snapshot" not in cols:
self.conn.execute("ALTER TABLE trade_history ADD COLUMN env_snapshot TEXT")
logger.info("📌 trade_history.env_snapshot 컬럼 추가")
if "size_class" not in cols:
self.conn.execute("ALTER TABLE trade_history ADD COLUMN size_class TEXT")
logger.info("📌 trade_history.size_class 컬럼 추가")
except Exception as e:
logger.debug(f"migrate trade_history: {e}")
try:
cur = self.conn.execute("PRAGMA table_info(active_trades)")
cols = [row[1] for row in cur.fetchall()]
if "size_class" not in cols:
self.conn.execute("ALTER TABLE active_trades ADD COLUMN size_class TEXT")
logger.info("📌 active_trades.size_class 컬럼 추가")
except Exception as e:
logger.debug(f"migrate active_trades: {e}")
# env_config 테이블에 ENV_CONFIG_KEYS에 정의된 컬럼 누락 시 추가
try:
cur = self.conn.execute("PRAGMA table_info(env_config)")
cols = [row[1] for row in cur.fetchall()]
for key in ENV_CONFIG_KEYS:
if key not in cols:
self.conn.execute(f'ALTER TABLE env_config ADD COLUMN "{key}" TEXT')
logger.info(f"📌 env_config.{key} 컬럼 추가")
except Exception as e:
logger.debug(f"migrate env_config columns: {e}")
def _migrate_env_config_to_columns(self):
"""env_config가 예전 JSON 컬럼(snapshot_json)이면 컬럼 스키마로 이전"""
try:
cur = self.conn.execute("PRAGMA table_info(env_config)")
cols = [row[1] for row in cur.fetchall()]
if "snapshot_json" not in cols:
return
# 기존 데이터 백업 후 새 테이블로 이전
rows = self.conn.execute("SELECT id, created_at, snapshot_json FROM env_config ORDER BY id").fetchall()
col_defs = ", ".join([f'"{k}" TEXT' for k in ENV_CONFIG_KEYS])
self.conn.execute(f"""
CREATE TABLE IF NOT EXISTS env_config_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
{col_defs}
)
""")
key_list = ", ".join(f'"{k}"' for k in ENV_CONFIG_KEYS)
placeholders = ", ".join(["?"] * (1 + len(ENV_CONFIG_KEYS)))
for row in rows:
snap = json.loads(row["snapshot_json"]) if row["snapshot_json"] else {}
vals = [row["created_at"]] + [snap.get(k) for k in ENV_CONFIG_KEYS]
self.conn.execute(
f"INSERT INTO env_config_new (created_at, {key_list}) VALUES ({placeholders})",
vals,
)
self.conn.execute("DROP TABLE env_config")
self.conn.execute("ALTER TABLE env_config_new RENAME TO env_config")
self.conn.commit()
logger.info("📌 env_config: snapshot_json -> 컬럼 스키마 마이그레이션 완료")
except Exception as e:
logger.debug(f"migrate env_config: {e}")
# ============================================================
# [CRUD] Active Trades (활성 트레이딩 관리)
# ============================================================
def upsert_trade(self, trade_data: Dict):
"""
신규 매수하거나 정보 업데이트 (평단가, 수량 등)
Args:
trade_data: 트레이드 정보 딕셔너리
필수: code, name, avg_buy_price, target_qty, current_qty, status
선택: strategy, stop_price, target_price, max_price, atr_entry, total_invested
"""
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 기본값 설정
code = trade_data.get('code')
if not code:
logger.error("종목코드 누락: upsert 실패")
return False
size_class = trade_data.get('size_class')
sql = """
INSERT INTO active_trades (
code, name, strategy, avg_buy_price, current_price, stop_price, target_price,
max_price, atr_entry, target_qty, current_qty, total_invested,
status, buy_date, updated_at, size_class
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(code) DO UPDATE SET
avg_buy_price = excluded.avg_buy_price,
current_price = excluded.current_price,
stop_price = COALESCE(excluded.stop_price, active_trades.stop_price),
target_price = COALESCE(excluded.target_price, active_trades.target_price),
atr_entry = COALESCE(excluded.atr_entry, active_trades.atr_entry),
current_qty = excluded.current_qty,
total_invested = excluded.total_invested,
max_price = MAX(active_trades.max_price, excluded.max_price),
status = excluded.status,
updated_at = excluded.updated_at,
size_class = COALESCE(excluded.size_class, active_trades.size_class)
"""
params = (
code,
trade_data.get('name', 'Unknown'),
trade_data.get('strategy', 'MANUAL'),
trade_data.get('avg_buy_price') or trade_data.get('buy_price', 0),
trade_data.get('current_price', 0),
trade_data.get('stop_price', 0),
trade_data.get('target_price', 0),
trade_data.get('max_price', trade_data.get('buy_price', 0)),
trade_data.get('atr_at_entry') or trade_data.get('atr_entry', 0),
trade_data.get('target_qty', trade_data.get('qty', 0)),
trade_data.get('current_qty') or trade_data.get('qty', 0),
trade_data.get('total_invested', 0),
trade_data.get('status', 'HOLDING'),
trade_data.get('buy_date', now),
now,
size_class
)
try:
with self.conn:
self.conn.execute(sql, params)
return True
except Exception as e:
logger.error(f"❌ upsert_trade 실패 ({code}): {e}")
return False
def get_active_trades(self, strategy_prefix: Optional[str] = None):
"""
활성 트레이딩 목록 조회 (봇 재시작 시 사용)
Args:
strategy_prefix: None이면 전부, 'LONG'이면 strategy LIKE 'LONG%'만, 'SHORT''SHORT%'
(늘림목/단타 섞임 방지)
Returns:
{종목코드: {trade_info}} 형태의 딕셔너리
"""
try:
if strategy_prefix:
cursor = self.conn.execute(
"SELECT * FROM active_trades WHERE strategy LIKE ?",
(strategy_prefix.strip().upper() + "%",)
)
else:
cursor = self.conn.execute("SELECT * FROM active_trades")
rows = cursor.fetchall()
# 기존 JSON 포맷과 호환되도록 딕셔너리 변환
result = {}
for row in rows:
code = row['code']
result[code] = {
'code': code,
'name': row['name'],
'strategy': row['strategy'],
'buy_price': row['avg_buy_price'], # JSON 호환
'avg_buy_price': row['avg_buy_price'],
'current_price': row['current_price'],
'stop_price': row['stop_price'],
'target_price': row['target_price'],
'max_price': row['max_price'],
'atr_at_entry': row['atr_entry'],
'qty': row['current_qty'], # JSON 호환
'target_qty': row['target_qty'],
'current_qty': row['current_qty'],
'total_invested': row['total_invested'],
'status': row['status'],
'buy_date': row['buy_date'],
'updated_at': row['updated_at'],
'size_class': row['size_class'] if 'size_class' in row.keys() else None,
}
logger.debug(f"📂 활성 트레이드 로드: {len(result)}")
return result
except Exception as e:
logger.error(f"❌ get_active_trades 실패: {e}")
return {}
def update_current_price(self, code: str, current_price: float):
"""현재가 업데이트 (매도 판단용)"""
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with self.conn:
self.conn.execute(
"UPDATE active_trades SET current_price=?, updated_at=? WHERE code=?",
(current_price, now, code)
)
except Exception as e:
logger.error(f"❌ 현재가 업데이트 실패 ({code}): {e}")
def update_max_price(self, code: str, new_max_price: float):
"""최고가 갱신 (트레일링 스탑용)"""
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with self.conn:
# 기존 max_price보다 클 때만 업데이트
self.conn.execute(
"""UPDATE active_trades
SET max_price = MAX(max_price, ?), updated_at = ?
WHERE code = ?""",
(new_max_price, now, code)
)
except Exception as e:
logger.error(f"❌ 최고가 갱신 실패 ({code}): {e}")
def close_trade(
self,
code: str,
sell_price: float,
sell_reason: str = "",
env_snapshot: str = None,
size_class: str = None,
):
"""
매도 완료 처리: active_trades 삭제 -> trade_history 이동 (INSERT만, env 스냅샷 포함)
Args:
code: 종목코드
sell_price: 매도가
sell_reason: 매도 사유
env_snapshot: 매도 시점 env JSON (백테스트/대시보드용)
size_class: 대/중/소형 (매수 시점 저장값)
"""
try:
# 1. 활성 트레이드 정보 조회
cursor = self.conn.execute("SELECT * FROM active_trades WHERE code=?", (code,))
trade = cursor.fetchone()
if not trade:
logger.warning(f"⚠️ close_trade: {code} 종목이 active_trades에 없음")
return False
# 2. 손익 계산
buy_price = trade['avg_buy_price']
qty = trade['current_qty']
profit_rate = ((sell_price - buy_price) / buy_price) * 100 if buy_price > 0 else 0
realized_pnl = (sell_price - buy_price) * qty
# 3. 보유 시간 계산
buy_time = datetime.datetime.strptime(trade['buy_date'], '%Y-%m-%d %H:%M:%S')
sell_time = datetime.datetime.now()
hold_minutes = int((sell_time - buy_time).total_seconds() / 60)
# size_class는 active_trades에 있으면 그대로 사용
if size_class is None and 'size_class' in trade.keys() and trade['size_class']:
size_class = trade['size_class']
# 4. trade_history에 저장 (env_snapshot, size_class 포함 INSERT)
with self.conn:
self.conn.execute("""
INSERT INTO trade_history (
code, name, strategy, buy_price, sell_price, qty,
profit_rate, realized_pnl, hold_minutes, buy_date, sell_date, sell_reason,
env_snapshot, size_class
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
trade['code'],
trade['name'],
trade['strategy'],
buy_price,
sell_price,
qty,
profit_rate,
realized_pnl,
hold_minutes,
trade['buy_date'],
sell_time.strftime('%Y-%m-%d %H:%M:%S'),
sell_reason,
env_snapshot,
size_class,
))
# 5. active_trades에서 삭제
self.conn.execute("DELETE FROM active_trades WHERE code=?", (code,))
logger.info(f"✅ [{trade['name']}] 매매 종료: 수익률 {profit_rate:.2f}% ({realized_pnl:+,.0f}원)")
return True
except Exception as e:
logger.error(f"❌ close_trade 실패 ({code}): {e}")
return False
def delete_active_trade(self, code: str):
"""활성 트레이드 삭제 (긴급 정리용)"""
try:
with self.conn:
self.conn.execute("DELETE FROM active_trades WHERE code=?", (code,))
logger.info(f"🗑️ active_trade 삭제: {code}")
return True
except Exception as e:
logger.error(f"❌ 삭제 실패 ({code}): {e}")
return False
def insert_buy_execution(
self,
code: str,
name: str,
strategy: str,
amount: float,
qty: int,
):
"""
매수 체결 이력 저장 (일일 한도용). '하루' = 산 날짜(buy_date) 기준.
"""
now = datetime.datetime.now()
buy_date = now.strftime("%Y-%m-%d")
executed_at = now.strftime("%Y-%m-%d %H:%M:%S")
try:
with self.conn:
self.conn.execute("""
INSERT INTO buy_execution_log (code, name, strategy, buy_date, executed_at, amount, qty)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (code, name, strategy, buy_date, executed_at, amount, qty))
return True
except Exception as e:
logger.error(f"❌ insert_buy_execution 실패 ({code}): {e}")
return False
def get_daily_buy_amount(self, date_str: str, strategy_prefix: str = "LONG") -> Tuple[float, int]:
"""
해당 날짜(산 시점 기준)에 strategy_prefix에 해당하는 매수 누적 금액·건수.
date_str: YYYY-MM-DD
Returns:
(누적 금액, 건수)
"""
try:
cursor = self.conn.execute("""
SELECT COALESCE(SUM(amount), 0), COUNT(*)
FROM buy_execution_log
WHERE buy_date = ? AND strategy LIKE ?
""", (date_str, strategy_prefix.strip().upper() + "%"))
row = cursor.fetchone()
return (float(row[0]), int(row[1]))
except Exception as e:
logger.error(f"❌ get_daily_buy_amount 실패: {e}")
return (0.0, 0)
# ============================================================
# [보강] 주문·체결 이력 (kt00007 / ka10076)
# ============================================================
def insert_order_execution(
self, source: str, row: dict, ord_dt: str = None, sell_tp: str = None, raw_json: str = None
):
"""주문·체결 1건 INSERT (보강용, 이력만 쌓음)"""
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
self.conn.execute("""
INSERT INTO order_execution_history (
source, ord_no, stk_cd, stk_nm, trde_tp, ord_qty, ord_uv,
cntr_qty, cntr_uv, ord_tm, cnfm_tm, sell_tp, ord_dt, raw_json, fetched_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
source,
row.get('ord_no') or row.get('orig_ord_no'),
row.get('stk_cd', ''),
row.get('stk_nm', ''),
row.get('trde_tp', ''),
str(row.get('ord_qty', '') or row.get('cntr_qty', '')),
str(row.get('ord_uv', '') or row.get('ord_pric', '') or row.get('cntr_uv', '')),
str(row.get('cntr_qty', '') or row.get('cnfm_qty', '')),
str(row.get('cntr_uv', '') or row.get('cntr_pric', '')),
row.get('ord_tm', ''),
row.get('cnfm_tm', ''),
sell_tp or '',
ord_dt or '',
raw_json,
now,
))
self.conn.commit()
return True
except Exception as e:
logger.debug(f"insert_order_execution: {e}")
return False
# ============================================================
# [분석] 켈리 공식 및 통계 계산
# ============================================================
def calculate_half_kelly(self, recent_days: int = 30) -> float:
"""
하프 켈리 공식 계산 (과거 매매 기록 기반)
Args:
recent_days: 최근 N일 데이터만 사용
Returns:
하프 켈리 비율 (0.0 ~ 1.0)
예: 0.15 리턴 -> "예수금의 15%씩 배팅하는 게 최적"
"""
try:
# 최근 N일 데이터 조회
cutoff_date = (datetime.datetime.now() - datetime.timedelta(days=recent_days)).strftime('%Y-%m-%d')
cursor = self.conn.execute(
"SELECT profit_rate FROM trade_history WHERE sell_date >= ? ORDER BY sell_date DESC",
(cutoff_date,)
)
rows = cursor.fetchall()
if len(rows) < 20: # 최소 20건 이상 필요
logger.warning(f"⚠️ 켈리 공식: 데이터 부족 ({len(rows)}건) -> 기본값 10% 리턴")
return 0.10
# 승률 계산
wins = [r['profit_rate'] for r in rows if r['profit_rate'] > 0]
losses = [r['profit_rate'] for r in rows if r['profit_rate'] <= 0]
total_count = len(rows)
win_count = len(wins)
win_rate = win_count / total_count
loss_rate = 1.0 - win_rate
# 손익비 계산 (평균 수익 / 평균 손실)
if not wins or not losses:
logger.warning("⚠️ 켈리 공식: 승 또는 패만 있음 -> 기본값 10%")
return 0.10
avg_win = sum(wins) / len(wins)
avg_loss = abs(sum(losses) / len(losses))
if avg_loss == 0:
return 0.50 # 손실이 0이면 최대치
odds = avg_win / avg_loss
# 켈리 공식: f = (p * b - q) / b
# p=승률, b=손익비, q=패율
kelly_fraction = ((win_rate * odds) - loss_rate) / odds
# 하프 켈리 (안전성 확보)
half_kelly = kelly_fraction * 0.5
# 음수면 0 리턴 (통계적으로 지는 구조)
final_kelly = max(0.0, min(half_kelly, 0.5)) # 최대 50%로 제한
logger.info(
f"📊 [켈리 분석] 승률:{win_rate*100:.1f}% | 손익비:{odds:.2f} | "
f"켈리:{kelly_fraction*100:.1f}% | 하프켈리:{final_kelly*100:.1f}%"
)
return final_kelly
except Exception as e:
logger.error(f"❌ 켈리 계산 실패: {e}")
return 0.10
def get_recent_performance(self, days: int = 7) -> Tuple[float, int, int]:
"""
최근 N일 성과 조회
Returns:
(총손익, 익절횟수, 손절횟수)
"""
try:
cutoff = (datetime.datetime.now() - datetime.timedelta(days=days)).strftime('%Y-%m-%d')
cursor = self.conn.execute(
"SELECT realized_pnl FROM trade_history WHERE sell_date >= ?",
(cutoff,)
)
rows = cursor.fetchall()
total_pnl = sum([r['realized_pnl'] for r in rows])
wins = len([r for r in rows if r['realized_pnl'] > 0])
losses = len([r for r in rows if r['realized_pnl'] <= 0])
return total_pnl, wins, losses
except Exception as e:
logger.error(f"❌ 성과 조회 실패: {e}")
return 0.0, 0, 0
def get_trade_stats(self) -> Dict:
"""전체 매매 통계"""
try:
cursor = self.conn.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN profit_rate > 0 THEN 1 ELSE 0 END) as wins,
AVG(profit_rate) as avg_profit_rate,
SUM(realized_pnl) as total_pnl
FROM trade_history
""")
row = cursor.fetchone()
return {
'total_trades': row['total'] or 0,
'win_trades': row['wins'] or 0,
'win_rate': (row['wins'] / row['total'] * 100) if row['total'] > 0 else 0,
'avg_profit_rate': row['avg_profit_rate'] or 0,
'total_pnl': row['total_pnl'] or 0
}
except Exception as e:
logger.error(f"❌ 통계 조회 실패: {e}")
return {}
# ============================================================
# [유틸] JSON 마이그레이션
# ============================================================
def migrate_from_json(self, json_data: Dict):
"""
기존 JSON 포트폴리오를 DB로 마이그레이션
Args:
json_data: portfolio.json 내용 (딕셔너리)
"""
count = 0
for code, info in json_data.items():
trade_data = info.copy()
trade_data['code'] = code
# 필드 매핑 (JSON -> DB)
if 'target_qty' not in trade_data:
trade_data['target_qty'] = info.get('qty', 0)
if 'current_qty' not in trade_data:
trade_data['current_qty'] = info.get('qty', 0)
if 'total_invested' not in trade_data:
trade_data['total_invested'] = info.get('buy_price', 0) * info.get('qty', 0)
if 'status' not in trade_data:
trade_data['status'] = 'HOLDING'
if self.upsert_trade(trade_data):
count += 1
logger.info(f"✅ JSON -> DB 마이그레이션 완료: {count}개 종목")
return count
# ============================================================
# [CRUD] Target Candidates (매수 후보군 관리)
# ============================================================
def update_target_candidates(self, candidates: List[Dict]):
"""
매수 후보군 업데이트 (5분마다 호출)
Args:
candidates: [{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000}, ...]
"""
try:
scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 기존 데이터 전체 삭제 (5분마다 새로 갱신)
with self.conn:
self.conn.execute("DELETE FROM target_candidates")
# 새 후보군 삽입
for item in candidates:
self.conn.execute("""
INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (
item['code'],
item['name'],
item['score'],
item['price'],
scan_time,
scan_time
))
logger.info(f"✅ 매수 후보군 DB 저장: {len(candidates)}")
return True
except Exception as e:
logger.error(f"❌ 후보군 저장 실패: {e}")
return False
def add_target_candidate(self, candidate: Dict):
"""
매수 후보군 개별 추가 (통과 즉시 저장용, UPSERT 방식)
- 500개 스캔 시 시간이 오래 걸려서 통과하는 즉시 DB에 저장
Args:
candidate: {'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000, ...}
"""
try:
scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with self.conn:
# UPSERT: 있으면 업데이트, 없으면 삽입
self.conn.execute("""
INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(code) DO UPDATE SET
name = excluded.name,
score = excluded.score,
price = excluded.price,
scan_time = excluded.scan_time,
updated_at = excluded.updated_at
""", (
candidate['code'],
candidate.get('name', ''),
candidate.get('score', 0),
candidate.get('price', 0),
scan_time,
scan_time
))
return True
except Exception as e:
logger.debug(f"후보 개별 저장 실패({candidate.get('code', '')}): {e}")
return False
def get_target_candidates(self) -> List[Dict]:
"""
매수 후보군 조회 (점수 순)
Returns:
[{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000}, ...]
"""
try:
cursor = self.conn.execute("""
SELECT code, name, score, price, scan_time
FROM target_candidates
ORDER BY score DESC, price ASC
""")
rows = cursor.fetchall()
result = []
for row in rows:
result.append({
'code': row['code'],
'name': row['name'],
'score': row['score'],
'price': row['price'],
'scan_time': row['scan_time']
})
return result
except Exception as e:
logger.error(f"❌ 후보군 조회 실패: {e}")
return []
def get_trades_by_date(self, date_str: str) -> List[Dict]:
"""
특정 날짜의 매매 기록 조회
Args:
date_str: 날짜 (YYYYMMDD)
Returns:
매매 기록 리스트
"""
try:
# YYYYMMDD -> YYYY-MM-DD 변환
date_formatted = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]}"
cursor = self.conn.execute("""
SELECT * FROM trade_history
WHERE DATE(sell_date) = ?
ORDER BY sell_date DESC
""", (date_formatted,))
rows = cursor.fetchall()
result = []
for row in rows:
result.append({
'id': row['id'],
'code': row['code'],
'name': row['name'],
'strategy': row['strategy'],
'buy_price': row['buy_price'],
'sell_price': row['sell_price'],
'qty': row['qty'],
'profit_rate': row['profit_rate'],
'realized_pnl': row['realized_pnl'],
'hold_minutes': row['hold_minutes'],
'buy_date': row['buy_date'],
'sell_date': row['sell_date'],
'sell_reason': row['sell_reason']
})
return result
except Exception as e:
logger.error(f"❌ 날짜별 조회 실패: {e}")
return []
# ============================================================
# [env_config] 관리자용 env (INSERT만, 최신 1건 = 살아있는 값)
# ============================================================
def insert_env_snapshot(self, snapshot) -> Optional[int]:
"""
env 설정 INSERT (UPDATE 없음). 관리자가 설정 변경할 때 호출.
snapshot: dict 또는 JSON 문자열. 키는 ENV_CONFIG_KEYS에 있는 것만 저장.
Returns:
새 행 id, 실패 시 None
"""
try:
if isinstance(snapshot, str):
snapshot = json.loads(snapshot) if snapshot else {}
if not isinstance(snapshot, dict):
return None
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key_list = ", ".join(f'"{k}"' for k in ENV_CONFIG_KEYS)
placeholders = ", ".join(["?"] * (1 + len(ENV_CONFIG_KEYS)))
vals = [now] + [snapshot.get(k) for k in ENV_CONFIG_KEYS]
cur = self.conn.execute(
f"INSERT INTO env_config (created_at, {key_list}) VALUES ({placeholders})",
vals,
)
self.conn.commit()
return cur.lastrowid
except Exception as e:
logger.error(f"❌ env_config INSERT 실패: {e}")
return None
def get_latest_env(self) -> Optional[Dict]:
"""
살아있는 최신 env 1건 조회 (ORDER BY id DESC LIMIT 1).
Returns:
{"id": int, "created_at": str, "snapshot": dict} 또는 없으면 None
"""
try:
row = self.conn.execute(
"SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
).fetchone()
if not row:
return None
snapshot = {
k: (row[k] if row[k] is not None else "")
for k in ENV_CONFIG_KEYS
if k in row.keys()
}
return {
"id": row["id"],
"created_at": row["created_at"],
"snapshot": snapshot,
}
except Exception as e:
logger.error(f"❌ env_config 최신 조회 실패: {e}")
return None
def close(self):
"""DB 연결 종료"""
if self.conn:
self.conn.close()
logger.info("🔒 DB 연결 종료")