#!/usr/bin/env python3 """ holding_bot.py — 장기 홀딩 전략 봇 (추세추종 + 눌림목 분할매수) ======================================================================= 전략 개요: [추세 상승: MA20 > MA60] 매수: RSI 눌림목(rsi_buy1/2) 시 분할 매수 — 기준 완화(추세장 특성) 매도: 트레일링 스탑(고점 대비 trail_stop_pct% 하락) OR RSI 과열(rsi_sell) OR 데드크로스(MA20 < MA60) 발생 시 추세 반전 청산 [추세 하락/횡보: MA20 ≤ MA60] (trend_filter=0 이면 전 구간 적용) 매수: 기존 RSI 3단계 역추세 분할매수 (rsi_buy1/2/3) 매도: 익절(take_profit_pct) OR RSI 과열 OR 손절(stop_loss_pct) 봉: 일봉(D) 기준, KIS API로 최대 100봉씩 페이지네이션 수집 DB 테이블: holding_candles — 종목별 일봉 OHLCV (별도 관리) holding_stock_config — 종목별 파라미터 (code별 최신 INSERT 방식) 실행: python3 holding_bot.py """ import os, sys, time, json, logging, random from datetime import datetime, timedelta from pathlib import Path from typing import List, Dict, Optional, Tuple import requests ROOT = Path(__file__).resolve().parent sys.path.insert(0, str(ROOT)) from database import TradeDB logging.basicConfig( format="[%(asctime)s] %(message)s", datefmt="%H:%M:%S", level=logging.INFO, ) logger = logging.getLogger("HoldingBot") logging.getLogger("TradeDB").setLevel(logging.WARNING) # 관심종목 파일 WATCHLIST_PATH = ROOT / "long_term_watchlist.json" # ───────────────────────────────────────────────────────────────────────────── # 기본 파라미터 (종목별 DB에 없으면 이 값 사용) # ───────────────────────────────────────────────────────────────────────────── DEFAULT_STOCK_CONFIG = { "rsi_period": 14, # RSI 계산 기간 "rsi_buy1": 55.0, # 1차 매수 RSI ≤ (추세장: 50~55 / 횡보장: 45 이하) "rsi_buy2": 45.0, # 2차 매수 RSI ≤ "rsi_buy3": 35.0, # 3차 매수 RSI ≤ (횡보/하락장 전용) "rsi_sell": 75.0, # RSI 과열 매도 기준 (추세장에선 80 수준으로 올려 조기매도 방지) "take_profit_pct": 15.0, # 수익률 익절 기준 (%) — 추세장에선 크게 잡아야 탈 안 남 "stop_loss_pct": 10.0, # 손절 기준 (%) — 양수로 저장, 음수 방향 적용 "buy1_ratio": 50.0, # 1차 투자 비중 (%) — 추세장엔 처음부터 크게 "buy2_ratio": 50.0, # 2차 투자 비중 (%) "buy3_ratio": 0.0, # 3차 투자 비중 (%) — 추세장에선 0 (자금 보류 안 함) "slot_money": 3_000_000, # 종목당 총 투자금 (원) "atr_period": 14, # ATR 계산 기간 (True Range의 지수이동평균) # ── 샹들리에 엑시트 (Chandelier Exit) ───────────────────────────── # 추세장 청산에서 사용: 고점 − ATR × atr_mult → 변동성 기반 동적 방어선 # mult=2: 타이트(빠른 청산) / mult=3: 기본 / mult=4: 여유롭게(더 오래 홀딩) "atr_mult": 3.0, # 샹들리에 엑시트 배수 (ATR_MULT) # ── 추세추종 신규 파라미터 ────────────────────────────────────── # MA 골든/데드크로스 기반 추세 판단 "ma_fast": 20.0, # 단기 이동평균 기간 (일봉 기준 약 1달) "ma_slow": 60.0, # 장기 이동평균 기간 (일봉 기준 약 3달) # 트레일링 스탑: 포지션 보유 중 고점 대비 이 비율 이상 하락 시 청산 (0=비활성) # 예) 8.0 → 고점 대비 -8% 하락하면 추세 이탈로 판단해 청산 "trail_stop_pct": 8.0, # 추세 필터 사용 여부 (1=사용, 0=미사용 → 기존 역추세 방식으로 고정) "trend_filter": 1.0, # ── 컨텍스트 낙폭 필터 (0=비활성화) ────────────────────────── "ath_drop_min_pct": 0.0, "year_drop_min_pct": 0.0, "w52_drop_min_pct": 0.0, } # ───────────────────────────────────────────────────────────────────────────── # 관심종목 로드 # ───────────────────────────────────────────────────────────────────────────── def load_watchlist() -> List[Dict]: try: with open(WATCHLIST_PATH, encoding="utf-8") as f: data = json.load(f) return data.get("items", []) except Exception as e: logger.error(f"관심종목 파일 로드 실패: {e}") return [] # ───────────────────────────────────────────────────────────────────────────── # DB 테이블 생성 (holding 전용) # ───────────────────────────────────────────────────────────────────────────── HOLDING_DDL = [ """ CREATE TABLE IF NOT EXISTS holding_min_candles ( id BIGINT AUTO_INCREMENT PRIMARY KEY, code VARCHAR(10) NOT NULL, candle_dt DATETIME NOT NULL COMMENT '60분봉 기준 시작 시간(KST)', tf SMALLINT NOT NULL DEFAULT 60 COMMENT '분봉 단위 (60=60분봉)', open FLOAT NOT NULL DEFAULT 0, high FLOAT NOT NULL DEFAULT 0, low FLOAT NOT NULL DEFAULT 0, close FLOAT NOT NULL DEFAULT 0, volume BIGINT NOT NULL DEFAULT 0, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uq_holding_min (code, tf, candle_dt) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS holding_candles ( id BIGINT AUTO_INCREMENT PRIMARY KEY, code VARCHAR(10) NOT NULL, candle_date DATE NOT NULL, open FLOAT NOT NULL DEFAULT 0, high FLOAT NOT NULL DEFAULT 0, low FLOAT NOT NULL DEFAULT 0, close FLOAT NOT NULL DEFAULT 0, volume BIGINT NOT NULL DEFAULT 0, change_rate FLOAT NOT NULL DEFAULT 0 COMMENT '전일대비등락률(%%)', updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uq_holding_candle (code, candle_date) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS holding_stock_config ( id BIGINT AUTO_INCREMENT PRIMARY KEY, code VARCHAR(10) NOT NULL, name VARCHAR(50) DEFAULT '', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, rsi_period FLOAT DEFAULT 14, rsi_buy1 FLOAT DEFAULT 45, rsi_buy2 FLOAT DEFAULT 40, rsi_buy3 FLOAT DEFAULT 35, rsi_sell FLOAT DEFAULT 70, take_profit_pct FLOAT DEFAULT 5.0, stop_loss_pct FLOAT DEFAULT 10.0, buy1_ratio FLOAT DEFAULT 30, buy2_ratio FLOAT DEFAULT 30, buy3_ratio FLOAT DEFAULT 40, slot_money FLOAT DEFAULT 3000000, atr_period FLOAT DEFAULT 14, ath_drop_min_pct FLOAT DEFAULT 0 COMMENT 'ATH 대비 최소 낙폭%% (0=비활성)', year_drop_min_pct FLOAT DEFAULT 0 COMMENT '연도고점 대비 최소 낙폭%% (0=비활성)', w52_drop_min_pct FLOAT DEFAULT 0 COMMENT '52주고점 대비 최소 낙폭%% (0=비활성)', ma_fast FLOAT DEFAULT 20 COMMENT '단기 이동평균 기간', ma_slow FLOAT DEFAULT 60 COMMENT '장기 이동평균 기간', trail_stop_pct FLOAT DEFAULT 8 COMMENT '트레일링 스탑 %% (0=비활성)', trend_filter FLOAT DEFAULT 1 COMMENT '추세 필터 (1=사용/0=미사용)', KEY idx_holding_config_code (code) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, ] def ensure_holding_tables(db: TradeDB): """holding 전용 테이블 없으면 생성, 기존 테이블에 신규 컬럼 추가(마이그레이션)""" for ddl in HOLDING_DDL: db.conn.execute(ddl.strip()) db.conn.commit() # ── 기존 테이블에 신규 컬럼 추가 (이미 있으면 무시) ───────────── migrations = [ # holding_candles: change_rate 추가 "ALTER TABLE holding_candles ADD COLUMN change_rate FLOAT NOT NULL DEFAULT 0 COMMENT '전일대비등락률(%%)'", # holding_stock_config: 낙폭 필터 3종 추가 "ALTER TABLE holding_stock_config ADD COLUMN ath_drop_min_pct FLOAT DEFAULT 0 COMMENT 'ATH 대비 최소 낙폭%% (0=비활성)'", "ALTER TABLE holding_stock_config ADD COLUMN year_drop_min_pct FLOAT DEFAULT 0 COMMENT '연도고점 대비 최소 낙폭%% (0=비활성)'", "ALTER TABLE holding_stock_config ADD COLUMN w52_drop_min_pct FLOAT DEFAULT 0 COMMENT '52주고점 대비 최소 낙폭%% (0=비활성)'", # 추세추종 신규 파라미터 "ALTER TABLE holding_stock_config ADD COLUMN ma_fast FLOAT DEFAULT 20 COMMENT '단기 이동평균 기간'", "ALTER TABLE holding_stock_config ADD COLUMN ma_slow FLOAT DEFAULT 60 COMMENT '장기 이동평균 기간'", "ALTER TABLE holding_stock_config ADD COLUMN trail_stop_pct FLOAT DEFAULT 8 COMMENT '트레일링 스탑 %% (0=비활성)'", "ALTER TABLE holding_stock_config ADD COLUMN trend_filter FLOAT DEFAULT 1 COMMENT '추세 필터 (1=사용/0=미사용)'", ] for sql in migrations: try: db.conn.execute(sql) db.conn.commit() except Exception: pass # 이미 컬럼 존재 → 무시 logger.info("✅ holding_candles / holding_stock_config 테이블 확인 완료") # ───────────────────────────────────────────────────────────────────────────── # 종목별 파라미터 관리 # ───────────────────────────────────────────────────────────────────────────── def get_stock_config(db: TradeDB, code: str) -> Dict: """종목별 최신 파라미터 조회 (없으면 DEFAULT 반환)""" row = db.conn.execute( "SELECT * FROM holding_stock_config WHERE code=%s ORDER BY id DESC LIMIT 1", [code] ).fetchone() if row: cfg = dict(row) cfg.pop("id", None) cfg.pop("created_at", None) return cfg # 기본값 반환 (DB에 없으면) cfg = dict(DEFAULT_STOCK_CONFIG) cfg["code"] = code cfg["name"] = "" return cfg def set_stock_config(db: TradeDB, code: str, name: str, params: Dict): """종목별 파라미터 저장 (INSERT 신규 row = 최신값 추가 방식)""" cols = ["code", "name"] vals = [code, name] for k, v in params.items(): if k in DEFAULT_STOCK_CONFIG: cols.append(k) vals.append(float(v)) col_sql = ", ".join(cols) ph_sql = ", ".join(["%s"] * len(vals)) db.conn.execute( f"INSERT INTO holding_stock_config ({col_sql}) VALUES ({ph_sql})", vals ) db.conn.commit() # ───────────────────────────────────────────────────────────────────────────── # KIS API — 일봉 OHLCV 조회 (최대 100봉씩 페이지네이션) # ───────────────────────────────────────────────────────────────────────────── def _get_kis_token(db: TradeDB) -> Tuple[str, str, str, bool]: """ env_config에서 KIS 인증 정보 조회. KIS_MOCK=true → 모의 키/도메인 사용 (시세 조회도 모의 도메인에서 동작) KIS_MOCK=false → 실전 키/도메인 사용 None 값은 빈 문자열로 안전 처리. """ row = db.conn.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1").fetchone() if not row: raise RuntimeError("env_config 테이블이 비어 있습니다.") r = dict(row) def safe(val): return val if val else "" is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes") if is_mock: app_key = safe(r.get("KIS_APP_KEY_MOCK")) app_secret = safe(r.get("KIS_APP_SECRET_MOCK")) base_url = "https://openapivts.koreainvestment.com:29443" else: app_key = safe(r.get("KIS_APP_KEY_REAL")) or safe(r.get("KIS_APP_KEY")) app_secret = safe(r.get("KIS_APP_SECRET_REAL")) or safe(r.get("KIS_APP_SECRET")) base_url = "https://openapi.koreainvestment.com:9443" if not app_key or not app_secret: raise RuntimeError( f"API 키 미설정 (KIS_MOCK={is_mock})\n" f"필요 키: {'KIS_APP_KEY_MOCK / KIS_APP_SECRET_MOCK' if is_mock else 'KIS_APP_KEY_REAL / KIS_APP_SECRET_REAL'}" ) return app_key, app_secret, base_url, is_mock def _ensure_token(mock: bool) -> str: """ KisTokenManager 싱글톤으로 토큰 반환. - 유효하면 메모리 캐시 즉시 반환 - 만료 10분 전 선제 갱신 후 반환 - KIS 23시간 정책 자동 준수 """ try: from kis_token_manager import KisTokenManager token = KisTokenManager.instance(is_mock=mock).get_token() if token: logger.info("🔑 KIS 토큰 준비 완료 [%s] (앞8자: %s…)", "모의" if mock else "실전", token[:8]) return token except Exception as e: logger.warning("KisTokenManager 실패, 파일 폴백: %s", e) # 폴백: 직접 파일 읽기 path = ROOT / (".kis_token_cache_mock.json" if mock else ".kis_token_cache_real.json") if not path.exists(): raise RuntimeError( f"토큰 캐시 파일 없음: {path}\n" "kis_short_ver2.py 또는 kis_scalping_ver1.py 를 한 번 실행하면 자동 생성됩니다." ) try: cache = json.loads(path.read_text(encoding="utf-8")) except Exception as e: raise RuntimeError(f"토큰 캐시 파일 읽기 실패: {e}") token = cache.get("access_token", "") if not token: raise RuntimeError("토큰 캐시 파일에 access_token 없음") logger.info("🔑 KIS 토큰 파일 폴백 재사용 [%s] (앞8자: %s…)", "모의" if mock else "실전", token[:8]) return token def fetch_daily_ohlcv( code: str, start_date: str, # "YYYY-MM-DD" end_date: str, # "YYYY-MM-DD" app_key: str, app_secret: str, base_url: str, mock: bool = False, max_retries: int = 3, ) -> List[Dict]: """ KIS API 일봉 OHLCV 조회 (날짜 범위, 100봉씩 자동 페이지네이션) - 토큰은 기존 .kis_token_cache_real/mock.json 캐시 파일 재사용 (새 발급 안 함) 반환: [{"date": "YYYYMMDD", "open": float, "high": float, "low": float, "close": float, "volume": int}, ...] 최신 → 과거 순 """ token = _ensure_token(mock) # 캐시 파일에서 읽기만 url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" # 날짜 형식 통일 YYYYMMDD sd = start_date.replace("-", "") ed = end_date.replace("-", "") all_rows: List[Dict] = [] cursor_end = ed # 첫 호출은 end_date부터 과거 방향 조회 for attempt_page in range(100): # 최대 ~10,000봉 # 페이지당 ~140일 윈도우 (약 100 거래일) # DATE_1을 cursor_end 기준 180일 전으로 설정해야 API가 정상 응답 window_start = ( datetime.strptime(cursor_end, "%Y%m%d") - timedelta(days=180) ).strftime("%Y%m%d") # 단, 실제 요청 시작일(sd)보다 앞으로 가지 않음 page_date1 = max(window_start, sd) data = None for retry in range(max_retries): try: headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": app_key, "appsecret": app_secret, "tr_id": "FHKST03010100", } params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_DATE_1": page_date1, # 윈도우 시작 "FID_INPUT_DATE_2": cursor_end, # 윈도우 끝 (커서) "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "0", } resp = requests.get(url, headers=headers, params=params, timeout=10) if resp.status_code == 429: time.sleep((retry + 1) * 2) continue resp.raise_for_status() data = resp.json() break except Exception as e: if retry >= max_retries - 1: logger.error(f"❌ 일봉 조회 실패 {code} (page {attempt_page}): {e}") return all_rows time.sleep(2) if data is None: break rows = data.get("output2", []) if not rows: break done = False for item in rows: dt = item.get("stck_bsop_date", "") # YYYYMMDD if dt < sd: # start_date 이전 → 수집 완료 done = True break c = float(item.get("stck_clpr", 0) or 0) if c <= 0: continue all_rows.append({ "date": dt, "open": float(item.get("stck_oprc", 0) or 0), "high": float(item.get("stck_hgpr", 0) or 0), "low": float(item.get("stck_lwpr", 0) or 0), "close": c, "volume": int(item.get("acml_vol", 0) or 0), "change_rate": float(item.get("prdy_ctrt", 0) or 0), # 전일대비등락률(%) }) if done: break # 다음 페이지: 이번 배치 최고(가장 오래된) 날짜 - 1일 oldest_dt = rows[-1].get("stck_bsop_date", "") if not oldest_dt or oldest_dt <= sd: break prev = (datetime.strptime(oldest_dt, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d") if prev == cursor_end: # 무한루프 방지 break cursor_end = prev time.sleep(0.3) # API 과부하 방지 return all_rows def store_candles(db: TradeDB, code: str, rows: List[Dict]) -> int: """일봉 데이터를 holding_candles에 저장 (ON DUPLICATE KEY UPDATE)""" saved = 0 for r in rows: try: d = r["date"] date_fmt = f"{d[:4]}-{d[4:6]}-{d[6:8]}" db.conn.execute( """INSERT INTO holding_candles (code, candle_date, open, high, low, close, volume, change_rate) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close), volume=VALUES(volume), change_rate=VALUES(change_rate)""", [code, date_fmt, r["open"], r["high"], r["low"], r["close"], r["volume"], r.get("change_rate", 0)] ) saved += 1 except Exception as e: logger.debug(f"캔들 저장 오류 ({code} {r.get('date')}): {e}") db.conn.commit() return saved def get_stored_candles(db: TradeDB, code: str, start_date: str = "", end_date: str = "") -> List[Dict]: """holding_candles에서 종목 일봉 조회 (날짜 오름차순, change_rate 포함)""" sql = """SELECT candle_date, open, high, low, close, volume, IFNULL(change_rate, 0) AS change_rate FROM holding_candles WHERE code=%s""" args = [code] if start_date: sql += " AND candle_date >= %s" args.append(start_date) if end_date: sql += " AND candle_date <= %s" args.append(end_date) sql += " ORDER BY candle_date ASC" rows = db.conn.execute(sql, args).fetchall() return [dict(r) for r in rows] # ───────────────────────────────────────────────────────────────────────────── # 60분봉 수집 / 저장 / 조회 # ───────────────────────────────────────────────────────────────────────────── class _KiwoomTokenManager: """ kiwoom_universe_scanner.py / kiwoom_trader_dual.py 의 TokenManager와 동일한 인터페이스 — DB에서 읽은 키를 직접 주입 (env 변수 오염 없음). TokenManager.get_token() 호환: Chart(token_manager=...) 에 그대로 전달 가능. """ def __init__(self, app_key: str, app_secret: str, is_mock: bool = False): self._app_key = app_key self._app_secret = app_secret self._domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com" self._token: Optional[str] = None self._expiry: Optional[datetime] = None def _is_valid(self) -> bool: if not self._token or not self._expiry: return False return datetime.now() < self._expiry - timedelta(seconds=30) def _request_new_token(self) -> None: """키움 /oauth2/token 으로 새 토큰 발급 (kiwoom_universe_scanner 와 동일 흐름)""" url = f"https://{self._domain}/oauth2/token" body = { "grant_type": "client_credentials", "appkey": self._app_key, "secretkey": self._app_secret, } resp = requests.post(url, json=body, timeout=15) data = resp.json() token = data.get("token") or data.get("access_token", "") if not token: raise RuntimeError( f"키움 {'모의' if '모의' in self._domain else '실전'} 토큰 발급 실패: {data}" ) # 만료 시간 파싱 (expires_dt: "YYYYMMDDHHMMSS") exp_s = data.get("expires_dt", "") try: self._expiry = datetime.strptime(str(exp_s), "%Y%m%d%H%M%S") except Exception: self._expiry = datetime.now() + timedelta(hours=23) self._token = token logger.info( f"✅ 키움 {'모의' if '모의' in self._domain else '실전'} 토큰 발급 완료" f" (앞8자: {token[:8]}…, 만료: {self._expiry})" ) def get_token(self) -> str: """ TokenManager.get_token() 호환 — 만료 시 자동 재발급. kis_ws.KiwoomTokenManager 싱글톤 풀을 공유하여 au10001 rate limit 방지. (동일 appkey로 kis_ws 에서 이미 발급받은 토큰은 재사용) """ try: from kis_ws import _get_kiwoom_token_cached is_mock = "mockapi" in self._domain token = _get_kiwoom_token_cached(self._app_key, self._app_secret, is_mock) if token: self._token = token return self._token except Exception: pass # kis_ws 임포트 실패 시 자체 로직으로 폴백 if not self._is_valid(): self._request_new_token() return self._token def fetch_60min_via_kiwoom( code: str, start_date: str, # "YYYY-MM-DD" end_date: str, # "YYYY-MM-DD" kiwoom_key: str, # DB env_config의 KIWOOM_APP_KEY_REAL 또는 KIWOOM_APP_KEY_MOCK kiwoom_secret: str, # DB env_config의 KIWOOM_APP_SECRET_REAL 또는 KIWOOM_APP_SECRET_MOCK is_mock: bool = False, # True → mockapi.kiwoom.com (모의) max_pages: int = 50, # 연속조회 최대 횟수 (1페이지 ≈ 900봉) ) -> List[Dict]: """ 키움증권 REST API (ka10080) 로 60분봉 수집 → holding_min_candles 저장용. kiwoom_universe_scanner.py / kiwoom_trader_dual.py 와 동일한 TokenManager + Chart 패턴 사용. ▶ KIS 한계: inquire-time-itemchartprice 는 당일(2일치) 데이터만 제공 ▶ 키움 장점: ka10080 1회 호출 = 900봉 (60분봉 기준 약 128영업일) cont-yn: Y + next-key 연속조회로 2년치 이상 수집 가능 반환: [{"candle_date": "YYYY-MM-DD HH:MM:SS", "open": float, "high": float, "low": float, "close": float, "volume": int}, ...] 최신 → 과거 순, start_date~end_date 범위 필터 적용 """ from kiwoom_rest_api.koreanstock.chart import Chart mode_str = "모의" if is_mock else "실전" base_url = "https://mockapi.kiwoom.com" if is_mock else "https://api.kiwoom.com" logger.info(f"키움 60분봉 수집 시작: {code} [{mode_str}] {start_date}~{end_date}") # ── 1. TokenManager + Chart 초기화 (kiwoom_universe_scanner 와 동일 패턴) ── try: token_manager = _KiwoomTokenManager(kiwoom_key, kiwoom_secret, is_mock=is_mock) chart = Chart(base_url=base_url, token_manager=token_manager) # 최초 토큰 발급 확인 _ = token_manager.get_token() except Exception as e: logger.error(f"❌ 키움 초기화 실패 [{mode_str}]: {e}") return [] # ── 2. 날짜 범위 파싱 ───────────────────────────────────────────────────── start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") rows: List[Dict] = [] cont_yn = "N" next_key = "" for page in range(max_pages): try: resp = chart.stock_minute_chart_request_ka10080( stk_cd = code, tic_scope = "60", # 60분봉 (1/3/5/10/15/30/45/60 지원) upd_stkpc_tp = "1", # 수정주가 반영 cont_yn = cont_yn, next_key = next_key, ) except Exception as e: logger.error(f"❌ 키움 ka10080 조회 실패 (page {page}): {e}") break records = resp.get("stk_min_pole_chart_qry") or [] if not records: logger.info(f"키움 60분봉: 더 이상 데이터 없음 (page {page})") break reached_start = False for rec in records: # cntr_tm: "YYYYMMDDHHMMSS" 형식 raw_dt = str(rec.get("cntr_tm", "")) if len(raw_dt) < 14: continue try: candle_dt = datetime( int(raw_dt[0:4]), int(raw_dt[4:6]), int(raw_dt[6:8]), int(raw_dt[8:10]), int(raw_dt[10:12]), int(raw_dt[12:14]), ) except Exception: continue # 역방향(최신→과거) 수집이므로 start_date보다 과거이면 종료 if candle_dt.date() < start_dt.date(): reached_start = True break if candle_dt.date() > end_dt.date(): continue try: rows.append({ "candle_date": candle_dt.strftime("%Y-%m-%d %H:%M:%S"), "open": abs(float(rec.get("open_pric", 0) or 0)), "high": abs(float(rec.get("high_pric", 0) or 0)), "low": abs(float(rec.get("low_pric", 0) or 0)), "close": abs(float(rec.get("cur_prc", 0) or 0)), "volume": abs(int(float(rec.get("trde_qty", 0) or 0))), }) except Exception: continue if reached_start: break # 연속조회 키는 응답 dict에서 읽음 (Chart._execute_request 가 헤더를 dict로 포함) new_cont = str(resp.get("cont-yn", resp.get("cont_yn", "N"))).upper() new_nkey = str(resp.get("next-key", resp.get("next_key", ""))) if new_cont != "Y" or not new_nkey: break cont_yn = new_cont next_key = new_nkey time.sleep(0.3) # API 레이트리밋 준수 (초당 3회 이하) logger.info(f"✅ 키움 60분봉 수집 완료: {code} {len(rows)}봉 ({start_date}~{end_date})") return rows def fetch_min_ohlcv( code: str, start_date: str, # "YYYY-MM-DD" end_date: str, # "YYYY-MM-DD" app_key: str, app_secret: str, base_url: str, tf_min: int = 60, # 집계 단위 (60 = 60분봉) mock: bool = False, max_retries: int = 3, ) -> List[Dict]: """ KIS FHKST03010200(분봉조회) 1분봉 수집 → tf_min 단위 집계 반환. 페이지네이션 전략: FID_INPUT_HOUR_1=HHMMSS 커서로 역방향(최신→과거) 순회. FID_PW_DATA_INCU_YN=Y → 과거 날짜까지 연속 조회 가능. 각 응답 ~30 레코드, 마지막 레코드 절대 시각 - 1분을 다음 커서로 사용. 60분봉 집계 규칙 (역방향 수집이므로 open/close 처리 주의): 처음 만난 레코드 = 이 버킷의 가장 최신(close) 이후 이전 레코드로 갈수록 open 덮어씀 → 결국 가장 오래된 open이 남음 high = MAX, low = MIN, volume = SUM """ token = _ensure_token(mock) url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" try: sd = datetime.strptime(start_date, "%Y-%m-%d") ed = datetime.strptime(end_date, "%Y-%m-%d") except ValueError as e: raise ValueError(f"날짜 형식 오류: {e}") # 60분봉 버킷 집계 저장소 buckets: Dict[datetime, Dict] = {} cursor_dt = ed.replace(hour=15, minute=30, second=0) # end_date 15:30 부터 역순 total_1min = 0 # 수집한 1분봉 수 (로그용) for page in range(3000): # 최대 ~90,000 1분봉 (약 7개월치) cursor_str = cursor_dt.strftime("%H%M%S") data = None for retry in range(max_retries): try: headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": app_key, "appsecret": app_secret, # 분봉 조회는 실전/모의 도메인만 다르고 TR_ID는 동일 "tr_id": "FHKST03010200", } params = { "FID_ETC_CLS_CODE": "", "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_HOUR_1": cursor_str, "FID_PW_DATA_INCU_YN": "Y", # 과거 날짜 데이터 포함 } resp = requests.get(url, headers=headers, params=params, timeout=10) if resp.status_code == 429: time.sleep((retry + 1) * 2) continue resp.raise_for_status() data = resp.json() break except Exception as e: if retry >= max_retries - 1: logger.error(f"❌ 분봉 조회 실패 {code} (page {page}): {e}") return _buckets_to_min_list(buckets, tf_min) time.sleep(2) if data is None: break rows = data.get("output2", []) if not rows: break done = False last_abs_dt = None for row in rows: date_s = row.get("stck_bsop_date", "") time_s = row.get("stck_cntg_hour", "") if len(date_s) < 8 or len(time_s) < 6: continue try: abs_dt = datetime.strptime(date_s + time_s, "%Y%m%d%H%M%S") except ValueError: continue last_abs_dt = abs_dt if abs_dt.date() > ed.date(): continue # end_date 이후 → 스킵 if abs_dt.date() < sd.date(): done = True # start_date 이전 → 수집 완료 break # 장중 시간만 처리 (09:00~15:30) market_min = abs_dt.hour * 60 + abs_dt.minute if market_min < 540 or market_min > 930: # 09:00 ~ 15:30 continue c = float(row.get("stck_prpr", 0) or 0) if c <= 0: continue total_1min += 1 # tf_min 단위 버킷 시작 시간 계산 (09:00, 10:00, 11:00, ...) bucket_min = (abs_dt.minute // tf_min) * tf_min bucket_dt = abs_dt.replace(minute=bucket_min, second=0, microsecond=0) o = float(row.get("stck_oprc", c) or c) h = float(row.get("stck_hgpr", c) or c) l = float(row.get("stck_lwpr", c) or c) v = int(row.get("cntg_vol", 0) or 0) if bucket_dt not in buckets: # 역방향: 처음 만나는 레코드 = 버킷의 가장 최신 = close buckets[bucket_dt] = {"open": o, "high": h, "low": l, "close": c, "volume": v} else: b = buckets[bucket_dt] b["open"] = o # 더 이른 open으로 계속 갱신 b["high"] = max(b["high"], h) b["low"] = min(b["low"], l) b["volume"] += v # close는 첫 기록(최신) 유지 — 갱신하지 않음 if done: break # 다음 커서: 이번 배치 마지막(가장 오래된) 절대 시각 - 1분 if last_abs_dt is None: break next_cur = last_abs_dt - timedelta(minutes=1) if next_cur.date() < sd.date(): break cursor_dt = next_cur time.sleep(0.15) # API 과부하 방지 (~6 req/s) logger.info(f" [{code}] 1분봉 {total_1min}개 수집 → {tf_min}분봉 {len(buckets)}개 집계") return _buckets_to_min_list(buckets, tf_min) def fetch_and_store_min_candles( db: "TradeDB", code: str, start_date: str, end_date: str, app_key: str, app_secret: str, base_url: str, tf_min: int = 60, mock: bool = False, max_retries: int = 3, progress: Optional[Dict] = None, # 외부 dict: {"status","fetched","saved"} ) -> int: """ 1분봉 페이지 단위로 수집하면서 날짜가 바뀌는 순간 해당 날의 버킷을 즉시 DB 저장. Flask 백그라운드 스레드에서 호출해 웹페이지가 블로킹되지 않도록 한다. progress dict를 통해 진행상황을 실시간으로 노출한다. """ def _upd(**kw): if progress is not None: progress.update(kw) token = _ensure_token(mock) url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" try: sd = datetime.strptime(start_date, "%Y-%m-%d") ed = datetime.strptime(end_date, "%Y-%m-%d") except ValueError as e: raise ValueError(f"날짜 형식 오류: {e}") # 날짜별로 버킷 집계: {date: {bucket_dt: {o,h,l,c,v}}} day_buckets: Dict[str, Dict[datetime, Dict]] = {} cursor_dt = ed.replace(hour=15, minute=30, second=0) total_1min = 0 total_saved = 0 prev_date_s: Optional[str] = None # 이전 루프의 날짜 (날짜 전환 감지용) for page in range(3000): cursor_str = cursor_dt.strftime("%H%M%S") data = None for retry in range(max_retries): try: headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": app_key, "appsecret": app_secret, "tr_id": "FHKST03010200", } params = { "FID_ETC_CLS_CODE": "", "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_HOUR_1": cursor_str, "FID_PW_DATA_INCU_YN": "Y", } resp = requests.get(url, headers=headers, params=params, timeout=10) if resp.status_code == 429: time.sleep((retry + 1) * 2) continue resp.raise_for_status() data = resp.json() break except Exception as e: if retry >= max_retries - 1: logger.error(f"❌ 분봉 조회 실패 {code} (page {page}): {e}") data = None break time.sleep(2) if data is None: break rows = data.get("output2", []) if not rows: break done = False last_abs_dt = None for row in rows: date_s = row.get("stck_bsop_date", "") time_s = row.get("stck_cntg_hour", "") if len(date_s) < 8 or len(time_s) < 6: continue try: abs_dt = datetime.strptime(date_s + time_s, "%Y%m%d%H%M%S") except ValueError: continue last_abs_dt = abs_dt if abs_dt.date() > ed.date(): continue if abs_dt.date() < sd.date(): done = True break market_min = abs_dt.hour * 60 + abs_dt.minute if market_min < 540 or market_min > 930: continue c = float(row.get("stck_prpr", 0) or 0) if c <= 0: continue total_1min += 1 cur_date_s = date_s[:8] # "YYYYMMDD" # 날짜가 바뀌면 이전 날짜 데이터를 즉시 DB에 flush if prev_date_s is not None and cur_date_s != prev_date_s: flush_buckets = day_buckets.pop(prev_date_s, {}) if flush_buckets: rows_to_save = _buckets_to_min_list(flush_buckets, tf_min) n = store_min_candles(db, code, rows_to_save, tf_min) total_saved += n _upd(saved=total_saved, fetched=total_1min, current_date=prev_date_s) logger.info(f" [{code}] {prev_date_s} → {tf_min}분봉 {n}개 저장 (누적 {total_saved})") prev_date_s = cur_date_s bucket_min = (abs_dt.minute // tf_min) * tf_min bucket_dt = abs_dt.replace(minute=bucket_min, second=0, microsecond=0) o = float(row.get("stck_oprc", c) or c) h = float(row.get("stck_hgpr", c) or c) l = float(row.get("stck_lwpr", c) or c) v = int(row.get("cntg_vol", 0) or 0) if cur_date_s not in day_buckets: day_buckets[cur_date_s] = {} bkt = day_buckets[cur_date_s] if bucket_dt not in bkt: bkt[bucket_dt] = {"open": o, "high": h, "low": l, "close": c, "volume": v} else: b = bkt[bucket_dt] b["open"] = o b["high"] = max(b["high"], h) b["low"] = min(b["low"], l) b["volume"] += v _upd(fetched=total_1min) if done: break if last_abs_dt is None: break next_cur = last_abs_dt - timedelta(minutes=1) if next_cur.date() < sd.date(): break cursor_dt = next_cur time.sleep(0.15) # 루프 종료 후 남은 날짜 데이터 flush for date_s, flush_buckets in day_buckets.items(): if flush_buckets: rows_to_save = _buckets_to_min_list(flush_buckets, tf_min) n = store_min_candles(db, code, rows_to_save, tf_min) total_saved += n _upd(saved=total_saved, fetched=total_1min) logger.info(f" [{code}] {date_s}(말미) → {tf_min}분봉 {n}개 저장 (누적 {total_saved})") logger.info(f" [{code}] 수집완료: 1분봉 {total_1min}개 → {tf_min}분봉 {total_saved}개 저장") return total_saved def _buckets_to_min_list(buckets: Dict, tf_min: int) -> List[Dict]: """버킷 dict → 시간순 list 변환 (store_min_candles 입력용)""" return [ { "dt": dt.strftime("%Y-%m-%d %H:%M:%S"), "tf": tf_min, "open": b["open"], "high": b["high"], "low": b["low"], "close": b["close"], "volume": b["volume"], } for dt, b in sorted(buckets.items()) ] def store_min_candles(db: TradeDB, code: str, rows: List[Dict], tf_min: int = 60) -> int: """60분봉 데이터를 holding_min_candles에 저장 (ON DUPLICATE KEY UPDATE)""" saved = 0 for r in rows: try: db.conn.execute( """INSERT INTO holding_min_candles (code, candle_dt, tf, open, high, low, close, volume) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close), volume=VALUES(volume)""", [code, r["dt"], tf_min, r["open"], r["high"], r["low"], r["close"], r["volume"]] ) saved += 1 except Exception as e: logger.debug(f"분봉 저장 오류 ({code} {r.get('dt')}): {e}") db.conn.commit() return saved def get_stored_min_candles(db: TradeDB, code: str, start_dt: str = "", end_dt: str = "", tf_min: int = 60) -> List[Dict]: """ holding_min_candles에서 종목 분봉 조회 (시간 오름차순). run_backtest() 호환: candle_dt를 candle_date 키로 반환 """ sql = """SELECT candle_dt AS candle_date, open, high, low, close, volume FROM holding_min_candles WHERE code=%s AND tf=%s""" args = [code, tf_min] if start_dt: sql += " AND candle_dt >= %s" args.append(start_dt) if end_dt: end_val = end_dt + " 23:59:59" if len(end_dt) == 10 else end_dt sql += " AND candle_dt <= %s" args.append(end_val) sql += " ORDER BY candle_dt ASC" rows = db.conn.execute(sql, args).fetchall() return [dict(r) for r in rows] def get_min_candle_stats(db: TradeDB, code: str, tf_min: int = 60) -> Dict: """60분봉 보유 현황 (개수, 최소/최대 datetime)""" row = db.conn.execute( "SELECT COUNT(*) AS cnt, MIN(candle_dt) AS mn, MAX(candle_dt) AS mx " "FROM holding_min_candles WHERE code=%s AND tf=%s", [code, tf_min] ).fetchone() if row and row["cnt"]: return { "count": int(row["cnt"]), "min": str(row["mn"])[:16] if row["mn"] else "", "max": str(row["mx"])[:16] if row["mx"] else "", } return {"count": 0, "min": "", "max": ""} # ───────────────────────────────────────────────────────────────────────────── # 기술 지표 계산 # ───────────────────────────────────────────────────────────────────────────── def _rsi_series(closes: List[float], period: int) -> List[Optional[float]]: """Wilder RSI 시리즈""" n = len(closes) rsi = [None] * n if n < period + 1: return rsi deltas = [closes[i] - closes[i - 1] for i in range(1, n)] gains = [max(d, 0) for d in deltas] losses = [max(-d, 0) for d in deltas] ag = sum(gains[:period]) / period al = sum(losses[:period]) / period for i in range(period, n): idx = i - 1 if i > period: ag = (ag * (period - 1) + gains[idx]) / period al = (al * (period - 1) + losses[idx]) / period rs = ag / al if al > 0 else float("inf") rsi[i] = 100.0 - (100.0 / (1 + rs)) if al > 0 else 100.0 return rsi def _ma_series(closes: List[float], period: int) -> List[Optional[float]]: """단순이동평균(SMA) 시리즈. period 미만 구간은 None.""" n = len(closes) ma = [None] * n if period < 1 or n < period: return ma window_sum = sum(closes[:period]) ma[period - 1] = window_sum / period for i in range(period, n): window_sum += closes[i] - closes[i - period] ma[i] = window_sum / period return ma def _atr_series(highs, lows, closes, period: int) -> List[Optional[float]]: """ATR 시리즈""" n = len(closes) tr_list = [None] * n for i in range(1, n): tr_list[i] = max( highs[i] - lows[i], abs(highs[i] - closes[i - 1]), abs(lows[i] - closes[i - 1]), ) atr = [None] * n if n < period + 1: return atr atr[period] = sum(t for t in tr_list[1:period+1] if t) / period for i in range(period + 1, n): if tr_list[i] is not None and atr[i - 1] is not None: atr[i] = (atr[i - 1] * (period - 1) + tr_list[i]) / period return atr # ───────────────────────────────────────────────────────────────────────────── # 컨텍스트 신호 계산 (ATH · 연도고점 · 52주 고/저) # ───────────────────────────────────────────────────────────────────────────── def _calc_context_signals(candles: List[Dict], current_price: float) -> Dict: """ DB 일봉 전체를 기반으로 현재가 대비 컨텍스트 낙폭 지표를 반환. 백테스트와 실매매 양쪽에서 재사용. 반환 키: ath - 역대 최고가 (DB 전체 기간 high 최댓값) year_high - 당해 연도 최고가 w52_high - 최근 252봉(약 1년) 최고가 w52_low - 최근 252봉 최저가 ath_drop_pct - 현재가 기준 ATH 대비 낙폭 (%) ← 클수록 많이 빠진 것 year_drop_pct - 연도고점 대비 낙폭 (%) w52_drop_pct - 52주 고점 대비 낙폭 (%) """ if not candles: return { "ath": current_price, "year_high": current_price, "w52_high": current_price, "w52_low": current_price, "ath_drop_pct": 0.0, "year_drop_pct": 0.0, "w52_drop_pct": 0.0, } this_year = str(datetime.now().year) all_highs = [float(c["high"]) for c in candles if float(c["high"]) > 0] year_highs = [float(c["high"]) for c in candles if str(c["candle_date"])[:4] == this_year and float(c["high"]) > 0] # 52주 ≈ 최근 252 거래일 w52_bars = candles[-252:] if len(candles) >= 252 else candles w52_highs = [float(c["high"]) for c in w52_bars if float(c["high"]) > 0] w52_lows = [float(c["low"]) for c in w52_bars if float(c["low"]) > 0] ath = max(all_highs) if all_highs else current_price year_high = max(year_highs) if year_highs else current_price w52_high = max(w52_highs) if w52_highs else current_price w52_low = min(w52_lows) if w52_lows else current_price def drop_pct(ref: float) -> float: return round((ref - current_price) / ref * 100, 2) if ref > 0 else 0.0 return { "ath": ath, "year_high": year_high, "w52_high": w52_high, "w52_low": w52_low, "ath_drop_pct": drop_pct(ath), "year_drop_pct": drop_pct(year_high), "w52_drop_pct": drop_pct(w52_high), } # ───────────────────────────────────────────────────────────────────────────── # 백테스트 엔진 # ───────────────────────────────────────────────────────────────────────────── def run_backtest( candles: List[Dict], cfg: Dict, fee_rate: float = 0.015 / 100, # 매수/매도 각각 0.015% sell_tax: float = 0.18 / 100, # 증권거래세 0.18% tf_min: int = 0, # 0=일봉, 60=60분봉 등. 0이면 자동감지 ) -> Dict: """ 홀딩 전략 백테스트 (추세추종 + 눌림목 / 역추세 선택) [추세 상승: MA_FAST > MA_SLOW] 매수: RSI ≤ rsi_buy1 → 1차, RSI ≤ rsi_buy2 → 2차 추가매수 매도: 트레일링 스탑(고점 대비 trail_stop_pct% 하락) OR RSI 과열(rsi_sell) OR 익절(take_profit_pct) OR 데드크로스(MA_FAST < MA_SLOW) 발생 시 추세 반전 청산 [추세 하락/횡보: MA_FAST ≤ MA_SLOW] or [trend_filter=0] 매수: RSI ≤ rsi_buy1/2/3 (기존 3단계 역추세 분할매수) 매도: 익절(take_profit_pct) OR RSI 과열 OR 손절(stop_loss_pct) 진입가: 신호봉 다음 봉 시가 (1봉 지연) """ if len(candles) < 20: return {"error": "봉 부족", "total_trades": 0} # ── 봉 유형 자동 감지 ───────────────────────────────────────────── _sample_dt = str(candles[0].get("candle_date", "")) _is_min = tf_min > 0 or len(_sample_dt) > 10 rsi_period = int(cfg.get("rsi_period", 14)) rsi_buy1 = float(cfg.get("rsi_buy1", 55)) rsi_buy2 = float(cfg.get("rsi_buy2", 45)) rsi_buy3 = float(cfg.get("rsi_buy3", 35)) rsi_sell = float(cfg.get("rsi_sell", 75)) tp_pct = float(cfg.get("take_profit_pct", 15.0)) / 100 sl_pct = float(cfg.get("stop_loss_pct", 10.0)) / 100 buy1_ratio = float(cfg.get("buy1_ratio", 50)) / 100 buy2_ratio = float(cfg.get("buy2_ratio", 50)) / 100 buy3_ratio = float(cfg.get("buy3_ratio", 0)) / 100 slot_money = float(cfg.get("slot_money", 3_000_000)) ma_fast_p = int(cfg.get("ma_fast", 20)) ma_slow_p = int(cfg.get("ma_slow", 60)) trail_stop_p = float(cfg.get("trail_stop_pct", 8.0)) # %, 0=비활성 trend_filter = float(cfg.get("trend_filter", 1.0)) >= 0.5 # bool 변환 atr_period_p = int(cfg.get("atr_period", 14)) # 샹들리에 엑시트 배수: env/카드에서 조정 가능 (하드코딩 금지) atr_mult = float(cfg.get("atr_mult", 3.0)) ath_drop_min = float(cfg.get("ath_drop_min_pct", 0.0)) year_drop_min = float(cfg.get("year_drop_min_pct", 0.0)) w52_drop_min = float(cfg.get("w52_drop_min_pct", 0.0)) closes = [float(c["close"]) for c in candles] highs = [float(c["high"]) for c in candles] lows = [float(c["low"]) for c in candles] opens = [float(c["open"]) for c in candles] datetimes = [str(c["candle_date"]) for c in candles] dates = [s[:10] for s in datetimes] rsis = _rsi_series(closes, rsi_period) ma_fast_arr = _ma_series(closes, ma_fast_p) ma_slow_arr = _ma_series(closes, ma_slow_p) # ATR: 샹들리에 엑시트에 사용 (추세장 청산 핵심 지표) atr_arr = _atr_series(highs, lows, closes, atr_period_p) # ── 컨텍스트 배열 사전 계산 ────────────────────────────────────── ath_arr = [] year_high_arr = [] w52_high_arr = [] w52_low_arr = [] _year_max: Dict[str, float] = {} _ath_run = 0.0 from datetime import date as _date, timedelta as _td for i in range(len(candles)): h = highs[i]; l = lows[i]; y = dates[i][:4] _ath_run = max(_ath_run, h) _year_max[y] = max(_year_max.get(y, 0.0), h) if _is_min: try: cutoff = str(_date.fromisoformat(dates[i]) - _td(days=365)) w52_s = i while w52_s > 0 and dates[w52_s - 1] >= cutoff: w52_s -= 1 except Exception: w52_s = max(0, i - 252 * 7) else: w52_s = max(0, i - 251) ath_arr.append(_ath_run) year_high_arr.append(_year_max[y]) w52_high_arr.append(max(highs[w52_s:i + 1])) w52_low_arr.append(min(lows[w52_s:i + 1])) # ── 포지션 상태 ─────────────────────────────────────────────────── position = None # None or dict trades: List[Dict] = [] equity: List[Dict] = [] cum_pnl = 0.0 start_i = max(rsi_period + 1, ma_slow_p) # MA_SLOW가 준비된 이후부터 진입 for i in range(start_i, len(candles) - 1): rsi = rsis[i] if rsi is None: continue next_open = float(opens[i + 1]) if opens[i + 1] > 0 else closes[i] if next_open <= 0: continue mf = ma_fast_arr[i] # 단기 MA 현재봉 ms = ma_slow_arr[i] # 장기 MA 현재봉 mf_prev = ma_fast_arr[i - 1] if i > 0 else mf ms_prev = ma_slow_arr[i - 1] if i > 0 else ms # 추세 방향 판단 is_uptrend = trend_filter and mf is not None and ms is not None and mf > ms # ─── 데드크로스 감지: 추세 반전 신호 ───────────────────────── is_deadcross = ( trend_filter and mf is not None and ms is not None and mf_prev is not None and ms_prev is not None and mf < ms # 현재봉: MA_FAST < MA_SLOW and mf_prev >= ms_prev # 직전봉: MA_FAST ≥ MA_SLOW (방금 크로스) ) # ─── 보유 중: 매도 체크 ─────────────────────────────────────── if position is not None: avg = position["avg"] qty = position["qty"] c_now = closes[i] # 트레일링 스탑: 보유 중 최고가 업데이트 후 고점 대비 낙폭 체크 position["peak"] = max(position["peak"], c_now) profit_pct = (c_now - avg) / avg if avg > 0 else 0.0 sell_reason = None if position.get("trend_mode"): # ─── 추세장 청산: 고정 익절·RSI과열 무시 → 끝까지 추세 추적 ─── # 핵심: rsi_sell·tp_pct로 조기 청산하면 200% 추세를 15%에 팔게 됨 if is_deadcross: # MA 데드크로스 → 추세 반전 확정 → 즉시 청산 sell_reason = f"데드크로스({mf:.0f}<{ms:.0f})" else: # 샹들리에 엑시트 = 고점 − ATR × 배수 (변동성 기반 동적 방어선) # ATR이 클수록 방어선이 낮아져 변동성 큰 종목에 더 여유를 줌 _atr_v = atr_arr[i] if (atr_arr[i] is not None) else closes[i] * 0.05 chandelier = position["peak"] - _atr_v * atr_mult if trail_stop_p > 0: # 퍼센트 방어선도 계산, 더 타이트한(높은) 쪽 채택 pct_trail = position["peak"] * (1.0 - trail_stop_p / 100.0) chandelier = max(chandelier, pct_trail) if c_now <= chandelier: sell_reason = f"샹들리에({profit_pct*100:+.1f}%)" elif profit_pct <= -sl_pct: # 손절은 추세장에서도 반드시 유지 (무제한 손실 방지) sell_reason = f"손절({profit_pct*100:.1f}%)" else: # ─── 횡보/역추세장 청산: 철저히 고정 익절·손절비 준수 ────────── if profit_pct >= tp_pct: sell_reason = f"익절(+{profit_pct*100:.1f}%)" elif rsi >= rsi_sell: sell_reason = f"RSI과열({rsi:.1f})" elif profit_pct <= -sl_pct: sell_reason = f"손절({profit_pct*100:.1f}%)" if sell_reason: exit_price = next_open fee = exit_price * qty * (fee_rate + sell_tax) pnl = (exit_price - avg) * qty - fee hold = i - position["entry_i"] cum_pnl += pnl trades.append({ "buy_date": datetimes[position["entry_i"]][:16], "sell_date": datetimes[i + 1][:16], "avg_price": round(avg), "exit_price": round(exit_price), "qty": qty, "pnl": round(pnl), "hold_days": hold, "reason": sell_reason, "rsi_sell": round(rsi, 1), "ath_drop": position.get("ath_drop", 0), "year_drop": position.get("year_drop", 0), "w52_drop": position.get("w52_drop", 0), }) equity.append({"date": datetimes[i + 1][:16], "cum_pnl": round(cum_pnl)}) position = None continue # 매도 후 당일 매수 방지 # ─── 미보유: 컨텍스트 낙폭 필터 ───────────────────────────── price_now = closes[i] def _drop(ref: float) -> float: return (ref - price_now) / ref * 100 if ref > 0 else 0.0 if ath_drop_min > 0 and _drop(ath_arr[i]) < ath_drop_min: continue if year_drop_min > 0 and _drop(year_high_arr[i]) < year_drop_min: continue if w52_drop_min > 0 and _drop(w52_high_arr[i]) < w52_drop_min: continue # ─── 추세 상승 모드 매수 ───────────────────────────────────── if is_uptrend: # RSI 눌림목: rsi_buy1(1차), rsi_buy2(2차) 기준으로 진입 stage = 0 if rsi <= rsi_buy2: stage = 2 elif rsi <= rsi_buy1: stage = 1 if stage == 0: continue ratio = buy2_ratio if stage == 2 else buy1_ratio invest = slot_money * ratio qty = max(1, int(invest / next_open)) cost = next_open * qty * (1 + fee_rate) position = { "avg": next_open, "qty": qty, "cost": cost, "stage": stage, "entry_i": i + 1, "peak": next_open, # 트레일링 스탑용 고점 "trend_mode": True, # 추세 모드 진입 여부 (청산 로직 구분) "ath_drop": round(_drop(ath_arr[i]), 1), "year_drop": round(_drop(year_high_arr[i]), 1), "w52_drop": round(_drop(w52_high_arr[i]), 1), } # ─── 횡보/하락 모드 (역추세 분할매수) ─────────────────────── else: stage = 0 if rsi <= rsi_buy3: stage = 3 elif rsi <= rsi_buy2: stage = 2 elif rsi <= rsi_buy1: stage = 1 if stage == 0: continue ratios = [buy1_ratio, buy2_ratio, buy3_ratio] ratio = ratios[stage - 1] if ratio <= 0: continue # buy3_ratio=0이면 3단계 진입 안 함 invest = slot_money * ratio qty = max(1, int(invest / next_open)) cost = next_open * qty * (1 + fee_rate) position = { "avg": next_open, "qty": qty, "cost": cost, "stage": stage, "entry_i": i + 1, "peak": next_open, "trend_mode": False, "ath_drop": round(_drop(ath_arr[i]), 1), "year_drop": round(_drop(year_high_arr[i]), 1), "w52_drop": round(_drop(w52_high_arr[i]), 1), } # ─── 미청산 포지션 마지막 봉 강제 청산 ────────────────────────── if position is not None and len(candles) > 0: exit_price = closes[-1] avg = position["avg"]; qty = position["qty"] fee = exit_price * qty * (fee_rate + sell_tax) pnl = (exit_price - avg) * qty - fee cum_pnl += pnl trades.append({ "buy_date": dates[position["entry_i"]], "sell_date": dates[-1], "avg_price": round(avg), "exit_price": round(exit_price), "qty": qty, "pnl": round(pnl), "hold_days": len(candles) - 1 - position["entry_i"], "reason": "기간종료", "rsi_sell": None, "ath_drop": position.get("ath_drop", 0), "year_drop": position.get("year_drop", 0), "w52_drop": position.get("w52_drop", 0), }) equity.append({"date": dates[-1], "cum_pnl": round(cum_pnl)}) # ─── 요약 통계 ─────────────────────────────────────────────────── total = len(trades) wins = [t for t in trades if t["pnl"] > 0] losses = [t for t in trades if t["pnl"] < 0] total_pnl = sum(t["pnl"] for t in trades) avg_hold = sum(t["hold_days"] for t in trades) / total if total else 0 win_pnl = sum(t["pnl"] for t in wins) loss_pnl = sum(t["pnl"] for t in losses) pf = round(abs(win_pnl / loss_pnl), 2) if loss_pnl != 0 else 9999.0 peak, mdd, cum = 0.0, 0.0, 0.0 for t in trades: cum += t["pnl"] peak = max(peak, cum) mdd = max(mdd, peak - cum) # 이유별 집계 reasons: Dict[str, int] = {} for t in trades: r = t["reason"] prefix = r.split("(")[0] # "익절", "RSI과열", "손절", "기간종료" reasons[prefix] = reasons.get(prefix, 0) + 1 # ── Buy & Hold 벤치마크 ────────────────────────────────────────────── # 첫 봉 종가 매수 → 마지막 봉 종가 매도 (수수료·세금 제외 단순 계산) bnh_pct = round((closes[-1] - closes[0]) / closes[0] * 100, 2) if closes[0] > 0 else 0.0 bnh_pnl = round(slot_money * bnh_pct / 100) # 봇 수익률: 투자 원금(slot_money) 대비 실현 손익 bot_pct = round(total_pnl / slot_money * 100, 2) if slot_money > 0 else 0.0 # 알파 = 봇 수익률 - Buy & Hold 수익률 (양수면 봇이 시장 이김) alpha_pct = round(bot_pct - bnh_pct, 2) return { "summary": { "total_trades": total, "win_trades": len(wins), "loss_trades": len(losses), "win_rate": round(len(wins) / total * 100, 1) if total else 0, "total_pnl": round(total_pnl), "avg_hold_days": round(avg_hold, 1), "profit_factor": round(min(pf, 9999.0), 2), "max_drawdown": round(mdd), # Buy & Hold 비교 "bnh_pct": bnh_pct, # 기간 동안 그냥 들고 있었을 때 수익률(%) "bnh_pnl": bnh_pnl, # 그냥 들고 있었을 때 손익(원) "bot_pct": bot_pct, # 봇 수익률(투자금 대비 %) "alpha_pct": alpha_pct, # 초과수익(봇 - Buy&Hold, %p) }, "equity": equity, "reasons": reasons, "trades": trades[-200:], } # ───────────────────────────────────────────────────────────────────────────── # 파라미터 Grid Search (단일 종목) # ───────────────────────────────────────────────────────────────────────────── def run_param_search( candles: List[Dict], grid: Optional[Dict] = None, min_trades: int = 2, base_cfg: Optional[Dict] = None, # 사용자가 설정한 현재 파라미터 (없으면 DEFAULT 사용) ) -> List[Dict]: """ 종목 하나에 대해 파라미터 Grid Search 실행 반환: 수익 기준 내림차순 정렬된 결과 리스트 base_cfg: 탐색 그리드에 없는 파라미터들의 기준값. 웹 카드에서 사용자가 직접 설정한 값을 전달하면 rsi_sell, rsi_period, rsi_buy3 등이 그 값으로 고정됨. None 이면 DEFAULT_STOCK_CONFIG 사용. """ from itertools import product as iproduct if grid is None: grid = { # ── MA 추세 판단 ────────────────────────────────────────────── "ma_fast": [15, 20, 30], "ma_slow": [40, 60, 120], # ── 샹들리에 배수: 추세장 청산 핵심 파라미터 ──────────────────── # 작을수록 타이트(빨리 청산) / 클수록 여유(오래 홀딩) "atr_mult": [2.0, 3.0, 4.0], # 퍼센트 보조 방어선 (0=비활성, ATR선이 더 낮을 때만 활성화) "trail_stop_pct": [0.0, 8.0], # ── 진입 RSI 눌림목 ──────────────────────────────────────────── "rsi_buy1": [60, 55, 50, 45], "rsi_buy2": [50, 45, 40, 35], # ── 횡보장 익절/손절 기준 (추세장엔 무시됨) ──────────────────── "take_profit_pct": [15.0, 30.0], "stop_loss_pct": [7.0, 15.0], # ── ATH 낙폭 필터 ───────────────────────────────────────────── "ath_drop_min_pct": [0.0, 20.0], } keys = list(grid.keys()) combos = list(iproduct(*[grid[k] for k in keys])) results = [] # base_cfg가 주어지면 사용자 설정값 우선, 없으면 DEFAULT 사용 _base = dict(DEFAULT_STOCK_CONFIG) if base_cfg: _base.update(base_cfg) for vals in combos: cfg = dict(_base) cfg.update(dict(zip(keys, vals))) # MA 단기 < 장기 조건 보정 if cfg["ma_fast"] >= cfg["ma_slow"]: continue # RSI buy1 > buy2 조건 보정 if cfg["rsi_buy1"] <= cfg["rsi_buy2"]: continue res = run_backtest(candles, cfg) s = res.get("summary", {}) if s.get("total_trades", 0) < min_trades: continue results.append({ "params": {k: cfg[k] for k in keys}, "total_pnl": s["total_pnl"], "win_rate": s["win_rate"], "total_trades": s["total_trades"], "pf": s["profit_factor"], "avg_hold": s["avg_hold_days"], "mdd": s["max_drawdown"], }) results.sort(key=lambda x: x["total_pnl"], reverse=True) return results # ───────────────────────────────────────────────────────────────────────────── # 실매매 봇 (HoldingTrader) # ───────────────────────────────────────────────────────────────────────────── class HoldingTrader: """ 장기 홀딩 실매매 봇 - active_trades 에 strategy='HOLDING' 으로 기록 → 다른 봇과 계좌 분리 - 종목별 파라미터는 holding_stock_config 에서 조회 - 10분 주기로 RSI 체크 → 3단계 분할매수 / 익절·손절 매도 - 진입가: 시장가 주문 (장중 호가 추종) """ STRATEGY = "HOLDING" # active_trades.strategy 값 — 다른 봇과 충돌 방지 BOT_NAME = "홀딩봇" def __init__(self): self.db = TradeDB() ensure_holding_tables(self.db) # 실전/모의 토큰 모두 최신 상태로 유지 (모드와 무관하게 양쪽 갱신) try: from kis_token_manager import ensure_both_tokens ensure_both_tokens() except Exception as _te: logger.warning(f"토큰 자동갱신 건너뜀: {_te}") cfg_row = self.db.conn.execute( "SELECT * FROM env_config ORDER BY id DESC LIMIT 1" ).fetchone() r = dict(cfg_row) if cfg_row else {} def safe(v): return v if v else "" is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes") self.mock = is_mock self.app_key = safe(r.get("KIS_APP_KEY_MOCK" if is_mock else "KIS_APP_KEY_REAL")) self.app_secret = safe(r.get("KIS_APP_SECRET_MOCK" if is_mock else "KIS_APP_SECRET_REAL")) self.base_url = ( "https://openapivts.koreainvestment.com:29443" if is_mock else "https://openapi.koreainvestment.com:9443" ) self.account_no = safe(r.get("KIS_ACCOUNT_NO_MOCK" if is_mock else "KIS_ACCOUNT_NO_REAL")) self.account_code = safe(r.get("KIS_ACCOUNT_CODE_MOCK" if is_mock else "KIS_ACCOUNT_CODE_REAL")) or "01" # 알림 설정 self.mm_server = safe(r.get("MM_SERVER_URL", "")) or "https://mattermost.hoonfam.org" self.mm_token = safe(r.get("MM_BOT_TOKEN_", "")) self.mm_channel = safe(r.get("KIS_LONG_MM_CHANNEL", "")) or "stock" self.tg_token = safe(r.get("MM_BOT_TOKEN_", "")) # TG 토큰 있으면 별도로 self.tg_chat = safe(r.get("KIS_LONG_MM_CHANNEL", "")) self.watchlist = load_watchlist() logger.info(f"✅ {self.BOT_NAME} 초기화 | 모의={is_mock} | 종목={len(self.watchlist)}개") # ──────────────────── 알림 ──────────────────── def _notify(self, title: str, body: str): msg = f"**[{self.BOT_NAME}]** {title}\n{body}" if self.mm_token and self.mm_server: try: requests.post( f"{self.mm_server}/hooks/{self.mm_token}", json={"text": msg}, timeout=5 ) except Exception as e: logger.debug(f"알림 전송 실패: {e}") # ──────────────────── KIS REST ──────────────────── def _token(self) -> str: return _ensure_token(self.mock) def _get(self, path: str, params: dict) -> dict: """GET 요청 (429 재시도)""" url = f"{self.base_url}{path}" token = self._token() headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": self.app_key, "appsecret": self.app_secret, } for attempt in range(3): try: r = requests.get(url, headers=headers, params=params, timeout=10) if r.status_code == 429: time.sleep((attempt + 1) * 2) continue r.raise_for_status() return r.json() except Exception as e: if attempt == 2: raise time.sleep(2) raise RuntimeError("GET 요청 실패") def _post(self, path: str, tr_id: str, data: dict) -> dict: """POST 요청 (주문용, 429 재시도)""" url = f"{self.base_url}{path}" token = self._token() headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": self.app_key, "appsecret": self.app_secret, "tr_id": tr_id, } for attempt in range(3): try: r = requests.post(url, headers=headers, json=data, timeout=10) if r.status_code == 429: time.sleep((attempt + 1) * 2) continue r.raise_for_status() return r.json() except Exception as e: if attempt == 2: raise time.sleep(2) raise RuntimeError("POST 요청 실패") def get_price(self, code: str) -> Optional[Dict]: """ 현재가 + 당일 고/저, 52주 고/저, 등락률 조회 (FHKST01010100) 반환 dict 키: price - 현재가 day_high - 당일 고가 day_low - 당일 저가 w52_high - 52주 최고가 w52_low - 52주 최저가 change_rate - 전일대비 등락률(%) volume - 누적 거래량 """ try: headers = { "Content-Type": "application/json", "authorization": f"Bearer {self._token()}", "appkey": self.app_key, "appsecret": self.app_secret, "tr_id": "FHKST01010100", } params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code} r = requests.get( f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price", headers=headers, params=params, timeout=10 ) out = r.json().get("output", {}) price = float(out.get("stck_prpr", 0) or 0) if price <= 0: return None return { "price": price, "day_high": float(out.get("stck_hgpr", 0) or 0), # 당일 고가 "day_low": float(out.get("stck_lwpr", 0) or 0), # 당일 저가 "w52_high": float(out.get("w52_hgpr", 0) or 0), # 52주 최고가 "w52_low": float(out.get("w52_lwpr", 0) or 0), # 52주 최저가 "change_rate": float(out.get("prdy_ctrt", 0) or 0), # 등락률(%) "volume": int(out.get("acml_vol", 0) or 0), # 누적 거래량 } except Exception as e: logger.warning(f"현재가 조회 실패 ({code}): {e}") return None def _order(self, code: str, qty: int, is_buy: bool) -> bool: """시장가 주문 (매수/매도)""" if qty <= 0: return False # 모의: VTTC0802U(매수) / VTTC0801U(매도) 실전: TTTC0802U / TTTC0801U if is_buy: tr_id = "VTTC0802U" if self.mock else "TTTC0802U" else: tr_id = "VTTC0801U" if self.mock else "TTTC0801U" try: data = { "CANO": self.account_no, "ACNT_PRDT_CD": self.account_code, "PDNO": code, "ORD_DVSN": "01", # 시장가 "ORD_QTY": str(qty), "ORD_UNPR": "0", } result = self._post( "/uapi/domestic-stock/v1/trading/order-cash", tr_id, data ) ok = result.get("rt_cd") == "0" if not ok: logger.error(f"주문 실패 ({code} {'매수' if is_buy else '매도'}): {result.get('msg1','')}") return ok except Exception as e: logger.error(f"주문 예외 ({code}): {e}") return False # ──────────────────── DB 포지션 ──────────────────── def _get_position(self, code: str) -> Optional[dict]: """HOLDING 전략 포지션 조회""" row = self.db.conn.execute( "SELECT * FROM active_trades WHERE code=%s AND strategy=%s", [code, self.STRATEGY] ).fetchone() return dict(row) if row else None def _upsert_position(self, code: str, name: str, avg_price: float, qty: int, invested: float, stage: int, is_uptrend: bool = False): """ 포지션 저장/갱신 (strategy='HOLDING'). status = T{stage}(추세장 진입) or S{stage}(횡보장 진입) → 봇 재시작 후에도 청산 모드(T/S)를 구분하여 샹들리에/고정익절 적용 """ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") mode_prefix = "T" if is_uptrend else "S" # T=Trend / S=Sideways self.db.conn.execute( """INSERT INTO active_trades (code, name, strategy, avg_buy_price, current_qty, target_qty, total_invested, status, buy_date, updated_at) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE avg_buy_price=VALUES(avg_buy_price), current_qty=VALUES(current_qty), target_qty=VALUES(target_qty), total_invested=VALUES(total_invested), status=VALUES(status), updated_at=VALUES(updated_at)""", [code, name, self.STRATEGY, round(avg_price, 2), qty, qty, round(invested, 2), f"{mode_prefix}{stage}", now, now] ) self.db.conn.commit() def _delete_position(self, code: str): """포지션 삭제 (청산 후)""" self.db.conn.execute( "DELETE FROM active_trades WHERE code=%s AND strategy=%s", [code, self.STRATEGY] ) self.db.conn.commit() # ──────────────────── RSI 계산 ──────────────────── def _get_rsi(self, code: str, cfg: dict, current_price: float) -> Optional[float]: """ 저장된 일봉 + 당일 현재가(가중치 없이 단순 추가)로 RSI 계산. 장중에도 '오늘의 종가가 이 가격이라면' 의 RSI를 반환. """ period = int(cfg.get("rsi_period", 14)) candles = get_stored_candles(self.db, code) if len(candles) < period + 2: return None closes = [float(c["close"]) for c in candles] # 오늘 데이터가 DB에 없으면 현재가로 추가 today_str = datetime.now().strftime("%Y-%m-%d") if str(candles[-1]["candle_date"])[:10] != today_str: closes.append(float(current_price)) rsis = _rsi_series(closes, period) for r in reversed(rsis): if r is not None: return r return None # ──────────────────── 매수 로직 ──────────────────── def _try_buy(self, code: str, name: str, current_price: float, rsi: float, cfg: dict, ctx: Optional[Dict] = None, is_uptrend: bool = False): """ 분할매수 체크 및 실행. is_uptrend: 추세 상승장 여부 → status에 T/S 기록해 청산 모드 구분 ctx: _calc_context_signals() 반환값 (ATH·연도고점·52주 낙폭 필터) """ rsi_buy1 = float(cfg.get("rsi_buy1", 45)) rsi_buy2 = float(cfg.get("rsi_buy2", 40)) rsi_buy3 = float(cfg.get("rsi_buy3", 35)) b1 = float(cfg.get("buy1_ratio", 30)) / 100 b2 = float(cfg.get("buy2_ratio", 30)) / 100 b3 = float(cfg.get("buy3_ratio", 40)) / 100 slot = float(cfg.get("slot_money", 3_000_000)) ath_min = float(cfg.get("ath_drop_min_pct", 0.0)) year_min = float(cfg.get("year_drop_min_pct", 0.0)) w52_min = float(cfg.get("w52_drop_min_pct", 0.0)) pos = self._get_position(code) cur_status = (pos["status"] or "S0") if pos else "S0" cur_stage = int(cur_status[1]) if len(cur_status) > 1 and cur_status[1].isdigit() else 0 # ─── 매수 단계 및 비중 결정 ──────────────────────────────────────── target_stage, ratio = 0, 0.0 if cur_stage == 0 and rsi <= rsi_buy1: target_stage, ratio = 1, b1 elif cur_stage == 1 and rsi <= rsi_buy2: target_stage, ratio = 2, b2 elif cur_stage == 2 and rsi <= rsi_buy3: target_stage, ratio = 3, b3 if target_stage == 0: return # RSI 조건 미충족 # ─── 컨텍스트 낙폭 필터 ──────────────────────────────────────────── if ctx: if ath_min > 0 and ctx["ath_drop_pct"] < ath_min: logger.info(f"[{code}] ATH 낙폭 부족 ({ctx['ath_drop_pct']:.1f}% < {ath_min}%) → 매수 보류") return if year_min > 0 and ctx["year_drop_pct"] < year_min: logger.info(f"[{code}] 연도 낙폭 부족 ({ctx['year_drop_pct']:.1f}% < {year_min}%) → 매수 보류") return if w52_min > 0 and ctx["w52_drop_pct"] < w52_min: logger.info(f"[{code}] 52주 낙폭 부족 ({ctx['w52_drop_pct']:.1f}% < {w52_min}%) → 매수 보류") return invest = slot * ratio qty = max(1, int(invest / current_price)) cost = current_price * qty mode_str = "📈추세" if is_uptrend else "📉역추세" logger.info(f"[{code}] {mode_str} 매수 RSI={rsi:.1f} → {target_stage}차 {qty}주 @ {current_price:,.0f}원") if not self._order(code, qty, is_buy=True): return # 평단가 갱신 if pos: prev_cost = float(pos["avg_buy_price"]) * int(pos["current_qty"]) prev_qty = int(pos["current_qty"]) new_qty = prev_qty + qty new_avg = (prev_cost + cost) / new_qty new_inv = float(pos["total_invested"]) + cost else: new_qty = qty new_avg = current_price new_inv = cost self._upsert_position(code, name, new_avg, new_qty, new_inv, target_stage, is_uptrend) # 컨텍스트 낙폭 정보 알림에 포함 ctx_txt = "" if ctx: ctx_txt = ( f"\n▪ ATH 낙폭: {ctx['ath_drop_pct']:.1f}%" f" | 연도: {ctx['year_drop_pct']:.1f}%" f" | 52주: {ctx['w52_drop_pct']:.1f}%" ) pnl_txt = f"평단가: {new_avg:,.0f}원 | 수량: {new_qty}주" self._notify( f"📥 {name}({code}) {target_stage}차 매수", f"▪ 체결가: {current_price:,.0f}원\n" f"▪ 수량: {qty}주 ({cost:,.0f}원)\n" f"▪ {pnl_txt}\n" f"▪ RSI: {rsi:.1f} (기준: {[rsi_buy1,rsi_buy2,rsi_buy3][target_stage-1]})" f"{ctx_txt}" ) # ──────────────────── 매도 로직 ──────────────────── def _try_sell(self, code: str, name: str, current_price: float, rsi: float, cfg: dict, is_deadcross: bool = False, peak_price: float = 0.0, atr_val: float = 0.0): """ 익절/손절 체크 및 실행. [추세장 진입(status=T*)] → 샹들리에 엑시트(peak − ATR × mult) + 데드크로스만 청산 → 고정 익절·RSI과열 무시: 추세 끝까지 추적 [횡보장 진입(status=S*)] → 기존 방식: 고정 익절% / RSI과열 / 손절% """ pos = self._get_position(code) if not pos: return avg = float(pos["avg_buy_price"]) qty = int(pos["current_qty"]) pnl_r = (current_price - avg) / avg if avg > 0 else 0.0 cur_status = (pos.get("status") or "S1") is_trend_mode = cur_status.startswith("T") # T=추세장, S=횡보장 tp_pct = float(cfg.get("take_profit_pct", 15.0)) / 100 sl_pct = float(cfg.get("stop_loss_pct", 10.0)) / 100 rsi_sell_thr = float(cfg.get("rsi_sell", 75.0)) trail_stop_p = float(cfg.get("trail_stop_pct", 8.0)) atr_mult = float(cfg.get("atr_mult", 3.0)) sell_reason = None if is_trend_mode: # ─── 추세장: 고정 익절·RSI과열 무시 → 추세 끝까지 추적 ────────── if is_deadcross: sell_reason = f"데드크로스({pnl_r*100:+.1f}%)" elif peak_price > 0 and atr_val > 0: chandelier = peak_price - atr_val * atr_mult if trail_stop_p > 0: pct_trail = peak_price * (1.0 - trail_stop_p / 100.0) chandelier = max(chandelier, pct_trail) if current_price <= chandelier: sell_reason = f"샹들리에({pnl_r*100:+.1f}%)" # 손절은 추세장에서도 유지 (치명적 손실 방지) if sell_reason is None and pnl_r <= -sl_pct: sell_reason = f"손절({pnl_r*100:.1f}%)" else: # ─── 횡보장: 기존 방식 철저히 준수 ────────────────────────────── if pnl_r >= tp_pct: sell_reason = f"익절(+{pnl_r*100:.1f}%)" elif rsi and rsi >= rsi_sell_thr: sell_reason = f"RSI과열({rsi:.1f})" elif pnl_r <= -sl_pct: sell_reason = f"손절({pnl_r*100:.1f}%)" if not sell_reason: return logger.info(f"[{code}] 매도 신호: {sell_reason} | {qty}주 @ {current_price:,.0f}원 " f"(peak:{peak_price:,.0f} atr:{atr_val:,.0f})") if not self._order(code, qty, is_buy=False): return fee = current_price * qty * (0.015 / 100 + 0.18 / 100) pnl = (current_price - avg) * qty - fee self._delete_position(code) self._notify( f"📤 {name}({code}) 매도 | {sell_reason}", f"▪ 체결가: {current_price:,.0f}원\n" f"▪ 수량: {qty}주\n" f"▪ 평단가: {avg:,.0f}원\n" f"▪ 최고가: {peak_price:,.0f}원 | ATR: {atr_val:,.0f}\n" f"▪ 손익: {pnl:+,.0f}원\n" f"▪ 수익률: {pnl_r*100:+.2f}%" ) # ──────────────────── 메인 루프 ──────────────────── def _is_market_hour(self) -> bool: """장 시간 체크 (09:00~15:30 평일)""" now = datetime.now() if now.weekday() >= 5: # 토·일 return False t = now.hour * 100 + now.minute return 900 <= t <= 1530 def _fetch_today_candles(self): """ 장 마감 직후 오늘 일봉을 자동 수집하여 DB 갱신. KIS API는 당일 종가가 확정되는 15:30 이후 조회 가능. """ today = datetime.now().strftime("%Y-%m-%d") app_key, app_secret, base_url, mock = _get_kis_token(self.db) logger.info(f"📥 장 마감 후 자동 캔들 수집 ({today})") total_saved = 0 for item in self.watchlist: code, name = item["code"], item["name"] try: rows = fetch_daily_ohlcv(code, today, today, app_key, app_secret, base_url, mock=mock) saved = store_candles(self.db, code, rows) total_saved += saved logger.info(f" [{code}] {name}: {saved}봉 저장") time.sleep(0.3) except Exception as e: logger.error(f" [{code}] 캔들 자동 수집 오류: {e}") logger.info(f"✅ 자동 캔들 수집 완료 (총 {total_saved}봉 저장)") self._notify( "📊 일봉 자동 수집 완료", f"{today} 종가 기준 {len(self.watchlist)}종목 수집 ({total_saved}봉 저장)" ) def run(self): logger.info(f"\n{'='*50}") logger.info(f" {self.BOT_NAME} 시작 | 모의={self.mock}") logger.info(f" 종목: {[i['name'] for i in self.watchlist]}") logger.info(f"{'='*50}\n") self._notify("🚀 봇 시작", f"종목 {len(self.watchlist)}개 모니터링 시작") _candles_fetched_date = "" # 오늘 자동수집 완료 날짜 (중복 방지) while True: try: now = datetime.now() t = now.hour * 100 + now.minute # ── 장 마감 후 자동 일봉 수집 (15:35~15:50, 하루 1회) ── today_str = now.strftime("%Y-%m-%d") if (now.weekday() < 5 and 1535 <= t <= 1550 and _candles_fetched_date != today_str): self._fetch_today_candles() _candles_fetched_date = today_str if not self._is_market_hour(): logger.info("⏳ 장 외 시간 대기...") time.sleep(60) continue for item in self.watchlist: code = item["code"] name = item["name"] try: cfg = get_stock_config(self.db, code) pi = self.get_price(code) # price info dict if not pi: continue price = pi["price"] # DB 일봉 기반 컨텍스트 신호 (ATH·연도고점·52주 고저) stored = get_stored_candles(self.db, code) ctx = _calc_context_signals(stored, price) rsi = self._get_rsi(code, cfg, price) pos = self._get_position(code) # ── 실시간 MA / ATR / Peak 계산 ────────────────────── is_uptrend = False is_deadcross = False peak_price = price atr_val = price * 0.05 # fallback: 현재가의 5% if len(stored) >= 30: _cl = [float(c["close"]) for c in stored] _hi = [float(c["high"]) for c in stored] _lo = [float(c["low"]) for c in stored] # 오늘 실시간 봉 추가 (당일 고·저·종가로 지표 갱신) _cl.append(price); _hi.append(pi["day_high"]) _lo.append(pi["day_low"]) maf_p = int(cfg.get("ma_fast", 20)) mas_p = int(cfg.get("ma_slow", 60)) atr_p = int(cfg.get("atr_period", 14)) tf = float(cfg.get("trend_filter", 1.0)) >= 0.5 maf_a = _ma_series(_cl, maf_p) mas_a = _ma_series(_cl, mas_p) atr_a = _atr_series(_hi, _lo, _cl, atr_p) mf = maf_a[-1]; ms = mas_a[-1] mfp = maf_a[-2] if len(maf_a) > 1 else mf msp = mas_a[-2] if len(mas_a) > 1 else ms if atr_a[-1] is not None: atr_val = atr_a[-1] if tf and mf and ms: is_uptrend = mf > ms is_deadcross = (mf < ms and mfp is not None and msp is not None and mfp >= msp) # ── 보유 시 진입일 이후 최고가(Peak) 추적 ──────────── if pos: buy_d = str(pos.get("buy_date", ""))[:10] if buy_d: post = [c for c in stored if str(c["candle_date"])[:10] >= buy_d] highs = [float(c["high"]) for c in post] + [pi["day_high"], price] peak_price = max(highs) if highs else price logger.info( f"[{code}] {name} | " f"현재가:{price:,.0f}({pi['change_rate']:+.2f}%) | " f"ATH낙폭:{ctx['ath_drop_pct']:.1f}% " f"연도:{ctx['year_drop_pct']:.1f}% | " f"RSI:{f'{rsi:.1f}' if rsi else 'N/A'} | " f"추세:{'상승' if is_uptrend else '하락/횡보'} | " f"ATR:{atr_val:,.0f} | " f"포지션:{'있음(' + pos['status'] + ')' if pos else '없음'}" ) if pos: self._try_sell(code, name, price, rsi, cfg, is_deadcross=is_deadcross, peak_price=peak_price, atr_val=atr_val) else: self._try_buy(code, name, price, rsi, cfg, ctx, is_uptrend=is_uptrend) except Exception as e: logger.error(f"[{code}] 처리 중 오류: {e}") time.sleep(3) sleep_sec = random.uniform(540, 660) # 9~11분 랜덤 대기 logger.info(f"💤 {sleep_sec/60:.1f}분 후 다음 체크...") time.sleep(sleep_sec) except KeyboardInterrupt: logger.info("🛑 사용자 중단") self._notify("🛑 봇 중단", "사용자가 종료했습니다.") break except Exception as e: logger.error(f"메인 루프 오류: {e}") time.sleep(60) self.db.close() # ───────────────────────────────────────────────────────────────────────────── # CLI: 캔들 수집 / 백테스트 실행 # ───────────────────────────────────────────────────────────────────────────── if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="홀딩 전략 봇") sub = parser.add_subparsers(dest="cmd") p_fetch = sub.add_parser("fetch", help="일봉 캔들 수집") p_fetch.add_argument("--start", default="2023-01-01", help="시작일 YYYY-MM-DD") p_fetch.add_argument("--end", default=datetime.now().strftime("%Y-%m-%d")) p_bt = sub.add_parser("backtest", help="백테스트 실행") p_bt.add_argument("--code", default="", help="종목코드 (공백=전체)") p_bt.add_argument("--start", default="2023-01-01") p_bt.add_argument("--end", default=datetime.now().strftime("%Y-%m-%d")) sub.add_parser("live", help="실매매 봇 시작") args = parser.parse_args() if args.cmd == "live": # 실매매 봇 시작 — DB는 HoldingTrader 내부에서 관리 trader = HoldingTrader() trader.run() sys.exit(0) db = TradeDB() ensure_holding_tables(db) if args.cmd == "fetch": items = load_watchlist() app_key, app_secret, base_url, mock = _get_kis_token(db) for item in items: code, name = item["code"], item["name"] logger.info(f" [{code}] {name} 일봉 수집 ({args.start} ~ {args.end})") rows = fetch_daily_ohlcv(code, args.start, args.end, app_key, app_secret, base_url, mock=mock) saved = store_candles(db, code, rows) logger.info(f" → {saved}봉 저장 완료") time.sleep(0.5) elif args.cmd == "backtest": items = load_watchlist() if args.code: items = [i for i in items if i["code"] == args.code] for item in items: code = item["code"] candles = get_stored_candles(db, code, args.start, args.end) cfg = get_stock_config(db, code) result = run_backtest(candles, cfg) s = result.get("summary", {}) print(f"\n[{code}] {item['name']}") print(f" 거래:{s.get('total_trades')} | 승률:{s.get('win_rate')}% | " f"손익:{s.get('total_pnl'):+,}원 | PF:{s.get('profit_factor')} | " f"MDD:{s.get('max_drawdown'):,}원") else: parser.print_help() db.close()