""" 트레이딩 봇 데이터베이스 관리 모듈 - MariaDB 기반 (pymysql) — SQLite 에서 이전 - 활성 트레이딩 관리 (active_trades) - 매매 히스토리 관리 (trade_history) MariaDB 접속 정보 우선순위: 1) 환경변수 DB_HOST / DB_PORT / DB_USER / DB_PASS / DB_NAME 2) 아래 DEFAULT 상수 (기본값) """ import json import os import datetime import logging import threading from typing import Dict, List, Optional, Tuple try: import pymysql import pymysql.cursors _PYMYSQL_AVAILABLE = True except ImportError: _PYMYSQL_AVAILABLE = False logger = logging.getLogger("TradeDB") # ── MariaDB 기본 접속 정보 (환경변수로 재정의 가능) ───────────────── _DB_HOST = os.environ.get("DB_HOST", "192.168.0.141") _DB_PORT = int(os.environ.get("DB_PORT", "3306")) _DB_USER = os.environ.get("DB_USER", "jae") _DB_PASS = os.environ.get("DB_PASS", "1234") _DB_NAME = os.environ.get("DB_NAME", "kis_quant_db") # ══════════════════════════════════════════════════════════════════════ # SQLite 호환 래퍼 — 기존 conn.execute() / with conn: 패턴 유지 # ══════════════════════════════════════════════════════════════════════ class _MariaDBConn: """ pymysql 연결을 sqlite3 인터페이스처럼 감싸는 래퍼. - conn.execute(sql, params) → DictCursor 반환 (row['col'] 접근) - with conn: ... → 자동커밋(autocommit=True) 이므로 no-op - conn.commit() → 명시적 커밋 (autocommit=True라 호환) - ? 플레이스홀더 → %s 자동 변환 - SQLite-only DDL 키워드 자동 변환 """ # SQL 텍스트 자동 변환 규칙 (SQLite → MySQL) _REPLACE_PAIRS = [ # DDL 키워드 ("INTEGER PRIMARY KEY AUTOINCREMENT", "INT NOT NULL AUTO_INCREMENT PRIMARY KEY"), ("INTEGER PRIMARY KEY", "INT NOT NULL PRIMARY KEY"), ("AUTOINCREMENT", "AUTO_INCREMENT"), # DML — SQLite 전용 구문 ("INSERT OR REPLACE INTO", "REPLACE INTO"), ("last_insert_rowid()", "LAST_INSERT_ID()"), # ON CONFLICT 처리: 단순 패턴 제거 후 ON DUPLICATE KEY UPDATE 로 수동 변환 # (복잡한 케이스는 각 메서드에서 직접 처리) ] def __init__(self): self._lock = threading.Lock() self._conn = None self._connect() def _connect(self): """pymysql 연결 (재연결 포함).""" if not _PYMYSQL_AVAILABLE: raise ImportError( "pymysql 미설치. 설치: pip install pymysql\n" "또는: pip install PyMySQL" ) self._conn = pymysql.connect( host=_DB_HOST, port=_DB_PORT, user=_DB_USER, password=_DB_PASS, database=_DB_NAME, charset="utf8mb4", autocommit=True, cursorclass=pymysql.cursors.DictCursor, connect_timeout=10, read_timeout=30, write_timeout=30, ) logger.debug("✅ MariaDB 연결 완료 (%s:%s/%s)", _DB_HOST, _DB_PORT, _DB_NAME) def _ensure_connected(self): """연결 끊김 시 자동 재접속.""" try: self._conn.ping(reconnect=True) except Exception: try: self._connect() except Exception as e: logger.error("❌ MariaDB 재접속 실패: %s", e) raise @staticmethod def _translate_sql(sql: str) -> str: """SQLite 전용 SQL 구문을 MySQL 호환으로 변환.""" for old, new in _MariaDBConn._REPLACE_PAIRS: sql = sql.replace(old, new) # ? → %s (파라미터 플레이스홀더) # 단, 이미 %s 가 있는 경우 중복 변환 방지 if "?" in sql: sql = sql.replace("?", "%s") return sql def execute(self, sql: str, params=None): """ SQL 실행. sqlite3.Connection.execute() 와 동일한 인터페이스. 반환값: DictCursor (fetchone/fetchall/lastrowid 사용 가능) """ sql = self._translate_sql(sql) with self._lock: self._ensure_connected() cur = self._conn.cursor() cur.execute(sql, params or ()) return cur def __enter__(self): """with conn: 패턴 호환 (autocommit=True 이므로 실질적으로 no-op).""" return self def __exit__(self, exc_type, exc_val, exc_tb): pass def commit(self): """명시적 커밋 (autocommit=True 환경에서 호환성 유지용).""" try: with self._lock: self._conn.commit() except Exception: pass @property def row_factory(self): return None # DictCursor 가 이미 dict 반환, 호환용 stub @row_factory.setter def row_factory(self, _): pass # sqlite3 호환용 stub (무시) def close(self): try: self._conn.close() except Exception: pass # ── information_schema 기반 컬럼 목록 조회 (PRAGMA 대체) ────── def get_columns(self, table_name: str) -> list: """ PRAGMA table_info() 대체. 반환: 컬럼명 문자열 리스트 """ try: cur = self.execute( "SELECT COLUMN_NAME FROM information_schema.COLUMNS " "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s " "ORDER BY ORDINAL_POSITION", (_DB_NAME, table_name), ) return [row["COLUMN_NAME"] for row in cur.fetchall()] except Exception as e: logger.debug("get_columns(%s) 실패: %s", table_name, e) return [] # ML 학습용 진입 시점 피처 컬럼 (active_trades / trade_history 공통) ML_ENTRY_FEATURE_COLUMNS = ( "rsi", "volume_ratio", "tail_length_pct", "ma5_gap_pct", "ma20_gap_pct", "foreign_net_buy", "institution_net_buy", "market_hour", ) # env_config 테이블 컬럼 (키 하나당 컬럼 하나, 추가/삭제 시 여기와 CREATE TABLE만 수정) ENV_CONFIG_KEYS = ( "STOP_LOSS_PCT", "SHOULDER_CUT_PCT", "SHOULDER_MIN_HIGH_PCT", "SHOULDER_MIN_NET_PCT", "STOP_ATR_MULTIPLIER_TAIL", "TARGET_ATR_MULTIPLIER_TAIL", "MAX_POSITION_PCT", "USE_SLOT_CAP", "SLOT_CAP_PCT", "MAX_STOCKS", "USE_KELLY", "RISK_PCT_PER_TRADE", "MIN_POSITION_AMOUNT", "USE_RISK_CHECK", "DAILY_STOP_LOSS_PCT", "CONSECUTIVE_LOSS_LIMIT", "USE_BAN_SYSTEM", "BAN_HOURS", "USE_STOCK_FILTER", "RSI_OVERHEAT_THRESHOLD", "MIN_RECOVERY_RATIO", "MAX_RECOVERY_RATIO", "USE_TWAP", "TWAP_MIN_SPLIT", "TWAP_MAX_SPLIT", "TWAP_MIN_DELAY", "TWAP_MAX_DELAY", "USE_ML_SIGNAL", "ML_MIN_PROBABILITY", "USE_NEWS_ANALYSIS", "NEWS_ANALYSIS_HOUR", "NEWS_MAX_COUNT", "USE_QUICK_PROFIT_PROTECTION", "HIGH_PRICE_CHASE_THRESHOLD", "MAX_DAILY_CHANGE_PCT", "MA20_MAX_ABOVE_PCT", "VOLUME_AVG_MULTIPLIER", "CANDLE_OPEN_PRICE_BUFFER", "MIN_CANDLE_LEN_TAIL", "MIN_PRICE_TAIL", "TAIL_SCORE_BASE", "TAIL_SCORE_RATIO_MULT", "TAIL_RATIO_MIN", "TAIL_PCT_MIN", # 꼬리 비율/하락률 최소 (AI·봇 공통 기준) "INTRADAY_INVESTOR_NET_BUY_THRESHOLD", "SIZE_CLASS_LARGE_MIN", "SIZE_CLASS_MID_MIN", # 단타 스캔 후보 점수 (전부 env/DB) "SCAN_INVESTOR_NET_STRONG", "SCAN_INVESTOR_SCORE_STRONG", "SCAN_INVESTOR_SCORE_WEAK", "SCAN_INVESTOR_BONUS_STRONG", "SCAN_INVESTOR_BONUS_WEAK", "SCAN_VOLUME_BONUS_MIN", "SCAN_VOLUME_BONUS_POINT", "SCAN_EXEC_STRENGTH_HIGH", "SCAN_EXEC_STRENGTH_MID", "SCAN_EXEC_BONUS_HIGH", "SCAN_EXEC_BONUS_MID", "SCAN_SCORE_DROP_WEIGHT", "SCAN_SCORE_RECOVERY_WEIGHT", "MIN_SCORE_DISPLAY", "MM_TOP_N", "SLOT_MONEY_DEFAULT", "SLOT_BASE_AMOUNT_CAP", "SIZE_CLASS_SMALL_RATIO", "SIZE_CLASS_MID_RATIO", # 개미털기/유니버스 필터 추가 키 "SCAN_MIN_PREV_DAY_PCT", "UPDATE_UNIVERSE_MIN_CANDIDATES", "UPDATE_UNIVERSE_MIN_SCORE", "UPDATE_UNIVERSE_FALLBACK_TOP_N", "UPDATE_UNIVERSE_TOP_LOG", "UPDATE_UNIVERSE_TOP_N", "SCAN_INTERVAL_SEC", # 유니버스 스캔 주기(초), kiwoom_universe_scanner 전용 "USE_RANDOM_SPLIT", "FORCE_MARKET_OPEN", "FORCE_BUY_TEST", "TOTAL_DEPOSIT", # POP/LOCK·금액 손절 관련 추가 키 "ROUND_TRIP_COST_PCT", "POP_NET_PCT", "LOCK_NET_PCT", "MAX_LOSS_PER_TRADE_KRW", # 한투 API 관련 키 추가 (실전/모의 계좌 분리) "KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL", "KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK", "KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", # 실전 계좌 (KIS_MOCK=false 시 사용) "KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK", # 모의 계좌 (KIS_MOCK=true 시 사용) "KIS_MOCK", # 단타 봇 전용 키 "TAKE_PROFIT_PCT", "MIN_DROP_RATE", "MIN_RECOVERY_RATIO_SHORT", # 단타 매도 로직 (env/DB에서 수치 로드) "SCALP_ATR_UP_MULT", "SCALP_ATR_DOWN_MULT", "SCALP_ATR_DROP_MULT", "QUICK_PROFIT_PROTECT_HOURS", "QUICK_PROFIT_MAX_RATIO", "QUICK_PROFIT_CURRENT_MIN", "MIN_HOLD_EARLY_TAKE_PCT", "MIN_HOLD_HIGH_PCT", "MIN_HOLD_DROP_FROM_HIGH", "POST_HOLD_TAKE_PCT", "POST_HOLD_DROP_FROM_HIGH", # 늘림목 봇 전용 키 "MAX_PER", "MAX_PEG", "MIN_GROWTH_PCT", "DCA_INTERVALS", "DCA_AMOUNTS", # Mattermost 및 AI 리포트 관련 키 "MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY", "AI_JOURNAL_LINES", "ANTHROPIC_API_KEY", "CLAUDE_MODEL_ID", "CLAUDE_MAX_TOKENS", "MM_BUTLER_CHANNEL", # OpenRouter API 연동용 키 "OPENROUTER_API_KEY", "OPENROUTER_MODEL_ID", # 봇별 Mattermost 채널 구분용 키 "KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL", # 롱 위시리스트/뉴스 리포트 전용 키 "LONG_DAILY_LOOKBACK_DAYS", "LONG_MA_SHORT_DAYS", "LONG_MA_LONG_DAYS", "LONG_REPORT_AM_HOUR", "LONG_REPORT_AM_MIN", "LONG_REPORT_PM_HOUR", "LONG_REPORT_PM_MIN", "LONG_NEWS_ENABLED", "LONG_NEWS_INTERVAL_MIN", "LONG_NEWS_ACTIVE_START_HOUR", "LONG_NEWS_ACTIVE_END_HOUR", "LONG_ANALYSIS_DELAY_MIN_SEC", "LONG_ANALYSIS_DELAY_MAX_SEC", # RSI 기간: 스윙=14(기본), 단타=5, 스캘핑=3 권장 (DB에서 실시간 변경 가능) "RSI_PERIOD", # KIS WebSocket 관련 키 "KIS_WS_URL_REAL", "KIS_WS_URL_MOCK", "KIS_WS_MOCK_ENABLED", # 재진입 쿨다운: 매도 후 같은 종목 재매수를 N초 동안 차단 (반복매매 루프 방지) "REENTRY_COOLDOWN_SEC", # 매도 실패 백오프: 영업일 아님·장외 시간 오류 시 N초 동안 재시도 금지 (API 낭비·차단 방지) "SELL_FAILURE_BACKOFF_SEC", # ── 스캘핑봇(kis_scalping_ver1) 전용 키 ────────────────────────────── # RSI 과매도 임계값: 이 값 이하면 "과매도 → 되돌림 가능" 후보로 판단 "SCALP_RSI_OVERSOLD", # RSI 과매수 임계값: 이 값 이상이면 신규 진입 금지 (고점 추격 방지) "SCALP_RSI_OVERBOUGHT", # 스캘핑봇이 사용할 봉 단위 (분): 1 또는 3 권장 "SCALP_CANDLE_TIMEFRAME", # 장 시작 후 몇 분 뒤부터 매매 허용 (장 시작 직후 변동성 회피) "SCALP_MARKET_OPEN_WAIT_MIN", # WS 재접속 후 갭 보정에 사용할 REST 분봉 조회 캔들 수 "SCALP_GAP_FILL_LIMIT", # 확정 봉 기준 최소 거래량: 이 값 미만인 봉은 유동성 부족으로 무시 "SCALP_MIN_VOLUME", # 스캘핑봇 채널 (MM 알림) "KIS_SCALP_MM_CHANNEL", # ws_candles 자동 정리 보존 일수 "SCALP_CANDLE_KEEP_DAYS", # 스캘핑 전용 손절 % (꼬리잡기 STOP_LOSS_PCT와 분리, 기본 1.5%) # 1분봉 초단타: 손절이 넓으면(-4%) 자금이 묶여 회전율 0 → 타이트하게 "SCALP_STOP_LOSS_PCT", # 스캘핑 전용 익절 % (꼬리잡기 TAKE_PROFIT_PCT와 분리, 기본 1.5%) # 1분봉에서 +5% 익절은 도달 불가 → 박리다매 전략으로 1.5%씩 수익 적립 "SCALP_TAKE_PROFIT_PCT", # 스캘핑 낙폭 필터 % (꼬리잡기 MIN_DROP_RATE와 분리, 기본 1.5%) # 3% 기준은 1분봉 소형주에서 너무 엄격 → 1.5%로 완화해 타점 빈도 증가 "SCALP_MIN_DROP_RATE", # 봉부족 감지 시 재갭보정 최소 간격(초): 같은 종목 중복 REST 호출 방지 (기본 30초) "SCALP_GAP_RETRY_SEC", # 트레일링 발동 최소 수익률(%): 고점이 매수가 대비 이 이상 올라야 트레일링 활성화 # 0.5%면 수수료(~0.21%) 뺀 나머지만 이익 → 1.5 이상 권장 "SCALP_TRAIL_TRIGGER_PCT", # 트레일링/본절사수 후 최소 순이익(%): 수수료+세금 위에 이 값만큼 추가 마진 확보 # 0이면 본절(수수료 이후 0원), 0.2면 최소 +0.2% 순이익 보장 "SCALP_MIN_PROFIT_PCT", # ── 단타봇(kis_short_ver2) 전용 키 ─────────────────────────────────── # 켈리 공식 사용 여부 (true=켈리 적용, false=고정 비중) "USE_KELLY_FORMULA", # 켈리 공식 적용 배수 (0.25 = Full Kelly의 25%, 과도한 베팅 방지) "KELLY_MULTIPLIER", # 시장가 IOC 주문 사용 여부 (실전: true=IOC, false=일반 시장가) "USE_MARKET_IOC", # WebSocket 실시간 가격 캐시 유효기간(초): 이 시간 이상 지나면 REST 재조회 "KIS_PRICE_CACHE_TTL_SEC", # WS 재접속 후 갭 보정에 사용할 REST 분봉 조회 캔들 수 (단타봇) "SHORT_GAP_FILL_LIMIT", # 매수 직후 최소 보유 시간(초): 이 기간 내 매도 신호 무시 (API 잔고 반영 지연 대응) "MIN_HOLD_AFTER_BUY_SEC", # 최소 보유 시간(시간): 이 기간 이전에는 손절 외 매도 금지 (꼬리잡기 전략 충분히 대기) "MIN_HOLD_HOURS", # 3개월 최대 회복 비율: 전고점 대비 이 비율 이상 회복한 종목은 추격 매수 제외 "MAX_RECOVERY_RATIO_3M", # 유니버스 상위 N개 후보 경량 체크 (매수 후보 1차 필터) "CANDIDATE_LIST_TOP_N_LIGHT", # 매수 신호 체크 시 사용할 유니버스 최대 종목 수 (과부하 방지) "SCAN_UNIVERSE_MAX_CODES", # 꼬리 캔들 패턴 인식 시 과거 몇 봉까지 확인할지 (lookback) "TAIL_CANDLE_LOOKBACK", # ── 레거시 단일 계좌 키 (KIS_APP_KEY_REAL/MOCK 이전 버전 호환용) ────── "KIS_APP_KEY", "KIS_APP_SECRET", "KIS_ACCOUNT_NO", "KIS_ACCOUNT_CODE", # ── mm_butler 전용 키 ──────────────────────────────────────────────── # Gemini AI 모델 ID (gemini-2.5-flash, gemini-1.5-pro 등) "GEMINI_MODEL_ID", # MM 원격 명령 폴링 주기(초): 너무 짧으면 API 과부하 "MM_BUTLER_POLL_SEC", # AI 소스 텍스트 최대 길이(문자): 초과 시 잘라서 전송 (토큰 비용 관리) "AI_SOURCE_MAX_CHARS", # ── 수수료·거래세 (전략 공통) ────────────────────────────────────────── # FEE_RATE_PCT : 위탁수수료율 (매수/매도 각각, 기본 0.015%) # SELL_TAX_RATE_PCT: 증권거래세율 (매도 시만 부과, 기본 0.18% ← 2025 코스피/코스닥 공통) "FEE_RATE_PCT", "SELL_TAX_RATE_PCT", # ── 키움증권 REST API 키 (60분봉 과거 데이터 수집 전용) ──────────────── # 실전/모의 분리 (KIS_MOCK 값에 따라 자동 선택) # KIWOOM_APP_KEY_REAL : 키움증권 실전 앱키 # KIWOOM_APP_SECRET_REAL: 키움증권 실전 시크릿 # KIWOOM_APP_KEY_MOCK : 키움증권 모의 앱키 (mockapi.kiwoom.com) # KIWOOM_APP_SECRET_MOCK: 키움증권 모의 시크릿 # KIWOOM_APP_KEY / KIWOOM_APP_SECRET: 레거시 (단일 키 호환용) "KIWOOM_APP_KEY_REAL", "KIWOOM_APP_SECRET_REAL", "KIWOOM_APP_KEY_MOCK", "KIWOOM_APP_SECRET_MOCK", "KIWOOM_APP_KEY", "KIWOOM_APP_SECRET", # ── WebSocket 영구 구독 종목 (시장 방향 필터용) ───────────────────── # KOSPI/KOSDAQ 지수 ETF는 매매 후보와 무관하게 항상 구독 유지. # 60분봉 RSI 로 상승장/하락장 체크 → 스캘핑/꼬리잡기 진입 방향 결정. # 기본값: KODEX200(069500), KODEX KOSDAQ150(229200) # 콤마 구분 코드: "069500,229200" "PERMANENT_WS_CODES", # ── 시장 방향 필터 (상승장에서만 롱 진입) ────────────────────────── # USE_MARKET_REGIME_FILTER: true=활성, false=비활성 (기본 false) # MARKET_REGIME_MIN_RSI : ETF 60분봉 RSI 이 값 이상이어야 롱 진입 허용 (기본 48) "USE_MARKET_REGIME_FILTER", "MARKET_REGIME_MIN_RSI", # ── 테마 과열 필터 (테마 전체가 과열이면 신규 진입 억제) ─────────── # USE_THEME_HEAT_FILTER : true=활성 (기본 false) # THEME_HEAT_RSI_MAX : 테마 평균 RSI 이 값 초과면 진입 차단 (기본 72) "USE_THEME_HEAT_FILTER", "THEME_HEAT_RSI_MAX", ) class TradeDB: """ 트레이딩 봇용 MariaDB 데이터베이스 관리 클래스. 기존 SQLite 인터페이스와 100% 호환 (db_path 인수는 무시됨). """ def __init__(self, db_path="quant_bot.db"): """ Args: db_path: 하위 호환용 (무시됨). MariaDB 접속 정보는 환경변수/모듈 상수 사용. """ self.db_path = db_path # 호환용 보존 self.conn = _MariaDBConn() self._create_tables() logger.info("✅ TradeDB 초기화 완료: MariaDB %s:%s/%s", _DB_HOST, _DB_PORT, _DB_NAME) def _create_tables(self): """DB 테이블 생성 (없을 경우)""" with self.conn: # 1. 활성 트레이딩 테이블 (현재 보유 중이거나 매수 중인 종목) self.conn.execute(""" CREATE TABLE IF NOT EXISTS active_trades ( code VARCHAR(20) NOT NULL, -- 종목코드 name VARCHAR(100) NOT NULL, -- 종목명 strategy VARCHAR(50) NOT NULL DEFAULT 'MANUAL', -- 매매 전략 (SHORT_ANT_SHAKING / SCALP_RSI_REVERSAL 등) PRIMARY KEY (code, strategy), -- 복합 PK: 같은 종목을 서로 다른 봇이 독립 보유 가능 -- [가격 정보] avg_buy_price DOUBLE NOT NULL, -- 평단가 current_price DOUBLE, -- 현재가 (업데이트용) stop_price DOUBLE, -- 손절가 target_price REAL, -- 목표가 max_price REAL, -- 최고가 (트레일링 스탑용) atr_entry REAL, -- 진입 시 ATR 변동성 -- [수량 및 진행 상태 (분할매수용)] target_qty INTEGER NOT NULL, -- 목표 매수 수량 current_qty INTEGER NOT NULL,-- 현재 체결 수량 total_invested REAL, -- 총 투입 금액 (수수료 제외) -- [상태 관리] status TEXT NOT NULL, -- BUYING(매수중), HOLDING(보유중), SELLING(매도중) buy_date TEXT NOT NULL, -- 첫 매수 시작 시간 updated_at TEXT NOT NULL, -- 마지막 업데이트 시간 size_class TEXT -- 대/중/소형 (매수 시점) ) """) # 2. 매매 기록 테이블 (손익 분석 & 켈리 공식용) self.conn.execute(""" CREATE TABLE IF NOT EXISTS trade_history ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, code VARCHAR(20) NOT NULL, name VARCHAR(100) NOT NULL, strategy VARCHAR(50), buy_price DOUBLE NOT NULL, sell_price DOUBLE NOT NULL, qty INT NOT NULL, profit_rate DOUBLE NOT NULL, realized_pnl DOUBLE NOT NULL, hold_minutes INT, buy_date VARCHAR(30), sell_date VARCHAR(30) NOT NULL, sell_reason VARCHAR(200), env_snapshot TEXT, size_class VARCHAR(20) ) CHARACTER SET utf8mb4 """) # 3. 일일 손익 요약 테이블 (대시보드용) self.conn.execute(""" CREATE TABLE IF NOT EXISTS daily_summary ( date VARCHAR(10) NOT NULL PRIMARY KEY, start_asset DOUBLE, end_asset DOUBLE, total_trades INT, win_trades INT, total_pnl DOUBLE, win_rate DOUBLE ) CHARACTER SET utf8mb4 """) # 4. 주문·체결 보강 테이블 (kt00007 / ka10076) self.conn.execute(""" CREATE TABLE IF NOT EXISTS order_execution_history ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, source VARCHAR(20) NOT NULL, ord_no VARCHAR(30), stk_cd VARCHAR(20), stk_nm VARCHAR(100), trde_tp VARCHAR(20), ord_qty VARCHAR(20), ord_uv VARCHAR(20), cntr_qty VARCHAR(20), cntr_uv VARCHAR(20), ord_tm VARCHAR(20), cnfm_tm VARCHAR(20), sell_tp VARCHAR(20), ord_dt VARCHAR(20), raw_json TEXT, fetched_at VARCHAR(30) NOT NULL ) CHARACTER SET utf8mb4 """) # 5. 매수 체결 이력 (일일 한도용 - '산 시점' 날짜 기준 누적) self.conn.execute(""" CREATE TABLE IF NOT EXISTS buy_execution_log ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, code VARCHAR(20) NOT NULL, name VARCHAR(100) NOT NULL, strategy VARCHAR(50) NOT NULL, buy_date VARCHAR(10) NOT NULL, executed_at VARCHAR(30) NOT NULL, amount DOUBLE NOT NULL, qty INT NOT NULL ) CHARACTER SET utf8mb4 """) # 6. 매수 후보군 테이블 (target_universe 대체) self.conn.execute(""" CREATE TABLE IF NOT EXISTS target_candidates ( code VARCHAR(20) NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL, score DOUBLE NOT NULL, price DOUBLE NOT NULL, scan_time VARCHAR(30) NOT NULL, updated_at VARCHAR(30) NOT NULL ) CHARACTER SET utf8mb4 """) # 7. 종목 메타데이터 (테마·섹터·시장구분) — 스캐너가 채움, 조인 분석 용 # market: 'K'=KOSPI, 'Q'=KOSDAQ, 'E'=ETF/기타 # sector: 업종명 (KIS bstp_kor_isnm, 예: '반도체') # theme : 주요 테마 (예: 'AI반도체', '2차전지', '원자력') # - 직접 UPDATE 또는 별도 스크립트로 채움 # theme_rank: 테마 내 대장주/추종주 순위 (1=핵심, 2=연관, 3=주변) self.conn.execute(""" CREATE TABLE IF NOT EXISTS stock_meta ( code VARCHAR(20) NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL DEFAULT '', market CHAR(1) NOT NULL DEFAULT 'Q', sector_code VARCHAR(20), sector VARCHAR(100), theme VARCHAR(100), theme_rank TINYINT DEFAULT 3, updated_at VARCHAR(30) NOT NULL ) CHARACTER SET utf8mb4 """) # 8. env 설정 전용 테이블 (관리자용, INSERT만 / 최신 1건 = 현재 설정, 키당 컬럼) cols = ", ".join([f"`{k}` TEXT" for k in ENV_CONFIG_KEYS]) self.conn.execute(f""" CREATE TABLE IF NOT EXISTS env_config ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, created_at VARCHAR(30) NOT NULL, {cols} ) CHARACTER SET utf8mb4 """) # 8. 키-값 저장소 (매터모스트 원격 조종: 마지막 AI 추천문, last_seen 등) self.conn.execute(""" CREATE TABLE IF NOT EXISTS kv_store ( k VARCHAR(100) NOT NULL PRIMARY KEY, v MEDIUMTEXT ) CHARACTER SET utf8mb4 """) # 9. AI 분석 기록 (Butler !클로드분석/!애미분석 시 프롬프트 요약·응답 저장 → 나중에 꺼내보기) self.conn.execute(""" CREATE TABLE IF NOT EXISTS ai_analysis_log ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, created_at VARCHAR(30) NOT NULL, model VARCHAR(50) NOT NULL, context_summary TEXT, response MEDIUMTEXT ) CHARACTER SET utf8mb4 """) # 10. WebSocket 실시간 봉 집계 (백테스트용 — CandleAggregator 배치 INSERT) # - is_confirmed=1 인 확정 봉만 저장 (진행 중 봉은 RAM에만 존재) # - source: 'ws'=WebSocket틱 집계, 'rest'=갭보정 REST 조회 # - UNIQUE(code, timeframe, candle_time) → ON DUPLICATE KEY UPDATE self.conn.execute(""" CREATE TABLE IF NOT EXISTS ws_candles ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, code VARCHAR(20) NOT NULL, timeframe TINYINT NOT NULL, candle_time VARCHAR(12) NOT NULL, `open` DOUBLE NOT NULL, high DOUBLE NOT NULL, low DOUBLE NOT NULL, close DOUBLE NOT NULL, volume BIGINT NOT NULL DEFAULT 0, rsi_2 DOUBLE, rsi_3 DOUBLE, rsi_5 DOUBLE, is_confirmed TINYINT NOT NULL DEFAULT 1, source VARCHAR(10) NOT NULL DEFAULT 'ws', updated_at VARCHAR(30) NOT NULL, UNIQUE KEY uq_candle (code, timeframe, candle_time) ) CHARACTER SET utf8mb4 """) self._migrate_add_columns() self._migrate_env_config_to_columns() logger.info("📊 DB 테이블 생성/확인 완료") def _migrate_add_columns(self): """기존 DB에 누락된 컬럼 추가 (한 번만) — PRAGMA → information_schema 대체""" try: cols = self.conn.get_columns("trade_history") if "env_snapshot" not in cols: self.conn.execute("ALTER TABLE trade_history ADD COLUMN env_snapshot TEXT") logger.info("📌 trade_history.env_snapshot 컬럼 추가") if "size_class" not in cols: self.conn.execute("ALTER TABLE trade_history ADD COLUMN size_class VARCHAR(20)") logger.info("📌 trade_history.size_class 컬럼 추가") except Exception as e: logger.debug(f"migrate trade_history: {e}") # ── active_trades PK 복합키 마이그레이션 (code → code+strategy) ────────── # 두 봇(SHORT/SCALP)이 같은 종목을 독립 보유 가능하도록 PK 확장. # 신규 설치는 DDL에서 처리됨. 기존 테이블은 여기서 한 번만 ALTER. try: cursor = self.conn.execute(""" SELECT COLUMN_NAME FROM information_schema.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'active_trades' AND CONSTRAINT_NAME = 'PRIMARY' ORDER BY ORDINAL_POSITION """) pk_cols = [row[0] if isinstance(row, (list, tuple)) else row['COLUMN_NAME'] for row in cursor.fetchall()] if 'strategy' not in pk_cols: logger.info("⚙️ active_trades PK 복합키 마이그레이션 시작 (code → code+strategy)") # NULL strategy → 'MANUAL' 로 채움 (NOT NULL 변경 전 필수) self.conn.execute("UPDATE active_trades SET strategy = 'MANUAL' WHERE strategy IS NULL OR strategy = ''") # strategy 컬럼 NOT NULL DEFAULT 'MANUAL' 로 변경 후 PK 재구성 self.conn.execute("ALTER TABLE active_trades MODIFY COLUMN strategy VARCHAR(50) NOT NULL DEFAULT 'MANUAL'") self.conn.execute("ALTER TABLE active_trades DROP PRIMARY KEY, ADD PRIMARY KEY (code, strategy)") logger.info("✅ active_trades PK 복합키(code, strategy) 변환 완료") except Exception as e: logger.debug("active_trades PK 마이그레이션 스킵(이미 완료 또는 신규): %s", e) try: cols = self.conn.get_columns("active_trades") if "size_class" not in cols: self.conn.execute("ALTER TABLE active_trades ADD COLUMN size_class VARCHAR(20)") logger.info("📌 active_trades.size_class 컬럼 추가") for c in ML_ENTRY_FEATURE_COLUMNS: if c not in cols: self.conn.execute(f"ALTER TABLE active_trades ADD COLUMN `{c}` DOUBLE") logger.info(f"📌 active_trades.{c} 컬럼 추가 (ML 진입 피처)") except Exception as e: logger.debug(f"migrate active_trades: {e}") try: cols = self.conn.get_columns("trade_history") for c in ML_ENTRY_FEATURE_COLUMNS: if c not in cols: self.conn.execute(f"ALTER TABLE trade_history ADD COLUMN `{c}` DOUBLE") logger.info(f"📌 trade_history.{c} 컬럼 추가 (ML 진입 피처)") except Exception as e: logger.debug(f"migrate trade_history ML columns: {e}") try: cols = self.conn.get_columns("env_config") for key in ENV_CONFIG_KEYS: if key not in cols: self.conn.execute(f"ALTER TABLE env_config ADD COLUMN `{key}` TEXT") logger.info(f"📌 env_config.{key} 컬럼 추가") except Exception as e: logger.debug(f"migrate env_config columns: {e}") # ── target_candidates 테마/섹터/시장구분 컬럼 추가 ────────────────── try: cols = self.conn.get_columns("target_candidates") for col, ddl in [ ("market", "CHAR(1) DEFAULT 'Q'"), ("sector", "VARCHAR(100)"), ("theme", "VARCHAR(100)"), ]: if col not in cols: self.conn.execute( f"ALTER TABLE target_candidates ADD COLUMN `{col}` {ddl}" ) logger.info(f"📌 target_candidates.{col} 컬럼 추가") except Exception as e: logger.debug(f"migrate target_candidates theme cols: {e}") def _migrate_env_config_to_columns(self): """env_config가 예전 JSON 컬럼(snapshot_json)이면 컬럼 스키마로 이전""" try: cols = self.conn.get_columns("env_config") if "snapshot_json" not in cols: return rows = self.conn.execute( "SELECT id, created_at, snapshot_json FROM env_config ORDER BY id" ).fetchall() col_defs = ", ".join([f"`{k}` TEXT" for k in ENV_CONFIG_KEYS]) self.conn.execute(f""" CREATE TABLE IF NOT EXISTS env_config_new ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, created_at VARCHAR(30) NOT NULL, {col_defs} ) CHARACTER SET utf8mb4 """) key_list = ", ".join(f"`{k}`" for k in ENV_CONFIG_KEYS) placeholders = ", ".join(["%s"] * (1 + len(ENV_CONFIG_KEYS))) for row in rows: snap = json.loads(row["snapshot_json"]) if row["snapshot_json"] else {} vals = [row["created_at"]] + [snap.get(k) for k in ENV_CONFIG_KEYS] self.conn.execute( f"INSERT INTO env_config_new (created_at, {key_list}) VALUES ({placeholders})", vals, ) self.conn.execute("DROP TABLE env_config") self.conn.execute("ALTER TABLE env_config_new RENAME TO env_config") logger.info("📌 env_config: snapshot_json -> 컬럼 스키마 마이그레이션 완료") except Exception as e: logger.debug(f"migrate env_config: {e}") # ============================================================ # [CRUD] Active Trades (활성 트레이딩 관리) # ============================================================ def upsert_trade(self, trade_data: Dict): """ 신규 매수하거나 정보 업데이트 (평단가, 수량 등) Args: trade_data: 트레이드 정보 딕셔너리 필수: code, name, avg_buy_price, target_qty, current_qty, status 선택: strategy, stop_price, target_price, max_price, atr_entry, total_invested ML 학습용: entry_features (dict) 또는 rsi, volume_ratio 등 개별 키 """ now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 기본값 설정 code = trade_data.get('code') if not code: logger.error("종목코드 누락: upsert 실패") return False size_class = trade_data.get('size_class') feats = trade_data.get('entry_features') or {} feat_vals = [] for k in ML_ENTRY_FEATURE_COLUMNS: v = feats.get(k) if k in feats else trade_data.get(k) feat_vals.append(v if isinstance(v, (int, float)) else None) cols = ", ".join(["code", "name", "strategy", "avg_buy_price", "current_price", "stop_price", "target_price", "max_price", "atr_entry", "target_qty", "current_qty", "total_invested", "status", "buy_date", "updated_at", "size_class"] + list(ML_ENTRY_FEATURE_COLUMNS)) placeholders = ", ".join(["%s"] * (16 + len(ML_ENTRY_FEATURE_COLUMNS))) # MySQL: ON DUPLICATE KEY UPDATE (excluded. → VALUES()) updates = ( "avg_buy_price = VALUES(avg_buy_price), current_price = VALUES(current_price), " "stop_price = COALESCE(VALUES(stop_price), stop_price), " "target_price = COALESCE(VALUES(target_price), target_price), " "atr_entry = COALESCE(VALUES(atr_entry), atr_entry), " "current_qty = VALUES(current_qty), total_invested = VALUES(total_invested), " "max_price = GREATEST(max_price, VALUES(max_price)), " "status = VALUES(status), updated_at = VALUES(updated_at), " "size_class = COALESCE(VALUES(size_class), size_class)" ) for c in ML_ENTRY_FEATURE_COLUMNS: updates += f", `{c}` = COALESCE(VALUES(`{c}`), `{c}`)" sql = f""" INSERT INTO active_trades ( {cols} ) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {updates} """ params = ( code, trade_data.get('name', 'Unknown'), trade_data.get('strategy', 'MANUAL'), trade_data.get('avg_buy_price') or trade_data.get('buy_price', 0), trade_data.get('current_price', 0), trade_data.get('stop_price', 0), trade_data.get('target_price', 0), trade_data.get('max_price', trade_data.get('buy_price', 0)), trade_data.get('atr_at_entry') or trade_data.get('atr_entry', 0), trade_data.get('target_qty', trade_data.get('qty', 0)), trade_data.get('current_qty') or trade_data.get('qty', 0), trade_data.get('total_invested', 0), trade_data.get('status', 'HOLDING'), trade_data.get('buy_date', now), now, size_class, ) + tuple(feat_vals) try: with self.conn: self.conn.execute(sql, params) return True except Exception as e: logger.error(f"❌ upsert_trade 실패 ({code}): {e}") return False def get_active_trades(self, strategy_prefix: Optional[str] = None): """ 활성 트레이딩 목록 조회 (봇 재시작 시 사용) Args: strategy_prefix: None이면 전부, 'LONG'이면 strategy LIKE 'LONG%'만, 'SHORT'면 'SHORT%'만 (늘림목/단타 섞임 방지) Returns: {종목코드: {trade_info}} 형태의 딕셔너리 """ try: if strategy_prefix: cursor = self.conn.execute( "SELECT * FROM active_trades WHERE strategy LIKE %s", (strategy_prefix.strip().upper() + "%",) ) else: cursor = self.conn.execute("SELECT * FROM active_trades") rows = cursor.fetchall() # 기존 JSON 포맷과 호환되도록 딕셔너리 변환 result = {} for row in rows: code = row['code'] result[code] = { 'code': code, 'name': row['name'], 'strategy': row['strategy'], 'buy_price': row['avg_buy_price'], # JSON 호환 'avg_buy_price': row['avg_buy_price'], 'current_price': row['current_price'], 'stop_price': row['stop_price'], 'target_price': row['target_price'], 'max_price': row['max_price'], 'atr_at_entry': row['atr_entry'], 'qty': row['current_qty'], # JSON 호환 'target_qty': row['target_qty'], 'current_qty': row['current_qty'], 'total_invested': row['total_invested'], 'status': row['status'], 'buy_date': row['buy_date'], 'updated_at': row['updated_at'], 'size_class': row['size_class'] if 'size_class' in row.keys() else None, } logger.debug(f"📂 활성 트레이드 로드: {len(result)}개") return result except Exception as e: logger.error(f"❌ get_active_trades 실패: {e}") return {} def get_active_trade(self, code: str) -> Optional[Dict]: """ 활성 트레이딩 단일 종목 조회. 잔고 동기화 시 DB에 저장된 평단가를 폴백용으로 사용할 때 쓴다. """ try: cursor = self.conn.execute( "SELECT * FROM active_trades WHERE code = ?", (code,), ) row = cursor.fetchone() if not row: return None return { "code": row["code"], "name": row["name"], "strategy": row["strategy"], "avg_buy_price": row["avg_buy_price"], "current_price": row["current_price"], "stop_price": row["stop_price"], "target_price": row["target_price"], "max_price": row["max_price"], "atr_entry": row["atr_entry"], "target_qty": row["target_qty"], "current_qty": row["current_qty"], "total_invested": row["total_invested"], "status": row["status"], "buy_date": row["buy_date"], "updated_at": row["updated_at"], } except Exception as e: logger.error(f"❌ get_active_trade 실패 ({code}): {e}") return None def update_current_price(self, code: str, current_price: float): """현재가 업데이트 (매도 판단용)""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: with self.conn: self.conn.execute( "UPDATE active_trades SET current_price=?, updated_at=? WHERE code=?", (current_price, now, code) ) except Exception as e: logger.error(f"❌ 현재가 업데이트 실패 ({code}): {e}") def update_max_price(self, code: str, new_max_price: float): """최고가 갱신 (트레일링 스탑용)""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: with self.conn: # 기존 max_price보다 클 때만 업데이트 self.conn.execute( """UPDATE active_trades SET max_price = MAX(max_price, ?), updated_at = ? WHERE code = ?""", (new_max_price, now, code) ) except Exception as e: logger.error(f"❌ 최고가 갱신 실패 ({code}): {e}") def close_trade( self, code: str, sell_price: float, sell_reason: str = "", env_snapshot: str = None, size_class: str = None, strategy: str = None, realized_pnl_override: float = None, ): """ 매도 완료 처리: active_trades 삭제 -> trade_history 이동 (INSERT만, env 스냅샷 포함) Args: code: 종목코드 sell_price: 매도가 sell_reason: 매도 사유 env_snapshot: 매도 시점 env JSON (백테스트/대시보드용) size_class: 대/중/소형 (매수 시점 저장값) strategy: 봇 전략 ID (SHORT_ANT_SHAKING / SCALP_RSI_REVERSAL 등) 지정 시 해당 전략 row만 삭제 (다른 봇의 동일 종목 보호). None이면 code 단독 조회 (단일 봇 운영 환경 호환). """ try: # 1. 활성 트레이드 정보 조회 (strategy 지정 시 정확히 해당 row만 조회) if strategy: cursor = self.conn.execute( "SELECT * FROM active_trades WHERE code=%s AND strategy=%s", (code, strategy), ) else: cursor = self.conn.execute("SELECT * FROM active_trades WHERE code=%s", (code,)) trade = cursor.fetchone() if not trade: logger.warning(f"⚠️ close_trade: {code} 종목이 active_trades에 없음") return False # 2. 손익 계산 buy_price = trade['avg_buy_price'] qty = trade['current_qty'] # realized_pnl_override 가 있으면 수수료·세금 반영 순손익을 외부에서 주입 # 없으면 내부 계산 (수수료 미포함 gross) if realized_pnl_override is not None: realized_pnl = realized_pnl_override else: realized_pnl = (sell_price - buy_price) * qty profit_rate = (realized_pnl / (buy_price * qty) * 100) if buy_price * qty > 0 else 0 # 3. 보유 시간 계산 buy_time = datetime.datetime.strptime(trade['buy_date'], '%Y-%m-%d %H:%M:%S') sell_time = datetime.datetime.now() hold_minutes = int((sell_time - buy_time).total_seconds() / 60) # size_class는 active_trades에 있으면 그대로 사용 if size_class is None and 'size_class' in trade.keys() and trade['size_class']: size_class = trade['size_class'] # 4. trade_history에 저장 (env_snapshot, size_class, ML 진입 피처 포함 INSERT) feat_vals = [trade[c] if c in trade.keys() else None for c in ML_ENTRY_FEATURE_COLUMNS] cols_th = "code, name, strategy, buy_price, sell_price, qty, profit_rate, realized_pnl, hold_minutes, buy_date, sell_date, sell_reason, env_snapshot, size_class" if ML_ENTRY_FEATURE_COLUMNS: cols_th += ", " + ", ".join(ML_ENTRY_FEATURE_COLUMNS) placeholders = ", ".join(["?"] * (14 + len(ML_ENTRY_FEATURE_COLUMNS))) with self.conn: self.conn.execute(f""" INSERT INTO trade_history ( {cols_th} ) VALUES ({placeholders}) """, ( trade['code'], trade['name'], trade['strategy'], buy_price, sell_price, qty, profit_rate, realized_pnl, hold_minutes, trade['buy_date'], sell_time.strftime('%Y-%m-%d %H:%M:%S'), sell_reason, env_snapshot, size_class, ) + tuple(feat_vals)) # 5. active_trades에서 삭제 (strategy 지정 시 해당 봇 row만 삭제) if strategy: self.conn.execute( "DELETE FROM active_trades WHERE code=%s AND strategy=%s", (code, strategy), ) else: self.conn.execute("DELETE FROM active_trades WHERE code=%s", (code,)) logger.info(f"✅ [{trade['name']}] 매매 종료: 수익률 {profit_rate:.2f}% ({realized_pnl:+,.0f}원)") return True except Exception as e: logger.error(f"❌ close_trade 실패 ({code}): {e}") return False def delete_active_trade(self, code: str, strategy: str = None): """ 활성 트레이드 삭제 (긴급 정리용). strategy 지정 시 해당 봇 row만 삭제, None이면 해당 종목 전체 삭제. """ try: with self.conn: if strategy: self.conn.execute( "DELETE FROM active_trades WHERE code=%s AND strategy=%s", (code, strategy), ) else: self.conn.execute("DELETE FROM active_trades WHERE code=%s", (code,)) logger.info(f"🗑️ active_trade 삭제: {code}" + (f" [{strategy}]" if strategy else "")) return True except Exception as e: logger.error(f"❌ 삭제 실패 ({code}): {e}") return False def insert_buy_execution( self, code: str, name: str, strategy: str, amount: float, qty: int, ): """ 매수 체결 이력 저장 (일일 한도용). '하루' = 산 날짜(buy_date) 기준. """ now = datetime.datetime.now() buy_date = now.strftime("%Y-%m-%d") executed_at = now.strftime("%Y-%m-%d %H:%M:%S") try: with self.conn: self.conn.execute(""" INSERT INTO buy_execution_log (code, name, strategy, buy_date, executed_at, amount, qty) VALUES (?, ?, ?, ?, ?, ?, ?) """, (code, name, strategy, buy_date, executed_at, amount, qty)) return True except Exception as e: logger.error(f"❌ insert_buy_execution 실패 ({code}): {e}") return False def get_daily_buy_amount(self, date_str: str, strategy_prefix: str = "LONG") -> Tuple[float, int]: """ 해당 날짜(산 시점 기준)에 strategy_prefix에 해당하는 매수 누적 금액·건수. date_str: YYYY-MM-DD Returns: (누적 금액, 건수) """ try: cursor = self.conn.execute(""" SELECT COALESCE(SUM(amount), 0) AS total_amount, COUNT(*) AS cnt FROM buy_execution_log WHERE buy_date = %s AND strategy LIKE %s """, (date_str, strategy_prefix.strip().upper() + "%")) row = cursor.fetchone() return (float(row["total_amount"]), int(row["cnt"])) except Exception as e: logger.error(f"❌ get_daily_buy_amount 실패: {e}") return (0.0, 0) # ============================================================ # [보강] 주문·체결 이력 (kt00007 / ka10076) # ============================================================ def insert_order_execution( self, source: str, row: dict, ord_dt: str = None, sell_tp: str = None, raw_json: str = None ): """주문·체결 1건 INSERT (보강용, 이력만 쌓음)""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: self.conn.execute(""" INSERT INTO order_execution_history ( source, ord_no, stk_cd, stk_nm, trde_tp, ord_qty, ord_uv, cntr_qty, cntr_uv, ord_tm, cnfm_tm, sell_tp, ord_dt, raw_json, fetched_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( source, row.get('ord_no') or row.get('orig_ord_no'), row.get('stk_cd', ''), row.get('stk_nm', ''), row.get('trde_tp', ''), str(row.get('ord_qty', '') or row.get('cntr_qty', '')), str(row.get('ord_uv', '') or row.get('ord_pric', '') or row.get('cntr_uv', '')), str(row.get('cntr_qty', '') or row.get('cnfm_qty', '')), str(row.get('cntr_uv', '') or row.get('cntr_pric', '')), row.get('ord_tm', ''), row.get('cnfm_tm', ''), sell_tp or '', ord_dt or '', raw_json, now, )) self.conn.commit() return True except Exception as e: logger.debug(f"insert_order_execution: {e}") return False # ============================================================ # [분석] 켈리 공식 및 통계 계산 # ============================================================ def calculate_half_kelly(self, recent_days: int = 30) -> float: """ 하프 켈리 공식 계산 (과거 매매 기록 기반) Args: recent_days: 최근 N일 데이터만 사용 Returns: 하프 켈리 비율 (0.0 ~ 1.0) 예: 0.15 리턴 -> "예수금의 15%씩 배팅하는 게 최적" """ try: # 최근 N일 데이터 조회 cutoff_date = (datetime.datetime.now() - datetime.timedelta(days=recent_days)).strftime('%Y-%m-%d') cursor = self.conn.execute( "SELECT profit_rate FROM trade_history WHERE sell_date >= ? ORDER BY sell_date DESC", (cutoff_date,) ) rows = cursor.fetchall() if len(rows) < 20: # 최소 20건 이상 필요 logger.warning(f"⚠️ 켈리 공식: 데이터 부족 ({len(rows)}건) -> 기본값 10% 리턴") return 0.10 # 승률 계산 wins = [r['profit_rate'] for r in rows if r['profit_rate'] > 0] losses = [r['profit_rate'] for r in rows if r['profit_rate'] <= 0] total_count = len(rows) win_count = len(wins) win_rate = win_count / total_count loss_rate = 1.0 - win_rate # 손익비 계산 (평균 수익 / 평균 손실) if not wins or not losses: logger.warning("⚠️ 켈리 공식: 승 또는 패만 있음 -> 기본값 10%") return 0.10 avg_win = sum(wins) / len(wins) avg_loss = abs(sum(losses) / len(losses)) if avg_loss == 0: return 0.50 # 손실이 0이면 최대치 odds = avg_win / avg_loss # 켈리 공식: f = (p * b - q) / b # p=승률, b=손익비, q=패율 kelly_fraction = ((win_rate * odds) - loss_rate) / odds # 하프 켈리 (안전성 확보) half_kelly = kelly_fraction * 0.5 # 음수면 0 리턴 (통계적으로 지는 구조) final_kelly = max(0.0, min(half_kelly, 0.5)) # 최대 50%로 제한 logger.info( f"📊 [켈리 분석] 승률:{win_rate*100:.1f}% | 손익비:{odds:.2f} | " f"켈리:{kelly_fraction*100:.1f}% | 하프켈리:{final_kelly*100:.1f}%" ) return final_kelly except Exception as e: logger.error(f"❌ 켈리 계산 실패: {e}") return 0.10 def get_recent_performance(self, days: int = 7) -> Tuple[float, int, int]: """ 최근 N일 성과 조회 Returns: (총손익, 익절횟수, 손절횟수) """ try: cutoff = (datetime.datetime.now() - datetime.timedelta(days=days)).strftime('%Y-%m-%d') cursor = self.conn.execute( "SELECT realized_pnl FROM trade_history WHERE sell_date >= ?", (cutoff,) ) rows = cursor.fetchall() total_pnl = sum([r['realized_pnl'] for r in rows]) wins = len([r for r in rows if r['realized_pnl'] > 0]) losses = len([r for r in rows if r['realized_pnl'] <= 0]) return total_pnl, wins, losses except Exception as e: logger.error(f"❌ 성과 조회 실패: {e}") return 0.0, 0, 0 def get_trade_stats(self) -> Dict: """전체 매매 통계""" try: cursor = self.conn.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN profit_rate > 0 THEN 1 ELSE 0 END) as wins, AVG(profit_rate) as avg_profit_rate, SUM(realized_pnl) as total_pnl FROM trade_history """) row = cursor.fetchone() return { 'total_trades': row['total'] or 0, 'win_trades': row['wins'] or 0, 'win_rate': (row['wins'] / row['total'] * 100) if row['total'] > 0 else 0, 'avg_profit_rate': row['avg_profit_rate'] or 0, 'total_pnl': row['total_pnl'] or 0 } except Exception as e: logger.error(f"❌ 통계 조회 실패: {e}") return {} # ============================================================ # [유틸] JSON 마이그레이션 # ============================================================ def migrate_from_json(self, json_data: Dict): """ 기존 JSON 포트폴리오를 DB로 마이그레이션 Args: json_data: portfolio.json 내용 (딕셔너리) """ count = 0 for code, info in json_data.items(): trade_data = info.copy() trade_data['code'] = code # 필드 매핑 (JSON -> DB) if 'target_qty' not in trade_data: trade_data['target_qty'] = info.get('qty', 0) if 'current_qty' not in trade_data: trade_data['current_qty'] = info.get('qty', 0) if 'total_invested' not in trade_data: trade_data['total_invested'] = info.get('buy_price', 0) * info.get('qty', 0) if 'status' not in trade_data: trade_data['status'] = 'HOLDING' if self.upsert_trade(trade_data): count += 1 logger.info(f"✅ JSON -> DB 마이그레이션 완료: {count}개 종목") return count # ============================================================ # [CRUD] Target Candidates (매수 후보군 관리) # ============================================================ def update_target_candidates(self, candidates: List[Dict]): """ 매수 후보군 업데이트 (5분마다 호출) Args: candidates: [{ 'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000, 'market': 'K', # 선택 (없으면 stock_meta에서 조회) 'sector': '반도체', # 선택 'theme': 'AI반도체' # 선택 }, ...] """ try: scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # stock_meta 캐시: 테마/섹터가 없는 항목을 DB에서 보완 meta_cache: Dict[str, Dict] = {} # 기존 데이터 전체 삭제 (5분마다 새로 갱신) with self.conn: self.conn.execute("DELETE FROM target_candidates") for item in candidates: code = item['code'] # market/sector/theme 없으면 stock_meta에서 조회 market = item.get('market') or "" sector = item.get('sector') or "" theme = item.get('theme') or "" if not (market and sector and theme): if code not in meta_cache: try: r = self.conn.execute( "SELECT market, sector, theme FROM stock_meta WHERE code=%s", (code,) ).fetchone() meta_cache[code] = dict(r) if r else {} except Exception: meta_cache[code] = {} m = meta_cache[code] market = market or m.get("market") or "Q" sector = sector or m.get("sector") or "" theme = theme or m.get("theme") or "" self.conn.execute(""" INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at, market, sector, theme) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( code, item.get('name', ''), item.get('score', 0), item.get('price', 0), scan_time, scan_time, market, sector, theme, )) logger.info(f"✅ 매수 후보군 DB 저장: {len(candidates)}개") return True except Exception as e: logger.error(f"❌ 후보군 저장 실패: {e}") return False def add_target_candidate(self, candidate: Dict): """ 매수 후보군 개별 추가 (통과 즉시 저장용, UPSERT 방식) - 500개 스캔 시 시간이 오래 걸려서 통과하는 즉시 DB에 저장 Args: candidate: { 'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000, 'market': 'K', 'sector': '반도체', 'theme': 'AI반도체' # 선택 } """ try: scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") code = candidate['code'] market = candidate.get('market') or "" sector = candidate.get('sector') or "" theme = candidate.get('theme') or "" # 없으면 stock_meta 에서 보완 if not (market and sector and theme): try: r = self.conn.execute( "SELECT market, sector, theme FROM stock_meta WHERE code=%s", (code,) ).fetchone() if r: m = dict(r) market = market or m.get("market") or "Q" sector = sector or m.get("sector") or "" theme = theme or m.get("theme") or "" except Exception: pass with self.conn: self.conn.execute(""" INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at, market, sector, theme) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE name = VALUES(name), score = VALUES(score), price = VALUES(price), scan_time = VALUES(scan_time), updated_at = VALUES(updated_at), market = VALUES(market), sector = VALUES(sector), theme = VALUES(theme) """, ( code, candidate.get('name', ''), candidate.get('score', 0), candidate.get('price', 0), scan_time, scan_time, market, sector, theme, )) return True except Exception as e: logger.debug(f"후보 개별 저장 실패({candidate.get('code', '')}): {e}") return False def get_target_candidates(self) -> List[Dict]: """ 매수 후보군 조회 (점수 순). stock_meta JOIN으로 테마/섹터 보완 반환. Returns: [{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000, 'market': 'K', 'sector': '반도체', 'theme': 'AI반도체'}, ...] """ try: cursor = self.conn.execute(""" SELECT t.code, t.name, t.score, t.price, t.scan_time, COALESCE(t.market, m.market, 'Q') AS market, COALESCE(t.sector, m.sector, '') AS sector, COALESCE(t.theme, m.theme, '') AS theme FROM target_candidates t LEFT JOIN stock_meta m ON m.code = t.code ORDER BY t.score DESC, t.price ASC """) rows = cursor.fetchall() result = [] for row in rows: result.append({ 'code': row['code'], 'name': row['name'], 'score': row['score'], 'price': row['price'], 'scan_time': row['scan_time'], 'market': row.get('market', 'Q'), 'sector': row.get('sector', ''), 'theme': row.get('theme', ''), }) return result except Exception as e: logger.error(f"❌ 후보군 조회 실패: {e}") return [] # ------------------------------------------------------------------ # stock_meta 테마/섹터 메타데이터 헬퍼 # ------------------------------------------------------------------ def upsert_stock_meta( self, code: str, name: str = "", market: str = "Q", sector_code: str = "", sector: str = "", theme: str = "", theme_rank: int = 3, ) -> bool: """ 종목 메타데이터 저장/갱신 (스캐너·수동 입력 모두 사용). Args: code : 종목코드 (6자리) name : 종목명 market : 'K'=KOSPI, 'Q'=KOSDAQ, 'E'=ETF sector_code: 업종코드 (KIS bstp_cls_code 등) sector : 업종명 (예: '반도체') theme : 테마명 (예: 'AI반도체', '2차전지') theme_rank : 테마 내 순위 (1=핵심주, 2=연관, 3=주변) """ try: now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") with self.conn: self.conn.execute(""" INSERT INTO stock_meta (code, name, market, sector_code, sector, theme, theme_rank, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE name = IF(VALUES(name) != '', VALUES(name), name), market = IF(VALUES(market) != '', VALUES(market), market), sector_code = IF(VALUES(sector_code) != '', VALUES(sector_code), sector_code), sector = IF(VALUES(sector) != '', VALUES(sector), sector), theme = IF(VALUES(theme) != '', VALUES(theme), theme), theme_rank = IF(VALUES(theme) != '', VALUES(theme_rank), theme_rank), updated_at = VALUES(updated_at) """, (code, name, market, sector_code, sector, theme, theme_rank, now)) return True except Exception as e: logger.debug("upsert_stock_meta 실패(%s): %s", code, e) return False def get_stock_meta(self, code: str) -> Optional[Dict]: """종목 메타데이터 단건 조회. 없으면 None.""" try: row = self.conn.execute( "SELECT * FROM stock_meta WHERE code = %s", (code,) ).fetchone() return dict(row) if row else None except Exception as e: logger.debug("get_stock_meta 실패(%s): %s", code, e) return None def get_theme_momentum(self, theme: str, tf: int = 60) -> Dict: """ 테마 종목들의 최신 확정 봉(timeframe=tf) RSI 통계. "이 테마가 지금 뜨겁냐 식었냐"를 숫자로 파악. Args: theme: 테마명 (stock_meta.theme) tf : 봉 단위 분 (기본 60분봉) Returns: { 'theme': 'AI반도체', 'tf': 60, 'count': 5, # 데이터 있는 종목 수 'avg_rsi3': 62.1, # 평균 RSI(3) 'max_rsi3': 78.4, # 최고 RSI(3) 'min_rsi3': 44.2, # 최저 RSI(3) 'hot_count': 2, # RSI>70 (과열) 종목 수 'cold_count': 1, # RSI<30 (과매도) 종목 수 } """ try: row = self.conn.execute(""" SELECT COUNT(*) AS cnt, AVG(w.rsi_3) AS avg_rsi, MAX(w.rsi_3) AS max_rsi, MIN(w.rsi_3) AS min_rsi, SUM(w.rsi_3 > 70) AS hot_cnt, SUM(w.rsi_3 < 30) AS cold_cnt FROM ws_candles w INNER JOIN stock_meta m ON m.code = w.code WHERE m.theme = %s AND w.timeframe = %s AND w.is_confirmed = 1 AND w.candle_time = ( SELECT MAX(candle_time) FROM ws_candles WHERE code = w.code AND timeframe = w.timeframe ) """, (theme, tf)).fetchone() if not row or not row['cnt']: return {'theme': theme, 'tf': tf, 'count': 0} return { 'theme': theme, 'tf': tf, 'count': int(row['cnt'] or 0), 'avg_rsi3': round(float(row['avg_rsi'] or 0), 1), 'max_rsi3': round(float(row['max_rsi'] or 0), 1), 'min_rsi3': round(float(row['min_rsi'] or 0), 1), 'hot_count': int(row['hot_cnt'] or 0), 'cold_count':int(row['cold_cnt'] or 0), } except Exception as e: logger.debug("get_theme_momentum 실패(%s, %dM): %s", theme, tf, e) return {'theme': theme, 'tf': tf, 'count': 0} def get_all_theme_momentum(self, tf: int = 60) -> List[Dict]: """ 모든 테마의 최신 모멘텀 요약 (테마 히트맵). 가장 뜨거운 테마부터 정렬 반환. """ try: rows = self.conn.execute(""" SELECT m.theme, COUNT(DISTINCT w.code) AS cnt, AVG(w.rsi_3) AS avg_rsi, SUM(w.rsi_3 > 70) AS hot_cnt, SUM(w.rsi_3 < 30) AS cold_cnt FROM ws_candles w INNER JOIN stock_meta m ON m.code = w.code WHERE m.theme IS NOT NULL AND m.theme != '' AND w.timeframe = %s AND w.is_confirmed = 1 AND w.candle_time = ( SELECT MAX(candle_time) FROM ws_candles WHERE code = w.code AND timeframe = w.timeframe ) GROUP BY m.theme ORDER BY avg_rsi DESC """, (tf,)).fetchall() return [{ 'theme': r['theme'], 'count': int(r['cnt'] or 0), 'avg_rsi3': round(float(r['avg_rsi'] or 0), 1), 'hot_count': int(r['hot_cnt'] or 0), 'cold_count':int(r['cold_cnt'] or 0), } for r in rows] except Exception as e: logger.debug("get_all_theme_momentum 실패: %s", e) return [] def get_market_regime(self, codes: list = None, tf: int = 60) -> Dict: """ 영구 구독 ETF (KODEX200 등)의 최신 60분봉 RSI로 시장 방향 판단. 스캘핑/꼬리잡기 진입 전 "지금 상승장인가?" 필터로 활용. Args: codes: ETF 코드 리스트 (None=env PERMANENT_WS_CODES) tf : 봉 단위 분 (기본 60) Returns: { 'is_bull': True, # avg_rsi > 50 → 상승 국면 'avg_rsi': 58.4, 'details': [{'code': '069500', 'rsi': 61.2}, ...] } """ try: if not codes: raw = self.conn.execute( "SELECT PERMANENT_WS_CODES FROM env_config ORDER BY id DESC LIMIT 1" ).fetchone() perm = str(raw['PERMANENT_WS_CODES'] if raw else "") if raw else "" codes = [c.strip() for c in perm.split(",") if c.strip()] or ["069500", "229200"] rows = self.conn.execute(""" SELECT code, rsi_3 FROM ws_candles WHERE code IN ({}) AND timeframe = %s AND is_confirmed = 1 AND candle_time = ( SELECT MAX(candle_time) FROM ws_candles WHERE code = ws_candles.code AND timeframe = ws_candles.timeframe ) """.format(",".join(["%s"] * len(codes))), (*codes, tf)).fetchall() details = [{'code': r['code'], 'rsi': round(float(r['rsi_3'] or 50), 1)} for r in rows] avg_rsi = sum(d['rsi'] for d in details) / len(details) if details else 50.0 return { 'is_bull': avg_rsi > 50, 'avg_rsi': round(avg_rsi, 1), 'details': details, } except Exception as e: logger.debug("get_market_regime 실패: %s", e) return {'is_bull': True, 'avg_rsi': 50.0, 'details': []} def get_trades_by_date(self, date_str: str) -> List[Dict]: """ 특정 날짜의 매매 기록 조회 Args: date_str: 날짜 (YYYYMMDD 또는 YYYY-MM-DD 모두 허용) Returns: 매매 기록 리스트 """ try: # YYYYMMDD(8자) → YYYY-MM-DD 변환, 이미 YYYY-MM-DD(10자)면 그대로 사용 if len(date_str) == 8 and "-" not in date_str: date_formatted = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]}" else: date_formatted = date_str[:10] # 'YYYY-MM-DD'만 사용 (시간 부분 제거) cursor = self.conn.execute(""" SELECT * FROM trade_history WHERE DATE(sell_date) = %s ORDER BY sell_date DESC """, (date_formatted,)) rows = cursor.fetchall() result = [] for row in rows: result.append({ 'id': row['id'], 'code': row['code'], 'name': row['name'], 'strategy': row['strategy'], 'buy_price': row['buy_price'], 'sell_price': row['sell_price'], 'qty': row['qty'], 'profit_rate': row['profit_rate'], 'realized_pnl': row['realized_pnl'], 'hold_minutes': row['hold_minutes'], 'buy_date': row['buy_date'], 'sell_date': row['sell_date'], 'sell_reason': row['sell_reason'] }) return result except Exception as e: logger.error(f"❌ 날짜별 조회 실패: {e}") return [] # ============================================================ # [env_config] 관리자용 env (INSERT만, 최신 1건 = 살아있는 값) # ============================================================ def insert_env_snapshot(self, snapshot) -> Optional[int]: """ env 설정 INSERT (UPDATE 없음). 관리자가 설정 변경할 때 호출. snapshot: dict 또는 JSON 문자열. 키는 ENV_CONFIG_KEYS에 있는 것만 저장. Returns: 새 행 id, 실패 시 None """ try: if isinstance(snapshot, str): snapshot = json.loads(snapshot) if snapshot else {} if not isinstance(snapshot, dict): return None now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") key_list = ", ".join(f"`{k}`" for k in ENV_CONFIG_KEYS) placeholders = ", ".join(["%s"] * (1 + len(ENV_CONFIG_KEYS))) vals = [now] + [snapshot.get(k) for k in ENV_CONFIG_KEYS] cur = self.conn.execute( f"INSERT INTO env_config (created_at, {key_list}) VALUES ({placeholders})", vals, ) return cur.lastrowid except Exception as e: logger.error(f"❌ env_config INSERT 실패: {e}") return None def get_latest_env(self) -> Optional[Dict]: """ 살아있는 최신 env 1건 조회 (ORDER BY id DESC LIMIT 1). Returns: {"id": int, "created_at": str, "snapshot": dict} 또는 없으면 None """ try: row = self.conn.execute( "SELECT * FROM env_config ORDER BY id DESC LIMIT 1" ).fetchone() if not row: return None snapshot = { k: (row[k] if row[k] is not None else "") for k in ENV_CONFIG_KEYS if k in row.keys() } return { "id": row["id"], "created_at": row["created_at"], "snapshot": snapshot, } except Exception as e: logger.error(f"❌ env_config 최신 조회 실패: {e}") return None # ============================================================ # [kv_store] 매터모스트 원격 조종용 키-값 (마지막 AI 추천, last_seen 등) # ============================================================ def get_kv(self, key: str) -> Optional[str]: """kv_store에서 값 조회. 없으면 None.""" try: row = self.conn.execute("SELECT v FROM kv_store WHERE k = ?", (key,)).fetchone() return row["v"] if row else None except Exception as e: logger.debug(f"get_kv 실패: {e}") return None def set_kv(self, key: str, value: str) -> bool: """kv_store에 값 저장 (REPLACE INTO = INSERT OR REPLACE MySQL 버전).""" try: self.conn.execute("REPLACE INTO kv_store (k, v) VALUES (%s, %s)", (key, value)) return True except Exception as e: logger.error(f"❌ set_kv 실패: {e}") return False def set_last_ai_recommendations(self, text: str) -> bool: """마지막 AI 수치 추천문 저장 (!적용 시 사용).""" return self.set_kv("last_ai_recommendations", text) def get_last_ai_recommendations(self) -> Optional[str]: """마지막 AI 수치 추천문 조회. 없으면 None.""" return self.get_kv("last_ai_recommendations") # ------------------------------------------------------------------ # [ai_analysis_log] Butler 분석 기록 (프롬프트 요약·응답 저장/조회) # ------------------------------------------------------------------ def insert_ai_analysis_log(self, model: str, context_summary: str, response: str) -> Optional[int]: """AI 분석 한 건 저장. model=claude|gemini, context_summary=현재상태 요약, response=AI 전체 응답.""" try: now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") cur = self.conn.execute( "INSERT INTO ai_analysis_log (created_at, model, context_summary, response) VALUES (%s, %s, %s, %s)", (now, model, (context_summary or "")[:2000], response or ""), ) return cur.lastrowid except Exception as e: logger.error("insert_ai_analysis_log 실패: %s", e) return None def get_ai_analysis_log_list(self, limit: int = 10) -> list: """최근 N건 목록. 각 항목: id, created_at, model, context_summary, response(앞 400자).""" try: rows = self.conn.execute( """ SELECT id, created_at, model, context_summary, response FROM ai_analysis_log ORDER BY id DESC LIMIT ? """, (limit,), ).fetchall() out = [] for r in rows: resp = (r["response"] or "")[:400] if len(r["response"] or "") > 400: resp += "..." out.append({"id": r["id"], "created_at": r["created_at"], "model": r["model"], "context_summary": r["context_summary"], "response_preview": resp}) return out except Exception as e: logger.error("get_ai_analysis_log_list 실패: %s", e) return [] def get_ai_analysis_log_by_id(self, log_id: int) -> Optional[dict]: """id로 전체 한 건 조회. 없으면 None.""" try: row = self.conn.execute( "SELECT id, created_at, model, context_summary, response FROM ai_analysis_log WHERE id = ?", (log_id,), ).fetchone() if not row: return None return {"id": row["id"], "created_at": row["created_at"], "model": row["model"], "context_summary": row["context_summary"], "response": row["response"]} except Exception as e: logger.error("get_ai_analysis_log_by_id 실패: %s", e) return None # ================================================================== # ws_candles 헬퍼 (kis_scalping_ver1 전용) # ================================================================== def upsert_ws_candle(self, code: str, timeframe: int, candle_time: str, open_: float, high: float, low: float, close: float, volume: int, is_confirmed: int = 0, source: str = "ws", rsi_2: float = None, rsi_3: float = None, rsi_5: float = None): """ 봉 1개를 INSERT OR REPLACE 로 저장. - 진행 중인 봉(is_confirmed=0): 틱마다 close/high/low/volume 갱신 - 확정 봉(is_confirmed=1): RSI 계산 후 최종 저장 """ now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: with self.conn: self.conn.execute(""" INSERT INTO ws_candles (code, timeframe, candle_time, open, high, low, close, volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(code, timeframe, candle_time) DO UPDATE SET high = MAX(high, excluded.high), low = MIN(low, excluded.low), close = excluded.close, volume = excluded.volume, rsi_2 = COALESCE(excluded.rsi_2, rsi_2), rsi_3 = COALESCE(excluded.rsi_3, rsi_3), rsi_5 = COALESCE(excluded.rsi_5, rsi_5), is_confirmed = excluded.is_confirmed, source = excluded.source, updated_at = excluded.updated_at """, (code, timeframe, candle_time, open_, high, low, close, volume, rsi_2, rsi_3, rsi_5, is_confirmed, source, now_str)) except Exception as e: logger.error("upsert_ws_candle 실패(%s): %s", code, e) def get_ws_candles(self, code: str, timeframe: int, limit: int = 100, confirmed_only: bool = False) -> list: """ 최근 N개 봉 리스트 반환 (오래된 순 → 최신 순). confirmed_only=True 면 확정 봉만 반환 (RSI 계산용). """ try: cond = "AND is_confirmed = 1" if confirmed_only else "" rows = self.conn.execute(f""" SELECT candle_time, open, high, low, close, volume, rsi_2, rsi_3, rsi_5, is_confirmed, source FROM ws_candles WHERE code = ? AND timeframe = ? {cond} ORDER BY candle_time DESC LIMIT ? """, (code, timeframe, limit)).fetchall() # 최신→오래된 순으로 왔으므로 역정렬해서 반환 (오래된→최신) result = [dict(r) for r in rows] result.reverse() return result except Exception as e: logger.error("get_ws_candles 실패(%s): %s", code, e) return [] def get_latest_ws_candle(self, code: str, timeframe: int) -> Optional[dict]: """가장 최신 봉 1개 반환 (진행 중 봉 포함).""" try: row = self.conn.execute(""" SELECT candle_time, open, high, low, close, volume, rsi_2, rsi_3, rsi_5, is_confirmed FROM ws_candles WHERE code = ? AND timeframe = ? ORDER BY candle_time DESC LIMIT 1 """, (code, timeframe)).fetchone() return dict(row) if row else None except Exception as e: logger.error("get_latest_ws_candle 실패(%s): %s", code, e) return None def get_latest_confirmed_ws_candle(self, code: str, timeframe: int) -> Optional[dict]: """가장 최신 확정 봉 1개 반환 (RSI 포함).""" try: row = self.conn.execute(""" SELECT candle_time, open, high, low, close, volume, rsi_2, rsi_3, rsi_5 FROM ws_candles WHERE code = ? AND timeframe = ? AND is_confirmed = 1 ORDER BY candle_time DESC LIMIT 1 """, (code, timeframe)).fetchone() return dict(row) if row else None except Exception as e: logger.error("get_latest_confirmed_ws_candle 실패(%s): %s", code, e) return None def cleanup_old_ws_candles(self, keep_days: int = 3): """오래된 ws_candles 정리 (기본 3일 이상 지난 봉 삭제).""" cutoff = (datetime.datetime.now() - datetime.timedelta(days=keep_days)).strftime("%Y%m%d%H%M") try: with self.conn: self.conn.execute( "DELETE FROM ws_candles WHERE candle_time < ?", (cutoff,) ) except Exception as e: logger.error("cleanup_old_ws_candles 실패: %s", e) def close(self): """DB 연결 종료""" if self.conn: self.conn.close() logger.info("🔒 DB 연결 종료")