#!/usr/bin/env python3 """ scalping_engine.py — 스캘핑 백테스트·파라미터서치·실매매 공통 엔진 ==================================================================== 계산식만 같으면 백테스트와 실매매 결과가 같아지도록, 공통 로직만 이 모듈에 두고 backtest_web / param_search / kis_scalping_ver2 가 모두 이 엔진만 호출합니다. ■ 엔진에 들어간 공통 로직 (백테스트·파라미터서치·실매매 동일) - 매수: 시간대(time_start_hm~time_end_hm), 쿨다운(cooldown_min), 일일 진입 횟수(max_daily), RSI(3) 과매도(rsi_oversold), 이전봉 음봉+현재봉 양봉, 당일 낙폭(drop_rate), 거래량 배수(vol_mult). - 매도: 손절가(sl_pct), 익절가(tp_pct), 트레일링스탑(trail_trigger/trail_stop), 장마감청산(is_eod). ■ 엔진 밖(실매매 전용) 로직 — 백테스트에는 없음 - 매수 추가 필터: 고점추격 방지(high_chase_thr), 급등주(max_daily_chg), 시장/테마 필터, ML, min_price 등. - 매도 추가: 본절사수(breakeven), 금액손실컷(MAX_LOSS_PER_TRADE_KRW). (백테스트는 % 손절/익절/트레일만 사용) 캔들 형식: list of dict with keys candle_time(YYYYMMDDHHMI), open, high, low, close, volume """ from datetime import datetime from typing import List, Dict, Any, Optional, Tuple # DB 기본값 로드 (백테스트/param_search가 동일한 값 사용하도록 단일 소스) def get_scalping_defaults_from_db() -> Dict[str, Any]: """ env_config 최신 행에서 스캘핑 관련 기본값 로드. 백테스트 API·param_search가 이 함수만 쓰면 실매매(DB)와 동일한 값으로 동작. 반환 키: cooldown_min, time_start_hm, time_end_hm, fee_rate, sell_tax, slot_money, rsi_period, vol_mult, trail_trigger, trail_stop, max_daily """ try: from database import TradeDB db = TradeDB() row = db.conn.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1").fetchone() db.close() if row: r = dict(row) # SCALP_COOLDOWN_SEC(초) → cooldown_min(분). 실매매와 동일 키 사용 sec = r.get("SCALP_COOLDOWN_SEC") or r.get("REENTRY_COOLDOWN_SEC") or "600" cooldown_min = max(0, int(float(sec)) // 60) fee_pct = float(r.get("FEE_RATE_PCT") or 0.015) tax_pct = float(r.get("SELL_TAX_RATE_PCT") or 0.18) slot = float(r.get("SLOT_MONEY_DEFAULT") or 300_000) else: cooldown_min, fee_pct, tax_pct, slot = 10, 0.015, 0.18, 300_000.0 except Exception: cooldown_min, fee_pct, tax_pct, slot = 10, 0.015, 0.18, 300_000.0 return { "cooldown_min": cooldown_min, "time_start_hm": 900, "time_end_hm": 1530, "time_start": 900, "time_end": 1530, "fee_rate": fee_pct / 100, "sell_tax": tax_pct / 100, "slot_money": slot, "rsi_period": 3, "vol_mult": 0, "trail_trigger": 0.007, "trail_stop": 0.004, "max_daily": 3, } def compute_rsi_series(closes: list, period: int = 3) -> list: """RSI 시리즈 계산 (Wilder 스무딩). backtest_web과 동일.""" rsi_list = [None] * len(closes) if len(closes) < period + 1: return rsi_list deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))] gains = [max(d, 0) for d in deltas] losses = [max(-d, 0) for d in deltas] avg_gain = sum(gains[:period]) / period avg_loss = sum(losses[:period]) / period for i in range(period, len(closes)): idx = i - 1 if i > period: avg_gain = (avg_gain * (period - 1) + gains[idx]) / period avg_loss = (avg_loss * (period - 1) + losses[idx]) / period rs = avg_gain / avg_loss if avg_loss > 0 else float("inf") rsi_val = 100 - (100 / (1 + rs)) if avg_loss > 0 else 100.0 rsi_list[i] = rsi_val return rsi_list def _t2dt(t: str) -> datetime: """candle_time 문자열 → datetime.""" return datetime.strptime(t, "%Y%m%d%H%M") def run_scalping_backtest( codes_candles: Dict[str, List[Dict]], params: Dict[str, Any], ) -> List[Dict]: """ 종목별 캔들에 대해 스캘핑 백테스트 실행. 실매매와 동일한 규칙 적용. params: rsi_period, rsi_oversold, sl_pct, tp_pct, drop_rate, slot_money, fee_rate, sell_tax, cooldown_min, trail_trigger, trail_stop, time_start_hm, time_end_hm, max_daily, vol_mult sl_pct/tp_pct/drop_rate/trail_trigger/trail_stop: 비율 (0.015 = 1.5%) fee_rate, sell_tax: 비율 (0.00015, 0.0018) time_start_hm, time_end_hm: HHMM 정수 (900, 1530) """ rsi_period = int(params.get("rsi_period", 3)) rsi_oversold = float(params.get("rsi_oversold", 25)) sl_pct = float(params.get("sl_pct", 0.015)) tp_pct = float(params.get("tp_pct", 0.015)) drop_rate = float(params.get("drop_rate", 0.015)) slot_money = float(params.get("slot_money", 300_000)) fee_rate = float(params.get("fee_rate", 0.00015)) sell_tax = float(params.get("sell_tax", 0.0018)) cooldown_min = float(params.get("cooldown_min", 10)) trail_trigger = float(params.get("trail_trigger", 0)) trail_stop = float(params.get("trail_stop", 0.004)) time_start_hm = int(params.get("time_start_hm", 900)) time_end_hm = int(params.get("time_end_hm", 1530)) max_daily = int(params.get("max_daily", 3)) vol_mult = float(params.get("vol_mult", 0)) all_trades: List[Dict] = [] for code, rows in codes_candles.items(): if len(rows) < rsi_period + 5: continue candles = [dict(r) for r in rows] closes = [float(c["close"]) for c in candles] volumes = [float(c.get("volume", 0)) for c in candles] rsis = compute_rsi_series(closes, rsi_period) position: Optional[Dict] = None last_exit_dt: Dict[str, datetime] = {} daily_cnt: Dict[str, int] = {} cur_day = None running_open = 0.0 running_low = 0.0 for i in range(rsi_period + 1, len(candles)): c = candles[i] day = c["candle_time"][:8] hm = int(c["candle_time"][8:12]) cl = float(c["close"]) lo = float(c["low"]) hi = float(c["high"]) vol = volumes[i] if i < len(volumes) else 0 if day != cur_day: cur_day = day running_open = float(c["open"]) running_low = lo else: running_low = min(running_low, lo) is_eod = (i == len(candles) - 1) or (candles[i + 1]["candle_time"][:8] != day) # ── 포지션 보유 중: 청산 체크 ── if position is not None: max_price = max(position["max_price"], hi) position["max_price"] = max_price reason = None exit_price = cl if lo <= position["stop"]: reason = "손절" exit_price = position["stop"] elif hi >= position["target"]: reason = "익절" exit_price = position["target"] elif trail_trigger > 0 and max_price >= position["entry_price"] * (1 + trail_trigger): ts = max_price * (1 - trail_stop) if cl <= ts: reason = "트레일링스탑" exit_price = cl if reason is None and is_eod: reason = "장마감청산" exit_price = cl if reason: qty = position["qty"] buy_amt = position["entry_price"] * qty sell_amt = exit_price * qty pnl = ( sell_amt - buy_amt - buy_amt * fee_rate - sell_amt * fee_rate - sell_amt * sell_tax ) hold_min = int( (_t2dt(c["candle_time"]) - _t2dt(position["entry_time"])).total_seconds() / 60 ) all_trades.append({ "code": code, "buy_time": position["entry_time"], "sell_time": c["candle_time"], "buy_price": position["entry_price"], "sell_price": round(exit_price, 2), "qty": qty, "pnl": round(pnl), "profit_rate": round( (exit_price - position["entry_price"]) / position["entry_price"] * 100, 2 ), "hold_min": hold_min, "sell_reason": reason, "rsi_entry": round(position["rsi"], 1), }) last_exit_dt[day] = _t2dt(c["candle_time"]) position = None continue # ── 포지션 없음: 매수 신호 ── if hm < time_start_hm or hm >= time_end_hm: continue if day in last_exit_dt: elapsed = (_t2dt(c["candle_time"]) - last_exit_dt[day]).total_seconds() / 60 if elapsed < cooldown_min: continue if daily_cnt.get(day, 0) >= max_daily: continue rsi = rsis[i] if rsi is None or rsi > rsi_oversold: continue prev_c = candles[i - 1] prev_bear = float(prev_c["close"]) < float(prev_c["open"]) curr_bull = cl > float(c["open"]) if not (prev_bear and curr_bull): continue if running_open <= 0: continue dr = (running_open - running_low) / running_open if dr < drop_rate: continue if vol_mult > 0: win = max(1, min(20, i)) vol_avg = sum(volumes[i - win : i]) / win if vol_avg > 0 and vol < vol_avg * vol_mult: continue if i + 1 >= len(candles): continue next_c = candles[i + 1] if next_c["candle_time"][:8] != day: continue entry_price = float(next_c["open"]) if entry_price <= 0: continue qty = max(1, int(slot_money / entry_price)) stop = entry_price * (1 - sl_pct) target = entry_price * (1 + tp_pct) position = { "entry_price": entry_price, "entry_time": next_c["candle_time"], "qty": qty, "stop": stop, "target": target, "max_price": entry_price, "rsi": rsi, } daily_cnt[day] = daily_cnt.get(day, 0) + 1 all_trades.sort(key=lambda x: x["sell_time"]) return all_trades # ── 실시간 봇용: 단일 시점 매수/매도 판단 (백테스트와 동일 규칙) ───────────────── def check_buy_signal_live( candles: List[Dict], params: Dict[str, Any], state: Dict[str, Any], ) -> Tuple[Optional[str], Optional[str], Optional[Dict[str, Any]]]: """ 실시간 봇: 현재 캔들 리스트의 마지막 봉이 백테스트와 동일한 매수 신호인지 판단. candles: 확정 봉 리스트 (candle_time, open, high, low, close, volume) state: { "last_exit_dt": datetime or None (당일 마지막 청산 시각), "daily_cnt": int (당일 이미 진입한 횟수) } 반환: (reject_reason, reject_msg, sig) — 통과 시 (None, None, {"signal": True, "rsi": float}), 탈락 시 ("탈락-XXX", "메시지", None). 실매 ver2에서 ver1과 동일한 로그 출력용. """ if len(candles) < 4: return ("탈락-봉부족", "확정봉 4개 미만", None) rsi_period = int(params.get("rsi_period", 3)) rsi_oversold = float(params.get("rsi_oversold", 25)) rsi_overbought = float(params.get("rsi_overbought", 75.0)) sl_pct = float(params.get("sl_pct", 0.015)) tp_pct = float(params.get("tp_pct", 0.015)) drop_rate = float(params.get("drop_rate", 0.015)) time_start_hm = int(params.get("time_start_hm", 900)) time_end_hm = int(params.get("time_end_hm", 1530)) cooldown_min = float(params.get("cooldown_min", 10)) max_daily = int(params.get("max_daily", 3)) vol_mult = float(params.get("vol_mult", 0)) i = len(candles) - 1 c = candles[i] day = c["candle_time"][:8] hm = int(c["candle_time"][8:12]) cl = float(c["close"]) lo = float(c["low"]) prev_c = candles[i - 1] if hm < time_start_hm or hm >= time_end_hm: return (None, None, None) last_exit_dt = state.get("last_exit_dt") if last_exit_dt is not None: elapsed = (_t2dt(c["candle_time"]) - last_exit_dt).total_seconds() / 60 if elapsed < cooldown_min: return (None, None, None) if state.get("daily_cnt", 0) >= max_daily: return (None, None, None) closes = [float(x["close"]) for x in candles] rsis = compute_rsi_series(closes, rsi_period) rsi = rsis[i] if i < len(rsis) else None if rsi is None: return ("탈락-RSI없음", "RSI 미계산 (봉 축적 중)", None) if rsi <= 0.0: return ("탈락-RSI무효", "RSI3=0.0 (봉 부족, 계산 불가)", None) if rsi > rsi_overbought: return ("탈락-RSI과열", "RSI3=%.1f > %.0f" % (rsi, rsi_overbought), None) if rsi > rsi_oversold: return ("탈락-RSI조건", "RSI3=%.1f (과매도<%.0f 아님)" % (rsi, rsi_oversold), None) prev_bear = float(prev_c["close"]) < float(prev_c["open"]) curr_bull = cl > float(c["open"]) if not (prev_bear and curr_bull): return ("탈락-되돌림없음", "prev_bear=%s curr_bull=%s" % (prev_bear, curr_bull), None) running_open = float(c["open"]) running_low = lo for j in range(i - 1, -1, -1): if candles[j]["candle_time"][:8] != day: break running_open = float(candles[j]["open"]) running_low = min(running_low, float(candles[j]["low"])) if running_open <= 0: return (None, None, None) dr = (running_open - running_low) / running_open if dr < drop_rate: return ("탈락-낙폭", "%.2f%% < %.1f%%(SCALP_MIN_DROP_RATE)" % (dr * 100, drop_rate * 100), None) if vol_mult > 0: volumes = [float(x.get("volume", 0)) for x in candles] vol = volumes[i] if i < len(volumes) else 0 win = max(1, min(20, i)) vol_avg = sum(volumes[i - win : i]) / win if vol_avg > 0 and vol < vol_avg * vol_mult: return ("탈락-거래량", "%.0f < 평균%.0f × %.1f" % (vol, vol_avg, vol_mult), None) return (None, None, {"signal": True, "rsi": rsi}) def check_sell_signal_live( position: Dict[str, Any], current_candle: Dict[str, Any], params: Dict[str, Any], is_eod: bool = False, ) -> Optional[tuple]: """ 실시간 봇: 보유 포지션에 대해 백테스트와 동일한 청산 조건 판단. position: { "entry_price", "entry_time", "qty", "stop", "target", "max_price", "rsi" } current_candle: { "high", "low", "close" } (현재가 기준이면 high=low=close 또는 고점 갱신값) is_eod: 장 마감 구간이면 True 시 장마감청산 반환 반환: (reason_str, exit_price) 또는 None """ sl_pct = float(params.get("sl_pct", 0.015)) tp_pct = float(params.get("tp_pct", 0.015)) trail_trigger = float(params.get("trail_trigger", 0)) trail_stop = float(params.get("trail_stop", 0.004)) hi = float(current_candle.get("high", current_candle["close"])) lo = float(current_candle.get("low", current_candle["close"])) cl = float(current_candle["close"]) max_price = max(position["max_price"], hi) reason = None exit_price = cl if lo <= position["stop"]: reason = "손절" exit_price = position["stop"] elif hi >= position["target"]: reason = "익절" exit_price = position["target"] elif trail_trigger > 0 and max_price >= position["entry_price"] * (1 + trail_trigger): ts = max_price * (1 - trail_stop) if cl <= ts: reason = "트레일링스탑" exit_price = cl if reason is None and is_eod: reason = "장마감청산" exit_price = cl if reason: return (reason, exit_price) return None