#!/usr/bin/env python3
"""
tail_engine.py — shared engine for the "tail catch" strategy (backtest + live).
===============================================================================
The backtest (backtest_web) and live trading (kis_short_ver3) are meant to use
the same entry/exit formulas, so only the common logic lives here.

Common engine logic (identical for backtest and live):
  Buy:  intraday drop (drop) >= min_drop_rate,
        intraday recovery (rec_day) >= min_recovery_ratio,
        hammer-candle tail (tail_ratio / tail_pct),
        3-minute candle recovery (rec_3m) inside its band,
        RSI < rsi_threshold, high-chase guard (high_chase_thr),
        time window / cooldown / max_daily.
  Sell: stop-loss (sl_pct), take-profit (tp_pct),
        shoulder cut (shoulder_min_high + shoulder_cut_pct), end of day.

Outside the engine (live trading only, per the original notes): amount-based
loss cut, ATR-based stop/target, quick profit, MA20/ML filters, etc.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple

logger = logging.getLogger(__name__)

# Single table of fallback values, keyed by the engine parameter names used in
# the `params` dicts.  Every function below falls back to these.
_TAIL_DEFAULTS: Dict[str, Any] = {
    "min_drop_rate": 0.03,
    "min_recovery_ratio": 0.5,
    "max_rec_3m": 0.8,
    "tail_ratio_min": 1.5,
    "tail_pct_min": 0.003,
    "sl_pct": 0.03,
    "tp_pct": 0.05,
    "shoulder_min_high": 0.015,
    "shoulder_cut_pct": 0.03,
    "rsi_period": 14,
    "rsi_threshold": 78.0,
    "high_chase_thr": 0.96,
    "cooldown_min": 15,
    "time_start_hm": 930,
    "time_end_hm": 1500,
    "max_daily": 3,
}


def _env_number(
    row: Dict[str, Any],
    keys: Sequence[str],
    default: float,
    as_int: bool = False,
):
    """Read one numeric setting from an env_config row.

    The first truthy value among ``keys`` is used (so 0, None and "" fall
    through to the next key and finally to ``default``).  Integers are parsed
    via ``float`` so that values such as ``"14.0"`` are accepted.  A value that
    cannot be converted is logged and replaced by ``default`` instead of
    invalidating the whole configuration.
    """
    raw: Any = default
    for key in keys:
        candidate = row.get(key)
        if candidate:
            raw = candidate
            break
    try:
        number = float(raw)
        return int(number) if as_int else number
    except (TypeError, ValueError, OverflowError):
        logger.warning(
            "env_config %s has invalid value %r; using default %r",
            "/".join(keys), raw, default,
        )
        return int(default) if as_int else float(default)


def get_tail_defaults_from_db(db=None) -> Dict[str, Any]:
    """Load every tail-catch setting from the latest ``env_config`` row.

    Single source so that backtest, parameter search and live trading read the
    same DB values.

    Args:
        db: Existing TradeDB-like instance exposing ``conn.execute``.  When
            given it is reused and not closed (avoids connection churn when
            the bot main loop calls this repeatedly).  When ``None`` a
            ``database.TradeDB`` is created for this call and closed again.

    Returns:
        Dict with keys ``min_drop_rate``, ``min_recovery_ratio``,
        ``max_rec_3m``, ``tail_ratio_min``, ``tail_pct_min``, ``sl_pct``,
        ``tp_pct``, ``shoulder_min_high``, ``shoulder_cut_pct``,
        ``rsi_period``, ``rsi_threshold``, ``high_chase_thr``,
        ``cooldown_min``, ``time_start_hm``, ``time_end_hm``, ``max_daily``.
        If the DB cannot be read or has no row, built-in defaults are
        returned; this function never raises for DB problems.

    Env columns read: MIN_DROP_RATE, MIN_RECOVERY_RATIO_SHORT, TAIL_RATIO_MIN,
    TAIL_PCT_MIN, STOP_LOSS_PCT, TAKE_PROFIT_PCT, SHOULDER_MIN_HIGH_PCT,
    SHOULDER_CUT_PCT, RSI_PERIOD, RSI_OVERHEAT_THRESHOLD, REENTRY_COOLDOWN_SEC,
    HIGH_PRICE_CHASE_THRESHOLD, MAX_RECOVERY_RATIO_3M, TAIL_TIME_START /
    TIME_START, TAIL_TIME_END / TIME_END, MAX_DAILY_TAIL / MAX_STOCKS.
    """
    own_db = None
    record: Optional[Dict[str, Any]] = None
    try:
        if db is None:
            # Project-local module; imported lazily so the engine can be used
            # (with defaults) where the database layer is unavailable.
            from database import TradeDB
            own_db = TradeDB()
            db = own_db
        row = db.conn.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1").fetchone()
        if row:
            record = dict(row)
    except Exception as exc:  # best-effort by design: fall back to defaults
        logger.warning("env_config load failed; using built-in tail defaults: %s", exc)
        record = None
    finally:
        if own_db is not None:
            try:
                own_db.close()
            except Exception:
                logger.debug("closing temporary TradeDB failed", exc_info=True)

    if record is None:
        return dict(_TAIL_DEFAULTS)

    d = _TAIL_DEFAULTS
    cooldown_sec = _env_number(record, ("REENTRY_COOLDOWN_SEC",), 900, as_int=True)
    return {
        "min_drop_rate": _env_number(record, ("MIN_DROP_RATE",), d["min_drop_rate"]),
        "min_recovery_ratio": _env_number(
            record, ("MIN_RECOVERY_RATIO_SHORT",), d["min_recovery_ratio"]
        ),
        "max_rec_3m": _env_number(record, ("MAX_RECOVERY_RATIO_3M",), d["max_rec_3m"]),
        "tail_ratio_min": _env_number(record, ("TAIL_RATIO_MIN",), d["tail_ratio_min"]),
        "tail_pct_min": _env_number(record, ("TAIL_PCT_MIN",), d["tail_pct_min"]),
        # STOP_LOSS_PCT may be stored as a negative number; the engine uses
        # the magnitude.
        "sl_pct": abs(_env_number(record, ("STOP_LOSS_PCT",), -d["sl_pct"])),
        "tp_pct": _env_number(record, ("TAKE_PROFIT_PCT",), d["tp_pct"]),
        "shoulder_min_high": _env_number(
            record, ("SHOULDER_MIN_HIGH_PCT",), d["shoulder_min_high"]
        ),
        "shoulder_cut_pct": _env_number(record, ("SHOULDER_CUT_PCT",), d["shoulder_cut_pct"]),
        "rsi_period": _env_number(record, ("RSI_PERIOD",), d["rsi_period"], as_int=True),
        "rsi_threshold": _env_number(
            record, ("RSI_OVERHEAT_THRESHOLD",), d["rsi_threshold"]
        ),
        "high_chase_thr": _env_number(
            record, ("HIGH_PRICE_CHASE_THRESHOLD",), d["high_chase_thr"]
        ),
        "cooldown_min": cooldown_sec // 60,
        "time_start_hm": _env_number(
            record, ("TAIL_TIME_START", "TIME_START"), d["time_start_hm"], as_int=True
        ),
        "time_end_hm": _env_number(
            record, ("TAIL_TIME_END", "TIME_END"), d["time_end_hm"], as_int=True
        ),
        "max_daily": _env_number(
            record, ("MAX_DAILY_TAIL", "MAX_STOCKS"), d["max_daily"], as_int=True
        ),
    }


def compute_rsi_series(closes: List[float], period: int = 14) -> List[Optional[float]]:
    """Return the RSI series for ``closes`` using Wilder smoothing.

    The result has the same length as ``closes``.  Indices below ``period``
    are ``None``; the first value (index ``period``) uses the simple average of
    the first ``period`` gains/losses, later values use Wilder's recursive
    average.  When the average loss is zero the RSI is 100.0.  A non-positive
    ``period`` or too few closes yields a list of ``None`` only.
    """
    rsi_list: List[Optional[float]] = [None] * len(closes)
    if period < 1 or len(closes) < period + 1:
        return rsi_list

    deltas = [closes[k] - closes[k - 1] for k in range(1, len(closes))]
    gains = [max(d, 0) for d in deltas]
    losses = [max(-d, 0) for d in deltas]
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period

    for i in range(period, len(closes)):
        if i > period:
            # deltas[i - 1] is the change that produced closes[i].
            avg_gain = (avg_gain * (period - 1) + gains[i - 1]) / period
            avg_loss = (avg_loss * (period - 1) + losses[i - 1]) / period
        if avg_loss > 0:
            rs = avg_gain / avg_loss
            rsi_list[i] = 100 - (100 / (1 + rs))
        else:
            rsi_list[i] = 100.0
    return rsi_list


def _t2dt(t: str) -> datetime:
    """Convert a ``candle_time`` string (``YYYYMMDDHHMM``) to a datetime."""
    return datetime.strptime(t, "%Y%m%d%H%M")


class _EntryRules(NamedTuple):
    """Entry-side thresholds parsed once from a ``params`` dict."""

    min_drop_rate: float
    min_recovery_ratio: float
    max_rec_3m: float
    tail_ratio_min: float
    tail_pct_min: float
    rsi_period: int
    rsi_threshold: float
    high_chase_thr: float
    time_start_hm: int
    time_end_hm: int
    cooldown_min: float
    max_daily: int


def _param(params: Dict[str, Any], key: str, cast=float):
    """Return ``params[key]`` converted with ``cast``, or the engine default."""
    return cast(params.get(key, _TAIL_DEFAULTS[key]))


def _entry_rules(params: Dict[str, Any]) -> _EntryRules:
    """Parse the entry-related keys of ``params`` into an ``_EntryRules``."""
    return _EntryRules(
        min_drop_rate=_param(params, "min_drop_rate"),
        min_recovery_ratio=_param(params, "min_recovery_ratio"),
        max_rec_3m=_param(params, "max_rec_3m"),
        tail_ratio_min=_param(params, "tail_ratio_min"),
        tail_pct_min=_param(params, "tail_pct_min"),
        rsi_period=_param(params, "rsi_period", int),
        rsi_threshold=_param(params, "rsi_threshold"),
        high_chase_thr=_param(params, "high_chase_thr"),
        time_start_hm=_param(params, "time_start_hm", int),
        time_end_hm=_param(params, "time_end_hm", int),
        cooldown_min=_param(params, "cooldown_min"),
        max_daily=_param(params, "max_daily", int),
    )


def _running_day_ohlc(candles: List[Dict], i: int) -> Tuple[float, float, float]:
    """Return (day open, running high, running low) for the day of ``candles[i]``.

    Scans backwards from ``i`` while the date prefix of ``candle_time`` stays
    the same.  Lows that are not positive are ignored.
    """
    last = candles[i]
    day = last["candle_time"][:8]
    hi = float(last["high"])
    lo = float(last["low"])
    running_open = float(last["open"])
    running_high = hi
    running_low = lo if lo > 0 else hi
    for j in range(i - 1, -1, -1):
        prev = candles[j]
        if prev["candle_time"][:8] != day:
            break
        running_open = float(prev["open"])
        running_high = max(running_high, float(prev["high"]))
        prev_low = float(prev["low"])
        if prev_low > 0:
            running_low = min(running_low, prev_low)
    return running_open, running_high, running_low


def _evaluate_entry(
    candles: List[Dict],
    i: int,
    bar: Tuple[float, float, float, float],
    day_ohlc: Tuple[float, float, float],
    rules: _EntryRules,
    rsis: Optional[List[Optional[float]]] = None,
    want_msg: bool = True,
) -> tuple:
    """Apply the price-based entry rules to ``candles[i]``.

    This is the one implementation shared by ``check_buy_signal_live`` and
    ``run_tail_backtest``.  Time window, daily cap and cooldown are checked by
    the callers.

    Args:
        bar: (open, high, low, close) of ``candles[i]`` as floats.
        day_ohlc: (day open, running high, running low) up to and including i.
        rsis: Precomputed RSI series; computed lazily from ``candles`` when
            ``None`` so that early rejections do not pay for it.
        want_msg: When False the detail message is not formatted (the backtest
            discards it).

    Returns:
        ``(reject_reason, reject_msg, signal_dict)`` — ``signal_dict`` is
        non-None only when every rule passes; all three are None when the
        candle has no usable price.
    """
    op, hi, lo, cl = bar
    running_open, running_high, running_low = day_ohlc
    min_drop_rate = rules.min_drop_rate
    min_recovery_ratio = rules.min_recovery_ratio
    max_rec_3m = rules.max_rec_3m
    tail_ratio_min = rules.tail_ratio_min
    tail_pct_min = rules.tail_pct_min
    rsi_period = rules.rsi_period
    rsi_threshold = rules.rsi_threshold
    high_chase_thr = rules.high_chase_thr

    if cl <= 0 or running_open <= 0:
        return (None, None, None)

    # 1) Intraday drop from the day's open to the running low.
    drop = (running_open - running_low) / running_open
    if drop < min_drop_rate:
        msg = (
            f"낙폭 {drop*100:.2f}% < {min_drop_rate*100:.1f}% (당일 시가 {running_open:,.0f}원 → 저점 {running_low:,.0f}원)"
            if want_msg else None
        )
        return ("탈락-낙폭", msg, None)

    # 2) Position of the close inside the day's range.
    day_range = running_high - running_low
    rec_day = (cl - running_low) / day_range if day_range > 0 else 0
    if rec_day < min_recovery_ratio:
        msg = (
            f"회복률 {rec_day*100:.1f}% < {min_recovery_ratio*100:.0f}% (저점 {running_low:,.0f}원 → 현재 {cl:,.0f}원 / 범위 {day_range:,.0f}원)"
            if want_msg else None
        )
        return ("탈락-회복률", msg, None)

    # 3) Hammer tail: lower shadow vs. body.  A zero-length body counts as 1.0
    #    so the ratio stays finite.  If this candle has no lower shadow, look
    #    back up to 3 candles (never at or below index rsi_period).
    body_top = max(op, cl)
    body_bot = min(op, cl)
    body_len = body_top - body_bot if body_top > body_bot else 1.0
    tail_len = body_bot - lo if lo > 0 else 0.0
    lo_use = lo
    if tail_len <= 0:
        for j in range(i - 1, max(i - 4, rsi_period), -1):
            prev = candles[j]
            o2 = float(prev["open"])
            l2 = float(prev["low"])
            c2 = float(prev["close"])
            if l2 <= 0:
                continue
            bt2, bb2 = max(o2, c2), min(o2, c2)
            bl2 = bt2 - bb2 if bt2 > bb2 else 1.0
            tl2 = bb2 - l2
            if tl2 > 0:
                tail_len, body_len, lo_use = tl2, bl2, l2
                break
    tail_ratio = tail_len / body_len if body_len > 0 else 0
    tail_pct = tail_len / lo_use if lo_use > 0 and tail_len > 0 else 0.0
    if tail_ratio < tail_ratio_min or tail_pct < tail_pct_min:
        msg = (
            f"꼬리비율 {tail_ratio:.2f} (기준 {tail_ratio_min}) 또는 꼬리% {tail_pct*100:.2f}% (기준 {tail_pct_min*100:.2f}%)"
            if want_msg else None
        )
        return ("탈락-꼬리", msg, None)

    # 4) Position of the close inside this candle's own range.
    c_range = hi - lo if hi > lo else 0
    rec_3m = (cl - lo) / c_range if c_range > 0 else 0
    if not (min_recovery_ratio <= rec_3m <= max_rec_3m):
        msg = (
            f"3분봉 회복률 {rec_3m*100:.1f}% (기준 {min_recovery_ratio*100:.0f}~{max_rec_3m*100:.0f}%)"
            if want_msg else None
        )
        return ("탈락-회복3분", msg, None)

    # 5) RSI must exist and be below the overheat threshold.
    if rsis is None:
        rsis = compute_rsi_series([float(x["close"]) for x in candles], rsi_period)
    rsi_val = rsis[i] if i < len(rsis) else None
    if rsi_val is None or rsi_val >= rsi_threshold:
        msg = (
            (f"RSI {rsi_val:.1f}" if rsi_val is not None else "RSI None") + f" ≥ {rsi_threshold:.0f}"
            if want_msg else None
        )
        return ("탈락-RSI", msg, None)

    # 6) Do not chase a close that is already near the day's high.
    if cl >= running_high * high_chase_thr:
        msg = (
            f"현재가 {cl:,.0f} ≥ 고점대비 {high_chase_thr*100:.0f}%"
            if want_msg else None
        )
        return ("탈락-피뢰침 고점추격", msg, None)

    return (
        None,
        None,
        {"signal": True, "tail_ratio": tail_ratio, "tail_pct": tail_pct, "recovery_pos": rec_3m, "rsi_val": rsi_val},
    )


def _decide_exit(
    entry_price: float,
    stop: float,
    target: float,
    max_price: float,
    hi: float,
    lo: float,
    cl: float,
    shoulder_min_high: float,
    shoulder_cut_pct: float,
    is_eod: bool,
) -> Optional[tuple]:
    """Return ``(reason, exit_price)`` for the first exit rule that fires.

    Priority: stop-loss, take-profit, shoulder cut, end of day.  ``max_price``
    must already include this candle's high.  Shared by the live sell check
    and the backtest.
    """
    if lo > 0 and lo <= stop:
        return ("손절", stop)
    if hi >= target:
        return ("익절", target)
    if max_price >= entry_price * (1 + shoulder_min_high) and cl <= max_price * (1 - shoulder_cut_pct):
        return ("어깨컷", cl)
    if is_eod:
        return ("장마감", cl)
    return None


def check_buy_signal_live(
    candles: List[Dict],
    params: Dict[str, Any],
    state: Dict[str, Any],
) -> tuple:
    """Decide whether the last candle satisfies the tail-catch buy conditions.

    Uses the same rule implementation as ``run_tail_backtest``.

    Args:
        candles: Candle dicts with ``candle_time`` (``YYYYMMDDHHMM``), ``open``,
            ``high``, ``low``, ``close`` (``volume`` is not read here).  At
            least 10 are required.
        params: Engine parameters; missing keys use the built-in defaults.
        state: ``{"last_exit_dt": datetime | None, "daily_cnt": int}``.

    Returns:
        ``(reject_reason, reject_msg, signal_dict)``:
          - pass: ``(None, None, {"signal": True, "tail_ratio", "tail_pct",
            "recovery_pos", "rsi_val"})``
          - rejected by a price rule: ``("탈락-XXX", "<detail with numbers>",
            None)`` — intended for a caller-side log line.
          - outside the time window, daily cap reached, in cooldown, or no
            usable price: ``(None, None, None)`` (silent rejection).
    """
    if len(candles) < 10:
        return ("탈락-데이터", "봉 수 부족 (len<10)", None)

    i = len(candles) - 1
    c = candles[i]
    candle_time = c["candle_time"]
    hm = int(candle_time[8:12])
    bar = (float(c["open"]), float(c["high"]), float(c["low"]), float(c["close"]))
    rules = _entry_rules(params)

    # Silent gates: no log message is produced for these.
    if hm < rules.time_start_hm or hm > rules.time_end_hm:
        return (None, None, None)
    if state.get("daily_cnt", 0) >= rules.max_daily:
        return (None, None, None)
    last_exit_dt = state.get("last_exit_dt")
    if last_exit_dt is not None:
        elapsed = (_t2dt(candle_time) - last_exit_dt).total_seconds() / 60
        if elapsed < rules.cooldown_min:
            return (None, None, None)

    return _evaluate_entry(candles, i, bar, _running_day_ohlc(candles, i), rules)


def check_sell_signal_live(
    position: Dict[str, Any],
    current_candle: Dict[str, Any],
    params: Dict[str, Any],
    is_eod: bool = False,
) -> Optional[tuple]:
    """Evaluate the exit rules (stop / target / shoulder cut / end of day).

    Uses the same rule implementation as ``run_tail_backtest``.  ``position``
    is not modified; the caller is responsible for tracking ``max_price``.

    Args:
        position: Needs ``entry_price``.  ``stop`` / ``target`` default to
            ``entry_price * (1 -/+ sl_pct/tp_pct)`` and ``max_price`` to
            ``entry_price`` when missing or None.
        current_candle: Needs ``close``; ``high`` / ``low`` default to it.
        params: Reads ``sl_pct``, ``tp_pct``, ``shoulder_min_high``,
            ``shoulder_cut_pct`` (defaults apply when missing).
        is_eod: True to force an end-of-day exit when no other rule fires.

    Returns:
        ``(reason_str, exit_price)`` or ``None`` when the position is kept.
    """
    sl_pct = _param(params, "sl_pct")
    tp_pct = _param(params, "tp_pct")
    shoulder_min_high = _param(params, "shoulder_min_high")
    shoulder_cut_pct = _param(params, "shoulder_cut_pct")

    close_raw = current_candle["close"]
    hi = float(current_candle.get("high", close_raw))
    lo = float(current_candle.get("low", close_raw))
    cl = float(close_raw)

    ep = position["entry_price"]
    prev_max = position.get("max_price")
    if prev_max is None:
        prev_max = ep
    stop = position.get("stop")
    if stop is None:
        stop = ep * (1 - sl_pct)
    target = position.get("target")
    if target is None:
        target = ep * (1 + tp_pct)

    return _decide_exit(
        ep, stop, target, max(prev_max, hi), hi, lo, cl,
        shoulder_min_high, shoulder_cut_pct, is_eod,
    )


def run_tail_backtest(
    candles_by_code: Dict[str, List[Dict]],
    params: Dict[str, Any],
) -> List[Dict]:
    """Run one backtest pass over every code and return the list of trades.

    Flow per code: a signal on candle ``i`` opens a position at the open of
    candle ``i + 1`` (same day only); that entry candle is the first one
    checked for exits.  At most one position per code is open at a time.  The
    daily counter and the cooldown timer are updated when a trade *exits*.

    Signals are evaluated from index ``rsi_period + 1`` onward, but the day's
    running open/high/low are accumulated from the first candle so that they
    match what ``check_buy_signal_live`` computes for the same candles.

    Args:
        candles_by_code: ``code -> [ {candle_time, open, high, low, close,
            volume}, ... ]`` (``volume`` is not read here).  Codes with fewer
            than ``rsi_period + 5`` candles are skipped.
        params: Engine parameters, e.g. from ``get_tail_defaults_from_db()``;
            missing keys use the built-in defaults.

    Returns:
        List of trade dicts with ``code``, ``entry_time``, ``exit_time``,
        ``entry``, ``exit`` (rounded prices), ``reason``, and ``pnl`` /
        ``hold_min`` which are always 0 here.
    """
    rules = _entry_rules(params)
    sl_pct = _param(params, "sl_pct")
    tp_pct = _param(params, "tp_pct")
    shoulder_min_high = _param(params, "shoulder_min_high")
    shoulder_cut_pct = _param(params, "shoulder_cut_pct")
    rsi_period = rules.rsi_period

    all_trades: List[Dict] = []

    for code, candles in candles_by_code.items():
        if len(candles) < rsi_period + 5:
            continue
        closes = [float(c["close"]) for c in candles]
        rsis = compute_rsi_series(closes, rsi_period)
        position: Optional[Dict[str, Any]] = None
        last_exit_dt: Dict[str, datetime] = {}
        daily_cnt: Dict[str, int] = {}
        cur_day = None
        running_open, running_high, running_low = 0.0, 0.0, 0.0
        last_index = len(candles) - 1

        for i, c in enumerate(candles):
            candle_time = c["candle_time"]
            day = candle_time[:8]
            op = float(c["open"])
            hi = float(c["high"])
            lo = float(c["low"])
            cl = float(c["close"])

            # Day-level running OHLC; non-positive lows are ignored.
            if day != cur_day:
                cur_day = day
                running_open = op
                running_high = hi
                running_low = lo if lo > 0 else hi
            else:
                running_high = max(running_high, hi)
                if lo > 0:
                    running_low = min(running_low, lo)

            if i <= rsi_period:
                # RSI warm-up: only the running day values are tracked.
                continue

            is_eod = (i == last_index) or (candles[i + 1]["candle_time"][:8] != day)

            if position is not None:
                max_p = max(position["max_price"], hi)
                position["max_price"] = max_p
                decision = _decide_exit(
                    position["entry_price"], position["stop"], position["target"],
                    max_p, hi, lo, cl, shoulder_min_high, shoulder_cut_pct, is_eod,
                )
                if decision is not None:
                    reason, exit_price = decision
                    all_trades.append({
                        "code": code,
                        "entry_time": position["entry_time"],
                        "exit_time": candle_time,
                        "entry": round(position["entry_price"]),
                        "exit": round(exit_price),
                        # NOTE(review): pnl / hold_min are not computed here;
                        # presumably filled in by the caller — confirm.
                        "pnl": 0,
                        "reason": reason,
                        "hold_min": 0,
                    })
                    last_exit_dt[day] = _t2dt(candle_time)
                    daily_cnt[day] = daily_cnt.get(day, 0) + 1
                    position = None
                continue

            # Gates that do not depend on price structure.
            hm = int(candle_time[8:12])
            if hm < rules.time_start_hm or hm > rules.time_end_hm:
                continue
            if daily_cnt.get(day, 0) >= rules.max_daily:
                continue
            if day in last_exit_dt:
                elapsed = (_t2dt(candle_time) - last_exit_dt[day]).total_seconds() / 60
                if elapsed < rules.cooldown_min:
                    continue

            _, _, signal = _evaluate_entry(
                candles, i, (op, hi, lo, cl),
                (running_open, running_high, running_low),
                rules, rsis=rsis, want_msg=False,
            )
            if signal is None:
                continue

            # Enter at the next candle's open, and only within the same day.
            if i + 1 >= len(candles):
                continue
            next_c = candles[i + 1]
            if next_c["candle_time"][:8] != day:
                continue
            entry_price = float(next_c["open"])
            if entry_price <= 0:
                entry_price = cl
            position = {
                "entry_price": entry_price,
                "entry_time": next_c["candle_time"],
                "stop": entry_price * (1 - sl_pct),
                "target": entry_price * (1 + tp_pct),
                "max_price": entry_price,
            }
            # The next iteration handles the entry candle itself: with a
            # position open it goes straight to the exit check.

    return all_trades