#!/usr/bin/env python3 """ tail_param_search.py — 꼬리잡기 백테스트 파라미터 자동 탐색 (Grid Search) ========================================================================== tail_engine을 직접 임포트해 run_tail_backtest 호출. 기본값은 DB(env_config) 단일 소스. 실매매·백테스트·파라서치가 동일한 env 값을 사용해 결과 예측 가능. 실행: cd /home/hoon/kis_bot python3 backtest_scalping/tail_param_search.py 옵션: --start 시작일 (기본: 오늘-7일) --end 종료일 (기본: 오늘) --mode 탐색 모드: coarse(기본) / fine / full --top 상위 N개 출력 (기본: 20) --min_trades 최소 거래 건수 필터 (기본: 3) --min_win_rate 승률 하한 (기본: 52). 이 이상인 것만 손익순 정렬 후 1위. 없으면 차악(손익 1위) 적용. --apply [N] 1위(또는 N위) 결과를 DB에 적용. N 생략 시 1. --from-file --apply N 과 함께 사용 시, 최근 결과 JSON에서 N번째 적용 (탐색 생략). 예시: python3 backtest_scalping/tail_param_search.py --start 2026-03-01 --end 2026-03-06 python3 backtest_scalping/tail_param_search.py --mode fine --apply python3 backtest_scalping/tail_param_search.py --apply 2 --from-file 참고: - 백테스트 엔진(tail_engine)에는 실매 전용 로직이 없음: 금액손실컷(MAX_LOSS_PER_TRADE_KRW), MIN_HOLD_AFTER_BUY_SEC, ATR 기반 손절/목표가(STOP_ATR_MULTIPLIER_TAIL 등) → 웹 백테·파라서치에도 없음. - full 모드 조합 수가 너무 크면 OOM으로 Killed 되므로, 그리드 값 개수를 제한함. """ MIN_WIN_RATE_DEFAULT = 52.0 import sys, os, json, time, argparse from datetime import datetime, timedelta from itertools import product from typing import Optional ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, ROOT) import logging logging.getLogger("TradeDB").setLevel(logging.WARNING) # 반복 초기화 로그 억제 from database import TradeDB import tail_engine as te # ────────────────────────────────────────────────────────────────────────────── # 파라미터 그리드 (그리드 값은 % 단위 → 엔진 호출 시 비율로 변환) # 실매(kis_short_ver3)와 동일한 env를 경우의수에 포함해 진짜 백테스트에 가깝게 함. # time_start_hm / time_end_hm: DB 고정. 탐색 시 아래 주석 해제. # ────────────────────────────────────────────────────────────────────────────── GRIDS = { "coarse": { "min_drop_rate": [1.5, 2.0, 2.5, 3.0, 4.0], "min_recovery_ratio": [30, 40, 50, 60], "sl_pct": [1.5, 2.0, 3.0], "tp_pct": [3.0, 4.0, 5.0, 6.0], "tail_ratio_min": [1.0, 1.5, 2.0], "max_loss_per_trade_krw": [150000, 200000], "risk_pct_per_trade": [1.0, 2.0], # "time_start_hm": [900, 930], # "time_end_hm": [1430, 1500], }, "fine": { "min_drop_rate": [1.5, 2.0, 2.5, 3.0, 4.0], "min_recovery_ratio": [30, 40, 50, 60], "sl_pct": [1.0, 1.5, 2.0, 2.5, 3.0], "tp_pct": [3.0, 4.0, 5.0, 6.0, 7.0], "tail_ratio_min": [0.8, 1.0, 1.5, 2.0], "shoulder_cut_pct": [2.0, 3.0, 4.0], "shoulder_min_high": [1.0, 1.2, 1.5], "tail_pct_min": [0.1, 0.15, 0.2], "max_rec_3m": [70, 80, 90], "high_chase_thr": [94, 96, 98], "max_loss_per_trade_krw": [150000, 200000], "risk_pct_per_trade": [1.0, 2.0], # "time_start_hm": [900, 930], # "time_end_hm": [1430, 1500], }, # full: 조합 수 ~52만 (26만×2×2). MAX_LOSS 하드캡, RISK_PCT 엑셀레이터 테스트. rsi_period는 꼬리잡기용 DB 고정(그리드 제외). 1억 개에서 터진 거라 20~30만이면 10GB·10~20분 내 가능. "full": { "min_drop_rate": [1.5, 2.0, 2.5, 3.0, 4.0], "min_recovery_ratio": [30, 40, 50, 60, 70], "sl_pct": [1.0, 1.5, 2.0, 3.0], "tp_pct": [4.0, 5.0, 6.0, 8.0], "tail_ratio_min": [1.0, 1.5, 2.0], "shoulder_cut_pct": [2.0, 3.0, 4.0], "shoulder_min_high": [1.0, 1.2, 1.5], "tail_pct_min": [0.1, 0.15, 0.2], "max_rec_3m": [70, 80], "high_chase_thr": [94, 96], "rsi_threshold": [72, 78], "max_loss_per_trade_krw": [150000, 200000], "risk_pct_per_trade": [1.0, 2.0], }, } # % 단위 그리드 키 → 엔진은 비율(0~1) 사용 _PCT_KEYS = frozenset({ "min_drop_rate", "min_recovery_ratio", "max_rec_3m", "tail_pct_min", "sl_pct", "tp_pct", "shoulder_min_high", "shoulder_cut_pct", "high_chase_thr", "risk_pct_per_trade", }) # 포지션 사이징: 실매와 동일한 effective_slot 계산용 (백테 가정 자본·켈리) BACKTEST_CAPITAL = 100_000_000 # 1억 원 가정 KELLY_MULTIPLIER_TAIL = 0.25 def _get_fixed_defaults(): """고정값 = DB(env_config) 단일 소스. 엔진과 동일.""" d = te.get_tail_defaults_from_db() d["slot_money"] = 1_000_000 # 수수료/세금은 백테스트와 동일하게 (backtest_web _get_fee_defaults 기준) d["fee_rate"] = 0.00015 # 0.015% d["sell_tax"] = 0.0018 # 0.18% return d def _load_candles(start: str, end: str, rsi_period: int): """ws_candles 3분봉 확정봉만 로드 (backtest_web과 동일).""" start_key = (start.replace("-", "") + "0000") if start else "20260101" end_key = (end.replace("-", "") + "2359") if end else "99991231" db = TradeDB() try: codes_raw = db.conn.execute( "SELECT DISTINCT code FROM ws_candles WHERE timeframe=3 " "AND candle_time >= %s AND candle_time <= %s ORDER BY code", [start_key, end_key] ).fetchall() codes = [r["code"] for r in codes_raw] candles_by_code = {} for code in codes: rows = db.conn.execute( "SELECT candle_time, open, high, low, close, volume " "FROM ws_candles WHERE timeframe=3 AND code=%s " "AND candle_time >= %s AND candle_time <= %s AND is_confirmed=1 " "ORDER BY candle_time ASC", [code, start_key, end_key] ).fetchall() if len(rows) < rsi_period + 5: continue candles_by_code[code] = [dict(r) for r in rows] return candles_by_code finally: db.close() def _params_for_engine(fixed: dict, grid_keys: list, grid_vals: list) -> dict: """고정값 + 그리드 조합 → 엔진용 params (비율 단위).""" params = dict(fixed) for k, v in zip(grid_keys, grid_vals): if k in _PCT_KEYS: params[k] = float(v) / 100.0 else: params[k] = v # 엔진에 불필요한 키 제거 (slot_money, fee_rate, sell_tax는 백테 후처리용) for skip in ("slot_money", "fee_rate", "sell_tax"): params.pop(skip, None) return params def _summary_from_trades(all_trades: list, params: dict, fixed: dict, fee_rate: float, sell_tax: float) -> dict: """all_trades에 pnl/hold_min 보강 후 요약 통계 반환 (backtest_web과 동일). params에 max_loss_per_trade_krw, risk_pct_per_trade, sl_pct가 있으면 실매와 동일한 effective_slot로 손익 계산.""" def _t2dt(s): from datetime import datetime if isinstance(s, str) and len(s) >= 12: return datetime(int(s[:4]), int(s[4:6]), int(s[6:8]), int(s[8:10]), int(s[10:12])) return None # 포지션 크기: 실매와 동일 공식 적용 시 effective_slot, 아니면 고정 slot_money if ("max_loss_per_trade_krw" in params and "risk_pct_per_trade" in params and "sl_pct" in params and params.get("sl_pct", 0) > 0): sl_pct = float(params["sl_pct"]) max_loss_krw = int(params["max_loss_per_trade_krw"]) risk_pct = float(params["risk_pct_per_trade"]) from_risk = BACKTEST_CAPITAL * risk_pct * KELLY_MULTIPLIER_TAIL / sl_pct from_cap = max_loss_krw / sl_pct effective_slot = min(from_risk, from_cap) else: effective_slot = fixed.get("slot_money", 1_000_000) for t in all_trades: qty = max(1, int(effective_slot / t["entry"])) fee = (t["entry"] + t["exit"]) * qty * fee_rate tax = t["exit"] * qty * sell_tax t["pnl"] = round((t["exit"] - t["entry"]) * qty - fee - tax) t["hold_min"] = round((_t2dt(t["exit_time"]) - _t2dt(t["entry_time"])).total_seconds() / 60, 1) if _t2dt(t["exit_time"]) and _t2dt(t["entry_time"]) else 0 total = len(all_trades) if total == 0: return {"total_pnl": 0, "win_rate": 0, "total_trades": 0, "profit_factor": 0, "avg_hold_min": 0, "max_drawdown": 0} total_pnl = sum(t["pnl"] for t in all_trades) wins = [t for t in all_trades if t["pnl"] > 0] win_pnl = sum(t["pnl"] for t in wins) loss_pnl = sum(t["pnl"] for t in all_trades if t["pnl"] < 0) pf = round(abs(win_pnl / loss_pnl), 2) if loss_pnl != 0 else 9999.0 cum = 0 mdd = 0 for t in all_trades: cum += t["pnl"] if cum > mdd: mdd = cum mdd = min(mdd, cum) mdd = abs(min(0, mdd)) avg_hold = sum(t["hold_min"] for t in all_trades) / total return { "total_pnl": round(total_pnl), "win_rate": round(len(wins) / total * 100, 1), "total_trades": total, "profit_factor": pf, "avg_hold_min": round(avg_hold, 1), "max_drawdown": round(mdd), } def _apply_from_latest_json(rank: int): """최근 tail_search_*.json에서 rank번째(1-based) 항목의 merged_params를 DB에 적용.""" out_dir = os.path.join(ROOT, "backtest_scalping", "results") if not os.path.isdir(out_dir): print("⚠️ results 디렉터리가 없습니다.") return jsons = [f for f in os.listdir(out_dir) if f.startswith("tail_search_") and f.endswith(".json")] if not jsons: print("⚠️ tail_search_*.json 파일이 없습니다.") return jsons.sort(key=lambda f: os.path.getmtime(os.path.join(out_dir, f)), reverse=True) latest_path = os.path.join(out_dir, jsons[0]) with open(latest_path, "r", encoding="utf-8") as f: data = json.load(f) top = data.get("top") or [] if rank < 1 or rank > len(top): print(f"⚠️ 순번 {rank}이(가) 유효하지 않습니다. (1~{len(top)})") return item = top[rank - 1] merged = item.get("merged_params") if not merged: print("⚠️ 해당 항목에 merged_params가 없습니다. (구 버전 JSON)") return print(f"📂 {latest_path} 에서 {rank}번째 적용합니다.") _apply_to_db(merged) def run_search(start: str, end: str, mode: str, top_n: int, min_trades: int, min_win_rate: float, apply_rank: Optional[int], from_file_only: bool): if from_file_only and apply_rank is not None and apply_rank >= 1: _apply_from_latest_json(apply_rank) return grid = GRIDS[mode] keys = list(grid.keys()) combos = list(product(*[grid[k] for k in keys])) total = len(combos) fixed = _get_fixed_defaults() fee_rate = fixed["fee_rate"] sell_tax = fixed["sell_tax"] rsi_period = fixed.get("rsi_period", 14) print(f"\n[{mode.upper()} 모드 — 꼬리잡기] 탐색 조합: {total:,}개 | 기간: {start} ~ {end}") print(" 기본값: DB(env_config) tail_engine.get_tail_defaults_from_db()") print("=" * 70) candles_by_code = _load_candles(start, end, rsi_period) if not candles_by_code: print("⚠️ 해당 기간 ws_candles 3분봉 데이터가 없습니다.") return results = [] t0 = time.time() for i, vals in enumerate(combos): params = _params_for_engine(fixed, keys, vals) all_trades = te.run_tail_backtest(candles_by_code, params) if len(all_trades) < min_trades: continue s = _summary_from_trades(all_trades, params, fixed, fee_rate, sell_tax) results.append({ "params": dict(zip(keys, vals)), "total_pnl": s["total_pnl"], "win_rate": s["win_rate"], "total_trades": s["total_trades"], "pf": s["profit_factor"], "avg_hold": s["avg_hold_min"], "mdd": s["max_drawdown"], }) if (i + 1) % 100 == 0: elapsed = time.time() - t0 eta = elapsed / (i + 1) * (total - i - 1) print(f" 진행: {i+1:>5}/{total} 경과: {elapsed:>5.0f}s 남은: {eta:>5.0f}s", flush=True) elapsed = time.time() - t0 print(f"\n완료: {elapsed:.1f}초 | 유효 결과: {len(results)}건") if not results: print("⚠️ 유효 조합을 찾지 못했습니다. 기간을 늘리거나 min_trades를 낮춰보세요.") return results.sort(key=lambda x: x["total_pnl"], reverse=True) # 승률 min_win_rate 이상만 우선; 없으면 차악(손익 1위) 유지 results_52 = [r for r in results if r["win_rate"] >= min_win_rate] if results_52: results = results_52 results.sort(key=lambda x: x["total_pnl"], reverse=True) print(f"✅ 승률 {min_win_rate}% 이상 {len(results)}건 중 손익순으로 1위 선정") else: print(f"⚠️ 승률 {min_win_rate}% 이상 없음 → 차악(손익 1위) 적용") # ── 결과 출력 ── hdr_keys = list(keys) col_w = max(len(k) for k in hdr_keys) + 2 print(f"\n{'='*70}") print(f" 🏆 수익 TOP {min(top_n, len(results))} (손익: 조합별 effective_slot·MAX_LOSS·RISK_PCT 적용)") print(f"{'='*70}") hdr = " ".join(f"{k:>{col_w}}" for k in hdr_keys) print(f"{hdr} | {'손익(원)':>12} {'승률':>6} {'거래':>5} {'PF':>5} {'보유':>6}") print("-" * (len(hdr) + 55)) for r in results[:top_n]: p = r["params"] row = " ".join(f"{p[k]:>{col_w}.4g}" for k in hdr_keys) print(f"{row} | {r['total_pnl']:>+12,.0f} {r['win_rate']:>5.1f}% " f"{r['total_trades']:>5} {r['pf']:>5.2f} {r['avg_hold']:>5.1f}분") best = results[0] bp = best["params"] # 적용 시 사용할 전체 파라미터 = 고정값(DB) + 그리드 1위 merged = dict(fixed) for k, v in bp.items(): merged[k] = v # 표시용: 그리드 키 + 결과에 영향 주는 고정 파라미터 _tail_disp_map = { "min_drop_rate": "MIN_DROP_RATE (÷100)", "min_recovery_ratio": "MIN_RECOVERY_RATIO_SHORT(÷100)", "sl_pct": "STOP_LOSS_PCT (음수÷100)", "tp_pct": "TAKE_PROFIT_PCT (÷100)", "tail_ratio_min": "TAIL_RATIO_MIN", "tail_pct_min": "TAIL_PCT_MIN (÷100)", "shoulder_min_high": "SHOULDER_MIN_HIGH_PCT (÷100)", "shoulder_cut_pct": "SHOULDER_CUT_PCT (÷100)", "rsi_period": "RSI_PERIOD", "rsi_threshold": "RSI_OVERHEAT_THRESHOLD", "max_rec_3m": "MAX_RECOVERY_RATIO_3M (÷100)", "high_chase_thr": "HIGH_PRICE_CHASE_THRESHOLD(÷100)", "cooldown_min": "REENTRY_COOLDOWN_SEC (×60초)", "time_start_hm": "TIME_START / TAIL_TIME_START", "time_end_hm": "TIME_END / TAIL_TIME_END", "max_daily": "MAX_DAILY_TAIL / MAX_STOCKS", "max_loss_per_trade_krw": "MAX_LOSS_PER_TRADE_KRW (원)", "risk_pct_per_trade": "RISK_PCT_PER_TRADE (÷100)", } print(f""" ╔══════════════════════════════════════════╗ ║ 🏆 꼬리잡기 최적 파라미터 ║ ╠══════════════════════════════════════════╣""") for k in keys: v = bp.get(k, merged.get(k)) label = _tail_disp_map.get(k, k) print(f"║ {label:<34s} : {v!s:>6} ║") for k in ("rsi_period", "rsi_threshold", "max_rec_3m", "tail_pct_min", "shoulder_min_high", "high_chase_thr", "cooldown_min", "time_start_hm", "time_end_hm", "max_daily", "max_loss_per_trade_krw", "risk_pct_per_trade"): if k in merged and k not in keys: label = _tail_disp_map.get(k, k) print(f"║ {label:<34s} : {merged[k]!s:>6} ║") print(f"""╠══════════════════════════════════════════╣ ║ 총 손익 : {best['total_pnl']:>+12,.0f} 원 ║ ║ 승률 : {best['win_rate']:>6.1f}% ║ ║ 총 거래 : {best['total_trades']:>5} 건 ║ ║ Profit Factor : {best['pf']:>5.2f} ║ ║ 평균 보유 : {best['avg_hold']:>5.1f} 분 ║ ║ 최대 낙폭(MDD): {best['mdd']:>12,.0f} 원 ║ ╚══════════════════════════════════════════╝""") # ── 각 결과에 순번·merged_params·db_snapshot 부여 (JSON 및 apply N 지원) ── top_list = [] for idx, r in enumerate(results[:top_n]): merged_for_db = dict(fixed) for k, v in r["params"].items(): merged_for_db[k] = (float(v) / 100.0) if k in _PCT_KEYS else v for skip in ("slot_money", "fee_rate", "sell_tax"): merged_for_db.pop(skip, None) top_list.append({ "rank": idx + 1, "params": r["params"], "total_pnl": r["total_pnl"], "win_rate": r["win_rate"], "total_trades": r["total_trades"], "pf": r["pf"], "avg_hold": r["avg_hold"], "mdd": r["mdd"], "merged_params": merged_for_db, "db_snapshot": _merged_to_db_snapshot(merged_for_db), }) # ── JSON 저장 ── out_dir = os.path.join(ROOT, "backtest_scalping", "results") os.makedirs(out_dir, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") out_path = os.path.join(out_dir, f"tail_search_{mode}_{ts}.json") with open(out_path, "w", encoding="utf-8") as f: json.dump({ "mode": mode, "start": start, "end": end, "min_win_rate": min_win_rate, "top": top_list, }, f, ensure_ascii=False, indent=2) print(f"\n💾 결과 저장: {out_path}") if apply_rank is not None and apply_rank >= 1 and apply_rank <= len(top_list): _apply_to_db(top_list[apply_rank - 1]["merged_params"]) print(f"✅ {apply_rank}번째 결과 적용 완료") def _get_tail_field_map(): """꼬리잡기 파라미터 → env_config 컬럼 매핑 (apply / db_snapshot 공용).""" return { "min_drop_rate": ("MIN_DROP_RATE", lambda v: str(float(v))), "min_recovery_ratio": ("MIN_RECOVERY_RATIO_SHORT", lambda v: str(float(v))), "sl_pct": ("STOP_LOSS_PCT", lambda v: str(-abs(float(v)))), "tp_pct": ("TAKE_PROFIT_PCT", lambda v: str(float(v))), "tail_ratio_min": ("TAIL_RATIO_MIN", lambda v: str(float(v))), "tail_pct_min": ("TAIL_PCT_MIN", lambda v: str(float(v))), "shoulder_min_high": ("SHOULDER_MIN_HIGH_PCT", lambda v: str(float(v))), "shoulder_cut_pct": ("SHOULDER_CUT_PCT", lambda v: str(float(v))), "rsi_period": ("RSI_PERIOD", lambda v: str(int(v))), "rsi_threshold": ("RSI_OVERHEAT_THRESHOLD", lambda v: str(float(v))), "max_rec_3m": ("MAX_RECOVERY_RATIO_3M", lambda v: str(float(v))), "high_chase_thr": ("HIGH_PRICE_CHASE_THRESHOLD",lambda v: str(float(v))), "cooldown_min": ("REENTRY_COOLDOWN_SEC", lambda v: str(int(v) * 60)), "max_daily": ("MAX_STOCKS", lambda v: str(int(v))), "max_loss_per_trade_krw": ("MAX_LOSS_PER_TRADE_KRW", lambda v: str(int(v))), "risk_pct_per_trade": ("RISK_PCT_PER_TRADE", lambda v: str(float(v))), } def _merged_to_db_snapshot(merged_params: dict) -> dict: """merged_params(비율 단위) → env_config 컬럼명:값 문자열 dict (JSON 저장용).""" field_map = _get_tail_field_map() if "time_start_hm" in merged_params: field_map = {**field_map, "time_start_hm": ("TIME_START", lambda v: str(int(v)))} if "time_end_hm" in merged_params: field_map = {**field_map, "time_end_hm": ("TIME_END", lambda v: str(int(v)))} return { db_col: fmt(merged_params[param_k]) for param_k, (db_col, fmt) in field_map.items() if param_k in merged_params } def _apply_to_db(merged_params: dict): """1위 파라미터(비율 단위)를 env_config DB에 자동 반영. 결과에 영향 주는 env 키 전부 반영.""" field_map = _get_tail_field_map() if "time_start_hm" in merged_params: field_map["time_start_hm"] = ("TIME_START", lambda v: str(int(v))) if "time_end_hm" in merged_params: field_map["time_end_hm"] = ("TIME_END", lambda v: str(int(v))) sets, vals = [], [] for param_k, val in merged_params.items(): if param_k not in field_map: continue db_col, fmt = field_map[param_k] sets.append(f"{db_col} = %s") vals.append(fmt(val)) if not sets: print("DB 적용할 파라미터가 없습니다.") return db = TradeDB() try: db.conn.execute(f"UPDATE env_config SET {', '.join(sets)} WHERE id = (SELECT MAX(id) FROM env_config)", vals) db.conn.commit() except Exception as e: # 컬럼 없음 등으로 실패 시 일부만 적용 print(f"⚠️ 전체 적용 실패: {e}. 적용 가능한 컬럼만 시도합니다.") for param_k, val in merged_params.items(): if param_k not in field_map: continue db_col, fmt = field_map[param_k] try: db.conn.execute(f"UPDATE env_config SET {db_col} = %s WHERE id = (SELECT MAX(id) FROM env_config)", [fmt(val)]) db.conn.commit() print(f" {db_col:<30s} = {fmt(val)}") except Exception: pass db.close() print("\n✅ DB env_config 자동 적용 완료:") for param_k, val in merged_params.items(): if param_k in field_map: db_col, fmt = field_map[param_k] print(f" {db_col:<30s} = {fmt(val)}") # ────────────────────────────────────────────────────────────────────────────── # CLI 진입점 # ────────────────────────────────────────────────────────────────────────────── def main(): today = datetime.now().strftime("%Y-%m-%d") week_ago = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") parser = argparse.ArgumentParser(description="꼬리잡기 백테스트 파라미터 Grid Search") parser.add_argument("--start", default=week_ago, help="시작일 (YYYY-MM-DD)") parser.add_argument("--end", default=today, help="종료일 (YYYY-MM-DD)") parser.add_argument("--mode", default="coarse", choices=["coarse", "fine", "full"], help="탐색 모드") parser.add_argument("--top", default=20, type=int) parser.add_argument("--min_trades", default=3, type=int, help="최소 거래 건수") parser.add_argument("--min_win_rate", default=MIN_WIN_RATE_DEFAULT, type=float, help="승률 하한 (%%). 이 이상만 손익순 1위") parser.add_argument("--apply", nargs="?", const=1, type=int, default=None, metavar="N", help="N번째 결과를 DB에 적용 (기본 1). --from-file 과 함께 시 최근 JSON에서 적용") parser.add_argument("--from-file", action="store_true", help="--apply N 과 함께 사용 시, 최근 결과 JSON에서만 적용 (탐색 생략)") args = parser.parse_args() run_search( start = args.start, end = args.end, mode = args.mode, top_n = args.top, min_trades = args.min_trades, min_win_rate = args.min_win_rate, apply_rank = args.apply, from_file_only = args.from_file, ) if __name__ == "__main__": main()