""" etf_ver1.py — ETF 액티브 매매 봇 (RSI 기반 분할 매수 & 슈팅 익절) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 역할: 테마성 ETF (원자력, 전력망 등) 의 눌림목 분할 매수와 슈팅 익절 전략 한투 API 연동, WebSocket 실시간 가격 수신, DB 기반 상태 관리 전략 개요: - 매수: RSI 35/30/25 이하에서 3 분할 매수 (30%/30%/40%) - 매도: 수익률 +4% 이상 또는 RSI 70 이상에서 전량 익절 - 손절: 평단가 대비 -10% 에서 전량 손절 (리스크 관리) - 유니버스: DB(env) 에서 지정한 ETF 종목 목록 KIS API 준수: - SafeRequest: HTTP 429 에러 대비 재시도 로직 - WebSocket: H0STCNT0 실시간 체결가 수신 (kis_ws.py) - 메신저 알림: 텔레그램 (HTML), 매터모스트 (Markdown) 분리 - Atomic Save: 매매 발생 시 DB 즉시 저장 (재시작 안전성) """ import os import json import time import random import logging import datetime from pathlib import Path from typing import List, Dict, Optional import pandas as pd import requests from database import TradeDB, ENV_CONFIG_KEYS # WebSocket 실시간 체결가 캐시 try: from kis_ws import KISWebSocketPriceCache _KIS_WS_AVAILABLE = True except ImportError: _KIS_WS_AVAILABLE = False # 로깅 설정 logging.basicConfig( format='[%(asctime)s] %(message)s', datefmt='%H:%M:%S', level=logging.INFO, ) logger = logging.getLogger("ETFActiveBot") # DB 초기화 SCRIPT_DIR = Path(__file__).resolve().parent db = TradeDB(db_path=str(SCRIPT_DIR / "quant_bot.db")) # ============================================================================== # [환경 변수 로드] DB 우선 (하드코딩 금지) # ============================================================================== def get_env_from_db(key, default=""): """DB 에서 환경변수 읽기""" env_data = db.get_latest_env() if env_data and env_data.get("snapshot"): return env_data["snapshot"].get(key, default) return default def get_env_float(key, default): """환경변수를 float 로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)) if isinstance(value, str) and "#" in value: value = value.split("#")[0].strip() try: return float(value) if value else default except (ValueError, TypeError): return default def get_env_int(key, default): """환경변수를 int 로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)) if isinstance(value, str) and "#" in value: value = value.split("#")[0].strip() try: return int(value) if value else default except (ValueError, TypeError): return default def get_env_bool(key, default=False): """환경변수를 bool 로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)).lower() return value in ("true", "1", "yes") # ============================================================================== # [메신저 알림] 텔레그램 & 매터모스트 # ============================================================================== def msg_tg(token: str, chat_id: str, message: str): """텔레그램 메시지 전송 (HTML 방식)""" url = f"https://api.telegram.org/bot{token}/sendMessage" payload = { "chat_id": chat_id, "text": message, "parse_mode": "HTML" } try: requests.post(url, data=payload, timeout=5) except Exception as e: logger.debug(f"[텔레그램 전송 에러] {e}") def msg_mm(webhook_url: str, message: str): """매터모스트 메시지 전송 (Markdown 방식)""" payload = {"text": message} try: requests.post(webhook_url, json=payload, timeout=5) except Exception as e: logger.debug(f"[매터모스트 전송 에러] {e}") # ============================================================================== # [한투 API 클라이언트] REST API 안전성 관리 # ============================================================================== class KISAPI: """ 한국투자증권 REST API 클라이언트 - 429 에러 대비 재시도 로직 - 모의투자/실전투자 분리 """ def __init__(self): self.mock = get_env_bool("KIS_MOCK", True) self.app_key = get_env_from_db("KIS_APP_KEY_MOCK" if self.mock else "KIS_APP_KEY_REAL", "") self.app_secret = get_env_from_db("KIS_APP_SECRET_MOCK" if self.mock else "KIS_APP_SECRET_REAL", "") self.base_url = ( "https://openapivts.koreainvestment.com:29443" if self.mock else "https://openapi.koreainvestment.com:9443" ) # 계좌 정보 self.account_no = get_env_from_db("KIS_ACCOUNT_NO_MOCK" if self.mock else "KIS_ACCOUNT_NO_REAL", "") self.account_code = get_env_from_db("KIS_ACCOUNT_CODE_MOCK" if self.mock else "KIS_ACCOUNT_CODE_REAL", "") # 액세스 토큰 self.access_token = None self.token_expiry = 0 # WebSocket 캐시 (ETF 는 REST 만 사용, 단타용과 공유 안함) # ETF 는 일봉 RSI 기반, 1~3 분 주기 체크로 REST 로 충분 self.ws_cache = None # if _KIS_WS_AVAILABLE: # try: # self.ws_cache = KISWebSocketPriceCache( # app_key=self.app_key, # app_secret=self.app_secret, # is_mock=self.mock # ) # if self.ws_cache.start(force_cleanup=True): # logger.info("✅ WebSocket 활성 (force_cleanup=True, 비정상 종료 대비)") # else: # logger.info("ℹ️ WebSocket 비활성 (모의 or 키 미설정) → REST fallback") # except Exception as e: # logger.warning(f"⚠️ WebSocket 초기화 예외: {e}") # self.ws_cache = None def get_access_token(self) -> str: """액세스 토큰 발급 (유효기간 23 시간)""" now = time.time() if self.access_token and now < self.token_expiry: return self.access_token try: url = f"{self.base_url}/oauth2/token" data = { "grant_type": "client_credentials", "appkey": self.app_key, "secretkey": self.app_secret, } response = requests.post(url, json=data, timeout=10) data = response.json() self.access_token = data.get("access_token") self.token_expiry = now + 82800 # 23 시간 logger.info(f"✅ 액세스 토큰 발급 완료 (앞 8 자: {self.access_token[:8]}...)") return self.access_token except Exception as e: logger.error(f"❌ 토큰 발급 실패: {e}") raise def request(self, method: str, url: str, headers: dict = None, data: dict = None, params: dict = None, max_retries: int = 3): """ HTTP 요청 (429 에러 재시도 로직 포함) Args: method: HTTP 메서드 (GET, POST 등) url: 요청 URL headers: HTTP 헤더 data: 요청 바디 params: 쿼리 파라미터 max_retries: 최대 재시도 횟수 """ for attempt in range(max_retries): try: response = requests.request( method=method, url=url, headers=headers, json=data, params=params, timeout=10 ) # HTTP 429 (Too Many Requests) if response.status_code == 429: wait_time = (attempt + 1) * 2 logger.warning(f"⚠️ API 제한 (429) → {wait_time}초 대기 후 재시도...") time.sleep(wait_time) continue response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: logger.error(f"❌ HTTP 에러: {e}") if attempt >= max_retries - 1: raise time.sleep(2) except Exception as e: logger.error(f"❌ 요청 실패: {e}") if attempt >= max_retries - 1: raise time.sleep(2) raise Exception("API 요청 최대 재시도 횟수 초과") def get_stock_price(self, code: str) -> Optional[float]: """ 주식/ETF 현재가 조회 (WebSocket 우선, fallback REST) Args: code: 종목코드 (6 자리) Returns: 현재가 (float) 또는 None """ # WebSocket 캐시 확인 if self.ws_cache and self.ws_cache.is_active: ws_data = self.ws_cache.get_price(code, max_age_sec=5.0) if ws_data: price = float(ws_data.get("stck_prpr", 0)) if price > 0: logger.debug(f"📡 WebSocket 가격 수신: {code} → {price:,.0f}원") return price # REST fallback try: token = self.get_access_token() url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price" headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": self.app_key, "secretkey": self.app_secret, "tr_id": "FHKST01010100", } params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, } result = self.request("GET", url, headers=headers, params=params) output = result.get("output", {}) price = float(output.get("stck_prpr", 0)) if price > 0: logger.debug(f"📡 REST 가격 수신: {code} → {price:,.0f}원") return price except Exception as e: logger.error(f"❌ 현재가 조회 실패 ({code}): {e}") return None def get_account_balance(self) -> Dict: """ 계좌 잔고 조회 Returns: {'cash': float, 'total_asset': float, 'holdings': {code: {'qty': int, 'avg_price': float}}} """ try: token = self.get_access_token() url = f"{self.base_url}/uapi/domestic-stock/v1/trading/inquire-balance" headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": self.app_key, "secretkey": self.app_secret, "tr_id": "TTDO84013", # 모의투자 잔고조회 } params = { "CANO": self.account_no, "ACNT_PRDT_CD": self.account_code, "AFHR_FLPR_YN": "N", "OFL_YN": "", "INQR_DVSN": "01", "UNPR_DVSN": "01", "FUND_STTL_ICLD_YN": "N", "FNAG_AMT_AUTO_INPT_STPL_YN": "N", "CTX_AREA_FK100": "", "CTX_AREA_NK100": "", } result = self.request("GET", url, headers=headers, params=params) output = result.get("output", {}) cash = float(output.get("prvs_rcdl_excc_amt", 0)) total_asset = float(output.get("tot_asst_amt", 0)) holdings = {} for item in output.get("fdtl_invest_item_lst", []): code = item.get("pdno", "") qty = int(item.get("hldg_qty", 0)) avg_price = float(item.get("pchs_amt", 0)) / qty if qty > 0 else 0 if code and qty > 0: holdings[code] = {'qty': qty, 'avg_price': avg_price} return {'cash': cash, 'total_asset': total_asset, 'holdings': holdings} except Exception as e: logger.error(f"❌ 잔고 조회 실패: {e}") return {'cash': 0, 'total_asset': 0, 'holdings': {}} def buy(self, code: str, qty: int, price: Optional[int] = None) -> bool: """ 주식/ETF 매수 주문 Args: code: 종목코드 qty: 매수 수량 price: 매수가 (None 이면 시장가) Returns: 성공 시 True """ try: token = self.get_access_token() url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash" headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": self.app_key, "secretkey": self.app_secret, "tr_id": "VTTC0802U", # 모의투자 현금매수 } # 시장가 주문 (ETF 는 호가단위 없음) if price is None: ord_pric = "0" ord_dvsn = "01" # 시장가 else: ord_pric = str(price) ord_dvsn = "00" # 지정가 data = { "CANO": self.account_no, "ACNT_PRDT_CD": self.account_code, "PDNO": code, "ORD_DVSN": ord_dvsn, "ORD_QTY": str(qty), "ORD_UNPR": ord_pric, } result = self.request("POST", url, headers=headers, data=data) if result.get("rt_cd") == "0": logger.info(f"✅ 매수 주문 성공: {code} {qty}주") return True else: logger.error(f"❌ 매수 주문 실패: {result.get('msg1', '')}") return False except Exception as e: logger.error(f"❌ 매수 주문 예외: {e}") return False def sell(self, code: str, qty: int, price: Optional[int] = None) -> bool: """ 주식/ETF 매도 주문 Args: code: 종목코드 qty: 매도 수량 price: 매도가 (None 이면 시장가) Returns: 성공 시 True """ try: token = self.get_access_token() url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash" headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": self.app_key, "secretkey": self.app_secret, "tr_id": "VTTC0801U", # 모의투자 현금매도 } if price is None: ord_pric = "0" ord_dvsn = "01" # 시장가 else: ord_pric = str(price) ord_dvsn = "00" # 지정가 data = { "CANO": self.account_no, "ACNT_PRDT_CD": self.account_code, "PDNO": code, "ORD_DVSN": ord_dvsn, "ORD_QTY": str(qty), "ORD_UNPR": ord_pric, } result = self.request("POST", url, headers=headers, data=data) if result.get("rt_cd") == "0": logger.info(f"✅ 매도 주문 성공: {code} {qty}주") return True else: logger.error(f"❌ 매도 주문 실패: {result.get('msg1', '')}") return False except Exception as e: logger.error(f"❌ 매도 주문 예외: {e}") return False # ============================================================================== # [핵심 매매 로직] ETF 액티브 (RSI 기반 분할 매수) # ============================================================================== class ETFActiveTrader: """ ETF 액티브 매매 트레이더 사용법: trader = ETFActiveTrader() trader.run() """ def __init__(self): self.api = KISAPI() self.db = db # ETF 유니버스 (DB 에서 로드) self.etf_universe = self._load_etf_universe() # 활성 트레이딩 상태 (DB 에서 복원) self.active_trades = self.db.get_active_trades(strategy_prefix="ETF") # 메신저 설정 self.mm_token = get_env_from_db("MM_BOT_TOKEN_", "") self.mm_channel = get_env_from_db("KIS_SHORT_MM_CHANNEL", "stock") self.mm_server = get_env_from_db("MM_SERVER_URL", "https://mattermost.hoonfam.org") logger.info(f"✅ ETF 액티브 트레이더 초기화 완료") logger.info(f" 유니버스: {len(self.etf_universe)}개 종목") logger.info(f" 활성 트레이딩: {len(self.active_trades)}개") def _load_etf_universe(self) -> List[str]: """ DB 에서 ETF 유니버스 로드 Returns: ETF 종목코드 목록 """ # 예시: env 에서 "ETF_UNIVERSE" 키로 콤마 구분된 종목코드 로드 # 예: "069500,114800,280670" (KODEX 200, KODEX 은행, KODEX 원자력) etf_list_str = get_env_from_db("ETF_UNIVERSE", "069500,114800,280670") etf_codes = [code.strip() for code in etf_list_str.split(",") if code.strip()] if not etf_codes: logger.warning("⚠️ ETF 유니버스가 비어있습니다. 기본값을 사용합니다.") etf_codes = ["069500", "114800", "280670"] # KODEX 200, 은행, 원자력 logger.info(f"📊 ETF 유니버스 로드: {etf_codes}") return etf_codes def calculate_rsi(self, prices: List[float], period: int = 14) -> Optional[float]: """ RSI 계산 (단순 이동평균 방식) Args: prices: 종가 목록 (최신순) period: RSI 기간 Returns: RSI 값 또는 None """ if len(prices) < period + 1: return None # 최근 N 일 사용 prices = prices[:period+1][::-1] # 오름차순 정렬 gains = [] losses = [] for i in range(1, len(prices)): delta = prices[i] - prices[i-1] if delta > 0: gains.append(delta) losses.append(0) else: gains.append(0) losses.append(abs(delta)) avg_gain = sum(gains) / period avg_loss = sum(losses) / period if avg_loss == 0: return 100.0 rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) return rsi def get_daily_prices(self, code: str, days: int = 30) -> List[float]: """ 일봉 종가 조회 (최근 N 일) Args: code: 종목코드 days: 일수 Returns: 종가 목록 (최신순) """ try: # 한투 API 일봉 조회 token = self.api.get_access_token() url = f"{self.api.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" headers = { "Content-Type": "application/json", "authorization": f"Bearer {token}", "appkey": self.api.app_key, "secretkey": self.api.app_secret, "tr_id": "FHKST03010100", } params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_DATE_1": "", "FID_INPUT_DATE_2": "", "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "0", } result = self.api.request("GET", url, headers=headers, params=params) output = result.get("output", {}) data_list = output.get("output2", []) prices = [] for item in data_list[:days]: close = float(item.get("stck_clpr", 0)) if close > 0: prices.append(close) return prices except Exception as e: logger.error(f"❌ 일봉 조회 실패 ({code}): {e}") return [] def send_notification(self, title: str, content: str): """메신저 알림 발송""" # 텔레그램 if self.mm_token: tg_msg = f"[ETF 액티브] {title}\n\n{content}" msg_tg(self.mm_token, self.mm_channel, tg_msg) # 매터모스트 mm_msg = f"**[ETF 액티브]** {title}\n\n{content}" msg_mm(f"{self.mm_server}/hooks/{self.mm_token}", mm_msg) def run(self): """메인 루프 (최적화 + 매수/매도 동기화)""" logger.info("\n=== [ETF 액티브 매매] 메인 루프 시작 ===\n") # [최적화] 일봉 데이터는 10 분마다 갱신 (캐시) daily_price_cache = {} last_price_cache_update = 0 while True: try: now = time.time() # [최적화 1] 일봉 데이터 10 분마다 갱신 if now - last_price_cache_update > 600: # 600 초 = 10 분 for code in self.etf_universe + list(self.active_trades.keys()): prices = self.get_daily_prices(code, days=30) if prices: daily_price_cache[code] = prices last_price_cache_update = now logger.debug(f"📊 일봉 데이터 갱신: {len(daily_price_cache)}종목") # 1. 활성 트레이딩 상태 업데이트 (매도 판단) for code in list(self.active_trades.keys()): self._update_active_trade(code, daily_price_cache) # 2. 매수 기회 탐색 (RSI 35 이하) for code in self.etf_universe: if code not in self.active_trades: self._check_buy_opportunity(code, daily_price_cache) # 3. 서버 부하 방지 (1~3 분 대기) sleep_time = random.uniform(60, 180) logger.info(f"💤 {sleep_time/60:.1f}분 대기...") time.sleep(sleep_time) except KeyboardInterrupt: logger.info("🛑 사용자 요청으로 매매 중단") break except Exception as e: logger.error(f"❌ 메인 루프 예외: {e}") time.sleep(60) def _update_active_trade(self, code: str, daily_price_cache: Dict = None): """ 활성 트레이딩 상태 업데이트 (매도 판단) Args: code: 종목코드 daily_price_cache: 일봉 데이터 캐시 (최적화용) """ trade = self.active_trades.get(code) if not trade: return # 현재가 조회 (WebSocket 또는 REST) current_price = self.api.get_stock_price(code) if not current_price: return # 평균 단가 avg_price = trade.get('avg_buy_price', 0) if avg_price <= 0: return # 수익률 profit_rate = (current_price - avg_price) / avg_price # 일봉으로 RSI 계산 (캐시 사용) prices = daily_price_cache.get(code) if daily_price_cache else None if not prices: prices = self.get_daily_prices(code, days=30) rsi = self.calculate_rsi(prices) if prices else None logger.info(f"📊 {code} 현재: {current_price:,.0f}원 | 수익률: {profit_rate*100:.2f}% | RSI: {rsi}") # [매도 판단] sell_reason = None # 1. 익절: +4% 이상 또는 RSI 70 이상 if profit_rate >= 0.04: sell_reason = "슈팅 익절 (+4%)" elif rsi and rsi >= 70: sell_reason = f"RSI 과열 ({rsi:.1f})" # 2. 손절: -10% 이하 elif profit_rate <= -0.10: sell_reason = "안전 손절 (-10%)" # 매도 실행 if sell_reason: self._execute_sell(code, trade, current_price, sell_reason) def _execute_sell(self, code: str, trade: Dict, current_price: float, reason: str): """ 매도 실행 Args: code: 종목코드 trade: 트레이드 정보 current_price: 현재가 reason: 매도 사유 """ qty = trade.get('qty', 0) avg_price = trade.get('avg_buy_price', 0) if qty <= 0: logger.warning(f"⚠️ 매도 수량 없음: {code}") return # 시장가 매도 success = self.api.sell(code, qty) if success: # 손익 계산 profit_loss = (current_price - avg_price) * qty # DB 에서 제거 self.db.close_trade( code=code, sell_price=current_price, sell_reason=reason, size_class=trade.get('size_class') ) # 활성 트레이딩에서 제거 del self.active_trades[code] logger.info(f"✅ [{code}] 매도 완료: {qty}주 @ {current_price:,.0f}원 | {reason} | 손익: {profit_loss:,.0f}원") # 메신저 알림 self.send_notification( f"매도 완료: {trade.get('name', code)}", f"▪️ 종목: {code}\n" f"▪️ 수량: {qty}주\n" f"▪️ 가격: {current_price:,.0f}원\n" f"▪️ 사유: {reason}\n" f"▪️ 손익: {profit_loss:,.0f}원" ) def _check_buy_opportunity(self, code: str, daily_price_cache: Dict = None): """ 매수 기회 탐색 (RSI 기반 3 분할 매수) Args: code: 종목코드 daily_price_cache: 일봉 데이터 캐시 (최적화용) """ # 이미 보유 중인 종목은 추가 매수만 고려 is_additional_buy = code in self.active_trades # 일봉으로 RSI 계산 (캐시 사용) prices = daily_price_cache.get(code) if daily_price_cache else None if not prices: prices = self.get_daily_prices(code, days=30) if not prices or len(prices) < 15: return rsi = self.calculate_rsi(prices) if rsi is None: return current_price = self.api.get_stock_price(code) if not current_price: return logger.debug(f"🔍 {code} 확인: RSI={rsi:.1f} | 가격={current_price:,.0f}원 | 보유여부: {is_additional_buy}") # [중요] 매수 직전 계좌 잔고 확인 (즉시 동기화) # 매수/매도 시마다 계좌와 동기화되므로 여기서 반드시 확인 balance = self.api.get_account_balance() cash = balance.get('cash', 0) if cash < current_price * 10: # 최소 10 주 매수 가능 현금 logger.warning(f"⚠️ 현금 부족: {cash:,.0f}원") return # [1 차 매수] RSI 35 이하 && 보유 없음 if not is_additional_buy and rsi < 35: target_amount = cash * 0.3 # 1 차 매수 금액 (자본금의 30%) target_qty = int(target_amount / current_price) if target_qty < 10: logger.warning(f"⚠️ 매수 수량 부족: {target_qty}주") return # 시장가 매수 success = self.api.buy(code, target_qty) if success: # DB 에 등록 (평단가 = 현재가) trade_data = { 'code': code, 'name': f"ETF-{code}", 'strategy': 'ETF_ACTIVE', 'avg_buy_price': current_price, # 1 차는 현재가가 평단가 'current_price': current_price, 'target_qty': target_qty * 3, # 3 분할 목표 'current_qty': target_qty, 'total_invested': current_price * target_qty, 'status': 'BUYING', 'size_class': 'MID', 'rsi': rsi, } self.db.upsert_trade(trade_data) self.active_trades[code] = trade_data logger.info(f"✅ [{code}] 1 차 매수: {target_qty}주 @ {current_price:,.0f}원 | RSI={rsi:.1f}") # 메신저 알림 self.send_notification( f"1 차 매수: {trade_data['name']}", f"▪️ 종목: {code}\n" f"▪️ 수량: {target_qty}주\n" f"▪️ 가격: {current_price:,.0f}원\n" f"▪️ RSI: {rsi:.1f}\n" f"▪️ 현금: {cash:,.0f}원" ) # [2 차, 3 차 매수] 보유 중인 종목 && RSI 추가 하락 elif is_additional_buy: trade = self.active_trades[code] current_qty = trade.get('current_qty', 0) target_qty = trade.get('target_qty', 0) avg_price = trade.get('avg_buy_price', 0) # 2 차 매수: RSI 30 이하 && 1 차 수량보다 적을 때 if rsi < 30 and current_qty < target_qty * 0.6: add_amount = cash * 0.3 # 2 차 매수 금액 (자본금의 30%) add_qty = int(add_amount / current_price) if add_qty < 10: return success = self.api.buy(code, add_qty) if success: # 평단가 재계산: (기존 투자금 + 추가 투자금) / (기존 수량 + 추가 수량) new_total_invested = (avg_price * current_qty) + (current_price * add_qty) new_total_qty = current_qty + add_qty new_avg_price = new_total_invested / new_total_qty if new_total_qty > 0 else 0 # DB 업데이트 trade['avg_buy_price'] = new_avg_price trade['current_qty'] = new_total_qty trade['total_invested'] = new_total_invested trade['rsi'] = rsi self.db.upsert_trade(trade) self.active_trades[code] = trade logger.info(f"✅ [{code}] 2 차 매수: {add_qty}주 @ {current_price:,.0f}원 | RSI={rsi:.1f} | 평단가: {new_avg_price:,.0f}원") # 메신저 알림 self.send_notification( f"2 차 매수 (물타기): {trade['name']}", f"▪️ 종목: {code}\n" f"▪️ 추가수량: {add_qty}주\n" f"▪️ 총수량: {new_total_qty}주\n" f"▪️ 현재가: {current_price:,.0f}원\n" f"▪️ 평단가: {new_avg_price:,.0f}원 (기존: {avg_price:,.0f}원)\n" f"▪️ RSI: {rsi:.1f}" ) # 3 차 매수: RSI 25 이하 && 2 차 수량보다 적을 때 elif rsi < 25 and current_qty < target_qty * 0.9: add_amount = cash * 0.4 # 3 차 매수 금액 (자본금의 40%) add_qty = int(add_amount / current_price) if add_qty < 10: return success = self.api.buy(code, add_qty) if success: # 평단가 재계산 new_total_invested = (avg_price * current_qty) + (current_price * add_qty) new_total_qty = current_qty + add_qty new_avg_price = new_total_invested / new_total_qty if new_total_qty > 0 else 0 # DB 업데이트 trade['avg_buy_price'] = new_avg_price trade['current_qty'] = new_total_qty trade['total_invested'] = new_total_invested trade['status'] = 'HOLDING' # 3 차까지 완료 trade['rsi'] = rsi self.db.upsert_trade(trade) self.active_trades[code] = trade logger.info(f"✅ [{code}] 3 차 매수: {add_qty}주 @ {current_price:,.0f}원 | RSI={rsi:.1f} | 평단가: {new_avg_price:,.0f}원") # 메신저 알림 self.send_notification( f"3 차 매수 (풀매수): {trade['name']}", f"▪️ 종목: {code}\n" f"▪️ 추가수량: {add_qty}주\n" f"▪️ 총수량: {new_total_qty}주\n" f"▪️ 현재가: {current_price:,.0f}원\n" f"▪️ 평단가: {new_avg_price:,.0f}원\n" f"▪️ RSI: {rsi:.1f}\n" f"▪️ 3 분할 완료!" ) # ============================================================================== # [실행부] # ============================================================================== if __name__ == "__main__": trader = ETFActiveTrader() trader.run()