888 lines
34 KiB
Python
888 lines
34 KiB
Python
"""
|
||
etf_ver1.py — ETF 액티브 매매 봇 (RSI 기반 분할 매수 & 슈팅 익절)
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
역할: 테마성 ETF (원자력, 전력망 등) 의 눌림목 분할 매수와 슈팅 익절 전략
|
||
한투 API 연동, WebSocket 실시간 가격 수신, DB 기반 상태 관리
|
||
|
||
전략 개요:
|
||
- 매수: RSI 35/30/25 이하에서 3 분할 매수 (30%/30%/40%)
|
||
- 매도: 수익률 +4% 이상 또는 RSI 70 이상에서 전량 익절
|
||
- 손절: 평단가 대비 -10% 에서 전량 손절 (리스크 관리)
|
||
- 유니버스: DB(env) 에서 지정한 ETF 종목 목록
|
||
|
||
KIS API 준수:
|
||
- SafeRequest: HTTP 429 에러 대비 재시도 로직
|
||
- WebSocket: H0STCNT0 실시간 체결가 수신 (kis_ws.py)
|
||
- 메신저 알림: 텔레그램 (HTML), 매터모스트 (Markdown) 분리
|
||
- Atomic Save: 매매 발생 시 DB 즉시 저장 (재시작 안전성)
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import random
|
||
import logging
|
||
import datetime
|
||
from pathlib import Path
|
||
from typing import List, Dict, Optional
|
||
|
||
import pandas as pd
|
||
import requests
|
||
|
||
from database import TradeDB, ENV_CONFIG_KEYS
|
||
|
||
# WebSocket 실시간 체결가 캐시
|
||
try:
|
||
from kis_ws import KISWebSocketPriceCache
|
||
_KIS_WS_AVAILABLE = True
|
||
except ImportError:
|
||
_KIS_WS_AVAILABLE = False
|
||
|
||
# 로깅 설정
|
||
logging.basicConfig(
|
||
format='[%(asctime)s] %(message)s',
|
||
datefmt='%H:%M:%S',
|
||
level=logging.INFO,
|
||
)
|
||
logger = logging.getLogger("ETFActiveBot")
|
||
|
||
# DB 초기화
|
||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||
db = TradeDB(db_path=str(SCRIPT_DIR / "quant_bot.db"))
|
||
|
||
# ==============================================================================
|
||
# [환경 변수 로드] DB 우선 (하드코딩 금지)
|
||
# ==============================================================================
|
||
def get_env_from_db(key, default=""):
|
||
"""DB 에서 환경변수 읽기"""
|
||
env_data = db.get_latest_env()
|
||
if env_data and env_data.get("snapshot"):
|
||
return env_data["snapshot"].get(key, default)
|
||
return default
|
||
|
||
def get_env_float(key, default):
|
||
"""환경변수를 float 로 변환 (DB 우선)"""
|
||
value = get_env_from_db(key, str(default))
|
||
if isinstance(value, str) and "#" in value:
|
||
value = value.split("#")[0].strip()
|
||
try:
|
||
return float(value) if value else default
|
||
except (ValueError, TypeError):
|
||
return default
|
||
|
||
def get_env_int(key, default):
|
||
"""환경변수를 int 로 변환 (DB 우선)"""
|
||
value = get_env_from_db(key, str(default))
|
||
if isinstance(value, str) and "#" in value:
|
||
value = value.split("#")[0].strip()
|
||
try:
|
||
return int(value) if value else default
|
||
except (ValueError, TypeError):
|
||
return default
|
||
|
||
def get_env_bool(key, default=False):
|
||
"""환경변수를 bool 로 변환 (DB 우선)"""
|
||
value = get_env_from_db(key, str(default)).lower()
|
||
return value in ("true", "1", "yes")
|
||
|
||
# ==============================================================================
|
||
# [메신저 알림] 텔레그램 & 매터모스트
|
||
# ==============================================================================
|
||
def msg_tg(token: str, chat_id: str, message: str):
|
||
"""텔레그램 메시지 전송 (HTML 방식)"""
|
||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||
payload = {
|
||
"chat_id": chat_id,
|
||
"text": message,
|
||
"parse_mode": "HTML"
|
||
}
|
||
try:
|
||
requests.post(url, data=payload, timeout=5)
|
||
except Exception as e:
|
||
logger.debug(f"[텔레그램 전송 에러] {e}")
|
||
|
||
def msg_mm(webhook_url: str, message: str):
|
||
"""매터모스트 메시지 전송 (Markdown 방식)"""
|
||
payload = {"text": message}
|
||
try:
|
||
requests.post(webhook_url, json=payload, timeout=5)
|
||
except Exception as e:
|
||
logger.debug(f"[매터모스트 전송 에러] {e}")
|
||
|
||
# ==============================================================================
|
||
# [한투 API 클라이언트] REST API 안전성 관리
|
||
# ==============================================================================
|
||
class KISAPI:
|
||
"""
|
||
한국투자증권 REST API 클라이언트
|
||
- 429 에러 대비 재시도 로직
|
||
- 모의투자/실전투자 분리
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.mock = get_env_bool("KIS_MOCK", True)
|
||
self.app_key = get_env_from_db("KIS_APP_KEY_MOCK" if self.mock else "KIS_APP_KEY_REAL", "")
|
||
self.app_secret = get_env_from_db("KIS_APP_SECRET_MOCK" if self.mock else "KIS_APP_SECRET_REAL", "")
|
||
|
||
self.base_url = (
|
||
"https://openapivts.koreainvestment.com:29443"
|
||
if self.mock
|
||
else "https://openapi.koreainvestment.com:9443"
|
||
)
|
||
|
||
# 계좌 정보
|
||
self.account_no = get_env_from_db("KIS_ACCOUNT_NO_MOCK" if self.mock else "KIS_ACCOUNT_NO_REAL", "")
|
||
self.account_code = get_env_from_db("KIS_ACCOUNT_CODE_MOCK" if self.mock else "KIS_ACCOUNT_CODE_REAL", "")
|
||
|
||
# 액세스 토큰
|
||
self.access_token = None
|
||
self.token_expiry = 0
|
||
|
||
# WebSocket 캐시 (ETF 는 REST 만 사용, 단타용과 공유 안함)
|
||
# ETF 는 일봉 RSI 기반, 1~3 분 주기 체크로 REST 로 충분
|
||
self.ws_cache = None
|
||
# if _KIS_WS_AVAILABLE:
|
||
# try:
|
||
# self.ws_cache = KISWebSocketPriceCache(
|
||
# app_key=self.app_key,
|
||
# app_secret=self.app_secret,
|
||
# is_mock=self.mock
|
||
# )
|
||
# if self.ws_cache.start(force_cleanup=True):
|
||
# logger.info("✅ WebSocket 활성 (force_cleanup=True, 비정상 종료 대비)")
|
||
# else:
|
||
# logger.info("ℹ️ WebSocket 비활성 (모의 or 키 미설정) → REST fallback")
|
||
# except Exception as e:
|
||
# logger.warning(f"⚠️ WebSocket 초기화 예외: {e}")
|
||
# self.ws_cache = None
|
||
|
||
def get_access_token(self) -> str:
|
||
"""액세스 토큰 발급 (유효기간 23 시간)"""
|
||
now = time.time()
|
||
if self.access_token and now < self.token_expiry:
|
||
return self.access_token
|
||
|
||
try:
|
||
url = f"{self.base_url}/oauth2/token"
|
||
data = {
|
||
"grant_type": "client_credentials",
|
||
"appkey": self.app_key,
|
||
"secretkey": self.app_secret,
|
||
}
|
||
response = requests.post(url, json=data, timeout=10)
|
||
data = response.json()
|
||
|
||
self.access_token = data.get("access_token")
|
||
self.token_expiry = now + 82800 # 23 시간
|
||
|
||
logger.info(f"✅ 액세스 토큰 발급 완료 (앞 8 자: {self.access_token[:8]}...)")
|
||
return self.access_token
|
||
except Exception as e:
|
||
logger.error(f"❌ 토큰 발급 실패: {e}")
|
||
raise
|
||
|
||
def request(self, method: str, url: str, headers: dict = None, data: dict = None, params: dict = None, max_retries: int = 3):
|
||
"""
|
||
HTTP 요청 (429 에러 재시도 로직 포함)
|
||
|
||
Args:
|
||
method: HTTP 메서드 (GET, POST 등)
|
||
url: 요청 URL
|
||
headers: HTTP 헤더
|
||
data: 요청 바디
|
||
params: 쿼리 파라미터
|
||
max_retries: 최대 재시도 횟수
|
||
"""
|
||
for attempt in range(max_retries):
|
||
try:
|
||
response = requests.request(
|
||
method=method,
|
||
url=url,
|
||
headers=headers,
|
||
json=data,
|
||
params=params,
|
||
timeout=10
|
||
)
|
||
|
||
# HTTP 429 (Too Many Requests)
|
||
if response.status_code == 429:
|
||
wait_time = (attempt + 1) * 2
|
||
logger.warning(f"⚠️ API 제한 (429) → {wait_time}초 대기 후 재시도...")
|
||
time.sleep(wait_time)
|
||
continue
|
||
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
except requests.exceptions.HTTPError as e:
|
||
logger.error(f"❌ HTTP 에러: {e}")
|
||
if attempt >= max_retries - 1:
|
||
raise
|
||
time.sleep(2)
|
||
except Exception as e:
|
||
logger.error(f"❌ 요청 실패: {e}")
|
||
if attempt >= max_retries - 1:
|
||
raise
|
||
time.sleep(2)
|
||
|
||
raise Exception("API 요청 최대 재시도 횟수 초과")
|
||
|
||
def get_stock_price(self, code: str) -> Optional[float]:
|
||
"""
|
||
주식/ETF 현재가 조회 (WebSocket 우선, fallback REST)
|
||
|
||
Args:
|
||
code: 종목코드 (6 자리)
|
||
|
||
Returns:
|
||
현재가 (float) 또는 None
|
||
"""
|
||
# WebSocket 캐시 확인
|
||
if self.ws_cache and self.ws_cache.is_active:
|
||
ws_data = self.ws_cache.get_price(code, max_age_sec=5.0)
|
||
if ws_data:
|
||
price = float(ws_data.get("stck_prpr", 0))
|
||
if price > 0:
|
||
logger.debug(f"📡 WebSocket 가격 수신: {code} → {price:,.0f}원")
|
||
return price
|
||
|
||
# REST fallback
|
||
try:
|
||
token = self.get_access_token()
|
||
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": self.app_key,
|
||
"secretkey": self.app_secret,
|
||
"tr_id": "FHKST01010100",
|
||
}
|
||
params = {
|
||
"FID_COND_MRKT_DIV_CODE": "J",
|
||
"FID_INPUT_ISCD": code,
|
||
}
|
||
|
||
result = self.request("GET", url, headers=headers, params=params)
|
||
output = result.get("output", {})
|
||
price = float(output.get("stck_prpr", 0))
|
||
|
||
if price > 0:
|
||
logger.debug(f"📡 REST 가격 수신: {code} → {price:,.0f}원")
|
||
return price
|
||
except Exception as e:
|
||
logger.error(f"❌ 현재가 조회 실패 ({code}): {e}")
|
||
|
||
return None
|
||
|
||
def get_account_balance(self) -> Dict:
|
||
"""
|
||
계좌 잔고 조회
|
||
|
||
Returns:
|
||
{'cash': float, 'total_asset': float, 'holdings': {code: {'qty': int, 'avg_price': float}}}
|
||
"""
|
||
try:
|
||
token = self.get_access_token()
|
||
url = f"{self.base_url}/uapi/domestic-stock/v1/trading/inquire-balance"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": self.app_key,
|
||
"secretkey": self.app_secret,
|
||
"tr_id": "TTDO84013", # 모의투자 잔고조회
|
||
}
|
||
params = {
|
||
"CANO": self.account_no,
|
||
"ACNT_PRDT_CD": self.account_code,
|
||
"AFHR_FLPR_YN": "N",
|
||
"OFL_YN": "",
|
||
"INQR_DVSN": "01",
|
||
"UNPR_DVSN": "01",
|
||
"FUND_STTL_ICLD_YN": "N",
|
||
"FNAG_AMT_AUTO_INPT_STPL_YN": "N",
|
||
"CTX_AREA_FK100": "",
|
||
"CTX_AREA_NK100": "",
|
||
}
|
||
|
||
result = self.request("GET", url, headers=headers, params=params)
|
||
output = result.get("output", {})
|
||
|
||
cash = float(output.get("prvs_rcdl_excc_amt", 0))
|
||
total_asset = float(output.get("tot_asst_amt", 0))
|
||
|
||
holdings = {}
|
||
for item in output.get("fdtl_invest_item_lst", []):
|
||
code = item.get("pdno", "")
|
||
qty = int(item.get("hldg_qty", 0))
|
||
avg_price = float(item.get("pchs_amt", 0)) / qty if qty > 0 else 0
|
||
if code and qty > 0:
|
||
holdings[code] = {'qty': qty, 'avg_price': avg_price}
|
||
|
||
return {'cash': cash, 'total_asset': total_asset, 'holdings': holdings}
|
||
except Exception as e:
|
||
logger.error(f"❌ 잔고 조회 실패: {e}")
|
||
return {'cash': 0, 'total_asset': 0, 'holdings': {}}
|
||
|
||
def buy(self, code: str, qty: int, price: Optional[int] = None) -> bool:
|
||
"""
|
||
주식/ETF 매수 주문
|
||
|
||
Args:
|
||
code: 종목코드
|
||
qty: 매수 수량
|
||
price: 매수가 (None 이면 시장가)
|
||
|
||
Returns:
|
||
성공 시 True
|
||
"""
|
||
try:
|
||
token = self.get_access_token()
|
||
url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": self.app_key,
|
||
"secretkey": self.app_secret,
|
||
"tr_id": "VTTC0802U", # 모의투자 현금매수
|
||
}
|
||
|
||
# 시장가 주문 (ETF 는 호가단위 없음)
|
||
if price is None:
|
||
ord_pric = "0"
|
||
ord_dvsn = "01" # 시장가
|
||
else:
|
||
ord_pric = str(price)
|
||
ord_dvsn = "00" # 지정가
|
||
|
||
data = {
|
||
"CANO": self.account_no,
|
||
"ACNT_PRDT_CD": self.account_code,
|
||
"PDNO": code,
|
||
"ORD_DVSN": ord_dvsn,
|
||
"ORD_QTY": str(qty),
|
||
"ORD_UNPR": ord_pric,
|
||
}
|
||
|
||
result = self.request("POST", url, headers=headers, data=data)
|
||
|
||
if result.get("rt_cd") == "0":
|
||
logger.info(f"✅ 매수 주문 성공: {code} {qty}주")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ 매수 주문 실패: {result.get('msg1', '')}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"❌ 매수 주문 예외: {e}")
|
||
return False
|
||
|
||
def sell(self, code: str, qty: int, price: Optional[int] = None) -> bool:
|
||
"""
|
||
주식/ETF 매도 주문
|
||
|
||
Args:
|
||
code: 종목코드
|
||
qty: 매도 수량
|
||
price: 매도가 (None 이면 시장가)
|
||
|
||
Returns:
|
||
성공 시 True
|
||
"""
|
||
try:
|
||
token = self.get_access_token()
|
||
url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": self.app_key,
|
||
"secretkey": self.app_secret,
|
||
"tr_id": "VTTC0801U", # 모의투자 현금매도
|
||
}
|
||
|
||
if price is None:
|
||
ord_pric = "0"
|
||
ord_dvsn = "01" # 시장가
|
||
else:
|
||
ord_pric = str(price)
|
||
ord_dvsn = "00" # 지정가
|
||
|
||
data = {
|
||
"CANO": self.account_no,
|
||
"ACNT_PRDT_CD": self.account_code,
|
||
"PDNO": code,
|
||
"ORD_DVSN": ord_dvsn,
|
||
"ORD_QTY": str(qty),
|
||
"ORD_UNPR": ord_pric,
|
||
}
|
||
|
||
result = self.request("POST", url, headers=headers, data=data)
|
||
|
||
if result.get("rt_cd") == "0":
|
||
logger.info(f"✅ 매도 주문 성공: {code} {qty}주")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ 매도 주문 실패: {result.get('msg1', '')}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"❌ 매도 주문 예외: {e}")
|
||
return False
|
||
|
||
|
||
# ==============================================================================
|
||
# [핵심 매매 로직] ETF 액티브 (RSI 기반 분할 매수)
|
||
# ==============================================================================
|
||
class ETFActiveTrader:
|
||
"""
|
||
ETF 액티브 매매 트레이더
|
||
|
||
사용법:
|
||
trader = ETFActiveTrader()
|
||
trader.run()
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.api = KISAPI()
|
||
self.db = db
|
||
|
||
# ETF 유니버스 (DB 에서 로드)
|
||
self.etf_universe = self._load_etf_universe()
|
||
|
||
# 활성 트레이딩 상태 (DB 에서 복원)
|
||
self.active_trades = self.db.get_active_trades(strategy_prefix="ETF")
|
||
|
||
# 메신저 설정
|
||
self.mm_token = get_env_from_db("MM_BOT_TOKEN_", "")
|
||
self.mm_channel = get_env_from_db("KIS_SHORT_MM_CHANNEL", "stock")
|
||
self.mm_server = get_env_from_db("MM_SERVER_URL", "https://mattermost.hoonfam.org")
|
||
|
||
logger.info(f"✅ ETF 액티브 트레이더 초기화 완료")
|
||
logger.info(f" 유니버스: {len(self.etf_universe)}개 종목")
|
||
logger.info(f" 활성 트레이딩: {len(self.active_trades)}개")
|
||
|
||
def _load_etf_universe(self) -> List[str]:
|
||
"""
|
||
DB 에서 ETF 유니버스 로드
|
||
|
||
Returns:
|
||
ETF 종목코드 목록
|
||
"""
|
||
# 예시: env 에서 "ETF_UNIVERSE" 키로 콤마 구분된 종목코드 로드
|
||
# 예: "069500,114800,280670" (KODEX 200, KODEX 은행, KODEX 원자력)
|
||
etf_list_str = get_env_from_db("ETF_UNIVERSE", "069500,114800,280670")
|
||
etf_codes = [code.strip() for code in etf_list_str.split(",") if code.strip()]
|
||
|
||
if not etf_codes:
|
||
logger.warning("⚠️ ETF 유니버스가 비어있습니다. 기본값을 사용합니다.")
|
||
etf_codes = ["069500", "114800", "280670"] # KODEX 200, 은행, 원자력
|
||
|
||
logger.info(f"📊 ETF 유니버스 로드: {etf_codes}")
|
||
return etf_codes
|
||
|
||
def calculate_rsi(self, prices: List[float], period: int = 14) -> Optional[float]:
|
||
"""
|
||
RSI 계산 (단순 이동평균 방식)
|
||
|
||
Args:
|
||
prices: 종가 목록 (최신순)
|
||
period: RSI 기간
|
||
|
||
Returns:
|
||
RSI 값 또는 None
|
||
"""
|
||
if len(prices) < period + 1:
|
||
return None
|
||
|
||
# 최근 N 일 사용
|
||
prices = prices[:period+1][::-1] # 오름차순 정렬
|
||
|
||
gains = []
|
||
losses = []
|
||
|
||
for i in range(1, len(prices)):
|
||
delta = prices[i] - prices[i-1]
|
||
if delta > 0:
|
||
gains.append(delta)
|
||
losses.append(0)
|
||
else:
|
||
gains.append(0)
|
||
losses.append(abs(delta))
|
||
|
||
avg_gain = sum(gains) / period
|
||
avg_loss = sum(losses) / period
|
||
|
||
if avg_loss == 0:
|
||
return 100.0
|
||
|
||
rs = avg_gain / avg_loss
|
||
rsi = 100 - (100 / (1 + rs))
|
||
|
||
return rsi
|
||
|
||
def get_daily_prices(self, code: str, days: int = 30) -> List[float]:
|
||
"""
|
||
일봉 종가 조회 (최근 N 일)
|
||
|
||
Args:
|
||
code: 종목코드
|
||
days: 일수
|
||
|
||
Returns:
|
||
종가 목록 (최신순)
|
||
"""
|
||
try:
|
||
# 한투 API 일봉 조회
|
||
token = self.api.get_access_token()
|
||
url = f"{self.api.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": self.api.app_key,
|
||
"secretkey": self.api.app_secret,
|
||
"tr_id": "FHKST03010100",
|
||
}
|
||
params = {
|
||
"FID_COND_MRKT_DIV_CODE": "J",
|
||
"FID_INPUT_ISCD": code,
|
||
"FID_INPUT_DATE_1": "",
|
||
"FID_INPUT_DATE_2": "",
|
||
"FID_PERIOD_DIV_CODE": "D",
|
||
"FID_ORG_ADJ_PRC": "0",
|
||
}
|
||
|
||
result = self.api.request("GET", url, headers=headers, params=params)
|
||
output = result.get("output", {})
|
||
data_list = output.get("output2", [])
|
||
|
||
prices = []
|
||
for item in data_list[:days]:
|
||
close = float(item.get("stck_clpr", 0))
|
||
if close > 0:
|
||
prices.append(close)
|
||
|
||
return prices
|
||
except Exception as e:
|
||
logger.error(f"❌ 일봉 조회 실패 ({code}): {e}")
|
||
return []
|
||
|
||
def send_notification(self, title: str, content: str):
|
||
"""메신저 알림 발송"""
|
||
# 텔레그램
|
||
if self.mm_token:
|
||
tg_msg = f"<b>[ETF 액티브]</b> {title}\n\n{content}"
|
||
msg_tg(self.mm_token, self.mm_channel, tg_msg)
|
||
|
||
# 매터모스트
|
||
mm_msg = f"**[ETF 액티브]** {title}\n\n{content}"
|
||
msg_mm(f"{self.mm_server}/hooks/{self.mm_token}", mm_msg)
|
||
|
||
def run(self):
|
||
"""메인 루프 (최적화 + 매수/매도 동기화)"""
|
||
logger.info("\n=== [ETF 액티브 매매] 메인 루프 시작 ===\n")
|
||
|
||
# [최적화] 일봉 데이터는 10 분마다 갱신 (캐시)
|
||
daily_price_cache = {}
|
||
last_price_cache_update = 0
|
||
|
||
while True:
|
||
try:
|
||
now = time.time()
|
||
|
||
# [최적화 1] 일봉 데이터 10 분마다 갱신
|
||
if now - last_price_cache_update > 600: # 600 초 = 10 분
|
||
for code in self.etf_universe + list(self.active_trades.keys()):
|
||
prices = self.get_daily_prices(code, days=30)
|
||
if prices:
|
||
daily_price_cache[code] = prices
|
||
last_price_cache_update = now
|
||
logger.debug(f"📊 일봉 데이터 갱신: {len(daily_price_cache)}종목")
|
||
|
||
# 1. 활성 트레이딩 상태 업데이트 (매도 판단)
|
||
for code in list(self.active_trades.keys()):
|
||
self._update_active_trade(code, daily_price_cache)
|
||
|
||
# 2. 매수 기회 탐색 (RSI 35 이하)
|
||
for code in self.etf_universe:
|
||
if code not in self.active_trades:
|
||
self._check_buy_opportunity(code, daily_price_cache)
|
||
|
||
# 3. 서버 부하 방지 (1~3 분 대기)
|
||
sleep_time = random.uniform(60, 180)
|
||
logger.info(f"💤 {sleep_time/60:.1f}분 대기...")
|
||
time.sleep(sleep_time)
|
||
|
||
except KeyboardInterrupt:
|
||
logger.info("🛑 사용자 요청으로 매매 중단")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"❌ 메인 루프 예외: {e}")
|
||
time.sleep(60)
|
||
|
||
def _update_active_trade(self, code: str, daily_price_cache: Dict = None):
|
||
"""
|
||
활성 트레이딩 상태 업데이트 (매도 판단)
|
||
|
||
Args:
|
||
code: 종목코드
|
||
daily_price_cache: 일봉 데이터 캐시 (최적화용)
|
||
"""
|
||
trade = self.active_trades.get(code)
|
||
if not trade:
|
||
return
|
||
|
||
# 현재가 조회 (WebSocket 또는 REST)
|
||
current_price = self.api.get_stock_price(code)
|
||
if not current_price:
|
||
return
|
||
|
||
# 평균 단가
|
||
avg_price = trade.get('avg_buy_price', 0)
|
||
if avg_price <= 0:
|
||
return
|
||
|
||
# 수익률
|
||
profit_rate = (current_price - avg_price) / avg_price
|
||
|
||
# 일봉으로 RSI 계산 (캐시 사용)
|
||
prices = daily_price_cache.get(code) if daily_price_cache else None
|
||
if not prices:
|
||
prices = self.get_daily_prices(code, days=30)
|
||
rsi = self.calculate_rsi(prices) if prices else None
|
||
|
||
logger.info(f"📊 {code} 현재: {current_price:,.0f}원 | 수익률: {profit_rate*100:.2f}% | RSI: {rsi}")
|
||
|
||
# [매도 판단]
|
||
sell_reason = None
|
||
|
||
# 1. 익절: +4% 이상 또는 RSI 70 이상
|
||
if profit_rate >= 0.04:
|
||
sell_reason = "슈팅 익절 (+4%)"
|
||
elif rsi and rsi >= 70:
|
||
sell_reason = f"RSI 과열 ({rsi:.1f})"
|
||
|
||
# 2. 손절: -10% 이하
|
||
elif profit_rate <= -0.10:
|
||
sell_reason = "안전 손절 (-10%)"
|
||
|
||
# 매도 실행
|
||
if sell_reason:
|
||
self._execute_sell(code, trade, current_price, sell_reason)
|
||
|
||
def _execute_sell(self, code: str, trade: Dict, current_price: float, reason: str):
|
||
"""
|
||
매도 실행
|
||
|
||
Args:
|
||
code: 종목코드
|
||
trade: 트레이드 정보
|
||
current_price: 현재가
|
||
reason: 매도 사유
|
||
"""
|
||
qty = trade.get('qty', 0)
|
||
avg_price = trade.get('avg_buy_price', 0)
|
||
|
||
if qty <= 0:
|
||
logger.warning(f"⚠️ 매도 수량 없음: {code}")
|
||
return
|
||
|
||
# 시장가 매도
|
||
success = self.api.sell(code, qty)
|
||
|
||
if success:
|
||
# 손익 계산
|
||
profit_loss = (current_price - avg_price) * qty
|
||
|
||
# DB 에서 제거
|
||
self.db.close_trade(
|
||
code=code,
|
||
sell_price=current_price,
|
||
sell_reason=reason,
|
||
size_class=trade.get('size_class')
|
||
)
|
||
|
||
# 활성 트레이딩에서 제거
|
||
del self.active_trades[code]
|
||
|
||
logger.info(f"✅ [{code}] 매도 완료: {qty}주 @ {current_price:,.0f}원 | {reason} | 손익: {profit_loss:,.0f}원")
|
||
|
||
# 메신저 알림
|
||
self.send_notification(
|
||
f"매도 완료: {trade.get('name', code)}",
|
||
f"▪️ 종목: {code}\n"
|
||
f"▪️ 수량: {qty}주\n"
|
||
f"▪️ 가격: {current_price:,.0f}원\n"
|
||
f"▪️ 사유: {reason}\n"
|
||
f"▪️ 손익: {profit_loss:,.0f}원"
|
||
)
|
||
|
||
def _check_buy_opportunity(self, code: str, daily_price_cache: Dict = None):
|
||
"""
|
||
매수 기회 탐색 (RSI 기반 3 분할 매수)
|
||
|
||
Args:
|
||
code: 종목코드
|
||
daily_price_cache: 일봉 데이터 캐시 (최적화용)
|
||
"""
|
||
# 이미 보유 중인 종목은 추가 매수만 고려
|
||
is_additional_buy = code in self.active_trades
|
||
|
||
# 일봉으로 RSI 계산 (캐시 사용)
|
||
prices = daily_price_cache.get(code) if daily_price_cache else None
|
||
if not prices:
|
||
prices = self.get_daily_prices(code, days=30)
|
||
if not prices or len(prices) < 15:
|
||
return
|
||
|
||
rsi = self.calculate_rsi(prices)
|
||
if rsi is None:
|
||
return
|
||
|
||
current_price = self.api.get_stock_price(code)
|
||
if not current_price:
|
||
return
|
||
|
||
logger.debug(f"🔍 {code} 확인: RSI={rsi:.1f} | 가격={current_price:,.0f}원 | 보유여부: {is_additional_buy}")
|
||
|
||
# [중요] 매수 직전 계좌 잔고 확인 (즉시 동기화)
|
||
# 매수/매도 시마다 계좌와 동기화되므로 여기서 반드시 확인
|
||
balance = self.api.get_account_balance()
|
||
cash = balance.get('cash', 0)
|
||
|
||
if cash < current_price * 10: # 최소 10 주 매수 가능 현금
|
||
logger.warning(f"⚠️ 현금 부족: {cash:,.0f}원")
|
||
return
|
||
|
||
# [1 차 매수] RSI 35 이하 && 보유 없음
|
||
if not is_additional_buy and rsi < 35:
|
||
target_amount = cash * 0.3 # 1 차 매수 금액 (자본금의 30%)
|
||
target_qty = int(target_amount / current_price)
|
||
|
||
if target_qty < 10:
|
||
logger.warning(f"⚠️ 매수 수량 부족: {target_qty}주")
|
||
return
|
||
|
||
# 시장가 매수
|
||
success = self.api.buy(code, target_qty)
|
||
|
||
if success:
|
||
# DB 에 등록 (평단가 = 현재가)
|
||
trade_data = {
|
||
'code': code,
|
||
'name': f"ETF-{code}",
|
||
'strategy': 'ETF_ACTIVE',
|
||
'avg_buy_price': current_price, # 1 차는 현재가가 평단가
|
||
'current_price': current_price,
|
||
'target_qty': target_qty * 3, # 3 분할 목표
|
||
'current_qty': target_qty,
|
||
'total_invested': current_price * target_qty,
|
||
'status': 'BUYING',
|
||
'size_class': 'MID',
|
||
'rsi': rsi,
|
||
}
|
||
|
||
self.db.upsert_trade(trade_data)
|
||
self.active_trades[code] = trade_data
|
||
|
||
logger.info(f"✅ [{code}] 1 차 매수: {target_qty}주 @ {current_price:,.0f}원 | RSI={rsi:.1f}")
|
||
|
||
# 메신저 알림
|
||
self.send_notification(
|
||
f"1 차 매수: {trade_data['name']}",
|
||
f"▪️ 종목: {code}\n"
|
||
f"▪️ 수량: {target_qty}주\n"
|
||
f"▪️ 가격: {current_price:,.0f}원\n"
|
||
f"▪️ RSI: {rsi:.1f}\n"
|
||
f"▪️ 현금: {cash:,.0f}원"
|
||
)
|
||
|
||
# [2 차, 3 차 매수] 보유 중인 종목 && RSI 추가 하락
|
||
elif is_additional_buy:
|
||
trade = self.active_trades[code]
|
||
current_qty = trade.get('current_qty', 0)
|
||
target_qty = trade.get('target_qty', 0)
|
||
avg_price = trade.get('avg_buy_price', 0)
|
||
|
||
# 2 차 매수: RSI 30 이하 && 1 차 수량보다 적을 때
|
||
if rsi < 30 and current_qty < target_qty * 0.6:
|
||
add_amount = cash * 0.3 # 2 차 매수 금액 (자본금의 30%)
|
||
add_qty = int(add_amount / current_price)
|
||
|
||
if add_qty < 10:
|
||
return
|
||
|
||
success = self.api.buy(code, add_qty)
|
||
|
||
if success:
|
||
# 평단가 재계산: (기존 투자금 + 추가 투자금) / (기존 수량 + 추가 수량)
|
||
new_total_invested = (avg_price * current_qty) + (current_price * add_qty)
|
||
new_total_qty = current_qty + add_qty
|
||
new_avg_price = new_total_invested / new_total_qty if new_total_qty > 0 else 0
|
||
|
||
# DB 업데이트
|
||
trade['avg_buy_price'] = new_avg_price
|
||
trade['current_qty'] = new_total_qty
|
||
trade['total_invested'] = new_total_invested
|
||
trade['rsi'] = rsi
|
||
|
||
self.db.upsert_trade(trade)
|
||
self.active_trades[code] = trade
|
||
|
||
logger.info(f"✅ [{code}] 2 차 매수: {add_qty}주 @ {current_price:,.0f}원 | RSI={rsi:.1f} | 평단가: {new_avg_price:,.0f}원")
|
||
|
||
# 메신저 알림
|
||
self.send_notification(
|
||
f"2 차 매수 (물타기): {trade['name']}",
|
||
f"▪️ 종목: {code}\n"
|
||
f"▪️ 추가수량: {add_qty}주\n"
|
||
f"▪️ 총수량: {new_total_qty}주\n"
|
||
f"▪️ 현재가: {current_price:,.0f}원\n"
|
||
f"▪️ 평단가: {new_avg_price:,.0f}원 (기존: {avg_price:,.0f}원)\n"
|
||
f"▪️ RSI: {rsi:.1f}"
|
||
)
|
||
|
||
# 3 차 매수: RSI 25 이하 && 2 차 수량보다 적을 때
|
||
elif rsi < 25 and current_qty < target_qty * 0.9:
|
||
add_amount = cash * 0.4 # 3 차 매수 금액 (자본금의 40%)
|
||
add_qty = int(add_amount / current_price)
|
||
|
||
if add_qty < 10:
|
||
return
|
||
|
||
success = self.api.buy(code, add_qty)
|
||
|
||
if success:
|
||
# 평단가 재계산
|
||
new_total_invested = (avg_price * current_qty) + (current_price * add_qty)
|
||
new_total_qty = current_qty + add_qty
|
||
new_avg_price = new_total_invested / new_total_qty if new_total_qty > 0 else 0
|
||
|
||
# DB 업데이트
|
||
trade['avg_buy_price'] = new_avg_price
|
||
trade['current_qty'] = new_total_qty
|
||
trade['total_invested'] = new_total_invested
|
||
trade['status'] = 'HOLDING' # 3 차까지 완료
|
||
trade['rsi'] = rsi
|
||
|
||
self.db.upsert_trade(trade)
|
||
self.active_trades[code] = trade
|
||
|
||
logger.info(f"✅ [{code}] 3 차 매수: {add_qty}주 @ {current_price:,.0f}원 | RSI={rsi:.1f} | 평단가: {new_avg_price:,.0f}원")
|
||
|
||
# 메신저 알림
|
||
self.send_notification(
|
||
f"3 차 매수 (풀매수): {trade['name']}",
|
||
f"▪️ 종목: {code}\n"
|
||
f"▪️ 추가수량: {add_qty}주\n"
|
||
f"▪️ 총수량: {new_total_qty}주\n"
|
||
f"▪️ 현재가: {current_price:,.0f}원\n"
|
||
f"▪️ 평단가: {new_avg_price:,.0f}원\n"
|
||
f"▪️ RSI: {rsi:.1f}\n"
|
||
f"▪️ 3 분할 완료!"
|
||
)
|
||
|
||
|
||
# ==============================================================================
|
||
# [실행부]
|
||
# ==============================================================================
|
||
if __name__ == "__main__":
|
||
trader = ETFActiveTrader()
|
||
trader.run()
|