""" kis_scalping_ver1.py — KIS WebSocket 기반 초단타(스캘핑) 봇 ════════════════════════════════════════════════════════════ 전략: RSI 과매도(= expired_dt - margin: logger.info("한투 토큰 만료 임박/만료 → 재발급") return None logger.info("한투 토큰 캐시 재사용 (만료: %s)", expired_dt.strftime("%H:%M:%S")) return {"access_token": token, "expires_at": expired_dt.isoformat()} except Exception: return None def _save_kis_token_cache(mock, token, expired_str): path = KIS_TOKEN_CACHE_PATH_MOCK if mock else KIS_TOKEN_CACHE_PATH_REAL try: with open(path, "w", encoding="utf-8") as f: json.dump({"mock": mock, "access_token": token, "access_token_token_expired": expired_str}, f) except Exception as e: logger.warning("한투 토큰 캐시 저장 실패: %s", e) # ══════════════════════════════════════════════════════════════════════ # KIS REST 클라이언트 (ver2 에서 직접 이식 — 구조 동일) # ══════════════════════════════════════════════════════════════════════ class KISClient: """한국투자증권 REST API 클라이언트. ver2 KISClient 와 동일 구조.""" def __init__(self, app_key, app_secret, account_no, account_code, mock=True): self.app_key = app_key self.app_secret = app_secret self.account_no = account_no self.account_code = account_code self.mock = mock self.base_url = ( "https://openapivts.koreainvestment.com:29443" if mock else "https://openapi.koreainvestment.com:9443" ) self._token: Optional[str] = None self._token_lock = __import__("threading").Lock() # API 호출 간 최소 간격 (429 방지) self._last_call_ts: float = 0.0 self._min_interval: float = 0.22 # 초 (초당 ~4건 안전 마진) # 마지막 매도 오류 캐시 (execute_sell 에서 사용) self._last_sell_msg_cd: Optional[str] = None self._last_sell_msg1: Optional[str] = None self._init_token() def _init_token(self): cache = _load_kis_token_cache(self.mock) if cache: self._token = cache["access_token"] else: try: from kis_token_manager import ensure_token if ensure_token(self.mock): cache = _load_kis_token_cache(self.mock) self._token = cache["access_token"] if cache else None except Exception as e: logger.warning("kis_token_manager 발급 실패: %s", e) self._token = None def _issue_token(self) -> Optional[str]: try: r = requests.post( f"{self.base_url}/oauth2/tokenP", json={"grant_type": "client_credentials", "appkey": self.app_key, "appsecret": self.app_secret}, timeout=10, ) j = r.json() token = j.get("access_token") if token: _save_kis_token_cache(self.mock, token, j.get("access_token_token_expired", "")) logger.info("✅ 한투 토큰 발급 완료") return token except Exception as e: logger.error("❌ 한투 토큰 발급 실패: %s", e) return None def _throttle(self): """ API 호출 전 처리: 1. 토큰 만료 10분 전 선제 갱신 (KisTokenManager) 2. 최소 호출 간격 대기 (429 방지) """ # 토큰 갱신 체크 (유효하면 메모리 반환, 만료 임박 시만 재발급) try: from kis_token_manager import KisTokenManager fresh = KisTokenManager.instance(is_mock=self.mock).get_token() if fresh: self._token = fresh except Exception: pass elapsed = time.time() - self._last_call_ts if elapsed < self._min_interval: time.sleep(self._min_interval - elapsed) self._last_call_ts = time.time() def _get(self, path, tr_id, params, retry=5): """GET 요청 + EGW00201 자동 재시도.""" self._throttle() headers = { "authorization": f"Bearer {self._token}", "appkey": self.app_key, "appsecret": self.app_secret, "tr_id": tr_id, "custtype": "P", } url = self.base_url + path for attempt in range(1, retry + 1): try: r = requests.get(url, headers=headers, params=params, timeout=10) if r.status_code == 200: j = r.json() if j.get("msg_cd") == "EGW00201": wait = 1.5 logger.warning( "⏳ API 초당거래 초과 (EGW00201) GET %s -> %.1f초 대기 후 재시도 (%d/%d) msg1=%s", path, wait, attempt, retry, j.get("msg1", ""), ) time.sleep(wait) continue return r except requests.exceptions.Timeout: logger.warning("⏰ GET %s 타임아웃 (%d/%d)", path, attempt, retry) time.sleep(1.0) return requests.Response() def _post(self, path, tr_id, body, retry=3): """POST 요청 + EGW00201 자동 재시도.""" self._throttle() headers = { "authorization": f"Bearer {self._token}", "appkey": self.app_key, "appsecret": self.app_secret, "tr_id": tr_id, "content-type": "application/json; charset=utf-8", "custtype": "P", } url = self.base_url + path for attempt in range(1, retry + 1): try: r = requests.post(url, headers=headers, json=body, timeout=10) if r.status_code == 200: j = r.json() if j.get("msg_cd") == "EGW00201": wait = 1.5 logger.warning( "⏳ API 초당거래 초과 (EGW00201) POST %s -> %.1f초 대기 (%d/%d)", path, wait, attempt, retry, ) time.sleep(wait) continue return r except requests.exceptions.Timeout: logger.warning("⏰ POST %s 타임아웃 (%d/%d)", path, attempt, retry) time.sleep(1.0) return requests.Response() def inquire_price(self, code: str) -> Optional[dict]: """현재가 조회 [v1_국내주식-007]""" r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-price", "FHKST01010100", {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code}, ) if r.status_code != 200: return None j = r.json() return j.get("output") if j.get("rt_cd") == "0" else None def get_minute_chart(self, code: str, period: str = "1", limit: int = 100) -> pd.DataFrame: """ 분봉 차트 조회 [v1_국내주식-017] — 재접속 갭 보정용. 스캘핑봇은 이 함수를 정상 운영 중에는 호출하지 않음. """ path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" tr_id = "FHKST03010200" try: end_dt = dt.now() start_dt = end_dt - datetime.timedelta(days=1) params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_HOUR_1": start_dt.strftime("%Y%m%d"), "FID_INPUT_HOUR_2": end_dt.strftime("%Y%m%d"), "FID_PW_DATA_INCU_YN": "Y", "FID_ETC_CLS_CODE": "", } r = self._get(path, tr_id, params) if r.status_code != 200: return pd.DataFrame() j = r.json() if j.get("rt_cd") != "0": return pd.DataFrame() data = j.get("output2", []) if not data: return pd.DataFrame() rows = [] for item in data: try: date_str = str(item.get("stck_bsop_date", "") or "") time_str = str(item.get("stck_cntg_hour", "") or "000000") rows.append({ "time": date_str + time_str[:4], # YYYYMMDDHHMM "open": abs(float(item.get("stck_oprc", 0))), "high": abs(float(item.get("stck_hgpr", 0))), "low": abs(float(item.get("stck_lwpr", 0))), "close": abs(float(item.get("stck_clpr", 0))), "volume": int(item.get("acml_vol", 0)), }) except Exception: continue if not rows: return pd.DataFrame() df = pd.DataFrame(rows) df = df.sort_values("time").reset_index(drop=True) return df.tail(limit) except Exception as e: logger.error("분봉 조회 실패(%s): %s", code, e) return pd.DataFrame() def get_investor_trend(self, code: str, days: int = 3) -> Optional[dict]: """투자자 동향 조회 (수급 확인용)""" try: r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-investor", "FHKST01010900", {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_DATE_1": (dt.now() - datetime.timedelta(days=days)).strftime("%Y%m%d"), "FID_INPUT_DATE_2": dt.now().strftime("%Y%m%d"), "FID_PERIOD_DIV_CODE": "D"}, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None output = j.get("output", []) if not output: return None foreign_net = sum(int(str(o.get("frgn_ntby_qty", 0)).replace(",", "")) for o in output if o) org_net = sum(int(str(o.get("orgn_ntby_qty", 0)).replace(",", "")) for o in output if o) return {"foreign_net_buy": foreign_net, "org_net_buy": org_net} except Exception as e: logger.debug("투자자 동향 조회 실패(%s): %s", code, e) return None def get_account_balance(self) -> Optional[dict]: """계좌 잔고 조회""" tr_id = "VTTC8434R" if self.mock else "TTTC8434R" try: r = self._get( "/uapi/domestic-stock/v1/trading/inquire-balance", tr_id, {"CANO": self.account_no, "ACNT_PRDT_CD": self.account_code, "AFHR_FLPR_YN": "N", "OFL_YN": "N", "INQR_DVSN": "01", "UNPR_DVSN": "01", "FUND_STTL_ICLD_YN": "N", "FNCG_AMT_AUTO_RDPT_YN": "N", "PRCS_DVSN": "00", "CTX_AREA_FK100": "", "CTX_AREA_NK100": ""}, ) if r.status_code != 200: return None j = r.json() return j if j.get("rt_cd") == "0" else None except Exception as e: logger.error("계좌 잔고 조회 실패: %s", e) return None def buy_order(self, code: str, qty: int, price: int = 0, order_type: str = "01") -> bool: """매수 주문 (01=시장가, 00=지정가)""" tr_id = "VTTC0802U" if self.mock else "TTTC0802U" path = "/uapi/domestic-stock/v1/trading/order-cash" try: body = { "CANO": self.account_no, "ACNT_PRDT_CD": self.account_code, "PDNO": code, "ORD_DVSN": order_type, "ORD_QTY": str(qty), "ORD_UNPR": str(price), } r = self._post(path, tr_id, body) if r.status_code != 200: return False j = r.json() if j.get("rt_cd") == "0": logger.info("✅ 매수주문 성공: %s %d주 @ %d원", code, qty, price) return True logger.error("❌ 매수주문 실패: %s | msg=%s", code, j.get("msg1", "")) return False except Exception as e: logger.error("매수 주문 예외(%s): %s", code, e) return False def sell_order(self, code: str, qty: int, price: int = 0, order_type: str = "01") -> bool: """매도 주문 (01=시장가, 00=지정가)""" tr_id = "VTTC0801U" if self.mock else "TTTC0801U" path = "/uapi/domestic-stock/v1/trading/order-cash" try: body = { "CANO": self.account_no, "ACNT_PRDT_CD": self.account_code, "PDNO": code, "ORD_DVSN": order_type, "ORD_QTY": str(qty), "ORD_UNPR": str(price), } r = self._post(path, tr_id, body) if r.status_code != 200: return False j = r.json() if j.get("rt_cd") == "0": return True self._last_sell_msg_cd = j.get("msg_cd", "") self._last_sell_msg1 = j.get("msg1", "") logger.error("❌ 매도주문 실패: %s | msg=%s", code, self._last_sell_msg1) return False except Exception as e: self._last_sell_msg_cd = None self._last_sell_msg1 = None logger.error("매도 주문 예외(%s): %s", code, e) return False def sell_market_order(self, code: str, qty: int) -> bool: return self.sell_order(code, qty, price=0, order_type="01") # ══════════════════════════════════════════════════════════════════════ # 알림 헬퍼 # ══════════════════════════════════════════════════════════════════════ def _load_mm_channel_id(channel_alias: str) -> Optional[str]: try: if MM_CONFIG_FILE.exists(): with open(MM_CONFIG_FILE, "r", encoding="utf-8") as f: cfg = json.load(f) return cfg.get("channels", {}).get(channel_alias) except Exception: pass return None def msg_mm(text: str, channel_alias: str = None) -> None: """Mattermost 알림 전송.""" alias = channel_alias or MM_CHANNEL_SCALP cid = _load_mm_channel_id(alias) if not cid or not MM_BOT_TOKEN: logger.debug("[MM 알림 스킵] channel=%s cid=%s", alias, cid) return try: requests.post( f"{MM_SERVER_URL}/api/v4/posts", headers={"Authorization": f"Bearer {MM_BOT_TOKEN}", "Content-Type": "application/json"}, json={"channel_id": cid, "message": text}, timeout=5, ) except Exception as e: logger.debug("MM 알림 실패: %s", e) # ══════════════════════════════════════════════════════════════════════ # 스캘핑 봇 메인 클래스 # ══════════════════════════════════════════════════════════════════════ class ScalpingBotV1: """ WebSocket 봉 집계 기반 초단타 스캘핑 봇. 매수 조건 (AND 조건) ───────────────────── 1. 장 시작 후 SCALP_MARKET_OPEN_WAIT_MIN 분 경과 2. 당일 낙폭 >= MIN_DROP_RATE (ver2 동일) 3. 저점 회복률 >= MIN_RECOVERY_RATIO_SHORT (ver2 동일) 4. RSI(3) <= SCALP_RSI_OVERSOLD (과매도 구간) 5. 이전 봉 close < open (하락) → 현재 봉 close > open (반등 시작) 6. MA5 위에 있거나, 현재봉 거래량 >= 평균 거래량 × VOLUME_AVG_MULTIPLIER 7. RSI(3) >= SCALP_RSI_OVERBOUGHT 시 진입 금지 (고점 추격 방지) 매도 조건 (ver2 check_sell_signals 와 동일) ──────────────────────────────────────────── - 손절: 현재가 <= stop_price - 익절: 현재가 >= target_price - 숄더컷, 퀵프로핏, ATR 트레일링 스탑 """ def __init__(self): # ── KIS 인증 정보 (DB 에서 로드) ───────────────────────── is_mock = get_env_bool("KIS_MOCK", True) if is_mock: app_key = get_env_from_db("KIS_APP_KEY_MOCK", "") app_secret = get_env_from_db("KIS_APP_SECRET_MOCK", "") account_no = get_env_from_db("KIS_ACCOUNT_NO_MOCK", "") account_code = get_env_from_db("KIS_ACCOUNT_CODE_MOCK", "01") else: app_key = get_env_from_db("KIS_APP_KEY_REAL", "") app_secret = get_env_from_db("KIS_APP_SECRET_REAL", "") account_no = get_env_from_db("KIS_ACCOUNT_NO_REAL", "") account_code = get_env_from_db("KIS_ACCOUNT_CODE_REAL", "01") self.client = KISClient(app_key, app_secret, account_no, account_code, mock=is_mock) self.db = db self.mm_channel = MM_CHANNEL_SCALP # ── 보유 종목 & 상태 ───────────────────────────────────── self.holdings: Dict[str, dict] = {} self.untradable_skip_set: set = set() self.recently_sold: dict = {} self._sell_backoff: dict = {} self.today_date = dt.now().strftime("%Y-%m-%d") # 갭보정 실패 재시도 대기열: 신규 구독 시 갭보정이 실패한 종목 코드 집합 # check_buy_signal_scalp에서 봉부족 감지 시 자동으로 백그라운드 재갭보정 트리거 self._gap_retry_queue: set = set() # 봉부족 종목별 마지막 재갭보정 시각 (같은 종목 30초 내 중복 재시도 방지) self._gap_retry_ts: dict = {} # ── 자산 추적 ───────────────────────────────────────────── self.current_cash = 0.0 self.current_total_asset = 0.0 self.start_day_asset = 0.0 # ── 리포트 플래그 ───────────────────────────────────────── self.morning_report_sent = False self.closing_report_sent = False # ── 설정 로드 ───────────────────────────────────────────── self.reload_config() # ── DB 에서 활성 포지션 복구 (SCALP_ 전략만) ────────────── active_trades = self.db.get_active_trades(strategy_prefix="SCALP") for code, trade in active_trades.items(): self.holdings[code] = { "buy_price": trade.get("avg_buy_price", 0), "qty": trade.get("current_qty", 0), "stop_price": trade.get("stop_price", 0), "target_price": trade.get("target_price", 0), "max_price": trade.get("max_price", 0), "atr_entry": trade.get("atr_entry", 0), "buy_time": trade.get("buy_date", dt.now().strftime("%Y-%m-%d %H:%M:%S")), "name": trade.get("name", code), "size_class": trade.get("size_class", ""), } # ── WebSocket + CandleAggregator 초기화 ────────────────── self.ws_cache: Optional[KISWebSocketPriceCache] = None self.candle_agg: Optional[CandleAggregator] = None self._init_websocket() # ── ML / RiskManager (선택적) ───────────────────────────── self.ml_predictor = None if ML_AVAILABLE and get_env_bool("USE_ML_SIGNAL", False): try: self.ml_predictor = MLPredictor() except Exception as e: logger.warning("ML 초기화 실패: %s", e) self.risk_manager = None if RISK_MANAGER_AVAILABLE: try: self.risk_manager = RiskManager( risk_pct_per_trade = get_env_float("RISK_PCT_PER_TRADE", 0.015), max_position_pct = get_env_float("MAX_POSITION_PCT", 0.03), min_position_amount = get_env_int("MIN_POSITION_AMOUNT", 20000), use_kelly = get_env_bool("USE_KELLY", False), kelly_multiplier = get_env_float("KELLY_MULTIPLIER", 0.25), slot_base_amount_cap= get_env_int("SLOT_BASE_AMOUNT_CAP", 0), # ── 무조건 깔고 가는 MAX_LOSS 기반 투자 상한 ───────── max_loss_per_trade_krw=get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000), stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.03), # ── 사이즈 클래스별 비율 (DB에서 주입) ─────────────── size_small_ratio = get_env_float("SIZE_CLASS_SMALL_RATIO", 0.70), size_mid_ratio = get_env_float("SIZE_CLASS_MID_RATIO", 0.85), ) except Exception as e: logger.warning("RiskManager 초기화 실패: %s", e) # ── 백그라운드 태스크 핸들 ──────────────────────────────── self._report_task = None logger.info("🚀 ScalpingBotV1 초기화 완료 (mock=%s, holdings=%d)", is_mock, len(self.holdings)) # ------------------------------------------------------------------ # WebSocket + CandleAggregator 초기화 # ------------------------------------------------------------------ def _init_websocket(self): """WebSocket 시작 → CandleAggregator 연결 → 종목 구독 → 갭 보정.""" if not _KIS_WS_AVAILABLE: logger.warning("⚠️ kis_ws 모듈 없음 → WebSocket 비활성") return try: # [수정] 웹소켓은 데이터 수신용이므로 무조건 실전(Real) 서버를 타도록 하이브리드 구성 ws_app_key = get_env_from_db("KIS_APP_KEY_REAL", "") ws_app_secret = get_env_from_db("KIS_APP_SECRET_REAL", "") self.ws_cache = KISWebSocketPriceCache( app_key = ws_app_key if ws_app_key else self.client.app_key, app_secret = ws_app_secret if ws_app_secret else self.client.app_secret, is_mock = False, # 실전 서버 강제 접속 ) # CandleAggregator 생성 후 WS 에 연결 # 1분봉(주전략) + 3분봉 + 15분봉 + 60분봉 — 나중에 다른 전략 필터로 활용 가능 tf = get_env_int("SCALP_CANDLE_TIMEFRAME", 1) self.candle_agg = CandleAggregator(db=self.db, timeframes=[tf, 3, 15, 60]) self.ws_cache.attach_candle_aggregator(self.candle_agg) ws_ok = self.ws_cache.start() if not ws_ok: logger.warning("⚠️ WebSocket 시작 실패 → 스캘핑봇은 WS 없이 동작 불가") self.ws_cache = None self.candle_agg = None return # 기존 보유 종목 구독 for code in list(self.holdings.keys()): self.ws_cache.subscribe(code) # 키움봇 후보 종목도 미리 구독 candidates = self.db.get_target_candidates() for c in candidates: code = c.get("code") or c.get("stk_cd", "") if code: self.ws_cache.subscribe(code) # ── 영구 구독 ETF: 시장 방향 필터용 (유니버스 변경과 무관하게 항상 유지) ── # KOSPI(069500), KOSDAQ(229200) 지수 ETF의 60분봉 RSI → 상승장/하락장 판단 perm_raw = get_env_from_db("PERMANENT_WS_CODES", "069500,229200") self._permanent_ws_codes: set = { c.strip() for c in str(perm_raw).split(",") if c.strip() } for code in sorted(self._permanent_ws_codes): self.ws_cache.subscribe(code) logger.info("📡 [영구구독] %s (시장방향 ETF)", code) logger.info("✅ WebSocket + CandleAggregator 활성 (%d종목 구독)", len(self.ws_cache._subscribed)) # WS 연결 성공 시마다 갭보정 자동 실행 등록 # (장 시간 첫 연결 시 REST 분봉 로드 → 봉부족 해소) # 새벽 자동재연결 시에는 kis_ws 내부에서 is_market_hours() 체크 후 스킵 self.ws_cache.set_on_connected_callback(self._fill_all_gaps) # 시작 시 즉시 한 번 실행 (장 중 재시작 대비) self._fill_all_gaps() except Exception as e: logger.warning("⚠️ WebSocket 초기화 예외: %s", e) self.ws_cache = None self.candle_agg = None def _fill_all_gaps(self): """ 봇 시작 시 또는 WS 재접속 시 모든 구독 종목의 봉 갭을 REST 로 보정. RAM 버퍼(_confirmed)에 즉시 적재 → 다음 매수 체크에 즉시 반영. DB는 백그라운드 큐로 비동기 저장. ▶ 키움 우선 전략: - 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월치) 제공 → 장 초반에도 즉시 봉 확보 가능 - KIS get_minute_chart 는 당일봉만 제공 → 장 시작 직후 봉 부족 → 키움 우선 - 키움 키 없으면 KIS fallback (1분/3분봉만, 15/60분봉은 KIS 지원 안 함) """ if not self.candle_agg or not self.ws_cache: return # ── 중복 실행 방지 ───────────────────────────────────────────── if getattr(self, '_gap_filling', False): logger.debug("갭보정 이미 진행 중 → 스킵") return self._gap_filling = True try: main_tf = get_env_int("SCALP_CANDLE_TIMEFRAME", 1) limit = get_env_int("SCALP_GAP_FILL_LIMIT", 120) with self.ws_cache._sub_lock: codes = set(self.ws_cache._subscribed) # ── 키움 크레덴셜 조회 ──────────────────────────────────── kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db) use_kiwoom = bool(kw_key and kw_secret) logger.info( "🔧 [갭보정] %d종목 분봉 로드 시작 (tfs=%s, limit=%d, kiwoom=%s)", len(codes), self.candle_agg.timeframes, limit, "✅" if use_kiwoom else "❌→KIS fallback", ) for code in sorted(codes): for tf in self.candle_agg.timeframes: df = None # 키움 우선: 어제 봉 포함, 장 초반에도 봉 바로 확보 # ※ 토큰은 _get_kiwoom_token_cached()가 23시간 캐시 → au10001 한도 방지 if use_kiwoom: try: df = get_kiwoom_candles_df( code, tf, kw_key, kw_secret, is_mock=kw_mock, n=limit, ) except Exception as e: logger.debug("키움 갭보정 실패 (%s %dM): %s", code, tf, e) # KIS fallback: 당일봉만 → 1분/3분봉에만 유효 if (df is None or df.empty) and tf <= 3: try: df = self.client.get_minute_chart( code, period=str(tf), limit=limit ) except Exception as e: logger.debug("KIS 갭보정 실패 (%s %dM): %s", code, tf, e) if df is not None and not df.empty: self.candle_agg.fill_gap_from_rest(code, tf, df) # 같은 종목 내 timeframe 전환: 짧은 딜레이 (차트 API 레이트리밋) time.sleep(random.uniform(0.2, 0.4)) # 종목 간 딜레이: 조금 더 길게 time.sleep(random.uniform(0.3, 0.6)) finally: self._gap_filling = False # ------------------------------------------------------------------ # 설정 리로드 # ------------------------------------------------------------------ def reload_config(self): """DB 설정 실시간 반영 (메인 루프마다 호출).""" # ── 스캘핑 전용 손익절: 꼬리잡기(STOP_LOSS_PCT/TAKE_PROFIT_PCT)와 분리 ── # SCALP_STOP_LOSS_PCT : 스캘핑 손절 % (양수, 기본 1.5%) # 1분봉 초단타에서 -4% 손절은 너무 넓어 자금이 묶임 → 1.5%로 타이트하게 설정 # SCALP_TAKE_PROFIT_PCT: 스캘핑 익절 % (양수, 기본 1.5%) # +5% 익절은 1분봉에서 거의 도달 불가 → 1.5%로 줄여 회전율 극대화 scalp_sl = get_env_float("SCALP_STOP_LOSS_PCT", 0.015) self.scalp_stop_loss_pct = -abs(scalp_sl) # 음수로 통일 (ex: -0.015) self.scalp_take_profit_pct = get_env_float("SCALP_TAKE_PROFIT_PCT", 0.015) # 스캘핑 낙폭 기준: 꼬리잡기 MIN_DROP_RATE와 독립 # MIN_DROP_RATE(3%) 그대로 쓰면 1분봉 소형주에선 걸러지는 종목이 너무 많아짐 # SCALP_MIN_DROP_RATE 기본 1.5%로 완화 → 타점 빈도 증가 self.scalp_min_drop_rate = get_env_float("SCALP_MIN_DROP_RATE", 0.015) # 글로벌 손절/익절 (호환 유지 - check_sell_signals 등에서 참조) self.stop_loss_pct = self.scalp_stop_loss_pct self.take_profit_pct = self.scalp_take_profit_pct self.max_stocks = get_env_int("MAX_STOCKS", 3) self.min_drop_rate = self.scalp_min_drop_rate # 하위호환 # 일일 회복률: 스캘핑에선 사용하지 않음 (RSI 과매도와 모순). 설정만 유지. self.min_recovery_ratio = get_env_float("MIN_RECOVERY_RATIO_SHORT", 0.5) # 스캘핑 전용 # RSI 과매도: 이 값 이하 봉이 나오면 되돌림 후보 (기본 25) self.rsi_oversold = get_env_float("SCALP_RSI_OVERSOLD", 25.0) # RSI 과매수: 이 값 이상이면 고점 추격 방지 진입 금지 (기본 75) self.rsi_overbought = get_env_float("SCALP_RSI_OVERBOUGHT", 75.0) # 봉 단위 (분) self.candle_tf = get_env_int("SCALP_CANDLE_TIMEFRAME", 1) # 장 시작 후 대기 (분) self.open_wait_min = get_env_int("SCALP_MARKET_OPEN_WAIT_MIN", 5) # 포지션 크기 self.slot_money = get_env_float("SLOT_MONEY_DEFAULT", 300000) self.total_deposit = get_env_float("TOTAL_DEPOSIT", 0) # ATR 매도 배수 self.atr_up_mult = get_env_float("SCALP_ATR_UP_MULT", 1.5) self.atr_down_mult = get_env_float("SCALP_ATR_DOWN_MULT", 0.8) # 피뢰침/급등 필터 self.high_chase_thr = get_env_float("HIGH_PRICE_CHASE_THRESHOLD", 0.96) self.max_daily_chg = get_env_float("MAX_DAILY_CHANGE_PCT", 20.0) # 거래량 배율 필터 self.vol_multiplier = get_env_float("VOLUME_AVG_MULTIPLIER", 1.5) # ML self.use_ml_signal = get_env_bool("USE_ML_SIGNAL", False) self.ml_min_prob = get_env_float("ML_MIN_PROBABILITY", 0.55) # 최소 가격 self.min_price = get_env_float("MIN_PRICE_TAIL", 1000.0) # ------------------------------------------------------------------ # 장 상태 체크 # ------------------------------------------------------------------ def check_market_status(self) -> bool: """장 중 여부 + 장 시작 후 워밍업 대기 확인.""" now = dt.now() h, m = now.hour, now.minute # 장외 시간 if not ((9 <= h < 15) or (h == 15 and m <= 30)): return False if get_env_bool("FORCE_MARKET_OPEN", False): return True # 장 시작(9:00) 후 open_wait_min 분 대기 market_start = now.replace(hour=9, minute=0, second=0, microsecond=0) elapsed_min = (now - market_start).total_seconds() / 60 if elapsed_min < self.open_wait_min: logger.info("⏳ 장 시작 후 %.0f분 경과 (워밍업 대기: %d분)", elapsed_min, self.open_wait_min) return False return True # ------------------------------------------------------------------ # 구독 관리 (새 후보 추가/구독 종목 정리) # ------------------------------------------------------------------ def _sync_subscriptions(self, candidates: list): """ target_candidates DB 목록과 WS 구독 목록을 동기화. - 새 종목 → subscribe + 갭 보정 - 유니버스에서 빠진 종목(보유 중 아닌 것) → unsubscribe + RAM 정리 ※ 영구 구독 ETF(_permanent_ws_codes)는 절대 해제하지 않음 (시장 방향 필터용) """ if not self.ws_cache: return new_codes = {c.get("code") or c.get("stk_cd", "") for c in candidates if c} new_codes.discard("") # 현재 보유 종목은 매도 완료 전까지 반드시 유지 new_codes |= set(self.holdings.keys()) # 영구 구독 ETF는 유니버스와 무관하게 항상 유지 new_codes |= getattr(self, '_permanent_ws_codes', set()) with self.ws_cache._sub_lock: current_subs = set(self.ws_cache._subscribed) # ── 구독 해제: 유니버스에서 빠진 종목 ───────────────────────── # 보유 중 종목은 매도 감시를 위해 구독 유지 for code in sorted(current_subs - new_codes): self.ws_cache.unsubscribe(code) if self.candle_agg: self.candle_agg.remove_code(code) # ── 신규 구독: 유니버스에 새로 들어온 종목 ───────────────────── kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db) use_kiwoom = bool(kw_key and kw_secret) for code in sorted(new_codes - current_subs): self.ws_cache.subscribe(code) # 구독 즉시 갭 보정 (봉 버퍼 없는 신규 종목) — 키움 우선 if not self.candle_agg: continue lim = get_env_int("SCALP_GAP_FILL_LIMIT", 120) ok = False for tf in self.candle_agg.timeframes: df = None if use_kiwoom: try: df = get_kiwoom_candles_df( code, tf, kw_key, kw_secret, is_mock=kw_mock, n=lim, ) except Exception as e: logger.debug("키움 신규갭보정 실패 (%s %dM): %s", code, tf, e) if (df is None or df.empty) and tf <= 3: try: df = self.client.get_minute_chart( code, period=str(tf), limit=lim ) except Exception as e: logger.debug("KIS 신규갭보정 실패 (%s %dM): %s", code, tf, e) if df is not None and not df.empty: self.candle_agg.fill_gap_from_rest(code, tf, df) ok = True # tf 간 딜레이 (차트 API, 토큰은 캐시 재사용) time.sleep(random.uniform(0.2, 0.4)) if ok: logger.info("🔧 [신규갭보정] %s: 모든 tf 로드 완료", code) else: logger.warning("⚠️ [신규갭보정실패] %s: 데이터 없음 → 재시도 대기열 등록", code) self._gap_retry_queue.add(code) # ------------------------------------------------------------------ # 매수 신호: RSI 과매도 되돌림 # ------------------------------------------------------------------ def check_buy_signal_scalp(self, code: str, name: str) -> Optional[dict]: """ [스캘핑 진입 조건 — 1분봉 초단타 특화] 1. RAM 버퍼에서 최근 확정 봉 조회 (REST 없음) 2. RSI(3) <= SCALP_RSI_OVERSOLD: 단기 과매도 구간 진입 3. 이전 봉 음봉 → 최신 확정 봉 양봉: 되돌림(Retracement) 시작 신호 4. 당일 낙폭 필터 (SCALP_MIN_DROP_RATE 기준, 꼬리잡기보다 완화) ※ 일일 회복률(50%) 필터는 제거 — RSI<25(폭락직후)와 모순이라 매수 기회를 모두 죽임 5. 피뢰침/급등 필터 6. 거래량 필터 (VOLUME_AVG_MULTIPLIER) 7. 수급 수집 (ML 피처용, 진입 필터로 사용하지 않음) ─ 당일 시고저: WS 캐시 우선 → REST fallback (API 과부하 방지) """ try: # [테스트용] FORCE_BUY_TEST=true 시 조건 무시 if get_env_bool("FORCE_BUY_TEST", False): ws_d = self.ws_cache.get_price(code) if self.ws_cache else None px = 0.0 if ws_d: px = abs(float(str(ws_d.get("stck_prpr", 0)))) if px <= 0: pd_ = self.client.inquire_price(code) if pd_: px = abs(float(str(pd_.get("stck_prpr", 0)).replace(",", ""))) if px > 0: return {"code": code, "name": name, "price": px, "score": 5.0, "entry_features": {}} return None # ── 0. 시장 방향 필터 (USE_MARKET_REGIME_FILTER=true 시 활성) ── # KODEX200(069500)/KOSDAQ150(229200) 60분봉 RSI로 상승장 확인 # 하락장(ETF RSI < MARKET_REGIME_MIN_RSI)이면 롱 진입 차단 if get_env_bool("USE_MARKET_REGIME_FILTER", False): min_rsi = get_env_float("MARKET_REGIME_MIN_RSI", 48.0) regime = self.db.get_market_regime(tf=60) if not regime.get("is_bull") or regime.get("avg_rsi", 50) < min_rsi: logger.debug( "[시장필터] %s %s: 하락장 차단 (ETF RSI=%.1f < %.1f)", code, name, regime.get("avg_rsi", 0), min_rsi, ) return None # ── 0b. 테마 과열 필터 (USE_THEME_HEAT_FILTER=true 시 활성) ── # 이 종목 테마의 60분봉 RSI 평균이 THEME_HEAT_RSI_MAX 초과면 차단 # "테마 전체가 과열인데 내가 지금 진입하면 상투 잡기" if get_env_bool("USE_THEME_HEAT_FILTER", False): heat_max = get_env_float("THEME_HEAT_RSI_MAX", 72.0) meta = self.db.get_stock_meta(code) if meta and meta.get("theme"): momentum = self.db.get_theme_momentum(meta["theme"], tf=60) if momentum.get("count", 0) >= 3 and momentum.get("avg_rsi3", 0) > heat_max: logger.debug( "[테마필터] %s %s: 테마(%s) 과열 차단 (RSI=%.1f > %.1f)", code, name, meta["theme"], momentum["avg_rsi3"], heat_max, ) return None # ── 1. RAM 버퍼에서 확정 봉 조회 (DB 조회 없음, 즉시 반영) ── # fill_gap_from_rest() 가 RAM(_confirmed)에 즉시 쓰므로 # 갭보정 직후에도 봉을 사용 가능 (DB 큐 플러시 대기 불필요) if self.candle_agg: candles = self.candle_agg.get_candles(code, self.candle_tf, n=50) else: # WebSocket 비활성 시 DB fallback candles = self.db.get_ws_candles(code, self.candle_tf, limit=50, confirmed_only=True) if len(candles) < 5: logger.info("%s🔍 [탈락-봉부족] %s %s: 확정봉 %d개 (최소 5개 필요)%s", LOG_YELLOW, name, code, len(candles), LOG_RESET) # ── 봉부족 감지 → 백그라운드 재갭보정 트리거 ────────────── # 갭보정이 처음에 실패했거나 누락된 경우 즉시 재시도 # 같은 종목 30초 내 중복 재시도 방지 (API 과부하 방지) now_ts = time.time() last_retry = self._gap_retry_ts.get(code, 0) retry_interval = get_env_int("SCALP_GAP_RETRY_SEC", 30) if now_ts - last_retry > retry_interval: self._gap_retry_ts[code] = now_ts def _retry_gap(c=code): try: tf = self.candle_tf lim = get_env_int("SCALP_GAP_FILL_LIMIT", 100) df = self.client.get_minute_chart(c, period=str(tf), limit=lim) if df is not None and not df.empty and self.candle_agg: self.candle_agg.fill_gap_from_rest(c, tf, df) logger.info("🔧 [봉부족 재갭보정 완료] %s: %d행 로드", c, len(df)) else: logger.warning("⚠️ [봉부족 재갭보정 실패] %s: 데이터 없음", c) except Exception as ex: logger.warning("⚠️ [봉부족 재갭보정 오류] %s: %s", c, ex) threading.Thread(target=_retry_gap, daemon=True, name=f"gap-retry-{code}").start() return None latest = candles[-1] # 가장 최신 확정 봉 prev = candles[-2] # 그 전 봉 curr_price = float(latest["close"]) if curr_price < self.min_price: return None # ── 2. RSI 과열/과매도 체크 ─────────────────────────────── # RSI(상대강도지수): 단기간 얼마나 오르고 내렸는지 나타내는 지표. # SCALP_RSI_OVERSOLD(기본 25) 이하: 단기 극과매도 → 되돌림 가능성 ↑ rsi3 = latest.get("rsi_3") if rsi3 is None: logger.info("%s🔍 [탈락-RSI없음] %s %s: RSI 미계산 (봉 축적 중)%s", LOG_YELLOW, name, code, LOG_RESET) return None rsi3 = float(rsi3) # RSI 0.0 은 "극과매도"가 아니라 "봉 데이터 부족으로 계산 불가" # → 신뢰할 수 없으므로 진입 차단 if rsi3 <= 0.0: logger.info("%s🔍 [탈락-RSI무효] %s %s: RSI3=0.0 (봉 부족, 계산 불가)%s", LOG_YELLOW, name, code, LOG_RESET) return None if rsi3 > self.rsi_overbought: logger.info("%s🔍 [탈락-RSI과열] %s %s: RSI3=%.1f > %.0f%s", LOG_YELLOW, name, code, rsi3, self.rsi_overbought, LOG_RESET) return None if rsi3 > self.rsi_oversold: # 과매도 구간 아님 → 진입 기회 아님 logger.info("%s🔍 [탈락-RSI조건] %s %s: RSI3=%.1f (과매도<%.0f 아님)%s", LOG_YELLOW, name, code, rsi3, self.rsi_oversold, LOG_RESET) return None # ── 3. 되돌림(Retracement) 봉 확인 ────────────────────── # 되돌림: 하락하던 주가가 방향을 틀어 상승 전환하는 찰나의 타이밍 # 이전 봉 음봉 + 현재 봉 양봉 → 전환 신호 확인 prev_bearish = float(prev["close"]) < float(prev["open"]) curr_bullish = float(latest["close"]) > float(latest["open"]) if not (prev_bearish and curr_bullish): logger.info("%s🔍 [탈락-되돌림없음] %s %s: prev_bear=%s curr_bull=%s%s", LOG_YELLOW, name, code, prev_bearish, curr_bullish, LOG_RESET) return None # ── 4. 당일 시고저 확보 (WS 캐시 우선 → REST fallback) ── # H0STCNT0 체결 틱에 stck_oprc/hgpr/lwpr 포함 → REST 없이 바로 사용 current_price = curr_price day_open = day_high = day_low = 0.0 ws_d = self.ws_cache.get_price(code) if self.ws_cache else None if ws_d: current_price = abs(float(str(ws_d.get("stck_prpr", curr_price)).replace(",", ""))) or curr_price day_open = abs(float(str(ws_d.get("stck_oprc", 0)).replace(",", ""))) day_high = abs(float(str(ws_d.get("stck_hgpr", 0)).replace(",", ""))) day_low = abs(float(str(ws_d.get("stck_lwpr", 0)).replace(",", ""))) # WS 캐시 없거나 시고저 미수신 시에만 REST 호출 (API 과부하 방지) if day_open <= 0 or day_low <= 0: price_data = self.client.inquire_price(code) if not price_data: return None current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", ""))) or current_price day_open = abs(float(str(price_data.get("stck_oprc", 0)).replace(",", ""))) day_high = abs(float(str(price_data.get("stck_hgpr", 0)).replace(",", ""))) day_low = abs(float(str(price_data.get("stck_lwpr", 0)).replace(",", ""))) if day_open <= 0 or day_low <= 0 or current_price <= 0: return None # ── 낙폭 필터 (SCALP_MIN_DROP_RATE, 기본 1.5%) ── # 꼬리잡기 MIN_DROP_RATE(3%)보다 절반으로 완화 → 1분봉 타점 빈도 증가 drop_rate = (day_open - day_low) / day_open if day_open > 0 else 0 if drop_rate < self.scalp_min_drop_rate: logger.info("%s🔍 [탈락-낙폭] %s %s: %.2f%% < %.1f%%(SCALP_MIN_DROP_RATE)%s", LOG_YELLOW, name, code, drop_rate * 100, self.scalp_min_drop_rate * 100, LOG_RESET) return None # ── [핵심 변경] 일일 회복률 필터 제거 ───────────────────── # 기존: recovery_pos_day >= MIN_RECOVERY_RATIO_SHORT(50%) 조건이 있었음 # 문제: RSI<25(방금 폭락)이면 당연히 회복률도 낮음 → 동시 충족 불가 → 매수 기회 0 # 스캘핑은 '폭락 직후 되돌림' 포착이 목적이므로 일일 회복률 체크 불필요 # ── 5. 피뢰침 / 급등 필터 ──────────────────────────────── if current_price >= day_high * self.high_chase_thr: logger.info("%s🔍 [탈락-고점추격] %s %s: 현재가 %.0f ≥ 고가 %.0f × %.2f%s", LOG_YELLOW, name, code, current_price, day_high, self.high_chase_thr, LOG_RESET) return None if day_low > 0: # 당일 변동폭이 너무 크면 이미 급등주 → 스캘핑 회수 어려움 daily_chg_pct = (day_high - day_low) / day_low * 100 if daily_chg_pct > self.max_daily_chg: logger.info("%s🔍 [탈락-피뢰침 급등주] %s %s: 일일 변동폭 %.1f%% > %.0f%%%s", LOG_YELLOW, name, code, daily_chg_pct, self.max_daily_chg, LOG_RESET) return None # ── 6. 거래량 필터 ──────────────────────────────────────── # 현재 봉 거래량이 최근 N봉 평균 대비 VOLUME_AVG_MULTIPLIER 배 이상이어야 # 단순 노이즈가 아닌 실제 수급 참여 신호로 판단 volumes = [float(c.get("volume", 0)) for c in candles] avg_vol = sum(volumes[:-1]) / max(len(volumes) - 1, 1) curr_vol = float(latest.get("volume", 0)) if avg_vol > 0 and curr_vol < avg_vol * self.vol_multiplier: logger.info("%s🔍 [탈락-거래량] %s %s: %.0f < 평균%.0f × %.1f%s", LOG_YELLOW, name, code, curr_vol, avg_vol, self.vol_multiplier, LOG_RESET) return None # ── 7. 수급 수집 (ML 피처용, 진입 필터로 미사용) ────────── investor_trend = self.client.get_investor_trend(code, days=3) entry_features = { "rsi": rsi3, "volume_ratio": curr_vol / avg_vol if avg_vol > 0 else 1.0, "drop_rate": drop_rate, # 당일 낙폭 (ML 학습용) "tail_length_pct": 0.0, "ma5_gap_pct": None, "ma20_gap_pct": None, "foreign_net_buy": investor_trend.get("foreign_net_buy", 0) if investor_trend else 0, "institution_net_buy": investor_trend.get("org_net_buy", 0) if investor_trend else 0, "market_hour": dt.now().hour, } # ── ML 승률 필터 (설정 시) ──────────────────────────────── if self.use_ml_signal and self.ml_predictor: try: ml_feats = {k: (v if v is not None else 0.0) for k, v in entry_features.items()} ml_prob = self.ml_predictor.predict_win_probability(ml_feats) if ml_prob < self.ml_min_prob: logger.info("%s🔍 [탈락-ML] %s %s: %.2f%% < %.0f%%%s", LOG_YELLOW, name, code, ml_prob * 100, self.ml_min_prob * 100, LOG_RESET) return None except Exception: pass # 과매도가 깊을수록(RSI 낮을수록) 점수 상승 → 우선 매수 score = 5.0 + (self.rsi_oversold - rsi3) / 5.0 logger.info( "%s🎯 [스캘핑 시그널] %s | 가격:%.0f | RSI3:%.1f | 낙폭:%.1f%% | 거래량:%.1fx%s", LOG_CYAN, name, current_price, rsi3, drop_rate * 100, curr_vol / max(avg_vol, 1), LOG_RESET, ) return { "code": code, "name": name, "price": current_price, "score": score, "entry_features": entry_features, } except Exception as e: logger.info("%s🔍 [탈락-예외] %s %s: %s%s", LOG_YELLOW, name, code, e, LOG_RESET) return None # ------------------------------------------------------------------ # 매수 실행 (ver2 execute_buy 동일 구조) # ------------------------------------------------------------------ def execute_buy(self, signal: dict) -> bool: """매수 주문 실행 + DB 저장 + WS 구독.""" code = signal["code"] name = signal["name"] price = signal["price"] if price <= 0: return False # ── 포지션 크기 계산 (원화 손실 한도 역산) ──────────────────────────── # MAX_LOSS_PER_TRADE_KRW(원) ÷ |SCALP_STOP_LOSS_PCT| = 최대 투자 가능 금액 # 스캘핑 손절 1.5% 기준 예: 손실한도 20만원 → 투자상한 = 200,000 / 0.015 = 13,333,333원 # SLOT_MONEY_DEFAULT 가 더 낮으면 그쪽 사용 (포지션 과집중 방지) max_loss_krw = get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000) # 스캘핑 전용 손절 비율 (SCALP_STOP_LOSS_PCT, 기본 1.5%) scalp_sl_pct = abs(self.scalp_stop_loss_pct) # 양수로 처리 if max_loss_krw > 0 and scalp_sl_pct > 0: invest_limit = max_loss_krw / scalp_sl_pct invest_amount = min(invest_limit, self.slot_money) else: invest_amount = self.slot_money qty = max(1, int(invest_amount / price)) if qty <= 0: return False # 스캘핑 전용 타이트한 손절/익절가 (SCALP_STOP/TAKE_PROFIT_PCT) # 꼬리잡기의 STOP_LOSS_PCT(-4%), TAKE_PROFIT_PCT(+5%)와 완전 분리 stop_price = price * (1 + self.scalp_stop_loss_pct) # ex: -1.5% target_price = price * (1 + self.scalp_take_profit_pct) # ex: +1.5% ok = self.client.buy_order(code, qty, order_type="01") # 시장가 if not ok: return False now_str = dt.now().strftime("%Y-%m-%d %H:%M:%S") holding = { "buy_price": price, "qty": qty, "stop_price": stop_price, "target_price": target_price, "max_price": price, "atr_entry": 0.0, "buy_time": now_str, "name": name, "size_class": "", } self.holdings[code] = holding # DB 저장 (upsert_trade 는 trade_data dict 방식) self.db.upsert_trade({ "code": code, "name": name, "strategy": "SCALP_RSI_REVERSAL", "avg_buy_price": price, "current_price": price, "stop_price": stop_price, "target_price": target_price, "max_price": price, "atr_entry": 0.0, "target_qty": qty, "current_qty": qty, "total_invested": price * qty, "status": "HOLDING", "buy_date": now_str, "entry_features": signal["entry_features"], }) # WS 구독 (이미 구독 중이면 무시됨) if self.ws_cache: self.ws_cache.subscribe(code) rsi_val = signal["entry_features"].get("rsi", 0) msg = (f"🛒 **[스캘핑 매수]** {name}({code})\n" f"매수가: {price:,.0f}원 × {qty}주 = {price*qty:,.0f}원\n" f"손절: {stop_price:,.0f}원(-{abs(self.scalp_stop_loss_pct)*100:.1f}%) | " f"익절: {target_price:,.0f}원(+{self.scalp_take_profit_pct*100:.1f}%)\n" f"RSI3: {rsi_val:.1f}") msg_mm(msg) logger.info("✅ [매수체결] %s %s @ %d원 × %d주 | 손절-%.1f%% 익절+%.1f%%", name, code, int(price), qty, abs(self.scalp_stop_loss_pct) * 100, self.scalp_take_profit_pct * 100) return True # ------------------------------------------------------------------ # 매도 신호 체크 (ver2 check_sell_signals 와 동일 원리) # ------------------------------------------------------------------ def check_sell_signals(self) -> List[dict]: """ 보유 종목 순회 → 손절/익절/ATR 조건 체크. 현재가: WebSocket 캐시 우선, 없으면 REST fallback. """ signals = [] now = dt.now() for code, holding in list(self.holdings.items()): try: name = holding.get("name", code) buy_price = float(holding.get("buy_price", 0)) qty = int(holding.get("qty", 0)) stop_price = float(holding.get("stop_price", 0)) target_price = float(holding.get("target_price", 0)) max_price = float(holding.get("max_price", buy_price)) if qty <= 0 or buy_price <= 0: continue # 매도 백오프 중이면 스킵 backoff_until = self._sell_backoff.get(code, 0) if time.time() < backoff_until: continue # 현재가 조회: WS 캐시 → REST fallback current_price = 0.0 if self.ws_cache and self.ws_cache.is_active: pd_ = self.ws_cache.get_price(code) if pd_: current_price = abs(float(str(pd_.get("stck_prpr", 0)))) if current_price <= 0: pd_ = self.client.inquire_price(code) if pd_: current_price = abs(float(str(pd_.get("stck_prpr", 0)).replace(",", ""))) if current_price <= 0: continue # max_price 갱신 if current_price > max_price: max_price = current_price holding["max_price"] = max_price self.db.upsert_trade({ "code": code, "name": holding.get("name", code), "avg_buy_price": buy_price, "current_price": current_price, "max_price": max_price, "target_qty": qty, "current_qty": qty, "status": "HOLDING", "buy_date": holding.get("buy_time", now.strftime("%Y-%m-%d %H:%M:%S")), }) profit_pct = (current_price - buy_price) / buy_price # 가격 변화율 (수수료 전) profit_val = (current_price - buy_price) * qty # 가격 손익 원화 (수수료 전) # ── 본절가(breakeven) 계산 ────────────────────────────── # 왕복 수수료 + 세금 + 최소 마진을 합산한 최소 보장 라인 # FEE_RATE_PCT : 위탁수수료 매수/매도 각각 (기본 0.015%) # SELL_TAX_RATE_PCT: 증권거래세 매도 시만 (기본 0.18%) # SCALP_MIN_PROFIT_PCT: 수수료 위 최소 순이익 마진 (기본 0.2%) # breakeven = 매수가 × (1 + 수수료×2 + 세금 + 최소마진) _fee = get_env_float("FEE_RATE_PCT", 0.015) / 100 _tax = get_env_float("SELL_TAX_RATE_PCT", 0.18) / 100 _min_margin = get_env_float("SCALP_MIN_PROFIT_PCT", 0.2) / 100 breakeven_pct = _fee * 2 + _tax + _min_margin breakeven_price = buy_price * (1 + breakeven_pct) reason = None # [1] 손절 (% 기준) if current_price <= stop_price: reason = f"손절 ({profit_pct*100:.2f}%)" # [2] 금액 손실컷 (원화 기준) — 고가 종목에서 % 손절 이전에 원화 손실이 터지는 경우 대비 # 예) 10만원짜리 20주 보유, 손절 -3% → 20만원 손실 → 설정값 초과 시 먼저 컷 elif profit_val <= -get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000): reason = f"금액손실컷 ({profit_val:,.0f}원)" # [3] 익절 (% 기준) elif current_price >= target_price: reason = f"익절 ({profit_pct*100:.2f}%)" # [4] 본절사수 — 한 번 의미 있게 올랐던 종목이 수수료 라인까지 내려오면 # 트레일링 전에 본절+0.2%에서 먼저 청산 (마이너스 방지) # 조건: 고점이 본절가 이상이었던 적 있음 + 현재가가 본절가로 회귀 elif max_price >= breakeven_price and current_price <= breakeven_price: net_pct = profit_pct - breakeven_pct + _min_margin reason = f"본절사수 (순익≈{net_pct*100:+.2f}%)" # [5] ATR 트레일링 스탑 (고점 대비 하락) # 발동 조건: 고점이 매수가 대비 본절가 이상 올라야 함 # → 수수료 방어 라인을 넘긴 수익에서만 트레일링 작동 elif max_price >= breakeven_price: drop_from_high = (max_price - current_price) / max_price if drop_from_high >= self.atr_down_mult * 0.01: # 트레일링 발동이어도 현재가가 본절 이하면 본절사수로 전환 # (위 [4]에서 먼저 걸리므로 여기는 본절 이상에서만 도달) reason = f"트레일링스탑 고점대비-{drop_from_high*100:.1f}%" if reason: signals.append({ "code": code, "name": name, "current_price": current_price, "qty": qty, "buy_price": buy_price, "profit_pct": profit_pct, "reason": reason, }) except Exception as e: logger.error("매도 신호 체크 오류(%s): %s", code, e) return signals # ------------------------------------------------------------------ # 매도 실행 # ------------------------------------------------------------------ def execute_sell(self, signal: dict): """매도 주문 실행 + DB 업데이트 + WS 구독 해제.""" code = signal["code"] name = signal["name"] current_price = signal["current_price"] qty = signal["qty"] buy_price = signal["buy_price"] profit_pct = signal["profit_pct"] reason = signal["reason"] # 메시지에 사용할 손익·보유시간 미리 계산 (holdings 삭제 전) # ── 수수료 계산 (env/DB 에서 비율 로드) ───────────────────────── # FEE_RATE_PCT : 위탁수수료 (매수/매도 각각, 기본 0.015%) # SELL_TAX_RATE_PCT: 증권거래세 (매도 시만 부과, 기본 0.18%) # 왕복 총비용 = buy_price×qty×fee + sell_price×qty×(fee + tax) fee_rate = get_env_float("FEE_RATE_PCT", 0.015) / 100 tax_rate = get_env_float("SELL_TAX_RATE_PCT", 0.18) / 100 total_fee = (buy_price * qty * fee_rate + current_price * qty * (fee_rate + tax_rate)) gross_pnl = (current_price - buy_price) * qty # 수수료 제외 손익 realized_pnl = gross_pnl - total_fee # 수수료 반영 순손익 buy_time_str = (self.holdings.get(code) or {}).get("buy_time", dt.now().strftime("%Y-%m-%d %H:%M:%S")) try: hold_min = int((dt.now() - dt.strptime(buy_time_str, "%Y-%m-%d %H:%M:%S")).total_seconds() / 60) except Exception: hold_min = 0 ok = self.client.sell_market_order(code, qty) if not ok: msg_cd = self.client._last_sell_msg_cd or "" msg1 = self.client._last_sell_msg1 or "" # 영업일 아님/장 외 시간 오류 → 백오프 if msg_cd in ("APBK0013", "APBK0962", "40910000"): backoff = get_env_int("SELL_FAILURE_BACKOFF_SEC", 1800) self._sell_backoff[code] = time.time() + backoff logger.warning("⏳ [매도백오프] %s %s: %s → %d초 대기", name, code, msg_cd, backoff) # ── 잔고 없음: 계좌에 포지션이 없는데 로컬 DB·메모리에만 남은 경우 ── # 모의투자 계정 초기화, 수동 취소 등으로 실제 잔고가 사라진 경우 # 로컬 holdings·active_trades를 강제 정리해 무한 재시도 방지 elif "잔고" in msg1 or "보유" in msg1 or "APBK3020" in msg_cd: logger.warning("⚠️ [유령잔고 정리] %s %s: 브로커 잔고 없음 → 로컬 기록 강제 삭제", name, code) self.db.close_trade(code=code, sell_price=0, sell_reason="잔고없음(강제정리)", strategy="SCALP_RSI_REVERSAL") self.holdings.pop(code, None) return # DB: active_trades → trade_history 이동 (strategy 지정 → 스캘핑 row만 삭제) # realized_pnl_override: 수수료·거래세 이미 차감된 순손익을 DB에 저장 self.db.close_trade( code=code, sell_price=current_price, sell_reason=reason, strategy="SCALP_RSI_REVERSAL", realized_pnl_override=realized_pnl, ) if code in self.holdings: del self.holdings[code] # WS 구독 해제 (재매수 대기 종목은 계속 구독) self.recently_sold[code] = time.time() # 당일 확정 손익: 스캘핑(SCALP_) 전략만 필터 → 꼬리잡기 손익 혼합 방지 try: today_trades = self.db.get_trades_by_date(self.today_date) day_pnl = sum( (t.get("realized_pnl") or 0) for t in today_trades if str(t.get("strategy", "")).startswith("SCALP") ) except Exception: day_pnl = 0 # 수수료 반영 순수익률 net_pct = realized_pnl / (buy_price * qty) if buy_price * qty > 0 else 0 emoji = "🔴" if realized_pnl < 0 else "🟢" msg = (f"{emoji} **[스캘핑 매도]** {name}({code})\n" f"{current_price:,.0f}원 × {qty:,}주 | {reason} | " f"수익률 {net_pct*100:+.2f}% (실현 {realized_pnl:+,.0f}원 / 수수료 -{total_fee:,.0f}원)\n" f"보유: {hold_min}분 | 보유 {len(self.holdings)}종목\n" f"당일손익 {day_pnl:+,.0f}원") msg_mm(msg) logger.info("%s [매도체결] %s %s @ %d원 %+.2f%% (수수료 -%,.0f원) (%s)%s", LOG_GREEN if realized_pnl >= 0 else LOG_RED, name, code, int(current_price), net_pct * 100, total_fee, reason, LOG_RESET) # ------------------------------------------------------------------ # 매인 루프 # ------------------------------------------------------------------ def run(self): """메인 루프 진입점.""" asyncio.run(self._run_async()) async def _run_async(self): """비동기 루프: 리포트 태스크 + 동기 매매 루프.""" self._report_task = asyncio.create_task(self._report_scheduler()) loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._sync_trading_loop) def _sync_trading_loop(self): """동기 매매 루프.""" logger.info("📈 스캘핑 매매 루프 시작") last_cleanup_day = "" while True: try: self.reload_config() now = dt.now() today_str = now.strftime("%Y-%m-%d") # 날짜 변경 처리 if today_str != self.today_date: self.today_date = today_str self.untradable_skip_set.clear() self.morning_report_sent = False self.closing_report_sent = False logger.info("📅 날짜 변경: %s", today_str) # ws_candles 오래된 봉 정리 (1일 1회) if today_str != last_cleanup_day: keep = get_env_int("SCALP_CANDLE_KEEP_DAYS", 3) self.db.cleanup_old_ws_candles(keep_days=keep) last_cleanup_day = today_str # 장 외 시간: 보유 없으면 슬립 if not self.check_market_status(): time.sleep(30) continue # ── 매도 우선 ───────────────────────────────────────── sell_signals = self.check_sell_signals() for sig in sell_signals: self.execute_sell(sig) # ── 후보 종목 동기화 (새 종목 구독) ────────────────── candidates = self.db.get_target_candidates() self._sync_subscriptions(candidates) # ── 매수 체크 ───────────────────────────────────────── active_cnt = len(self.holdings) if candidates and active_cnt < self.max_stocks: logger.info("🔍 [매수체크] 후보 %d개 순회 (보유 %d/%d)", len(candidates), active_cnt, self.max_stocks) for c in candidates: code = c.get("code") or c.get("stk_cd", "") name = c.get("name") or c.get("stk_nm", code) if not code or code in self.holdings: continue if code in self.untradable_skip_set: continue # 재진입 쿨다운 (스캘핑 전용 SCALP_COOLDOWN_SEC → 없으면 REENTRY_COOLDOWN_SEC 폴백) reentry_cd = get_env_int("SCALP_COOLDOWN_SEC", None) or get_env_int("REENTRY_COOLDOWN_SEC", 300) elapsed = time.time() - self.recently_sold.get(code, 0) if elapsed < reentry_cd: remaining = int(reentry_cd - elapsed) logger.info("⏳ [재진입차단] %s %s — %d초 남음", name, code, remaining) continue signal = self.check_buy_signal_scalp(code, name) if signal: ok = self.execute_buy(signal) if ok: time.sleep(random.uniform(1, 2)) break time.sleep(random.uniform(0.3, 0.8)) continue time.sleep(random.uniform(0.2, 0.5)) # 스캘핑 루프: 짧은 대기 (1분봉 집계 시 1~2초면 충분) time.sleep(random.uniform(1, 2)) except KeyboardInterrupt: logger.info("⏹ 봇 종료 (KeyboardInterrupt)") if self._report_task: self._report_task.cancel() break except Exception as e: logger.error("❌ 루프 에러: %s", e) time.sleep(5) # ------------------------------------------------------------------ # 리포트 스케줄러 # ------------------------------------------------------------------ async def _report_scheduler(self): """장 시작(9:05) / 마감(15:35) 리포트 전송.""" while True: try: now = dt.now() if (now.hour == 9 and now.minute == 5 and not self.morning_report_sent): self._send_report("morning") self.morning_report_sent = True elif (now.hour == 15 and now.minute == 35 and not self.closing_report_sent): self._send_report("closing") self.closing_report_sent = True except Exception as e: logger.error("리포트 스케줄러 오류: %s", e) await asyncio.sleep(30) def _send_report(self, report_type: str): """간단한 보유 현황 리포트 전송.""" try: now_str = dt.now().strftime("%Y-%m-%d %H:%M") title = "🌅 장 시작 보유현황" if report_type == "morning" else "📊 마감 스캘핑 리포트" lines = [f"**{title}** ({now_str})\n"] if self.holdings: for code, h in self.holdings.items(): price_data = self.client.inquire_price(code) curr = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", ""))) \ if price_data else h.get("buy_price", 0) pnl_pct = (curr - h["buy_price"]) / h["buy_price"] * 100 if h["buy_price"] > 0 else 0 lines.append(f"- {h['name']}({code}): {curr:,.0f}원 {pnl_pct:+.1f}%") time.sleep(0.2) else: lines.append("- 보유 종목 없음") # 오늘 매매 결과 — 스캘핑(SCALP_) 전략만 집계 today_yyyymmdd = dt.now().strftime("%Y%m%d") all_today = self.db.get_trades_by_date(today_yyyymmdd) history = [t for t in all_today if str(t.get("strategy", "")).startswith("SCALP")] if history: wins = [t for t in history if (t.get("profit_rate") or 0) >= 0] total_pnl = sum((t.get("realized_pnl") or 0) for t in history) lines.append(f"\n오늘 매매: {len(history)}건 | 승{len(wins)}/패{len(history)-len(wins)} | " f"손익 {total_pnl:+,.0f}원") msg_mm("\n".join(lines)) except Exception as e: logger.error("리포트 전송 실패: %s", e) # ══════════════════════════════════════════════════════════════════════ if __name__ == "__main__": bot = ScalpingBotV1() bot.run()