diff --git a/database.py b/database.py index 6381733..a64129e 100644 --- a/database.py +++ b/database.py @@ -1,20 +1,178 @@ """ 트레이딩 봇 데이터베이스 관리 모듈 -- SQLite 기반 데이터 무결성 보장 (JSON 대비 재시작 안전성 향상) +- MariaDB 기반 (pymysql) — SQLite 에서 이전 - 활성 트레이딩 관리 (active_trades) - 매매 히스토리 관리 (trade_history) + +MariaDB 접속 정보 우선순위: + 1) 환경변수 DB_HOST / DB_PORT / DB_USER / DB_PASS / DB_NAME + 2) 아래 DEFAULT 상수 (기본값) """ import json -import sqlite3 +import os import datetime import logging +import threading from typing import Dict, List, Optional, Tuple +try: + import pymysql + import pymysql.cursors + _PYMYSQL_AVAILABLE = True +except ImportError: + _PYMYSQL_AVAILABLE = False + logger = logging.getLogger("TradeDB") +# ── MariaDB 기본 접속 정보 (환경변수로 재정의 가능) ───────────────── +_DB_HOST = os.environ.get("DB_HOST", "192.168.0.141") +_DB_PORT = int(os.environ.get("DB_PORT", "3306")) +_DB_USER = os.environ.get("DB_USER", "jae") +_DB_PASS = os.environ.get("DB_PASS", "1234") +_DB_NAME = os.environ.get("DB_NAME", "kis_quant_db") + + +# ══════════════════════════════════════════════════════════════════════ +# SQLite 호환 래퍼 — 기존 conn.execute() / with conn: 패턴 유지 +# ══════════════════════════════════════════════════════════════════════ +class _MariaDBConn: + """ + pymysql 연결을 sqlite3 인터페이스처럼 감싸는 래퍼. + - conn.execute(sql, params) → DictCursor 반환 (row['col'] 접근) + - with conn: ... → 자동커밋(autocommit=True) 이므로 no-op + - conn.commit() → 명시적 커밋 (autocommit=True라 호환) + - ? 플레이스홀더 → %s 자동 변환 + - SQLite-only DDL 키워드 자동 변환 + """ + + # SQL 텍스트 자동 변환 규칙 (SQLite → MySQL) + _REPLACE_PAIRS = [ + # DDL 키워드 + ("INTEGER PRIMARY KEY AUTOINCREMENT", "INT NOT NULL AUTO_INCREMENT PRIMARY KEY"), + ("INTEGER PRIMARY KEY", "INT NOT NULL PRIMARY KEY"), + ("AUTOINCREMENT", "AUTO_INCREMENT"), + # DML — SQLite 전용 구문 + ("INSERT OR REPLACE INTO", "REPLACE INTO"), + ("last_insert_rowid()", "LAST_INSERT_ID()"), + # ON CONFLICT 처리: 단순 패턴 제거 후 ON DUPLICATE KEY UPDATE 로 수동 변환 + # (복잡한 케이스는 각 메서드에서 직접 처리) + ] + + def __init__(self): + self._lock = threading.Lock() + self._conn = None + self._connect() + + def _connect(self): + """pymysql 연결 (재연결 포함).""" + if not _PYMYSQL_AVAILABLE: + raise ImportError( + "pymysql 미설치. 설치: pip install pymysql\n" + "또는: pip install PyMySQL" + ) + self._conn = pymysql.connect( + host=_DB_HOST, port=_DB_PORT, + user=_DB_USER, password=_DB_PASS, + database=_DB_NAME, + charset="utf8mb4", + autocommit=True, + cursorclass=pymysql.cursors.DictCursor, + connect_timeout=10, + read_timeout=30, + write_timeout=30, + ) + logger.debug("✅ MariaDB 연결 완료 (%s:%s/%s)", _DB_HOST, _DB_PORT, _DB_NAME) + + def _ensure_connected(self): + """연결 끊김 시 자동 재접속.""" + try: + self._conn.ping(reconnect=True) + except Exception: + try: + self._connect() + except Exception as e: + logger.error("❌ MariaDB 재접속 실패: %s", e) + raise + + @staticmethod + def _translate_sql(sql: str) -> str: + """SQLite 전용 SQL 구문을 MySQL 호환으로 변환.""" + for old, new in _MariaDBConn._REPLACE_PAIRS: + sql = sql.replace(old, new) + # ? → %s (파라미터 플레이스홀더) + # 단, 이미 %s 가 있는 경우 중복 변환 방지 + if "?" in sql: + sql = sql.replace("?", "%s") + return sql + + def execute(self, sql: str, params=None): + """ + SQL 실행. sqlite3.Connection.execute() 와 동일한 인터페이스. + 반환값: DictCursor (fetchone/fetchall/lastrowid 사용 가능) + """ + sql = self._translate_sql(sql) + with self._lock: + self._ensure_connected() + cur = self._conn.cursor() + cur.execute(sql, params or ()) + return cur + + def __enter__(self): + """with conn: 패턴 호환 (autocommit=True 이므로 실질적으로 no-op).""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def commit(self): + """명시적 커밋 (autocommit=True 환경에서 호환성 유지용).""" + try: + with self._lock: + self._conn.commit() + except Exception: + pass + + @property + def row_factory(self): + return None # DictCursor 가 이미 dict 반환, 호환용 stub + + @row_factory.setter + def row_factory(self, _): + pass # sqlite3 호환용 stub (무시) + + def close(self): + try: + self._conn.close() + except Exception: + pass + + # ── information_schema 기반 컬럼 목록 조회 (PRAGMA 대체) ────── + def get_columns(self, table_name: str) -> list: + """ + PRAGMA table_info() 대체. + 반환: 컬럼명 문자열 리스트 + """ + try: + cur = self.execute( + "SELECT COLUMN_NAME FROM information_schema.COLUMNS " + "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s " + "ORDER BY ORDINAL_POSITION", + (_DB_NAME, table_name), + ) + return [row["COLUMN_NAME"] for row in cur.fetchall()] + except Exception as e: + logger.debug("get_columns(%s) 실패: %s", table_name, e) + return [] + +# ML 학습용 진입 시점 피처 컬럼 (active_trades / trade_history 공통) +ML_ENTRY_FEATURE_COLUMNS = ( + "rsi", "volume_ratio", "tail_length_pct", "ma5_gap_pct", "ma20_gap_pct", + "foreign_net_buy", "institution_net_buy", "market_hour", +) + # env_config 테이블 컬럼 (키 하나당 컬럼 하나, 추가/삭제 시 여기와 CREATE TABLE만 수정) ENV_CONFIG_KEYS = ( - "STOP_LOSS_PCT", "SHOULDER_CUT_PCT", "STOP_ATR_MULTIPLIER_TAIL", "TARGET_ATR_MULTIPLIER_TAIL", + "STOP_LOSS_PCT", "SHOULDER_CUT_PCT", "SHOULDER_MIN_HIGH_PCT", "SHOULDER_MIN_NET_PCT", "STOP_ATR_MULTIPLIER_TAIL", "TARGET_ATR_MULTIPLIER_TAIL", "MAX_POSITION_PCT", "USE_SLOT_CAP", "SLOT_CAP_PCT", "MAX_STOCKS", "USE_KELLY", "RISK_PCT_PER_TRADE", "MIN_POSITION_AMOUNT", "USE_RISK_CHECK", "DAILY_STOP_LOSS_PCT", "CONSECUTIVE_LOSS_LIMIT", @@ -24,8 +182,23 @@ ENV_CONFIG_KEYS = ( "USE_ML_SIGNAL", "ML_MIN_PROBABILITY", "USE_NEWS_ANALYSIS", "NEWS_ANALYSIS_HOUR", "NEWS_MAX_COUNT", "USE_QUICK_PROFIT_PROTECTION", "HIGH_PRICE_CHASE_THRESHOLD", "MAX_DAILY_CHANGE_PCT", "MA20_MAX_ABOVE_PCT", "VOLUME_AVG_MULTIPLIER", "CANDLE_OPEN_PRICE_BUFFER", + "MIN_CANDLE_LEN_TAIL", "MIN_PRICE_TAIL", "TAIL_SCORE_BASE", "TAIL_SCORE_RATIO_MULT", + "TAIL_RATIO_MIN", "TAIL_PCT_MIN", # 꼬리 비율/하락률 최소 (AI·봇 공통 기준) "INTRADAY_INVESTOR_NET_BUY_THRESHOLD", "SIZE_CLASS_LARGE_MIN", "SIZE_CLASS_MID_MIN", - "USE_RANDOM_SPLIT", "FORCE_MARKET_OPEN", "TOTAL_DEPOSIT", + # 단타 스캔 후보 점수 (전부 env/DB) + "SCAN_INVESTOR_NET_STRONG", "SCAN_INVESTOR_SCORE_STRONG", "SCAN_INVESTOR_SCORE_WEAK", + "SCAN_INVESTOR_BONUS_STRONG", "SCAN_INVESTOR_BONUS_WEAK", + "SCAN_VOLUME_BONUS_MIN", "SCAN_VOLUME_BONUS_POINT", + "SCAN_EXEC_STRENGTH_HIGH", "SCAN_EXEC_STRENGTH_MID", "SCAN_EXEC_BONUS_HIGH", "SCAN_EXEC_BONUS_MID", + "SCAN_SCORE_DROP_WEIGHT", "SCAN_SCORE_RECOVERY_WEIGHT", + "MIN_SCORE_DISPLAY", "MM_TOP_N", + "SLOT_MONEY_DEFAULT", "SLOT_BASE_AMOUNT_CAP", + "SIZE_CLASS_SMALL_RATIO", "SIZE_CLASS_MID_RATIO", + # 개미털기/유니버스 필터 추가 키 + "SCAN_MIN_PREV_DAY_PCT", "UPDATE_UNIVERSE_MIN_CANDIDATES", + "UPDATE_UNIVERSE_MIN_SCORE", "UPDATE_UNIVERSE_FALLBACK_TOP_N", "UPDATE_UNIVERSE_TOP_LOG", "UPDATE_UNIVERSE_TOP_N", + "SCAN_INTERVAL_SEC", # 유니버스 스캔 주기(초), kiwoom_universe_scanner 전용 + "USE_RANDOM_SPLIT", "FORCE_MARKET_OPEN", "FORCE_BUY_TEST", "TOTAL_DEPOSIT", # POP/LOCK·금액 손절 관련 추가 키 "ROUND_TRIP_COST_PCT", "POP_NET_PCT", "LOCK_NET_PCT", "MAX_LOSS_PER_TRADE_KRW", # 한투 API 관련 키 추가 (실전/모의 계좌 분리) @@ -36,29 +209,147 @@ ENV_CONFIG_KEYS = ( "KIS_MOCK", # 단타 봇 전용 키 "TAKE_PROFIT_PCT", "MIN_DROP_RATE", "MIN_RECOVERY_RATIO_SHORT", + # 단타 매도 로직 (env/DB에서 수치 로드) + "SCALP_ATR_UP_MULT", "SCALP_ATR_DOWN_MULT", "SCALP_ATR_DROP_MULT", + "QUICK_PROFIT_PROTECT_HOURS", "QUICK_PROFIT_MAX_RATIO", "QUICK_PROFIT_CURRENT_MIN", + "MIN_HOLD_EARLY_TAKE_PCT", "MIN_HOLD_HIGH_PCT", "MIN_HOLD_DROP_FROM_HIGH", + "POST_HOLD_TAKE_PCT", "POST_HOLD_DROP_FROM_HIGH", # 늘림목 봇 전용 키 "MAX_PER", "MAX_PEG", "MIN_GROWTH_PCT", "DCA_INTERVALS", "DCA_AMOUNTS", # Mattermost 및 AI 리포트 관련 키 "MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY", + "AI_JOURNAL_LINES", "ANTHROPIC_API_KEY", "CLAUDE_MODEL_ID", "CLAUDE_MAX_TOKENS", + "MM_BUTLER_CHANNEL", + # OpenRouter API 연동용 키 + "OPENROUTER_API_KEY", "OPENROUTER_MODEL_ID", # 봇별 Mattermost 채널 구분용 키 "KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL", + # 롱 위시리스트/뉴스 리포트 전용 키 + "LONG_DAILY_LOOKBACK_DAYS", "LONG_MA_SHORT_DAYS", "LONG_MA_LONG_DAYS", + "LONG_REPORT_AM_HOUR", "LONG_REPORT_AM_MIN", + "LONG_REPORT_PM_HOUR", "LONG_REPORT_PM_MIN", + "LONG_NEWS_ENABLED", "LONG_NEWS_INTERVAL_MIN", + "LONG_NEWS_ACTIVE_START_HOUR", "LONG_NEWS_ACTIVE_END_HOUR", + "LONG_ANALYSIS_DELAY_MIN_SEC", "LONG_ANALYSIS_DELAY_MAX_SEC", + # RSI 기간: 스윙=14(기본), 단타=5, 스캘핑=3 권장 (DB에서 실시간 변경 가능) + "RSI_PERIOD", + # KIS WebSocket 관련 키 + "KIS_WS_URL_REAL", "KIS_WS_URL_MOCK", "KIS_WS_MOCK_ENABLED", + # 재진입 쿨다운: 매도 후 같은 종목 재매수를 N초 동안 차단 (반복매매 루프 방지) + "REENTRY_COOLDOWN_SEC", + # 매도 실패 백오프: 영업일 아님·장외 시간 오류 시 N초 동안 재시도 금지 (API 낭비·차단 방지) + "SELL_FAILURE_BACKOFF_SEC", + # ── 스캘핑봇(kis_scalping_ver1) 전용 키 ────────────────────────────── + # RSI 과매도 임계값: 이 값 이하면 "과매도 → 되돌림 가능" 후보로 판단 + "SCALP_RSI_OVERSOLD", + # RSI 과매수 임계값: 이 값 이상이면 신규 진입 금지 (고점 추격 방지) + "SCALP_RSI_OVERBOUGHT", + # 스캘핑봇이 사용할 봉 단위 (분): 1 또는 3 권장 + "SCALP_CANDLE_TIMEFRAME", + # 장 시작 후 몇 분 뒤부터 매매 허용 (장 시작 직후 변동성 회피) + "SCALP_MARKET_OPEN_WAIT_MIN", + # WS 재접속 후 갭 보정에 사용할 REST 분봉 조회 캔들 수 + "SCALP_GAP_FILL_LIMIT", + # 확정 봉 기준 최소 거래량: 이 값 미만인 봉은 유동성 부족으로 무시 + "SCALP_MIN_VOLUME", + # 스캘핑봇 채널 (MM 알림) + "KIS_SCALP_MM_CHANNEL", + # ws_candles 자동 정리 보존 일수 + "SCALP_CANDLE_KEEP_DAYS", + # 스캘핑 전용 손절 % (꼬리잡기 STOP_LOSS_PCT와 분리, 기본 1.5%) + # 1분봉 초단타: 손절이 넓으면(-4%) 자금이 묶여 회전율 0 → 타이트하게 + "SCALP_STOP_LOSS_PCT", + # 스캘핑 전용 익절 % (꼬리잡기 TAKE_PROFIT_PCT와 분리, 기본 1.5%) + # 1분봉에서 +5% 익절은 도달 불가 → 박리다매 전략으로 1.5%씩 수익 적립 + "SCALP_TAKE_PROFIT_PCT", + # 스캘핑 낙폭 필터 % (꼬리잡기 MIN_DROP_RATE와 분리, 기본 1.5%) + # 3% 기준은 1분봉 소형주에서 너무 엄격 → 1.5%로 완화해 타점 빈도 증가 + "SCALP_MIN_DROP_RATE", + # 봉부족 감지 시 재갭보정 최소 간격(초): 같은 종목 중복 REST 호출 방지 (기본 30초) + "SCALP_GAP_RETRY_SEC", + # 트레일링 발동 최소 수익률(%): 고점이 매수가 대비 이 이상 올라야 트레일링 활성화 + # 0.5%면 수수료(~0.21%) 뺀 나머지만 이익 → 1.5 이상 권장 + "SCALP_TRAIL_TRIGGER_PCT", + # 트레일링/본절사수 후 최소 순이익(%): 수수료+세금 위에 이 값만큼 추가 마진 확보 + # 0이면 본절(수수료 이후 0원), 0.2면 최소 +0.2% 순이익 보장 + "SCALP_MIN_PROFIT_PCT", + # ── 단타봇(kis_short_ver2) 전용 키 ─────────────────────────────────── + # 켈리 공식 사용 여부 (true=켈리 적용, false=고정 비중) + "USE_KELLY_FORMULA", + # 켈리 공식 적용 배수 (0.25 = Full Kelly의 25%, 과도한 베팅 방지) + "KELLY_MULTIPLIER", + # 시장가 IOC 주문 사용 여부 (실전: true=IOC, false=일반 시장가) + "USE_MARKET_IOC", + # WebSocket 실시간 가격 캐시 유효기간(초): 이 시간 이상 지나면 REST 재조회 + "KIS_PRICE_CACHE_TTL_SEC", + # WS 재접속 후 갭 보정에 사용할 REST 분봉 조회 캔들 수 (단타봇) + "SHORT_GAP_FILL_LIMIT", + # 매수 직후 최소 보유 시간(초): 이 기간 내 매도 신호 무시 (API 잔고 반영 지연 대응) + "MIN_HOLD_AFTER_BUY_SEC", + # 최소 보유 시간(시간): 이 기간 이전에는 손절 외 매도 금지 (꼬리잡기 전략 충분히 대기) + "MIN_HOLD_HOURS", + # 3개월 최대 회복 비율: 전고점 대비 이 비율 이상 회복한 종목은 추격 매수 제외 + "MAX_RECOVERY_RATIO_3M", + # 유니버스 상위 N개 후보 경량 체크 (매수 후보 1차 필터) + "CANDIDATE_LIST_TOP_N_LIGHT", + # 매수 신호 체크 시 사용할 유니버스 최대 종목 수 (과부하 방지) + "SCAN_UNIVERSE_MAX_CODES", + # 꼬리 캔들 패턴 인식 시 과거 몇 봉까지 확인할지 (lookback) + "TAIL_CANDLE_LOOKBACK", + # ── 레거시 단일 계좌 키 (KIS_APP_KEY_REAL/MOCK 이전 버전 호환용) ────── + "KIS_APP_KEY", "KIS_APP_SECRET", "KIS_ACCOUNT_NO", "KIS_ACCOUNT_CODE", + # ── mm_butler 전용 키 ──────────────────────────────────────────────── + # Gemini AI 모델 ID (gemini-2.5-flash, gemini-1.5-pro 등) + "GEMINI_MODEL_ID", + # MM 원격 명령 폴링 주기(초): 너무 짧으면 API 과부하 + "MM_BUTLER_POLL_SEC", + # AI 소스 텍스트 최대 길이(문자): 초과 시 잘라서 전송 (토큰 비용 관리) + "AI_SOURCE_MAX_CHARS", + # ── 수수료·거래세 (전략 공통) ────────────────────────────────────────── + # FEE_RATE_PCT : 위탁수수료율 (매수/매도 각각, 기본 0.015%) + # SELL_TAX_RATE_PCT: 증권거래세율 (매도 시만 부과, 기본 0.18% ← 2025 코스피/코스닥 공통) + "FEE_RATE_PCT", "SELL_TAX_RATE_PCT", + # ── 키움증권 REST API 키 (60분봉 과거 데이터 수집 전용) ──────────────── + # 실전/모의 분리 (KIS_MOCK 값에 따라 자동 선택) + # KIWOOM_APP_KEY_REAL : 키움증권 실전 앱키 + # KIWOOM_APP_SECRET_REAL: 키움증권 실전 시크릿 + # KIWOOM_APP_KEY_MOCK : 키움증권 모의 앱키 (mockapi.kiwoom.com) + # KIWOOM_APP_SECRET_MOCK: 키움증권 모의 시크릿 + # KIWOOM_APP_KEY / KIWOOM_APP_SECRET: 레거시 (단일 키 호환용) + "KIWOOM_APP_KEY_REAL", "KIWOOM_APP_SECRET_REAL", + "KIWOOM_APP_KEY_MOCK", "KIWOOM_APP_SECRET_MOCK", + "KIWOOM_APP_KEY", "KIWOOM_APP_SECRET", + # ── WebSocket 영구 구독 종목 (시장 방향 필터용) ───────────────────── + # KOSPI/KOSDAQ 지수 ETF는 매매 후보와 무관하게 항상 구독 유지. + # 60분봉 RSI 로 상승장/하락장 체크 → 스캘핑/꼬리잡기 진입 방향 결정. + # 기본값: KODEX200(069500), KODEX KOSDAQ150(229200) + # 콤마 구분 코드: "069500,229200" + "PERMANENT_WS_CODES", + # ── 시장 방향 필터 (상승장에서만 롱 진입) ────────────────────────── + # USE_MARKET_REGIME_FILTER: true=활성, false=비활성 (기본 false) + # MARKET_REGIME_MIN_RSI : ETF 60분봉 RSI 이 값 이상이어야 롱 진입 허용 (기본 48) + "USE_MARKET_REGIME_FILTER", "MARKET_REGIME_MIN_RSI", + # ── 테마 과열 필터 (테마 전체가 과열이면 신규 진입 억제) ─────────── + # USE_THEME_HEAT_FILTER : true=활성 (기본 false) + # THEME_HEAT_RSI_MAX : 테마 평균 RSI 이 값 초과면 진입 차단 (기본 72) + "USE_THEME_HEAT_FILTER", "THEME_HEAT_RSI_MAX", ) class TradeDB: """ - 트레이딩 봇용 SQLite 데이터베이스 관리 클래스 + 트레이딩 봇용 MariaDB 데이터베이스 관리 클래스. + 기존 SQLite 인터페이스와 100% 호환 (db_path 인수는 무시됨). """ def __init__(self, db_path="quant_bot.db"): """ Args: - db_path: SQLite DB 파일 경로 (기본: quant_bot.db) + db_path: 하위 호환용 (무시됨). MariaDB 접속 정보는 환경변수/모듈 상수 사용. """ - self.db_path = db_path - self.conn = sqlite3.connect(db_path, check_same_thread=False) - self.conn.row_factory = sqlite3.Row # 딕셔너리처럼 접근 가능 + self.db_path = db_path # 호환용 보존 + self.conn = _MariaDBConn() self._create_tables() - logger.info(f"✅ TradeDB 초기화 완료: {db_path}") + logger.info("✅ TradeDB 초기화 완료: MariaDB %s:%s/%s", _DB_HOST, _DB_PORT, _DB_NAME) def _create_tables(self): """DB 테이블 생성 (없을 경우)""" @@ -66,14 +357,14 @@ class TradeDB: # 1. 활성 트레이딩 테이블 (현재 보유 중이거나 매수 중인 종목) self.conn.execute(""" CREATE TABLE IF NOT EXISTS active_trades ( - code TEXT PRIMARY KEY, -- 종목코드 - name TEXT NOT NULL, -- 종목명 - strategy TEXT, -- 매매 전략 (TAIL_CATCH_3M 등) - + code VARCHAR(20) NOT NULL, -- 종목코드 + name VARCHAR(100) NOT NULL, -- 종목명 + strategy VARCHAR(50) NOT NULL DEFAULT 'MANUAL', -- 매매 전략 (SHORT_ANT_SHAKING / SCALP_RSI_REVERSAL 등) + PRIMARY KEY (code, strategy), -- 복합 PK: 같은 종목을 서로 다른 봇이 독립 보유 가능 -- [가격 정보] - avg_buy_price REAL NOT NULL, -- 평단가 - current_price REAL, -- 현재가 (업데이트용) - stop_price REAL, -- 손절가 + avg_buy_price DOUBLE NOT NULL, -- 평단가 + current_price DOUBLE, -- 현재가 (업데이트용) + stop_price DOUBLE, -- 손절가 target_price REAL, -- 목표가 max_price REAL, -- 최고가 (트레일링 스탑용) atr_entry REAL, -- 진입 시 ATR 변동성 @@ -94,150 +385,260 @@ class TradeDB: # 2. 매매 기록 테이블 (손익 분석 & 켈리 공식용) self.conn.execute(""" CREATE TABLE IF NOT EXISTS trade_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL, - name TEXT NOT NULL, - strategy TEXT, - buy_price REAL NOT NULL, -- 평단가 - sell_price REAL NOT NULL, -- 매도가 - qty INTEGER NOT NULL, -- 수량 - profit_rate REAL NOT NULL, -- 수익률 (%) - realized_pnl REAL NOT NULL, -- 실현 손익금 (원) - hold_minutes INTEGER, -- 보유 시간 (분) - buy_date TEXT, -- 매수 시작 시간 - sell_date TEXT NOT NULL, -- 매도 완료 시간 - sell_reason TEXT, -- 매도 사유 - env_snapshot TEXT, -- 매도 시점 env (JSON) - size_class TEXT -- 대/중/소형 (변동성 구간) - ) + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + code VARCHAR(20) NOT NULL, + name VARCHAR(100) NOT NULL, + strategy VARCHAR(50), + buy_price DOUBLE NOT NULL, + sell_price DOUBLE NOT NULL, + qty INT NOT NULL, + profit_rate DOUBLE NOT NULL, + realized_pnl DOUBLE NOT NULL, + hold_minutes INT, + buy_date VARCHAR(30), + sell_date VARCHAR(30) NOT NULL, + sell_reason VARCHAR(200), + env_snapshot TEXT, + size_class VARCHAR(20) + ) CHARACTER SET utf8mb4 """) - + # 3. 일일 손익 요약 테이블 (대시보드용) self.conn.execute(""" CREATE TABLE IF NOT EXISTS daily_summary ( - date TEXT PRIMARY KEY, -- 날짜 (YYYY-MM-DD) - start_asset REAL, -- 시작 자산 - end_asset REAL, -- 종료 자산 - total_trades INTEGER, -- 총 매매 횟수 - win_trades INTEGER, -- 익절 횟수 - total_pnl REAL, -- 총 손익 - win_rate REAL -- 승률 (%) - ) + date VARCHAR(10) NOT NULL PRIMARY KEY, + start_asset DOUBLE, + end_asset DOUBLE, + total_trades INT, + win_trades INT, + total_pnl DOUBLE, + win_rate DOUBLE + ) CHARACTER SET utf8mb4 """) - + # 4. 주문·체결 보강 테이블 (kt00007 / ka10076) self.conn.execute(""" CREATE TABLE IF NOT EXISTS order_execution_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - source TEXT NOT NULL, -- 'kt00007' or 'ka10076' - ord_no TEXT, - stk_cd TEXT, - stk_nm TEXT, - trde_tp TEXT, -- 매매구분 - ord_qty TEXT, - ord_uv TEXT, - cntr_qty TEXT, - cntr_uv TEXT, - ord_tm TEXT, - cnfm_tm TEXT, - sell_tp TEXT, - ord_dt TEXT, + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + source VARCHAR(20) NOT NULL, + ord_no VARCHAR(30), + stk_cd VARCHAR(20), + stk_nm VARCHAR(100), + trde_tp VARCHAR(20), + ord_qty VARCHAR(20), + ord_uv VARCHAR(20), + cntr_qty VARCHAR(20), + cntr_uv VARCHAR(20), + ord_tm VARCHAR(20), + cnfm_tm VARCHAR(20), + sell_tp VARCHAR(20), + ord_dt VARCHAR(20), raw_json TEXT, - fetched_at TEXT NOT NULL - ) + fetched_at VARCHAR(30) NOT NULL + ) CHARACTER SET utf8mb4 """) # 5. 매수 체결 이력 (일일 한도용 - '산 시점' 날짜 기준 누적) self.conn.execute(""" CREATE TABLE IF NOT EXISTS buy_execution_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - code TEXT NOT NULL, - name TEXT NOT NULL, - strategy TEXT NOT NULL, - buy_date TEXT NOT NULL, -- YYYY-MM-DD (산 날짜 = 한도 기준일) - executed_at TEXT NOT NULL, -- 체결 시각 - amount REAL NOT NULL, -- 매수 금액 (주가×수량) - qty INTEGER NOT NULL - ) + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + code VARCHAR(20) NOT NULL, + name VARCHAR(100) NOT NULL, + strategy VARCHAR(50) NOT NULL, + buy_date VARCHAR(10) NOT NULL, + executed_at VARCHAR(30) NOT NULL, + amount DOUBLE NOT NULL, + qty INT NOT NULL + ) CHARACTER SET utf8mb4 """) # 6. 매수 후보군 테이블 (target_universe 대체) self.conn.execute(""" CREATE TABLE IF NOT EXISTS target_candidates ( - code TEXT PRIMARY KEY, -- 종목코드 - name TEXT NOT NULL, -- 종목명 - score REAL NOT NULL, -- 개미털기 점수 (높을수록 좋음) - price REAL NOT NULL, -- 현재가 - scan_time TEXT NOT NULL, -- 스캔 시간 - updated_at TEXT NOT NULL -- 마지막 업데이트 - ) + code VARCHAR(20) NOT NULL PRIMARY KEY, + name VARCHAR(100) NOT NULL, + score DOUBLE NOT NULL, + price DOUBLE NOT NULL, + scan_time VARCHAR(30) NOT NULL, + updated_at VARCHAR(30) NOT NULL + ) CHARACTER SET utf8mb4 """) - # 7. env 설정 전용 테이블 (관리자용, INSERT만 / 최신 1건 = 현재 설정, 키당 컬럼) - cols = ", ".join([f'"{k}" TEXT' for k in ENV_CONFIG_KEYS]) + # 7. 종목 메타데이터 (테마·섹터·시장구분) — 스캐너가 채움, 조인 분석 용 + # market: 'K'=KOSPI, 'Q'=KOSDAQ, 'E'=ETF/기타 + # sector: 업종명 (KIS bstp_kor_isnm, 예: '반도체') + # theme : 주요 테마 (예: 'AI반도체', '2차전지', '원자력') + # - 직접 UPDATE 또는 별도 스크립트로 채움 + # theme_rank: 테마 내 대장주/추종주 순위 (1=핵심, 2=연관, 3=주변) + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS stock_meta ( + code VARCHAR(20) NOT NULL PRIMARY KEY, + name VARCHAR(100) NOT NULL DEFAULT '', + market CHAR(1) NOT NULL DEFAULT 'Q', + sector_code VARCHAR(20), + sector VARCHAR(100), + theme VARCHAR(100), + theme_rank TINYINT DEFAULT 3, + updated_at VARCHAR(30) NOT NULL + ) CHARACTER SET utf8mb4 + """) + + # 8. env 설정 전용 테이블 (관리자용, INSERT만 / 최신 1건 = 현재 설정, 키당 컬럼) + cols = ", ".join([f"`{k}` TEXT" for k in ENV_CONFIG_KEYS]) self.conn.execute(f""" CREATE TABLE IF NOT EXISTS env_config ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - created_at TEXT NOT NULL, + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + created_at VARCHAR(30) NOT NULL, {cols} - ) + ) CHARACTER SET utf8mb4 """) - + + # 8. 키-값 저장소 (매터모스트 원격 조종: 마지막 AI 추천문, last_seen 등) + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS kv_store ( + k VARCHAR(100) NOT NULL PRIMARY KEY, + v MEDIUMTEXT + ) CHARACTER SET utf8mb4 + """) + + # 9. AI 분석 기록 (Butler !클로드분석/!애미분석 시 프롬프트 요약·응답 저장 → 나중에 꺼내보기) + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS ai_analysis_log ( + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + created_at VARCHAR(30) NOT NULL, + model VARCHAR(50) NOT NULL, + context_summary TEXT, + response MEDIUMTEXT + ) CHARACTER SET utf8mb4 + """) + + # 10. WebSocket 실시간 봉 집계 (백테스트용 — CandleAggregator 배치 INSERT) + # - is_confirmed=1 인 확정 봉만 저장 (진행 중 봉은 RAM에만 존재) + # - source: 'ws'=WebSocket틱 집계, 'rest'=갭보정 REST 조회 + # - UNIQUE(code, timeframe, candle_time) → ON DUPLICATE KEY UPDATE + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS ws_candles ( + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + code VARCHAR(20) NOT NULL, + timeframe TINYINT NOT NULL, + candle_time VARCHAR(12) NOT NULL, + `open` DOUBLE NOT NULL, + high DOUBLE NOT NULL, + low DOUBLE NOT NULL, + close DOUBLE NOT NULL, + volume BIGINT NOT NULL DEFAULT 0, + rsi_2 DOUBLE, + rsi_3 DOUBLE, + rsi_5 DOUBLE, + is_confirmed TINYINT NOT NULL DEFAULT 1, + source VARCHAR(10) NOT NULL DEFAULT 'ws', + updated_at VARCHAR(30) NOT NULL, + UNIQUE KEY uq_candle (code, timeframe, candle_time) + ) CHARACTER SET utf8mb4 + """) + self._migrate_add_columns() self._migrate_env_config_to_columns() logger.info("📊 DB 테이블 생성/확인 완료") def _migrate_add_columns(self): - """기존 DB에 누락된 컬럼 추가 (한 번만)""" + """기존 DB에 누락된 컬럼 추가 (한 번만) — PRAGMA → information_schema 대체""" try: - cur = self.conn.execute("PRAGMA table_info(trade_history)") - cols = [row[1] for row in cur.fetchall()] + cols = self.conn.get_columns("trade_history") if "env_snapshot" not in cols: self.conn.execute("ALTER TABLE trade_history ADD COLUMN env_snapshot TEXT") logger.info("📌 trade_history.env_snapshot 컬럼 추가") if "size_class" not in cols: - self.conn.execute("ALTER TABLE trade_history ADD COLUMN size_class TEXT") + self.conn.execute("ALTER TABLE trade_history ADD COLUMN size_class VARCHAR(20)") logger.info("📌 trade_history.size_class 컬럼 추가") except Exception as e: logger.debug(f"migrate trade_history: {e}") + # ── active_trades PK 복합키 마이그레이션 (code → code+strategy) ────────── + # 두 봇(SHORT/SCALP)이 같은 종목을 독립 보유 가능하도록 PK 확장. + # 신규 설치는 DDL에서 처리됨. 기존 테이블은 여기서 한 번만 ALTER. try: - cur = self.conn.execute("PRAGMA table_info(active_trades)") - cols = [row[1] for row in cur.fetchall()] + cursor = self.conn.execute(""" + SELECT COLUMN_NAME FROM information_schema.KEY_COLUMN_USAGE + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'active_trades' + AND CONSTRAINT_NAME = 'PRIMARY' + ORDER BY ORDINAL_POSITION + """) + pk_cols = [row[0] if isinstance(row, (list, tuple)) else row['COLUMN_NAME'] + for row in cursor.fetchall()] + if 'strategy' not in pk_cols: + logger.info("⚙️ active_trades PK 복합키 마이그레이션 시작 (code → code+strategy)") + # NULL strategy → 'MANUAL' 로 채움 (NOT NULL 변경 전 필수) + self.conn.execute("UPDATE active_trades SET strategy = 'MANUAL' WHERE strategy IS NULL OR strategy = ''") + # strategy 컬럼 NOT NULL DEFAULT 'MANUAL' 로 변경 후 PK 재구성 + self.conn.execute("ALTER TABLE active_trades MODIFY COLUMN strategy VARCHAR(50) NOT NULL DEFAULT 'MANUAL'") + self.conn.execute("ALTER TABLE active_trades DROP PRIMARY KEY, ADD PRIMARY KEY (code, strategy)") + logger.info("✅ active_trades PK 복합키(code, strategy) 변환 완료") + except Exception as e: + logger.debug("active_trades PK 마이그레이션 스킵(이미 완료 또는 신규): %s", e) + + try: + cols = self.conn.get_columns("active_trades") if "size_class" not in cols: - self.conn.execute("ALTER TABLE active_trades ADD COLUMN size_class TEXT") + self.conn.execute("ALTER TABLE active_trades ADD COLUMN size_class VARCHAR(20)") logger.info("📌 active_trades.size_class 컬럼 추가") + for c in ML_ENTRY_FEATURE_COLUMNS: + if c not in cols: + self.conn.execute(f"ALTER TABLE active_trades ADD COLUMN `{c}` DOUBLE") + logger.info(f"📌 active_trades.{c} 컬럼 추가 (ML 진입 피처)") except Exception as e: logger.debug(f"migrate active_trades: {e}") - # env_config 테이블에 ENV_CONFIG_KEYS에 정의된 컬럼 누락 시 추가 try: - cur = self.conn.execute("PRAGMA table_info(env_config)") - cols = [row[1] for row in cur.fetchall()] + cols = self.conn.get_columns("trade_history") + for c in ML_ENTRY_FEATURE_COLUMNS: + if c not in cols: + self.conn.execute(f"ALTER TABLE trade_history ADD COLUMN `{c}` DOUBLE") + logger.info(f"📌 trade_history.{c} 컬럼 추가 (ML 진입 피처)") + except Exception as e: + logger.debug(f"migrate trade_history ML columns: {e}") + try: + cols = self.conn.get_columns("env_config") for key in ENV_CONFIG_KEYS: if key not in cols: - self.conn.execute(f'ALTER TABLE env_config ADD COLUMN "{key}" TEXT') + self.conn.execute(f"ALTER TABLE env_config ADD COLUMN `{key}` TEXT") logger.info(f"📌 env_config.{key} 컬럼 추가") except Exception as e: logger.debug(f"migrate env_config columns: {e}") + # ── target_candidates 테마/섹터/시장구분 컬럼 추가 ────────────────── + try: + cols = self.conn.get_columns("target_candidates") + for col, ddl in [ + ("market", "CHAR(1) DEFAULT 'Q'"), + ("sector", "VARCHAR(100)"), + ("theme", "VARCHAR(100)"), + ]: + if col not in cols: + self.conn.execute( + f"ALTER TABLE target_candidates ADD COLUMN `{col}` {ddl}" + ) + logger.info(f"📌 target_candidates.{col} 컬럼 추가") + except Exception as e: + logger.debug(f"migrate target_candidates theme cols: {e}") def _migrate_env_config_to_columns(self): """env_config가 예전 JSON 컬럼(snapshot_json)이면 컬럼 스키마로 이전""" try: - cur = self.conn.execute("PRAGMA table_info(env_config)") - cols = [row[1] for row in cur.fetchall()] + cols = self.conn.get_columns("env_config") if "snapshot_json" not in cols: return - # 기존 데이터 백업 후 새 테이블로 이전 - rows = self.conn.execute("SELECT id, created_at, snapshot_json FROM env_config ORDER BY id").fetchall() - col_defs = ", ".join([f'"{k}" TEXT' for k in ENV_CONFIG_KEYS]) + rows = self.conn.execute( + "SELECT id, created_at, snapshot_json FROM env_config ORDER BY id" + ).fetchall() + col_defs = ", ".join([f"`{k}` TEXT" for k in ENV_CONFIG_KEYS]) self.conn.execute(f""" CREATE TABLE IF NOT EXISTS env_config_new ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - created_at TEXT NOT NULL, + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + created_at VARCHAR(30) NOT NULL, {col_defs} - ) + ) CHARACTER SET utf8mb4 """) - key_list = ", ".join(f'"{k}"' for k in ENV_CONFIG_KEYS) - placeholders = ", ".join(["?"] * (1 + len(ENV_CONFIG_KEYS))) + key_list = ", ".join(f"`{k}`" for k in ENV_CONFIG_KEYS) + placeholders = ", ".join(["%s"] * (1 + len(ENV_CONFIG_KEYS))) for row in rows: snap = json.loads(row["snapshot_json"]) if row["snapshot_json"] else {} vals = [row["created_at"]] + [snap.get(k) for k in ENV_CONFIG_KEYS] @@ -247,7 +648,6 @@ class TradeDB: ) self.conn.execute("DROP TABLE env_config") self.conn.execute("ALTER TABLE env_config_new RENAME TO env_config") - self.conn.commit() logger.info("📌 env_config: snapshot_json -> 컬럼 스키마 마이그레이션 완료") except Exception as e: logger.debug(f"migrate env_config: {e}") @@ -264,6 +664,7 @@ class TradeDB: trade_data: 트레이드 정보 딕셔너리 필수: code, name, avg_buy_price, target_qty, current_qty, status 선택: strategy, stop_price, target_price, max_price, atr_entry, total_invested + ML 학습용: entry_features (dict) 또는 rsi, volume_ratio 등 개별 키 """ now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -274,24 +675,34 @@ class TradeDB: return False size_class = trade_data.get('size_class') - sql = """ + feats = trade_data.get('entry_features') or {} + feat_vals = [] + for k in ML_ENTRY_FEATURE_COLUMNS: + v = feats.get(k) if k in feats else trade_data.get(k) + feat_vals.append(v if isinstance(v, (int, float)) else None) + cols = ", ".join(["code", "name", "strategy", "avg_buy_price", "current_price", "stop_price", "target_price", + "max_price", "atr_entry", "target_qty", "current_qty", "total_invested", + "status", "buy_date", "updated_at", "size_class"] + list(ML_ENTRY_FEATURE_COLUMNS)) + placeholders = ", ".join(["%s"] * (16 + len(ML_ENTRY_FEATURE_COLUMNS))) + # MySQL: ON DUPLICATE KEY UPDATE (excluded. → VALUES()) + updates = ( + "avg_buy_price = VALUES(avg_buy_price), current_price = VALUES(current_price), " + "stop_price = COALESCE(VALUES(stop_price), stop_price), " + "target_price = COALESCE(VALUES(target_price), target_price), " + "atr_entry = COALESCE(VALUES(atr_entry), atr_entry), " + "current_qty = VALUES(current_qty), total_invested = VALUES(total_invested), " + "max_price = GREATEST(max_price, VALUES(max_price)), " + "status = VALUES(status), updated_at = VALUES(updated_at), " + "size_class = COALESCE(VALUES(size_class), size_class)" + ) + for c in ML_ENTRY_FEATURE_COLUMNS: + updates += f", `{c}` = COALESCE(VALUES(`{c}`), `{c}`)" + sql = f""" INSERT INTO active_trades ( - code, name, strategy, avg_buy_price, current_price, stop_price, target_price, - max_price, atr_entry, target_qty, current_qty, total_invested, - status, buy_date, updated_at, size_class - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(code) DO UPDATE SET - avg_buy_price = excluded.avg_buy_price, - current_price = excluded.current_price, - stop_price = COALESCE(excluded.stop_price, active_trades.stop_price), - target_price = COALESCE(excluded.target_price, active_trades.target_price), - atr_entry = COALESCE(excluded.atr_entry, active_trades.atr_entry), - current_qty = excluded.current_qty, - total_invested = excluded.total_invested, - max_price = MAX(active_trades.max_price, excluded.max_price), - status = excluded.status, - updated_at = excluded.updated_at, - size_class = COALESCE(excluded.size_class, active_trades.size_class) + {cols} + ) VALUES ({placeholders}) + ON DUPLICATE KEY UPDATE + {updates} """ params = ( code, @@ -309,8 +720,8 @@ class TradeDB: trade_data.get('status', 'HOLDING'), trade_data.get('buy_date', now), now, - size_class - ) + size_class, + ) + tuple(feat_vals) try: with self.conn: @@ -334,7 +745,7 @@ class TradeDB: try: if strategy_prefix: cursor = self.conn.execute( - "SELECT * FROM active_trades WHERE strategy LIKE ?", + "SELECT * FROM active_trades WHERE strategy LIKE %s", (strategy_prefix.strip().upper() + "%",) ) else: @@ -373,6 +784,40 @@ class TradeDB: logger.error(f"❌ get_active_trades 실패: {e}") return {} + def get_active_trade(self, code: str) -> Optional[Dict]: + """ + 활성 트레이딩 단일 종목 조회. + 잔고 동기화 시 DB에 저장된 평단가를 폴백용으로 사용할 때 쓴다. + """ + try: + cursor = self.conn.execute( + "SELECT * FROM active_trades WHERE code = ?", + (code,), + ) + row = cursor.fetchone() + if not row: + return None + return { + "code": row["code"], + "name": row["name"], + "strategy": row["strategy"], + "avg_buy_price": row["avg_buy_price"], + "current_price": row["current_price"], + "stop_price": row["stop_price"], + "target_price": row["target_price"], + "max_price": row["max_price"], + "atr_entry": row["atr_entry"], + "target_qty": row["target_qty"], + "current_qty": row["current_qty"], + "total_invested": row["total_invested"], + "status": row["status"], + "buy_date": row["buy_date"], + "updated_at": row["updated_at"], + } + except Exception as e: + logger.error(f"❌ get_active_trade 실패 ({code}): {e}") + return None + def update_current_price(self, code: str, current_price: float): """현재가 업데이트 (매도 판단용)""" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -407,6 +852,8 @@ class TradeDB: sell_reason: str = "", env_snapshot: str = None, size_class: str = None, + strategy: str = None, + realized_pnl_override: float = None, ): """ 매도 완료 처리: active_trades 삭제 -> trade_history 이동 (INSERT만, env 스냅샷 포함) @@ -417,10 +864,19 @@ class TradeDB: sell_reason: 매도 사유 env_snapshot: 매도 시점 env JSON (백테스트/대시보드용) size_class: 대/중/소형 (매수 시점 저장값) + strategy: 봇 전략 ID (SHORT_ANT_SHAKING / SCALP_RSI_REVERSAL 등) + 지정 시 해당 전략 row만 삭제 (다른 봇의 동일 종목 보호). + None이면 code 단독 조회 (단일 봇 운영 환경 호환). """ try: - # 1. 활성 트레이드 정보 조회 - cursor = self.conn.execute("SELECT * FROM active_trades WHERE code=?", (code,)) + # 1. 활성 트레이드 정보 조회 (strategy 지정 시 정확히 해당 row만 조회) + if strategy: + cursor = self.conn.execute( + "SELECT * FROM active_trades WHERE code=%s AND strategy=%s", + (code, strategy), + ) + else: + cursor = self.conn.execute("SELECT * FROM active_trades WHERE code=%s", (code,)) trade = cursor.fetchone() if not trade: @@ -430,8 +886,13 @@ class TradeDB: # 2. 손익 계산 buy_price = trade['avg_buy_price'] qty = trade['current_qty'] - profit_rate = ((sell_price - buy_price) / buy_price) * 100 if buy_price > 0 else 0 - realized_pnl = (sell_price - buy_price) * qty + # realized_pnl_override 가 있으면 수수료·세금 반영 순손익을 외부에서 주입 + # 없으면 내부 계산 (수수료 미포함 gross) + if realized_pnl_override is not None: + realized_pnl = realized_pnl_override + else: + realized_pnl = (sell_price - buy_price) * qty + profit_rate = (realized_pnl / (buy_price * qty) * 100) if buy_price * qty > 0 else 0 # 3. 보유 시간 계산 buy_time = datetime.datetime.strptime(trade['buy_date'], '%Y-%m-%d %H:%M:%S') @@ -442,14 +903,17 @@ class TradeDB: if size_class is None and 'size_class' in trade.keys() and trade['size_class']: size_class = trade['size_class'] - # 4. trade_history에 저장 (env_snapshot, size_class 포함 INSERT) + # 4. trade_history에 저장 (env_snapshot, size_class, ML 진입 피처 포함 INSERT) + feat_vals = [trade[c] if c in trade.keys() else None for c in ML_ENTRY_FEATURE_COLUMNS] + cols_th = "code, name, strategy, buy_price, sell_price, qty, profit_rate, realized_pnl, hold_minutes, buy_date, sell_date, sell_reason, env_snapshot, size_class" + if ML_ENTRY_FEATURE_COLUMNS: + cols_th += ", " + ", ".join(ML_ENTRY_FEATURE_COLUMNS) + placeholders = ", ".join(["?"] * (14 + len(ML_ENTRY_FEATURE_COLUMNS))) with self.conn: - self.conn.execute(""" + self.conn.execute(f""" INSERT INTO trade_history ( - code, name, strategy, buy_price, sell_price, qty, - profit_rate, realized_pnl, hold_minutes, buy_date, sell_date, sell_reason, - env_snapshot, size_class - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + {cols_th} + ) VALUES ({placeholders}) """, ( trade['code'], trade['name'], @@ -465,10 +929,16 @@ class TradeDB: sell_reason, env_snapshot, size_class, - )) + ) + tuple(feat_vals)) - # 5. active_trades에서 삭제 - self.conn.execute("DELETE FROM active_trades WHERE code=?", (code,)) + # 5. active_trades에서 삭제 (strategy 지정 시 해당 봇 row만 삭제) + if strategy: + self.conn.execute( + "DELETE FROM active_trades WHERE code=%s AND strategy=%s", + (code, strategy), + ) + else: + self.conn.execute("DELETE FROM active_trades WHERE code=%s", (code,)) logger.info(f"✅ [{trade['name']}] 매매 종료: 수익률 {profit_rate:.2f}% ({realized_pnl:+,.0f}원)") return True @@ -477,12 +947,21 @@ class TradeDB: logger.error(f"❌ close_trade 실패 ({code}): {e}") return False - def delete_active_trade(self, code: str): - """활성 트레이드 삭제 (긴급 정리용)""" + def delete_active_trade(self, code: str, strategy: str = None): + """ + 활성 트레이드 삭제 (긴급 정리용). + strategy 지정 시 해당 봇 row만 삭제, None이면 해당 종목 전체 삭제. + """ try: with self.conn: - self.conn.execute("DELETE FROM active_trades WHERE code=?", (code,)) - logger.info(f"🗑️ active_trade 삭제: {code}") + if strategy: + self.conn.execute( + "DELETE FROM active_trades WHERE code=%s AND strategy=%s", + (code, strategy), + ) + else: + self.conn.execute("DELETE FROM active_trades WHERE code=%s", (code,)) + logger.info(f"🗑️ active_trade 삭제: {code}" + (f" [{strategy}]" if strategy else "")) return True except Exception as e: logger.error(f"❌ 삭제 실패 ({code}): {e}") @@ -522,12 +1001,12 @@ class TradeDB: """ try: cursor = self.conn.execute(""" - SELECT COALESCE(SUM(amount), 0), COUNT(*) + SELECT COALESCE(SUM(amount), 0) AS total_amount, COUNT(*) AS cnt FROM buy_execution_log - WHERE buy_date = ? AND strategy LIKE ? + WHERE buy_date = %s AND strategy LIKE %s """, (date_str, strategy_prefix.strip().upper() + "%")) row = cursor.fetchone() - return (float(row[0]), int(row[1])) + return (float(row["total_amount"]), int(row["cnt"])) except Exception as e: logger.error(f"❌ get_daily_buy_amount 실패: {e}") return (0.0, 0) @@ -731,34 +1210,66 @@ class TradeDB: def update_target_candidates(self, candidates: List[Dict]): """ 매수 후보군 업데이트 (5분마다 호출) - + Args: - candidates: [{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000}, ...] + candidates: [{ + 'code': '005930', 'name': '삼성전자', + 'score': 5.2, 'price': 75000, + 'market': 'K', # 선택 (없으면 stock_meta에서 조회) + 'sector': '반도체', # 선택 + 'theme': 'AI반도체' # 선택 + }, ...] """ try: scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - + + # stock_meta 캐시: 테마/섹터가 없는 항목을 DB에서 보완 + meta_cache: Dict[str, Dict] = {} + # 기존 데이터 전체 삭제 (5분마다 새로 갱신) with self.conn: self.conn.execute("DELETE FROM target_candidates") - - # 새 후보군 삽입 + for item in candidates: + code = item['code'] + # market/sector/theme 없으면 stock_meta에서 조회 + market = item.get('market') or "" + sector = item.get('sector') or "" + theme = item.get('theme') or "" + if not (market and sector and theme): + if code not in meta_cache: + try: + r = self.conn.execute( + "SELECT market, sector, theme FROM stock_meta WHERE code=%s", + (code,) + ).fetchone() + meta_cache[code] = dict(r) if r else {} + except Exception: + meta_cache[code] = {} + m = meta_cache[code] + market = market or m.get("market") or "Q" + sector = sector or m.get("sector") or "" + theme = theme or m.get("theme") or "" + self.conn.execute(""" - INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO target_candidates + (code, name, score, price, scan_time, updated_at, market, sector, theme) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( - item['code'], - item['name'], - item['score'], - item['price'], + code, + item.get('name', ''), + item.get('score', 0), + item.get('price', 0), scan_time, - scan_time + scan_time, + market, + sector, + theme, )) - + logger.info(f"✅ 매수 후보군 DB 저장: {len(candidates)}개") return True - + except Exception as e: logger.error(f"❌ 후보군 저장 실패: {e}") return False @@ -767,87 +1278,327 @@ class TradeDB: """ 매수 후보군 개별 추가 (통과 즉시 저장용, UPSERT 방식) - 500개 스캔 시 시간이 오래 걸려서 통과하는 즉시 DB에 저장 - + Args: - candidate: {'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000, ...} + candidate: { + 'code': '005930', 'name': '삼성전자', + 'score': 5.2, 'price': 75000, + 'market': 'K', 'sector': '반도체', 'theme': 'AI반도체' # 선택 + } """ try: scan_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - + code = candidate['code'] + market = candidate.get('market') or "" + sector = candidate.get('sector') or "" + theme = candidate.get('theme') or "" + + # 없으면 stock_meta 에서 보완 + if not (market and sector and theme): + try: + r = self.conn.execute( + "SELECT market, sector, theme FROM stock_meta WHERE code=%s", + (code,) + ).fetchone() + if r: + m = dict(r) + market = market or m.get("market") or "Q" + sector = sector or m.get("sector") or "" + theme = theme or m.get("theme") or "" + except Exception: + pass + with self.conn: - # UPSERT: 있으면 업데이트, 없으면 삽입 self.conn.execute(""" - INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(code) DO UPDATE SET - name = excluded.name, - score = excluded.score, - price = excluded.price, - scan_time = excluded.scan_time, - updated_at = excluded.updated_at + INSERT INTO target_candidates + (code, name, score, price, scan_time, updated_at, market, sector, theme) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + name = VALUES(name), + score = VALUES(score), + price = VALUES(price), + scan_time = VALUES(scan_time), + updated_at = VALUES(updated_at), + market = VALUES(market), + sector = VALUES(sector), + theme = VALUES(theme) """, ( - candidate['code'], + code, candidate.get('name', ''), candidate.get('score', 0), candidate.get('price', 0), - scan_time, - scan_time + scan_time, scan_time, + market, sector, theme, )) - + return True - + except Exception as e: logger.debug(f"후보 개별 저장 실패({candidate.get('code', '')}): {e}") return False def get_target_candidates(self) -> List[Dict]: """ - 매수 후보군 조회 (점수 순) - + 매수 후보군 조회 (점수 순). + stock_meta JOIN으로 테마/섹터 보완 반환. + Returns: - [{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000}, ...] + [{'code': '005930', 'name': '삼성전자', 'score': 5.2, 'price': 75000, + 'market': 'K', 'sector': '반도체', 'theme': 'AI반도체'}, ...] """ try: cursor = self.conn.execute(""" - SELECT code, name, score, price, scan_time - FROM target_candidates - ORDER BY score DESC, price ASC + SELECT + t.code, t.name, t.score, t.price, t.scan_time, + COALESCE(t.market, m.market, 'Q') AS market, + COALESCE(t.sector, m.sector, '') AS sector, + COALESCE(t.theme, m.theme, '') AS theme + FROM target_candidates t + LEFT JOIN stock_meta m ON m.code = t.code + ORDER BY t.score DESC, t.price ASC """) rows = cursor.fetchall() - + result = [] for row in rows: result.append({ - 'code': row['code'], - 'name': row['name'], - 'score': row['score'], - 'price': row['price'], - 'scan_time': row['scan_time'] + 'code': row['code'], + 'name': row['name'], + 'score': row['score'], + 'price': row['price'], + 'scan_time': row['scan_time'], + 'market': row.get('market', 'Q'), + 'sector': row.get('sector', ''), + 'theme': row.get('theme', ''), }) - + return result - + except Exception as e: logger.error(f"❌ 후보군 조회 실패: {e}") return [] - + + # ------------------------------------------------------------------ + # stock_meta 테마/섹터 메타데이터 헬퍼 + # ------------------------------------------------------------------ + + def upsert_stock_meta( + self, + code: str, + name: str = "", + market: str = "Q", + sector_code: str = "", + sector: str = "", + theme: str = "", + theme_rank: int = 3, + ) -> bool: + """ + 종목 메타데이터 저장/갱신 (스캐너·수동 입력 모두 사용). + + Args: + code : 종목코드 (6자리) + name : 종목명 + market : 'K'=KOSPI, 'Q'=KOSDAQ, 'E'=ETF + sector_code: 업종코드 (KIS bstp_cls_code 등) + sector : 업종명 (예: '반도체') + theme : 테마명 (예: 'AI반도체', '2차전지') + theme_rank : 테마 내 순위 (1=핵심주, 2=연관, 3=주변) + """ + try: + now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with self.conn: + self.conn.execute(""" + INSERT INTO stock_meta + (code, name, market, sector_code, sector, theme, theme_rank, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + name = IF(VALUES(name) != '', VALUES(name), name), + market = IF(VALUES(market) != '', VALUES(market), market), + sector_code = IF(VALUES(sector_code) != '', VALUES(sector_code), sector_code), + sector = IF(VALUES(sector) != '', VALUES(sector), sector), + theme = IF(VALUES(theme) != '', VALUES(theme), theme), + theme_rank = IF(VALUES(theme) != '', VALUES(theme_rank), theme_rank), + updated_at = VALUES(updated_at) + """, (code, name, market, sector_code, sector, theme, theme_rank, now)) + return True + except Exception as e: + logger.debug("upsert_stock_meta 실패(%s): %s", code, e) + return False + + def get_stock_meta(self, code: str) -> Optional[Dict]: + """종목 메타데이터 단건 조회. 없으면 None.""" + try: + row = self.conn.execute( + "SELECT * FROM stock_meta WHERE code = %s", (code,) + ).fetchone() + return dict(row) if row else None + except Exception as e: + logger.debug("get_stock_meta 실패(%s): %s", code, e) + return None + + def get_theme_momentum(self, theme: str, tf: int = 60) -> Dict: + """ + 테마 종목들의 최신 확정 봉(timeframe=tf) RSI 통계. + "이 테마가 지금 뜨겁냐 식었냐"를 숫자로 파악. + + Args: + theme: 테마명 (stock_meta.theme) + tf : 봉 단위 분 (기본 60분봉) + + Returns: + { + 'theme': 'AI반도체', + 'tf': 60, + 'count': 5, # 데이터 있는 종목 수 + 'avg_rsi3': 62.1, # 평균 RSI(3) + 'max_rsi3': 78.4, # 최고 RSI(3) + 'min_rsi3': 44.2, # 최저 RSI(3) + 'hot_count': 2, # RSI>70 (과열) 종목 수 + 'cold_count': 1, # RSI<30 (과매도) 종목 수 + } + """ + try: + row = self.conn.execute(""" + SELECT + COUNT(*) AS cnt, + AVG(w.rsi_3) AS avg_rsi, + MAX(w.rsi_3) AS max_rsi, + MIN(w.rsi_3) AS min_rsi, + SUM(w.rsi_3 > 70) AS hot_cnt, + SUM(w.rsi_3 < 30) AS cold_cnt + FROM ws_candles w + INNER JOIN stock_meta m ON m.code = w.code + WHERE m.theme = %s + AND w.timeframe = %s + AND w.is_confirmed = 1 + AND w.candle_time = ( + SELECT MAX(candle_time) + FROM ws_candles + WHERE code = w.code AND timeframe = w.timeframe + ) + """, (theme, tf)).fetchone() + + if not row or not row['cnt']: + return {'theme': theme, 'tf': tf, 'count': 0} + return { + 'theme': theme, + 'tf': tf, + 'count': int(row['cnt'] or 0), + 'avg_rsi3': round(float(row['avg_rsi'] or 0), 1), + 'max_rsi3': round(float(row['max_rsi'] or 0), 1), + 'min_rsi3': round(float(row['min_rsi'] or 0), 1), + 'hot_count': int(row['hot_cnt'] or 0), + 'cold_count':int(row['cold_cnt'] or 0), + } + except Exception as e: + logger.debug("get_theme_momentum 실패(%s, %dM): %s", theme, tf, e) + return {'theme': theme, 'tf': tf, 'count': 0} + + def get_all_theme_momentum(self, tf: int = 60) -> List[Dict]: + """ + 모든 테마의 최신 모멘텀 요약 (테마 히트맵). + 가장 뜨거운 테마부터 정렬 반환. + """ + try: + rows = self.conn.execute(""" + SELECT + m.theme, + COUNT(DISTINCT w.code) AS cnt, + AVG(w.rsi_3) AS avg_rsi, + SUM(w.rsi_3 > 70) AS hot_cnt, + SUM(w.rsi_3 < 30) AS cold_cnt + FROM ws_candles w + INNER JOIN stock_meta m ON m.code = w.code + WHERE m.theme IS NOT NULL AND m.theme != '' + AND w.timeframe = %s + AND w.is_confirmed = 1 + AND w.candle_time = ( + SELECT MAX(candle_time) + FROM ws_candles + WHERE code = w.code AND timeframe = w.timeframe + ) + GROUP BY m.theme + ORDER BY avg_rsi DESC + """, (tf,)).fetchall() + + return [{ + 'theme': r['theme'], + 'count': int(r['cnt'] or 0), + 'avg_rsi3': round(float(r['avg_rsi'] or 0), 1), + 'hot_count': int(r['hot_cnt'] or 0), + 'cold_count':int(r['cold_cnt'] or 0), + } for r in rows] + except Exception as e: + logger.debug("get_all_theme_momentum 실패: %s", e) + return [] + + def get_market_regime(self, codes: list = None, tf: int = 60) -> Dict: + """ + 영구 구독 ETF (KODEX200 등)의 최신 60분봉 RSI로 시장 방향 판단. + 스캘핑/꼬리잡기 진입 전 "지금 상승장인가?" 필터로 활용. + + Args: + codes: ETF 코드 리스트 (None=env PERMANENT_WS_CODES) + tf : 봉 단위 분 (기본 60) + + Returns: + { + 'is_bull': True, # avg_rsi > 50 → 상승 국면 + 'avg_rsi': 58.4, + 'details': [{'code': '069500', 'rsi': 61.2}, ...] + } + """ + try: + if not codes: + raw = self.conn.execute( + "SELECT PERMANENT_WS_CODES FROM env_config ORDER BY id DESC LIMIT 1" + ).fetchone() + perm = str(raw['PERMANENT_WS_CODES'] if raw else "") if raw else "" + codes = [c.strip() for c in perm.split(",") if c.strip()] or ["069500", "229200"] + + rows = self.conn.execute(""" + SELECT code, rsi_3 + FROM ws_candles + WHERE code IN ({}) + AND timeframe = %s + AND is_confirmed = 1 + AND candle_time = ( + SELECT MAX(candle_time) FROM ws_candles + WHERE code = ws_candles.code AND timeframe = ws_candles.timeframe + ) + """.format(",".join(["%s"] * len(codes))), (*codes, tf)).fetchall() + + details = [{'code': r['code'], 'rsi': round(float(r['rsi_3'] or 50), 1)} for r in rows] + avg_rsi = sum(d['rsi'] for d in details) / len(details) if details else 50.0 + return { + 'is_bull': avg_rsi > 50, + 'avg_rsi': round(avg_rsi, 1), + 'details': details, + } + except Exception as e: + logger.debug("get_market_regime 실패: %s", e) + return {'is_bull': True, 'avg_rsi': 50.0, 'details': []} + def get_trades_by_date(self, date_str: str) -> List[Dict]: """ 특정 날짜의 매매 기록 조회 Args: - date_str: 날짜 (YYYYMMDD) + date_str: 날짜 (YYYYMMDD 또는 YYYY-MM-DD 모두 허용) Returns: 매매 기록 리스트 """ try: - # YYYYMMDD -> YYYY-MM-DD 변환 - date_formatted = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]}" + # YYYYMMDD(8자) → YYYY-MM-DD 변환, 이미 YYYY-MM-DD(10자)면 그대로 사용 + if len(date_str) == 8 and "-" not in date_str: + date_formatted = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]}" + else: + date_formatted = date_str[:10] # 'YYYY-MM-DD'만 사용 (시간 부분 제거) cursor = self.conn.execute(""" SELECT * FROM trade_history - WHERE DATE(sell_date) = ? + WHERE DATE(sell_date) = %s ORDER BY sell_date DESC """, (date_formatted,)) @@ -894,14 +1645,13 @@ class TradeDB: if not isinstance(snapshot, dict): return None now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - key_list = ", ".join(f'"{k}"' for k in ENV_CONFIG_KEYS) - placeholders = ", ".join(["?"] * (1 + len(ENV_CONFIG_KEYS))) + key_list = ", ".join(f"`{k}`" for k in ENV_CONFIG_KEYS) + placeholders = ", ".join(["%s"] * (1 + len(ENV_CONFIG_KEYS))) vals = [now] + [snapshot.get(k) for k in ENV_CONFIG_KEYS] cur = self.conn.execute( f"INSERT INTO env_config (created_at, {key_list}) VALUES ({placeholders})", vals, ) - self.conn.commit() return cur.lastrowid except Exception as e: logger.error(f"❌ env_config INSERT 실패: {e}") @@ -933,6 +1683,193 @@ class TradeDB: logger.error(f"❌ env_config 최신 조회 실패: {e}") return None + # ============================================================ + # [kv_store] 매터모스트 원격 조종용 키-값 (마지막 AI 추천, last_seen 등) + # ============================================================ + + def get_kv(self, key: str) -> Optional[str]: + """kv_store에서 값 조회. 없으면 None.""" + try: + row = self.conn.execute("SELECT v FROM kv_store WHERE k = ?", (key,)).fetchone() + return row["v"] if row else None + except Exception as e: + logger.debug(f"get_kv 실패: {e}") + return None + + def set_kv(self, key: str, value: str) -> bool: + """kv_store에 값 저장 (REPLACE INTO = INSERT OR REPLACE MySQL 버전).""" + try: + self.conn.execute("REPLACE INTO kv_store (k, v) VALUES (%s, %s)", (key, value)) + return True + except Exception as e: + logger.error(f"❌ set_kv 실패: {e}") + return False + + def set_last_ai_recommendations(self, text: str) -> bool: + """마지막 AI 수치 추천문 저장 (!적용 시 사용).""" + return self.set_kv("last_ai_recommendations", text) + + def get_last_ai_recommendations(self) -> Optional[str]: + """마지막 AI 수치 추천문 조회. 없으면 None.""" + return self.get_kv("last_ai_recommendations") + + # ------------------------------------------------------------------ + # [ai_analysis_log] Butler 분석 기록 (프롬프트 요약·응답 저장/조회) + # ------------------------------------------------------------------ + def insert_ai_analysis_log(self, model: str, context_summary: str, response: str) -> Optional[int]: + """AI 분석 한 건 저장. model=claude|gemini, context_summary=현재상태 요약, response=AI 전체 응답.""" + try: + now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + cur = self.conn.execute( + "INSERT INTO ai_analysis_log (created_at, model, context_summary, response) VALUES (%s, %s, %s, %s)", + (now, model, (context_summary or "")[:2000], response or ""), + ) + return cur.lastrowid + except Exception as e: + logger.error("insert_ai_analysis_log 실패: %s", e) + return None + + def get_ai_analysis_log_list(self, limit: int = 10) -> list: + """최근 N건 목록. 각 항목: id, created_at, model, context_summary, response(앞 400자).""" + try: + rows = self.conn.execute( + """ + SELECT id, created_at, model, context_summary, response + FROM ai_analysis_log + ORDER BY id DESC + LIMIT ? + """, + (limit,), + ).fetchall() + out = [] + for r in rows: + resp = (r["response"] or "")[:400] + if len(r["response"] or "") > 400: + resp += "..." + out.append({"id": r["id"], "created_at": r["created_at"], "model": r["model"], + "context_summary": r["context_summary"], "response_preview": resp}) + return out + except Exception as e: + logger.error("get_ai_analysis_log_list 실패: %s", e) + return [] + + def get_ai_analysis_log_by_id(self, log_id: int) -> Optional[dict]: + """id로 전체 한 건 조회. 없으면 None.""" + try: + row = self.conn.execute( + "SELECT id, created_at, model, context_summary, response FROM ai_analysis_log WHERE id = ?", + (log_id,), + ).fetchone() + if not row: + return None + return {"id": row["id"], "created_at": row["created_at"], "model": row["model"], + "context_summary": row["context_summary"], "response": row["response"]} + except Exception as e: + logger.error("get_ai_analysis_log_by_id 실패: %s", e) + return None + + # ================================================================== + # ws_candles 헬퍼 (kis_scalping_ver1 전용) + # ================================================================== + + def upsert_ws_candle(self, code: str, timeframe: int, candle_time: str, + open_: float, high: float, low: float, close: float, + volume: int, is_confirmed: int = 0, source: str = "ws", + rsi_2: float = None, rsi_3: float = None, rsi_5: float = None): + """ + 봉 1개를 INSERT OR REPLACE 로 저장. + - 진행 중인 봉(is_confirmed=0): 틱마다 close/high/low/volume 갱신 + - 확정 봉(is_confirmed=1): RSI 계산 후 최종 저장 + """ + now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + with self.conn: + self.conn.execute(""" + INSERT INTO ws_candles + (code, timeframe, candle_time, open, high, low, close, volume, + rsi_2, rsi_3, rsi_5, is_confirmed, source, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(code, timeframe, candle_time) DO UPDATE SET + high = MAX(high, excluded.high), + low = MIN(low, excluded.low), + close = excluded.close, + volume = excluded.volume, + rsi_2 = COALESCE(excluded.rsi_2, rsi_2), + rsi_3 = COALESCE(excluded.rsi_3, rsi_3), + rsi_5 = COALESCE(excluded.rsi_5, rsi_5), + is_confirmed = excluded.is_confirmed, + source = excluded.source, + updated_at = excluded.updated_at + """, (code, timeframe, candle_time, open_, high, low, close, volume, + rsi_2, rsi_3, rsi_5, is_confirmed, source, now_str)) + except Exception as e: + logger.error("upsert_ws_candle 실패(%s): %s", code, e) + + def get_ws_candles(self, code: str, timeframe: int, limit: int = 100, + confirmed_only: bool = False) -> list: + """ + 최근 N개 봉 리스트 반환 (오래된 순 → 최신 순). + confirmed_only=True 면 확정 봉만 반환 (RSI 계산용). + """ + try: + cond = "AND is_confirmed = 1" if confirmed_only else "" + rows = self.conn.execute(f""" + SELECT candle_time, open, high, low, close, volume, + rsi_2, rsi_3, rsi_5, is_confirmed, source + FROM ws_candles + WHERE code = ? AND timeframe = ? {cond} + ORDER BY candle_time DESC + LIMIT ? + """, (code, timeframe, limit)).fetchall() + # 최신→오래된 순으로 왔으므로 역정렬해서 반환 (오래된→최신) + result = [dict(r) for r in rows] + result.reverse() + return result + except Exception as e: + logger.error("get_ws_candles 실패(%s): %s", code, e) + return [] + + def get_latest_ws_candle(self, code: str, timeframe: int) -> Optional[dict]: + """가장 최신 봉 1개 반환 (진행 중 봉 포함).""" + try: + row = self.conn.execute(""" + SELECT candle_time, open, high, low, close, volume, + rsi_2, rsi_3, rsi_5, is_confirmed + FROM ws_candles + WHERE code = ? AND timeframe = ? + ORDER BY candle_time DESC LIMIT 1 + """, (code, timeframe)).fetchone() + return dict(row) if row else None + except Exception as e: + logger.error("get_latest_ws_candle 실패(%s): %s", code, e) + return None + + def get_latest_confirmed_ws_candle(self, code: str, timeframe: int) -> Optional[dict]: + """가장 최신 확정 봉 1개 반환 (RSI 포함).""" + try: + row = self.conn.execute(""" + SELECT candle_time, open, high, low, close, volume, + rsi_2, rsi_3, rsi_5 + FROM ws_candles + WHERE code = ? AND timeframe = ? AND is_confirmed = 1 + ORDER BY candle_time DESC LIMIT 1 + """, (code, timeframe)).fetchone() + return dict(row) if row else None + except Exception as e: + logger.error("get_latest_confirmed_ws_candle 실패(%s): %s", code, e) + return None + + def cleanup_old_ws_candles(self, keep_days: int = 3): + """오래된 ws_candles 정리 (기본 3일 이상 지난 봉 삭제).""" + cutoff = (datetime.datetime.now() - datetime.timedelta(days=keep_days)).strftime("%Y%m%d%H%M") + try: + with self.conn: + self.conn.execute( + "DELETE FROM ws_candles WHERE candle_time < ?", (cutoff,) + ) + except Exception as e: + logger.error("cleanup_old_ws_candles 실패: %s", e) + def close(self): """DB 연결 종료""" if self.conn: