diff --git a/kis_short_ver2.py b/kis_short_ver2.py index 4b0389d..618d0dd 100644 --- a/kis_short_ver2.py +++ b/kis_short_ver2.py @@ -28,6 +28,17 @@ import requests from database import TradeDB, ENV_CONFIG_KEYS +# WebSocket 실시간 체결가 캐시 + 봉집계기 (REST 폴링 전면 대체) +# CandleAggregator: 틱 → 3분봉 in-memory 집계 → check_buy_signal_tail_catch / check_sell_signals / execute_buy 에서 활용 +try: + from kis_ws import ( + KISWebSocketPriceCache, CandleAggregator, + get_kiwoom_candles_df, _get_kiwoom_creds, + ) + _KIS_WS_AVAILABLE = True +except ImportError: + _KIS_WS_AVAILABLE = False + # 로깅 설정 logging.basicConfig( format='[%(asctime)s] %(message)s', @@ -43,9 +54,9 @@ LOG_GREEN = "\033[92m" # 통과 LOG_CYAN = "\033[96m" # 강조 LOG_RESET = "\033[0m" -# DB 초기화 (스크립트所在 디렉터리 기준 경로) +# DB 초기화 — MariaDB 192.168.0.141 (database.py 모듈 상수 사용) SCRIPT_DIR = Path(__file__).resolve().parent -db = TradeDB(db_path=str(SCRIPT_DIR / "quant_bot.db")) +db = TradeDB() # db_path 인수 무시됨, MariaDB 직접 연결 # DB에서 환경변수 로드 def get_env_from_db(key, default=""): @@ -227,6 +238,12 @@ def _is_token_expired_response(j): class KISClient: """한국투자증권 Open API 클라이언트""" def __init__(self, mock=None): + # 실전/모의 토큰 모두 최신 상태로 유지 (모드와 무관하게 양쪽 갱신) + try: + from kis_token_manager import ensure_both_tokens + ensure_both_tokens() + except Exception as _te: + logger.warning(f"토큰 자동갱신 건너뜀: {_te}") # 모의 여부 결정 if mock is not None: @@ -364,7 +381,20 @@ class KISClient: return None def _headers(self, tr_id, hashkey=None): - """API 호출용 헤더 생성""" + """ + API 호출용 헤더 생성. + KisTokenManager.get_token() 으로 매번 만료 여부 확인: + - 유효하면 메모리에서 즉시 반환 (오버헤드 없음) + - 만료 10분 전이면 선제 갱신 후 새 토큰 사용 + → EGW00123(만료 에러) 없이 자동 교체 + """ + try: + from kis_token_manager import KisTokenManager + fresh = KisTokenManager.instance(is_mock=self.mock).get_token() + if fresh: + self.access_token = fresh + except Exception: + pass # 실패 시 기존 access_token 유지 headers = { "content-type": "application/json; charset=utf-8", "authorization": f"Bearer {self.access_token}", @@ -1122,16 +1152,23 @@ class KISClient: return False j = r.json() if j.get("rt_cd") == "0": + self._last_sell_msg_cd = None + self._last_sell_msg1 = None ord_no = j.get("output", {}).get("ODNO", "") logger.info(f"✅ 매도 주문 성공: {code} {qty}주 (주문번호: {ord_no})") return True else: + # execute_sell 에서 실패 원인(영업일 아님 등) 구분할 수 있도록 저장 + self._last_sell_msg_cd = j.get("msg_cd", "") + self._last_sell_msg1 = str(j.get("msg1", "") or "") logger.error( f"[매도주문] 실패 code={code} path={path} TR_ID={tr_id} " - f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}" + f"rt_cd={j.get('rt_cd')} msg_cd={self._last_sell_msg_cd} msg1={self._last_sell_msg1}" ) return False except Exception as e: + self._last_sell_msg_cd = None + self._last_sell_msg1 = None logger.error(f"매도 주문 예외({code}): {e}") return False @@ -1203,16 +1240,22 @@ class KISClient: df = df.sort_values("time").reset_index(drop=True) # 기술적 지표 추가 - if len(df) >= 14: + # RSI 기간: DB/env의 RSI_PERIOD 로 조절 (기본 14, 단타/스캘핑 시 3·5 권장) + # RSI 수학적 안정화를 위해 호출 측에서 limit≥100 이상 요청하는 것이 전제 + rsi_period = get_env_int("RSI_PERIOD", 14) + if len(df) >= rsi_period: delta = df["close"].diff(1) - gain = delta.where(delta > 0, 0).rolling(window=14).mean() - loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() + gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean() rs = gain / loss.replace(0, float("nan")) df["RSI"] = 100 - (100 / (1 + rs)) - + if len(df) >= 20: df["MA20"] = df["close"].rolling(window=20).mean() - + # MA5: check_buy_signal_tail_catch 에서 ma5_gap_pct 계산에 사용 (없으면 None으로 처리됨) + if len(df) >= 5: + df["MA5"] = df["close"].rolling(window=5).mean() + return df.tail(limit) except Exception as e: logger.error(f"분봉 조회 실패({code}): {e}") @@ -1554,7 +1597,7 @@ class ShortTradingBot: self.ml_predictor = None if ML_AVAILABLE: try: - self.ml_predictor = MLPredictor(db_path=str(SCRIPT_DIR / "quant_bot.db")) + self.ml_predictor = MLPredictor() # MariaDB 내부 연결 if self.ml_predictor.should_retrain(): self.ml_predictor.train_model(retrain=True) except Exception as e: @@ -1570,6 +1613,13 @@ class ShortTradingBot: use_kelly=get_env_bool("USE_KELLY_FORMULA", True), kelly_multiplier=get_env_float("KELLY_MULTIPLIER", 0.25), slot_base_amount_cap=get_env_int("SLOT_BASE_AMOUNT_CAP", 0), + # ── 무조건 깔고 가는 MAX_LOSS 기반 투자 상한 ───────────── + # ATR 계산 결과가 아무리 커도 이 상한 초과 불가 + max_loss_per_trade_krw=get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000), + stop_loss_pct=get_env_float("STOP_LOSS_PCT", -0.03), + # ── 사이즈 클래스별 비율 (DB에서 주입) ─────────────────── + size_small_ratio=get_env_float("SIZE_CLASS_SMALL_RATIO", 0.70), + size_mid_ratio=get_env_float("SIZE_CLASS_MID_RATIO", 0.85), ) logger.info("✅ RiskManager 뼈대 생성 완료") else: @@ -1596,6 +1646,14 @@ class ShortTradingBot: self.holdings = {} # 당일 매매불가로 확인된 종목 (같은 종목 반복 주문 방지 → 다음 후보로 넘어감) self.untradable_skip_set = set() + # 최근 매도 종목 쿨다운 캐시 {code: 매도_timestamp} + # 매도 직후 같은 종목을 즉시 재매수하는 반복매매 루프 방지. + # 쿨다운 기간은 REENTRY_COOLDOWN_SEC(기본 5분)으로 조정. + self.recently_sold: dict = {} + # 매도 실패 백오프 캐시 {code: until_timestamp} + # "영업일이 아닙니다" 등 일시적 API 거부 시 재시도 방지. + # 재시도 대기 시간은 SELL_FAILURE_BACKOFF_SEC(기본 1800초=30분) 으로 조정. + self._sell_backoff: dict = {} active_trades = self.db.get_active_trades(strategy_prefix="SHORT") for code, trade in active_trades.items(): self.holdings[code] = { @@ -1619,10 +1677,251 @@ class ShortTradingBot: self._asset_task = None self.is_first_run = True + # ── WebSocket + CandleAggregator 초기화 ────────────────────────────── + # 틱 수신 → 3분봉 in-memory 집계 → REST 폴링(get_minute_chart) 전면 대체 + # KISWebSocketPriceCache: 실시간 체결가 수신 (check_sell_signals 현재가) + # CandleAggregator : 3분봉 OHLCV·RSI 메모리 집계 (buy/ATR/RiskManager) + # start() 실패 시 is_active=False → REST fallback 자동 적용 + self.ws_cache: Optional["KISWebSocketPriceCache"] = None + self.candle_agg: Optional["CandleAggregator"] = None + self._init_websocket() + + # ── WebSocket + CandleAggregator 초기화 / 갭보정 / 구독 관리 ─────────── + + def _init_websocket(self): + """WebSocket 시작 → CandleAggregator(3분봉) 연결 → 종목 구독 → 갭 보정.""" + if not _KIS_WS_AVAILABLE: + logger.info("ℹ️ kis_ws 모듈 없음 → REST inquire_price / get_minute_chart 폴링 유지") + return + try: + self.ws_cache = KISWebSocketPriceCache( + app_key = self.client.app_key, + app_secret = self.client.app_secret, + is_mock = self.client.mock, + ) + # CandleAggregator: 3분봉 집계 (buy 타점·ATR·RiskManager 전용) + # 3분봉(주전략) + 15분봉 + 60분봉 — 추세 필터 / 다른 전략 확장용 + self.candle_agg = CandleAggregator(db=self.db, timeframes=[3, 15, 60]) + self.ws_cache.attach_candle_aggregator(self.candle_agg) + + ws_ok = self.ws_cache.start() + if not ws_ok: + logger.info("ℹ️ WebSocket 비활성 (모의 or 키 미설정) → REST fallback 유지") + self.ws_cache = None + self.candle_agg = None + return + + # 봇 재시작 시 보유 종목 즉시 구독 + for code in list(self.holdings.keys()): + self.ws_cache.subscribe(code) + + # 유니버스 후보 종목도 미리 구독 (매수 타점 체크 전 봉 데이터 확보) + candidates = self.db.get_target_candidates() + for c in candidates: + code = c.get("code") or c.get("stk_cd", "") + if code and code not in self.holdings: + self.ws_cache.subscribe(code) + + # ── 영구 구독 ETF: 시장 방향 필터용 (유니버스 변경과 무관하게 항상 유지) ── + perm_raw = get_env_from_db("PERMANENT_WS_CODES", "069500,229200") + self._permanent_ws_codes: set = { + c.strip() for c in str(perm_raw).split(",") if c.strip() + } + for code in sorted(self._permanent_ws_codes): + self.ws_cache.subscribe(code) + logger.info("📡 [영구구독] %s (시장방향 ETF)", code) + + logger.info( + "✅ WebSocket + CandleAggregator(3분봉) 활성 (구독 %d종목) " + "— get_minute_chart REST 폴링 대체", + len(self.ws_cache._subscribed), + ) + + # 시작 시 REST 갭 보정 (봉 버퍼 비어있는 경우 RSI 안정화) + self._fill_all_gaps() + + except Exception as _ws_e: + logger.warning("⚠️ WebSocket 초기화 예외(무시): %s", _ws_e) + self.ws_cache = None + self.candle_agg = None + + def _fill_all_gaps(self): + """ + 봇 시작·재접속 후 구독 중인 모든 종목의 분봉 갭을 보정. + RSI(14) 안정화를 위해 limit=120 사용. + + ▶ 키움 우선 전략: + - 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월치) 제공 → 장 초반에도 즉시 봉 확보 가능 + - KIS get_minute_chart 는 당일봉만 제공 → 장 시작 직후 봉 부족 → 키움 우선 + - 키움 키 없으면 KIS fallback (3분봉만, 15/60분봉은 KIS 지원 안 함) + """ + if not self.candle_agg or not self.ws_cache: + return + limit = get_env_int("SHORT_GAP_FILL_LIMIT", 120) + with self.ws_cache._sub_lock: + codes = set(self.ws_cache._subscribed) + + # ── 키움 크레덴셜 조회 ──────────────────────────────────────── + kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db) + use_kiwoom = bool(kw_key and kw_secret) + logger.info( + "🔧 [갭보정] %d종목 분봉 로드 시작 (tfs=%s, limit=%d, kiwoom=%s)", + len(codes), self.candle_agg.timeframes, limit, "✅" if use_kiwoom else "❌→KIS fallback", + ) + + for code in sorted(codes): + for tf in self.candle_agg.timeframes: + df = None + # 키움 우선 (토큰은 23시간 캐시 → au10001 한도 방지) + if use_kiwoom: + try: + df = get_kiwoom_candles_df( + code, tf, kw_key, kw_secret, + is_mock=kw_mock, n=limit, + ) + except Exception as e: + logger.debug("키움 갭보정 실패 (%s %dM): %s", code, tf, e) + # KIS fallback: 당일봉만 → 3분봉에만 유효 + if (df is None or df.empty) and tf <= 3: + try: + df = self.client.get_minute_chart( + code, period=str(tf), limit=limit + ) + except Exception as e: + logger.debug("KIS 갭보정 실패 (%s %dM): %s", code, tf, e) + if df is not None and not df.empty: + self.candle_agg.fill_gap_from_rest(code, tf, df) + # 같은 종목 내 timeframe 전환: 짧은 딜레이 + time.sleep(random.uniform(0.2, 0.4)) + # 종목 간 딜레이 + time.sleep(random.uniform(0.3, 0.6)) + + def _sync_subscriptions(self, candidates: list): + """ + target_candidates DB 목록과 WS 구독 목록 동기화. + - 유니버스에서 빠진 종목(보유 중 아닌 것) → unsubscribe + RAM 정리 + - 신규 종목 → subscribe + 3분봉 갭 보정 (봉 버퍼 즉시 확보). + ※ 영구 구독 ETF(_permanent_ws_codes)는 절대 해제하지 않음 (시장 방향 필터용) + """ + if not self.ws_cache: + return + new_codes = {c.get("code") or c.get("stk_cd", "") for c in candidates if c} + new_codes.discard("") + # 현재 보유 종목은 매도 완료 전까지 반드시 유지 + new_codes |= set(self.holdings.keys()) + # 영구 구독 ETF는 유니버스와 무관하게 항상 유지 + new_codes |= getattr(self, '_permanent_ws_codes', set()) + + with self.ws_cache._sub_lock: + current_subs = set(self.ws_cache._subscribed) + + # ── 구독 해제: 유니버스에서 빠진 종목 ───────────────────────── + # 보유 중 종목은 매도 감시를 위해 구독 유지 + for code in sorted(current_subs - new_codes): + self.ws_cache.unsubscribe(code) + if self.candle_agg: + self.candle_agg.remove_code(code) + + # ── 신규 구독: 유니버스에 새로 들어온 종목 ───────────────────── + kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db) + use_kiwoom = bool(kw_key and kw_secret) + for code in sorted(new_codes - current_subs): + self.ws_cache.subscribe(code) + # 신규 구독 즉시 갭 보정 (봉 없으면 매수 타점 체크 불가) — 키움 우선 + if not self.candle_agg: + continue + lim = get_env_int("SHORT_GAP_FILL_LIMIT", 120) + for tf in self.candle_agg.timeframes: + df = None + if use_kiwoom: + try: + df = get_kiwoom_candles_df( + code, tf, kw_key, kw_secret, + is_mock=kw_mock, n=lim, + ) + except Exception as e: + logger.debug("키움 신규갭보정 실패 (%s %dM): %s", code, tf, e) + if (df is None or df.empty) and tf <= 3: + try: + df = self.client.get_minute_chart( + code, period=str(tf), limit=lim + ) + except Exception as e: + logger.debug("KIS 신규갭보정 실패 (%s %dM): %s", code, tf, e) + if df is not None and not df.empty: + self.candle_agg.fill_gap_from_rest(code, tf, df) + # tf 간 딜레이 (차트 API, 토큰은 캐시 재사용) + time.sleep(random.uniform(0.2, 0.4)) + + def _get_candles_df(self, code: str, tf: int = 3, n: int = 20) -> Optional[pd.DataFrame]: + """ + CandleAggregator 메모리 봉 → DataFrame 변환 헬퍼. + + 확정봉(confirmed) + 진행봉(current, is_confirmed=0) 을 합쳐 + get_minute_chart 와 동일한 컬럼(open/high/low/close/volume/RSI/MA5/MA20)을 반환. + - 진행봉의 close = 현재가 (최신 틱) → 매수 타점 실시간 포착 + - CandleAggregator 미사용·데이터 부족 시 None 반환 → 호출부에서 REST fallback + + Args: + code : 종목코드 + tf : 봉 주기(분), 꼬리잡기 전략은 항상 3 + n : 반환할 최대 봉 수 (tail 기준) + """ + if not self.candle_agg: + return None + + # 확정봉: RSI_PERIOD(14)보다 넉넉하게 가져와 RSI 안정화 + rsi_period = get_env_int("RSI_PERIOD", 14) + fetch_n = max(n + rsi_period + 5, n + 20) + confirmed = self.candle_agg.get_candles(code, tf, fetch_n) + + if not confirmed: + return None + + rows = list(confirmed) + + # 진행 중인 봉(최신 틱 close) 을 tail 에 추가 → 실시간 캔들 패턴 포착 + current = self.candle_agg.get_current_candle(code, tf) + if current and current.get("open", 0) > 0 and current.get("close", 0) > 0: + rows.append(current) + + if len(rows) < 2: + return None + + df = pd.DataFrame(rows)[["open", "high", "low", "close", "volume"]].copy() + df = df.reset_index(drop=True) + + # 기술적 지표: get_minute_chart 와 동일 로직 + if len(df) >= rsi_period: + delta = df["close"].diff(1) + gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean() + rs = gain / loss.replace(0, float("nan")) + df["RSI"] = 100 - (100 / (1 + rs)) + if len(df) >= 20: + df["MA20"] = df["close"].rolling(window=20).mean() + if len(df) >= 5: + df["MA5"] = df["close"].rolling(window=5).mean() + + return df.tail(n).reset_index(drop=True) + + # ── 설정 리로드 ───────────────────────────────────────────────────────── + def reload_config(self): """[실시간 리로드] DB(env) 설정을 봇에 반영. 메인 루프마다 호출 시 재시작 없이 적용.""" # [손절/익절 설정] self.stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.04) + + # ★ STOP_LOSS_PCT 부호 안전장치: 양수(0.02)로 입력 시 자동으로 음수(-0.02)로 변환. + # 양수 값이 그대로 사용되면 profit_pct <= stop_loss_pct 조건이 손익분기 이상에서도 + # 참이 돼 칼손절이 수익 구간에서도 발동하는 심각한 버그가 발생함. + if self.stop_loss_pct > 0: + logger.warning( + "🚨 STOP_LOSS_PCT=%.4f 양수 감지 → 자동 부호 반전(%.4f). DB에 음수로 저장 권장 (!설정 STOP_LOSS_PCT=-%.4f)", + self.stop_loss_pct, -self.stop_loss_pct, self.stop_loss_pct, + ) + self.stop_loss_pct = -self.stop_loss_pct + self.take_profit_pct = get_env_float("TAKE_PROFIT_PCT", 0.05) self.max_stocks = get_env_int("MAX_STOCKS", 3) self.min_drop_rate = get_env_float("MIN_DROP_RATE", 0.03) @@ -1911,7 +2210,7 @@ class ShortTradingBot: name = self.holdings[code].get("name", code) del self.holdings[code] try: - self.db.close_trade(code=code, sell_price=0, sell_reason="잔고동기화(외부매도)") + self.db.close_trade(code=code, sell_price=0, sell_reason="잔고동기화(외부매도)", strategy="SHORT_ANT_SHAKING") except Exception as e: logger.debug(f"잔고동기화 close_trade 스킵 {code}: {e}") logger.info(f"📲 [잔고동기화] 보유 제거: {name} ({code}) - 계좌에 없음") @@ -2621,9 +2920,41 @@ MIN_DROP_RATE=0.025 logger.warning(f"🧪 FORCE_BUY_TEST 현재가 조회 실패: {e}") return None + # ── 시장 방향 필터 (USE_MARKET_REGIME_FILTER=true 시 활성) ── + # KODEX200/KOSDAQ150 60분봉 RSI로 상승장 확인 → 하락장이면 롱 진입 차단 + if get_env_bool("USE_MARKET_REGIME_FILTER", False): + min_rsi = get_env_float("MARKET_REGIME_MIN_RSI", 48.0) + regime = self.db.get_market_regime(tf=60) + if not regime.get("is_bull") or regime.get("avg_rsi", 50) < min_rsi: + logger.debug( + "[시장필터] %s %s: 하락장 차단 (ETF RSI=%.1f < %.1f)", + code, name, regime.get("avg_rsi", 0), min_rsi, + ) + return None + + # ── 테마 과열 필터 (USE_THEME_HEAT_FILTER=true 시 활성) ── + if get_env_bool("USE_THEME_HEAT_FILTER", False): + heat_max = get_env_float("THEME_HEAT_RSI_MAX", 72.0) + meta = self.db.get_stock_meta(code) + if meta and meta.get("theme"): + momentum = self.db.get_theme_momentum(meta["theme"], tf=60) + if momentum.get("count", 0) >= 3 and momentum.get("avg_rsi3", 0) > heat_max: + logger.debug( + "[테마필터] %s %s: 테마(%s) 과열 차단 (RSI=%.1f > %.1f)", + code, name, meta["theme"], + momentum["avg_rsi3"], heat_max, + ) + return None + min_candle_len = get_env_int("MIN_CANDLE_LEN_TAIL", 14) min_price_tail = get_env_float("MIN_PRICE_TAIL", 1000.0) - df = self.client.get_minute_chart(code, period="3", limit=20) + + # [WebSocket 우선] 메모리 봉 사용 → WS 미활성 시 REST fallback + df = self._get_candles_df(code, tf=3, n=20) + if df is None or df.empty: + logger.debug("📡 [%s] WS봉 없음 → REST 3분봉 fallback", code) + df = self.client.get_minute_chart(code, period="3", limit=20) + if df is None or df.empty or len(df) < min_candle_len: logger.info(f"{LOG_YELLOW}🔍 [탈락-3분봉] {name} {code}: 봉수 부족 (len={len(df) if df is not None and not df.empty else 0}, 기준 {min_candle_len}){LOG_RESET}") return None @@ -2638,16 +2969,25 @@ MIN_DROP_RATE=0.025 candle_low = float(candle["low"]) candle_close = float(candle["close"]) - # 분봉 마지막 봉 close=0인 경우 (장 마감/미체결 봉 등) → 현재가 API로 보정 + # 분봉 마지막 봉 close=0인 경우 (장 마감/미체결 봉 등) → WS 캐시 우선 보정, fallback=REST if current_price <= 0 or candle_close <= 0: try: - price_data = self.client.inquire_price(code) - if price_data: - current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", ""))) - if current_price > 0: - candle_close = current_price + ws_price = self.ws_cache.get_price(code) if self.ws_cache else None + if ws_price: + p = abs(float(str(ws_price.get("stck_prpr", 0)).replace(",", ""))) + if p > 0: + current_price = p + candle_close = p df = df.copy() - df.loc[df.index[-1], "close"] = current_price + df.loc[df.index[-1], "close"] = p + else: + price_data = self.client.inquire_price(code) + if price_data: + current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", ""))) + if current_price > 0: + candle_close = current_price + df = df.copy() + df.loc[df.index[-1], "close"] = current_price except Exception as e: logger.debug(f"현재가 보정 실패({code}): {e}") if candle_open <= 0 and len(df) >= 2: @@ -2659,7 +2999,10 @@ MIN_DROP_RATE=0.025 logger.info(f"{LOG_YELLOW}🔍 [탈락-가격] {name} {code}: 시가/현재가 부적절 (현재 {current_price:,.0f}원, 최소 {min_price_tail:,.0f}){LOG_RESET}") return None - # [수정된 부분: 매수체크 시 당일 진짜 시가, 고가, 저가를 API로 호출해와서 낙폭 계산] + # 당일 시가·고가·저가 확보 우선순위: + # 1순위: WebSocket 캐시 (H0STCNT0 틱에 OPRC/HGPR/LWPR 포함 → REST 호출 없음) + # 2순위: 분봉 DataFrame (캐시 없을 때) + # 3순위: REST inquire_price (분봉만으로 부족한 경우 최후 fallback) day_open = float(df["open"].iloc[0]) day_high = float(df["high"].max()) # 저가가 0인 분봉(비정상 값) 때문에 낙폭이 100%로 계산되는 것을 방지 @@ -2669,18 +3012,27 @@ MIN_DROP_RATE=0.025 day_low = float(valid_lows.min()) if not valid_lows.empty else float(lows.min()) except Exception: day_low = float(df["low"].min()) - + try: - # 3분봉 20개(1시간) 한계를 넘어 하루 전체 기준을 잡기 위해 현재가 API 호출 - today_price_data = self.client.inquire_price(code) - if today_price_data: - api_open = abs(float(str(today_price_data.get("stck_oprc", 0)).replace(",", ""))) - api_high = abs(float(str(today_price_data.get("stck_hgpr", 0)).replace(",", ""))) - api_low = abs(float(str(today_price_data.get("stck_lwpr", 0)).replace(",", ""))) - - if api_open > 0: day_open = api_open - if api_high > 0: day_high = api_high - if api_low > 0: day_low = api_low + # 1순위: WS 캐시에 당일 시고저 있으면 REST 호출 생략 (API 과부하 방지) + ws_data = self.ws_cache.get_price(code) if self.ws_cache else None + if ws_data: + ws_open = abs(float(str(ws_data.get("stck_oprc", 0)).replace(",", ""))) + ws_high = abs(float(str(ws_data.get("stck_hgpr", 0)).replace(",", ""))) + ws_low = abs(float(str(ws_data.get("stck_lwpr", 0)).replace(",", ""))) + if ws_open > 0: day_open = ws_open + if ws_high > 0: day_high = ws_high + if ws_low > 0: day_low = ws_low + else: + # 2순위 fallback: REST inquire_price (WS 캐시 없거나 만료된 경우에만) + today_price_data = self.client.inquire_price(code) + if today_price_data: + api_open = abs(float(str(today_price_data.get("stck_oprc", 0)).replace(",", ""))) + api_high = abs(float(str(today_price_data.get("stck_hgpr", 0)).replace(",", ""))) + api_low = abs(float(str(today_price_data.get("stck_lwpr", 0)).replace(",", ""))) + if api_open > 0: day_open = api_open + if api_high > 0: day_high = api_high + if api_low > 0: day_low = api_low except Exception as e: logger.debug(f"일일 시고저 보정 실패({code}): {e}") @@ -2769,8 +3121,10 @@ MIN_DROP_RATE=0.025 return None # RSI 과열 방지 (수치: env) + # RSI_PERIOD 는 get_minute_chart 에서 계산 시 사용한 기간과 동일해야 함 rsi_val = 50.0 - if "RSI" in df.columns and len(df) >= 14: + rsi_period = get_env_int("RSI_PERIOD", 14) + if "RSI" in df.columns and len(df) >= rsi_period: rsi_val = float(df["RSI"].iloc[-1]) rsi_threshold = get_env_float("RSI_OVERHEAT_THRESHOLD", 78.0) if rsi_val >= rsi_threshold: @@ -2874,11 +3228,20 @@ MIN_DROP_RATE=0.025 except Exception: pass - # 현재가 조회 - price_data = self.client.inquire_price(code) + # 현재가 조회 — WebSocket 캐시 우선, 없으면 REST fallback + # ws_cache.get_price() 는 5초 이내 수신된 tick 데이터만 유효 취급 + price_data = None + _used_ws = False + if self.ws_cache and self.ws_cache.is_active: + price_data = self.ws_cache.get_price(code) + if price_data: + _used_ws = True + if not price_data: + # WebSocket 미연결·캐시 만료 → REST (기존 방식 그대로) + price_data = self.client.inquire_price(code) if not price_data: continue - + current_price = abs(float(price_data.get("stck_prpr", 0))) if current_price == 0: continue @@ -2896,10 +3259,12 @@ MIN_DROP_RATE=0.025 # ATR 조회 (DB 또는 재계산) atr = holding.get("atr_entry", 0) if atr == 0: - # ATR 재계산 (3분봉) + # ATR 재계산 — WS 메모리 봉 우선, 없으면 REST fallback try: - df = self.client.get_minute_chart(code, period="3", limit=20) - if not df.empty: + df = self._get_candles_df(code, tf=3, n=20) + if df is None or df.empty: + df = self.client.get_minute_chart(code, period="3", limit=20) + if df is not None and not df.empty: atr = self.calculate_atr(df) if atr > 0: self.holdings[code]["atr_entry"] = atr @@ -3038,8 +3403,11 @@ MIN_DROP_RATE=0.025 "qty": qty, "price": current_price, }) - - time.sleep(random.uniform(0.3, 0.7)) + + # WebSocket으로 가격을 받아온 경우 REST API 딜레이 불필요. + # REST fallback 시에는 기존과 동일하게 딜레이 적용 (과부하 방지). + if not _used_ws: + time.sleep(random.uniform(0.3, 0.7)) except Exception as e: logger.error(f"매도 신호 체크 실패({code}): {e}") continue @@ -3098,10 +3466,12 @@ MIN_DROP_RATE=0.025 # ============================================================ # [매수 금액] 변동성 역가중 (Volatility Inverse Weighting) # ============================================================ - # ATR 계산용 분봉 데이터 (변동성 계산에 필요) + # ATR 계산용 분봉 데이터 — WS 메모리 봉 우선, 없으면 REST fallback df_minute = None try: - df_minute = self.client.get_minute_chart(code, period="3", limit=20) + df_minute = self._get_candles_df(code, tf=3, n=20) + if df_minute is None or df_minute.empty: + df_minute = self.client.get_minute_chart(code, period="3", limit=20) except Exception as e: logger.debug(f"분봉 조회 실패({code}): {e}") @@ -3248,6 +3618,20 @@ MIN_DROP_RATE=0.025 logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}원 × {qty}주 (API 체결 확인) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}원") else: logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}원 × {qty}주 (주문기준) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}원") + + # 매수 후 WebSocket 구독 등록 → 이후 check_sell_signals에서 REST 없이 실시간 수신 + if self.ws_cache and self.ws_cache.is_active: + self.ws_cache.subscribe(code) + # 신규 보유 종목 즉시 갭보정 (봉 버퍼 미확보 시 ATR 계산 즉시 가능하게) + if self.candle_agg: + try: + lim = get_env_int("SHORT_GAP_FILL_LIMIT", 100) + df_gap = self.client.get_minute_chart(code, period="3", limit=lim) + if df_gap is not None and not df_gap.empty: + self.candle_agg.fill_gap_from_rest(code, 3, df_gap) + except Exception as _ge: + logger.debug("매수후 갭보정 실패(%s): %s", code, _ge) + # 체결 알림 (MM) — API 추가 호출 없이 메모리 값만 사용 try: invest_amt = price * qty @@ -3271,9 +3655,31 @@ MIN_DROP_RATE=0.025 if code not in self.holdings: logger.warning(f"⚠️ [{name}] 보유 종목 아님") return False - + + # ★ 매도 실패 백오프 체크 (영업일 아님·시장 마감 등 일시 오류 반복 방지) + backoff_until = self._sell_backoff.get(code, 0) + if time.time() < backoff_until: + remain_min = (backoff_until - time.time()) / 60 + logger.debug("⏸ [%s(%s)] 매도 백오프 중 — %.0f분 후 재시도", name, code, remain_min) + return False + # 매도 주문 success = self.client.sell_market_order(code, qty) + if not success: + # 실패 원인 분석 → 영업일·시장마감 오류면 백오프 등록 + msg_cd = getattr(self.client, "_last_sell_msg_cd", None) or "" + msg1 = getattr(self.client, "_last_sell_msg1", "") or "" + # KIS 비영업일·장마감 오류코드 (40100000=모의 영업일 아님, 40200000=실전 장외시간) + non_biz_codes = {"40100000", "40200000"} + if msg_cd in non_biz_codes or "영업일" in msg1 or "장외" in msg1 or "시장" in msg1: + backoff_sec = get_env_int("SELL_FAILURE_BACKOFF_SEC", 1800) + self._sell_backoff[code] = time.time() + backoff_sec + logger.warning( + "⏸ [%s(%s)] 매도 실패('%s') → %d분 후 재시도 (SELL_FAILURE_BACKOFF_SEC=%d)", + name, code, msg1, backoff_sec // 60, backoff_sec, + ) + return False + if success: # 현재가 조회 price_data = self.client.inquire_price(code) @@ -3287,15 +3693,23 @@ MIN_DROP_RATE=0.025 buy_price = holding.get("buy_price", sell_price) profit_val = (sell_price - buy_price) * qty # 손익 금액 - # DB에서 매도 처리 + # DB에서 매도 처리 (strategy 지정 → 꼬리잡기봇 row만 삭제, 스캘핑봇 row 보호) self.db.close_trade( code=code, sell_price=sell_price, sell_reason=signal['reason'], + strategy="SHORT_ANT_SHAKING", ) del self.holdings[code] - + + # 재진입 쿨다운 기록 (REENTRY_COOLDOWN_SEC 동안 같은 종목 재매수 차단) + self.recently_sold[code] = time.time() + + # 매도 후 WebSocket 구독 해제 → 불필요한 데이터 수신 차단 + if self.ws_cache and self.ws_cache.is_active: + self.ws_cache.unsubscribe(code) + # 🔥 매도 후 예수금 + 총자산 즉시 업데이트 (손익 반영) self._update_account_light(profit_val=profit_val) @@ -3303,12 +3717,16 @@ MIN_DROP_RATE=0.025 # 체결 알림 (MM) — 매도 직후 _update_account_light로 갱신된 예수금/총자산 사용 (추가 API 없음) try: pct = signal['profit_pct'] * 100 - cum_pnl = self.current_total_asset - self.total_deposit if self.total_deposit else 0 - cum_pct = (cum_pnl / self.total_deposit * 100) if self.total_deposit and self.total_deposit > 0 else 0 + cum_pnl = self.current_total_asset - self.total_deposit if self.total_deposit else 0 + cum_pct = (cum_pnl / self.total_deposit * 100) if self.total_deposit and self.total_deposit > 0 else 0 + # 당일 손익: 오늘 장 시작 시 총자산 대비 현재 총자산 차이 + day_pnl = self.current_total_asset - self.start_day_asset if self.start_day_asset else 0 + day_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0 mm_msg = ( f"🔴 **매도 체결** {name} ({code})\n" f"{sell_price:,.0f}원 × {qty:,}주 | {signal['reason']} | 수익률 {pct:+.2f}% (실현 {profit_val:+,.0f}원)\n" - f"예수금 {self.current_cash:,.0f}원 | 총자산 {self.current_total_asset:,.0f}원 | 누적손익 {cum_pnl:+,.0f}원 ({cum_pct:+.2f}%) | 보유 {len(self.holdings)}종목" + f"예수금 {self.current_cash:,.0f}원 | 총자산 {self.current_total_asset:,.0f}원 | 보유 {len(self.holdings)}종목\n" + f"당일손익 {day_pnl:+,.0f}원 ({day_pct:+.2f}%) | 누적손익 {cum_pnl:+,.0f}원 ({cum_pct:+.2f}%)" ) self.send_mm(mm_msg) except Exception as e: @@ -3404,8 +3822,13 @@ MIN_DROP_RATE=0.025 self.today_date = today_str self.untradable_skip_set.clear() logger.debug("📅 날짜 변경 -> 매매불가 제외 목록 초기화") - active_count = len(self.holdings) + active_count = len(self.holdings) db_candidates = self.db.get_target_candidates() + + # 신규 후보 WS 구독 + 3분봉 갭보정 (봉 버퍼 미확보 종목 자동 보완) + if db_candidates: + self._sync_subscriptions(db_candidates) + if db_candidates and active_count < self.max_stocks: strength_preview = " | 강도순: " + ", ".join( f"{c.get('name', c.get('code',''))} {c.get('score', 0):.1f}" for c in db_candidates[:5] @@ -3421,6 +3844,17 @@ MIN_DROP_RATE=0.025 # 매매불가 종목은 당일 재시도 안 함 → 다음 후보로 if code in self.untradable_skip_set: continue + # ★ 재진입 쿨다운 체크: 최근 매도 종목은 일정 시간 동안 재매수 차단. + # 손절 직후 즉시 재매수 → 손절 반복 루프를 근본 차단. + reentry_cooldown = get_env_int("REENTRY_COOLDOWN_SEC", 300) + elapsed_since_sell = time.time() - self.recently_sold.get(code, 0) + if elapsed_since_sell < reentry_cooldown: + remaining = int(reentry_cooldown - elapsed_since_sell) + logger.info( + "⏳ [재진입 차단] %s(%s) 매도 후 쿨다운 중 — 남은 시간 %d초/%d초", + name, code, remaining, reentry_cooldown, + ) + continue signal = self.check_buy_signal_tail_catch(code, name) if signal: signals_this_turn += 1