import time import json import datetime import pandas as pd import numpy as np import os import logging import requests import random from dotenv import load_dotenv # ========================================================== # [Step 0] 환경 변수 및 기본 설정 # ========================================================== current_dir = os.path.dirname(os.path.abspath(__file__)) env_path = os.path.join(current_dir, ".env") if not os.path.exists(env_path): env_path = os.path.join(os.path.dirname(current_dir), ".env") load_dotenv(env_path) # Mattermost 설정 MM_SERVER_URL = "https://mattermost.hoonfam.org" MM_BOT_TOKEN = os.environ.get("MM_BOT_TOKEN_", "") MM_CONFIG_FILE = os.path.join(current_dir, "mm_config.json") # [Logger 설정] - 로그 포맷 및 핸들러 설정 logging.basicConfig( format='[%(asctime)s] %(message)s', datefmt='%H:%M:%S', level=logging.INFO ) logger = logging.getLogger("JungleBot") # 외부 라이브러리 로그 레벨 조정 (너무 시끄러운 로그 방지) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) # 키움 API 모듈 임포트 from kiwoom_rest_api.auth.token import TokenManager from kiwoom_rest_api.koreanstock.stockinfo import StockInfo from kiwoom_rest_api.koreanstock.chart import Chart from kiwoom_rest_api.koreanstock.order import Order from kiwoom_rest_api.koreanstock.rank_info import RankInfo from kiwoom_rest_api.koreanstock.account import Account from kiwoom_rest_api.koreanstock.market_condition import MarketCondition # ========================================================== # [Part 0] Mattermost 봇 클래스 (알림 전송용) # ========================================================== class MattermostBot: def __init__(self): self.api_url = f"{MM_SERVER_URL.rstrip('/')}/api/v4/posts" self.headers = { "Authorization": f"Bearer {MM_BOT_TOKEN}", "Content-Type": "application/json" } self.channels = self._load_channels() def _load_channels(self): """설정 파일에서 채널 ID 정보를 읽어옵니다.""" try: if os.path.exists(MM_CONFIG_FILE): with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f).get("channels", {}) return {} except Exception as e: logger.error(f"⚠️ MM 설정 로드 실패: {e}") return {} def send(self, channel_alias, message): """지정된 채널(alias)로 메시지를 전송합니다.""" channel_id = self.channels.get(channel_alias) if not channel_id: # 채널 ID가 없으면 로그만 찍고 넘어감 (봇 중단 방지) logger.warning(f"❌ '{channel_alias}' 채널 ID 없음. mm_config.json 확인 필요.") return False payload = {"channel_id": channel_id, "message": message} try: res = requests.post(self.api_url, headers=self.headers, json=payload, timeout=3) res.raise_for_status() return True except Exception as e: logger.error(f"❌ MM 전송 에러: {e}") return False # ========================================================== # [Part 1] 데이터 클래스 (주식 정보 객체) # ========================================================== class StockData: def __init__(self, code, name, price, open_p, high, low, close, volume): self.code = code self.name = name self.current_price = price self.open = open_p self.high = high self.low = low self.close = close self.volume = volume # ========================================================== # [Part 2] 브로커 API (키움증권 REST API 연동) # ========================================================== class BrokerAPI: def __init__(self): logger.info("🔵 키움(REST) 브로커 연결 시도...") try: self.token_manager = TokenManager() self.stock_info = StockInfo(token_manager=self.token_manager) self.chart = Chart(token_manager=self.token_manager) self.order = Order(token_manager=self.token_manager) self.rank = RankInfo(token_manager=self.token_manager) self.account = Account(token_manager=self.token_manager) self.market = MarketCondition(token_manager=self.token_manager) self.acc_no = os.environ.get("KIWOOM_ACCOUNT_NO", "YOUR_ACCOUNT_NO") logger.info(f"✅ 브로커 연결 완료 (계좌: {self.acc_no})") except Exception as e: logger.critical(f"❌ 브로커 초기화 실패: {e}") raise e def _safe_request(self, func, *args, **kwargs): """ API 호출 안전장치 (타임아웃 및 429 에러 핸들링 강화) - 429 에러(Too Many Requests) 발생 시 대기 후 재시도 - 일반 에러 발생 시 로그 기록 """ full_name = func.__name__ api_id = full_name.split('_')[-1] max_retries = 3 for i in range(max_retries): try: # 기본 안전 대기 (API 과부하 방지) time.sleep(1) result = func(*args, **kwargs) # 키움 API 특성상 200 OK라도 에러 메시지가 있을 수 있음 if isinstance(result, dict) and result.get('return_code') != '0' and '초과' in result.get('msg1', ''): raise Exception("429 Rate Limit Detected") return result except Exception as e: # 429 또는 과부하 에러 시 더 오래 대기 if "429" in str(e) or "과부하" in str(e): logger.warning(f"⚠️ [{api_id}] API 과부하 감지 -> 5초 대기 후 재시도 ({i + 1}/{max_retries})") time.sleep(5) else: logger.error(f"❌ [{api_id}] 호출 에러: {e}") time.sleep(1) logger.error(f"💀 [{api_id}] 3회 재시도 실패 -> 빈 값 반환") return {} def get_deposit_only(self): """예수금(주문 가능 금액)만 빠르게 조회""" try: res = self._safe_request(self.account.deposit_detail_status_request_kt00001, qry_tp="2") d2_deposit = float(res.get('d2_entra', 0)) if res else 0 current_deposit = float(res.get('ord_alow_amt', 0)) if res else 0 # D+2 예수금이 마이너스면 미수 발생 상황이므로 0으로 처리 return 0 if d2_deposit < 0 else current_deposit except Exception as e: logger.error(f"예수금 조회 실패: {e}") return 0 def get_intraday_investor(self, code): """ 장중 투자자별 매매 차트 (수급 확인용) - 외국인/기관의 실시간 순매수 수량을 리턴 """ try: res = self._safe_request(self.chart.intraday_investor_trading_chart_request_ka10064, mrkt_tp="000", amt_qty_tp="2", trde_tp="0", stk_cd=code) if not res or 'opmr_invsr_trde_chart' not in res: return 0, 0 data_list = res['opmr_invsr_trde_chart'] if not data_list: return 0, 0 latest = data_list[0] foreigner = int(latest.get('frgnr_invsr', 0)) institution = int(latest.get('orgn', 0)) return foreigner, institution except Exception as e: # 수급 조회 실패는 치명적이지 않으므로 0 리턴 return 0, 0 def _is_target_stock(self, name, code): """종목 필터링 (스팩, ETN, 우선주 등 제외)""" if len(code) != 6 or not code.isdigit(): return False exclude = ['스팩', 'ETN', 'W', 'ELW', '채권', '레버리지', '인버스', '곱버스', '선물', '콜', '풋', '2X', '3X', '합성', 'H', 'B'] if any(k in name for k in exclude): return False if name.endswith('우') or name.endswith('우B'): return False return True def check_market_status(self): """ 장 운영 시간 체크 (08:30 ~ 16:00) - 주말 및 장운영 시간 외에는 False 반환 """ now = datetime.datetime.now() # 1. 시간 체크 (08:30 ~ 16:00) if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)): if now.minute == 0 and now.second < 2: logger.info(f"💤 [장운영] 정규장 시간이 아닙니다. ({now.time().strftime('%H:%M:%S')})") return False # 2. 주말 체크 if now.weekday() >= 5: if now.minute == 0 and now.second < 2: logger.info("⏸️ [장운영] 주말 휴장입니다.") return False # 공휴일 체크 로직은 제외 (Fail-Open 방식: 장이 안 열리면 주문 실패로 처리됨) return True def get_multi_stock_data(self, code_list): """여러 종목의 현재가 정보를 한 번에 조회""" if not code_list: return {} code_str = "|".join([str(c).strip() for c in code_list]) try: res = self._safe_request(self.stock_info.watchlist_stock_information_request_ka10095, stock_code=code_str) result_map = {} if res and 'atn_stk_infr' in res: for item in res['atn_stk_infr']: code = item.get('stk_cd', '') if not code: continue if len(code) > 6: code = code[-6:] try: c = abs(float(item['cur_prc'])) if c > 0: result_map[code] = { 'code': code, 'name': item['stk_nm'], 'price': c, 'open': abs(float(item['open_pric'])), 'high': abs(float(item['high_pric'])), 'low': abs(float(item['low_pric'])) } except: continue return result_map except Exception as e: logger.error(f"멀티 시세 조회 실패: {e}") return {} def get_ohlcv_limit(self, code, timeframe='1m'): """분봉 차트 데이터 조회 (틱 범위 지정 가능)""" tic_scope = "1" if timeframe == '3m': tic_scope = "3" elif timeframe == '5m': tic_scope = "5" elif timeframe == '10m': tic_scope = "10" try: res = self._safe_request(self.chart.stock_minute_chart_request_ka10080, stk_cd=code, tic_scope=tic_scope, upd_stkpc_tp="1") data = res.get('stk_min_pole_chart_qry', []) if res else [] if not data: return pd.DataFrame() df = pd.DataFrame(data) df = df.rename(columns={'cur_prc': 'close', 'open_pric': 'open', 'high_pric': 'high', 'low_pric': 'low', 'trde_qty': 'volume'}) # 데이터를 시간순(과거->현재)으로 정렬 return df[['open', 'high', 'low', 'close', 'volume']].astype(float).abs().iloc[::-1].reset_index(drop=True) except Exception as e: logger.error(f"차트 조회 실패({code}): {e}") return pd.DataFrame() def scan_ant_shaking_candidates(self, max_price_limit=None): """ [조건검색 대체 로직] 개미털기(눌림목) 후보 종목 스캔 - 거래대금 및 회전율 상위 종목 중, 고점 대비 일정 비율 하락 후 반등 시도하는 종목 추출 """ logger.info(f"🐜 [개미털기] 스캔 시작") raw_codes_set = set() scan_strategies = [("3", "거래대금"), ("2", "회전율")] for sort_tp, desc in scan_strategies: try: res = self._safe_request(self.rank.top_trading_volume_today_request_ka10030, mrkt_tp="000", sort_tp=sort_tp, mang_stk_incls="3", pric_tp="0", crd_tp="0", trde_qty_tp="0", trde_prica_tp="0", mrkt_open_tp="0", stex_tp="3", cont_yn="N") if not res or 'tdy_trde_qty_upper' not in res: continue for stock in res['tdy_trde_qty_upper']: code = stock['stk_cd'].split('_')[0] try: price = abs(float(stock['cur_prc'])) except: continue # 전일 종가 조회 (API 함수 확인 필요) open_price = abs(float(stock.get('open_pric', price))) change_rate = ((price - open_price) / open_price * 100) if open_price > 0 else 0 if price < 1000: continue # 동전주 제외 if change_rate > 20: # 상한가 근처 logger.info(f"🚫 상한가 제외: {stock['stk_nm']} (+{change_rate:.1f}%)") continue if price > 200000: continue # 10만원 이상 제외 if self._is_target_stock(stock['stk_nm'], code): raw_codes_set.add(code) except Exception as e: logger.error(f"스캔({desc}) 중 에러: {e}") raw_codes = list(raw_codes_set) logger.info(f" 📥 후보 수집: {len(raw_codes)}개 -> 정밀 분석") final_list = [] chunk_size = 50 try: for i in range(0, len(raw_codes), chunk_size): chunk = raw_codes[i: i + chunk_size] multi_data = self.get_multi_stock_data(chunk) time.sleep(0.5) for code, data in multi_data.items(): if max_price_limit and data['price'] > max_price_limit: continue op, hi, lo, cl = data['open'], data['high'], data['low'], data['price'] if op == 0: continue drop_rate = (op - lo) / op total_range = hi - lo # 낙폭이 어느 정도 있고(3% 이상), 아래꼬리를 달고 올라온 종목 if total_range > 0 and drop_rate > 0.03: recovery_pos = (cl - lo) / total_range if recovery_pos > 0.5: score = drop_rate * 100 final_list.append({'code': code, 'name': data['name'], 'price': cl, 'score': score}) final_list.sort(key=lambda x: x['score'], reverse=True) return final_list except Exception as e: logger.error(f"분석 중 치명적 에러: {e}") return [] def get_account_info(self): """ 전체 계좌 잔고 및 평가금액 조회 - 예수금, 총 자산, 보유 종목 리스트 반환 """ try: # 1. 예수금 상세 res = self._safe_request(self.account.deposit_detail_status_request_kt00001, qry_tp="2") d2_deposit = float(res.get('d2_entra', 0)) if res else 0 current_deposit = float(res.get('ord_alow_amt', 0)) if res else 0 deposit = 0 if d2_deposit < 0 else current_deposit logger.info(f"예수금: {deposit:,.0f}원") # 2. 잔고 상세 res_bal = self._safe_request(self.account.account_evaluation_balance_detail_request_kt00018, query_type="1", domestic_exchange_type="KRX") total_asset = float(res_bal.get('tot_aset_amt', 0)) if res_bal else 0 balances = {} if res_bal and 'acnt_evlt_remn_indv_tot' in res_bal: for item in res_bal['acnt_evlt_remn_indv_tot']: code = item['stk_cd'].strip()[1:] if item['stk_cd'].startswith('A') else item['stk_cd'].strip() balances[code] = { 'buy_price': abs(float(item['pur_pric'])), 'qty': int(item['rmnd_qty']), 'name': item['stk_nm'].strip(), 'current_price': abs(float(item['cur_prc'])), 'profit_rate': float(item.get('erng_rt', 0)) } # API에서 총자산이 0으로 올 경우, 직접 계산 if total_asset == 0: stock_val = sum([b['current_price'] * b['qty'] for b in balances.values()]) total_asset = current_deposit + stock_val return total_asset, deposit, balances except Exception as e: logger.error(f"계좌 조회 에러: {e}") return 0, 0, {} def get_current_data(self, code): """단일 종목 현재가 조회""" try: res = self._safe_request(self.stock_info.watchlist_stock_information_request_ka10095, stock_code=code) if res and 'atn_stk_infr' in res and len(res['atn_stk_infr']) > 0: item = res['atn_stk_infr'][0]; p = abs(float(item.get('cur_prc', 0))) return StockData(code, item.get('stk_nm', 'Unknown'), p, abs(float(item.get('open_pric', p))), abs(float(item.get('high_pric', p))), abs(float(item.get('low_pric', p))), p, int(item.get('trde_qty', 0))) return None except Exception as e: logger.error(f"현재가 조회 에러({code}): {e}") return None def buy_market_order(self, code, qty): """시장가 매수 주문""" try: res = self._safe_request(self.order.stock_buy_order_request_kt10000, dmst_stex_tp="KRX", stk_cd=code, ord_qty=str(qty), trde_tp="3", ord_uv="0") if str(res.get('return_code')) == '0': return True logger.error(f"매수 주문 실패({code}): {res}") return False except Exception as e: logger.error(f"매수 주문 예외({code}): {e}") return False def sell_market_order(self, code, qty): """시장가 매도 주문""" try: res = self._safe_request(self.order.stock_sell_order_request_kt10001, dmst_stex_tp="KRX", stk_cd=code, ord_qty=str(qty), trde_tp="3", ord_uv="0") if str(res.get('return_code')) == '0': return True logger.error(f"매도 주문 실패({code}): {res}") return False except Exception as e: logger.error(f"매도 주문 예외({code}): {e}") return False # ========================================================== # [Part 3] 정글 서바이버 봇 (개선된 메인 로직) # ========================================================== class JungleSurvivorBot: def __init__(self, broker_api, budget=None): self.api = broker_api self.budget = budget # Mattermost 초기화 self.mm = MattermostBot() self.mm_channel = "stock" # 기본 채널 alias # 파일 경로 설정 self.portfolio_file = os.path.join(current_dir, 'portfolio.json') self.history_real_file = os.path.join(current_dir, 'trade_history_real.json') self.target_file = os.path.join(current_dir, 'target_universe.json') self.banned_file = os.path.join(current_dir, 'banned_codes.json') # 자금 관리 설정 self.max_stocks = 5 self.slot_money = 0 self.daily_stop_loss_pct = -0.05 # 일일 손실 한도 -5% self.stop_loss_pct = -0.035 # 종목별 손절 -3.5% self.enable_consecutive_loss_cut = False # 연속 손절 제한 (일단 끔) # 매수/매도 로직 파라미터 self.rsi_overheat_threshold = float(os.environ.get("RSI_OVERHEAT_THRESHOLD", "73")) # RSI 과열 기준 self.shoulder_cut_pct = float(os.environ.get("SHOULDER_CUT_PCT", "0.03")) # 어깨 매도 기준 (고점 대비 하락률) self.high_price_chase_threshold = float( os.environ.get("HIGH_PRICE_CHASE_THRESHOLD", "0.96")) # 일일 최고가 추격 매수 방지 기준 self.min_recovery_ratio = float(os.environ.get("MIN_RECOVERY_RATIO", "0.5")) # 꼬리 잡기 최소 회복 비율 self.max_recovery_ratio = float(os.environ.get("MAX_RECOVERY_RATIO", "0.8")) # 꼬리 잡기 최대 회복 비율 self.candle_open_price_buffer = float(os.environ.get("CANDLE_OPEN_PRICE_BUFFER", "0.995")) # 시가 대비 현재가 버퍼 self.volume_avg_multiplier = float(os.environ.get("VOLUME_AVG_MULTIPLIER", "1.0")) # 거래량 평균 대비 배율 self.intraday_investor_net_buy_threshold = int(os.environ.get("INTRADAY_INVESTOR_NET_BUY_THRESHOLD", "-1000")) # 장중 투자자 순매수 기준 self.stop_atr_multiplier_tail = float(os.environ.get("STOP_ATR_MULTIPLIER_TAIL", "3.5")) # TAIL_CATCH_3M 전략용 (2일 보유 전제) self.target_atr_multiplier_tail = float(os.environ.get("TARGET_ATR_MULTIPLIER_TAIL", "8.0")) # TAIL_CATCH_3M 전략용 (2일 보유 전제) self.stop_atr_multiplier_normal = float(os.environ.get("STOP_ATR_MULTIPLIER_NORMAL", "2.5")) # 일반 전략용 self.target_atr_multiplier_normal = float(os.environ.get("TARGET_ATR_MULTIPLIER_NORMAL", "5.0")) # 일반 전략용 # 상태 변수 초기화 self.current_cash = 0 self.start_of_day_asset = 0 self.today_date = datetime.datetime.now().strftime("%Y%m%d") self.consecutive_losses = 0 self.trading_halted = False self.was_market_open = False self.is_first_run = True # 초기 데이터 로드 및 정리 self.refresh_account_status(is_init=True) self.cleanup_banned_list() msg = f"🤖 **[정글봇 가동-Full Version]**\n- 시작 자산: {self.start_of_day_asset:,.0f}원\n- **[NEW] 3% 하락시 어깨 매도 적용**\n- **[NEW] RSI 과열 필터 적용**" logger.info(msg) self.send_mm(msg) def send_mm(self, msg): """Mattermost 알림 전송 래퍼 함수""" try: self.mm.send(self.mm_channel, msg) except Exception as e: logger.error(f"❌ MM 전송 중 에러: {e}") # ====================================================== # 파일 입출력 및 관리 유틸리티 # ====================================================== def load_json_file(self, path, is_list=False): try: if os.path.exists(path): if os.path.getsize(path) == 0: logger.warning(f"⚠️ 빈 파일 감지됨(초기화): {os.path.basename(path)}") return [] if is_list else {} with open(path, 'r', encoding='utf-8') as f: return json.load(f) return [] if is_list else {} except json.JSONDecodeError: logger.warning(f"⚠️ JSON 형식이 깨짐(초기화): {os.path.basename(path)}") return [] if is_list else {} except Exception as e: logger.error(f"❌ load_json_file 에러: {e}") return [] if is_list else {} def save_json_file(self, path, data): try: with open(path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) except Exception as e: logger.error(f" 파일저장에러 {e}") def get_banned_codes(self): """금일 매매 금지(손절 등) 종목 리스트 반환""" data = self.load_json_file(self.banned_file) active = [] now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') for c, e in data.items(): if e > now: active.append(c) return active def add_ban(self, code, hours=24): """종목을 벤(Ban) 리스트에 추가""" data = self.load_json_file(self.banned_file) data[code] = (datetime.datetime.now() + datetime.timedelta(hours=hours)).strftime('%Y-%m-%d %H:%M:%S') self.save_json_file(self.banned_file, data) def cleanup_banned_list(self): """만료된 벤 리스트 정리""" data = self.load_json_file(self.banned_file) now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') new_data = {k: v for k, v in data.items() if v > now} if len(data) != len(new_data): self.save_json_file(self.banned_file, new_data) # ====================================================== # 계좌 및 자산 관리 # ====================================================== def refresh_account_status(self, is_init=False): """계좌 상태를 API로부터 갱신하고 슬롯 머니를 재계산""" try: curr_date = datetime.datetime.now().strftime("%Y%m%d") if curr_date != self.today_date: self.today_date = curr_date self.consecutive_losses = 0 self.trading_halted = False self.start_of_day_asset = 0 logger.info("📅 [날짜변경] 데이터 리셋") total, deposit, balances = self.api.get_account_info() if total == 0: logger.warning("⚠️ 자산 조회 실패(0원) -> 기존 자산 유지") total = self.current_total_asset if hasattr(self, 'current_total_asset') else 1000000 if self.start_of_day_asset == 0 or is_init: self.start_of_day_asset = total self.current_total_asset = total self.current_cash = min(self.budget, deposit) if self.budget else deposit # 현재 보유 주식의 대략적인 평가금액 (예수금과의 갭 확인용) try: stock_val = sum( [b.get('current_price', 0) * b.get('qty', 0) for b in balances.values()] ) if balances else 0 except Exception: stock_val = 0 # 예수금의 90%만 사용 (안전 버퍼) investable = self.current_cash * 0.9 min_bet = 100000 if investable < min_bet: self.max_stocks = 1 self.slot_money = int(investable) else: if (investable / 30) < min_bet: self.max_stocks = max(int(investable / min_bet), 1) else: self.max_stocks = 30 self.slot_money = int(investable / self.max_stocks) if is_init: logger.info( f"💰 [자금 설정] " f"추정자산: {self.current_total_asset:,.0f}원 " f"(예수금: {self.current_cash:,.0f}원 + 주식평가: {stock_val:,.0f}원) | " f"1종목당: {self.slot_money:,.0f}원" ) self.portfolio = self.sync_portfolio_internal(balances) self.check_risk_status(total) except Exception as e: logger.error(f"계좌갱신오류: {e}") def update_account_light(self, profit_val=0): """ API 부하를 줄이기 위해 예수금만 빠르게 갱신하고 총자산은 로컬 계산으로 추정하는 경량 함수 """ try: new_cash = self.api.get_deposit_only() if new_cash > 0 or self.current_cash == 0: self.current_cash = new_cash if profit_val != 0: self.current_total_asset += profit_val except Exception as e: logger.error(f"경량 갱신 실패: {e}") def get_daily_profit_rate(self, current_asset): if self.start_of_day_asset == 0: return 0.0 return (current_asset - self.start_of_day_asset) / self.start_of_day_asset def check_risk_status(self, current_asset): """일일 손실 한도 체크""" p = self.get_daily_profit_rate(current_asset) if p <= self.daily_stop_loss_pct and not self.trading_halted: self.trading_halted = True msg = f"🛑 **[STOP LOSS 발동]**\n일 손실률 {p * 100:.2f}% 도달 -> 금일 매수 중단" logger.critical(msg) self.send_mm(msg) if self.enable_consecutive_loss_cut and self.consecutive_losses >= 4 and not self.trading_halted: self.trading_halted = True msg = f"🛑 **[연속 손절 과다]**\n4회 연속 손절 -> 금일 매수 중단" logger.critical(msg) self.send_mm(msg) def sync_portfolio_internal(self, real_balances): """API 잔고와 로컬 포트폴리오 파일 동기화""" local = self.load_json_file(self.portfolio_file) final = {} # 1. 실제 잔고 기준 업데이트 for c, r in real_balances.items(): final[c] = r if c in local: final[c].update({ 'strategy': local[c].get('strategy', 'MANUAL'), 'max_price': max(r['current_price'], local[c].get('max_price', 0)), 'target_price': local[c].get('target_price', 0), 'stop_price': local[c].get('stop_price', 0), 'atr_at_entry': local[c].get('atr_at_entry', 0), 'buy_date': local[c].get('buy_date', '') }) else: final[c].update({'strategy': 'MANUAL', 'max_price': r['current_price']}) # 데이터 복구 (봇 재시작 시 누락된 전략 정보 복원) if final[c].get('atr_at_entry') == 0 or final[c].get('strategy') == 'MANUAL': try: df_temp = self.api.get_ohlcv_limit(c, timeframe='1m') if not df_temp.empty: new_atr = self.calculate_atr(df_temp) base_p = r.get('buy_price', r['current_price']) final[c].update({ 'strategy': 'RECOVERED', 'atr_at_entry': new_atr, 'stop_price': base_p - (new_atr * 3.0), 'target_price': base_p + (new_atr * 5.0), 'buy_date': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) logger.info(f"♻️ [데이터 복구] {r['name']} 전략 정보 재계산 완료") except Exception as e: logger.error(f"❌ {c} 데이터 복구 실패: {e}") # 2. 동기화 보호 (매수 직후 API 반영 지연 시 로컬 데이터 우선) now = datetime.datetime.now() for code, info in local.items(): if code not in final: try: buy_time = datetime.datetime.strptime(info.get('buy_date', ''), '%Y-%m-%d %H:%M:%S') if (now - buy_time).total_seconds() < 120: final[code] = info logger.info(f"🛡️ [동기화 보호] {info['name']}: 잔고 미반영 유지") except: pass self.save_json_file(self.portfolio_file, final) return final def update_universe(self): """매수 후보군 업데이트""" logger.info(f"🔄 [리스트 갱신] 배정액 {self.slot_money:,.0f}원") candidates = self.api.scan_ant_shaking_candidates(max_price_limit=self.slot_money) if not candidates: return candidates.sort(key=lambda x: (x['score'], x['price']), reverse=True) current_data = [] for item in candidates: current_data.append( {"code": item['code'], "name": item['name'], "score": item['score'], "price": item['price']}) self.save_json_file(self.target_file, current_data) top_picks = [f"{x['name']}({x['score']:.1f})" for x in current_data[:5]] logger.info(f" 🔝 Top 5: {', '.join(top_picks)}") # ====================================================== # 기술적 지표 계산 # ====================================================== def calculate_atr(self, df, period=14): """ATR(Average True Range) 계산""" df['tr'] = np.maximum(df['high'] - df['low'], np.maximum(np.abs(df['high'] - df['close'].shift()), np.abs(df['low'] - df['close'].shift()))) return df['tr'].rolling(window=period).mean().iloc[-1] def calculate_rsi(self, series, period=14): """RSI(Relative Strength Index) 계산""" delta = series.diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() return 100 - (100 / (1 + gain / loss)) # ========================================================= # ⚡ [핵심 수정 1] 매수 시그널 - RSI 필터 및 고점 추격 방지 # ========================================================= def check_buy_signal(self, code): """ 매수 시그널 확인 함수 (3분봉 꼬리잡기 전략) - RSI 과열 체크 + 고가 추격 방지 + 꼬리 필터 강화 """ # ⭐ name 변수 먼저 초기화 (에러 방지) name = code # 1. 밴(Ban) 리스트 확인 if code in self.get_banned_codes(): return None try: # 2. 데이터 조회 df = self.api.get_ohlcv_limit(code, timeframe='3m') curr = self.api.get_current_data(code) # name 업데이트 if curr: name = curr.name # 데이터 유효성 검사 if curr is None or df is None or len(df) < 20: logger.info(f"🔍 [데이터 부족] {name} {code}: DF 길이 {len(df) if df is not None else 0}") return None # 3. 지표 계산 ma20 = df['close'].rolling(window=20).mean().iloc[-1] avg_vol = df['volume'].rolling(window=20).mean().iloc[-1] current_price = curr.current_price current_vol = df['volume'].iloc[-1] # --------------------------------------------------------- # [필터 1] RSI 과열 체크 rsi = self.calculate_rsi(df['close']).iloc[-1] if rsi >= self.rsi_overheat_threshold: return None # [필터 2] 일일 최고가 부근 추격 매수 방지 if current_price >= curr.high * self.high_price_chase_threshold: return None # [필터 3] 20일 이동평균선(MA20) 아래인지? if current_price < ma20: logger.info(f"🔍 [Pass-MA20] {name} {code}: 현재가({current_price}) < MA20({ma20:.2f})") return None # [필터 4] 최소 거래량 필터 (평균의 30%도 안되면 패스) if current_vol < avg_vol * 0.3: logger.info(f"🔍 [Pass-Vol] {name} {code}: 거래량 부족 ({current_vol} < {avg_vol * 0.3:.1f})") return None # --------------------------------------------------------- # [타점 분석] 3분봉 꼬리 잡기 로직 candle_open = df['open'].iloc[-1] candle_low = min(df['low'].iloc[-1], current_price) total_tail = candle_open - candle_low if total_tail <= 0: return None # 회복 비율 계산 recovery_ratio = (current_price - candle_low) / total_tail # --- 로그로 상세 수치 확인 --- log_msg = ( f"🧐 분석({name} {code}): 가격{current_price} | MA20 {ma20:.1f} | " f"회복률 {recovery_ratio:.2f} (조건:{self.min_recovery_ratio}~{self.max_recovery_ratio}) | " f"거래량 {current_vol} (조건:>{avg_vol * self.volume_avg_multiplier:.1f})" ) # [조건 1] 회복 탄력성 (환경변수 사용: 기본 0.5~0.8) if not (self.min_recovery_ratio <= recovery_ratio <= self.max_recovery_ratio): logger.info(f"{log_msg} -> ❌ 회복률 미달/초과") return None # [조건 2] 시가 근접성 (환경변수 사용: 기본 99.5%) if current_price < candle_open * self.candle_open_price_buffer: logger.info(f"{log_msg} -> ❌ 시가 회복 부족") return None # [조건 3] 거래량 폭발 여부 (환경변수 사용: 기본 평균 × 1.0) if current_vol <= avg_vol * self.volume_avg_multiplier: logger.info(f"{log_msg} -> ❌ 거래량 파워 부족") return None # --------------------------------------------------------- # [수급 필터] 최종 관문 foreigner, institution = self.api.get_intraday_investor(code) logger.info(f"✨ 1차 통과({name} {code}): 수급 확인 중... 외인:{foreigner}, 기관:{institution}") # 양쪽에서 동시에 대량 매도 중이면 스킵 (환경변수 사용: 기본 -1000) if foreigner < self.intraday_investor_net_buy_threshold and institution < self.intraday_investor_net_buy_threshold: logger.info(f"⛔ 수급 이탈 감지({name} {code}): 외인{foreigner} / 기관{institution} -> 매수 포기") return None # 최종 매수 시그널 logger.info(f"🚀 [매수 신호] {name} {code}: 3분봉 꼬리 공략! (회복률: {recovery_ratio:.2f})") return 'TAIL_CATCH_3M' except Exception as e: logger.error(f"⚠️ 매수 시그널 분석 중 에러({name} {code}): {e}", exc_info=True) return None def execute_buy(self, code, strategy): """매수 실행 및 포트폴리오 등록""" if self.trading_halted: return if self.current_cash < self.slot_money: return curr = self.api.get_current_data(code) if not curr or curr.current_price > self.slot_money: return # ATR 계산 및 손절/목표가 설정 df = self.api.get_ohlcv_limit(code, timeframe='1m') atr = curr.current_price * 0.01 if len(df) >= 20: atr = self.calculate_atr(df) # stop = curr.current_price - (atr * 2.0) # target = curr.current_price + (atr * 3.0) if strategy == 'TAIL_CATCH_3M': # 2일 보유 전제: 손절 넓게, 목표가 크게 stop = curr.current_price - (atr * self.stop_atr_multiplier_tail) target = curr.current_price + (atr * self.target_atr_multiplier_tail) else: stop = curr.current_price - (atr * self.stop_atr_multiplier_normal) target = curr.current_price + (atr * self.target_atr_multiplier_tail) # 분할 매수 실행 total_qty = int(self.slot_money / curr.current_price) if total_qty > 0: split_count = 1 if self.slot_money >= 100000: split_count = random.randint(10, 20) base_qty = total_qty // split_count remainder = total_qty % split_count bought_qty = 0 logger.info(f"🔫 [진입 시작] {curr.name} 총 {total_qty}주 | {split_count}회 분할") for i in range(split_count): qty_to_buy = base_qty + (remainder if i == split_count - 1 else 0) if qty_to_buy <= 0: continue if self.api.buy_market_order(code, qty_to_buy): bought_qty += qty_to_buy if i < split_count - 1: time.sleep(random.uniform(0.8, 1)) else: logger.error(f"❌ 매수 주문 실패: {curr.name}") # 매수 성공 시 처리 if bought_qty > 0: self.portfolio = self.load_json_file(self.portfolio_file) now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') self.portfolio[code] = { 'buy_price': curr.current_price, 'qty': bought_qty, 'strategy': strategy, 'buy_date': now_str, 'name': curr.name, 'stop_price': stop, 'target_price': target, 'atr_at_entry': atr, 'max_price': curr.current_price # 초기값은 매수단가 } self.save_json_file(self.portfolio_file, self.portfolio) self.add_history('BUY', code, curr.name, curr.current_price, bought_qty, strategy) # 알림 try: self.update_account_light(profit_val=0) total_p = self.get_daily_profit_rate(self.current_total_asset) * 100 msg = f"🔫 **[매수 완료] {curr.name}**\n수량: {bought_qty}주\n전략: {strategy}\n자산변동: {total_p:+.2f}%" self.send_mm(msg) except Exception as e: logger.error(f"알림 전송 실패: {e}") time.sleep(0.5) # ========================================================= # ⚡ [핵심 수정 2] 매도 로직 - 어깨 매도 및 거래량 체크 추가 # ========================================================= def check_sell(self, code): """매도 조건 체크 (트레일링 스탑, 손절, 어깨 매도)""" if code not in self.portfolio: return False, None, 0, 0 info = self.portfolio[code] curr = self.api.get_current_data(code) if not curr: return False, None, 0, 0 buy_price = info.get('buy_price', 0) current_price = curr.current_price # 고점(Max Price) 갱신 if current_price > info.get('max_price', 0): info['max_price'] = current_price self.save_json_file(self.portfolio_file, self.portfolio) max_price = info.get('max_price', buy_price) profit_pct = (current_price - buy_price) / buy_price if buy_price > 0 else 0 profit_val = (current_price - buy_price) * info['qty'] atr = info.get('atr_at_entry', buy_price * 0.01) reason = None # [필수 1] 어깨 매도 (Shoulder Cut) # 고점 대비 3% 이상 빠지면 수익/손실 여부 불문하고 즉시 탈출 # (선익시스템 사례 방지: 고점에서 물렸을 때 빠르게 자르기 위함) drop_from_high = (max_price - current_price) / max_price if max_price > 0 else 0 if drop_from_high >= self.shoulder_cut_pct: # 추가 체크: 거래량이 터지면서 떨어지면 더 위험하지만, 어깨 매도는 무조건 자르는게 원칙 return True, f"어깨매도(고점대비-{drop_from_high * 100:.1f}%)", profit_pct, profit_val # 기존 로직 분기 if info.get('strategy') == 'TAIL_CATCH_3M': # 스캘핑 로직 if (max_price >= buy_price + atr * 1.0) and (current_price <= buy_price + atr * 0.2): reason = "스캘핑_본절사수" elif current_price < (max_price - atr * 1.0) and profit_pct > 0: reason = "스캘핑_익절보존" # 시간 컷 (10분 내 승부 안나면 매도) buy_time_str = info.get('buy_date', '') hours_passed = 0 if buy_time_str: buy_time = datetime.datetime.strptime(buy_time_str, '%Y-%m-%d %H:%M:%S') hours_passed = (datetime.datetime.now() - buy_time).total_seconds() / 3600 # [전략별 분기] if info.get('strategy') == 'TAIL_CATCH_3M': # 2일(48시간) 이내는 보수적 보유 if hours_passed < 48: # 큰 수익만 익절 if profit_pct > 0.05: reason = "💰 2일내 5%+ 익절" elif max_price >= buy_price * 1.07 and current_price <= max_price * 0.97: reason = "📈 2일내 고점7% 찍고 3% 하락" else: # 2일 경과 후 적극 익절 if profit_pct > 0.02: reason = "⏰ 2일 경과 2%+ 익절" elif profit_pct > 0 and current_price < max_price * 0.97: reason = "⏰ 2일 경과 익절보호" else: # 일반/MANUAL/RECOVERED 전략 max_profit_pct = (max_price - buy_price) / buy_price if buy_price > 0 else 0 if max_profit_pct >= 0.015 and profit_pct <= 0.005: reason = "본절보호" elif profit_pct > 0 and max_profit_pct >= 0.03 and current_price < max_price * 0.99: reason = "트레일링스탑" # [공통] 최후의 보루 (목표가 달성 및 손절) if not reason: if current_price >= info.get('target_price', 9999999): reason = "목표달성" elif profit_pct <= self.stop_loss_pct: reason = f"칼손절({profit_pct * 100:.1f}%)" self.add_ban(code) self.consecutive_losses += 1 elif current_price <= info.get('stop_price', 0): reason = "전략손절" if reason: if profit_pct > 0: self.consecutive_losses = 0 return True, reason, profit_pct, profit_val return False, None, 0, 0 def add_history(self, type, code, name, price, qty, reason="", strategy=""): """매매 이력 저장""" try: rec = { 'type': type, 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'code': code, 'name': name, 'price': price, 'qty': qty, 'reason': reason, 'strategy': strategy } h = self.load_json_file(self.history_real_file, is_list=True) h.append(rec) with open(self.history_real_file, 'w', encoding='utf-8') as f: json.dump(h, f, ensure_ascii=False, indent=4) except: pass def run(self): """메인 실행 루프""" logger.info(f"🚀 감시 시작 (손절: {self.stop_loss_pct * 100}%)") # 봇 시작 시 장 상태 확인 및 알림 first_check = self.api.check_market_status() if not first_check: self.send_mm("💤 현재 장 운영 시간이 아닙니다. 봇이 대기 모드에 들어갑니다.") while True: # 장 상태 체크 is_open = self.api.check_market_status() # 장 시작/마감 이벤트 처리 if is_open and not self.was_market_open: self.refresh_account_status() self.was_market_open = True self.send_mm("🌅 **[장 시작]** 봇이 매매를 시작합니다.") elif not is_open and self.was_market_open: self.was_market_open = False self.refresh_account_status() day_profit = self.current_total_asset - self.start_of_day_asset self.send_mm(f"🌙 **[장 마감]**\n오늘 손익: {day_profit:,.0f}원") self.is_first_run = True # 다음 날을 위해 리셋 # 장 휴장 시 대기 if not is_open: time.sleep(60) continue try: now = datetime.datetime.now() # [유니버스 갱신] 5분 주기 if self.is_first_run or (now.minute % 5 == 0 and now.second < 5): self.update_universe() self.cleanup_banned_list() self.is_first_run = False logger.info("⏳ [주기] 유니버스 갱신 완료, 5초 대기 후 다음 루프") time.sleep(5) # [생존 신고] 1분 주기 (로그만) if now.minute % 1 == 0 and now.second < 2: targets = self.load_json_file(self.target_file, is_list=True) logger.info(f"👀 [생존] 타겟:{len(targets)} | 보유:{len(self.portfolio)}/{self.max_stocks} | 2초 대기") time.sleep(2) # 1. 매도 로직 (우선 순위) for code in list(self.portfolio.keys()): sell, reason, profit_pct, profit_val = self.check_sell(code) if sell: info = self.portfolio[code] if self.api.sell_market_order(code, info['qty']): del self.portfolio[code] self.save_json_file(self.portfolio_file, self.portfolio) strategy_tag = info.get('strategy', '') self.add_history('SELL', code, info['name'], 0, info['qty'], reason, strategy_tag) # 알림 전송 try: self.update_account_light(profit_val) total_p = self.get_daily_profit_rate(self.current_total_asset) * 100 icon = "💰" if profit_pct > 0 else "💧" r_tag = "[RECOVERED] " if strategy_tag == "RECOVERED" else "" msg = f"{icon} **[매도] {r_tag}{info['name']}**\n수익: {profit_pct * 100:.2f}% ({profit_val:,.0f}원)\n사유: {reason}\n누적: {total_p:+.2f}%" self.send_mm(msg) except Exception as e: logger.error(f"매도 알림 실패: {e}") time.sleep(1) # 2. 매수 로직 if not self.trading_halted and len(self.portfolio) < self.max_stocks: targets = self.load_json_file(self.target_file, is_list=True) if targets: for item in targets: if item['code'] in self.portfolio: continue # 돈 없으면 스탑 if self.current_cash < self.slot_money: break # 매수 시그널 확인 sig = self.check_buy_signal(item['code']) if sig: self.execute_buy(item['code'], sig) time.sleep(1) time.sleep(1) except KeyboardInterrupt: logger.info("🛑 사용자에 의해 종료됨") break except Exception as e: logger.error(f"메인 루프 에러: {e}") self.send_mm(f"⚠️ **[봇 에러 발생]**\n내용: {e}") logger.info("⏳ [복구] 에러 후 5초 대기 후 재개") time.sleep(5) if __name__ == "__main__": bot = JungleSurvivorBot(BrokerAPI()) bot.run()