Files
kis_bot/kiwoom_trader_dual.py
2026-03-17 12:33:30 +09:00

1131 lines
43 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import json
import datetime
import pandas as pd
import numpy as np
import os
import logging
import requests
import random
from dotenv import load_dotenv
# ==========================================================
# [Step 0] 환경 변수 및 기본 설정
# ==========================================================
current_dir = os.path.dirname(os.path.abspath(__file__))
env_path = os.path.join(current_dir, ".env")
if not os.path.exists(env_path):
env_path = os.path.join(os.path.dirname(current_dir), ".env")
load_dotenv(env_path)
# Mattermost 설정
MM_SERVER_URL = "https://mattermost.hoonfam.org"
MM_BOT_TOKEN = os.environ.get("MM_BOT_TOKEN_", "")
MM_CONFIG_FILE = os.path.join(current_dir, "mm_config.json")
# [Logger 설정] - 로그 포맷 및 핸들러 설정
logging.basicConfig(
format='[%(asctime)s] %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO
)
logger = logging.getLogger("JungleBot")
# 외부 라이브러리 로그 레벨 조정 (너무 시끄러운 로그 방지)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
# 키움 API 모듈 임포트
from kiwoom_rest_api.auth.token import TokenManager
from kiwoom_rest_api.koreanstock.stockinfo import StockInfo
from kiwoom_rest_api.koreanstock.chart import Chart
from kiwoom_rest_api.koreanstock.order import Order
from kiwoom_rest_api.koreanstock.rank_info import RankInfo
from kiwoom_rest_api.koreanstock.account import Account
from kiwoom_rest_api.koreanstock.market_condition import MarketCondition
# ==========================================================
# [Part 0] Mattermost 봇 클래스 (알림 전송용)
# ==========================================================
class MattermostBot:
def __init__(self):
self.api_url = f"{MM_SERVER_URL.rstrip('/')}/api/v4/posts"
self.headers = {
"Authorization": f"Bearer {MM_BOT_TOKEN}",
"Content-Type": "application/json"
}
self.channels = self._load_channels()
def _load_channels(self):
"""설정 파일에서 채널 ID 정보를 읽어옵니다."""
try:
if os.path.exists(MM_CONFIG_FILE):
with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f).get("channels", {})
return {}
except Exception as e:
logger.error(f"⚠️ MM 설정 로드 실패: {e}")
return {}
def send(self, channel_alias, message):
"""지정된 채널(alias)로 메시지를 전송합니다."""
channel_id = self.channels.get(channel_alias)
if not channel_id:
# 채널 ID가 없으면 로그만 찍고 넘어감 (봇 중단 방지)
logger.warning(f"'{channel_alias}' 채널 ID 없음. mm_config.json 확인 필요.")
return False
payload = {"channel_id": channel_id, "message": message}
try:
res = requests.post(self.api_url, headers=self.headers, json=payload, timeout=3)
res.raise_for_status()
return True
except Exception as e:
logger.error(f"❌ MM 전송 에러: {e}")
return False
# ==========================================================
# [Part 1] 데이터 클래스 (주식 정보 객체)
# ==========================================================
class StockData:
def __init__(self, code, name, price, open_p, high, low, close, volume):
self.code = code
self.name = name
self.current_price = price
self.open = open_p
self.high = high
self.low = low
self.close = close
self.volume = volume
# ==========================================================
# [Part 2] 브로커 API (키움증권 REST API 연동)
# ==========================================================
class BrokerAPI:
def __init__(self):
logger.info("🔵 키움(REST) 브로커 연결 시도...")
try:
self.token_manager = TokenManager()
self.stock_info = StockInfo(token_manager=self.token_manager)
self.chart = Chart(token_manager=self.token_manager)
self.order = Order(token_manager=self.token_manager)
self.rank = RankInfo(token_manager=self.token_manager)
self.account = Account(token_manager=self.token_manager)
self.market = MarketCondition(token_manager=self.token_manager)
self.acc_no = os.environ.get("KIWOOM_ACCOUNT_NO", "YOUR_ACCOUNT_NO")
logger.info(f"✅ 브로커 연결 완료 (계좌: {self.acc_no})")
except Exception as e:
logger.critical(f"❌ 브로커 초기화 실패: {e}")
raise e
def _safe_request(self, func, *args, **kwargs):
"""
API 호출 안전장치 (타임아웃 및 429 에러 핸들링 강화)
- 429 에러(Too Many Requests) 발생 시 대기 후 재시도
- 일반 에러 발생 시 로그 기록
"""
full_name = func.__name__
api_id = full_name.split('_')[-1]
max_retries = 3
for i in range(max_retries):
try:
# 기본 안전 대기 (API 과부하 방지)
time.sleep(1)
result = func(*args, **kwargs)
# 키움 API 특성상 200 OK라도 에러 메시지가 있을 수 있음
if isinstance(result, dict) and result.get('return_code') != '0' and '초과' in result.get('msg1', ''):
raise Exception("429 Rate Limit Detected")
return result
except Exception as e:
# 429 또는 과부하 에러 시 더 오래 대기
if "429" in str(e) or "과부하" in str(e):
logger.warning(f"⚠️ [{api_id}] API 과부하 감지 -> 5초 대기 후 재시도 ({i + 1}/{max_retries})")
time.sleep(5)
else:
logger.error(f"❌ [{api_id}] 호출 에러: {e}")
time.sleep(1)
logger.error(f"💀 [{api_id}] 3회 재시도 실패 -> 빈 값 반환")
return {}
def get_deposit_only(self):
"""예수금(주문 가능 금액)만 빠르게 조회"""
try:
res = self._safe_request(self.account.deposit_detail_status_request_kt00001, qry_tp="2")
d2_deposit = float(res.get('d2_entra', 0)) if res else 0
current_deposit = float(res.get('ord_alow_amt', 0)) if res else 0
# D+2 예수금이 마이너스면 미수 발생 상황이므로 0으로 처리
return 0 if d2_deposit < 0 else current_deposit
except Exception as e:
logger.error(f"예수금 조회 실패: {e}")
return 0
def get_intraday_investor(self, code):
"""
장중 투자자별 매매 차트 (수급 확인용)
- 외국인/기관의 실시간 순매수 수량을 리턴
"""
try:
res = self._safe_request(self.chart.intraday_investor_trading_chart_request_ka10064,
mrkt_tp="000", amt_qty_tp="2", trde_tp="0", stk_cd=code)
if not res or 'opmr_invsr_trde_chart' not in res:
return 0, 0
data_list = res['opmr_invsr_trde_chart']
if not data_list:
return 0, 0
latest = data_list[0]
foreigner = int(latest.get('frgnr_invsr', 0))
institution = int(latest.get('orgn', 0))
return foreigner, institution
except Exception as e:
# 수급 조회 실패는 치명적이지 않으므로 0 리턴
return 0, 0
def _is_target_stock(self, name, code):
"""종목 필터링 (스팩, ETN, 우선주 등 제외)"""
if len(code) != 6 or not code.isdigit(): return False
exclude = ['스팩', 'ETN', 'W', 'ELW', '채권', '레버리지', '인버스', '곱버스', '선물', '', '', '2X', '3X', '합성', 'H', 'B']
if any(k in name for k in exclude): return False
if name.endswith('') or name.endswith('우B'): return False
return True
def check_market_status(self):
"""
장 운영 시간 체크 (08:30 ~ 16:00)
- 주말 및 장운영 시간 외에는 False 반환
"""
now = datetime.datetime.now()
# 1. 시간 체크 (08:30 ~ 16:00)
if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)):
if now.minute == 0 and now.second < 2:
logger.info(f"💤 [장운영] 정규장 시간이 아닙니다. ({now.time().strftime('%H:%M:%S')})")
return False
# 2. 주말 체크
if now.weekday() >= 5:
if now.minute == 0 and now.second < 2:
logger.info("⏸️ [장운영] 주말 휴장입니다.")
return False
# 공휴일 체크 로직은 제외 (Fail-Open 방식: 장이 안 열리면 주문 실패로 처리됨)
return True
def get_multi_stock_data(self, code_list):
"""여러 종목의 현재가 정보를 한 번에 조회"""
if not code_list: return {}
code_str = "|".join([str(c).strip() for c in code_list])
try:
res = self._safe_request(self.stock_info.watchlist_stock_information_request_ka10095, stock_code=code_str)
result_map = {}
if res and 'atn_stk_infr' in res:
for item in res['atn_stk_infr']:
code = item.get('stk_cd', '')
if not code: continue
if len(code) > 6: code = code[-6:]
try:
c = abs(float(item['cur_prc']))
if c > 0:
result_map[code] = {
'code': code, 'name': item['stk_nm'],
'price': c, 'open': abs(float(item['open_pric'])),
'high': abs(float(item['high_pric'])), 'low': abs(float(item['low_pric']))
}
except:
continue
return result_map
except Exception as e:
logger.error(f"멀티 시세 조회 실패: {e}")
return {}
def get_ohlcv_limit(self, code, timeframe='1m'):
"""분봉 차트 데이터 조회 (틱 범위 지정 가능)"""
tic_scope = "1"
if timeframe == '3m':
tic_scope = "3"
elif timeframe == '5m':
tic_scope = "5"
elif timeframe == '10m':
tic_scope = "10"
try:
res = self._safe_request(self.chart.stock_minute_chart_request_ka10080, stk_cd=code, tic_scope=tic_scope,
upd_stkpc_tp="1")
data = res.get('stk_min_pole_chart_qry', []) if res else []
if not data: return pd.DataFrame()
df = pd.DataFrame(data)
df = df.rename(columns={'cur_prc': 'close', 'open_pric': 'open', 'high_pric': 'high', 'low_pric': 'low',
'trde_qty': 'volume'})
# 데이터를 시간순(과거->현재)으로 정렬
return df[['open', 'high', 'low', 'close', 'volume']].astype(float).abs().iloc[::-1].reset_index(drop=True)
except Exception as e:
logger.error(f"차트 조회 실패({code}): {e}")
return pd.DataFrame()
def scan_ant_shaking_candidates(self, max_price_limit=None):
"""
[조건검색 대체 로직] 개미털기(눌림목) 후보 종목 스캔
- 거래대금 및 회전율 상위 종목 중, 고점 대비 일정 비율 하락 후 반등 시도하는 종목 추출
"""
logger.info(f"🐜 [개미털기] 스캔 시작")
raw_codes_set = set()
scan_strategies = [("3", "거래대금"), ("2", "회전율")]
for sort_tp, desc in scan_strategies:
try:
res = self._safe_request(self.rank.top_trading_volume_today_request_ka10030,
mrkt_tp="000", sort_tp=sort_tp, mang_stk_incls="3",
pric_tp="0", crd_tp="0", trde_qty_tp="0", trde_prica_tp="0",
mrkt_open_tp="0", stex_tp="3", cont_yn="N")
if not res or 'tdy_trde_qty_upper' not in res: continue
for stock in res['tdy_trde_qty_upper']:
code = stock['stk_cd'].split('_')[0]
try:
price = abs(float(stock['cur_prc']))
except:
continue
# 전일 종가 조회 (API 함수 확인 필요)
open_price = abs(float(stock.get('open_pric', price)))
change_rate = ((price - open_price) / open_price * 100) if open_price > 0 else 0
if price < 1000: continue # 동전주 제외
if change_rate > 20: # 상한가 근처
logger.info(f"🚫 상한가 제외: {stock['stk_nm']} (+{change_rate:.1f}%)")
continue
if price > 200000: continue # 10만원 이상 제외
if self._is_target_stock(stock['stk_nm'], code):
raw_codes_set.add(code)
except Exception as e:
logger.error(f"스캔({desc}) 중 에러: {e}")
raw_codes = list(raw_codes_set)
logger.info(f" 📥 후보 수집: {len(raw_codes)}개 -> 정밀 분석")
final_list = []
chunk_size = 50
try:
for i in range(0, len(raw_codes), chunk_size):
chunk = raw_codes[i: i + chunk_size]
multi_data = self.get_multi_stock_data(chunk)
time.sleep(0.5)
for code, data in multi_data.items():
if max_price_limit and data['price'] > max_price_limit: continue
op, hi, lo, cl = data['open'], data['high'], data['low'], data['price']
if op == 0: continue
drop_rate = (op - lo) / op
total_range = hi - lo
# 낙폭이 어느 정도 있고(3% 이상), 아래꼬리를 달고 올라온 종목
if total_range > 0 and drop_rate > 0.03:
recovery_pos = (cl - lo) / total_range
if recovery_pos > 0.5:
score = drop_rate * 100
final_list.append({'code': code, 'name': data['name'], 'price': cl, 'score': score})
final_list.sort(key=lambda x: x['score'], reverse=True)
return final_list
except Exception as e:
logger.error(f"분석 중 치명적 에러: {e}")
return []
def get_account_info(self):
"""
전체 계좌 잔고 및 평가금액 조회
- 예수금, 총 자산, 보유 종목 리스트 반환
"""
try:
# 1. 예수금 상세
res = self._safe_request(self.account.deposit_detail_status_request_kt00001, qry_tp="2")
d2_deposit = float(res.get('d2_entra', 0)) if res else 0
current_deposit = float(res.get('ord_alow_amt', 0)) if res else 0
deposit = 0 if d2_deposit < 0 else current_deposit
logger.info(f"예수금: {deposit:,.0f}")
# 2. 잔고 상세
res_bal = self._safe_request(self.account.account_evaluation_balance_detail_request_kt00018, query_type="1",
domestic_exchange_type="KRX")
total_asset = float(res_bal.get('tot_aset_amt', 0)) if res_bal else 0
balances = {}
if res_bal and 'acnt_evlt_remn_indv_tot' in res_bal:
for item in res_bal['acnt_evlt_remn_indv_tot']:
code = item['stk_cd'].strip()[1:] if item['stk_cd'].startswith('A') else item['stk_cd'].strip()
balances[code] = {
'buy_price': abs(float(item['pur_pric'])),
'qty': int(item['rmnd_qty']),
'name': item['stk_nm'].strip(),
'current_price': abs(float(item['cur_prc'])),
'profit_rate': float(item.get('erng_rt', 0))
}
# API에서 총자산이 0으로 올 경우, 직접 계산
if total_asset == 0:
stock_val = sum([b['current_price'] * b['qty'] for b in balances.values()])
total_asset = current_deposit + stock_val
return total_asset, deposit, balances
except Exception as e:
logger.error(f"계좌 조회 에러: {e}")
return 0, 0, {}
def get_current_data(self, code):
"""단일 종목 현재가 조회"""
try:
res = self._safe_request(self.stock_info.watchlist_stock_information_request_ka10095, stock_code=code)
if res and 'atn_stk_infr' in res and len(res['atn_stk_infr']) > 0:
item = res['atn_stk_infr'][0];
p = abs(float(item.get('cur_prc', 0)))
return StockData(code, item.get('stk_nm', 'Unknown'), p, abs(float(item.get('open_pric', p))),
abs(float(item.get('high_pric', p))), abs(float(item.get('low_pric', p))), p,
int(item.get('trde_qty', 0)))
return None
except Exception as e:
logger.error(f"현재가 조회 에러({code}): {e}")
return None
def buy_market_order(self, code, qty):
"""시장가 매수 주문"""
try:
res = self._safe_request(self.order.stock_buy_order_request_kt10000, dmst_stex_tp="KRX", stk_cd=code,
ord_qty=str(qty), trde_tp="3", ord_uv="0")
if str(res.get('return_code')) == '0': return True
logger.error(f"매수 주문 실패({code}): {res}")
return False
except Exception as e:
logger.error(f"매수 주문 예외({code}): {e}")
return False
def sell_market_order(self, code, qty):
"""시장가 매도 주문"""
try:
res = self._safe_request(self.order.stock_sell_order_request_kt10001, dmst_stex_tp="KRX", stk_cd=code,
ord_qty=str(qty), trde_tp="3", ord_uv="0")
if str(res.get('return_code')) == '0': return True
logger.error(f"매도 주문 실패({code}): {res}")
return False
except Exception as e:
logger.error(f"매도 주문 예외({code}): {e}")
return False
# ==========================================================
# [Part 3] 정글 서바이버 봇 (개선된 메인 로직)
# ==========================================================
class JungleSurvivorBot:
def __init__(self, broker_api, budget=None):
self.api = broker_api
self.budget = budget
# Mattermost 초기화
self.mm = MattermostBot()
self.mm_channel = "stock" # 기본 채널 alias
# 파일 경로 설정
self.portfolio_file = os.path.join(current_dir, 'portfolio.json')
self.history_real_file = os.path.join(current_dir, 'trade_history_real.json')
self.target_file = os.path.join(current_dir, 'target_universe.json')
self.banned_file = os.path.join(current_dir, 'banned_codes.json')
# 자금 관리 설정
self.max_stocks = 5
self.slot_money = 0
self.daily_stop_loss_pct = -0.05 # 일일 손실 한도 -5%
self.stop_loss_pct = -0.035 # 종목별 손절 -3.5%
self.enable_consecutive_loss_cut = False # 연속 손절 제한 (일단 끔)
# 매수/매도 로직 파라미터
self.rsi_overheat_threshold = float(os.environ.get("RSI_OVERHEAT_THRESHOLD", "73")) # RSI 과열 기준
self.shoulder_cut_pct = float(os.environ.get("SHOULDER_CUT_PCT", "0.03")) # 어깨 매도 기준 (고점 대비 하락률)
self.high_price_chase_threshold = float(
os.environ.get("HIGH_PRICE_CHASE_THRESHOLD", "0.96")) # 일일 최고가 추격 매수 방지 기준
self.min_recovery_ratio = float(os.environ.get("MIN_RECOVERY_RATIO", "0.5")) # 꼬리 잡기 최소 회복 비율
self.max_recovery_ratio = float(os.environ.get("MAX_RECOVERY_RATIO", "0.8")) # 꼬리 잡기 최대 회복 비율
self.candle_open_price_buffer = float(os.environ.get("CANDLE_OPEN_PRICE_BUFFER", "0.995")) # 시가 대비 현재가 버퍼
self.volume_avg_multiplier = float(os.environ.get("VOLUME_AVG_MULTIPLIER", "1.0")) # 거래량 평균 대비 배율
self.intraday_investor_net_buy_threshold = int(os.environ.get("INTRADAY_INVESTOR_NET_BUY_THRESHOLD", "-1000")) # 장중 투자자 순매수 기준
self.stop_atr_multiplier_tail = float(os.environ.get("STOP_ATR_MULTIPLIER_TAIL", "3.5")) # TAIL_CATCH_3M 전략용 (2일 보유 전제)
self.target_atr_multiplier_tail = float(os.environ.get("TARGET_ATR_MULTIPLIER_TAIL", "8.0")) # TAIL_CATCH_3M 전략용 (2일 보유 전제)
self.stop_atr_multiplier_normal = float(os.environ.get("STOP_ATR_MULTIPLIER_NORMAL", "2.5")) # 일반 전략용
self.target_atr_multiplier_normal = float(os.environ.get("TARGET_ATR_MULTIPLIER_NORMAL", "5.0")) # 일반 전략용
# 상태 변수 초기화
self.current_cash = 0
self.start_of_day_asset = 0
self.today_date = datetime.datetime.now().strftime("%Y%m%d")
self.consecutive_losses = 0
self.trading_halted = False
self.was_market_open = False
self.is_first_run = True
# 초기 데이터 로드 및 정리
self.refresh_account_status(is_init=True)
self.cleanup_banned_list()
msg = f"🤖 **[정글봇 가동-Full Version]**\n- 시작 자산: {self.start_of_day_asset:,.0f}\n- **[NEW] 3% 하락시 어깨 매도 적용**\n- **[NEW] RSI 과열 필터 적용**"
logger.info(msg)
self.send_mm(msg)
def send_mm(self, msg):
"""Mattermost 알림 전송 래퍼 함수"""
try:
self.mm.send(self.mm_channel, msg)
except Exception as e:
logger.error(f"❌ MM 전송 중 에러: {e}")
# ======================================================
# 파일 입출력 및 관리 유틸리티
# ======================================================
def load_json_file(self, path, is_list=False):
try:
if os.path.exists(path):
if os.path.getsize(path) == 0:
logger.warning(f"⚠️ 빈 파일 감지됨(초기화): {os.path.basename(path)}")
return [] if is_list else {}
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
return [] if is_list else {}
except json.JSONDecodeError:
logger.warning(f"⚠️ JSON 형식이 깨짐(초기화): {os.path.basename(path)}")
return [] if is_list else {}
except Exception as e:
logger.error(f"❌ load_json_file 에러: {e}")
return [] if is_list else {}
def save_json_file(self, path, data):
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
except Exception as e:
logger.error(f" 파일저장에러 {e}")
def get_banned_codes(self):
"""금일 매매 금지(손절 등) 종목 리스트 반환"""
data = self.load_json_file(self.banned_file)
active = []
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
for c, e in data.items():
if e > now: active.append(c)
return active
def add_ban(self, code, hours=24):
"""종목을 벤(Ban) 리스트에 추가"""
data = self.load_json_file(self.banned_file)
data[code] = (datetime.datetime.now() + datetime.timedelta(hours=hours)).strftime('%Y-%m-%d %H:%M:%S')
self.save_json_file(self.banned_file, data)
def cleanup_banned_list(self):
"""만료된 벤 리스트 정리"""
data = self.load_json_file(self.banned_file)
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
new_data = {k: v for k, v in data.items() if v > now}
if len(data) != len(new_data): self.save_json_file(self.banned_file, new_data)
# ======================================================
# 계좌 및 자산 관리
# ======================================================
def refresh_account_status(self, is_init=False):
"""계좌 상태를 API로부터 갱신하고 슬롯 머니를 재계산"""
try:
curr_date = datetime.datetime.now().strftime("%Y%m%d")
if curr_date != self.today_date:
self.today_date = curr_date
self.consecutive_losses = 0
self.trading_halted = False
self.start_of_day_asset = 0
logger.info("📅 [날짜변경] 데이터 리셋")
total, deposit, balances = self.api.get_account_info()
if total == 0:
logger.warning("⚠️ 자산 조회 실패(0원) -> 기존 자산 유지")
total = self.current_total_asset if hasattr(self, 'current_total_asset') else 1000000
if self.start_of_day_asset == 0 or is_init:
self.start_of_day_asset = total
self.current_total_asset = total
self.current_cash = min(self.budget, deposit) if self.budget else deposit
# 현재 보유 주식의 대략적인 평가금액 (예수금과의 갭 확인용)
try:
stock_val = sum(
[b.get('current_price', 0) * b.get('qty', 0) for b in balances.values()]
) if balances else 0
except Exception:
stock_val = 0
# 예수금의 90%만 사용 (안전 버퍼)
investable = self.current_cash * 0.9
min_bet = 100000
if investable < min_bet:
self.max_stocks = 1
self.slot_money = int(investable)
else:
if (investable / 30) < min_bet:
self.max_stocks = max(int(investable / min_bet), 1)
else:
self.max_stocks = 30
self.slot_money = int(investable / self.max_stocks)
if is_init:
logger.info(
f"💰 [자금 설정] "
f"추정자산: {self.current_total_asset:,.0f}"
f"(예수금: {self.current_cash:,.0f}원 + 주식평가: {stock_val:,.0f}원) | "
f"1종목당: {self.slot_money:,.0f}"
)
self.portfolio = self.sync_portfolio_internal(balances)
self.check_risk_status(total)
except Exception as e:
logger.error(f"계좌갱신오류: {e}")
def update_account_light(self, profit_val=0):
"""
API 부하를 줄이기 위해 예수금만 빠르게 갱신하고
총자산은 로컬 계산으로 추정하는 경량 함수
"""
try:
new_cash = self.api.get_deposit_only()
if new_cash > 0 or self.current_cash == 0:
self.current_cash = new_cash
if profit_val != 0:
self.current_total_asset += profit_val
except Exception as e:
logger.error(f"경량 갱신 실패: {e}")
def get_daily_profit_rate(self, current_asset):
if self.start_of_day_asset == 0: return 0.0
return (current_asset - self.start_of_day_asset) / self.start_of_day_asset
def check_risk_status(self, current_asset):
"""일일 손실 한도 체크"""
p = self.get_daily_profit_rate(current_asset)
if p <= self.daily_stop_loss_pct and not self.trading_halted:
self.trading_halted = True
msg = f"🛑 **[STOP LOSS 발동]**\n일 손실률 {p * 100:.2f}% 도달 -> 금일 매수 중단"
logger.critical(msg)
self.send_mm(msg)
if self.enable_consecutive_loss_cut and self.consecutive_losses >= 4 and not self.trading_halted:
self.trading_halted = True
msg = f"🛑 **[연속 손절 과다]**\n4회 연속 손절 -> 금일 매수 중단"
logger.critical(msg)
self.send_mm(msg)
def sync_portfolio_internal(self, real_balances):
"""API 잔고와 로컬 포트폴리오 파일 동기화"""
local = self.load_json_file(self.portfolio_file)
final = {}
# 1. 실제 잔고 기준 업데이트
for c, r in real_balances.items():
final[c] = r
if c in local:
final[c].update({
'strategy': local[c].get('strategy', 'MANUAL'),
'max_price': max(r['current_price'], local[c].get('max_price', 0)),
'target_price': local[c].get('target_price', 0),
'stop_price': local[c].get('stop_price', 0),
'atr_at_entry': local[c].get('atr_at_entry', 0),
'buy_date': local[c].get('buy_date', '')
})
else:
final[c].update({'strategy': 'MANUAL', 'max_price': r['current_price']})
# 데이터 복구 (봇 재시작 시 누락된 전략 정보 복원)
if final[c].get('atr_at_entry') == 0 or final[c].get('strategy') == 'MANUAL':
try:
df_temp = self.api.get_ohlcv_limit(c, timeframe='1m')
if not df_temp.empty:
new_atr = self.calculate_atr(df_temp)
base_p = r.get('buy_price', r['current_price'])
final[c].update({
'strategy': 'RECOVERED',
'atr_at_entry': new_atr,
'stop_price': base_p - (new_atr * 3.0),
'target_price': base_p + (new_atr * 5.0),
'buy_date': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
logger.info(f"♻️ [데이터 복구] {r['name']} 전략 정보 재계산 완료")
except Exception as e:
logger.error(f"{c} 데이터 복구 실패: {e}")
# 2. 동기화 보호 (매수 직후 API 반영 지연 시 로컬 데이터 우선)
now = datetime.datetime.now()
for code, info in local.items():
if code not in final:
try:
buy_time = datetime.datetime.strptime(info.get('buy_date', ''), '%Y-%m-%d %H:%M:%S')
if (now - buy_time).total_seconds() < 120:
final[code] = info
logger.info(f"🛡️ [동기화 보호] {info['name']}: 잔고 미반영 유지")
except:
pass
self.save_json_file(self.portfolio_file, final)
return final
def update_universe(self):
"""매수 후보군 업데이트"""
logger.info(f"🔄 [리스트 갱신] 배정액 {self.slot_money:,.0f}")
candidates = self.api.scan_ant_shaking_candidates(max_price_limit=self.slot_money)
if not candidates: return
candidates.sort(key=lambda x: (x['score'], x['price']), reverse=True)
current_data = []
for item in candidates:
current_data.append(
{"code": item['code'], "name": item['name'], "score": item['score'], "price": item['price']})
self.save_json_file(self.target_file, current_data)
top_picks = [f"{x['name']}({x['score']:.1f})" for x in current_data[:5]]
logger.info(f" 🔝 Top 5: {', '.join(top_picks)}")
# ======================================================
# 기술적 지표 계산
# ======================================================
def calculate_atr(self, df, period=14):
"""ATR(Average True Range) 계산"""
df['tr'] = np.maximum(df['high'] - df['low'],
np.maximum(np.abs(df['high'] - df['close'].shift()),
np.abs(df['low'] - df['close'].shift())))
return df['tr'].rolling(window=period).mean().iloc[-1]
def calculate_rsi(self, series, period=14):
"""RSI(Relative Strength Index) 계산"""
delta = series.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
return 100 - (100 / (1 + gain / loss))
# =========================================================
# ⚡ [핵심 수정 1] 매수 시그널 - RSI 필터 및 고점 추격 방지
# =========================================================
def check_buy_signal(self, code):
"""
매수 시그널 확인 함수 (3분봉 꼬리잡기 전략)
- RSI 과열 체크 + 고가 추격 방지 + 꼬리 필터 강화
"""
# ⭐ name 변수 먼저 초기화 (에러 방지)
name = code
# 1. 밴(Ban) 리스트 확인
if code in self.get_banned_codes():
return None
try:
# 2. 데이터 조회
df = self.api.get_ohlcv_limit(code, timeframe='3m')
curr = self.api.get_current_data(code)
# name 업데이트
if curr:
name = curr.name
# 데이터 유효성 검사
if curr is None or df is None or len(df) < 20:
logger.info(f"🔍 [데이터 부족] {name} {code}: DF 길이 {len(df) if df is not None else 0}")
return None
# 3. 지표 계산
ma20 = df['close'].rolling(window=20).mean().iloc[-1]
avg_vol = df['volume'].rolling(window=20).mean().iloc[-1]
current_price = curr.current_price
current_vol = df['volume'].iloc[-1]
# ---------------------------------------------------------
# [필터 1] RSI 과열 체크
rsi = self.calculate_rsi(df['close']).iloc[-1]
if rsi >= self.rsi_overheat_threshold:
return None
# [필터 2] 일일 최고가 부근 추격 매수 방지
if current_price >= curr.high * self.high_price_chase_threshold:
return None
# [필터 3] 20일 이동평균선(MA20) 아래인지?
if current_price < ma20:
logger.info(f"🔍 [Pass-MA20] {name} {code}: 현재가({current_price}) < MA20({ma20:.2f})")
return None
# [필터 4] 최소 거래량 필터 (평균의 30%도 안되면 패스)
if current_vol < avg_vol * 0.3:
logger.info(f"🔍 [Pass-Vol] {name} {code}: 거래량 부족 ({current_vol} < {avg_vol * 0.3:.1f})")
return None
# ---------------------------------------------------------
# [타점 분석] 3분봉 꼬리 잡기 로직
candle_open = df['open'].iloc[-1]
candle_low = min(df['low'].iloc[-1], current_price)
total_tail = candle_open - candle_low
if total_tail <= 0:
return None
# 회복 비율 계산
recovery_ratio = (current_price - candle_low) / total_tail
# --- 로그로 상세 수치 확인 ---
log_msg = (
f"🧐 분석({name} {code}): 가격{current_price} | MA20 {ma20:.1f} | "
f"회복률 {recovery_ratio:.2f} (조건:{self.min_recovery_ratio}~{self.max_recovery_ratio}) | "
f"거래량 {current_vol} (조건:>{avg_vol * self.volume_avg_multiplier:.1f})"
)
# [조건 1] 회복 탄력성 (환경변수 사용: 기본 0.5~0.8)
if not (self.min_recovery_ratio <= recovery_ratio <= self.max_recovery_ratio):
logger.info(f"{log_msg} -> ❌ 회복률 미달/초과")
return None
# [조건 2] 시가 근접성 (환경변수 사용: 기본 99.5%)
if current_price < candle_open * self.candle_open_price_buffer:
logger.info(f"{log_msg} -> ❌ 시가 회복 부족")
return None
# [조건 3] 거래량 폭발 여부 (환경변수 사용: 기본 평균 × 1.0)
if current_vol <= avg_vol * self.volume_avg_multiplier:
logger.info(f"{log_msg} -> ❌ 거래량 파워 부족")
return None
# ---------------------------------------------------------
# [수급 필터] 최종 관문
foreigner, institution = self.api.get_intraday_investor(code)
logger.info(f"✨ 1차 통과({name} {code}): 수급 확인 중... 외인:{foreigner}, 기관:{institution}")
# 양쪽에서 동시에 대량 매도 중이면 스킵 (환경변수 사용: 기본 -1000)
if foreigner < self.intraday_investor_net_buy_threshold and institution < self.intraday_investor_net_buy_threshold:
logger.info(f"⛔ 수급 이탈 감지({name} {code}): 외인{foreigner} / 기관{institution} -> 매수 포기")
return None
# 최종 매수 시그널
logger.info(f"🚀 [매수 신호] {name} {code}: 3분봉 꼬리 공략! (회복률: {recovery_ratio:.2f})")
return 'TAIL_CATCH_3M'
except Exception as e:
logger.error(f"⚠️ 매수 시그널 분석 중 에러({name} {code}): {e}", exc_info=True)
return None
def execute_buy(self, code, strategy):
"""매수 실행 및 포트폴리오 등록"""
if self.trading_halted: return
if self.current_cash < self.slot_money: return
curr = self.api.get_current_data(code)
if not curr or curr.current_price > self.slot_money: return
# ATR 계산 및 손절/목표가 설정
df = self.api.get_ohlcv_limit(code, timeframe='1m')
atr = curr.current_price * 0.01
if len(df) >= 20: atr = self.calculate_atr(df)
# stop = curr.current_price - (atr * 2.0)
# target = curr.current_price + (atr * 3.0)
if strategy == 'TAIL_CATCH_3M':
# 2일 보유 전제: 손절 넓게, 목표가 크게
stop = curr.current_price - (atr * self.stop_atr_multiplier_tail)
target = curr.current_price + (atr * self.target_atr_multiplier_tail)
else:
stop = curr.current_price - (atr * self.stop_atr_multiplier_normal)
target = curr.current_price + (atr * self.target_atr_multiplier_tail)
# 분할 매수 실행
total_qty = int(self.slot_money / curr.current_price)
if total_qty > 0:
split_count = 1
if self.slot_money >= 100000:
split_count = random.randint(10, 20)
base_qty = total_qty // split_count
remainder = total_qty % split_count
bought_qty = 0
logger.info(f"🔫 [진입 시작] {curr.name}{total_qty}주 | {split_count}회 분할")
for i in range(split_count):
qty_to_buy = base_qty + (remainder if i == split_count - 1 else 0)
if qty_to_buy <= 0: continue
if self.api.buy_market_order(code, qty_to_buy):
bought_qty += qty_to_buy
if i < split_count - 1:
time.sleep(random.uniform(0.8, 1))
else:
logger.error(f"❌ 매수 주문 실패: {curr.name}")
# 매수 성공 시 처리
if bought_qty > 0:
self.portfolio = self.load_json_file(self.portfolio_file)
now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.portfolio[code] = {
'buy_price': curr.current_price,
'qty': bought_qty,
'strategy': strategy,
'buy_date': now_str,
'name': curr.name,
'stop_price': stop,
'target_price': target,
'atr_at_entry': atr,
'max_price': curr.current_price # 초기값은 매수단가
}
self.save_json_file(self.portfolio_file, self.portfolio)
self.add_history('BUY', code, curr.name, curr.current_price, bought_qty, strategy)
# 알림
try:
self.update_account_light(profit_val=0)
total_p = self.get_daily_profit_rate(self.current_total_asset) * 100
msg = f"🔫 **[매수 완료] {curr.name}**\n수량: {bought_qty}\n전략: {strategy}\n자산변동: {total_p:+.2f}%"
self.send_mm(msg)
except Exception as e:
logger.error(f"알림 전송 실패: {e}")
time.sleep(0.5)
# =========================================================
# ⚡ [핵심 수정 2] 매도 로직 - 어깨 매도 및 거래량 체크 추가
# =========================================================
def check_sell(self, code):
"""매도 조건 체크 (트레일링 스탑, 손절, 어깨 매도)"""
if code not in self.portfolio: return False, None, 0, 0
info = self.portfolio[code]
curr = self.api.get_current_data(code)
if not curr: return False, None, 0, 0
buy_price = info.get('buy_price', 0)
current_price = curr.current_price
# 고점(Max Price) 갱신
if current_price > info.get('max_price', 0):
info['max_price'] = current_price
self.save_json_file(self.portfolio_file, self.portfolio)
max_price = info.get('max_price', buy_price)
profit_pct = (current_price - buy_price) / buy_price if buy_price > 0 else 0
profit_val = (current_price - buy_price) * info['qty']
atr = info.get('atr_at_entry', buy_price * 0.01)
reason = None
# [필수 1] 어깨 매도 (Shoulder Cut)
# 고점 대비 3% 이상 빠지면 수익/손실 여부 불문하고 즉시 탈출
# (선익시스템 사례 방지: 고점에서 물렸을 때 빠르게 자르기 위함)
drop_from_high = (max_price - current_price) / max_price if max_price > 0 else 0
if drop_from_high >= self.shoulder_cut_pct:
# 추가 체크: 거래량이 터지면서 떨어지면 더 위험하지만, 어깨 매도는 무조건 자르는게 원칙
return True, f"어깨매도(고점대비-{drop_from_high * 100:.1f}%)", profit_pct, profit_val
# 기존 로직 분기
if info.get('strategy') == 'TAIL_CATCH_3M':
# 스캘핑 로직
if (max_price >= buy_price + atr * 1.0) and (current_price <= buy_price + atr * 0.2):
reason = "스캘핑_본절사수"
elif current_price < (max_price - atr * 1.0) and profit_pct > 0:
reason = "스캘핑_익절보존"
# 시간 컷 (10분 내 승부 안나면 매도)
buy_time_str = info.get('buy_date', '')
hours_passed = 0
if buy_time_str:
buy_time = datetime.datetime.strptime(buy_time_str, '%Y-%m-%d %H:%M:%S')
hours_passed = (datetime.datetime.now() - buy_time).total_seconds() / 3600
# [전략별 분기]
if info.get('strategy') == 'TAIL_CATCH_3M':
# 2일(48시간) 이내는 보수적 보유
if hours_passed < 48:
# 큰 수익만 익절
if profit_pct > 0.05:
reason = "💰 2일내 5%+ 익절"
elif max_price >= buy_price * 1.07 and current_price <= max_price * 0.97:
reason = "📈 2일내 고점7% 찍고 3% 하락"
else:
# 2일 경과 후 적극 익절
if profit_pct > 0.02:
reason = "⏰ 2일 경과 2%+ 익절"
elif profit_pct > 0 and current_price < max_price * 0.97:
reason = "⏰ 2일 경과 익절보호"
else:
# 일반/MANUAL/RECOVERED 전략
max_profit_pct = (max_price - buy_price) / buy_price if buy_price > 0 else 0
if max_profit_pct >= 0.015 and profit_pct <= 0.005:
reason = "본절보호"
elif profit_pct > 0 and max_profit_pct >= 0.03 and current_price < max_price * 0.99:
reason = "트레일링스탑"
# [공통] 최후의 보루 (목표가 달성 및 손절)
if not reason:
if current_price >= info.get('target_price', 9999999):
reason = "목표달성"
elif profit_pct <= self.stop_loss_pct:
reason = f"칼손절({profit_pct * 100:.1f}%)"
self.add_ban(code)
self.consecutive_losses += 1
elif current_price <= info.get('stop_price', 0):
reason = "전략손절"
if reason:
if profit_pct > 0: self.consecutive_losses = 0
return True, reason, profit_pct, profit_val
return False, None, 0, 0
def add_history(self, type, code, name, price, qty, reason="", strategy=""):
"""매매 이력 저장"""
try:
rec = {
'type': type,
'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'code': code,
'name': name,
'price': price,
'qty': qty,
'reason': reason,
'strategy': strategy
}
h = self.load_json_file(self.history_real_file, is_list=True)
h.append(rec)
with open(self.history_real_file, 'w', encoding='utf-8') as f:
json.dump(h, f, ensure_ascii=False, indent=4)
except:
pass
def run(self):
"""메인 실행 루프"""
logger.info(f"🚀 감시 시작 (손절: {self.stop_loss_pct * 100}%)")
# 봇 시작 시 장 상태 확인 및 알림
first_check = self.api.check_market_status()
if not first_check:
self.send_mm("💤 현재 장 운영 시간이 아닙니다. 봇이 대기 모드에 들어갑니다.")
while True:
# 장 상태 체크
is_open = self.api.check_market_status()
# 장 시작/마감 이벤트 처리
if is_open and not self.was_market_open:
self.refresh_account_status()
self.was_market_open = True
self.send_mm("🌅 **[장 시작]** 봇이 매매를 시작합니다.")
elif not is_open and self.was_market_open:
self.was_market_open = False
self.refresh_account_status()
day_profit = self.current_total_asset - self.start_of_day_asset
self.send_mm(f"🌙 **[장 마감]**\n오늘 손익: {day_profit:,.0f}")
self.is_first_run = True # 다음 날을 위해 리셋
# 장 휴장 시 대기
if not is_open:
time.sleep(60)
continue
try:
now = datetime.datetime.now()
# [유니버스 갱신] 5분 주기
if self.is_first_run or (now.minute % 5 == 0 and now.second < 5):
self.update_universe()
self.cleanup_banned_list()
self.is_first_run = False
logger.info("⏳ [주기] 유니버스 갱신 완료, 5초 대기 후 다음 루프")
time.sleep(5)
# [생존 신고] 1분 주기 (로그만)
if now.minute % 1 == 0 and now.second < 2:
targets = self.load_json_file(self.target_file, is_list=True)
logger.info(f"👀 [생존] 타겟:{len(targets)} | 보유:{len(self.portfolio)}/{self.max_stocks} | 2초 대기")
time.sleep(2)
# 1. 매도 로직 (우선 순위)
for code in list(self.portfolio.keys()):
sell, reason, profit_pct, profit_val = self.check_sell(code)
if sell:
info = self.portfolio[code]
if self.api.sell_market_order(code, info['qty']):
del self.portfolio[code]
self.save_json_file(self.portfolio_file, self.portfolio)
strategy_tag = info.get('strategy', '')
self.add_history('SELL', code, info['name'], 0, info['qty'], reason, strategy_tag)
# 알림 전송
try:
self.update_account_light(profit_val)
total_p = self.get_daily_profit_rate(self.current_total_asset) * 100
icon = "💰" if profit_pct > 0 else "💧"
r_tag = "[RECOVERED] " if strategy_tag == "RECOVERED" else ""
msg = f"{icon} **[매도] {r_tag}{info['name']}**\n수익: {profit_pct * 100:.2f}% ({profit_val:,.0f}원)\n사유: {reason}\n누적: {total_p:+.2f}%"
self.send_mm(msg)
except Exception as e:
logger.error(f"매도 알림 실패: {e}")
time.sleep(1)
# 2. 매수 로직
if not self.trading_halted and len(self.portfolio) < self.max_stocks:
targets = self.load_json_file(self.target_file, is_list=True)
if targets:
for item in targets:
if item['code'] in self.portfolio: continue
# 돈 없으면 스탑
if self.current_cash < self.slot_money: break
# 매수 시그널 확인
sig = self.check_buy_signal(item['code'])
if sig:
self.execute_buy(item['code'], sig)
time.sleep(1)
time.sleep(1)
except KeyboardInterrupt:
logger.info("🛑 사용자에 의해 종료됨")
break
except Exception as e:
logger.error(f"메인 루프 에러: {e}")
self.send_mm(f"⚠️ **[봇 에러 발생]**\n내용: {e}")
logger.info("⏳ [복구] 에러 후 5초 대기 후 재개")
time.sleep(5)
if __name__ == "__main__":
bot = JungleSurvivorBot(BrokerAPI())
bot.run()