""" 트레이딩 봇 데이터베이스 관리 모듈 - SQLite 기반 데이터 무결성 보장 (JSON 대비 재시작 안전성 향상) - 활성 트레이딩 관리 (active_trades) - 매매 히스토리 관리 (trade_history) """ import json import sqlite3 import datetime import logging from typing import Dict, List, Optional, Tuple logger = logging.getLogger("TradeDB") # env_config 테이블 컬럼 (키 하나당 컬럼 하나, 추가/삭제 시 여기와 CREATE TABLE만 수정) ENV_CONFIG_KEYS = ( "STOP_LOSS_PCT", "SHOULDER_CUT_PCT", "STOP_ATR_MULTIPLIER_TAIL", "TARGET_ATR_MULTIPLIER_TAIL", "MAX_POSITION_PCT", "USE_SLOT_CAP", "SLOT_CAP_PCT", "MAX_STOCKS", "USE_KELLY", "RISK_PCT_PER_TRADE", "MIN_POSITION_AMOUNT", "USE_RISK_CHECK", "DAILY_STOP_LOSS_PCT", "CONSECUTIVE_LOSS_LIMIT", "USE_BAN_SYSTEM", "BAN_HOURS", "USE_STOCK_FILTER", "RSI_OVERHEAT_THRESHOLD", "MIN_RECOVERY_RATIO", "MAX_RECOVERY_RATIO", "USE_TWAP", "TWAP_MIN_SPLIT", "TWAP_MAX_SPLIT", "TWAP_MIN_DELAY", "TWAP_MAX_DELAY", "USE_ML_SIGNAL", "ML_MIN_PROBABILITY", "USE_NEWS_ANALYSIS", "NEWS_ANALYSIS_HOUR", "NEWS_MAX_COUNT", "USE_QUICK_PROFIT_PROTECTION", "HIGH_PRICE_CHASE_THRESHOLD", "MAX_DAILY_CHANGE_PCT", "MA20_MAX_ABOVE_PCT", "VOLUME_AVG_MULTIPLIER", "CANDLE_OPEN_PRICE_BUFFER", "INTRADAY_INVESTOR_NET_BUY_THRESHOLD", "SIZE_CLASS_LARGE_MIN", "SIZE_CLASS_MID_MIN", "USE_RANDOM_SPLIT", "FORCE_MARKET_OPEN", "TOTAL_DEPOSIT", # POP/LOCK·금액 손절 관련 추가 키 "ROUND_TRIP_COST_PCT", "POP_NET_PCT", "LOCK_NET_PCT", "MAX_LOSS_PER_TRADE_KRW", # 한투 API 관련 키 추가 (실전/모의 계좌 분리) "KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL", "KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK", "KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", # 실전 계좌 (KIS_MOCK=false 시 사용) "KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK", # 모의 계좌 (KIS_MOCK=true 시 사용) "KIS_MOCK", # 단타 봇 전용 키 "TAKE_PROFIT_PCT", "MIN_DROP_RATE", "MIN_RECOVERY_RATIO_SHORT", # 늘림목 봇 전용 키 "MAX_PER", "MAX_PEG", "MIN_GROWTH_PCT", "DCA_INTERVALS", "DCA_AMOUNTS", # Mattermost 및 AI 리포트 관련 키 "MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY", # 봇별 Mattermost 채널 구분용 키 "KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL", ) class TradeDB: """ 트레이딩 봇용 SQLite 데이터베이스 관리 클래스 """ def __init__(self, db_path="quant_bot.db"): """ Args: db_path: SQLite DB 파일 경로 (기본: quant_bot.db) """ self.db_path = db_path self.conn = sqlite3.connect(db_path, check_same_thread=False) self.conn.row_factory = sqlite3.Row # 딕셔너리처럼 접근 가능 self._create_tables() logger.info(f"✅ TradeDB 초기화 완료: {db_path}") def _create_tables(self): """DB 테이블 생성 (없을 경우)""" with self.conn: # 1. 활성 트레이딩 테이블 (현재 보유 중이거나 매수 중인 종목) self.conn.execute(""" CREATE TABLE IF NOT EXISTS active_trades ( code TEXT PRIMARY KEY, -- 종목코드 name TEXT NOT NULL, -- 종목명 strategy TEXT, -- 매매 전략 (TAIL_CATCH_3M 등) -- [가격 정보] avg_buy_price REAL NOT NULL, -- 평단가 current_price REAL, -- 현재가 (업데이트용) stop_price REAL, -- 손절가 target_price REAL, -- 목표가 max_price REAL, -- 최고가 (트레일링 스탑용) atr_entry REAL, -- 진입 시 ATR 변동성 -- [수량 및 진행 상태 (분할매수용)] target_qty INTEGER NOT NULL, -- 목표 매수 수량 current_qty INTEGER NOT NULL,-- 현재 체결 수량 total_invested REAL, -- 총 투입 금액 (수수료 제외) -- [상태 관리] status TEXT NOT NULL, -- BUYING(매수중), HOLDING(보유중), SELLING(매도중) buy_date TEXT NOT NULL, -- 첫 매수 시작 시간 updated_at TEXT NOT NULL, -- 마지막 업데이트 시간 size_class TEXT -- 대/중/소형 (매수 시점) ) """) # 2. 매매 기록 테이블 (손익 분석 & 켈리 공식용) self.conn.execute(""" CREATE TABLE IF NOT EXISTS trade_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, code TEXT NOT NULL, name TEXT NOT NULL, strategy TEXT, buy_price REAL NOT NULL, -- 평단가 sell_price REAL NOT NULL, -- 매도가 qty INTEGER NOT NULL, -- 수량 profit_rate REAL NOT NULL, -- 수익률 (%) realized_pnl REAL NOT NULL, -- 실현 손익금 (원) hold_minutes INTEGER, -- 보유 시간 (분) buy_date TEXT, -- 매수 시작 시간 sell_date TEXT NOT NULL, -- 매도 완료 시간 sell_reason TEXT, -- 매도 사유 env_snapshot TEXT, -- 매도 시점 env (JSON) size_class TEXT -- 대/중/소형 (변동성 구간) ) """) # 3. 일일 손익 요약 테이블 (대시보드용) self.conn.execute(""" CREATE TABLE IF NOT EXISTS daily_summary ( date TEXT PRIMARY KEY, -- 날짜 (YYYY-MM-DD) start_asset REAL, -- 시작 자산 end_asset REAL, -- 종료 자산 total_trades INTEGER, -- 총 매매 횟수 win_trades INTEGER, -- 익절 횟수 total_pnl REAL, -- 총 손익 win_rate REAL -- 승률 (%) ) """) # 4. 주문·체결 보강 테이블 (kt00007 / ka10076) self.conn.execute(""" CREATE TABLE IF NOT EXISTS order_execution_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, source TEXT NOT NULL, -- 'kt00007' or 'ka10076' ord_no TEXT, stk_cd TEXT, stk_nm TEXT, trde_tp TEXT, -- 매매구분 ord_qty TEXT, ord_uv TEXT, cntr_qty TEXT, cntr_uv TEXT, ord_tm TEXT, cnfm_tm TEXT, sell_tp TEXT, ord_dt TEXT, raw_json TEXT, fetched_at TEXT NOT NULL ) """) # 5. 매수 후보군 테이블 (target_universe 대체) self.conn.execute(""" CREATE TABLE IF NOT EXISTS target_candidates ( code TEXT PRIMARY KEY, -- 종목코드 name TEXT NOT NULL, -- 종목명 score REAL NOT NULL, -- 개미털기 점수 (높을수록 좋음) price REAL NOT NULL, -- 현재가 scan_time TEXT NOT NULL, -- 스캔 시간 updated_at TEXT NOT NULL -- 마지막 업데이트 ) """) # 6. env 설정 전용 테이블 (관리자용, INSERT만 / 최신 1건 = 현재 설정, 키당 컬럼) cols = ", ".join([f'"{k}" TEXT' for k in ENV_CONFIG_KEYS]) self.conn.execute(f""" CREATE TABLE IF NOT EXISTS env_config ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, {cols} ) """) self._migrate_add_columns() self._migrate_env_config_to_columns() logger.info("📊 DB 테이블 생성/확인 완료") def _migrate_add_columns(self): """기존 DB에 누락된 컬럼 추가 (한 번만)""" try: cur = self.conn.execute("PRAGMA table_info(trade_history)") cols = [row[1] for row in cur.fetchall()] if "env_snapshot" not in cols: self.conn.execute("ALTER TABLE trade_history ADD COLUMN env_snapshot TEXT") logger.info("📌 trade_history.env_snapshot 컬럼 추가") if "size_class" not in cols: self.conn.execute("ALTER TABLE trade_history ADD COLUMN size_class TEXT") logger.info("📌 trade_history.size_class 컬럼 추가") except Exception as e: logger.debug(f"migrate trade_history: {e}") try: cur = self.conn.execute("PRAGMA table_info(active_trades)") cols = [row[1] for row in cur.fetchall()] if "size_class" not in cols: self.conn.execute("ALTER TABLE active_trades ADD COLUMN size_class TEXT") logger.info("📌 active_trades.size_class 컬럼 추가") except Exception as e: logger.debug(f"migrate active_trades: {e}") # env_config 테이블에 ENV_CONFIG_KEYS에 정의된 컬럼 누락 시 추가 try: cur = self.conn.execute("PRAGMA table_info(env_config)") cols = [row[1] for row in cur.fetchall()] for key in ENV_CONFIG_KEYS: if key not in cols: self.conn.execute(f'ALTER TABLE env_config ADD COLUMN "{key}" TEXT') logger.info(f"📌 env_config.{key} 컬럼 추가") except Exception as e: logger.debug(f"migrate env_config columns: {e}") def _migrate_env_config_to_columns(self): """env_config가 예전 JSON 컬럼(snapshot_json)이면 컬럼 스키마로 이전""" try: cur = self.conn.execute("PRAGMA table_info(env_config)") cols = [row[1] for row in cur.fetchall()] if "snapshot_json" not in cols: return # 기존 데이터 백업 후 새 테이블로 이전 rows = self.conn.execute("SELECT id, created_at, snapshot_json FROM env_config ORDER BY id").fetchall() col_defs = ", ".join([f'"{k}" TEXT' for k in ENV_CONFIG_KEYS]) self.conn.execute(f""" CREATE TABLE IF NOT EXISTS env_config_new ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, {col_defs} ) """) key_list = ", ".join(f'"{k}"' for k in ENV_CONFIG_KEYS) placeholders = ", ".join(["?"] * (1 + len(ENV_CONFIG_KEYS))) for row in rows: snap = json.loads(row["snapshot_json"]) if row["snapshot_json"] else {} vals = [row["created_at"]] + [snap.get(k) for k in ENV_CONFIG_KEYS] self.conn.execute( f"INSERT INTO env_config_new (created_at, {key_list}) VALUES ({placeholders})", vals, ) self.conn.execute("DROP TABLE env_config") self.conn.execute("ALTER TABLE env_config_new RENAME TO env_config") self.conn.commit() logger.info("📌 env_config: snapshot_json -> 컬럼 스키마 마이그레이션 완료") except Exception as e: logger.debug(f"migrate env_config: {e}") # ============================================================ # [CRUD] Active Trades (활성 트레이딩 관리) # ============================================================ def upsert_trade(self, trade_data: Dict): """ 신규 매수하거나 정보 업데이트 (평단가, 수량 등) Args: trade_data: 트레이드 정보 딕셔너리 필수: code, name, avg_buy_price, target_qty, current_qty, status 선택: strategy, stop_price, target_price, max_price, atr_entry, total_invested """ now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 기본값 설정 code = trade_data.get('code') if not code: logger.error("종목코드 누락: upsert 실패") return False size_class = trade_data.get('size_class') sql = """ INSERT INTO active_trades ( code, name, strategy, avg_buy_price, current_price, stop_price, target_price, max_price, atr_entry, target_qty, current_qty, total_invested, status, buy_date, updated_at, size_class ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(code) DO UPDATE SET avg_buy_price = excluded.avg_buy_price, current_price = excluded.current_price, stop_price = COALESCE(excluded.stop_price, active_trades.stop_price), target_price = COALESCE(excluded.target_price, active_trades.target_price), atr_entry = COALESCE(excluded.atr_entry, active_trades.atr_entry), current_qty = excluded.current_qty, total_invested = excluded.total_invested, max_price = MAX(active_trades.max_price, excluded.max_price), status = excluded.status, updated_at = excluded.updated_at, size_class = COALESCE(excluded.size_class, active_trades.size_class) """ params = ( code, trade_data.get('name', 'Unknown'), trade_data.get('strategy', 'MANUAL'), trade_data.get('avg_buy_price') or trade_data.get('buy_price', 0), trade_data.get('current_price', 0), trade_data.get('stop_price', 0), trade_data.get('target_price', 0), trade_data.get('max_price', trade_data.get('buy_price', 0)), trade_data.get('atr_at_entry') or trade_data.get('atr_entry', 0), trade_data.get('target_qty', trade_data.get('qty', 0)), trade_data.get('current_qty') or trade_data.get('qty', 0), trade_data.get('total_invested', 0), trade_data.get('status', 'HOLDING'), trade_data.get('buy_date', now), now, size_class ) try: with self.conn: self.conn.execute(sql, params) return True except Exception as e: logger.error(f"❌ upsert_trade 실패 ({code}): {e}") return False def get_active_trades(self): """ 활성 트레이딩 목록 조회 (봇 재시작 시 사용) Returns: {종목코드: {trade_info}} 형태의 딕셔너리 """ try: cursor = self.conn.execute("SELECT * FROM active_trades") rows = cursor.fetchall() # 기존 JSON 포맷과 호환되도록 딕셔너리 변환 result = {} for row in rows: code = row['code'] result[code] = { 'code': code, 'name': row['name'], 'strategy': row['strategy'], 'buy_price': row['avg_buy_price'], # JSON 호환 'avg_buy_price': row['avg_buy_price'], 'current_price': row['current_price'], 'stop_price': row['stop_price'], 'target_price': row['target_price'], 'max_price': row['max_price'], 'atr_at_entry': row['atr_entry'], 'qty': row['current_qty'], # JSON 호환 'target_qty': row['target_qty'], 'current_qty': row['current_qty'], 'total_invested': row['total_invested'], 'status': row['status'], 'buy_date': row['buy_date'], 'updated_at': row['updated_at'], 'size_class': row['size_class'] if 'size_class' in row.keys() else None, } logger.debug(f"📂 활성 트레이드 로드: {len(result)}개") return result except Exception as e: logger.error(f"❌ get_active_trades 실패: {e}") return {} def update_current_price(self, code: str, current_price: float): """현재가 업데이트 (매도 판단용)""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: with self.conn: self.conn.execute( "UPDATE active_trades SET current_price=?, updated_at=? WHERE code=?", (current_price, now, code) ) except Exception as e: logger.error(f"❌ 현재가 업데이트 실패 ({code}): {e}") def update_max_price(self, code: str, new_max_price: float): """최고가 갱신 (트레일링 스탑용)""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: with self.conn: # 기존 max_price보다 클 때만 업데이트 self.conn.execute( """UPDATE active_trades SET max_price = MAX(max_price, ?), updated_at = ? WHERE code = ?""", (new_max_price, now, code) ) except Exception as e: logger.error(f"❌ 최고가 갱신 실패 ({code}): {e}") def close_trade( self, code: str, sell_price: float, sell_reason: str = "", env_snapshot: str = None, size_class: str = None, ): """ 매도 완료 처리: active_trades 삭제 -> trade_history 이동 (INSERT만, env 스냅샷 포함) Args: code: 종목코드 sell_price: 매도가 sell_reason: 매도 사유 env_snapshot: 매도 시점 env JSON (백테스트/대시보드용) size_class: 대/중/소형 (매수 시점 저장값) """ try: # 1. 활성 트레이드 정보 조회 cursor = self.conn.execute("SELECT * FROM active_trades WHERE code=?", (code,)) trade = cursor.fetchone() if not trade: logger.warning(f"⚠️ close_trade: {code} 종목이 active_trades에 없음") return False # 2. 손익 계산 buy_price = trade['avg_buy_price'] qty = trade['current_qty'] profit_rate = ((sell_price - buy_price) / buy_price) * 100 if buy_price > 0 else 0 realized_pnl = (sell_price - buy_price) * qty # 3. 보유 시간 계산 buy_time = datetime.datetime.strptime(trade['buy_date'], '%Y-%m-%d %H:%M:%S') sell_time = datetime.datetime.now() hold_minutes = int((sell_time - buy_time).total_seconds() / 60) # size_class는 active_trades에 있으면 그대로 사용 if size_class is None and 'size_class' in trade.keys() and trade['size_class']: size_class = trade['size_class'] # 4. trade_history에 저장 (env_snapshot, size_class 포함 INSERT) with self.conn: self.conn.execute(""" INSERT INTO trade_history ( code, name, strategy, buy_price, sell_price, qty, profit_rate, realized_pnl, hold_minutes, buy_date, sell_date, sell_reason, env_snapshot, size_class ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( trade['code'], trade['name'], trade['strategy'], buy_price, sell_price, qty, profit_rate, realized_pnl, hold_minutes, trade['buy_date'], sell_time.strftime('%Y-%m-%d %H:%M:%S'), sell_reason, env_snapshot, size_class, )) # 5. active_trades에서 삭제 self.conn.execute("DELETE FROM active_trades WHERE code=?", (code,)) logger.info(f"✅ [{trade['name']}] 매매 종료: 수익률 {profit_rate:.2f}% ({realized_pnl:+,.0f}원)") return True except Exception as e: logger.error(f"❌ close_trade 실패 ({code}): {e}") return False def delete_active_trade(self, code: str): """활성 트레이드 삭제 (긴급 정리용)""" try: with self.conn: self.conn.execute("DELETE FROM active_trades WHERE code=?", (code,)) logger.info(f"🗑️ active_trade 삭제: {code}") return True except Exception as e: logger.error(f"❌ 삭제 실패 ({code}): {e}") return False # ============================================================ # [보강] 주문·체결 이력 (kt00007 / ka10076) # ============================================================ def insert_order_execution( self, source: str, row: dict, ord_dt: str = None, sell_tp: str = None, raw_json: str = None ): """주문·체결 1건 INSERT (보강용, 이력만 쌓음)""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: self.conn.execute(""" INSERT INTO order_execution_history ( source, ord_no, stk_cd, stk_nm, trde_tp, ord_qty, ord_uv, cntr_qty, cntr_uv, ord_tm, cnfm_tm, sell_tp, ord_dt, raw_json, fetched_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( source, row.get('ord_no') or row.get('orig_ord_no'), row.get('stk_cd', ''), row.get('stk_nm', ''), row.get('trde_tp', ''), str(row.get('ord_qty', '') or row.get('cntr_qty', '')), str(row.get('ord_uv', '') or row.get('ord_pric', '') or row.get('cntr_uv', '')), str(row.get('cntr_qty', '') or row.get('cnfm_qty', '')), str(row.get('cntr_uv', '') or row.get('cntr_pric', '')), row.get('ord_tm', ''), row.get('cnfm_tm', ''), sell_tp or '', ord_dt or '', raw_json, now, )) self.conn.commit() return True except Exception as e: logger.debug(f"insert_order_execution: {e}") return False # ============================================================ # [분석] 켈리 공식 및 통계 계산 # ============================================================ def calculate_half_kelly(self, recent_days: int = 30) -> float: """ 하프 켈리 공식 계산 (과거 매매 기록 기반) Args: recent_days: 최근 N일 데이터만 사용 Returns: 하프 켈리 비율 (0.0 ~ 1.0) 예: 0.15 리턴 -> "예수금의 15%씩 배팅하는 게 최적" """ try: # 최근 N일 데이터 조회 cutoff_date = (datetime.datetime.now() - datetime.timedelta(days=recent_days)).strftime('%Y-%m-%d') cursor = self.conn.execute( "SELECT profit_rate FROM trade_history WHERE sell_date >= ? ORDER BY sell_date DESC", (cutoff_date,) ) rows = cursor.fetchall() if len(rows) < 20: # 최소 20건 이상 필요 logger.warning(f"⚠️ 켈리 공식: 데이터 부족 ({len(rows)}건) -> 기본값 10% 리턴") return 0.10 # 승률 계산 wins = [r['profit_rate'] for r in rows if r['profit_rate'] > 0] losses = [r['profit_rate'] for r in rows if r['profit_rate'] <= 0] total_count = len(rows) win_count = len(wins) win_rate = win_count / total_count loss_rate = 1.0 - win_rate # 손익비 계산 (평균 수익 / 평균 손실) if not wins or not losses: logger.warning("⚠️ 켈리 공식: 승 또는 패만 있음 -> 기본값 10%") return 0.10 avg_win = sum(wins) / len(wins) avg_loss = abs(sum(losses) / len(losses)) if avg_loss == 0: return 0.50 # 손실이 0이면 최대치 odds = avg_win / avg_loss # 켈리 공식: f = (p * b - q) / b # p=승률, b=손익비, q=패율 kelly_fraction = ((win_rate * odds) - loss_rate) / odds # 하프 켈리 (안전성 확보) half_kelly = kelly_fraction * 0.5 # 음수면 0 리턴 (통계적으로 지는 구조) final_kelly = max(0.0, min(half_kelly, 0.5)) # 최대 50%로 제한 logger.info( f"📊 [켈리 분석] 승률:{win_rate*100:.1f}% | 손익비:{odds:.2f} | " f"켈리:{kelly_fraction*100:.1f}% | 하프켈리:{final_kelly*100:.1f}%" ) return final_kelly except Exception as e: logger.error(f"❌ 켈리 계산 실패: {e}") return 0.10 def get_recent_performance(self, days: int = 7) -> Tuple[float, int, int]: """ 최근 N일 성과 조회 Returns: (총손익, 익절횟수, 손절횟수) """ try: cutoff = (datetime.datetime.now() - datetime.timedelta(days=days)).strftime('%Y-%m-%d') cursor = self.conn.execute( "SELECT realized_pnl FROM trade_history WHERE sell_date >= ?", (cutoff,) ) rows = cursor.fetchall() total_pnl = sum([r['realized_pnl'] for r in rows]) wins = len([r for r in rows if r['realized_pnl'] > 0]) losses = len([r for r in rows if r['realized_pnl'] <= 0]) return total_pnl, wins, losses except Exception as e: logger.error(f"❌ 성과 조회 실패: {e}") return 0.0, 0, 0 def get_trade_stats(self) -> Dict: """전체 매매 통계""" try: cursor = self.conn.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN profit_rate > 0 THEN 1 ELSE 0 END) as wins, AVG(profit_rate) as avg_profit_rate, SUM(realized_pnl) as total_pnl FROM trade_history """) row = cursor.fetchone() return { 'total_trades': row['total'] or 0, 'win_trades': row['wins'] or 0, 'win_rate': (row['wins'] / row['total'] * 100) if row['total'] > 0 else 0, 'avg_profit_rate': row['avg_profit_rate'] or 0, 'total_pnl': row['total_pnl'] or 0 } except Exception as e: logger.error(f"❌ 통계 조회 실패: {e}") return {} # ============================================================ # [유틸] JSON 마이그레이션 # ============================================================ def migrate_from_json(self, json_data: Dict): """ 기존 JSON 포트폴리오를 DB로 마이그레이션 Args: json_data: portfolio.json 내용 (딕셔너리) """ count = 0 for code, info in json_data.items(): trade_data = info.copy() trade_data['code'] = code # 필드 매핑 (JSON -> DB) if 'target_qty' not in trade_data: trade_data['target_qty'] = info.get('qty', 0) if 'current_qty' not in trade_data: trade_data['current_qty'] = info.get('qty', 0) if 'total_invested' not in trade_data: trade_data['total_invested'] = info.get('buy_price', 0) * info.get('qty', 0) if 'status' not in trade_data: trade_data['status'] = 'HOLDING' if self.upsert_trade(trade_data): count += 1 logger.info(f"✅ JSON -> DB 마이그레이션 완료: {count}개 종목") return count # ============================================================ # [CRUD] Target Candidates (매수 후보군 관리) # ============================================================ def update_target_candidates(self, candidates: List[Dict]): """ 매수 후보군 업데이트 (5분마다 호출) Args: candidates: [{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000}, ...] """ try: scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 기존 데이터 전체 삭제 (5분마다 새로 갱신) with self.conn: self.conn.execute("DELETE FROM target_candidates") # 새 후보군 삽입 for item in candidates: self.conn.execute(""" INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at) VALUES (?, ?, ?, ?, ?, ?) """, ( item['code'], item['name'], item['score'], item['price'], scan_time, scan_time )) logger.info(f"✅ 매수 후보군 DB 저장: {len(candidates)}개") return True except Exception as e: logger.error(f"❌ 후보군 저장 실패: {e}") return False def add_target_candidate(self, candidate: Dict): """ 매수 후보군 개별 추가 (통과 즉시 저장용, UPSERT 방식) - 500개 스캔 시 시간이 오래 걸려서 통과하는 즉시 DB에 저장 Args: candidate: {'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000, ...} """ try: scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") with self.conn: # UPSERT: 있으면 업데이트, 없으면 삽입 self.conn.execute(""" INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(code) DO UPDATE SET name = excluded.name, score = excluded.score, price = excluded.price, scan_time = excluded.scan_time, updated_at = excluded.updated_at """, ( candidate['code'], candidate.get('name', ''), candidate.get('score', 0), candidate.get('price', 0), scan_time, scan_time )) return True except Exception as e: logger.debug(f"후보 개별 저장 실패({candidate.get('code', '')}): {e}") return False def get_target_candidates(self) -> List[Dict]: """ 매수 후보군 조회 (점수 순) Returns: [{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000}, ...] """ try: cursor = self.conn.execute(""" SELECT code, name, score, price, scan_time FROM target_candidates ORDER BY score DESC, price ASC """) rows = cursor.fetchall() result = [] for row in rows: result.append({ 'code': row['code'], 'name': row['name'], 'score': row['score'], 'price': row['price'], 'scan_time': row['scan_time'] }) return result except Exception as e: logger.error(f"❌ 후보군 조회 실패: {e}") return [] def get_trades_by_date(self, date_str: str) -> List[Dict]: """ 특정 날짜의 매매 기록 조회 Args: date_str: 날짜 (YYYYMMDD) Returns: 매매 기록 리스트 """ try: # YYYYMMDD -> YYYY-MM-DD 변환 date_formatted = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]}" cursor = self.conn.execute(""" SELECT * FROM trade_history WHERE DATE(sell_date) = ? ORDER BY sell_date DESC """, (date_formatted,)) rows = cursor.fetchall() result = [] for row in rows: result.append({ 'id': row['id'], 'code': row['code'], 'name': row['name'], 'strategy': row['strategy'], 'buy_price': row['buy_price'], 'sell_price': row['sell_price'], 'qty': row['qty'], 'profit_rate': row['profit_rate'], 'realized_pnl': row['realized_pnl'], 'hold_minutes': row['hold_minutes'], 'buy_date': row['buy_date'], 'sell_date': row['sell_date'], 'sell_reason': row['sell_reason'] }) return result except Exception as e: logger.error(f"❌ 날짜별 조회 실패: {e}") return [] # ============================================================ # [env_config] 관리자용 env (INSERT만, 최신 1건 = 살아있는 값) # ============================================================ def insert_env_snapshot(self, snapshot) -> Optional[int]: """ env 설정 INSERT (UPDATE 없음). 관리자가 설정 변경할 때 호출. snapshot: dict 또는 JSON 문자열. 키는 ENV_CONFIG_KEYS에 있는 것만 저장. Returns: 새 행 id, 실패 시 None """ try: if isinstance(snapshot, str): snapshot = json.loads(snapshot) if snapshot else {} if not isinstance(snapshot, dict): return None now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") key_list = ", ".join(f'"{k}"' for k in ENV_CONFIG_KEYS) placeholders = ", ".join(["?"] * (1 + len(ENV_CONFIG_KEYS))) vals = [now] + [snapshot.get(k) for k in ENV_CONFIG_KEYS] cur = self.conn.execute( f"INSERT INTO env_config (created_at, {key_list}) VALUES ({placeholders})", vals, ) self.conn.commit() return cur.lastrowid except Exception as e: logger.error(f"❌ env_config INSERT 실패: {e}") return None def get_latest_env(self) -> Optional[Dict]: """ 살아있는 최신 env 1건 조회 (ORDER BY id DESC LIMIT 1). Returns: {"id": int, "created_at": str, "snapshot": dict} 또는 없으면 None """ try: row = self.conn.execute( "SELECT * FROM env_config ORDER BY id DESC LIMIT 1" ).fetchone() if not row: return None snapshot = { k: (row[k] if row[k] is not None else "") for k in ENV_CONFIG_KEYS if k in row.keys() } return { "id": row["id"], "created_at": row["created_at"], "snapshot": snapshot, } except Exception as e: logger.error(f"❌ env_config 최신 조회 실패: {e}") return None def close(self): """DB 연결 종료""" if self.conn: self.conn.close() logger.info("🔒 DB 연결 종료")