2187 lines
98 KiB
Python
2187 lines
98 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
holding_bot.py — 장기 홀딩 전략 봇 (추세추종 + 눌림목 분할매수)
|
||
=======================================================================
|
||
전략 개요:
|
||
[추세 상승: MA20 > MA60]
|
||
매수: RSI 눌림목(rsi_buy1/2) 시 분할 매수 — 기준 완화(추세장 특성)
|
||
매도: 트레일링 스탑(고점 대비 trail_stop_pct% 하락) OR RSI 과열(rsi_sell)
|
||
OR 데드크로스(MA20 < MA60) 발생 시 추세 반전 청산
|
||
|
||
[추세 하락/횡보: MA20 ≤ MA60] (trend_filter=0 이면 전 구간 적용)
|
||
매수: 기존 RSI 3단계 역추세 분할매수 (rsi_buy1/2/3)
|
||
매도: 익절(take_profit_pct) OR RSI 과열 OR 손절(stop_loss_pct)
|
||
|
||
봉: 일봉(D) 기준, KIS API로 최대 100봉씩 페이지네이션 수집
|
||
|
||
DB 테이블:
|
||
holding_candles — 종목별 일봉 OHLCV (별도 관리)
|
||
holding_stock_config — 종목별 파라미터 (code별 최신 INSERT 방식)
|
||
|
||
실행:
|
||
python3 holding_bot.py
|
||
"""
|
||
|
||
import os, sys, time, json, logging, random
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import List, Dict, Optional, Tuple
|
||
|
||
import requests
|
||
|
||
ROOT = Path(__file__).resolve().parent
|
||
sys.path.insert(0, str(ROOT))
|
||
|
||
from database import TradeDB
|
||
|
||
logging.basicConfig(
|
||
format="[%(asctime)s] %(message)s",
|
||
datefmt="%H:%M:%S",
|
||
level=logging.INFO,
|
||
)
|
||
logger = logging.getLogger("HoldingBot")
|
||
logging.getLogger("TradeDB").setLevel(logging.WARNING)
|
||
|
||
# 관심종목 파일
|
||
WATCHLIST_PATH = ROOT / "long_term_watchlist.json"
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 기본 파라미터 (종목별 DB에 없으면 이 값 사용)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
DEFAULT_STOCK_CONFIG = {
|
||
"rsi_period": 14, # RSI 계산 기간
|
||
"rsi_buy1": 55.0, # 1차 매수 RSI ≤ (추세장: 50~55 / 횡보장: 45 이하)
|
||
"rsi_buy2": 45.0, # 2차 매수 RSI ≤
|
||
"rsi_buy3": 35.0, # 3차 매수 RSI ≤ (횡보/하락장 전용)
|
||
"rsi_sell": 75.0, # RSI 과열 매도 기준 (추세장에선 80 수준으로 올려 조기매도 방지)
|
||
"take_profit_pct": 15.0, # 수익률 익절 기준 (%) — 추세장에선 크게 잡아야 탈 안 남
|
||
"stop_loss_pct": 10.0, # 손절 기준 (%) — 양수로 저장, 음수 방향 적용
|
||
"buy1_ratio": 50.0, # 1차 투자 비중 (%) — 추세장엔 처음부터 크게
|
||
"buy2_ratio": 50.0, # 2차 투자 비중 (%)
|
||
"buy3_ratio": 0.0, # 3차 투자 비중 (%) — 추세장에선 0 (자금 보류 안 함)
|
||
"slot_money": 3_000_000, # 종목당 총 투자금 (원)
|
||
"atr_period": 14, # ATR 계산 기간 (True Range의 지수이동평균)
|
||
# ── 샹들리에 엑시트 (Chandelier Exit) ─────────────────────────────
|
||
# 추세장 청산에서 사용: 고점 − ATR × atr_mult → 변동성 기반 동적 방어선
|
||
# mult=2: 타이트(빠른 청산) / mult=3: 기본 / mult=4: 여유롭게(더 오래 홀딩)
|
||
"atr_mult": 3.0, # 샹들리에 엑시트 배수 (ATR_MULT)
|
||
# ── 추세추종 신규 파라미터 ──────────────────────────────────────
|
||
# MA 골든/데드크로스 기반 추세 판단
|
||
"ma_fast": 20.0, # 단기 이동평균 기간 (일봉 기준 약 1달)
|
||
"ma_slow": 60.0, # 장기 이동평균 기간 (일봉 기준 약 3달)
|
||
# 트레일링 스탑: 포지션 보유 중 고점 대비 이 비율 이상 하락 시 청산 (0=비활성)
|
||
# 예) 8.0 → 고점 대비 -8% 하락하면 추세 이탈로 판단해 청산
|
||
"trail_stop_pct": 8.0,
|
||
# 추세 필터 사용 여부 (1=사용, 0=미사용 → 기존 역추세 방식으로 고정)
|
||
"trend_filter": 1.0,
|
||
# ── 컨텍스트 낙폭 필터 (0=비활성화) ──────────────────────────
|
||
"ath_drop_min_pct": 0.0,
|
||
"year_drop_min_pct": 0.0,
|
||
"w52_drop_min_pct": 0.0,
|
||
}
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 관심종목 로드
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def load_watchlist() -> List[Dict]:
|
||
try:
|
||
with open(WATCHLIST_PATH, encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
return data.get("items", [])
|
||
except Exception as e:
|
||
logger.error(f"관심종목 파일 로드 실패: {e}")
|
||
return []
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# DB 테이블 생성 (holding 전용)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
HOLDING_DDL = [
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS holding_min_candles (
|
||
id BIGINT AUTO_INCREMENT PRIMARY KEY,
|
||
code VARCHAR(10) NOT NULL,
|
||
candle_dt DATETIME NOT NULL COMMENT '60분봉 기준 시작 시간(KST)',
|
||
tf SMALLINT NOT NULL DEFAULT 60 COMMENT '분봉 단위 (60=60분봉)',
|
||
open FLOAT NOT NULL DEFAULT 0,
|
||
high FLOAT NOT NULL DEFAULT 0,
|
||
low FLOAT NOT NULL DEFAULT 0,
|
||
close FLOAT NOT NULL DEFAULT 0,
|
||
volume BIGINT NOT NULL DEFAULT 0,
|
||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||
UNIQUE KEY uq_holding_min (code, tf, candle_dt)
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||
""",
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS holding_candles (
|
||
id BIGINT AUTO_INCREMENT PRIMARY KEY,
|
||
code VARCHAR(10) NOT NULL,
|
||
candle_date DATE NOT NULL,
|
||
open FLOAT NOT NULL DEFAULT 0,
|
||
high FLOAT NOT NULL DEFAULT 0,
|
||
low FLOAT NOT NULL DEFAULT 0,
|
||
close FLOAT NOT NULL DEFAULT 0,
|
||
volume BIGINT NOT NULL DEFAULT 0,
|
||
change_rate FLOAT NOT NULL DEFAULT 0 COMMENT '전일대비등락률(%%)',
|
||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||
UNIQUE KEY uq_holding_candle (code, candle_date)
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||
""",
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS holding_stock_config (
|
||
id BIGINT AUTO_INCREMENT PRIMARY KEY,
|
||
code VARCHAR(10) NOT NULL,
|
||
name VARCHAR(50) DEFAULT '',
|
||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||
rsi_period FLOAT DEFAULT 14,
|
||
rsi_buy1 FLOAT DEFAULT 45,
|
||
rsi_buy2 FLOAT DEFAULT 40,
|
||
rsi_buy3 FLOAT DEFAULT 35,
|
||
rsi_sell FLOAT DEFAULT 70,
|
||
take_profit_pct FLOAT DEFAULT 5.0,
|
||
stop_loss_pct FLOAT DEFAULT 10.0,
|
||
buy1_ratio FLOAT DEFAULT 30,
|
||
buy2_ratio FLOAT DEFAULT 30,
|
||
buy3_ratio FLOAT DEFAULT 40,
|
||
slot_money FLOAT DEFAULT 3000000,
|
||
atr_period FLOAT DEFAULT 14,
|
||
ath_drop_min_pct FLOAT DEFAULT 0 COMMENT 'ATH 대비 최소 낙폭%% (0=비활성)',
|
||
year_drop_min_pct FLOAT DEFAULT 0 COMMENT '연도고점 대비 최소 낙폭%% (0=비활성)',
|
||
w52_drop_min_pct FLOAT DEFAULT 0 COMMENT '52주고점 대비 최소 낙폭%% (0=비활성)',
|
||
ma_fast FLOAT DEFAULT 20 COMMENT '단기 이동평균 기간',
|
||
ma_slow FLOAT DEFAULT 60 COMMENT '장기 이동평균 기간',
|
||
trail_stop_pct FLOAT DEFAULT 8 COMMENT '트레일링 스탑 %% (0=비활성)',
|
||
trend_filter FLOAT DEFAULT 1 COMMENT '추세 필터 (1=사용/0=미사용)',
|
||
KEY idx_holding_config_code (code)
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||
""",
|
||
]
|
||
|
||
def ensure_holding_tables(db: TradeDB):
|
||
"""holding 전용 테이블 없으면 생성, 기존 테이블에 신규 컬럼 추가(마이그레이션)"""
|
||
for ddl in HOLDING_DDL:
|
||
db.conn.execute(ddl.strip())
|
||
db.conn.commit()
|
||
|
||
# ── 기존 테이블에 신규 컬럼 추가 (이미 있으면 무시) ─────────────
|
||
migrations = [
|
||
# holding_candles: change_rate 추가
|
||
"ALTER TABLE holding_candles ADD COLUMN change_rate FLOAT NOT NULL DEFAULT 0 COMMENT '전일대비등락률(%%)'",
|
||
# holding_stock_config: 낙폭 필터 3종 추가
|
||
"ALTER TABLE holding_stock_config ADD COLUMN ath_drop_min_pct FLOAT DEFAULT 0 COMMENT 'ATH 대비 최소 낙폭%% (0=비활성)'",
|
||
"ALTER TABLE holding_stock_config ADD COLUMN year_drop_min_pct FLOAT DEFAULT 0 COMMENT '연도고점 대비 최소 낙폭%% (0=비활성)'",
|
||
"ALTER TABLE holding_stock_config ADD COLUMN w52_drop_min_pct FLOAT DEFAULT 0 COMMENT '52주고점 대비 최소 낙폭%% (0=비활성)'",
|
||
# 추세추종 신규 파라미터
|
||
"ALTER TABLE holding_stock_config ADD COLUMN ma_fast FLOAT DEFAULT 20 COMMENT '단기 이동평균 기간'",
|
||
"ALTER TABLE holding_stock_config ADD COLUMN ma_slow FLOAT DEFAULT 60 COMMENT '장기 이동평균 기간'",
|
||
"ALTER TABLE holding_stock_config ADD COLUMN trail_stop_pct FLOAT DEFAULT 8 COMMENT '트레일링 스탑 %% (0=비활성)'",
|
||
"ALTER TABLE holding_stock_config ADD COLUMN trend_filter FLOAT DEFAULT 1 COMMENT '추세 필터 (1=사용/0=미사용)'",
|
||
]
|
||
for sql in migrations:
|
||
try:
|
||
db.conn.execute(sql)
|
||
db.conn.commit()
|
||
except Exception:
|
||
pass # 이미 컬럼 존재 → 무시
|
||
|
||
logger.info("✅ holding_candles / holding_stock_config 테이블 확인 완료")
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 종목별 파라미터 관리
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def get_stock_config(db: TradeDB, code: str) -> Dict:
|
||
"""종목별 최신 파라미터 조회 (없으면 DEFAULT 반환)"""
|
||
row = db.conn.execute(
|
||
"SELECT * FROM holding_stock_config WHERE code=%s ORDER BY id DESC LIMIT 1",
|
||
[code]
|
||
).fetchone()
|
||
if row:
|
||
cfg = dict(row)
|
||
cfg.pop("id", None)
|
||
cfg.pop("created_at", None)
|
||
return cfg
|
||
# 기본값 반환 (DB에 없으면)
|
||
cfg = dict(DEFAULT_STOCK_CONFIG)
|
||
cfg["code"] = code
|
||
cfg["name"] = ""
|
||
return cfg
|
||
|
||
def set_stock_config(db: TradeDB, code: str, name: str, params: Dict):
|
||
"""종목별 파라미터 저장 (INSERT 신규 row = 최신값 추가 방식)"""
|
||
cols = ["code", "name"]
|
||
vals = [code, name]
|
||
for k, v in params.items():
|
||
if k in DEFAULT_STOCK_CONFIG:
|
||
cols.append(k)
|
||
vals.append(float(v))
|
||
col_sql = ", ".join(cols)
|
||
ph_sql = ", ".join(["%s"] * len(vals))
|
||
db.conn.execute(
|
||
f"INSERT INTO holding_stock_config ({col_sql}) VALUES ({ph_sql})",
|
||
vals
|
||
)
|
||
db.conn.commit()
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# KIS API — 일봉 OHLCV 조회 (최대 100봉씩 페이지네이션)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def _get_kis_token(db: TradeDB) -> Tuple[str, str, str, bool]:
|
||
"""
|
||
env_config에서 KIS 인증 정보 조회.
|
||
KIS_MOCK=true → 모의 키/도메인 사용 (시세 조회도 모의 도메인에서 동작)
|
||
KIS_MOCK=false → 실전 키/도메인 사용
|
||
None 값은 빈 문자열로 안전 처리.
|
||
"""
|
||
row = db.conn.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1").fetchone()
|
||
if not row:
|
||
raise RuntimeError("env_config 테이블이 비어 있습니다.")
|
||
r = dict(row)
|
||
|
||
def safe(val):
|
||
return val if val else ""
|
||
|
||
is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes")
|
||
|
||
if is_mock:
|
||
app_key = safe(r.get("KIS_APP_KEY_MOCK"))
|
||
app_secret = safe(r.get("KIS_APP_SECRET_MOCK"))
|
||
base_url = "https://openapivts.koreainvestment.com:29443"
|
||
else:
|
||
app_key = safe(r.get("KIS_APP_KEY_REAL")) or safe(r.get("KIS_APP_KEY"))
|
||
app_secret = safe(r.get("KIS_APP_SECRET_REAL")) or safe(r.get("KIS_APP_SECRET"))
|
||
base_url = "https://openapi.koreainvestment.com:9443"
|
||
|
||
if not app_key or not app_secret:
|
||
raise RuntimeError(
|
||
f"API 키 미설정 (KIS_MOCK={is_mock})\n"
|
||
f"필요 키: {'KIS_APP_KEY_MOCK / KIS_APP_SECRET_MOCK' if is_mock else 'KIS_APP_KEY_REAL / KIS_APP_SECRET_REAL'}"
|
||
)
|
||
|
||
return app_key, app_secret, base_url, is_mock
|
||
|
||
def _ensure_token(mock: bool) -> str:
|
||
"""
|
||
KisTokenManager 싱글톤으로 토큰 반환.
|
||
- 유효하면 메모리 캐시 즉시 반환
|
||
- 만료 10분 전 선제 갱신 후 반환
|
||
- KIS 23시간 정책 자동 준수
|
||
"""
|
||
try:
|
||
from kis_token_manager import KisTokenManager
|
||
token = KisTokenManager.instance(is_mock=mock).get_token()
|
||
if token:
|
||
logger.info("🔑 KIS 토큰 준비 완료 [%s] (앞8자: %s…)",
|
||
"모의" if mock else "실전", token[:8])
|
||
return token
|
||
except Exception as e:
|
||
logger.warning("KisTokenManager 실패, 파일 폴백: %s", e)
|
||
|
||
# 폴백: 직접 파일 읽기
|
||
path = ROOT / (".kis_token_cache_mock.json" if mock else ".kis_token_cache_real.json")
|
||
if not path.exists():
|
||
raise RuntimeError(
|
||
f"토큰 캐시 파일 없음: {path}\n"
|
||
"kis_short_ver2.py 또는 kis_scalping_ver1.py 를 한 번 실행하면 자동 생성됩니다."
|
||
)
|
||
try:
|
||
cache = json.loads(path.read_text(encoding="utf-8"))
|
||
except Exception as e:
|
||
raise RuntimeError(f"토큰 캐시 파일 읽기 실패: {e}")
|
||
|
||
token = cache.get("access_token", "")
|
||
if not token:
|
||
raise RuntimeError("토큰 캐시 파일에 access_token 없음")
|
||
logger.info("🔑 KIS 토큰 파일 폴백 재사용 [%s] (앞8자: %s…)",
|
||
"모의" if mock else "실전", token[:8])
|
||
return token
|
||
|
||
def fetch_daily_ohlcv(
|
||
code: str,
|
||
start_date: str, # "YYYY-MM-DD"
|
||
end_date: str, # "YYYY-MM-DD"
|
||
app_key: str,
|
||
app_secret: str,
|
||
base_url: str,
|
||
mock: bool = False,
|
||
max_retries: int = 3,
|
||
) -> List[Dict]:
|
||
"""
|
||
KIS API 일봉 OHLCV 조회 (날짜 범위, 100봉씩 자동 페이지네이션)
|
||
- 토큰은 기존 .kis_token_cache_real/mock.json 캐시 파일 재사용 (새 발급 안 함)
|
||
반환: [{"date": "YYYYMMDD", "open": float, "high": float, "low": float,
|
||
"close": float, "volume": int}, ...] 최신 → 과거 순
|
||
"""
|
||
token = _ensure_token(mock) # 캐시 파일에서 읽기만
|
||
url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
|
||
|
||
# 날짜 형식 통일 YYYYMMDD
|
||
sd = start_date.replace("-", "")
|
||
ed = end_date.replace("-", "")
|
||
|
||
all_rows: List[Dict] = []
|
||
cursor_end = ed # 첫 호출은 end_date부터 과거 방향 조회
|
||
|
||
for attempt_page in range(100): # 최대 ~10,000봉
|
||
# 페이지당 ~140일 윈도우 (약 100 거래일)
|
||
# DATE_1을 cursor_end 기준 180일 전으로 설정해야 API가 정상 응답
|
||
window_start = (
|
||
datetime.strptime(cursor_end, "%Y%m%d") - timedelta(days=180)
|
||
).strftime("%Y%m%d")
|
||
# 단, 실제 요청 시작일(sd)보다 앞으로 가지 않음
|
||
page_date1 = max(window_start, sd)
|
||
|
||
data = None
|
||
for retry in range(max_retries):
|
||
try:
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": app_key,
|
||
"appsecret": app_secret,
|
||
"tr_id": "FHKST03010100",
|
||
}
|
||
params = {
|
||
"FID_COND_MRKT_DIV_CODE": "J",
|
||
"FID_INPUT_ISCD": code,
|
||
"FID_INPUT_DATE_1": page_date1, # 윈도우 시작
|
||
"FID_INPUT_DATE_2": cursor_end, # 윈도우 끝 (커서)
|
||
"FID_PERIOD_DIV_CODE": "D",
|
||
"FID_ORG_ADJ_PRC": "0",
|
||
}
|
||
resp = requests.get(url, headers=headers, params=params, timeout=10)
|
||
if resp.status_code == 429:
|
||
time.sleep((retry + 1) * 2)
|
||
continue
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
break
|
||
except Exception as e:
|
||
if retry >= max_retries - 1:
|
||
logger.error(f"❌ 일봉 조회 실패 {code} (page {attempt_page}): {e}")
|
||
return all_rows
|
||
time.sleep(2)
|
||
|
||
if data is None:
|
||
break
|
||
|
||
rows = data.get("output2", [])
|
||
if not rows:
|
||
break
|
||
|
||
done = False
|
||
for item in rows:
|
||
dt = item.get("stck_bsop_date", "") # YYYYMMDD
|
||
if dt < sd: # start_date 이전 → 수집 완료
|
||
done = True
|
||
break
|
||
c = float(item.get("stck_clpr", 0) or 0)
|
||
if c <= 0:
|
||
continue
|
||
all_rows.append({
|
||
"date": dt,
|
||
"open": float(item.get("stck_oprc", 0) or 0),
|
||
"high": float(item.get("stck_hgpr", 0) or 0),
|
||
"low": float(item.get("stck_lwpr", 0) or 0),
|
||
"close": c,
|
||
"volume": int(item.get("acml_vol", 0) or 0),
|
||
"change_rate": float(item.get("prdy_ctrt", 0) or 0), # 전일대비등락률(%)
|
||
})
|
||
|
||
if done:
|
||
break
|
||
|
||
# 다음 페이지: 이번 배치 최고(가장 오래된) 날짜 - 1일
|
||
oldest_dt = rows[-1].get("stck_bsop_date", "")
|
||
if not oldest_dt or oldest_dt <= sd:
|
||
break
|
||
prev = (datetime.strptime(oldest_dt, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
|
||
if prev == cursor_end: # 무한루프 방지
|
||
break
|
||
cursor_end = prev
|
||
time.sleep(0.3) # API 과부하 방지
|
||
|
||
return all_rows
|
||
|
||
def store_candles(db: TradeDB, code: str, rows: List[Dict]) -> int:
|
||
"""일봉 데이터를 holding_candles에 저장 (ON DUPLICATE KEY UPDATE)"""
|
||
saved = 0
|
||
for r in rows:
|
||
try:
|
||
d = r["date"]
|
||
date_fmt = f"{d[:4]}-{d[4:6]}-{d[6:8]}"
|
||
db.conn.execute(
|
||
"""INSERT INTO holding_candles
|
||
(code, candle_date, open, high, low, close, volume, change_rate)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE
|
||
open=VALUES(open), high=VALUES(high), low=VALUES(low),
|
||
close=VALUES(close), volume=VALUES(volume),
|
||
change_rate=VALUES(change_rate)""",
|
||
[code, date_fmt,
|
||
r["open"], r["high"], r["low"], r["close"],
|
||
r["volume"], r.get("change_rate", 0)]
|
||
)
|
||
saved += 1
|
||
except Exception as e:
|
||
logger.debug(f"캔들 저장 오류 ({code} {r.get('date')}): {e}")
|
||
db.conn.commit()
|
||
return saved
|
||
|
||
def get_stored_candles(db: TradeDB, code: str,
|
||
start_date: str = "", end_date: str = "") -> List[Dict]:
|
||
"""holding_candles에서 종목 일봉 조회 (날짜 오름차순, change_rate 포함)"""
|
||
sql = """SELECT candle_date, open, high, low, close, volume,
|
||
IFNULL(change_rate, 0) AS change_rate
|
||
FROM holding_candles WHERE code=%s"""
|
||
args = [code]
|
||
if start_date:
|
||
sql += " AND candle_date >= %s"
|
||
args.append(start_date)
|
||
if end_date:
|
||
sql += " AND candle_date <= %s"
|
||
args.append(end_date)
|
||
sql += " ORDER BY candle_date ASC"
|
||
rows = db.conn.execute(sql, args).fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 60분봉 수집 / 저장 / 조회
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
class _KiwoomTokenManager:
|
||
"""
|
||
kiwoom_universe_scanner.py / kiwoom_trader_dual.py 의 TokenManager와
|
||
동일한 인터페이스 — DB에서 읽은 키를 직접 주입 (env 변수 오염 없음).
|
||
|
||
TokenManager.get_token() 호환: Chart(token_manager=...) 에 그대로 전달 가능.
|
||
"""
|
||
def __init__(self, app_key: str, app_secret: str, is_mock: bool = False):
|
||
self._app_key = app_key
|
||
self._app_secret = app_secret
|
||
self._domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
|
||
self._token: Optional[str] = None
|
||
self._expiry: Optional[datetime] = None
|
||
|
||
def _is_valid(self) -> bool:
|
||
if not self._token or not self._expiry:
|
||
return False
|
||
return datetime.now() < self._expiry - timedelta(seconds=30)
|
||
|
||
def _request_new_token(self) -> None:
|
||
"""키움 /oauth2/token 으로 새 토큰 발급 (kiwoom_universe_scanner 와 동일 흐름)"""
|
||
url = f"https://{self._domain}/oauth2/token"
|
||
body = {
|
||
"grant_type": "client_credentials",
|
||
"appkey": self._app_key,
|
||
"secretkey": self._app_secret,
|
||
}
|
||
resp = requests.post(url, json=body, timeout=15)
|
||
data = resp.json()
|
||
token = data.get("token") or data.get("access_token", "")
|
||
if not token:
|
||
raise RuntimeError(
|
||
f"키움 {'모의' if '모의' in self._domain else '실전'} 토큰 발급 실패: {data}"
|
||
)
|
||
# 만료 시간 파싱 (expires_dt: "YYYYMMDDHHMMSS")
|
||
exp_s = data.get("expires_dt", "")
|
||
try:
|
||
self._expiry = datetime.strptime(str(exp_s), "%Y%m%d%H%M%S")
|
||
except Exception:
|
||
self._expiry = datetime.now() + timedelta(hours=23)
|
||
self._token = token
|
||
logger.info(
|
||
f"✅ 키움 {'모의' if '모의' in self._domain else '실전'} 토큰 발급 완료"
|
||
f" (앞8자: {token[:8]}…, 만료: {self._expiry})"
|
||
)
|
||
|
||
def get_token(self) -> str:
|
||
"""
|
||
TokenManager.get_token() 호환 — 만료 시 자동 재발급.
|
||
kis_ws.KiwoomTokenManager 싱글톤 풀을 공유하여 au10001 rate limit 방지.
|
||
(동일 appkey로 kis_ws 에서 이미 발급받은 토큰은 재사용)
|
||
"""
|
||
try:
|
||
from kis_ws import _get_kiwoom_token_cached
|
||
is_mock = "mockapi" in self._domain
|
||
token = _get_kiwoom_token_cached(self._app_key, self._app_secret, is_mock)
|
||
if token:
|
||
self._token = token
|
||
return self._token
|
||
except Exception:
|
||
pass
|
||
# kis_ws 임포트 실패 시 자체 로직으로 폴백
|
||
if not self._is_valid():
|
||
self._request_new_token()
|
||
return self._token
|
||
|
||
|
||
def fetch_60min_via_kiwoom(
|
||
code: str,
|
||
start_date: str, # "YYYY-MM-DD"
|
||
end_date: str, # "YYYY-MM-DD"
|
||
kiwoom_key: str, # DB env_config의 KIWOOM_APP_KEY_REAL 또는 KIWOOM_APP_KEY_MOCK
|
||
kiwoom_secret: str, # DB env_config의 KIWOOM_APP_SECRET_REAL 또는 KIWOOM_APP_SECRET_MOCK
|
||
is_mock: bool = False, # True → mockapi.kiwoom.com (모의)
|
||
max_pages: int = 50, # 연속조회 최대 횟수 (1페이지 ≈ 900봉)
|
||
) -> List[Dict]:
|
||
"""
|
||
키움증권 REST API (ka10080) 로 60분봉 수집 → holding_min_candles 저장용.
|
||
|
||
kiwoom_universe_scanner.py / kiwoom_trader_dual.py 와 동일한
|
||
TokenManager + Chart 패턴 사용.
|
||
|
||
▶ KIS 한계: inquire-time-itemchartprice 는 당일(2일치) 데이터만 제공
|
||
▶ 키움 장점: ka10080 1회 호출 = 900봉 (60분봉 기준 약 128영업일)
|
||
cont-yn: Y + next-key 연속조회로 2년치 이상 수집 가능
|
||
|
||
반환: [{"candle_date": "YYYY-MM-DD HH:MM:SS", "open": float, "high": float,
|
||
"low": float, "close": float, "volume": int}, ...]
|
||
최신 → 과거 순, start_date~end_date 범위 필터 적용
|
||
"""
|
||
from kiwoom_rest_api.koreanstock.chart import Chart
|
||
|
||
mode_str = "모의" if is_mock else "실전"
|
||
base_url = "https://mockapi.kiwoom.com" if is_mock else "https://api.kiwoom.com"
|
||
logger.info(f"키움 60분봉 수집 시작: {code} [{mode_str}] {start_date}~{end_date}")
|
||
|
||
# ── 1. TokenManager + Chart 초기화 (kiwoom_universe_scanner 와 동일 패턴) ──
|
||
try:
|
||
token_manager = _KiwoomTokenManager(kiwoom_key, kiwoom_secret, is_mock=is_mock)
|
||
chart = Chart(base_url=base_url, token_manager=token_manager)
|
||
# 최초 토큰 발급 확인
|
||
_ = token_manager.get_token()
|
||
except Exception as e:
|
||
logger.error(f"❌ 키움 초기화 실패 [{mode_str}]: {e}")
|
||
return []
|
||
|
||
# ── 2. 날짜 범위 파싱 ─────────────────────────────────────────────────────
|
||
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
|
||
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
|
||
|
||
rows: List[Dict] = []
|
||
cont_yn = "N"
|
||
next_key = ""
|
||
|
||
for page in range(max_pages):
|
||
try:
|
||
resp = chart.stock_minute_chart_request_ka10080(
|
||
stk_cd = code,
|
||
tic_scope = "60", # 60분봉 (1/3/5/10/15/30/45/60 지원)
|
||
upd_stkpc_tp = "1", # 수정주가 반영
|
||
cont_yn = cont_yn,
|
||
next_key = next_key,
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"❌ 키움 ka10080 조회 실패 (page {page}): {e}")
|
||
break
|
||
|
||
records = resp.get("stk_min_pole_chart_qry") or []
|
||
if not records:
|
||
logger.info(f"키움 60분봉: 더 이상 데이터 없음 (page {page})")
|
||
break
|
||
|
||
reached_start = False
|
||
for rec in records:
|
||
# cntr_tm: "YYYYMMDDHHMMSS" 형식
|
||
raw_dt = str(rec.get("cntr_tm", ""))
|
||
if len(raw_dt) < 14:
|
||
continue
|
||
try:
|
||
candle_dt = datetime(
|
||
int(raw_dt[0:4]), int(raw_dt[4:6]), int(raw_dt[6:8]),
|
||
int(raw_dt[8:10]), int(raw_dt[10:12]), int(raw_dt[12:14]),
|
||
)
|
||
except Exception:
|
||
continue
|
||
|
||
# 역방향(최신→과거) 수집이므로 start_date보다 과거이면 종료
|
||
if candle_dt.date() < start_dt.date():
|
||
reached_start = True
|
||
break
|
||
if candle_dt.date() > end_dt.date():
|
||
continue
|
||
|
||
try:
|
||
rows.append({
|
||
"candle_date": candle_dt.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"open": abs(float(rec.get("open_pric", 0) or 0)),
|
||
"high": abs(float(rec.get("high_pric", 0) or 0)),
|
||
"low": abs(float(rec.get("low_pric", 0) or 0)),
|
||
"close": abs(float(rec.get("cur_prc", 0) or 0)),
|
||
"volume": abs(int(float(rec.get("trde_qty", 0) or 0))),
|
||
})
|
||
except Exception:
|
||
continue
|
||
|
||
if reached_start:
|
||
break
|
||
|
||
# 연속조회 키는 응답 dict에서 읽음 (Chart._execute_request 가 헤더를 dict로 포함)
|
||
new_cont = str(resp.get("cont-yn", resp.get("cont_yn", "N"))).upper()
|
||
new_nkey = str(resp.get("next-key", resp.get("next_key", "")))
|
||
if new_cont != "Y" or not new_nkey:
|
||
break
|
||
cont_yn = new_cont
|
||
next_key = new_nkey
|
||
time.sleep(0.3) # API 레이트리밋 준수 (초당 3회 이하)
|
||
|
||
logger.info(f"✅ 키움 60분봉 수집 완료: {code} {len(rows)}봉 ({start_date}~{end_date})")
|
||
return rows
|
||
|
||
def fetch_min_ohlcv(
|
||
code: str,
|
||
start_date: str, # "YYYY-MM-DD"
|
||
end_date: str, # "YYYY-MM-DD"
|
||
app_key: str,
|
||
app_secret: str,
|
||
base_url: str,
|
||
tf_min: int = 60, # 집계 단위 (60 = 60분봉)
|
||
mock: bool = False,
|
||
max_retries: int = 3,
|
||
) -> List[Dict]:
|
||
"""
|
||
KIS FHKST03010200(분봉조회) 1분봉 수집 → tf_min 단위 집계 반환.
|
||
|
||
페이지네이션 전략:
|
||
FID_INPUT_HOUR_1=HHMMSS 커서로 역방향(최신→과거) 순회.
|
||
FID_PW_DATA_INCU_YN=Y → 과거 날짜까지 연속 조회 가능.
|
||
각 응답 ~30 레코드, 마지막 레코드 절대 시각 - 1분을 다음 커서로 사용.
|
||
|
||
60분봉 집계 규칙 (역방향 수집이므로 open/close 처리 주의):
|
||
처음 만난 레코드 = 이 버킷의 가장 최신(close)
|
||
이후 이전 레코드로 갈수록 open 덮어씀 → 결국 가장 오래된 open이 남음
|
||
high = MAX, low = MIN, volume = SUM
|
||
"""
|
||
token = _ensure_token(mock)
|
||
url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
|
||
|
||
try:
|
||
sd = datetime.strptime(start_date, "%Y-%m-%d")
|
||
ed = datetime.strptime(end_date, "%Y-%m-%d")
|
||
except ValueError as e:
|
||
raise ValueError(f"날짜 형식 오류: {e}")
|
||
|
||
# 60분봉 버킷 집계 저장소
|
||
buckets: Dict[datetime, Dict] = {}
|
||
cursor_dt = ed.replace(hour=15, minute=30, second=0) # end_date 15:30 부터 역순
|
||
|
||
total_1min = 0 # 수집한 1분봉 수 (로그용)
|
||
|
||
for page in range(3000): # 최대 ~90,000 1분봉 (약 7개월치)
|
||
cursor_str = cursor_dt.strftime("%H%M%S")
|
||
|
||
data = None
|
||
for retry in range(max_retries):
|
||
try:
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": app_key,
|
||
"appsecret": app_secret,
|
||
# 분봉 조회는 실전/모의 도메인만 다르고 TR_ID는 동일
|
||
"tr_id": "FHKST03010200",
|
||
}
|
||
params = {
|
||
"FID_ETC_CLS_CODE": "",
|
||
"FID_COND_MRKT_DIV_CODE": "J",
|
||
"FID_INPUT_ISCD": code,
|
||
"FID_INPUT_HOUR_1": cursor_str,
|
||
"FID_PW_DATA_INCU_YN": "Y", # 과거 날짜 데이터 포함
|
||
}
|
||
resp = requests.get(url, headers=headers, params=params, timeout=10)
|
||
if resp.status_code == 429:
|
||
time.sleep((retry + 1) * 2)
|
||
continue
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
break
|
||
except Exception as e:
|
||
if retry >= max_retries - 1:
|
||
logger.error(f"❌ 분봉 조회 실패 {code} (page {page}): {e}")
|
||
return _buckets_to_min_list(buckets, tf_min)
|
||
time.sleep(2)
|
||
|
||
if data is None:
|
||
break
|
||
|
||
rows = data.get("output2", [])
|
||
if not rows:
|
||
break
|
||
|
||
done = False
|
||
last_abs_dt = None
|
||
|
||
for row in rows:
|
||
date_s = row.get("stck_bsop_date", "")
|
||
time_s = row.get("stck_cntg_hour", "")
|
||
if len(date_s) < 8 or len(time_s) < 6:
|
||
continue
|
||
try:
|
||
abs_dt = datetime.strptime(date_s + time_s, "%Y%m%d%H%M%S")
|
||
except ValueError:
|
||
continue
|
||
|
||
last_abs_dt = abs_dt
|
||
|
||
if abs_dt.date() > ed.date():
|
||
continue # end_date 이후 → 스킵
|
||
|
||
if abs_dt.date() < sd.date():
|
||
done = True # start_date 이전 → 수집 완료
|
||
break
|
||
|
||
# 장중 시간만 처리 (09:00~15:30)
|
||
market_min = abs_dt.hour * 60 + abs_dt.minute
|
||
if market_min < 540 or market_min > 930: # 09:00 ~ 15:30
|
||
continue
|
||
|
||
c = float(row.get("stck_prpr", 0) or 0)
|
||
if c <= 0:
|
||
continue
|
||
|
||
total_1min += 1
|
||
|
||
# tf_min 단위 버킷 시작 시간 계산 (09:00, 10:00, 11:00, ...)
|
||
bucket_min = (abs_dt.minute // tf_min) * tf_min
|
||
bucket_dt = abs_dt.replace(minute=bucket_min, second=0, microsecond=0)
|
||
|
||
o = float(row.get("stck_oprc", c) or c)
|
||
h = float(row.get("stck_hgpr", c) or c)
|
||
l = float(row.get("stck_lwpr", c) or c)
|
||
v = int(row.get("cntg_vol", 0) or 0)
|
||
|
||
if bucket_dt not in buckets:
|
||
# 역방향: 처음 만나는 레코드 = 버킷의 가장 최신 = close
|
||
buckets[bucket_dt] = {"open": o, "high": h, "low": l, "close": c, "volume": v}
|
||
else:
|
||
b = buckets[bucket_dt]
|
||
b["open"] = o # 더 이른 open으로 계속 갱신
|
||
b["high"] = max(b["high"], h)
|
||
b["low"] = min(b["low"], l)
|
||
b["volume"] += v
|
||
# close는 첫 기록(최신) 유지 — 갱신하지 않음
|
||
|
||
if done:
|
||
break
|
||
|
||
# 다음 커서: 이번 배치 마지막(가장 오래된) 절대 시각 - 1분
|
||
if last_abs_dt is None:
|
||
break
|
||
next_cur = last_abs_dt - timedelta(minutes=1)
|
||
if next_cur.date() < sd.date():
|
||
break
|
||
cursor_dt = next_cur
|
||
time.sleep(0.15) # API 과부하 방지 (~6 req/s)
|
||
|
||
logger.info(f" [{code}] 1분봉 {total_1min}개 수집 → {tf_min}분봉 {len(buckets)}개 집계")
|
||
return _buckets_to_min_list(buckets, tf_min)
|
||
|
||
|
||
def fetch_and_store_min_candles(
|
||
db: "TradeDB",
|
||
code: str,
|
||
start_date: str,
|
||
end_date: str,
|
||
app_key: str,
|
||
app_secret: str,
|
||
base_url: str,
|
||
tf_min: int = 60,
|
||
mock: bool = False,
|
||
max_retries: int = 3,
|
||
progress: Optional[Dict] = None, # 외부 dict: {"status","fetched","saved"}
|
||
) -> int:
|
||
"""
|
||
1분봉 페이지 단위로 수집하면서 날짜가 바뀌는 순간 해당 날의 버킷을 즉시 DB 저장.
|
||
Flask 백그라운드 스레드에서 호출해 웹페이지가 블로킹되지 않도록 한다.
|
||
progress dict를 통해 진행상황을 실시간으로 노출한다.
|
||
"""
|
||
def _upd(**kw):
|
||
if progress is not None:
|
||
progress.update(kw)
|
||
|
||
token = _ensure_token(mock)
|
||
url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
|
||
|
||
try:
|
||
sd = datetime.strptime(start_date, "%Y-%m-%d")
|
||
ed = datetime.strptime(end_date, "%Y-%m-%d")
|
||
except ValueError as e:
|
||
raise ValueError(f"날짜 형식 오류: {e}")
|
||
|
||
# 날짜별로 버킷 집계: {date: {bucket_dt: {o,h,l,c,v}}}
|
||
day_buckets: Dict[str, Dict[datetime, Dict]] = {}
|
||
cursor_dt = ed.replace(hour=15, minute=30, second=0)
|
||
total_1min = 0
|
||
total_saved = 0
|
||
prev_date_s: Optional[str] = None # 이전 루프의 날짜 (날짜 전환 감지용)
|
||
|
||
for page in range(3000):
|
||
cursor_str = cursor_dt.strftime("%H%M%S")
|
||
|
||
data = None
|
||
for retry in range(max_retries):
|
||
try:
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": app_key,
|
||
"appsecret": app_secret,
|
||
"tr_id": "FHKST03010200",
|
||
}
|
||
params = {
|
||
"FID_ETC_CLS_CODE": "",
|
||
"FID_COND_MRKT_DIV_CODE": "J",
|
||
"FID_INPUT_ISCD": code,
|
||
"FID_INPUT_HOUR_1": cursor_str,
|
||
"FID_PW_DATA_INCU_YN": "Y",
|
||
}
|
||
resp = requests.get(url, headers=headers, params=params, timeout=10)
|
||
if resp.status_code == 429:
|
||
time.sleep((retry + 1) * 2)
|
||
continue
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
break
|
||
except Exception as e:
|
||
if retry >= max_retries - 1:
|
||
logger.error(f"❌ 분봉 조회 실패 {code} (page {page}): {e}")
|
||
data = None
|
||
break
|
||
time.sleep(2)
|
||
|
||
if data is None:
|
||
break
|
||
|
||
rows = data.get("output2", [])
|
||
if not rows:
|
||
break
|
||
|
||
done = False
|
||
last_abs_dt = None
|
||
|
||
for row in rows:
|
||
date_s = row.get("stck_bsop_date", "")
|
||
time_s = row.get("stck_cntg_hour", "")
|
||
if len(date_s) < 8 or len(time_s) < 6:
|
||
continue
|
||
try:
|
||
abs_dt = datetime.strptime(date_s + time_s, "%Y%m%d%H%M%S")
|
||
except ValueError:
|
||
continue
|
||
|
||
last_abs_dt = abs_dt
|
||
|
||
if abs_dt.date() > ed.date():
|
||
continue
|
||
if abs_dt.date() < sd.date():
|
||
done = True
|
||
break
|
||
|
||
market_min = abs_dt.hour * 60 + abs_dt.minute
|
||
if market_min < 540 or market_min > 930:
|
||
continue
|
||
|
||
c = float(row.get("stck_prpr", 0) or 0)
|
||
if c <= 0:
|
||
continue
|
||
|
||
total_1min += 1
|
||
cur_date_s = date_s[:8] # "YYYYMMDD"
|
||
|
||
# 날짜가 바뀌면 이전 날짜 데이터를 즉시 DB에 flush
|
||
if prev_date_s is not None and cur_date_s != prev_date_s:
|
||
flush_buckets = day_buckets.pop(prev_date_s, {})
|
||
if flush_buckets:
|
||
rows_to_save = _buckets_to_min_list(flush_buckets, tf_min)
|
||
n = store_min_candles(db, code, rows_to_save, tf_min)
|
||
total_saved += n
|
||
_upd(saved=total_saved, fetched=total_1min,
|
||
current_date=prev_date_s)
|
||
logger.info(f" [{code}] {prev_date_s} → {tf_min}분봉 {n}개 저장 (누적 {total_saved})")
|
||
|
||
prev_date_s = cur_date_s
|
||
|
||
bucket_min = (abs_dt.minute // tf_min) * tf_min
|
||
bucket_dt = abs_dt.replace(minute=bucket_min, second=0, microsecond=0)
|
||
|
||
o = float(row.get("stck_oprc", c) or c)
|
||
h = float(row.get("stck_hgpr", c) or c)
|
||
l = float(row.get("stck_lwpr", c) or c)
|
||
v = int(row.get("cntg_vol", 0) or 0)
|
||
|
||
if cur_date_s not in day_buckets:
|
||
day_buckets[cur_date_s] = {}
|
||
bkt = day_buckets[cur_date_s]
|
||
|
||
if bucket_dt not in bkt:
|
||
bkt[bucket_dt] = {"open": o, "high": h, "low": l, "close": c, "volume": v}
|
||
else:
|
||
b = bkt[bucket_dt]
|
||
b["open"] = o
|
||
b["high"] = max(b["high"], h)
|
||
b["low"] = min(b["low"], l)
|
||
b["volume"] += v
|
||
|
||
_upd(fetched=total_1min)
|
||
|
||
if done:
|
||
break
|
||
|
||
if last_abs_dt is None:
|
||
break
|
||
next_cur = last_abs_dt - timedelta(minutes=1)
|
||
if next_cur.date() < sd.date():
|
||
break
|
||
cursor_dt = next_cur
|
||
time.sleep(0.15)
|
||
|
||
# 루프 종료 후 남은 날짜 데이터 flush
|
||
for date_s, flush_buckets in day_buckets.items():
|
||
if flush_buckets:
|
||
rows_to_save = _buckets_to_min_list(flush_buckets, tf_min)
|
||
n = store_min_candles(db, code, rows_to_save, tf_min)
|
||
total_saved += n
|
||
_upd(saved=total_saved, fetched=total_1min)
|
||
logger.info(f" [{code}] {date_s}(말미) → {tf_min}분봉 {n}개 저장 (누적 {total_saved})")
|
||
|
||
logger.info(f" [{code}] 수집완료: 1분봉 {total_1min}개 → {tf_min}분봉 {total_saved}개 저장")
|
||
return total_saved
|
||
|
||
|
||
def _buckets_to_min_list(buckets: Dict, tf_min: int) -> List[Dict]:
|
||
"""버킷 dict → 시간순 list 변환 (store_min_candles 입력용)"""
|
||
return [
|
||
{
|
||
"dt": dt.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"tf": tf_min,
|
||
"open": b["open"],
|
||
"high": b["high"],
|
||
"low": b["low"],
|
||
"close": b["close"],
|
||
"volume": b["volume"],
|
||
}
|
||
for dt, b in sorted(buckets.items())
|
||
]
|
||
|
||
|
||
def store_min_candles(db: TradeDB, code: str,
|
||
rows: List[Dict], tf_min: int = 60) -> int:
|
||
"""60분봉 데이터를 holding_min_candles에 저장 (ON DUPLICATE KEY UPDATE)"""
|
||
saved = 0
|
||
for r in rows:
|
||
try:
|
||
db.conn.execute(
|
||
"""INSERT INTO holding_min_candles
|
||
(code, candle_dt, tf, open, high, low, close, volume)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE
|
||
open=VALUES(open), high=VALUES(high), low=VALUES(low),
|
||
close=VALUES(close), volume=VALUES(volume)""",
|
||
[code, r["dt"], tf_min,
|
||
r["open"], r["high"], r["low"], r["close"], r["volume"]]
|
||
)
|
||
saved += 1
|
||
except Exception as e:
|
||
logger.debug(f"분봉 저장 오류 ({code} {r.get('dt')}): {e}")
|
||
db.conn.commit()
|
||
return saved
|
||
|
||
|
||
def get_stored_min_candles(db: TradeDB, code: str,
|
||
start_dt: str = "", end_dt: str = "",
|
||
tf_min: int = 60) -> List[Dict]:
|
||
"""
|
||
holding_min_candles에서 종목 분봉 조회 (시간 오름차순).
|
||
run_backtest() 호환: candle_dt를 candle_date 키로 반환
|
||
"""
|
||
sql = """SELECT candle_dt AS candle_date,
|
||
open, high, low, close, volume
|
||
FROM holding_min_candles WHERE code=%s AND tf=%s"""
|
||
args = [code, tf_min]
|
||
if start_dt:
|
||
sql += " AND candle_dt >= %s"
|
||
args.append(start_dt)
|
||
if end_dt:
|
||
end_val = end_dt + " 23:59:59" if len(end_dt) == 10 else end_dt
|
||
sql += " AND candle_dt <= %s"
|
||
args.append(end_val)
|
||
sql += " ORDER BY candle_dt ASC"
|
||
rows = db.conn.execute(sql, args).fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def get_min_candle_stats(db: TradeDB, code: str, tf_min: int = 60) -> Dict:
|
||
"""60분봉 보유 현황 (개수, 최소/최대 datetime)"""
|
||
row = db.conn.execute(
|
||
"SELECT COUNT(*) AS cnt, MIN(candle_dt) AS mn, MAX(candle_dt) AS mx "
|
||
"FROM holding_min_candles WHERE code=%s AND tf=%s",
|
||
[code, tf_min]
|
||
).fetchone()
|
||
if row and row["cnt"]:
|
||
return {
|
||
"count": int(row["cnt"]),
|
||
"min": str(row["mn"])[:16] if row["mn"] else "",
|
||
"max": str(row["mx"])[:16] if row["mx"] else "",
|
||
}
|
||
return {"count": 0, "min": "", "max": ""}
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 기술 지표 계산
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def _rsi_series(closes: List[float], period: int) -> List[Optional[float]]:
|
||
"""Wilder RSI 시리즈"""
|
||
n = len(closes)
|
||
rsi = [None] * n
|
||
if n < period + 1:
|
||
return rsi
|
||
deltas = [closes[i] - closes[i - 1] for i in range(1, n)]
|
||
gains = [max(d, 0) for d in deltas]
|
||
losses = [max(-d, 0) for d in deltas]
|
||
ag = sum(gains[:period]) / period
|
||
al = sum(losses[:period]) / period
|
||
for i in range(period, n):
|
||
idx = i - 1
|
||
if i > period:
|
||
ag = (ag * (period - 1) + gains[idx]) / period
|
||
al = (al * (period - 1) + losses[idx]) / period
|
||
rs = ag / al if al > 0 else float("inf")
|
||
rsi[i] = 100.0 - (100.0 / (1 + rs)) if al > 0 else 100.0
|
||
return rsi
|
||
|
||
def _ma_series(closes: List[float], period: int) -> List[Optional[float]]:
|
||
"""단순이동평균(SMA) 시리즈. period 미만 구간은 None."""
|
||
n = len(closes)
|
||
ma = [None] * n
|
||
if period < 1 or n < period:
|
||
return ma
|
||
window_sum = sum(closes[:period])
|
||
ma[period - 1] = window_sum / period
|
||
for i in range(period, n):
|
||
window_sum += closes[i] - closes[i - period]
|
||
ma[i] = window_sum / period
|
||
return ma
|
||
|
||
|
||
def _atr_series(highs, lows, closes, period: int) -> List[Optional[float]]:
|
||
"""ATR 시리즈"""
|
||
n = len(closes)
|
||
tr_list = [None] * n
|
||
for i in range(1, n):
|
||
tr_list[i] = max(
|
||
highs[i] - lows[i],
|
||
abs(highs[i] - closes[i - 1]),
|
||
abs(lows[i] - closes[i - 1]),
|
||
)
|
||
atr = [None] * n
|
||
if n < period + 1:
|
||
return atr
|
||
atr[period] = sum(t for t in tr_list[1:period+1] if t) / period
|
||
for i in range(period + 1, n):
|
||
if tr_list[i] is not None and atr[i - 1] is not None:
|
||
atr[i] = (atr[i - 1] * (period - 1) + tr_list[i]) / period
|
||
return atr
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 컨텍스트 신호 계산 (ATH · 연도고점 · 52주 고/저)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def _calc_context_signals(candles: List[Dict], current_price: float) -> Dict:
|
||
"""
|
||
DB 일봉 전체를 기반으로 현재가 대비 컨텍스트 낙폭 지표를 반환.
|
||
백테스트와 실매매 양쪽에서 재사용.
|
||
|
||
반환 키:
|
||
ath - 역대 최고가 (DB 전체 기간 high 최댓값)
|
||
year_high - 당해 연도 최고가
|
||
w52_high - 최근 252봉(약 1년) 최고가
|
||
w52_low - 최근 252봉 최저가
|
||
ath_drop_pct - 현재가 기준 ATH 대비 낙폭 (%) ← 클수록 많이 빠진 것
|
||
year_drop_pct - 연도고점 대비 낙폭 (%)
|
||
w52_drop_pct - 52주 고점 대비 낙폭 (%)
|
||
"""
|
||
if not candles:
|
||
return {
|
||
"ath": current_price, "year_high": current_price,
|
||
"w52_high": current_price, "w52_low": current_price,
|
||
"ath_drop_pct": 0.0, "year_drop_pct": 0.0, "w52_drop_pct": 0.0,
|
||
}
|
||
|
||
this_year = str(datetime.now().year)
|
||
all_highs = [float(c["high"]) for c in candles if float(c["high"]) > 0]
|
||
year_highs = [float(c["high"]) for c in candles
|
||
if str(c["candle_date"])[:4] == this_year and float(c["high"]) > 0]
|
||
# 52주 ≈ 최근 252 거래일
|
||
w52_bars = candles[-252:] if len(candles) >= 252 else candles
|
||
w52_highs = [float(c["high"]) for c in w52_bars if float(c["high"]) > 0]
|
||
w52_lows = [float(c["low"]) for c in w52_bars if float(c["low"]) > 0]
|
||
|
||
ath = max(all_highs) if all_highs else current_price
|
||
year_high = max(year_highs) if year_highs else current_price
|
||
w52_high = max(w52_highs) if w52_highs else current_price
|
||
w52_low = min(w52_lows) if w52_lows else current_price
|
||
|
||
def drop_pct(ref: float) -> float:
|
||
return round((ref - current_price) / ref * 100, 2) if ref > 0 else 0.0
|
||
|
||
return {
|
||
"ath": ath,
|
||
"year_high": year_high,
|
||
"w52_high": w52_high,
|
||
"w52_low": w52_low,
|
||
"ath_drop_pct": drop_pct(ath),
|
||
"year_drop_pct": drop_pct(year_high),
|
||
"w52_drop_pct": drop_pct(w52_high),
|
||
}
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 백테스트 엔진
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def run_backtest(
|
||
candles: List[Dict],
|
||
cfg: Dict,
|
||
fee_rate: float = 0.015 / 100, # 매수/매도 각각 0.015%
|
||
sell_tax: float = 0.18 / 100, # 증권거래세 0.18%
|
||
tf_min: int = 0, # 0=일봉, 60=60분봉 등. 0이면 자동감지
|
||
) -> Dict:
|
||
"""
|
||
홀딩 전략 백테스트 (추세추종 + 눌림목 / 역추세 선택)
|
||
|
||
[추세 상승: MA_FAST > MA_SLOW]
|
||
매수: RSI ≤ rsi_buy1 → 1차, RSI ≤ rsi_buy2 → 2차 추가매수
|
||
매도: 트레일링 스탑(고점 대비 trail_stop_pct% 하락)
|
||
OR RSI 과열(rsi_sell) OR 익절(take_profit_pct)
|
||
OR 데드크로스(MA_FAST < MA_SLOW) 발생 시 추세 반전 청산
|
||
|
||
[추세 하락/횡보: MA_FAST ≤ MA_SLOW] or [trend_filter=0]
|
||
매수: RSI ≤ rsi_buy1/2/3 (기존 3단계 역추세 분할매수)
|
||
매도: 익절(take_profit_pct) OR RSI 과열 OR 손절(stop_loss_pct)
|
||
|
||
진입가: 신호봉 다음 봉 시가 (1봉 지연)
|
||
"""
|
||
if len(candles) < 20:
|
||
return {"error": "봉 부족", "total_trades": 0}
|
||
|
||
# ── 봉 유형 자동 감지 ─────────────────────────────────────────────
|
||
_sample_dt = str(candles[0].get("candle_date", ""))
|
||
_is_min = tf_min > 0 or len(_sample_dt) > 10
|
||
|
||
rsi_period = int(cfg.get("rsi_period", 14))
|
||
rsi_buy1 = float(cfg.get("rsi_buy1", 55))
|
||
rsi_buy2 = float(cfg.get("rsi_buy2", 45))
|
||
rsi_buy3 = float(cfg.get("rsi_buy3", 35))
|
||
rsi_sell = float(cfg.get("rsi_sell", 75))
|
||
tp_pct = float(cfg.get("take_profit_pct", 15.0)) / 100
|
||
sl_pct = float(cfg.get("stop_loss_pct", 10.0)) / 100
|
||
buy1_ratio = float(cfg.get("buy1_ratio", 50)) / 100
|
||
buy2_ratio = float(cfg.get("buy2_ratio", 50)) / 100
|
||
buy3_ratio = float(cfg.get("buy3_ratio", 0)) / 100
|
||
slot_money = float(cfg.get("slot_money", 3_000_000))
|
||
ma_fast_p = int(cfg.get("ma_fast", 20))
|
||
ma_slow_p = int(cfg.get("ma_slow", 60))
|
||
trail_stop_p = float(cfg.get("trail_stop_pct", 8.0)) # %, 0=비활성
|
||
trend_filter = float(cfg.get("trend_filter", 1.0)) >= 0.5 # bool 변환
|
||
atr_period_p = int(cfg.get("atr_period", 14))
|
||
# 샹들리에 엑시트 배수: env/카드에서 조정 가능 (하드코딩 금지)
|
||
atr_mult = float(cfg.get("atr_mult", 3.0))
|
||
ath_drop_min = float(cfg.get("ath_drop_min_pct", 0.0))
|
||
year_drop_min = float(cfg.get("year_drop_min_pct", 0.0))
|
||
w52_drop_min = float(cfg.get("w52_drop_min_pct", 0.0))
|
||
|
||
closes = [float(c["close"]) for c in candles]
|
||
highs = [float(c["high"]) for c in candles]
|
||
lows = [float(c["low"]) for c in candles]
|
||
opens = [float(c["open"]) for c in candles]
|
||
datetimes = [str(c["candle_date"]) for c in candles]
|
||
dates = [s[:10] for s in datetimes]
|
||
|
||
rsis = _rsi_series(closes, rsi_period)
|
||
ma_fast_arr = _ma_series(closes, ma_fast_p)
|
||
ma_slow_arr = _ma_series(closes, ma_slow_p)
|
||
# ATR: 샹들리에 엑시트에 사용 (추세장 청산 핵심 지표)
|
||
atr_arr = _atr_series(highs, lows, closes, atr_period_p)
|
||
|
||
# ── 컨텍스트 배열 사전 계산 ──────────────────────────────────────
|
||
ath_arr = []
|
||
year_high_arr = []
|
||
w52_high_arr = []
|
||
w52_low_arr = []
|
||
_year_max: Dict[str, float] = {}
|
||
_ath_run = 0.0
|
||
|
||
from datetime import date as _date, timedelta as _td
|
||
for i in range(len(candles)):
|
||
h = highs[i]; l = lows[i]; y = dates[i][:4]
|
||
_ath_run = max(_ath_run, h)
|
||
_year_max[y] = max(_year_max.get(y, 0.0), h)
|
||
if _is_min:
|
||
try:
|
||
cutoff = str(_date.fromisoformat(dates[i]) - _td(days=365))
|
||
w52_s = i
|
||
while w52_s > 0 and dates[w52_s - 1] >= cutoff:
|
||
w52_s -= 1
|
||
except Exception:
|
||
w52_s = max(0, i - 252 * 7)
|
||
else:
|
||
w52_s = max(0, i - 251)
|
||
ath_arr.append(_ath_run)
|
||
year_high_arr.append(_year_max[y])
|
||
w52_high_arr.append(max(highs[w52_s:i + 1]))
|
||
w52_low_arr.append(min(lows[w52_s:i + 1]))
|
||
|
||
# ── 포지션 상태 ───────────────────────────────────────────────────
|
||
position = None # None or dict
|
||
trades: List[Dict] = []
|
||
equity: List[Dict] = []
|
||
cum_pnl = 0.0
|
||
|
||
start_i = max(rsi_period + 1, ma_slow_p) # MA_SLOW가 준비된 이후부터 진입
|
||
|
||
for i in range(start_i, len(candles) - 1):
|
||
rsi = rsis[i]
|
||
if rsi is None:
|
||
continue
|
||
|
||
next_open = float(opens[i + 1]) if opens[i + 1] > 0 else closes[i]
|
||
if next_open <= 0:
|
||
continue
|
||
|
||
mf = ma_fast_arr[i] # 단기 MA 현재봉
|
||
ms = ma_slow_arr[i] # 장기 MA 현재봉
|
||
mf_prev = ma_fast_arr[i - 1] if i > 0 else mf
|
||
ms_prev = ma_slow_arr[i - 1] if i > 0 else ms
|
||
|
||
# 추세 방향 판단
|
||
is_uptrend = trend_filter and mf is not None and ms is not None and mf > ms
|
||
|
||
# ─── 데드크로스 감지: 추세 반전 신호 ─────────────────────────
|
||
is_deadcross = (
|
||
trend_filter
|
||
and mf is not None and ms is not None
|
||
and mf_prev is not None and ms_prev is not None
|
||
and mf < ms # 현재봉: MA_FAST < MA_SLOW
|
||
and mf_prev >= ms_prev # 직전봉: MA_FAST ≥ MA_SLOW (방금 크로스)
|
||
)
|
||
|
||
# ─── 보유 중: 매도 체크 ───────────────────────────────────────
|
||
if position is not None:
|
||
avg = position["avg"]
|
||
qty = position["qty"]
|
||
c_now = closes[i]
|
||
|
||
# 트레일링 스탑: 보유 중 최고가 업데이트 후 고점 대비 낙폭 체크
|
||
position["peak"] = max(position["peak"], c_now)
|
||
profit_pct = (c_now - avg) / avg if avg > 0 else 0.0
|
||
sell_reason = None
|
||
|
||
if position.get("trend_mode"):
|
||
# ─── 추세장 청산: 고정 익절·RSI과열 무시 → 끝까지 추세 추적 ───
|
||
# 핵심: rsi_sell·tp_pct로 조기 청산하면 200% 추세를 15%에 팔게 됨
|
||
if is_deadcross:
|
||
# MA 데드크로스 → 추세 반전 확정 → 즉시 청산
|
||
sell_reason = f"데드크로스({mf:.0f}<{ms:.0f})"
|
||
else:
|
||
# 샹들리에 엑시트 = 고점 − ATR × 배수 (변동성 기반 동적 방어선)
|
||
# ATR이 클수록 방어선이 낮아져 변동성 큰 종목에 더 여유를 줌
|
||
_atr_v = atr_arr[i] if (atr_arr[i] is not None) else closes[i] * 0.05
|
||
chandelier = position["peak"] - _atr_v * atr_mult
|
||
if trail_stop_p > 0:
|
||
# 퍼센트 방어선도 계산, 더 타이트한(높은) 쪽 채택
|
||
pct_trail = position["peak"] * (1.0 - trail_stop_p / 100.0)
|
||
chandelier = max(chandelier, pct_trail)
|
||
if c_now <= chandelier:
|
||
sell_reason = f"샹들리에({profit_pct*100:+.1f}%)"
|
||
elif profit_pct <= -sl_pct:
|
||
# 손절은 추세장에서도 반드시 유지 (무제한 손실 방지)
|
||
sell_reason = f"손절({profit_pct*100:.1f}%)"
|
||
else:
|
||
# ─── 횡보/역추세장 청산: 철저히 고정 익절·손절비 준수 ──────────
|
||
if profit_pct >= tp_pct:
|
||
sell_reason = f"익절(+{profit_pct*100:.1f}%)"
|
||
elif rsi >= rsi_sell:
|
||
sell_reason = f"RSI과열({rsi:.1f})"
|
||
elif profit_pct <= -sl_pct:
|
||
sell_reason = f"손절({profit_pct*100:.1f}%)"
|
||
|
||
if sell_reason:
|
||
exit_price = next_open
|
||
fee = exit_price * qty * (fee_rate + sell_tax)
|
||
pnl = (exit_price - avg) * qty - fee
|
||
hold = i - position["entry_i"]
|
||
cum_pnl += pnl
|
||
trades.append({
|
||
"buy_date": datetimes[position["entry_i"]][:16],
|
||
"sell_date": datetimes[i + 1][:16],
|
||
"avg_price": round(avg),
|
||
"exit_price": round(exit_price),
|
||
"qty": qty,
|
||
"pnl": round(pnl),
|
||
"hold_days": hold,
|
||
"reason": sell_reason,
|
||
"rsi_sell": round(rsi, 1),
|
||
"ath_drop": position.get("ath_drop", 0),
|
||
"year_drop": position.get("year_drop", 0),
|
||
"w52_drop": position.get("w52_drop", 0),
|
||
})
|
||
equity.append({"date": datetimes[i + 1][:16], "cum_pnl": round(cum_pnl)})
|
||
position = None
|
||
continue # 매도 후 당일 매수 방지
|
||
|
||
# ─── 미보유: 컨텍스트 낙폭 필터 ─────────────────────────────
|
||
price_now = closes[i]
|
||
def _drop(ref: float) -> float:
|
||
return (ref - price_now) / ref * 100 if ref > 0 else 0.0
|
||
|
||
if ath_drop_min > 0 and _drop(ath_arr[i]) < ath_drop_min: continue
|
||
if year_drop_min > 0 and _drop(year_high_arr[i]) < year_drop_min: continue
|
||
if w52_drop_min > 0 and _drop(w52_high_arr[i]) < w52_drop_min: continue
|
||
|
||
# ─── 추세 상승 모드 매수 ─────────────────────────────────────
|
||
if is_uptrend:
|
||
# RSI 눌림목: rsi_buy1(1차), rsi_buy2(2차) 기준으로 진입
|
||
stage = 0
|
||
if rsi <= rsi_buy2: stage = 2
|
||
elif rsi <= rsi_buy1: stage = 1
|
||
if stage == 0:
|
||
continue
|
||
ratio = buy2_ratio if stage == 2 else buy1_ratio
|
||
invest = slot_money * ratio
|
||
qty = max(1, int(invest / next_open))
|
||
cost = next_open * qty * (1 + fee_rate)
|
||
position = {
|
||
"avg": next_open,
|
||
"qty": qty,
|
||
"cost": cost,
|
||
"stage": stage,
|
||
"entry_i": i + 1,
|
||
"peak": next_open, # 트레일링 스탑용 고점
|
||
"trend_mode": True, # 추세 모드 진입 여부 (청산 로직 구분)
|
||
"ath_drop": round(_drop(ath_arr[i]), 1),
|
||
"year_drop": round(_drop(year_high_arr[i]), 1),
|
||
"w52_drop": round(_drop(w52_high_arr[i]), 1),
|
||
}
|
||
|
||
# ─── 횡보/하락 모드 (역추세 분할매수) ───────────────────────
|
||
else:
|
||
stage = 0
|
||
if rsi <= rsi_buy3: stage = 3
|
||
elif rsi <= rsi_buy2: stage = 2
|
||
elif rsi <= rsi_buy1: stage = 1
|
||
if stage == 0:
|
||
continue
|
||
ratios = [buy1_ratio, buy2_ratio, buy3_ratio]
|
||
ratio = ratios[stage - 1]
|
||
if ratio <= 0:
|
||
continue # buy3_ratio=0이면 3단계 진입 안 함
|
||
invest = slot_money * ratio
|
||
qty = max(1, int(invest / next_open))
|
||
cost = next_open * qty * (1 + fee_rate)
|
||
position = {
|
||
"avg": next_open,
|
||
"qty": qty,
|
||
"cost": cost,
|
||
"stage": stage,
|
||
"entry_i": i + 1,
|
||
"peak": next_open,
|
||
"trend_mode": False,
|
||
"ath_drop": round(_drop(ath_arr[i]), 1),
|
||
"year_drop": round(_drop(year_high_arr[i]), 1),
|
||
"w52_drop": round(_drop(w52_high_arr[i]), 1),
|
||
}
|
||
|
||
# ─── 미청산 포지션 마지막 봉 강제 청산 ──────────────────────────
|
||
if position is not None and len(candles) > 0:
|
||
exit_price = closes[-1]
|
||
avg = position["avg"]; qty = position["qty"]
|
||
fee = exit_price * qty * (fee_rate + sell_tax)
|
||
pnl = (exit_price - avg) * qty - fee
|
||
cum_pnl += pnl
|
||
trades.append({
|
||
"buy_date": dates[position["entry_i"]],
|
||
"sell_date": dates[-1],
|
||
"avg_price": round(avg),
|
||
"exit_price": round(exit_price),
|
||
"qty": qty,
|
||
"pnl": round(pnl),
|
||
"hold_days": len(candles) - 1 - position["entry_i"],
|
||
"reason": "기간종료",
|
||
"rsi_sell": None,
|
||
"ath_drop": position.get("ath_drop", 0),
|
||
"year_drop": position.get("year_drop", 0),
|
||
"w52_drop": position.get("w52_drop", 0),
|
||
})
|
||
equity.append({"date": dates[-1], "cum_pnl": round(cum_pnl)})
|
||
|
||
# ─── 요약 통계 ───────────────────────────────────────────────────
|
||
total = len(trades)
|
||
wins = [t for t in trades if t["pnl"] > 0]
|
||
losses = [t for t in trades if t["pnl"] < 0]
|
||
total_pnl = sum(t["pnl"] for t in trades)
|
||
avg_hold = sum(t["hold_days"] for t in trades) / total if total else 0
|
||
|
||
win_pnl = sum(t["pnl"] for t in wins)
|
||
loss_pnl = sum(t["pnl"] for t in losses)
|
||
pf = round(abs(win_pnl / loss_pnl), 2) if loss_pnl != 0 else 9999.0
|
||
|
||
peak, mdd, cum = 0.0, 0.0, 0.0
|
||
for t in trades:
|
||
cum += t["pnl"]
|
||
peak = max(peak, cum)
|
||
mdd = max(mdd, peak - cum)
|
||
|
||
# 이유별 집계
|
||
reasons: Dict[str, int] = {}
|
||
for t in trades:
|
||
r = t["reason"]
|
||
prefix = r.split("(")[0] # "익절", "RSI과열", "손절", "기간종료"
|
||
reasons[prefix] = reasons.get(prefix, 0) + 1
|
||
|
||
# ── Buy & Hold 벤치마크 ──────────────────────────────────────────────
|
||
# 첫 봉 종가 매수 → 마지막 봉 종가 매도 (수수료·세금 제외 단순 계산)
|
||
bnh_pct = round((closes[-1] - closes[0]) / closes[0] * 100, 2) if closes[0] > 0 else 0.0
|
||
bnh_pnl = round(slot_money * bnh_pct / 100)
|
||
# 봇 수익률: 투자 원금(slot_money) 대비 실현 손익
|
||
bot_pct = round(total_pnl / slot_money * 100, 2) if slot_money > 0 else 0.0
|
||
# 알파 = 봇 수익률 - Buy & Hold 수익률 (양수면 봇이 시장 이김)
|
||
alpha_pct = round(bot_pct - bnh_pct, 2)
|
||
|
||
return {
|
||
"summary": {
|
||
"total_trades": total,
|
||
"win_trades": len(wins),
|
||
"loss_trades": len(losses),
|
||
"win_rate": round(len(wins) / total * 100, 1) if total else 0,
|
||
"total_pnl": round(total_pnl),
|
||
"avg_hold_days": round(avg_hold, 1),
|
||
"profit_factor": round(min(pf, 9999.0), 2),
|
||
"max_drawdown": round(mdd),
|
||
# Buy & Hold 비교
|
||
"bnh_pct": bnh_pct, # 기간 동안 그냥 들고 있었을 때 수익률(%)
|
||
"bnh_pnl": bnh_pnl, # 그냥 들고 있었을 때 손익(원)
|
||
"bot_pct": bot_pct, # 봇 수익률(투자금 대비 %)
|
||
"alpha_pct": alpha_pct, # 초과수익(봇 - Buy&Hold, %p)
|
||
},
|
||
"equity": equity,
|
||
"reasons": reasons,
|
||
"trades": trades[-200:],
|
||
}
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 파라미터 Grid Search (단일 종목)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def run_param_search(
|
||
candles: List[Dict],
|
||
grid: Optional[Dict] = None,
|
||
min_trades: int = 2,
|
||
base_cfg: Optional[Dict] = None, # 사용자가 설정한 현재 파라미터 (없으면 DEFAULT 사용)
|
||
) -> List[Dict]:
|
||
"""
|
||
종목 하나에 대해 파라미터 Grid Search 실행
|
||
반환: 수익 기준 내림차순 정렬된 결과 리스트
|
||
|
||
base_cfg: 탐색 그리드에 없는 파라미터들의 기준값.
|
||
웹 카드에서 사용자가 직접 설정한 값을 전달하면
|
||
rsi_sell, rsi_period, rsi_buy3 등이 그 값으로 고정됨.
|
||
None 이면 DEFAULT_STOCK_CONFIG 사용.
|
||
"""
|
||
from itertools import product as iproduct
|
||
|
||
if grid is None:
|
||
grid = {
|
||
# ── MA 추세 판단 ──────────────────────────────────────────────
|
||
"ma_fast": [15, 20, 30],
|
||
"ma_slow": [40, 60, 120],
|
||
# ── 샹들리에 배수: 추세장 청산 핵심 파라미터 ────────────────────
|
||
# 작을수록 타이트(빨리 청산) / 클수록 여유(오래 홀딩)
|
||
"atr_mult": [2.0, 3.0, 4.0],
|
||
# 퍼센트 보조 방어선 (0=비활성, ATR선이 더 낮을 때만 활성화)
|
||
"trail_stop_pct": [0.0, 8.0],
|
||
# ── 진입 RSI 눌림목 ────────────────────────────────────────────
|
||
"rsi_buy1": [60, 55, 50, 45],
|
||
"rsi_buy2": [50, 45, 40, 35],
|
||
# ── 횡보장 익절/손절 기준 (추세장엔 무시됨) ────────────────────
|
||
"take_profit_pct": [15.0, 30.0],
|
||
"stop_loss_pct": [7.0, 15.0],
|
||
# ── ATH 낙폭 필터 ─────────────────────────────────────────────
|
||
"ath_drop_min_pct": [0.0, 20.0],
|
||
}
|
||
|
||
keys = list(grid.keys())
|
||
combos = list(iproduct(*[grid[k] for k in keys]))
|
||
|
||
results = []
|
||
# base_cfg가 주어지면 사용자 설정값 우선, 없으면 DEFAULT 사용
|
||
_base = dict(DEFAULT_STOCK_CONFIG)
|
||
if base_cfg:
|
||
_base.update(base_cfg)
|
||
|
||
for vals in combos:
|
||
cfg = dict(_base)
|
||
cfg.update(dict(zip(keys, vals)))
|
||
# MA 단기 < 장기 조건 보정
|
||
if cfg["ma_fast"] >= cfg["ma_slow"]:
|
||
continue
|
||
# RSI buy1 > buy2 조건 보정
|
||
if cfg["rsi_buy1"] <= cfg["rsi_buy2"]:
|
||
continue
|
||
res = run_backtest(candles, cfg)
|
||
s = res.get("summary", {})
|
||
if s.get("total_trades", 0) < min_trades:
|
||
continue
|
||
results.append({
|
||
"params": {k: cfg[k] for k in keys},
|
||
"total_pnl": s["total_pnl"],
|
||
"win_rate": s["win_rate"],
|
||
"total_trades": s["total_trades"],
|
||
"pf": s["profit_factor"],
|
||
"avg_hold": s["avg_hold_days"],
|
||
"mdd": s["max_drawdown"],
|
||
})
|
||
|
||
results.sort(key=lambda x: x["total_pnl"], reverse=True)
|
||
return results
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 실매매 봇 (HoldingTrader)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
class HoldingTrader:
|
||
"""
|
||
장기 홀딩 실매매 봇
|
||
- active_trades 에 strategy='HOLDING' 으로 기록 → 다른 봇과 계좌 분리
|
||
- 종목별 파라미터는 holding_stock_config 에서 조회
|
||
- 10분 주기로 RSI 체크 → 3단계 분할매수 / 익절·손절 매도
|
||
- 진입가: 시장가 주문 (장중 호가 추종)
|
||
"""
|
||
|
||
STRATEGY = "HOLDING" # active_trades.strategy 값 — 다른 봇과 충돌 방지
|
||
BOT_NAME = "홀딩봇"
|
||
|
||
def __init__(self):
|
||
self.db = TradeDB()
|
||
ensure_holding_tables(self.db)
|
||
|
||
# 실전/모의 토큰 모두 최신 상태로 유지 (모드와 무관하게 양쪽 갱신)
|
||
try:
|
||
from kis_token_manager import ensure_both_tokens
|
||
ensure_both_tokens()
|
||
except Exception as _te:
|
||
logger.warning(f"토큰 자동갱신 건너뜀: {_te}")
|
||
|
||
cfg_row = self.db.conn.execute(
|
||
"SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
|
||
).fetchone()
|
||
r = dict(cfg_row) if cfg_row else {}
|
||
|
||
def safe(v): return v if v else ""
|
||
|
||
is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes")
|
||
self.mock = is_mock
|
||
self.app_key = safe(r.get("KIS_APP_KEY_MOCK" if is_mock else "KIS_APP_KEY_REAL"))
|
||
self.app_secret = safe(r.get("KIS_APP_SECRET_MOCK" if is_mock else "KIS_APP_SECRET_REAL"))
|
||
self.base_url = (
|
||
"https://openapivts.koreainvestment.com:29443"
|
||
if is_mock else
|
||
"https://openapi.koreainvestment.com:9443"
|
||
)
|
||
self.account_no = safe(r.get("KIS_ACCOUNT_NO_MOCK" if is_mock else "KIS_ACCOUNT_NO_REAL"))
|
||
self.account_code = safe(r.get("KIS_ACCOUNT_CODE_MOCK" if is_mock else "KIS_ACCOUNT_CODE_REAL")) or "01"
|
||
|
||
# 알림 설정
|
||
self.mm_server = safe(r.get("MM_SERVER_URL", "")) or "https://mattermost.hoonfam.org"
|
||
self.mm_token = safe(r.get("MM_BOT_TOKEN_", ""))
|
||
self.mm_channel = safe(r.get("KIS_LONG_MM_CHANNEL", "")) or "stock"
|
||
self.tg_token = safe(r.get("MM_BOT_TOKEN_", "")) # TG 토큰 있으면 별도로
|
||
self.tg_chat = safe(r.get("KIS_LONG_MM_CHANNEL", ""))
|
||
|
||
self.watchlist = load_watchlist()
|
||
logger.info(f"✅ {self.BOT_NAME} 초기화 | 모의={is_mock} | 종목={len(self.watchlist)}개")
|
||
|
||
# ──────────────────── 알림 ────────────────────
|
||
def _notify(self, title: str, body: str):
|
||
msg = f"**[{self.BOT_NAME}]** {title}\n{body}"
|
||
if self.mm_token and self.mm_server:
|
||
try:
|
||
requests.post(
|
||
f"{self.mm_server}/hooks/{self.mm_token}",
|
||
json={"text": msg}, timeout=5
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"알림 전송 실패: {e}")
|
||
|
||
# ──────────────────── KIS REST ────────────────────
|
||
def _token(self) -> str:
|
||
return _ensure_token(self.mock)
|
||
|
||
def _get(self, path: str, params: dict) -> dict:
|
||
"""GET 요청 (429 재시도)"""
|
||
url = f"{self.base_url}{path}"
|
||
token = self._token()
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": self.app_key,
|
||
"appsecret": self.app_secret,
|
||
}
|
||
for attempt in range(3):
|
||
try:
|
||
r = requests.get(url, headers=headers, params=params, timeout=10)
|
||
if r.status_code == 429:
|
||
time.sleep((attempt + 1) * 2)
|
||
continue
|
||
r.raise_for_status()
|
||
return r.json()
|
||
except Exception as e:
|
||
if attempt == 2:
|
||
raise
|
||
time.sleep(2)
|
||
raise RuntimeError("GET 요청 실패")
|
||
|
||
def _post(self, path: str, tr_id: str, data: dict) -> dict:
|
||
"""POST 요청 (주문용, 429 재시도)"""
|
||
url = f"{self.base_url}{path}"
|
||
token = self._token()
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {token}",
|
||
"appkey": self.app_key,
|
||
"appsecret": self.app_secret,
|
||
"tr_id": tr_id,
|
||
}
|
||
for attempt in range(3):
|
||
try:
|
||
r = requests.post(url, headers=headers, json=data, timeout=10)
|
||
if r.status_code == 429:
|
||
time.sleep((attempt + 1) * 2)
|
||
continue
|
||
r.raise_for_status()
|
||
return r.json()
|
||
except Exception as e:
|
||
if attempt == 2:
|
||
raise
|
||
time.sleep(2)
|
||
raise RuntimeError("POST 요청 실패")
|
||
|
||
def get_price(self, code: str) -> Optional[Dict]:
|
||
"""
|
||
현재가 + 당일 고/저, 52주 고/저, 등락률 조회 (FHKST01010100)
|
||
반환 dict 키:
|
||
price - 현재가
|
||
day_high - 당일 고가
|
||
day_low - 당일 저가
|
||
w52_high - 52주 최고가
|
||
w52_low - 52주 최저가
|
||
change_rate - 전일대비 등락률(%)
|
||
volume - 누적 거래량
|
||
"""
|
||
try:
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"authorization": f"Bearer {self._token()}",
|
||
"appkey": self.app_key,
|
||
"appsecret": self.app_secret,
|
||
"tr_id": "FHKST01010100",
|
||
}
|
||
params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code}
|
||
r = requests.get(
|
||
f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price",
|
||
headers=headers, params=params, timeout=10
|
||
)
|
||
out = r.json().get("output", {})
|
||
price = float(out.get("stck_prpr", 0) or 0)
|
||
if price <= 0:
|
||
return None
|
||
return {
|
||
"price": price,
|
||
"day_high": float(out.get("stck_hgpr", 0) or 0), # 당일 고가
|
||
"day_low": float(out.get("stck_lwpr", 0) or 0), # 당일 저가
|
||
"w52_high": float(out.get("w52_hgpr", 0) or 0), # 52주 최고가
|
||
"w52_low": float(out.get("w52_lwpr", 0) or 0), # 52주 최저가
|
||
"change_rate": float(out.get("prdy_ctrt", 0) or 0), # 등락률(%)
|
||
"volume": int(out.get("acml_vol", 0) or 0), # 누적 거래량
|
||
}
|
||
except Exception as e:
|
||
logger.warning(f"현재가 조회 실패 ({code}): {e}")
|
||
return None
|
||
|
||
def _order(self, code: str, qty: int, is_buy: bool) -> bool:
|
||
"""시장가 주문 (매수/매도)"""
|
||
if qty <= 0:
|
||
return False
|
||
# 모의: VTTC0802U(매수) / VTTC0801U(매도) 실전: TTTC0802U / TTTC0801U
|
||
if is_buy:
|
||
tr_id = "VTTC0802U" if self.mock else "TTTC0802U"
|
||
else:
|
||
tr_id = "VTTC0801U" if self.mock else "TTTC0801U"
|
||
try:
|
||
data = {
|
||
"CANO": self.account_no,
|
||
"ACNT_PRDT_CD": self.account_code,
|
||
"PDNO": code,
|
||
"ORD_DVSN": "01", # 시장가
|
||
"ORD_QTY": str(qty),
|
||
"ORD_UNPR": "0",
|
||
}
|
||
result = self._post(
|
||
"/uapi/domestic-stock/v1/trading/order-cash", tr_id, data
|
||
)
|
||
ok = result.get("rt_cd") == "0"
|
||
if not ok:
|
||
logger.error(f"주문 실패 ({code} {'매수' if is_buy else '매도'}): {result.get('msg1','')}")
|
||
return ok
|
||
except Exception as e:
|
||
logger.error(f"주문 예외 ({code}): {e}")
|
||
return False
|
||
|
||
# ──────────────────── DB 포지션 ────────────────────
|
||
def _get_position(self, code: str) -> Optional[dict]:
|
||
"""HOLDING 전략 포지션 조회"""
|
||
row = self.db.conn.execute(
|
||
"SELECT * FROM active_trades WHERE code=%s AND strategy=%s",
|
||
[code, self.STRATEGY]
|
||
).fetchone()
|
||
return dict(row) if row else None
|
||
|
||
def _upsert_position(self, code: str, name: str, avg_price: float,
|
||
qty: int, invested: float, stage: int,
|
||
is_uptrend: bool = False):
|
||
"""
|
||
포지션 저장/갱신 (strategy='HOLDING').
|
||
status = T{stage}(추세장 진입) or S{stage}(횡보장 진입)
|
||
→ 봇 재시작 후에도 청산 모드(T/S)를 구분하여 샹들리에/고정익절 적용
|
||
"""
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
mode_prefix = "T" if is_uptrend else "S" # T=Trend / S=Sideways
|
||
self.db.conn.execute(
|
||
"""INSERT INTO active_trades
|
||
(code, name, strategy, avg_buy_price, current_qty, target_qty,
|
||
total_invested, status, buy_date, updated_at)
|
||
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
||
ON DUPLICATE KEY UPDATE
|
||
avg_buy_price=VALUES(avg_buy_price),
|
||
current_qty=VALUES(current_qty),
|
||
target_qty=VALUES(target_qty),
|
||
total_invested=VALUES(total_invested),
|
||
status=VALUES(status),
|
||
updated_at=VALUES(updated_at)""",
|
||
[code, name, self.STRATEGY, round(avg_price, 2), qty, qty,
|
||
round(invested, 2), f"{mode_prefix}{stage}", now, now]
|
||
)
|
||
self.db.conn.commit()
|
||
|
||
def _delete_position(self, code: str):
|
||
"""포지션 삭제 (청산 후)"""
|
||
self.db.conn.execute(
|
||
"DELETE FROM active_trades WHERE code=%s AND strategy=%s",
|
||
[code, self.STRATEGY]
|
||
)
|
||
self.db.conn.commit()
|
||
|
||
# ──────────────────── RSI 계산 ────────────────────
|
||
def _get_rsi(self, code: str, cfg: dict, current_price: float) -> Optional[float]:
|
||
"""
|
||
저장된 일봉 + 당일 현재가(가중치 없이 단순 추가)로 RSI 계산.
|
||
장중에도 '오늘의 종가가 이 가격이라면' 의 RSI를 반환.
|
||
"""
|
||
period = int(cfg.get("rsi_period", 14))
|
||
candles = get_stored_candles(self.db, code)
|
||
if len(candles) < period + 2:
|
||
return None
|
||
closes = [float(c["close"]) for c in candles]
|
||
# 오늘 데이터가 DB에 없으면 현재가로 추가
|
||
today_str = datetime.now().strftime("%Y-%m-%d")
|
||
if str(candles[-1]["candle_date"])[:10] != today_str:
|
||
closes.append(float(current_price))
|
||
rsis = _rsi_series(closes, period)
|
||
for r in reversed(rsis):
|
||
if r is not None:
|
||
return r
|
||
return None
|
||
|
||
# ──────────────────── 매수 로직 ────────────────────
|
||
def _try_buy(self, code: str, name: str, current_price: float,
|
||
rsi: float, cfg: dict, ctx: Optional[Dict] = None,
|
||
is_uptrend: bool = False):
|
||
"""
|
||
분할매수 체크 및 실행.
|
||
is_uptrend: 추세 상승장 여부 → status에 T/S 기록해 청산 모드 구분
|
||
ctx: _calc_context_signals() 반환값 (ATH·연도고점·52주 낙폭 필터)
|
||
"""
|
||
rsi_buy1 = float(cfg.get("rsi_buy1", 45))
|
||
rsi_buy2 = float(cfg.get("rsi_buy2", 40))
|
||
rsi_buy3 = float(cfg.get("rsi_buy3", 35))
|
||
b1 = float(cfg.get("buy1_ratio", 30)) / 100
|
||
b2 = float(cfg.get("buy2_ratio", 30)) / 100
|
||
b3 = float(cfg.get("buy3_ratio", 40)) / 100
|
||
slot = float(cfg.get("slot_money", 3_000_000))
|
||
ath_min = float(cfg.get("ath_drop_min_pct", 0.0))
|
||
year_min = float(cfg.get("year_drop_min_pct", 0.0))
|
||
w52_min = float(cfg.get("w52_drop_min_pct", 0.0))
|
||
|
||
pos = self._get_position(code)
|
||
cur_status = (pos["status"] or "S0") if pos else "S0"
|
||
cur_stage = int(cur_status[1]) if len(cur_status) > 1 and cur_status[1].isdigit() else 0
|
||
|
||
# ─── 매수 단계 및 비중 결정 ────────────────────────────────────────
|
||
target_stage, ratio = 0, 0.0
|
||
if cur_stage == 0 and rsi <= rsi_buy1:
|
||
target_stage, ratio = 1, b1
|
||
elif cur_stage == 1 and rsi <= rsi_buy2:
|
||
target_stage, ratio = 2, b2
|
||
elif cur_stage == 2 and rsi <= rsi_buy3:
|
||
target_stage, ratio = 3, b3
|
||
if target_stage == 0:
|
||
return # RSI 조건 미충족
|
||
|
||
# ─── 컨텍스트 낙폭 필터 ────────────────────────────────────────────
|
||
if ctx:
|
||
if ath_min > 0 and ctx["ath_drop_pct"] < ath_min:
|
||
logger.info(f"[{code}] ATH 낙폭 부족 ({ctx['ath_drop_pct']:.1f}% < {ath_min}%) → 매수 보류")
|
||
return
|
||
if year_min > 0 and ctx["year_drop_pct"] < year_min:
|
||
logger.info(f"[{code}] 연도 낙폭 부족 ({ctx['year_drop_pct']:.1f}% < {year_min}%) → 매수 보류")
|
||
return
|
||
if w52_min > 0 and ctx["w52_drop_pct"] < w52_min:
|
||
logger.info(f"[{code}] 52주 낙폭 부족 ({ctx['w52_drop_pct']:.1f}% < {w52_min}%) → 매수 보류")
|
||
return
|
||
|
||
invest = slot * ratio
|
||
qty = max(1, int(invest / current_price))
|
||
cost = current_price * qty
|
||
|
||
mode_str = "📈추세" if is_uptrend else "📉역추세"
|
||
logger.info(f"[{code}] {mode_str} 매수 RSI={rsi:.1f} → {target_stage}차 {qty}주 @ {current_price:,.0f}원")
|
||
|
||
if not self._order(code, qty, is_buy=True):
|
||
return
|
||
|
||
# 평단가 갱신
|
||
if pos:
|
||
prev_cost = float(pos["avg_buy_price"]) * int(pos["current_qty"])
|
||
prev_qty = int(pos["current_qty"])
|
||
new_qty = prev_qty + qty
|
||
new_avg = (prev_cost + cost) / new_qty
|
||
new_inv = float(pos["total_invested"]) + cost
|
||
else:
|
||
new_qty = qty
|
||
new_avg = current_price
|
||
new_inv = cost
|
||
|
||
self._upsert_position(code, name, new_avg, new_qty, new_inv, target_stage, is_uptrend)
|
||
|
||
# 컨텍스트 낙폭 정보 알림에 포함
|
||
ctx_txt = ""
|
||
if ctx:
|
||
ctx_txt = (
|
||
f"\n▪ ATH 낙폭: {ctx['ath_drop_pct']:.1f}%"
|
||
f" | 연도: {ctx['year_drop_pct']:.1f}%"
|
||
f" | 52주: {ctx['w52_drop_pct']:.1f}%"
|
||
)
|
||
pnl_txt = f"평단가: {new_avg:,.0f}원 | 수량: {new_qty}주"
|
||
self._notify(
|
||
f"📥 {name}({code}) {target_stage}차 매수",
|
||
f"▪ 체결가: {current_price:,.0f}원\n"
|
||
f"▪ 수량: {qty}주 ({cost:,.0f}원)\n"
|
||
f"▪ {pnl_txt}\n"
|
||
f"▪ RSI: {rsi:.1f} (기준: {[rsi_buy1,rsi_buy2,rsi_buy3][target_stage-1]})"
|
||
f"{ctx_txt}"
|
||
)
|
||
|
||
# ──────────────────── 매도 로직 ────────────────────
|
||
def _try_sell(self, code: str, name: str, current_price: float,
|
||
rsi: float, cfg: dict,
|
||
is_deadcross: bool = False,
|
||
peak_price: float = 0.0,
|
||
atr_val: float = 0.0):
|
||
"""
|
||
익절/손절 체크 및 실행.
|
||
|
||
[추세장 진입(status=T*)]
|
||
→ 샹들리에 엑시트(peak − ATR × mult) + 데드크로스만 청산
|
||
→ 고정 익절·RSI과열 무시: 추세 끝까지 추적
|
||
[횡보장 진입(status=S*)]
|
||
→ 기존 방식: 고정 익절% / RSI과열 / 손절%
|
||
"""
|
||
pos = self._get_position(code)
|
||
if not pos:
|
||
return
|
||
|
||
avg = float(pos["avg_buy_price"])
|
||
qty = int(pos["current_qty"])
|
||
pnl_r = (current_price - avg) / avg if avg > 0 else 0.0
|
||
cur_status = (pos.get("status") or "S1")
|
||
is_trend_mode = cur_status.startswith("T") # T=추세장, S=횡보장
|
||
|
||
tp_pct = float(cfg.get("take_profit_pct", 15.0)) / 100
|
||
sl_pct = float(cfg.get("stop_loss_pct", 10.0)) / 100
|
||
rsi_sell_thr = float(cfg.get("rsi_sell", 75.0))
|
||
trail_stop_p = float(cfg.get("trail_stop_pct", 8.0))
|
||
atr_mult = float(cfg.get("atr_mult", 3.0))
|
||
|
||
sell_reason = None
|
||
|
||
if is_trend_mode:
|
||
# ─── 추세장: 고정 익절·RSI과열 무시 → 추세 끝까지 추적 ──────────
|
||
if is_deadcross:
|
||
sell_reason = f"데드크로스({pnl_r*100:+.1f}%)"
|
||
elif peak_price > 0 and atr_val > 0:
|
||
chandelier = peak_price - atr_val * atr_mult
|
||
if trail_stop_p > 0:
|
||
pct_trail = peak_price * (1.0 - trail_stop_p / 100.0)
|
||
chandelier = max(chandelier, pct_trail)
|
||
if current_price <= chandelier:
|
||
sell_reason = f"샹들리에({pnl_r*100:+.1f}%)"
|
||
# 손절은 추세장에서도 유지 (치명적 손실 방지)
|
||
if sell_reason is None and pnl_r <= -sl_pct:
|
||
sell_reason = f"손절({pnl_r*100:.1f}%)"
|
||
else:
|
||
# ─── 횡보장: 기존 방식 철저히 준수 ──────────────────────────────
|
||
if pnl_r >= tp_pct:
|
||
sell_reason = f"익절(+{pnl_r*100:.1f}%)"
|
||
elif rsi and rsi >= rsi_sell_thr:
|
||
sell_reason = f"RSI과열({rsi:.1f})"
|
||
elif pnl_r <= -sl_pct:
|
||
sell_reason = f"손절({pnl_r*100:.1f}%)"
|
||
|
||
if not sell_reason:
|
||
return
|
||
|
||
logger.info(f"[{code}] 매도 신호: {sell_reason} | {qty}주 @ {current_price:,.0f}원 "
|
||
f"(peak:{peak_price:,.0f} atr:{atr_val:,.0f})")
|
||
|
||
if not self._order(code, qty, is_buy=False):
|
||
return
|
||
|
||
fee = current_price * qty * (0.015 / 100 + 0.18 / 100)
|
||
pnl = (current_price - avg) * qty - fee
|
||
self._delete_position(code)
|
||
|
||
self._notify(
|
||
f"📤 {name}({code}) 매도 | {sell_reason}",
|
||
f"▪ 체결가: {current_price:,.0f}원\n"
|
||
f"▪ 수량: {qty}주\n"
|
||
f"▪ 평단가: {avg:,.0f}원\n"
|
||
f"▪ 최고가: {peak_price:,.0f}원 | ATR: {atr_val:,.0f}\n"
|
||
f"▪ 손익: {pnl:+,.0f}원\n"
|
||
f"▪ 수익률: {pnl_r*100:+.2f}%"
|
||
)
|
||
|
||
# ──────────────────── 메인 루프 ────────────────────
|
||
def _is_market_hour(self) -> bool:
|
||
"""장 시간 체크 (09:00~15:30 평일)"""
|
||
now = datetime.now()
|
||
if now.weekday() >= 5: # 토·일
|
||
return False
|
||
t = now.hour * 100 + now.minute
|
||
return 900 <= t <= 1530
|
||
|
||
def _fetch_today_candles(self):
|
||
"""
|
||
장 마감 직후 오늘 일봉을 자동 수집하여 DB 갱신.
|
||
KIS API는 당일 종가가 확정되는 15:30 이후 조회 가능.
|
||
"""
|
||
today = datetime.now().strftime("%Y-%m-%d")
|
||
app_key, app_secret, base_url, mock = _get_kis_token(self.db)
|
||
logger.info(f"📥 장 마감 후 자동 캔들 수집 ({today})")
|
||
total_saved = 0
|
||
for item in self.watchlist:
|
||
code, name = item["code"], item["name"]
|
||
try:
|
||
rows = fetch_daily_ohlcv(code, today, today,
|
||
app_key, app_secret, base_url, mock=mock)
|
||
saved = store_candles(self.db, code, rows)
|
||
total_saved += saved
|
||
logger.info(f" [{code}] {name}: {saved}봉 저장")
|
||
time.sleep(0.3)
|
||
except Exception as e:
|
||
logger.error(f" [{code}] 캔들 자동 수집 오류: {e}")
|
||
logger.info(f"✅ 자동 캔들 수집 완료 (총 {total_saved}봉 저장)")
|
||
self._notify(
|
||
"📊 일봉 자동 수집 완료",
|
||
f"{today} 종가 기준 {len(self.watchlist)}종목 수집 ({total_saved}봉 저장)"
|
||
)
|
||
|
||
def run(self):
|
||
logger.info(f"\n{'='*50}")
|
||
logger.info(f" {self.BOT_NAME} 시작 | 모의={self.mock}")
|
||
logger.info(f" 종목: {[i['name'] for i in self.watchlist]}")
|
||
logger.info(f"{'='*50}\n")
|
||
self._notify("🚀 봇 시작", f"종목 {len(self.watchlist)}개 모니터링 시작")
|
||
|
||
_candles_fetched_date = "" # 오늘 자동수집 완료 날짜 (중복 방지)
|
||
|
||
while True:
|
||
try:
|
||
now = datetime.now()
|
||
t = now.hour * 100 + now.minute
|
||
|
||
# ── 장 마감 후 자동 일봉 수집 (15:35~15:50, 하루 1회) ──
|
||
today_str = now.strftime("%Y-%m-%d")
|
||
if (now.weekday() < 5
|
||
and 1535 <= t <= 1550
|
||
and _candles_fetched_date != today_str):
|
||
self._fetch_today_candles()
|
||
_candles_fetched_date = today_str
|
||
|
||
if not self._is_market_hour():
|
||
logger.info("⏳ 장 외 시간 대기...")
|
||
time.sleep(60)
|
||
continue
|
||
|
||
for item in self.watchlist:
|
||
code = item["code"]
|
||
name = item["name"]
|
||
try:
|
||
cfg = get_stock_config(self.db, code)
|
||
pi = self.get_price(code) # price info dict
|
||
if not pi:
|
||
continue
|
||
price = pi["price"]
|
||
|
||
# DB 일봉 기반 컨텍스트 신호 (ATH·연도고점·52주 고저)
|
||
stored = get_stored_candles(self.db, code)
|
||
ctx = _calc_context_signals(stored, price)
|
||
|
||
rsi = self._get_rsi(code, cfg, price)
|
||
pos = self._get_position(code)
|
||
|
||
# ── 실시간 MA / ATR / Peak 계산 ──────────────────────
|
||
is_uptrend = False
|
||
is_deadcross = False
|
||
peak_price = price
|
||
atr_val = price * 0.05 # fallback: 현재가의 5%
|
||
|
||
if len(stored) >= 30:
|
||
_cl = [float(c["close"]) for c in stored]
|
||
_hi = [float(c["high"]) for c in stored]
|
||
_lo = [float(c["low"]) for c in stored]
|
||
# 오늘 실시간 봉 추가 (당일 고·저·종가로 지표 갱신)
|
||
_cl.append(price); _hi.append(pi["day_high"])
|
||
_lo.append(pi["day_low"])
|
||
|
||
maf_p = int(cfg.get("ma_fast", 20))
|
||
mas_p = int(cfg.get("ma_slow", 60))
|
||
atr_p = int(cfg.get("atr_period", 14))
|
||
tf = float(cfg.get("trend_filter", 1.0)) >= 0.5
|
||
|
||
maf_a = _ma_series(_cl, maf_p)
|
||
mas_a = _ma_series(_cl, mas_p)
|
||
atr_a = _atr_series(_hi, _lo, _cl, atr_p)
|
||
|
||
mf = maf_a[-1]; ms = mas_a[-1]
|
||
mfp = maf_a[-2] if len(maf_a) > 1 else mf
|
||
msp = mas_a[-2] if len(mas_a) > 1 else ms
|
||
|
||
if atr_a[-1] is not None:
|
||
atr_val = atr_a[-1]
|
||
if tf and mf and ms:
|
||
is_uptrend = mf > ms
|
||
is_deadcross = (mf < ms and mfp is not None
|
||
and msp is not None and mfp >= msp)
|
||
|
||
# ── 보유 시 진입일 이후 최고가(Peak) 추적 ────────────
|
||
if pos:
|
||
buy_d = str(pos.get("buy_date", ""))[:10]
|
||
if buy_d:
|
||
post = [c for c in stored if str(c["candle_date"])[:10] >= buy_d]
|
||
highs = [float(c["high"]) for c in post] + [pi["day_high"], price]
|
||
peak_price = max(highs) if highs else price
|
||
|
||
logger.info(
|
||
f"[{code}] {name} | "
|
||
f"현재가:{price:,.0f}({pi['change_rate']:+.2f}%) | "
|
||
f"ATH낙폭:{ctx['ath_drop_pct']:.1f}% "
|
||
f"연도:{ctx['year_drop_pct']:.1f}% | "
|
||
f"RSI:{f'{rsi:.1f}' if rsi else 'N/A'} | "
|
||
f"추세:{'상승' if is_uptrend else '하락/횡보'} | "
|
||
f"ATR:{atr_val:,.0f} | "
|
||
f"포지션:{'있음(' + pos['status'] + ')' if pos else '없음'}"
|
||
)
|
||
|
||
if pos:
|
||
self._try_sell(code, name, price, rsi, cfg,
|
||
is_deadcross=is_deadcross,
|
||
peak_price=peak_price,
|
||
atr_val=atr_val)
|
||
else:
|
||
self._try_buy(code, name, price, rsi, cfg, ctx,
|
||
is_uptrend=is_uptrend)
|
||
|
||
except Exception as e:
|
||
logger.error(f"[{code}] 처리 중 오류: {e}")
|
||
time.sleep(3)
|
||
|
||
sleep_sec = random.uniform(540, 660) # 9~11분 랜덤 대기
|
||
logger.info(f"💤 {sleep_sec/60:.1f}분 후 다음 체크...")
|
||
time.sleep(sleep_sec)
|
||
|
||
except KeyboardInterrupt:
|
||
logger.info("🛑 사용자 중단")
|
||
self._notify("🛑 봇 중단", "사용자가 종료했습니다.")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"메인 루프 오류: {e}")
|
||
time.sleep(60)
|
||
|
||
self.db.close()
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# CLI: 캔들 수집 / 백테스트 실행
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
if __name__ == "__main__":
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description="홀딩 전략 봇")
|
||
sub = parser.add_subparsers(dest="cmd")
|
||
|
||
p_fetch = sub.add_parser("fetch", help="일봉 캔들 수집")
|
||
p_fetch.add_argument("--start", default="2023-01-01", help="시작일 YYYY-MM-DD")
|
||
p_fetch.add_argument("--end", default=datetime.now().strftime("%Y-%m-%d"))
|
||
|
||
p_bt = sub.add_parser("backtest", help="백테스트 실행")
|
||
p_bt.add_argument("--code", default="", help="종목코드 (공백=전체)")
|
||
p_bt.add_argument("--start", default="2023-01-01")
|
||
p_bt.add_argument("--end", default=datetime.now().strftime("%Y-%m-%d"))
|
||
|
||
sub.add_parser("live", help="실매매 봇 시작")
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.cmd == "live":
|
||
# 실매매 봇 시작 — DB는 HoldingTrader 내부에서 관리
|
||
trader = HoldingTrader()
|
||
trader.run()
|
||
sys.exit(0)
|
||
|
||
db = TradeDB()
|
||
ensure_holding_tables(db)
|
||
|
||
if args.cmd == "fetch":
|
||
items = load_watchlist()
|
||
app_key, app_secret, base_url, mock = _get_kis_token(db)
|
||
for item in items:
|
||
code, name = item["code"], item["name"]
|
||
logger.info(f" [{code}] {name} 일봉 수집 ({args.start} ~ {args.end})")
|
||
rows = fetch_daily_ohlcv(code, args.start, args.end, app_key, app_secret, base_url, mock=mock)
|
||
saved = store_candles(db, code, rows)
|
||
logger.info(f" → {saved}봉 저장 완료")
|
||
time.sleep(0.5)
|
||
|
||
elif args.cmd == "backtest":
|
||
items = load_watchlist()
|
||
if args.code:
|
||
items = [i for i in items if i["code"] == args.code]
|
||
for item in items:
|
||
code = item["code"]
|
||
candles = get_stored_candles(db, code, args.start, args.end)
|
||
cfg = get_stock_config(db, code)
|
||
result = run_backtest(candles, cfg)
|
||
s = result.get("summary", {})
|
||
print(f"\n[{code}] {item['name']}")
|
||
print(f" 거래:{s.get('total_trades')} | 승률:{s.get('win_rate')}% | "
|
||
f"손익:{s.get('total_pnl'):+,}원 | PF:{s.get('profit_factor')} | "
|
||
f"MDD:{s.get('max_drawdown'):,}원")
|
||
|
||
else:
|
||
parser.print_help()
|
||
|
||
db.close()
|