""" 스마트 주문 집행 모듈 (TWAP - Time Weighted Average Price) - 1초 간격 기계적 매수 방지 - 랜덤 딜레이로 시장 충격 최소화 - 분할 매수 진행 상태 저장 (봇 재시작 시에도 안전) """ import time import random import json import os import logging from datetime import datetime, timedelta from typing import Dict, Callable, Optional logger = logging.getLogger("SmartExecutor") class SmartOrderExecutor: """ [스텔스 주문 집행기] - TWAP(시간 가중 평균) 방식으로 긴 시간 동안 랜덤하게 물량 분할 집행 - 호가 잡아먹힘 방지 및 슬리피지 최소화 """ def __init__( self, state_file: str = "smart_orders.json", min_split_amount: int = 500000, # 1회 최소 주문 금액 (50만원) max_split_amount: int = 2000000, # 1회 최대 주문 금액 (200만원) min_delay_seconds: int = 30, # 최소 대기 시간 (초) max_delay_seconds: int = 180, # 최대 대기 시간 (초) default_duration_minutes: int = 30 # 기본 분할 기간 (분) ): """ Args: state_file: 진행 상태 저장 파일 min_split_amount: 1회 최소 주문 금액 max_split_amount: 1회 최대 주문 금액 min_delay_seconds: 분할 매수 간 최소 딜레이 max_delay_seconds: 분할 매수 간 최대 딜레이 default_duration_minutes: 기본 분할 기간 """ self.state_file = state_file self.min_split = min_split_amount self.max_split = max_split_amount self.min_delay = min_delay_seconds self.max_delay = max_delay_seconds self.duration = default_duration_minutes self.active_orders: Dict = self._load_state() logger.info( f"🎯 SmartExecutor 초기화: " f"분할{self.min_split//10000}~{self.max_split//10000}만원 | " f"딜레이{min_delay_seconds}~{max_delay_seconds}초" ) def _load_state(self) -> Dict: """저장된 진행 상태 로드""" if os.path.exists(self.state_file): try: with open(self.state_file, 'r', encoding='utf-8') as f: data = json.load(f) logger.info(f"📂 진행 중인 스마트 주문 로드: {len(data)}개") return data except Exception as e: logger.error(f"❌ 상태 파일 로드 실패: {e}") return {} return {} def _save_state(self): """현재 상태 저장 (Atomic Write)""" try: # 임시 파일에 먼저 쓰고 rename (원자성 보장) temp_file = f"{self.state_file}.tmp" with open(temp_file, 'w', encoding='utf-8') as f: json.dump(self.active_orders, f, indent=2, ensure_ascii=False) # Windows 호환성을 위해 기존 파일 삭제 후 rename if os.path.exists(self.state_file): os.remove(self.state_file) os.rename(temp_file, self.state_file) except Exception as e: logger.error(f"❌ 상태 저장 실패: {e}") def add_order( self, stock_code: str, stock_name: str, total_amount: int, duration_minutes: Optional[int] = None ): """ 스마트 주문 등록 (분할 매수 시작) Args: stock_code: 종목코드 stock_name: 종목명 total_amount: 총 매수 금액 duration_minutes: 분할 기간 (분), None이면 기본값 사용 """ if stock_code in self.active_orders: logger.warning(f"⚠️ [{stock_name}] 이미 진행 중인 스마트 주문 존재") return False duration = duration_minutes or self.duration # 분할 횟수 계산 (총 금액을 min~max 사이로 나눔) avg_split = (self.min_split + self.max_split) / 2 split_count = max(2, int(total_amount / avg_split)) amount_per_trade = int(total_amount / split_count) # 첫 실행 시간 설정 (즉시 ~ 1분 후) next_run = datetime.now() + timedelta(seconds=random.randint(1, 60)) end_time = datetime.now() + timedelta(minutes=duration) self.active_orders[stock_code] = { 'name': stock_name, 'total_amount': total_amount, 'remaining_amount': total_amount, 'split_count': split_count, 'amount_per_trade': amount_per_trade, 'executed_count': 0, 'start_time': datetime.now().isoformat(), 'end_time': end_time.isoformat(), 'next_run_time': next_run.isoformat() } self._save_state() logger.info( f"📝 [{stock_name}] 스마트 주문 등록: " f"{total_amount:,}원 -> {split_count}회 분할 ({duration}분)" ) return True def process_orders( self, current_prices: Dict[str, float], buy_callback: Callable[[str, str, int, float], bool] ) -> int: """ 스마트 주문 처리 (메인 루프에서 주기적으로 호출) Args: current_prices: {종목코드: 현재가} 딕셔너리 buy_callback: 실제 매수 실행 함수 (code, name, amount, price) -> success Returns: 이번 사이클에서 실행한 주문 수 """ executed_count = 0 completed_codes = [] now = datetime.now() for code, order in list(self.active_orders.items()): try: # 1. 실행 시간 체크 next_run = datetime.fromisoformat(order['next_run_time']) if now < next_run: continue # 아직 실행 시간 안됨 # 2. 종료 시간 체크 end_time = datetime.fromisoformat(order['end_time']) if now > end_time: logger.warning( f"⏰ [{order['name']}] 시간 초과 (잔액: {order['remaining_amount']:,}원) " f"-> 남은 금액은 다음 기회에..." ) completed_codes.append(code) continue # 3. 현재가 확인 current_price = current_prices.get(code) if not current_price or current_price <= 0: logger.debug(f"⚠️ [{order['name']}] 현재가 없음 -> 스킵") continue # 4. 이번 실행 금액 결정 remaining = order['remaining_amount'] buy_amount = min(order['amount_per_trade'], remaining) # 5. 콜백으로 실제 매수 시도 success = buy_callback(code, order['name'], buy_amount, current_price) if success: # 6. 성공 시 상태 업데이트 order['remaining_amount'] -= buy_amount order['executed_count'] += 1 executed_count += 1 logger.info( f"✅ [{order['name']}] 분할 매수 {order['executed_count']}회차: " f"{buy_amount:,}원 @ {current_price:,} " f"(잔액: {order['remaining_amount']:,}원)" ) # 7. 완료 체크 if order['remaining_amount'] < 10000: # 1만원 미만 잔돈은 무시 logger.info(f"🎉 [{order['name']}] 스마트 주문 완료!") completed_codes.append(code) else: # 8. 다음 실행 시간 랜덤 설정 delay = random.randint(self.min_delay, self.max_delay) order['next_run_time'] = (now + timedelta(seconds=delay)).isoformat() logger.info( f"⏳ [{order['name']}] 다음 매수는 {delay}초 후 " f"({order['split_count'] - order['executed_count']}회 남음)" ) else: # 매수 실패 시 1분 뒤 재시도 order['next_run_time'] = (now + timedelta(seconds=60)).isoformat() logger.warning(f"⚠️ [{order['name']}] 매수 실패 -> 1분 후 재시도") except Exception as e: logger.error(f"❌ [{code}] 스마트 주문 처리 에러: {e}") continue # 완료된 주문 삭제 for code in completed_codes: del self.active_orders[code] # 상태 저장 (변경사항 있을 때만) if executed_count > 0 or completed_codes: self._save_state() return executed_count def cancel_order(self, stock_code: str) -> bool: """스마트 주문 취소""" if stock_code in self.active_orders: order = self.active_orders.pop(stock_code) self._save_state() logger.info(f"🚫 [{order['name']}] 스마트 주문 취소") return True return False def get_status(self, stock_code: str = None) -> Dict: """ 진행 상태 조회 Args: stock_code: 특정 종목 (None이면 전체) Returns: 주문 상태 딕셔너리 """ if stock_code: return self.active_orders.get(stock_code, {}) return self.active_orders.copy() def has_active_order(self, stock_code: str) -> bool: """해당 종목의 진행 중인 스마트 주문이 있는지 확인""" return stock_code in self.active_orders def get_active_count(self) -> int: """진행 중인 스마트 주문 개수""" return len(self.active_orders)