Files
kis_bot/kiwoom_trader_ver2.py
2026-02-22 21:42:41 +09:00

1046 lines
43 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Kiwoom Trading Bot Ver2 - DB 기반 고급 트레이딩 시스템
- SQLite DB 기반 안전한 데이터 관리
- 변동성 기반 자금 관리 (Risk Manager)
- TWAP 스마트 분할 매수 (Smart Executor)
- 하프 켈리 공식 적용
- 기존 TAIL_CATCH_3M 전략 유지 및 강화
"""
import time
import json
import datetime
import pandas as pd
import numpy as np
import os
import logging
import requests
from dotenv import load_dotenv
# 새로운 모듈 임포트
from database import TradeDB
from risk_manager import RiskManager
from smart_executor import SmartOrderExecutor
# ==========================================================
# [Step 0] 환경 변수 및 기본 설정
# ==========================================================
current_dir = os.path.dirname(os.path.abspath(__file__))
env_path = os.path.join(current_dir, ".env")
if not os.path.exists(env_path):
env_path = os.path.join(os.path.dirname(current_dir), ".env")
load_dotenv(env_path)
# Mattermost 설정
MM_SERVER_URL = "https://mattermost.hoonfam.org"
MM_BOT_TOKEN = os.environ.get("MM_BOT_TOKEN_", "").strip()
MM_CONFIG_FILE = os.path.join(current_dir, "mm_config.json")
# [Logger 설정]
logging.basicConfig(
format='[%(asctime)s] %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO
)
logger = logging.getLogger("TradingBotV2")
# 외부 라이브러리 로그 억제
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
# 키움 API 모듈 임포트
try:
from kiwoom_rest_api.auth.token import TokenManager
from kiwoom_rest_api.koreanstock.stockinfo import StockInfo
from kiwoom_rest_api.koreanstock.chart import Chart
from kiwoom_rest_api.koreanstock.order import Order
from kiwoom_rest_api.koreanstock.rank_info import RankInfo
from kiwoom_rest_api.koreanstock.account import Account
from kiwoom_rest_api.koreanstock.market_condition import MarketCondition
except ImportError as e:
logger.critical(f"❌ 키움 REST API 모듈 임포트 실패: {e}")
raise e
# ==========================================================
# [Part 0] Mattermost 봇 클래스
# ==========================================================
class MattermostBot:
def __init__(self):
self.api_url = f"{MM_SERVER_URL.rstrip('/')}/api/v4/posts"
self.headers = {
"Authorization": f"Bearer {MM_BOT_TOKEN}",
"Content-Type": "application/json"
}
self.channels = self._load_channels()
def _load_channels(self):
try:
if os.path.exists(MM_CONFIG_FILE):
with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f).get("channels", {})
return {}
except Exception as e:
logger.error(f"⚠️ MM 설정 로드 실패: {e}")
return {}
def send(self, channel_alias, message):
channel_id = self.channels.get(channel_alias)
if not channel_id:
logger.warning(f"'{channel_alias}' 채널 ID 없음")
return False
payload = {"channel_id": channel_id, "message": message}
try:
res = requests.post(self.api_url, headers=self.headers, json=payload, timeout=3)
res.raise_for_status()
return True
except Exception as e:
logger.error(f"❌ MM 전송 에러: {e}")
return False
# ==========================================================
# [Part 1] 브로커 API (키움증권 REST API 연동)
# ==========================================================
class BrokerAPI:
def __init__(self):
logger.info("🔵 키움(REST) 브로커 연결 시도...")
try:
self.token_manager = TokenManager()
self.stock_info = StockInfo(token_manager=self.token_manager)
self.chart = Chart(token_manager=self.token_manager)
self.order = Order(token_manager=self.token_manager)
self.rank = RankInfo(token_manager=self.token_manager)
self.account = Account(token_manager=self.token_manager)
self.market = MarketCondition(token_manager=self.token_manager)
self.acc_no = os.environ.get("KIWOOM_ACCOUNT_NO", "")
logger.info(f"✅ 브로커 연결 완료 (계좌: {self.acc_no})")
except Exception as e:
logger.critical(f"❌ 브로커 초기화 실패: {e}")
raise e
def _safe_request(self, func, *args, **kwargs):
"""API 호출 안전장치 (429 에러 핸들링)"""
full_name = func.__name__
api_id = full_name.split('_')[-1]
max_retries = 3
for i in range(max_retries):
try:
time.sleep(1) # 기본 안전 대기
result = func(*args, **kwargs)
if isinstance(result, dict) and result.get('return_code') != '0' and '초과' in result.get('msg1', ''):
raise Exception("429 Rate Limit Detected")
return result
except Exception as e:
if "429" in str(e) or "과부하" in str(e):
logger.warning(f"⚠️ [{api_id}] API 과부하 -> 5초 대기 ({i + 1}/{max_retries})")
logger.info("5초 대기 시작...")
time.sleep(5)
logger.info("5초 대기 완료, 재시도합니다.")
else:
logger.error(f"❌ [{api_id}] 호출 에러: {e}")
time.sleep(1)
logger.error(f"💀 [{api_id}] 3회 재시도 실패")
return {}
def get_deposit_only(self):
"""예수금 조회"""
try:
res = self._safe_request(self.account.deposit_detail_status_request_kt00001, qry_tp="2")
d2_deposit = float(res.get('d2_entra', 0)) if res else 0
current_deposit = float(res.get('ord_alow_amt', 0)) if res else 0
return 0 if d2_deposit < 0 else current_deposit
except Exception as e:
logger.error(f"예수금 조회 실패: {e}")
return 0
def get_account_info(self):
"""계좌 평가 정보 조회 (전체 자산, 주식 평가액 등)"""
try:
res = self._safe_request(self.account.account_evaluation_balance_detail_request_kt00018,
qry_tp="2", prd_tp="1")
if not res:
return {'total_asset': 0, 'deposit': 0, 'stock_value': 0, 'profit_rate': 0}
total_asset = float(res.get('estm_ast_amt', 0))
deposit = float(res.get('ord_alow_amt', 0))
stock_value = float(res.get('stck_pbls_amt', 0))
# 평가손익률 계산
deposit_clean = float(res.get('d2_entra', deposit))
profit_rate = 0.0
if (deposit_clean + stock_value) > 0:
profit_rate = ((total_asset - (deposit_clean + stock_value)) / (deposit_clean + stock_value)) * 100
return {
'total_asset': total_asset,
'deposit': deposit,
'stock_value': stock_value,
'profit_rate': profit_rate
}
except Exception as e:
logger.error(f"계좌 정보 조회 실패: {e}")
return {'total_asset': 0, 'deposit': 0, 'stock_value': 0, 'profit_rate': 0}
def check_market_status(self):
"""장 운영 시간 체크 (08:30 ~ 16:00)"""
now = datetime.datetime.now()
if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)):
return False
if now.weekday() >= 5: # 주말
return False
return True
def get_ohlcv(self, code, timeframe='3m', limit=100):
"""분봉 차트 데이터 조회"""
tic_scope = {"1m": "1", "3m": "3", "5m": "5", "10m": "10"}.get(timeframe, "3")
try:
res = self._safe_request(
self.chart.stock_minute_chart_request_ka10080,
stk_cd=code, tic_scope=tic_scope, upd_stkpc_tp="1"
)
data = res.get('stk_min_pole_chart_qry', []) if res else []
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
df = df.rename(columns={
'cur_prc': 'close', 'open_pric': 'open',
'high_pric': 'high', 'low_pric': 'low',
'trde_qty': 'volume'
})
# 시간순 정렬 (과거->현재)
df = df[['open', 'high', 'low', 'close', 'volume']].astype(float).abs()
return df.iloc[::-1].reset_index(drop=True).tail(limit)
except Exception as e:
logger.error(f"차트 조회 실패({code}): {e}")
return pd.DataFrame()
def get_current_price(self, code):
"""현재가 조회"""
try:
res = self._safe_request(
self.stock_info.watchlist_stock_information_request_ka10095,
stock_code=code
)
if res and 'atn_stk_infr' in res and len(res['atn_stk_infr']) > 0:
item = res['atn_stk_infr'][0]
return abs(float(item.get('cur_prc', 0)))
return None
except Exception as e:
logger.error(f"현재가 조회 에러({code}): {e}")
return None
def buy_market_order(self, code, qty):
"""시장가 매수 주문"""
try:
res = self._safe_request(
self.order.stock_buy_order_request_kt10000,
dmst_stex_tp="KRX", stk_cd=code,
ord_qty=str(qty), trde_tp="3", ord_uv="0"
)
if str(res.get('return_code')) == '0':
return True
logger.error(f"매수 주문 실패({code}): {res}")
return False
except Exception as e:
logger.error(f"매수 주문 예외({code}): {e}")
return False
def sell_market_order(self, code, qty):
"""시장가 매도 주문"""
try:
res = self._safe_request(
self.order.stock_sell_order_request_kt10001,
dmst_stex_tp="KRX", stk_cd=code,
ord_qty=str(qty), trde_tp="3", ord_uv="0"
)
if str(res.get('return_code')) == '0':
return True
logger.error(f"매도 주문 실패({code}): {res}")
return False
except Exception as e:
logger.error(f"매도 주문 예외({code}): {e}")
return False
# ==========================================================
# [Part 2] 메인 트레이딩 봇 Ver2
# ==========================================================
class TradingBotV2:
def __init__(self, broker_api):
self.api = broker_api
# Mattermost 초기화
self.mm = MattermostBot()
self.mm_channel = "stock"
# 파일 경로
self.bot_state_file = os.path.join(current_dir, 'bot_state.json')
# DB 초기화
self.db = TradeDB(db_path="quant_bot.db")
# Risk Manager 초기화
kelly_enabled = os.environ.get("USE_KELLY", "false").lower() == "true"
self.risk_mgr = RiskManager(
risk_pct_per_trade=float(os.environ.get("RISK_PCT_PER_TRADE", "0.02")),
max_position_pct=float(os.environ.get("MAX_POSITION_PCT", "0.20")),
min_position_amount=int(os.environ.get("MIN_POSITION_AMOUNT", "50000")),
use_kelly=kelly_enabled
)
# Smart Executor 초기화 (TWAP 분할 매수)
use_twap = os.environ.get("USE_TWAP", "false").lower() == "true"
self.use_twap = use_twap
if use_twap:
self.executor = SmartOrderExecutor(
min_split_amount=int(os.environ.get("TWAP_MIN_SPLIT", "500000")),
max_split_amount=int(os.environ.get("TWAP_MAX_SPLIT", "2000000")),
min_delay_seconds=int(os.environ.get("TWAP_MIN_DELAY", "30")),
max_delay_seconds=int(os.environ.get("TWAP_MAX_DELAY", "180"))
)
else:
self.executor = None
# 거래 설정
self.max_stocks = int(os.environ.get("MAX_STOCKS", "5"))
self.stop_loss_pct = float(os.environ.get("STOP_LOSS_PCT", "-0.035"))
self.daily_stop_loss_pct = float(os.environ.get("DAILY_STOP_LOSS_PCT", "-0.05"))
# 전략 파라미터
self.rsi_threshold = float(os.environ.get("RSI_OVERHEAT_THRESHOLD", "73"))
self.shoulder_cut_pct = float(os.environ.get("SHOULDER_CUT_PCT", "0.03"))
self.min_recovery_ratio = float(os.environ.get("MIN_RECOVERY_RATIO", "0.5"))
self.max_recovery_ratio = float(os.environ.get("MAX_RECOVERY_RATIO", "0.8"))
# 상태 변수
self.current_cash = 0
self.current_total_asset = 0
self.start_asset = 0
self.prev_session_asset = 0 # 이전 실행 시 자산
self.start_day_asset = 0 # 오늘 장 시작 시 자산
self.today_date = datetime.datetime.now().strftime("%Y%m%d")
self.trading_halted = False
# 리포트 플래그
self.morning_report_sent = False # 오전 장 뜸할 때 (13:00)
self.closing_report_sent = False # 장마감 전 (15:15)
self.final_report_sent = False # 장마감 후 (15:35)
# 봇 상태 로드 (이전 실행 정보)
self._load_bot_state()
# 초기 계좌 정보 로드
self.refresh_account()
# JSON 마이그레이션 (최초 1회)
self._migrate_from_json_if_needed()
# 시작 메시지
self._send_startup_message(kelly_enabled, use_twap)
# 봇 상태 저장
self._save_bot_state()
def _load_bot_state(self):
"""이전 실행 시 봇 상태 로드"""
try:
bot_state_file = getattr(self, 'bot_state_file', None) or os.path.join(current_dir, 'bot_state.json')
if os.path.exists(bot_state_file):
with open(self.bot_state_file, 'r', encoding='utf-8') as f:
state = json.load(f)
self.prev_session_asset = float(state.get('start_equity', 0))
prev_day = state.get('start_day', '')
# 날짜가 바뀌었으면 오늘 시작 자산 갱신 필요
if prev_day != self.today_date:
self.start_day_asset = 0 # 계좌 조회 후 설정
logger.info(f"📅 날짜 변경 감지: {prev_day}{self.today_date}")
else:
self.start_day_asset = self.prev_session_asset
logger.info(f"📂 봇 상태 로드: 이전 자산 {self.prev_session_asset:,.0f}")
else:
logger.info("📂 봇 상태 파일 없음 (최초 실행)")
self.prev_session_asset = 0
self.start_day_asset = 0
except Exception as e:
logger.error(f"❌ 봇 상태 로드 실패: {e}")
self.prev_session_asset = 0
self.start_day_asset = 0
def _save_bot_state(self):
"""현재 봇 상태 저장"""
try:
state = {
'start_equity': self.current_total_asset,
'start_day': self.today_date,
'last_update': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
with open(self.bot_state_file, 'w', encoding='utf-8') as f:
json.dump(state, f, indent=4, ensure_ascii=False)
except Exception as e:
logger.error(f"❌ 봇 상태 저장 실패: {e}")
def _send_startup_message(self, kelly_enabled, use_twap):
"""시작 메시지 전송 (이전 실행 대비 손익률 포함)"""
# 이전 실행 대비 손익률 계산
if self.prev_session_asset > 0:
session_pnl = self.current_total_asset - self.prev_session_asset
session_pnl_pct = (session_pnl / self.prev_session_asset) * 100
session_info = f"\n- 이전 실행 대비: {session_pnl:+,.0f}원 ({session_pnl_pct:+.2f}%)"
else:
session_info = "\n- 이전 실행 대비: 데이터 없음 (최초 실행)"
# 오늘 장 시작 대비 손익률
if self.start_day_asset > 0 and self.start_day_asset != self.current_total_asset:
day_pnl = self.current_total_asset - self.start_day_asset
day_pnl_pct = (day_pnl / self.start_day_asset) * 100
day_info = f"\n- 오늘 장 시작 대비: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)"
else:
day_info = ""
msg = (
f"🤖 **[트레이딩봇 Ver2 가동]**\n"
f"- 현재 자산: {self.current_total_asset:,.0f}"
f"{session_info}"
f"{day_info}\n"
f"- DB 기반 안전 관리\n"
f"- 변동성 기반 자금 관리\n"
f"- TWAP: {'ON' if use_twap else 'OFF'}\n"
f"- 켈리 공식: {'ON' if kelly_enabled else 'OFF'}"
)
logger.info(msg)
self.send_mm(msg)
def _migrate_from_json_if_needed(self):
"""기존 JSON 파일에서 DB로 마이그레이션 (1회성)"""
portfolio_file = os.path.join(current_dir, 'portfolio.json')
if os.path.exists(portfolio_file):
try:
with open(portfolio_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if data:
count = self.db.migrate_from_json(data)
logger.info(f"📦 JSON 마이그레이션 완료: {count}개 종목")
# 백업 후 삭제
backup_path = portfolio_file + ".backup"
os.rename(portfolio_file, backup_path)
logger.info(f"💾 기존 JSON 백업: {backup_path}")
except Exception as e:
logger.error(f"❌ JSON 마이그레이션 실패: {e}")
def send_mm(self, msg):
"""Mattermost 알림 전송"""
try:
self.mm.send(self.mm_channel, msg)
except Exception as e:
logger.error(f"❌ MM 전송 에러: {e}")
def refresh_account(self):
"""계좌 정보 갱신"""
try:
info = self.api.get_account_info()
self.current_cash = info['deposit']
self.current_total_asset = info['total_asset']
# 첫 실행 시 시작 자산 설정
if self.start_asset == 0:
self.start_asset = info['total_asset']
# 오늘 장 시작 자산 설정 (날짜 바뀜 or 최초 실행)
if self.start_day_asset == 0:
self.start_day_asset = info['total_asset']
logger.info(f"📅 오늘 장 시작 자산 설정: {self.start_day_asset:,.0f}")
# 손익률 계산
day_pnl = self.current_total_asset - self.start_day_asset
day_pnl_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0
logger.info(
f"💰 예수금: {self.current_cash:,.0f}원 | "
f"총자산: {self.current_total_asset:,.0f}"
f"(예수금 {self.current_cash:,.0f} + 주식 {info['stock_value']:,.0f}) | "
f"오늘: {day_pnl:+,.0f}원({day_pnl_pct:+.2f}%)"
)
except Exception as e:
logger.error(f"❌ 계좌 정보 갱신 실패: {e}")
def calculate_rsi(self, df, period=14):
"""RSI 계산"""
try:
if df is None or len(df) < period + 1:
return 50
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi.iloc[-1] if not np.isnan(rsi.iloc[-1]) else 50
except:
return 50
def calculate_atr(self, df, period=14):
"""ATR 계산"""
try:
if df is None or len(df) < period:
return 0
df = df.copy()
df['tr'] = np.maximum(
df['high'] - df['low'],
np.maximum(
np.abs(df['high'] - df['close'].shift()),
np.abs(df['low'] - df['close'].shift())
)
)
atr = df['tr'].rolling(window=period).mean().iloc[-1]
return atr if not np.isnan(atr) else 0
except:
return 0
def check_buy_signal_tail_catch(self, code, name):
"""
TAIL_CATCH_3M 전략: 3분봉 꼬리 잡기
- 기존 로직 유지 + 변동성 기반 비중 계산 추가
"""
try:
df = self.api.get_ohlcv(code, timeframe='3m', limit=50)
if df is None or len(df) < 20:
return None
current_price = df['close'].iloc[-1]
candle = df.iloc[-1]
# 1. RSI 과열 필터
rsi = self.calculate_rsi(df)
if rsi > self.rsi_threshold:
return None
# 2. 일일 고점 추격 방지
daily_high = df['high'].max()
if current_price > daily_high * 0.96:
return None
# 3. 아래꼬리 확인
candle_low = candle['low']
candle_high = candle['high']
candle_open = candle['open']
candle_close = candle['close']
body_top = max(candle_open, candle_close)
body_bottom = min(candle_open, candle_close)
tail_length = body_bottom - candle_low
body_length = body_top - body_bottom
if tail_length <= 0 or body_length <= 0:
return None
# 꼬리 길이 비율 확인
tail_ratio = tail_length / body_length
tail_pct = tail_length / candle_low
if tail_ratio < 1.5 or tail_pct < 0.003:
return None
# 4. 회복 속도 확인
total_range = candle_high - candle_low
if total_range <= 0:
return None
recovery_ratio = (current_price - candle_low) / total_range
if not (self.min_recovery_ratio <= recovery_ratio <= self.max_recovery_ratio):
return None
# 5. 변동성 계산 및 매수 금액 산출
atr = self.calculate_atr(df)
# 켈리 비율 가져오기
kelly_fraction = None
if self.risk_mgr.use_kelly:
kelly_fraction = self.db.calculate_half_kelly()
# Risk Manager를 통한 안전 매수 금액 계산
safe_amount = self.risk_mgr.get_position_size(
stock_name=name,
current_balance=self.current_cash,
df=df,
kelly_fraction=kelly_fraction
)
if safe_amount < self.risk_mgr.min_amount:
logger.info(f"🚫 [{name}] 계산된 금액이 최소 매수액 미달")
return None
# 6. 수량 계산
qty = self.risk_mgr.calculate_quantity(current_price, safe_amount)
if qty < 1:
return None
# 손절가/목표가 설정 (ATR 기반)
atr_multiplier_stop = 3.5
atr_multiplier_target = 8.0
stop_price = current_price - (atr * atr_multiplier_stop)
target_price = current_price + (atr * atr_multiplier_target)
logger.info(
f"✅ [{name}] TAIL_CATCH 시그널 발생 | "
f"가격:{current_price:,.0f} | RSI:{rsi:.1f} | "
f"꼬리비율:{tail_ratio:.2f} | 회복:{recovery_ratio:.2%} | "
f"매수:{safe_amount:,.0f}원({qty}주)"
)
return {
'code': code,
'name': name,
'price': current_price,
'qty': qty,
'amount': safe_amount,
'strategy': 'TAIL_CATCH_3M',
'stop_price': stop_price,
'target_price': target_price,
'atr': atr
}
except Exception as e:
logger.error(f"❌ [{name}] 매수 시그널 체크 실패: {e}")
return None
def execute_buy(self, signal):
"""매수 실행 (TWAP 분할 매수 또는 즉시 매수)"""
try:
code = signal['code']
name = signal['name']
qty = signal['qty']
amount = signal['amount']
# DB에 이미 있는지 확인
active_trades = self.db.get_active_trades()
if code in active_trades:
logger.warning(f"⚠️ [{name}] 이미 보유 중 -> 매수 스킵")
return False
# TWAP 활성화 시 스마트 주문 등록
if self.use_twap and self.executor and amount >= 1000000: # 100만원 이상만 분할
self.executor.add_order(code, name, amount, duration_minutes=30)
logger.info(f"📝 [{name}] TWAP 분할 매수 등록: {amount:,.0f}")
return True
# 일반 매수 (즉시 실행)
success = self.api.buy_market_order(code, qty)
if success:
# DB에 저장
trade_data = {
'code': code,
'name': name,
'avg_buy_price': signal['price'],
'stop_price': signal['stop_price'],
'target_price': signal['target_price'],
'atr_entry': signal['atr'],
'target_qty': qty,
'current_qty': qty,
'total_invested': signal['price'] * qty,
'status': 'HOLDING',
'strategy': signal['strategy']
}
self.db.upsert_trade(trade_data)
msg = f"💰 **[매수 체결]** {name}\n가격: {signal['price']:,.0f}× {qty}"
logger.info(msg)
self.send_mm(msg)
return True
return False
except Exception as e:
logger.error(f"❌ 매수 실행 실패: {e}")
return False
def check_sell_signals(self):
"""보유 종목 매도 시그널 체크"""
active_trades = self.db.get_active_trades()
if not active_trades:
return
for code, trade in list(active_trades.items()):
try:
name = trade['name']
buy_price = trade['avg_buy_price']
stop_price = trade['stop_price']
target_price = trade['target_price']
qty = trade['current_qty']
# 현재가 조회
current_price = self.api.get_current_price(code)
if not current_price:
continue
# DB 현재가 업데이트
self.db.update_current_price(code, current_price)
# 손익률 계산
profit_rate = ((current_price - buy_price) / buy_price) * 100
# 매도 사유 판단
sell_reason = None
# 1. 손절
if current_price <= stop_price:
sell_reason = f"손절({profit_rate:.1f}%)"
# 2. 목표가 달성
elif current_price >= target_price:
sell_reason = f"목표달성({profit_rate:.1f}%)"
# 3. 어깨 매도 (고점 대비 하락)
max_price = trade.get('max_price', buy_price)
if current_price > max_price:
self.db.update_max_price(code, current_price)
else:
drop_from_high = (max_price - current_price) / max_price
if drop_from_high > self.shoulder_cut_pct:
sell_reason = f"어깨매도({profit_rate:.1f}%)"
# 매도 실행
if sell_reason:
if self.api.sell_market_order(code, qty):
self.db.close_trade(code, current_price, sell_reason)
msg = f"📤 **[매도 체결]** {name}\n사유: {sell_reason}\n수익: {profit_rate:+.2f}%"
logger.info(msg)
self.send_mm(msg)
except Exception as e:
logger.error(f"❌ [{code}] 매도 체크 실패: {e}")
def send_morning_report(self):
"""오전 장 뜸할 때 리포트 (13:00)"""
try:
# 계좌 정보 갱신
info = self.api.get_account_info()
# 손익 계산
day_pnl = info['total_asset'] - self.start_day_asset
day_pnl_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0
# 보유 종목 정보
active_trades = self.db.get_active_trades()
holdings_info = ""
if active_trades:
holdings_info = "\n\n**보유 종목:**"
for code, trade in active_trades.items():
current_price = self.api.get_current_price(code)
if current_price:
profit = ((current_price - trade['avg_buy_price']) / trade['avg_buy_price']) * 100
holdings_info += f"\n- {trade['name']}: {profit:+.2f}%"
# 오늘 거래 통계
today_start = datetime.datetime.now().strftime('%Y-%m-%d 00:00:00')
cursor = self.db.conn.execute(
"SELECT COUNT(*) as cnt, SUM(realized_pnl) as pnl FROM trade_history WHERE sell_date >= ?",
(today_start,)
)
row = cursor.fetchone()
today_trades = row['cnt'] if row else 0
today_trade_pnl = row['pnl'] if row and row['pnl'] else 0
msg = (
f"🌞 **[오전 장 리포트 13:00]**\n"
f"- 시작 자산: {self.start_day_asset:,.0f}\n"
f"- 현재 자산: {info['total_asset']:,.0f}\n"
f"- 오늘 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)\n"
f"- 오늘 거래: {today_trades}건 ({today_trade_pnl:+,.0f}원)"
f"{holdings_info}"
)
logger.info(msg)
self.send_mm(msg)
self.morning_report_sent = True
except Exception as e:
logger.error(f"❌ 오전 리포트 전송 실패: {e}")
def send_closing_report(self):
"""장마감 전 리포트 (15:15)"""
try:
info = self.api.get_account_info()
day_pnl = info['total_asset'] - self.start_day_asset
day_pnl_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0
# 오늘 거래 통계
today_start = datetime.datetime.now().strftime('%Y-%m-%d 00:00:00')
cursor = self.db.conn.execute(
"SELECT COUNT(*) as cnt, SUM(realized_pnl) as pnl FROM trade_history WHERE sell_date >= ?",
(today_start,)
)
row = cursor.fetchone()
today_trades = row['cnt'] if row else 0
today_trade_pnl = row['pnl'] if row and row['pnl'] else 0
msg = (
f"🔔 **[장마감 전 리포트 15:15]**\n"
f"- 현재 자산: {info['total_asset']:,.0f}\n"
f"- 오늘 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)\n"
f"- 오늘 거래: {today_trades}건 ({today_trade_pnl:+,.0f}원)\n"
f"- 보유 종목: {len(self.db.get_active_trades())}"
)
logger.info(msg)
self.send_mm(msg)
self.closing_report_sent = True
except Exception as e:
logger.error(f"❌ 장마감 전 리포트 전송 실패: {e}")
def send_final_report(self):
"""장마감 후 최종 리포트 (15:35)"""
try:
info = self.api.get_account_info()
# 오늘 손익
day_pnl = info['total_asset'] - self.start_day_asset
day_pnl_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0
# 누적 손익 (총 입금액 대비)
# 총 입금액은 환경 변수나 별도 파일로 관리 필요
total_deposit = float(os.environ.get("TOTAL_DEPOSIT", str(self.start_day_asset)))
total_pnl = info['total_asset'] - total_deposit
total_pnl_pct = (total_pnl / total_deposit * 100) if total_deposit > 0 else 0
# 오늘 거래 통계
today_start = datetime.datetime.now().strftime('%Y-%m-%d 00:00:00')
cursor = self.db.conn.execute(
"""SELECT
COUNT(*) as cnt,
SUM(CASE WHEN profit_rate > 0 THEN 1 ELSE 0 END) as wins,
SUM(realized_pnl) as pnl
FROM trade_history
WHERE sell_date >= ?""",
(today_start,)
)
row = cursor.fetchone()
today_trades = row['cnt'] if row else 0
today_wins = row['wins'] if row else 0
today_trade_pnl = row['pnl'] if row and row['pnl'] else 0
today_win_rate = (today_wins / today_trades * 100) if today_trades > 0 else 0
# 전체 통계
stats = self.db.get_trade_stats()
msg = (
f"🌙 **[장마감 최종 리포트 15:35]**\n\n"
f"**📊 오늘 실적**\n"
f"- 시작: {self.start_day_asset:,.0f}원 → 종료: {info['total_asset']:,.0f}\n"
f"- 오늘 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)\n"
f"- 오늘 거래: {today_trades}건 (익절 {today_wins}건, 승률 {today_win_rate:.1f}%)\n"
f"- 거래 손익: {today_trade_pnl:+,.0f}\n\n"
f"**💰 누적 실적**\n"
f"- 총 입금액: {total_deposit:,.0f}\n"
f"- 현재 자산: {info['total_asset']:,.0f}\n"
f"- 누적 손익: {total_pnl:+,.0f}원 ({total_pnl_pct:+.2f}%)\n"
f"- 전체 거래: {stats['total_trades']}건 (승률 {stats['win_rate']:.1f}%)\n"
f"- 전체 손익: {stats['total_pnl']:+,.0f}"
)
logger.info(msg)
self.send_mm(msg)
self.final_report_sent = True
# 다음날을 위해 상태 저장
self._save_bot_state()
except Exception as e:
logger.error(f"❌ 최종 리포트 전송 실패: {e}")
def process_twap_orders(self):
"""TWAP 분할 매수 처리"""
if not self.use_twap or not self.executor:
return
# 현재가 정보 수집
active_orders = self.executor.get_status()
if not active_orders:
return
current_prices = {}
for code in active_orders.keys():
price = self.api.get_current_price(code)
if price:
current_prices[code] = price
# 매수 콜백 함수
def buy_callback(code, name, amount, price):
qty = int(amount / price)
if qty < 1:
return False
success = self.api.buy_market_order(code, qty)
if success:
# DB 업데이트 (분할 매수 누적)
active_trades = self.db.get_active_trades()
if code in active_trades:
# 기존 보유분 있음 -> 평단가 계산
existing = active_trades[code]
old_qty = existing['current_qty']
old_price = existing['avg_buy_price']
old_invested = existing['total_invested']
new_qty = old_qty + qty
new_invested = old_invested + (price * qty)
new_avg_price = new_invested / new_qty
self.db.upsert_trade({
'code': code,
'name': name,
'avg_buy_price': new_avg_price,
'current_qty': new_qty,
'total_invested': new_invested,
'status': 'HOLDING',
**existing # 기존 정보 유지
})
else:
# 신규 매수
self.db.upsert_trade({
'code': code,
'name': name,
'avg_buy_price': price,
'target_qty': qty, # 임시
'current_qty': qty,
'total_invested': price * qty,
'status': 'HOLDING',
'strategy': 'TAIL_CATCH_3M'
})
return True
return False
# TWAP 처리
self.executor.process_orders(current_prices, buy_callback)
def run(self):
"""메인 루프"""
logger.info("🚀 트레이딩봇 Ver2 가동 시작")
loop_count = 0
while True:
try:
loop_count += 1
now = datetime.datetime.now()
current_time = now.time()
# 1. 장 운영 시간 체크
if not self.api.check_market_status():
# 장 시간 외: 플래그 초기화
if now.weekday() < 5: # 주중
# 날짜가 바뀌었는지 확인
today_str = now.strftime("%Y%m%d")
if today_str != self.today_date:
logger.info(f"📅 날짜 변경: {self.today_date}{today_str}")
self.today_date = today_str
self.start_day_asset = 0
self.morning_report_sent = False
self.closing_report_sent = False
self.final_report_sent = False
if loop_count % 60 == 0: # 1분마다
logger.info("💤 장 운영 시간 외")
time.sleep(60)
continue
# 2. 장 중 리포트 타이밍 체크
# 오전 장 뜸할 때: 13:00
if not self.morning_report_sent and datetime.time(13, 0) <= current_time < datetime.time(13, 5):
self.send_morning_report()
# 장마감 전: 15:15
if not self.closing_report_sent and datetime.time(15, 15) <= current_time < datetime.time(15, 20):
self.send_closing_report()
# 장마감 후: 15:35 (모든 체결 완료 후)
if not self.final_report_sent and datetime.time(15, 35) <= current_time < datetime.time(15, 40):
self.send_final_report()
# 3. 계좌 정보 갱신 (5분마다)
if loop_count % 30 == 0:
self.refresh_account()
# 4. TWAP 분할 매수 처리
if self.use_twap:
self.process_twap_orders()
# 5. 보유 종목 매도 체크
self.check_sell_signals()
# 6. 새로운 매수 기회 탐색 (보유 종목이 max보다 적을 때)
active_count = len(self.db.get_active_trades())
if active_count < self.max_stocks and self.current_cash >= self.risk_mgr.min_amount:
# 실제로는 scan_ant_shaking_candidates 같은 로직 필요
# 여기서는 예시로 생략
pass
# 7. 대기
time.sleep(10)
except KeyboardInterrupt:
logger.info("⏸️ 사용자 중단")
break
except Exception as e:
logger.error(f"❌ 메인 루프 에러: {e}")
logger.info("⏳ 예외 발생 -> 5초 대기 후 재시도")
time.sleep(5)
# 종료 처리
logger.info("🛑 봇 종료 중...")
self._save_bot_state() # 최종 상태 저장
if self.db:
self.db.close()
logger.info("✅ 정상 종료 완료")
# ==========================================================
# [메인 실행]
# ==========================================================
if __name__ == "__main__":
try:
broker = BrokerAPI()
bot = TradingBotV2(broker)
bot.run()
except Exception as e:
logger.critical(f"💀 봇 실행 실패: {e}")
raise e