Files
kis_bot/holding_bot.py
2026-03-17 12:33:30 +09:00

2187 lines
98 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
holding_bot.py — 장기 홀딩 전략 봇 (추세추종 + 눌림목 분할매수)
=======================================================================
전략 개요:
[추세 상승: MA20 > MA60]
매수: RSI 눌림목(rsi_buy1/2) 시 분할 매수 — 기준 완화(추세장 특성)
매도: 트레일링 스탑(고점 대비 trail_stop_pct% 하락) OR RSI 과열(rsi_sell)
OR 데드크로스(MA20 < MA60) 발생 시 추세 반전 청산
[추세 하락/횡보: MA20 ≤ MA60] (trend_filter=0 이면 전 구간 적용)
매수: 기존 RSI 3단계 역추세 분할매수 (rsi_buy1/2/3)
매도: 익절(take_profit_pct) OR RSI 과열 OR 손절(stop_loss_pct)
봉: 일봉(D) 기준, KIS API로 최대 100봉씩 페이지네이션 수집
DB 테이블:
holding_candles — 종목별 일봉 OHLCV (별도 관리)
holding_stock_config — 종목별 파라미터 (code별 최신 INSERT 방식)
실행:
python3 holding_bot.py
"""
import os, sys, time, json, logging, random
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional, Tuple
import requests
ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(ROOT))
from database import TradeDB
logging.basicConfig(
format="[%(asctime)s] %(message)s",
datefmt="%H:%M:%S",
level=logging.INFO,
)
logger = logging.getLogger("HoldingBot")
logging.getLogger("TradeDB").setLevel(logging.WARNING)
# 관심종목 파일
WATCHLIST_PATH = ROOT / "long_term_watchlist.json"
# ─────────────────────────────────────────────────────────────────────────────
# 기본 파라미터 (종목별 DB에 없으면 이 값 사용)
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_STOCK_CONFIG = {
"rsi_period": 14, # RSI 계산 기간
"rsi_buy1": 55.0, # 1차 매수 RSI ≤ (추세장: 50~55 / 횡보장: 45 이하)
"rsi_buy2": 45.0, # 2차 매수 RSI ≤
"rsi_buy3": 35.0, # 3차 매수 RSI ≤ (횡보/하락장 전용)
"rsi_sell": 75.0, # RSI 과열 매도 기준 (추세장에선 80 수준으로 올려 조기매도 방지)
"take_profit_pct": 15.0, # 수익률 익절 기준 (%) — 추세장에선 크게 잡아야 탈 안 남
"stop_loss_pct": 10.0, # 손절 기준 (%) — 양수로 저장, 음수 방향 적용
"buy1_ratio": 50.0, # 1차 투자 비중 (%) — 추세장엔 처음부터 크게
"buy2_ratio": 50.0, # 2차 투자 비중 (%)
"buy3_ratio": 0.0, # 3차 투자 비중 (%) — 추세장에선 0 (자금 보류 안 함)
"slot_money": 3_000_000, # 종목당 총 투자금 (원)
"atr_period": 14, # ATR 계산 기간 (True Range의 지수이동평균)
# ── 샹들리에 엑시트 (Chandelier Exit) ─────────────────────────────
# 추세장 청산에서 사용: 고점 ATR × atr_mult → 변동성 기반 동적 방어선
# mult=2: 타이트(빠른 청산) / mult=3: 기본 / mult=4: 여유롭게(더 오래 홀딩)
"atr_mult": 3.0, # 샹들리에 엑시트 배수 (ATR_MULT)
# ── 추세추종 신규 파라미터 ──────────────────────────────────────
# MA 골든/데드크로스 기반 추세 판단
"ma_fast": 20.0, # 단기 이동평균 기간 (일봉 기준 약 1달)
"ma_slow": 60.0, # 장기 이동평균 기간 (일봉 기준 약 3달)
# 트레일링 스탑: 포지션 보유 중 고점 대비 이 비율 이상 하락 시 청산 (0=비활성)
# 예) 8.0 → 고점 대비 -8% 하락하면 추세 이탈로 판단해 청산
"trail_stop_pct": 8.0,
# 추세 필터 사용 여부 (1=사용, 0=미사용 → 기존 역추세 방식으로 고정)
"trend_filter": 1.0,
# ── 컨텍스트 낙폭 필터 (0=비활성화) ──────────────────────────
"ath_drop_min_pct": 0.0,
"year_drop_min_pct": 0.0,
"w52_drop_min_pct": 0.0,
}
# ─────────────────────────────────────────────────────────────────────────────
# 관심종목 로드
# ─────────────────────────────────────────────────────────────────────────────
def load_watchlist() -> List[Dict]:
try:
with open(WATCHLIST_PATH, encoding="utf-8") as f:
data = json.load(f)
return data.get("items", [])
except Exception as e:
logger.error(f"관심종목 파일 로드 실패: {e}")
return []
# ─────────────────────────────────────────────────────────────────────────────
# DB 테이블 생성 (holding 전용)
# ─────────────────────────────────────────────────────────────────────────────
HOLDING_DDL = [
"""
CREATE TABLE IF NOT EXISTS holding_min_candles (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
code VARCHAR(10) NOT NULL,
candle_dt DATETIME NOT NULL COMMENT '60분봉 기준 시작 시간(KST)',
tf SMALLINT NOT NULL DEFAULT 60 COMMENT '분봉 단위 (60=60분봉)',
open FLOAT NOT NULL DEFAULT 0,
high FLOAT NOT NULL DEFAULT 0,
low FLOAT NOT NULL DEFAULT 0,
close FLOAT NOT NULL DEFAULT 0,
volume BIGINT NOT NULL DEFAULT 0,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uq_holding_min (code, tf, candle_dt)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
"""
CREATE TABLE IF NOT EXISTS holding_candles (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
code VARCHAR(10) NOT NULL,
candle_date DATE NOT NULL,
open FLOAT NOT NULL DEFAULT 0,
high FLOAT NOT NULL DEFAULT 0,
low FLOAT NOT NULL DEFAULT 0,
close FLOAT NOT NULL DEFAULT 0,
volume BIGINT NOT NULL DEFAULT 0,
change_rate FLOAT NOT NULL DEFAULT 0 COMMENT '전일대비등락률(%%)',
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uq_holding_candle (code, candle_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
"""
CREATE TABLE IF NOT EXISTS holding_stock_config (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
code VARCHAR(10) NOT NULL,
name VARCHAR(50) DEFAULT '',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
rsi_period FLOAT DEFAULT 14,
rsi_buy1 FLOAT DEFAULT 45,
rsi_buy2 FLOAT DEFAULT 40,
rsi_buy3 FLOAT DEFAULT 35,
rsi_sell FLOAT DEFAULT 70,
take_profit_pct FLOAT DEFAULT 5.0,
stop_loss_pct FLOAT DEFAULT 10.0,
buy1_ratio FLOAT DEFAULT 30,
buy2_ratio FLOAT DEFAULT 30,
buy3_ratio FLOAT DEFAULT 40,
slot_money FLOAT DEFAULT 3000000,
atr_period FLOAT DEFAULT 14,
ath_drop_min_pct FLOAT DEFAULT 0 COMMENT 'ATH 대비 최소 낙폭%% (0=비활성)',
year_drop_min_pct FLOAT DEFAULT 0 COMMENT '연도고점 대비 최소 낙폭%% (0=비활성)',
w52_drop_min_pct FLOAT DEFAULT 0 COMMENT '52주고점 대비 최소 낙폭%% (0=비활성)',
ma_fast FLOAT DEFAULT 20 COMMENT '단기 이동평균 기간',
ma_slow FLOAT DEFAULT 60 COMMENT '장기 이동평균 기간',
trail_stop_pct FLOAT DEFAULT 8 COMMENT '트레일링 스탑 %% (0=비활성)',
trend_filter FLOAT DEFAULT 1 COMMENT '추세 필터 (1=사용/0=미사용)',
KEY idx_holding_config_code (code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
]
def ensure_holding_tables(db: TradeDB):
"""holding 전용 테이블 없으면 생성, 기존 테이블에 신규 컬럼 추가(마이그레이션)"""
for ddl in HOLDING_DDL:
db.conn.execute(ddl.strip())
db.conn.commit()
# ── 기존 테이블에 신규 컬럼 추가 (이미 있으면 무시) ─────────────
migrations = [
# holding_candles: change_rate 추가
"ALTER TABLE holding_candles ADD COLUMN change_rate FLOAT NOT NULL DEFAULT 0 COMMENT '전일대비등락률(%%)'",
# holding_stock_config: 낙폭 필터 3종 추가
"ALTER TABLE holding_stock_config ADD COLUMN ath_drop_min_pct FLOAT DEFAULT 0 COMMENT 'ATH 대비 최소 낙폭%% (0=비활성)'",
"ALTER TABLE holding_stock_config ADD COLUMN year_drop_min_pct FLOAT DEFAULT 0 COMMENT '연도고점 대비 최소 낙폭%% (0=비활성)'",
"ALTER TABLE holding_stock_config ADD COLUMN w52_drop_min_pct FLOAT DEFAULT 0 COMMENT '52주고점 대비 최소 낙폭%% (0=비활성)'",
# 추세추종 신규 파라미터
"ALTER TABLE holding_stock_config ADD COLUMN ma_fast FLOAT DEFAULT 20 COMMENT '단기 이동평균 기간'",
"ALTER TABLE holding_stock_config ADD COLUMN ma_slow FLOAT DEFAULT 60 COMMENT '장기 이동평균 기간'",
"ALTER TABLE holding_stock_config ADD COLUMN trail_stop_pct FLOAT DEFAULT 8 COMMENT '트레일링 스탑 %% (0=비활성)'",
"ALTER TABLE holding_stock_config ADD COLUMN trend_filter FLOAT DEFAULT 1 COMMENT '추세 필터 (1=사용/0=미사용)'",
]
for sql in migrations:
try:
db.conn.execute(sql)
db.conn.commit()
except Exception:
pass # 이미 컬럼 존재 → 무시
logger.info("✅ holding_candles / holding_stock_config 테이블 확인 완료")
# ─────────────────────────────────────────────────────────────────────────────
# 종목별 파라미터 관리
# ─────────────────────────────────────────────────────────────────────────────
def get_stock_config(db: TradeDB, code: str) -> Dict:
"""종목별 최신 파라미터 조회 (없으면 DEFAULT 반환)"""
row = db.conn.execute(
"SELECT * FROM holding_stock_config WHERE code=%s ORDER BY id DESC LIMIT 1",
[code]
).fetchone()
if row:
cfg = dict(row)
cfg.pop("id", None)
cfg.pop("created_at", None)
return cfg
# 기본값 반환 (DB에 없으면)
cfg = dict(DEFAULT_STOCK_CONFIG)
cfg["code"] = code
cfg["name"] = ""
return cfg
def set_stock_config(db: TradeDB, code: str, name: str, params: Dict):
"""종목별 파라미터 저장 (INSERT 신규 row = 최신값 추가 방식)"""
cols = ["code", "name"]
vals = [code, name]
for k, v in params.items():
if k in DEFAULT_STOCK_CONFIG:
cols.append(k)
vals.append(float(v))
col_sql = ", ".join(cols)
ph_sql = ", ".join(["%s"] * len(vals))
db.conn.execute(
f"INSERT INTO holding_stock_config ({col_sql}) VALUES ({ph_sql})",
vals
)
db.conn.commit()
# ─────────────────────────────────────────────────────────────────────────────
# KIS API — 일봉 OHLCV 조회 (최대 100봉씩 페이지네이션)
# ─────────────────────────────────────────────────────────────────────────────
def _get_kis_token(db: TradeDB) -> Tuple[str, str, str, bool]:
"""
env_config에서 KIS 인증 정보 조회.
KIS_MOCK=true → 모의 키/도메인 사용 (시세 조회도 모의 도메인에서 동작)
KIS_MOCK=false → 실전 키/도메인 사용
None 값은 빈 문자열로 안전 처리.
"""
row = db.conn.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1").fetchone()
if not row:
raise RuntimeError("env_config 테이블이 비어 있습니다.")
r = dict(row)
def safe(val):
return val if val else ""
is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes")
if is_mock:
app_key = safe(r.get("KIS_APP_KEY_MOCK"))
app_secret = safe(r.get("KIS_APP_SECRET_MOCK"))
base_url = "https://openapivts.koreainvestment.com:29443"
else:
app_key = safe(r.get("KIS_APP_KEY_REAL")) or safe(r.get("KIS_APP_KEY"))
app_secret = safe(r.get("KIS_APP_SECRET_REAL")) or safe(r.get("KIS_APP_SECRET"))
base_url = "https://openapi.koreainvestment.com:9443"
if not app_key or not app_secret:
raise RuntimeError(
f"API 키 미설정 (KIS_MOCK={is_mock})\n"
f"필요 키: {'KIS_APP_KEY_MOCK / KIS_APP_SECRET_MOCK' if is_mock else 'KIS_APP_KEY_REAL / KIS_APP_SECRET_REAL'}"
)
return app_key, app_secret, base_url, is_mock
def _ensure_token(mock: bool) -> str:
"""
KisTokenManager 싱글톤으로 토큰 반환.
- 유효하면 메모리 캐시 즉시 반환
- 만료 10분 전 선제 갱신 후 반환
- KIS 23시간 정책 자동 준수
"""
try:
from kis_token_manager import KisTokenManager
token = KisTokenManager.instance(is_mock=mock).get_token()
if token:
logger.info("🔑 KIS 토큰 준비 완료 [%s] (앞8자: %s…)",
"모의" if mock else "실전", token[:8])
return token
except Exception as e:
logger.warning("KisTokenManager 실패, 파일 폴백: %s", e)
# 폴백: 직접 파일 읽기
path = ROOT / (".kis_token_cache_mock.json" if mock else ".kis_token_cache_real.json")
if not path.exists():
raise RuntimeError(
f"토큰 캐시 파일 없음: {path}\n"
"kis_short_ver2.py 또는 kis_scalping_ver1.py 를 한 번 실행하면 자동 생성됩니다."
)
try:
cache = json.loads(path.read_text(encoding="utf-8"))
except Exception as e:
raise RuntimeError(f"토큰 캐시 파일 읽기 실패: {e}")
token = cache.get("access_token", "")
if not token:
raise RuntimeError("토큰 캐시 파일에 access_token 없음")
logger.info("🔑 KIS 토큰 파일 폴백 재사용 [%s] (앞8자: %s…)",
"모의" if mock else "실전", token[:8])
return token
def fetch_daily_ohlcv(
code: str,
start_date: str, # "YYYY-MM-DD"
end_date: str, # "YYYY-MM-DD"
app_key: str,
app_secret: str,
base_url: str,
mock: bool = False,
max_retries: int = 3,
) -> List[Dict]:
"""
KIS API 일봉 OHLCV 조회 (날짜 범위, 100봉씩 자동 페이지네이션)
- 토큰은 기존 .kis_token_cache_real/mock.json 캐시 파일 재사용 (새 발급 안 함)
반환: [{"date": "YYYYMMDD", "open": float, "high": float, "low": float,
"close": float, "volume": int}, ...] 최신 → 과거 순
"""
token = _ensure_token(mock) # 캐시 파일에서 읽기만
url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
# 날짜 형식 통일 YYYYMMDD
sd = start_date.replace("-", "")
ed = end_date.replace("-", "")
all_rows: List[Dict] = []
cursor_end = ed # 첫 호출은 end_date부터 과거 방향 조회
for attempt_page in range(100): # 최대 ~10,000봉
# 페이지당 ~140일 윈도우 (약 100 거래일)
# DATE_1을 cursor_end 기준 180일 전으로 설정해야 API가 정상 응답
window_start = (
datetime.strptime(cursor_end, "%Y%m%d") - timedelta(days=180)
).strftime("%Y%m%d")
# 단, 실제 요청 시작일(sd)보다 앞으로 가지 않음
page_date1 = max(window_start, sd)
data = None
for retry in range(max_retries):
try:
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {token}",
"appkey": app_key,
"appsecret": app_secret,
"tr_id": "FHKST03010100",
}
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
"FID_INPUT_DATE_1": page_date1, # 윈도우 시작
"FID_INPUT_DATE_2": cursor_end, # 윈도우 끝 (커서)
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "0",
}
resp = requests.get(url, headers=headers, params=params, timeout=10)
if resp.status_code == 429:
time.sleep((retry + 1) * 2)
continue
resp.raise_for_status()
data = resp.json()
break
except Exception as e:
if retry >= max_retries - 1:
logger.error(f"❌ 일봉 조회 실패 {code} (page {attempt_page}): {e}")
return all_rows
time.sleep(2)
if data is None:
break
rows = data.get("output2", [])
if not rows:
break
done = False
for item in rows:
dt = item.get("stck_bsop_date", "") # YYYYMMDD
if dt < sd: # start_date 이전 → 수집 완료
done = True
break
c = float(item.get("stck_clpr", 0) or 0)
if c <= 0:
continue
all_rows.append({
"date": dt,
"open": float(item.get("stck_oprc", 0) or 0),
"high": float(item.get("stck_hgpr", 0) or 0),
"low": float(item.get("stck_lwpr", 0) or 0),
"close": c,
"volume": int(item.get("acml_vol", 0) or 0),
"change_rate": float(item.get("prdy_ctrt", 0) or 0), # 전일대비등락률(%)
})
if done:
break
# 다음 페이지: 이번 배치 최고(가장 오래된) 날짜 - 1일
oldest_dt = rows[-1].get("stck_bsop_date", "")
if not oldest_dt or oldest_dt <= sd:
break
prev = (datetime.strptime(oldest_dt, "%Y%m%d") - timedelta(days=1)).strftime("%Y%m%d")
if prev == cursor_end: # 무한루프 방지
break
cursor_end = prev
time.sleep(0.3) # API 과부하 방지
return all_rows
def store_candles(db: TradeDB, code: str, rows: List[Dict]) -> int:
"""일봉 데이터를 holding_candles에 저장 (ON DUPLICATE KEY UPDATE)"""
saved = 0
for r in rows:
try:
d = r["date"]
date_fmt = f"{d[:4]}-{d[4:6]}-{d[6:8]}"
db.conn.execute(
"""INSERT INTO holding_candles
(code, candle_date, open, high, low, close, volume, change_rate)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
open=VALUES(open), high=VALUES(high), low=VALUES(low),
close=VALUES(close), volume=VALUES(volume),
change_rate=VALUES(change_rate)""",
[code, date_fmt,
r["open"], r["high"], r["low"], r["close"],
r["volume"], r.get("change_rate", 0)]
)
saved += 1
except Exception as e:
logger.debug(f"캔들 저장 오류 ({code} {r.get('date')}): {e}")
db.conn.commit()
return saved
def get_stored_candles(db: TradeDB, code: str,
start_date: str = "", end_date: str = "") -> List[Dict]:
"""holding_candles에서 종목 일봉 조회 (날짜 오름차순, change_rate 포함)"""
sql = """SELECT candle_date, open, high, low, close, volume,
IFNULL(change_rate, 0) AS change_rate
FROM holding_candles WHERE code=%s"""
args = [code]
if start_date:
sql += " AND candle_date >= %s"
args.append(start_date)
if end_date:
sql += " AND candle_date <= %s"
args.append(end_date)
sql += " ORDER BY candle_date ASC"
rows = db.conn.execute(sql, args).fetchall()
return [dict(r) for r in rows]
# ─────────────────────────────────────────────────────────────────────────────
# 60분봉 수집 / 저장 / 조회
# ─────────────────────────────────────────────────────────────────────────────
class _KiwoomTokenManager:
"""
kiwoom_universe_scanner.py / kiwoom_trader_dual.py 의 TokenManager와
동일한 인터페이스 — DB에서 읽은 키를 직접 주입 (env 변수 오염 없음).
TokenManager.get_token() 호환: Chart(token_manager=...) 에 그대로 전달 가능.
"""
def __init__(self, app_key: str, app_secret: str, is_mock: bool = False):
self._app_key = app_key
self._app_secret = app_secret
self._domain = "mockapi.kiwoom.com" if is_mock else "api.kiwoom.com"
self._token: Optional[str] = None
self._expiry: Optional[datetime] = None
def _is_valid(self) -> bool:
if not self._token or not self._expiry:
return False
return datetime.now() < self._expiry - timedelta(seconds=30)
def _request_new_token(self) -> None:
"""키움 /oauth2/token 으로 새 토큰 발급 (kiwoom_universe_scanner 와 동일 흐름)"""
url = f"https://{self._domain}/oauth2/token"
body = {
"grant_type": "client_credentials",
"appkey": self._app_key,
"secretkey": self._app_secret,
}
resp = requests.post(url, json=body, timeout=15)
data = resp.json()
token = data.get("token") or data.get("access_token", "")
if not token:
raise RuntimeError(
f"키움 {'모의' if '모의' in self._domain else '실전'} 토큰 발급 실패: {data}"
)
# 만료 시간 파싱 (expires_dt: "YYYYMMDDHHMMSS")
exp_s = data.get("expires_dt", "")
try:
self._expiry = datetime.strptime(str(exp_s), "%Y%m%d%H%M%S")
except Exception:
self._expiry = datetime.now() + timedelta(hours=23)
self._token = token
logger.info(
f"✅ 키움 {'모의' if '모의' in self._domain else '실전'} 토큰 발급 완료"
f" (앞8자: {token[:8]}…, 만료: {self._expiry})"
)
def get_token(self) -> str:
"""
TokenManager.get_token() 호환 — 만료 시 자동 재발급.
kis_ws.KiwoomTokenManager 싱글톤 풀을 공유하여 au10001 rate limit 방지.
(동일 appkey로 kis_ws 에서 이미 발급받은 토큰은 재사용)
"""
try:
from kis_ws import _get_kiwoom_token_cached
is_mock = "mockapi" in self._domain
token = _get_kiwoom_token_cached(self._app_key, self._app_secret, is_mock)
if token:
self._token = token
return self._token
except Exception:
pass
# kis_ws 임포트 실패 시 자체 로직으로 폴백
if not self._is_valid():
self._request_new_token()
return self._token
def fetch_60min_via_kiwoom(
code: str,
start_date: str, # "YYYY-MM-DD"
end_date: str, # "YYYY-MM-DD"
kiwoom_key: str, # DB env_config의 KIWOOM_APP_KEY_REAL 또는 KIWOOM_APP_KEY_MOCK
kiwoom_secret: str, # DB env_config의 KIWOOM_APP_SECRET_REAL 또는 KIWOOM_APP_SECRET_MOCK
is_mock: bool = False, # True → mockapi.kiwoom.com (모의)
max_pages: int = 50, # 연속조회 최대 횟수 (1페이지 ≈ 900봉)
) -> List[Dict]:
"""
키움증권 REST API (ka10080) 로 60분봉 수집 → holding_min_candles 저장용.
kiwoom_universe_scanner.py / kiwoom_trader_dual.py 와 동일한
TokenManager + Chart 패턴 사용.
▶ KIS 한계: inquire-time-itemchartprice 는 당일(2일치) 데이터만 제공
▶ 키움 장점: ka10080 1회 호출 = 900봉 (60분봉 기준 약 128영업일)
cont-yn: Y + next-key 연속조회로 2년치 이상 수집 가능
반환: [{"candle_date": "YYYY-MM-DD HH:MM:SS", "open": float, "high": float,
"low": float, "close": float, "volume": int}, ...]
최신 → 과거 순, start_date~end_date 범위 필터 적용
"""
from kiwoom_rest_api.koreanstock.chart import Chart
mode_str = "모의" if is_mock else "실전"
base_url = "https://mockapi.kiwoom.com" if is_mock else "https://api.kiwoom.com"
logger.info(f"키움 60분봉 수집 시작: {code} [{mode_str}] {start_date}~{end_date}")
# ── 1. TokenManager + Chart 초기화 (kiwoom_universe_scanner 와 동일 패턴) ──
try:
token_manager = _KiwoomTokenManager(kiwoom_key, kiwoom_secret, is_mock=is_mock)
chart = Chart(base_url=base_url, token_manager=token_manager)
# 최초 토큰 발급 확인
_ = token_manager.get_token()
except Exception as e:
logger.error(f"❌ 키움 초기화 실패 [{mode_str}]: {e}")
return []
# ── 2. 날짜 범위 파싱 ─────────────────────────────────────────────────────
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
rows: List[Dict] = []
cont_yn = "N"
next_key = ""
for page in range(max_pages):
try:
resp = chart.stock_minute_chart_request_ka10080(
stk_cd = code,
tic_scope = "60", # 60분봉 (1/3/5/10/15/30/45/60 지원)
upd_stkpc_tp = "1", # 수정주가 반영
cont_yn = cont_yn,
next_key = next_key,
)
except Exception as e:
logger.error(f"❌ 키움 ka10080 조회 실패 (page {page}): {e}")
break
records = resp.get("stk_min_pole_chart_qry") or []
if not records:
logger.info(f"키움 60분봉: 더 이상 데이터 없음 (page {page})")
break
reached_start = False
for rec in records:
# cntr_tm: "YYYYMMDDHHMMSS" 형식
raw_dt = str(rec.get("cntr_tm", ""))
if len(raw_dt) < 14:
continue
try:
candle_dt = datetime(
int(raw_dt[0:4]), int(raw_dt[4:6]), int(raw_dt[6:8]),
int(raw_dt[8:10]), int(raw_dt[10:12]), int(raw_dt[12:14]),
)
except Exception:
continue
# 역방향(최신→과거) 수집이므로 start_date보다 과거이면 종료
if candle_dt.date() < start_dt.date():
reached_start = True
break
if candle_dt.date() > end_dt.date():
continue
try:
rows.append({
"candle_date": candle_dt.strftime("%Y-%m-%d %H:%M:%S"),
"open": abs(float(rec.get("open_pric", 0) or 0)),
"high": abs(float(rec.get("high_pric", 0) or 0)),
"low": abs(float(rec.get("low_pric", 0) or 0)),
"close": abs(float(rec.get("cur_prc", 0) or 0)),
"volume": abs(int(float(rec.get("trde_qty", 0) or 0))),
})
except Exception:
continue
if reached_start:
break
# 연속조회 키는 응답 dict에서 읽음 (Chart._execute_request 가 헤더를 dict로 포함)
new_cont = str(resp.get("cont-yn", resp.get("cont_yn", "N"))).upper()
new_nkey = str(resp.get("next-key", resp.get("next_key", "")))
if new_cont != "Y" or not new_nkey:
break
cont_yn = new_cont
next_key = new_nkey
time.sleep(0.3) # API 레이트리밋 준수 (초당 3회 이하)
logger.info(f"✅ 키움 60분봉 수집 완료: {code} {len(rows)}봉 ({start_date}~{end_date})")
return rows
def fetch_min_ohlcv(
code: str,
start_date: str, # "YYYY-MM-DD"
end_date: str, # "YYYY-MM-DD"
app_key: str,
app_secret: str,
base_url: str,
tf_min: int = 60, # 집계 단위 (60 = 60분봉)
mock: bool = False,
max_retries: int = 3,
) -> List[Dict]:
"""
KIS FHKST03010200(분봉조회) 1분봉 수집 → tf_min 단위 집계 반환.
페이지네이션 전략:
FID_INPUT_HOUR_1=HHMMSS 커서로 역방향(최신→과거) 순회.
FID_PW_DATA_INCU_YN=Y → 과거 날짜까지 연속 조회 가능.
각 응답 ~30 레코드, 마지막 레코드 절대 시각 - 1분을 다음 커서로 사용.
60분봉 집계 규칙 (역방향 수집이므로 open/close 처리 주의):
처음 만난 레코드 = 이 버킷의 가장 최신(close)
이후 이전 레코드로 갈수록 open 덮어씀 → 결국 가장 오래된 open이 남음
high = MAX, low = MIN, volume = SUM
"""
token = _ensure_token(mock)
url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
try:
sd = datetime.strptime(start_date, "%Y-%m-%d")
ed = datetime.strptime(end_date, "%Y-%m-%d")
except ValueError as e:
raise ValueError(f"날짜 형식 오류: {e}")
# 60분봉 버킷 집계 저장소
buckets: Dict[datetime, Dict] = {}
cursor_dt = ed.replace(hour=15, minute=30, second=0) # end_date 15:30 부터 역순
total_1min = 0 # 수집한 1분봉 수 (로그용)
for page in range(3000): # 최대 ~90,000 1분봉 (약 7개월치)
cursor_str = cursor_dt.strftime("%H%M%S")
data = None
for retry in range(max_retries):
try:
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {token}",
"appkey": app_key,
"appsecret": app_secret,
# 분봉 조회는 실전/모의 도메인만 다르고 TR_ID는 동일
"tr_id": "FHKST03010200",
}
params = {
"FID_ETC_CLS_CODE": "",
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
"FID_INPUT_HOUR_1": cursor_str,
"FID_PW_DATA_INCU_YN": "Y", # 과거 날짜 데이터 포함
}
resp = requests.get(url, headers=headers, params=params, timeout=10)
if resp.status_code == 429:
time.sleep((retry + 1) * 2)
continue
resp.raise_for_status()
data = resp.json()
break
except Exception as e:
if retry >= max_retries - 1:
logger.error(f"❌ 분봉 조회 실패 {code} (page {page}): {e}")
return _buckets_to_min_list(buckets, tf_min)
time.sleep(2)
if data is None:
break
rows = data.get("output2", [])
if not rows:
break
done = False
last_abs_dt = None
for row in rows:
date_s = row.get("stck_bsop_date", "")
time_s = row.get("stck_cntg_hour", "")
if len(date_s) < 8 or len(time_s) < 6:
continue
try:
abs_dt = datetime.strptime(date_s + time_s, "%Y%m%d%H%M%S")
except ValueError:
continue
last_abs_dt = abs_dt
if abs_dt.date() > ed.date():
continue # end_date 이후 → 스킵
if abs_dt.date() < sd.date():
done = True # start_date 이전 → 수집 완료
break
# 장중 시간만 처리 (09:00~15:30)
market_min = abs_dt.hour * 60 + abs_dt.minute
if market_min < 540 or market_min > 930: # 09:00 ~ 15:30
continue
c = float(row.get("stck_prpr", 0) or 0)
if c <= 0:
continue
total_1min += 1
# tf_min 단위 버킷 시작 시간 계산 (09:00, 10:00, 11:00, ...)
bucket_min = (abs_dt.minute // tf_min) * tf_min
bucket_dt = abs_dt.replace(minute=bucket_min, second=0, microsecond=0)
o = float(row.get("stck_oprc", c) or c)
h = float(row.get("stck_hgpr", c) or c)
l = float(row.get("stck_lwpr", c) or c)
v = int(row.get("cntg_vol", 0) or 0)
if bucket_dt not in buckets:
# 역방향: 처음 만나는 레코드 = 버킷의 가장 최신 = close
buckets[bucket_dt] = {"open": o, "high": h, "low": l, "close": c, "volume": v}
else:
b = buckets[bucket_dt]
b["open"] = o # 더 이른 open으로 계속 갱신
b["high"] = max(b["high"], h)
b["low"] = min(b["low"], l)
b["volume"] += v
# close는 첫 기록(최신) 유지 — 갱신하지 않음
if done:
break
# 다음 커서: 이번 배치 마지막(가장 오래된) 절대 시각 - 1분
if last_abs_dt is None:
break
next_cur = last_abs_dt - timedelta(minutes=1)
if next_cur.date() < sd.date():
break
cursor_dt = next_cur
time.sleep(0.15) # API 과부하 방지 (~6 req/s)
logger.info(f" [{code}] 1분봉 {total_1min}개 수집 → {tf_min}분봉 {len(buckets)}개 집계")
return _buckets_to_min_list(buckets, tf_min)
def fetch_and_store_min_candles(
db: "TradeDB",
code: str,
start_date: str,
end_date: str,
app_key: str,
app_secret: str,
base_url: str,
tf_min: int = 60,
mock: bool = False,
max_retries: int = 3,
progress: Optional[Dict] = None, # 외부 dict: {"status","fetched","saved"}
) -> int:
"""
1분봉 페이지 단위로 수집하면서 날짜가 바뀌는 순간 해당 날의 버킷을 즉시 DB 저장.
Flask 백그라운드 스레드에서 호출해 웹페이지가 블로킹되지 않도록 한다.
progress dict를 통해 진행상황을 실시간으로 노출한다.
"""
def _upd(**kw):
if progress is not None:
progress.update(kw)
token = _ensure_token(mock)
url = f"{base_url}/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
try:
sd = datetime.strptime(start_date, "%Y-%m-%d")
ed = datetime.strptime(end_date, "%Y-%m-%d")
except ValueError as e:
raise ValueError(f"날짜 형식 오류: {e}")
# 날짜별로 버킷 집계: {date: {bucket_dt: {o,h,l,c,v}}}
day_buckets: Dict[str, Dict[datetime, Dict]] = {}
cursor_dt = ed.replace(hour=15, minute=30, second=0)
total_1min = 0
total_saved = 0
prev_date_s: Optional[str] = None # 이전 루프의 날짜 (날짜 전환 감지용)
for page in range(3000):
cursor_str = cursor_dt.strftime("%H%M%S")
data = None
for retry in range(max_retries):
try:
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {token}",
"appkey": app_key,
"appsecret": app_secret,
"tr_id": "FHKST03010200",
}
params = {
"FID_ETC_CLS_CODE": "",
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
"FID_INPUT_HOUR_1": cursor_str,
"FID_PW_DATA_INCU_YN": "Y",
}
resp = requests.get(url, headers=headers, params=params, timeout=10)
if resp.status_code == 429:
time.sleep((retry + 1) * 2)
continue
resp.raise_for_status()
data = resp.json()
break
except Exception as e:
if retry >= max_retries - 1:
logger.error(f"❌ 분봉 조회 실패 {code} (page {page}): {e}")
data = None
break
time.sleep(2)
if data is None:
break
rows = data.get("output2", [])
if not rows:
break
done = False
last_abs_dt = None
for row in rows:
date_s = row.get("stck_bsop_date", "")
time_s = row.get("stck_cntg_hour", "")
if len(date_s) < 8 or len(time_s) < 6:
continue
try:
abs_dt = datetime.strptime(date_s + time_s, "%Y%m%d%H%M%S")
except ValueError:
continue
last_abs_dt = abs_dt
if abs_dt.date() > ed.date():
continue
if abs_dt.date() < sd.date():
done = True
break
market_min = abs_dt.hour * 60 + abs_dt.minute
if market_min < 540 or market_min > 930:
continue
c = float(row.get("stck_prpr", 0) or 0)
if c <= 0:
continue
total_1min += 1
cur_date_s = date_s[:8] # "YYYYMMDD"
# 날짜가 바뀌면 이전 날짜 데이터를 즉시 DB에 flush
if prev_date_s is not None and cur_date_s != prev_date_s:
flush_buckets = day_buckets.pop(prev_date_s, {})
if flush_buckets:
rows_to_save = _buckets_to_min_list(flush_buckets, tf_min)
n = store_min_candles(db, code, rows_to_save, tf_min)
total_saved += n
_upd(saved=total_saved, fetched=total_1min,
current_date=prev_date_s)
logger.info(f" [{code}] {prev_date_s}{tf_min}분봉 {n}개 저장 (누적 {total_saved})")
prev_date_s = cur_date_s
bucket_min = (abs_dt.minute // tf_min) * tf_min
bucket_dt = abs_dt.replace(minute=bucket_min, second=0, microsecond=0)
o = float(row.get("stck_oprc", c) or c)
h = float(row.get("stck_hgpr", c) or c)
l = float(row.get("stck_lwpr", c) or c)
v = int(row.get("cntg_vol", 0) or 0)
if cur_date_s not in day_buckets:
day_buckets[cur_date_s] = {}
bkt = day_buckets[cur_date_s]
if bucket_dt not in bkt:
bkt[bucket_dt] = {"open": o, "high": h, "low": l, "close": c, "volume": v}
else:
b = bkt[bucket_dt]
b["open"] = o
b["high"] = max(b["high"], h)
b["low"] = min(b["low"], l)
b["volume"] += v
_upd(fetched=total_1min)
if done:
break
if last_abs_dt is None:
break
next_cur = last_abs_dt - timedelta(minutes=1)
if next_cur.date() < sd.date():
break
cursor_dt = next_cur
time.sleep(0.15)
# 루프 종료 후 남은 날짜 데이터 flush
for date_s, flush_buckets in day_buckets.items():
if flush_buckets:
rows_to_save = _buckets_to_min_list(flush_buckets, tf_min)
n = store_min_candles(db, code, rows_to_save, tf_min)
total_saved += n
_upd(saved=total_saved, fetched=total_1min)
logger.info(f" [{code}] {date_s}(말미) → {tf_min}분봉 {n}개 저장 (누적 {total_saved})")
logger.info(f" [{code}] 수집완료: 1분봉 {total_1min}개 → {tf_min}분봉 {total_saved}개 저장")
return total_saved
def _buckets_to_min_list(buckets: Dict, tf_min: int) -> List[Dict]:
"""버킷 dict → 시간순 list 변환 (store_min_candles 입력용)"""
return [
{
"dt": dt.strftime("%Y-%m-%d %H:%M:%S"),
"tf": tf_min,
"open": b["open"],
"high": b["high"],
"low": b["low"],
"close": b["close"],
"volume": b["volume"],
}
for dt, b in sorted(buckets.items())
]
def store_min_candles(db: TradeDB, code: str,
rows: List[Dict], tf_min: int = 60) -> int:
"""60분봉 데이터를 holding_min_candles에 저장 (ON DUPLICATE KEY UPDATE)"""
saved = 0
for r in rows:
try:
db.conn.execute(
"""INSERT INTO holding_min_candles
(code, candle_dt, tf, open, high, low, close, volume)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
open=VALUES(open), high=VALUES(high), low=VALUES(low),
close=VALUES(close), volume=VALUES(volume)""",
[code, r["dt"], tf_min,
r["open"], r["high"], r["low"], r["close"], r["volume"]]
)
saved += 1
except Exception as e:
logger.debug(f"분봉 저장 오류 ({code} {r.get('dt')}): {e}")
db.conn.commit()
return saved
def get_stored_min_candles(db: TradeDB, code: str,
start_dt: str = "", end_dt: str = "",
tf_min: int = 60) -> List[Dict]:
"""
holding_min_candles에서 종목 분봉 조회 (시간 오름차순).
run_backtest() 호환: candle_dt를 candle_date 키로 반환
"""
sql = """SELECT candle_dt AS candle_date,
open, high, low, close, volume
FROM holding_min_candles WHERE code=%s AND tf=%s"""
args = [code, tf_min]
if start_dt:
sql += " AND candle_dt >= %s"
args.append(start_dt)
if end_dt:
end_val = end_dt + " 23:59:59" if len(end_dt) == 10 else end_dt
sql += " AND candle_dt <= %s"
args.append(end_val)
sql += " ORDER BY candle_dt ASC"
rows = db.conn.execute(sql, args).fetchall()
return [dict(r) for r in rows]
def get_min_candle_stats(db: TradeDB, code: str, tf_min: int = 60) -> Dict:
"""60분봉 보유 현황 (개수, 최소/최대 datetime)"""
row = db.conn.execute(
"SELECT COUNT(*) AS cnt, MIN(candle_dt) AS mn, MAX(candle_dt) AS mx "
"FROM holding_min_candles WHERE code=%s AND tf=%s",
[code, tf_min]
).fetchone()
if row and row["cnt"]:
return {
"count": int(row["cnt"]),
"min": str(row["mn"])[:16] if row["mn"] else "",
"max": str(row["mx"])[:16] if row["mx"] else "",
}
return {"count": 0, "min": "", "max": ""}
# ─────────────────────────────────────────────────────────────────────────────
# 기술 지표 계산
# ─────────────────────────────────────────────────────────────────────────────
def _rsi_series(closes: List[float], period: int) -> List[Optional[float]]:
"""Wilder RSI 시리즈"""
n = len(closes)
rsi = [None] * n
if n < period + 1:
return rsi
deltas = [closes[i] - closes[i - 1] for i in range(1, n)]
gains = [max(d, 0) for d in deltas]
losses = [max(-d, 0) for d in deltas]
ag = sum(gains[:period]) / period
al = sum(losses[:period]) / period
for i in range(period, n):
idx = i - 1
if i > period:
ag = (ag * (period - 1) + gains[idx]) / period
al = (al * (period - 1) + losses[idx]) / period
rs = ag / al if al > 0 else float("inf")
rsi[i] = 100.0 - (100.0 / (1 + rs)) if al > 0 else 100.0
return rsi
def _ma_series(closes: List[float], period: int) -> List[Optional[float]]:
"""단순이동평균(SMA) 시리즈. period 미만 구간은 None."""
n = len(closes)
ma = [None] * n
if period < 1 or n < period:
return ma
window_sum = sum(closes[:period])
ma[period - 1] = window_sum / period
for i in range(period, n):
window_sum += closes[i] - closes[i - period]
ma[i] = window_sum / period
return ma
def _atr_series(highs, lows, closes, period: int) -> List[Optional[float]]:
"""ATR 시리즈"""
n = len(closes)
tr_list = [None] * n
for i in range(1, n):
tr_list[i] = max(
highs[i] - lows[i],
abs(highs[i] - closes[i - 1]),
abs(lows[i] - closes[i - 1]),
)
atr = [None] * n
if n < period + 1:
return atr
atr[period] = sum(t for t in tr_list[1:period+1] if t) / period
for i in range(period + 1, n):
if tr_list[i] is not None and atr[i - 1] is not None:
atr[i] = (atr[i - 1] * (period - 1) + tr_list[i]) / period
return atr
# ─────────────────────────────────────────────────────────────────────────────
# 컨텍스트 신호 계산 (ATH · 연도고점 · 52주 고/저)
# ─────────────────────────────────────────────────────────────────────────────
def _calc_context_signals(candles: List[Dict], current_price: float) -> Dict:
"""
DB 일봉 전체를 기반으로 현재가 대비 컨텍스트 낙폭 지표를 반환.
백테스트와 실매매 양쪽에서 재사용.
반환 키:
ath - 역대 최고가 (DB 전체 기간 high 최댓값)
year_high - 당해 연도 최고가
w52_high - 최근 252봉(약 1년) 최고가
w52_low - 최근 252봉 최저가
ath_drop_pct - 현재가 기준 ATH 대비 낙폭 (%) ← 클수록 많이 빠진 것
year_drop_pct - 연도고점 대비 낙폭 (%)
w52_drop_pct - 52주 고점 대비 낙폭 (%)
"""
if not candles:
return {
"ath": current_price, "year_high": current_price,
"w52_high": current_price, "w52_low": current_price,
"ath_drop_pct": 0.0, "year_drop_pct": 0.0, "w52_drop_pct": 0.0,
}
this_year = str(datetime.now().year)
all_highs = [float(c["high"]) for c in candles if float(c["high"]) > 0]
year_highs = [float(c["high"]) for c in candles
if str(c["candle_date"])[:4] == this_year and float(c["high"]) > 0]
# 52주 ≈ 최근 252 거래일
w52_bars = candles[-252:] if len(candles) >= 252 else candles
w52_highs = [float(c["high"]) for c in w52_bars if float(c["high"]) > 0]
w52_lows = [float(c["low"]) for c in w52_bars if float(c["low"]) > 0]
ath = max(all_highs) if all_highs else current_price
year_high = max(year_highs) if year_highs else current_price
w52_high = max(w52_highs) if w52_highs else current_price
w52_low = min(w52_lows) if w52_lows else current_price
def drop_pct(ref: float) -> float:
return round((ref - current_price) / ref * 100, 2) if ref > 0 else 0.0
return {
"ath": ath,
"year_high": year_high,
"w52_high": w52_high,
"w52_low": w52_low,
"ath_drop_pct": drop_pct(ath),
"year_drop_pct": drop_pct(year_high),
"w52_drop_pct": drop_pct(w52_high),
}
# ─────────────────────────────────────────────────────────────────────────────
# 백테스트 엔진
# ─────────────────────────────────────────────────────────────────────────────
def run_backtest(
candles: List[Dict],
cfg: Dict,
fee_rate: float = 0.015 / 100, # 매수/매도 각각 0.015%
sell_tax: float = 0.18 / 100, # 증권거래세 0.18%
tf_min: int = 0, # 0=일봉, 60=60분봉 등. 0이면 자동감지
) -> Dict:
"""
홀딩 전략 백테스트 (추세추종 + 눌림목 / 역추세 선택)
[추세 상승: MA_FAST > MA_SLOW]
매수: RSI ≤ rsi_buy1 → 1차, RSI ≤ rsi_buy2 → 2차 추가매수
매도: 트레일링 스탑(고점 대비 trail_stop_pct% 하락)
OR RSI 과열(rsi_sell) OR 익절(take_profit_pct)
OR 데드크로스(MA_FAST < MA_SLOW) 발생 시 추세 반전 청산
[추세 하락/횡보: MA_FAST ≤ MA_SLOW] or [trend_filter=0]
매수: RSI ≤ rsi_buy1/2/3 (기존 3단계 역추세 분할매수)
매도: 익절(take_profit_pct) OR RSI 과열 OR 손절(stop_loss_pct)
진입가: 신호봉 다음 봉 시가 (1봉 지연)
"""
if len(candles) < 20:
return {"error": "봉 부족", "total_trades": 0}
# ── 봉 유형 자동 감지 ─────────────────────────────────────────────
_sample_dt = str(candles[0].get("candle_date", ""))
_is_min = tf_min > 0 or len(_sample_dt) > 10
rsi_period = int(cfg.get("rsi_period", 14))
rsi_buy1 = float(cfg.get("rsi_buy1", 55))
rsi_buy2 = float(cfg.get("rsi_buy2", 45))
rsi_buy3 = float(cfg.get("rsi_buy3", 35))
rsi_sell = float(cfg.get("rsi_sell", 75))
tp_pct = float(cfg.get("take_profit_pct", 15.0)) / 100
sl_pct = float(cfg.get("stop_loss_pct", 10.0)) / 100
buy1_ratio = float(cfg.get("buy1_ratio", 50)) / 100
buy2_ratio = float(cfg.get("buy2_ratio", 50)) / 100
buy3_ratio = float(cfg.get("buy3_ratio", 0)) / 100
slot_money = float(cfg.get("slot_money", 3_000_000))
ma_fast_p = int(cfg.get("ma_fast", 20))
ma_slow_p = int(cfg.get("ma_slow", 60))
trail_stop_p = float(cfg.get("trail_stop_pct", 8.0)) # %, 0=비활성
trend_filter = float(cfg.get("trend_filter", 1.0)) >= 0.5 # bool 변환
atr_period_p = int(cfg.get("atr_period", 14))
# 샹들리에 엑시트 배수: env/카드에서 조정 가능 (하드코딩 금지)
atr_mult = float(cfg.get("atr_mult", 3.0))
ath_drop_min = float(cfg.get("ath_drop_min_pct", 0.0))
year_drop_min = float(cfg.get("year_drop_min_pct", 0.0))
w52_drop_min = float(cfg.get("w52_drop_min_pct", 0.0))
closes = [float(c["close"]) for c in candles]
highs = [float(c["high"]) for c in candles]
lows = [float(c["low"]) for c in candles]
opens = [float(c["open"]) for c in candles]
datetimes = [str(c["candle_date"]) for c in candles]
dates = [s[:10] for s in datetimes]
rsis = _rsi_series(closes, rsi_period)
ma_fast_arr = _ma_series(closes, ma_fast_p)
ma_slow_arr = _ma_series(closes, ma_slow_p)
# ATR: 샹들리에 엑시트에 사용 (추세장 청산 핵심 지표)
atr_arr = _atr_series(highs, lows, closes, atr_period_p)
# ── 컨텍스트 배열 사전 계산 ──────────────────────────────────────
ath_arr = []
year_high_arr = []
w52_high_arr = []
w52_low_arr = []
_year_max: Dict[str, float] = {}
_ath_run = 0.0
from datetime import date as _date, timedelta as _td
for i in range(len(candles)):
h = highs[i]; l = lows[i]; y = dates[i][:4]
_ath_run = max(_ath_run, h)
_year_max[y] = max(_year_max.get(y, 0.0), h)
if _is_min:
try:
cutoff = str(_date.fromisoformat(dates[i]) - _td(days=365))
w52_s = i
while w52_s > 0 and dates[w52_s - 1] >= cutoff:
w52_s -= 1
except Exception:
w52_s = max(0, i - 252 * 7)
else:
w52_s = max(0, i - 251)
ath_arr.append(_ath_run)
year_high_arr.append(_year_max[y])
w52_high_arr.append(max(highs[w52_s:i + 1]))
w52_low_arr.append(min(lows[w52_s:i + 1]))
# ── 포지션 상태 ───────────────────────────────────────────────────
position = None # None or dict
trades: List[Dict] = []
equity: List[Dict] = []
cum_pnl = 0.0
start_i = max(rsi_period + 1, ma_slow_p) # MA_SLOW가 준비된 이후부터 진입
for i in range(start_i, len(candles) - 1):
rsi = rsis[i]
if rsi is None:
continue
next_open = float(opens[i + 1]) if opens[i + 1] > 0 else closes[i]
if next_open <= 0:
continue
mf = ma_fast_arr[i] # 단기 MA 현재봉
ms = ma_slow_arr[i] # 장기 MA 현재봉
mf_prev = ma_fast_arr[i - 1] if i > 0 else mf
ms_prev = ma_slow_arr[i - 1] if i > 0 else ms
# 추세 방향 판단
is_uptrend = trend_filter and mf is not None and ms is not None and mf > ms
# ─── 데드크로스 감지: 추세 반전 신호 ─────────────────────────
is_deadcross = (
trend_filter
and mf is not None and ms is not None
and mf_prev is not None and ms_prev is not None
and mf < ms # 현재봉: MA_FAST < MA_SLOW
and mf_prev >= ms_prev # 직전봉: MA_FAST ≥ MA_SLOW (방금 크로스)
)
# ─── 보유 중: 매도 체크 ───────────────────────────────────────
if position is not None:
avg = position["avg"]
qty = position["qty"]
c_now = closes[i]
# 트레일링 스탑: 보유 중 최고가 업데이트 후 고점 대비 낙폭 체크
position["peak"] = max(position["peak"], c_now)
profit_pct = (c_now - avg) / avg if avg > 0 else 0.0
sell_reason = None
if position.get("trend_mode"):
# ─── 추세장 청산: 고정 익절·RSI과열 무시 → 끝까지 추세 추적 ───
# 핵심: rsi_sell·tp_pct로 조기 청산하면 200% 추세를 15%에 팔게 됨
if is_deadcross:
# MA 데드크로스 → 추세 반전 확정 → 즉시 청산
sell_reason = f"데드크로스({mf:.0f}<{ms:.0f})"
else:
# 샹들리에 엑시트 = 고점 ATR × 배수 (변동성 기반 동적 방어선)
# ATR이 클수록 방어선이 낮아져 변동성 큰 종목에 더 여유를 줌
_atr_v = atr_arr[i] if (atr_arr[i] is not None) else closes[i] * 0.05
chandelier = position["peak"] - _atr_v * atr_mult
if trail_stop_p > 0:
# 퍼센트 방어선도 계산, 더 타이트한(높은) 쪽 채택
pct_trail = position["peak"] * (1.0 - trail_stop_p / 100.0)
chandelier = max(chandelier, pct_trail)
if c_now <= chandelier:
sell_reason = f"샹들리에({profit_pct*100:+.1f}%)"
elif profit_pct <= -sl_pct:
# 손절은 추세장에서도 반드시 유지 (무제한 손실 방지)
sell_reason = f"손절({profit_pct*100:.1f}%)"
else:
# ─── 횡보/역추세장 청산: 철저히 고정 익절·손절비 준수 ──────────
if profit_pct >= tp_pct:
sell_reason = f"익절(+{profit_pct*100:.1f}%)"
elif rsi >= rsi_sell:
sell_reason = f"RSI과열({rsi:.1f})"
elif profit_pct <= -sl_pct:
sell_reason = f"손절({profit_pct*100:.1f}%)"
if sell_reason:
exit_price = next_open
fee = exit_price * qty * (fee_rate + sell_tax)
pnl = (exit_price - avg) * qty - fee
hold = i - position["entry_i"]
cum_pnl += pnl
trades.append({
"buy_date": datetimes[position["entry_i"]][:16],
"sell_date": datetimes[i + 1][:16],
"avg_price": round(avg),
"exit_price": round(exit_price),
"qty": qty,
"pnl": round(pnl),
"hold_days": hold,
"reason": sell_reason,
"rsi_sell": round(rsi, 1),
"ath_drop": position.get("ath_drop", 0),
"year_drop": position.get("year_drop", 0),
"w52_drop": position.get("w52_drop", 0),
})
equity.append({"date": datetimes[i + 1][:16], "cum_pnl": round(cum_pnl)})
position = None
continue # 매도 후 당일 매수 방지
# ─── 미보유: 컨텍스트 낙폭 필터 ─────────────────────────────
price_now = closes[i]
def _drop(ref: float) -> float:
return (ref - price_now) / ref * 100 if ref > 0 else 0.0
if ath_drop_min > 0 and _drop(ath_arr[i]) < ath_drop_min: continue
if year_drop_min > 0 and _drop(year_high_arr[i]) < year_drop_min: continue
if w52_drop_min > 0 and _drop(w52_high_arr[i]) < w52_drop_min: continue
# ─── 추세 상승 모드 매수 ─────────────────────────────────────
if is_uptrend:
# RSI 눌림목: rsi_buy1(1차), rsi_buy2(2차) 기준으로 진입
stage = 0
if rsi <= rsi_buy2: stage = 2
elif rsi <= rsi_buy1: stage = 1
if stage == 0:
continue
ratio = buy2_ratio if stage == 2 else buy1_ratio
invest = slot_money * ratio
qty = max(1, int(invest / next_open))
cost = next_open * qty * (1 + fee_rate)
position = {
"avg": next_open,
"qty": qty,
"cost": cost,
"stage": stage,
"entry_i": i + 1,
"peak": next_open, # 트레일링 스탑용 고점
"trend_mode": True, # 추세 모드 진입 여부 (청산 로직 구분)
"ath_drop": round(_drop(ath_arr[i]), 1),
"year_drop": round(_drop(year_high_arr[i]), 1),
"w52_drop": round(_drop(w52_high_arr[i]), 1),
}
# ─── 횡보/하락 모드 (역추세 분할매수) ───────────────────────
else:
stage = 0
if rsi <= rsi_buy3: stage = 3
elif rsi <= rsi_buy2: stage = 2
elif rsi <= rsi_buy1: stage = 1
if stage == 0:
continue
ratios = [buy1_ratio, buy2_ratio, buy3_ratio]
ratio = ratios[stage - 1]
if ratio <= 0:
continue # buy3_ratio=0이면 3단계 진입 안 함
invest = slot_money * ratio
qty = max(1, int(invest / next_open))
cost = next_open * qty * (1 + fee_rate)
position = {
"avg": next_open,
"qty": qty,
"cost": cost,
"stage": stage,
"entry_i": i + 1,
"peak": next_open,
"trend_mode": False,
"ath_drop": round(_drop(ath_arr[i]), 1),
"year_drop": round(_drop(year_high_arr[i]), 1),
"w52_drop": round(_drop(w52_high_arr[i]), 1),
}
# ─── 미청산 포지션 마지막 봉 강제 청산 ──────────────────────────
if position is not None and len(candles) > 0:
exit_price = closes[-1]
avg = position["avg"]; qty = position["qty"]
fee = exit_price * qty * (fee_rate + sell_tax)
pnl = (exit_price - avg) * qty - fee
cum_pnl += pnl
trades.append({
"buy_date": dates[position["entry_i"]],
"sell_date": dates[-1],
"avg_price": round(avg),
"exit_price": round(exit_price),
"qty": qty,
"pnl": round(pnl),
"hold_days": len(candles) - 1 - position["entry_i"],
"reason": "기간종료",
"rsi_sell": None,
"ath_drop": position.get("ath_drop", 0),
"year_drop": position.get("year_drop", 0),
"w52_drop": position.get("w52_drop", 0),
})
equity.append({"date": dates[-1], "cum_pnl": round(cum_pnl)})
# ─── 요약 통계 ───────────────────────────────────────────────────
total = len(trades)
wins = [t for t in trades if t["pnl"] > 0]
losses = [t for t in trades if t["pnl"] < 0]
total_pnl = sum(t["pnl"] for t in trades)
avg_hold = sum(t["hold_days"] for t in trades) / total if total else 0
win_pnl = sum(t["pnl"] for t in wins)
loss_pnl = sum(t["pnl"] for t in losses)
pf = round(abs(win_pnl / loss_pnl), 2) if loss_pnl != 0 else 9999.0
peak, mdd, cum = 0.0, 0.0, 0.0
for t in trades:
cum += t["pnl"]
peak = max(peak, cum)
mdd = max(mdd, peak - cum)
# 이유별 집계
reasons: Dict[str, int] = {}
for t in trades:
r = t["reason"]
prefix = r.split("(")[0] # "익절", "RSI과열", "손절", "기간종료"
reasons[prefix] = reasons.get(prefix, 0) + 1
# ── Buy & Hold 벤치마크 ──────────────────────────────────────────────
# 첫 봉 종가 매수 → 마지막 봉 종가 매도 (수수료·세금 제외 단순 계산)
bnh_pct = round((closes[-1] - closes[0]) / closes[0] * 100, 2) if closes[0] > 0 else 0.0
bnh_pnl = round(slot_money * bnh_pct / 100)
# 봇 수익률: 투자 원금(slot_money) 대비 실현 손익
bot_pct = round(total_pnl / slot_money * 100, 2) if slot_money > 0 else 0.0
# 알파 = 봇 수익률 - Buy & Hold 수익률 (양수면 봇이 시장 이김)
alpha_pct = round(bot_pct - bnh_pct, 2)
return {
"summary": {
"total_trades": total,
"win_trades": len(wins),
"loss_trades": len(losses),
"win_rate": round(len(wins) / total * 100, 1) if total else 0,
"total_pnl": round(total_pnl),
"avg_hold_days": round(avg_hold, 1),
"profit_factor": round(min(pf, 9999.0), 2),
"max_drawdown": round(mdd),
# Buy & Hold 비교
"bnh_pct": bnh_pct, # 기간 동안 그냥 들고 있었을 때 수익률(%)
"bnh_pnl": bnh_pnl, # 그냥 들고 있었을 때 손익(원)
"bot_pct": bot_pct, # 봇 수익률(투자금 대비 %)
"alpha_pct": alpha_pct, # 초과수익(봇 - Buy&Hold, %p)
},
"equity": equity,
"reasons": reasons,
"trades": trades[-200:],
}
# ─────────────────────────────────────────────────────────────────────────────
# 파라미터 Grid Search (단일 종목)
# ─────────────────────────────────────────────────────────────────────────────
def run_param_search(
candles: List[Dict],
grid: Optional[Dict] = None,
min_trades: int = 2,
base_cfg: Optional[Dict] = None, # 사용자가 설정한 현재 파라미터 (없으면 DEFAULT 사용)
) -> List[Dict]:
"""
종목 하나에 대해 파라미터 Grid Search 실행
반환: 수익 기준 내림차순 정렬된 결과 리스트
base_cfg: 탐색 그리드에 없는 파라미터들의 기준값.
웹 카드에서 사용자가 직접 설정한 값을 전달하면
rsi_sell, rsi_period, rsi_buy3 등이 그 값으로 고정됨.
None 이면 DEFAULT_STOCK_CONFIG 사용.
"""
from itertools import product as iproduct
if grid is None:
grid = {
# ── MA 추세 판단 ──────────────────────────────────────────────
"ma_fast": [15, 20, 30],
"ma_slow": [40, 60, 120],
# ── 샹들리에 배수: 추세장 청산 핵심 파라미터 ────────────────────
# 작을수록 타이트(빨리 청산) / 클수록 여유(오래 홀딩)
"atr_mult": [2.0, 3.0, 4.0],
# 퍼센트 보조 방어선 (0=비활성, ATR선이 더 낮을 때만 활성화)
"trail_stop_pct": [0.0, 8.0],
# ── 진입 RSI 눌림목 ────────────────────────────────────────────
"rsi_buy1": [60, 55, 50, 45],
"rsi_buy2": [50, 45, 40, 35],
# ── 횡보장 익절/손절 기준 (추세장엔 무시됨) ────────────────────
"take_profit_pct": [15.0, 30.0],
"stop_loss_pct": [7.0, 15.0],
# ── ATH 낙폭 필터 ─────────────────────────────────────────────
"ath_drop_min_pct": [0.0, 20.0],
}
keys = list(grid.keys())
combos = list(iproduct(*[grid[k] for k in keys]))
results = []
# base_cfg가 주어지면 사용자 설정값 우선, 없으면 DEFAULT 사용
_base = dict(DEFAULT_STOCK_CONFIG)
if base_cfg:
_base.update(base_cfg)
for vals in combos:
cfg = dict(_base)
cfg.update(dict(zip(keys, vals)))
# MA 단기 < 장기 조건 보정
if cfg["ma_fast"] >= cfg["ma_slow"]:
continue
# RSI buy1 > buy2 조건 보정
if cfg["rsi_buy1"] <= cfg["rsi_buy2"]:
continue
res = run_backtest(candles, cfg)
s = res.get("summary", {})
if s.get("total_trades", 0) < min_trades:
continue
results.append({
"params": {k: cfg[k] for k in keys},
"total_pnl": s["total_pnl"],
"win_rate": s["win_rate"],
"total_trades": s["total_trades"],
"pf": s["profit_factor"],
"avg_hold": s["avg_hold_days"],
"mdd": s["max_drawdown"],
})
results.sort(key=lambda x: x["total_pnl"], reverse=True)
return results
# ─────────────────────────────────────────────────────────────────────────────
# 실매매 봇 (HoldingTrader)
# ─────────────────────────────────────────────────────────────────────────────
class HoldingTrader:
"""
장기 홀딩 실매매 봇
- active_trades 에 strategy='HOLDING' 으로 기록 → 다른 봇과 계좌 분리
- 종목별 파라미터는 holding_stock_config 에서 조회
- 10분 주기로 RSI 체크 → 3단계 분할매수 / 익절·손절 매도
- 진입가: 시장가 주문 (장중 호가 추종)
"""
STRATEGY = "HOLDING" # active_trades.strategy 값 — 다른 봇과 충돌 방지
BOT_NAME = "홀딩봇"
def __init__(self):
self.db = TradeDB()
ensure_holding_tables(self.db)
# 실전/모의 토큰 모두 최신 상태로 유지 (모드와 무관하게 양쪽 갱신)
try:
from kis_token_manager import ensure_both_tokens
ensure_both_tokens()
except Exception as _te:
logger.warning(f"토큰 자동갱신 건너뜀: {_te}")
cfg_row = self.db.conn.execute(
"SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
).fetchone()
r = dict(cfg_row) if cfg_row else {}
def safe(v): return v if v else ""
is_mock = str(r.get("KIS_MOCK", "true")).lower() in ("true", "1", "yes")
self.mock = is_mock
self.app_key = safe(r.get("KIS_APP_KEY_MOCK" if is_mock else "KIS_APP_KEY_REAL"))
self.app_secret = safe(r.get("KIS_APP_SECRET_MOCK" if is_mock else "KIS_APP_SECRET_REAL"))
self.base_url = (
"https://openapivts.koreainvestment.com:29443"
if is_mock else
"https://openapi.koreainvestment.com:9443"
)
self.account_no = safe(r.get("KIS_ACCOUNT_NO_MOCK" if is_mock else "KIS_ACCOUNT_NO_REAL"))
self.account_code = safe(r.get("KIS_ACCOUNT_CODE_MOCK" if is_mock else "KIS_ACCOUNT_CODE_REAL")) or "01"
# 알림 설정
self.mm_server = safe(r.get("MM_SERVER_URL", "")) or "https://mattermost.hoonfam.org"
self.mm_token = safe(r.get("MM_BOT_TOKEN_", ""))
self.mm_channel = safe(r.get("KIS_LONG_MM_CHANNEL", "")) or "stock"
self.tg_token = safe(r.get("MM_BOT_TOKEN_", "")) # TG 토큰 있으면 별도로
self.tg_chat = safe(r.get("KIS_LONG_MM_CHANNEL", ""))
self.watchlist = load_watchlist()
logger.info(f"{self.BOT_NAME} 초기화 | 모의={is_mock} | 종목={len(self.watchlist)}")
# ──────────────────── 알림 ────────────────────
def _notify(self, title: str, body: str):
msg = f"**[{self.BOT_NAME}]** {title}\n{body}"
if self.mm_token and self.mm_server:
try:
requests.post(
f"{self.mm_server}/hooks/{self.mm_token}",
json={"text": msg}, timeout=5
)
except Exception as e:
logger.debug(f"알림 전송 실패: {e}")
# ──────────────────── KIS REST ────────────────────
def _token(self) -> str:
return _ensure_token(self.mock)
def _get(self, path: str, params: dict) -> dict:
"""GET 요청 (429 재시도)"""
url = f"{self.base_url}{path}"
token = self._token()
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
}
for attempt in range(3):
try:
r = requests.get(url, headers=headers, params=params, timeout=10)
if r.status_code == 429:
time.sleep((attempt + 1) * 2)
continue
r.raise_for_status()
return r.json()
except Exception as e:
if attempt == 2:
raise
time.sleep(2)
raise RuntimeError("GET 요청 실패")
def _post(self, path: str, tr_id: str, data: dict) -> dict:
"""POST 요청 (주문용, 429 재시도)"""
url = f"{self.base_url}{path}"
token = self._token()
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
"tr_id": tr_id,
}
for attempt in range(3):
try:
r = requests.post(url, headers=headers, json=data, timeout=10)
if r.status_code == 429:
time.sleep((attempt + 1) * 2)
continue
r.raise_for_status()
return r.json()
except Exception as e:
if attempt == 2:
raise
time.sleep(2)
raise RuntimeError("POST 요청 실패")
def get_price(self, code: str) -> Optional[Dict]:
"""
현재가 + 당일 고/저, 52주 고/저, 등락률 조회 (FHKST01010100)
반환 dict 키:
price - 현재가
day_high - 당일 고가
day_low - 당일 저가
w52_high - 52주 최고가
w52_low - 52주 최저가
change_rate - 전일대비 등락률(%)
volume - 누적 거래량
"""
try:
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {self._token()}",
"appkey": self.app_key,
"appsecret": self.app_secret,
"tr_id": "FHKST01010100",
}
params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code}
r = requests.get(
f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price",
headers=headers, params=params, timeout=10
)
out = r.json().get("output", {})
price = float(out.get("stck_prpr", 0) or 0)
if price <= 0:
return None
return {
"price": price,
"day_high": float(out.get("stck_hgpr", 0) or 0), # 당일 고가
"day_low": float(out.get("stck_lwpr", 0) or 0), # 당일 저가
"w52_high": float(out.get("w52_hgpr", 0) or 0), # 52주 최고가
"w52_low": float(out.get("w52_lwpr", 0) or 0), # 52주 최저가
"change_rate": float(out.get("prdy_ctrt", 0) or 0), # 등락률(%)
"volume": int(out.get("acml_vol", 0) or 0), # 누적 거래량
}
except Exception as e:
logger.warning(f"현재가 조회 실패 ({code}): {e}")
return None
def _order(self, code: str, qty: int, is_buy: bool) -> bool:
"""시장가 주문 (매수/매도)"""
if qty <= 0:
return False
# 모의: VTTC0802U(매수) / VTTC0801U(매도) 실전: TTTC0802U / TTTC0801U
if is_buy:
tr_id = "VTTC0802U" if self.mock else "TTTC0802U"
else:
tr_id = "VTTC0801U" if self.mock else "TTTC0801U"
try:
data = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_code,
"PDNO": code,
"ORD_DVSN": "01", # 시장가
"ORD_QTY": str(qty),
"ORD_UNPR": "0",
}
result = self._post(
"/uapi/domestic-stock/v1/trading/order-cash", tr_id, data
)
ok = result.get("rt_cd") == "0"
if not ok:
logger.error(f"주문 실패 ({code} {'매수' if is_buy else '매도'}): {result.get('msg1','')}")
return ok
except Exception as e:
logger.error(f"주문 예외 ({code}): {e}")
return False
# ──────────────────── DB 포지션 ────────────────────
def _get_position(self, code: str) -> Optional[dict]:
"""HOLDING 전략 포지션 조회"""
row = self.db.conn.execute(
"SELECT * FROM active_trades WHERE code=%s AND strategy=%s",
[code, self.STRATEGY]
).fetchone()
return dict(row) if row else None
def _upsert_position(self, code: str, name: str, avg_price: float,
qty: int, invested: float, stage: int,
is_uptrend: bool = False):
"""
포지션 저장/갱신 (strategy='HOLDING').
status = T{stage}(추세장 진입) or S{stage}(횡보장 진입)
→ 봇 재시작 후에도 청산 모드(T/S)를 구분하여 샹들리에/고정익절 적용
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
mode_prefix = "T" if is_uptrend else "S" # T=Trend / S=Sideways
self.db.conn.execute(
"""INSERT INTO active_trades
(code, name, strategy, avg_buy_price, current_qty, target_qty,
total_invested, status, buy_date, updated_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
avg_buy_price=VALUES(avg_buy_price),
current_qty=VALUES(current_qty),
target_qty=VALUES(target_qty),
total_invested=VALUES(total_invested),
status=VALUES(status),
updated_at=VALUES(updated_at)""",
[code, name, self.STRATEGY, round(avg_price, 2), qty, qty,
round(invested, 2), f"{mode_prefix}{stage}", now, now]
)
self.db.conn.commit()
def _delete_position(self, code: str):
"""포지션 삭제 (청산 후)"""
self.db.conn.execute(
"DELETE FROM active_trades WHERE code=%s AND strategy=%s",
[code, self.STRATEGY]
)
self.db.conn.commit()
# ──────────────────── RSI 계산 ────────────────────
def _get_rsi(self, code: str, cfg: dict, current_price: float) -> Optional[float]:
"""
저장된 일봉 + 당일 현재가(가중치 없이 단순 추가)로 RSI 계산.
장중에도 '오늘의 종가가 이 가격이라면' 의 RSI를 반환.
"""
period = int(cfg.get("rsi_period", 14))
candles = get_stored_candles(self.db, code)
if len(candles) < period + 2:
return None
closes = [float(c["close"]) for c in candles]
# 오늘 데이터가 DB에 없으면 현재가로 추가
today_str = datetime.now().strftime("%Y-%m-%d")
if str(candles[-1]["candle_date"])[:10] != today_str:
closes.append(float(current_price))
rsis = _rsi_series(closes, period)
for r in reversed(rsis):
if r is not None:
return r
return None
# ──────────────────── 매수 로직 ────────────────────
def _try_buy(self, code: str, name: str, current_price: float,
rsi: float, cfg: dict, ctx: Optional[Dict] = None,
is_uptrend: bool = False):
"""
분할매수 체크 및 실행.
is_uptrend: 추세 상승장 여부 → status에 T/S 기록해 청산 모드 구분
ctx: _calc_context_signals() 반환값 (ATH·연도고점·52주 낙폭 필터)
"""
rsi_buy1 = float(cfg.get("rsi_buy1", 45))
rsi_buy2 = float(cfg.get("rsi_buy2", 40))
rsi_buy3 = float(cfg.get("rsi_buy3", 35))
b1 = float(cfg.get("buy1_ratio", 30)) / 100
b2 = float(cfg.get("buy2_ratio", 30)) / 100
b3 = float(cfg.get("buy3_ratio", 40)) / 100
slot = float(cfg.get("slot_money", 3_000_000))
ath_min = float(cfg.get("ath_drop_min_pct", 0.0))
year_min = float(cfg.get("year_drop_min_pct", 0.0))
w52_min = float(cfg.get("w52_drop_min_pct", 0.0))
pos = self._get_position(code)
cur_status = (pos["status"] or "S0") if pos else "S0"
cur_stage = int(cur_status[1]) if len(cur_status) > 1 and cur_status[1].isdigit() else 0
# ─── 매수 단계 및 비중 결정 ────────────────────────────────────────
target_stage, ratio = 0, 0.0
if cur_stage == 0 and rsi <= rsi_buy1:
target_stage, ratio = 1, b1
elif cur_stage == 1 and rsi <= rsi_buy2:
target_stage, ratio = 2, b2
elif cur_stage == 2 and rsi <= rsi_buy3:
target_stage, ratio = 3, b3
if target_stage == 0:
return # RSI 조건 미충족
# ─── 컨텍스트 낙폭 필터 ────────────────────────────────────────────
if ctx:
if ath_min > 0 and ctx["ath_drop_pct"] < ath_min:
logger.info(f"[{code}] ATH 낙폭 부족 ({ctx['ath_drop_pct']:.1f}% < {ath_min}%) → 매수 보류")
return
if year_min > 0 and ctx["year_drop_pct"] < year_min:
logger.info(f"[{code}] 연도 낙폭 부족 ({ctx['year_drop_pct']:.1f}% < {year_min}%) → 매수 보류")
return
if w52_min > 0 and ctx["w52_drop_pct"] < w52_min:
logger.info(f"[{code}] 52주 낙폭 부족 ({ctx['w52_drop_pct']:.1f}% < {w52_min}%) → 매수 보류")
return
invest = slot * ratio
qty = max(1, int(invest / current_price))
cost = current_price * qty
mode_str = "📈추세" if is_uptrend else "📉역추세"
logger.info(f"[{code}] {mode_str} 매수 RSI={rsi:.1f}{target_stage}{qty}주 @ {current_price:,.0f}")
if not self._order(code, qty, is_buy=True):
return
# 평단가 갱신
if pos:
prev_cost = float(pos["avg_buy_price"]) * int(pos["current_qty"])
prev_qty = int(pos["current_qty"])
new_qty = prev_qty + qty
new_avg = (prev_cost + cost) / new_qty
new_inv = float(pos["total_invested"]) + cost
else:
new_qty = qty
new_avg = current_price
new_inv = cost
self._upsert_position(code, name, new_avg, new_qty, new_inv, target_stage, is_uptrend)
# 컨텍스트 낙폭 정보 알림에 포함
ctx_txt = ""
if ctx:
ctx_txt = (
f"\n▪ ATH 낙폭: {ctx['ath_drop_pct']:.1f}%"
f" | 연도: {ctx['year_drop_pct']:.1f}%"
f" | 52주: {ctx['w52_drop_pct']:.1f}%"
)
pnl_txt = f"평단가: {new_avg:,.0f}원 | 수량: {new_qty}"
self._notify(
f"📥 {name}({code}) {target_stage}차 매수",
f"▪ 체결가: {current_price:,.0f}\n"
f"▪ 수량: {qty}주 ({cost:,.0f}원)\n"
f"{pnl_txt}\n"
f"▪ RSI: {rsi:.1f} (기준: {[rsi_buy1,rsi_buy2,rsi_buy3][target_stage-1]})"
f"{ctx_txt}"
)
# ──────────────────── 매도 로직 ────────────────────
def _try_sell(self, code: str, name: str, current_price: float,
rsi: float, cfg: dict,
is_deadcross: bool = False,
peak_price: float = 0.0,
atr_val: float = 0.0):
"""
익절/손절 체크 및 실행.
[추세장 진입(status=T*)]
→ 샹들리에 엑시트(peak ATR × mult) + 데드크로스만 청산
→ 고정 익절·RSI과열 무시: 추세 끝까지 추적
[횡보장 진입(status=S*)]
→ 기존 방식: 고정 익절% / RSI과열 / 손절%
"""
pos = self._get_position(code)
if not pos:
return
avg = float(pos["avg_buy_price"])
qty = int(pos["current_qty"])
pnl_r = (current_price - avg) / avg if avg > 0 else 0.0
cur_status = (pos.get("status") or "S1")
is_trend_mode = cur_status.startswith("T") # T=추세장, S=횡보장
tp_pct = float(cfg.get("take_profit_pct", 15.0)) / 100
sl_pct = float(cfg.get("stop_loss_pct", 10.0)) / 100
rsi_sell_thr = float(cfg.get("rsi_sell", 75.0))
trail_stop_p = float(cfg.get("trail_stop_pct", 8.0))
atr_mult = float(cfg.get("atr_mult", 3.0))
sell_reason = None
if is_trend_mode:
# ─── 추세장: 고정 익절·RSI과열 무시 → 추세 끝까지 추적 ──────────
if is_deadcross:
sell_reason = f"데드크로스({pnl_r*100:+.1f}%)"
elif peak_price > 0 and atr_val > 0:
chandelier = peak_price - atr_val * atr_mult
if trail_stop_p > 0:
pct_trail = peak_price * (1.0 - trail_stop_p / 100.0)
chandelier = max(chandelier, pct_trail)
if current_price <= chandelier:
sell_reason = f"샹들리에({pnl_r*100:+.1f}%)"
# 손절은 추세장에서도 유지 (치명적 손실 방지)
if sell_reason is None and pnl_r <= -sl_pct:
sell_reason = f"손절({pnl_r*100:.1f}%)"
else:
# ─── 횡보장: 기존 방식 철저히 준수 ──────────────────────────────
if pnl_r >= tp_pct:
sell_reason = f"익절(+{pnl_r*100:.1f}%)"
elif rsi and rsi >= rsi_sell_thr:
sell_reason = f"RSI과열({rsi:.1f})"
elif pnl_r <= -sl_pct:
sell_reason = f"손절({pnl_r*100:.1f}%)"
if not sell_reason:
return
logger.info(f"[{code}] 매도 신호: {sell_reason} | {qty}주 @ {current_price:,.0f}"
f"(peak:{peak_price:,.0f} atr:{atr_val:,.0f})")
if not self._order(code, qty, is_buy=False):
return
fee = current_price * qty * (0.015 / 100 + 0.18 / 100)
pnl = (current_price - avg) * qty - fee
self._delete_position(code)
self._notify(
f"📤 {name}({code}) 매도 | {sell_reason}",
f"▪ 체결가: {current_price:,.0f}\n"
f"▪ 수량: {qty}\n"
f"▪ 평단가: {avg:,.0f}\n"
f"▪ 최고가: {peak_price:,.0f}원 | ATR: {atr_val:,.0f}\n"
f"▪ 손익: {pnl:+,.0f}\n"
f"▪ 수익률: {pnl_r*100:+.2f}%"
)
# ──────────────────── 메인 루프 ────────────────────
def _is_market_hour(self) -> bool:
"""장 시간 체크 (09:00~15:30 평일)"""
now = datetime.now()
if now.weekday() >= 5: # 토·일
return False
t = now.hour * 100 + now.minute
return 900 <= t <= 1530
def _fetch_today_candles(self):
"""
장 마감 직후 오늘 일봉을 자동 수집하여 DB 갱신.
KIS API는 당일 종가가 확정되는 15:30 이후 조회 가능.
"""
today = datetime.now().strftime("%Y-%m-%d")
app_key, app_secret, base_url, mock = _get_kis_token(self.db)
logger.info(f"📥 장 마감 후 자동 캔들 수집 ({today})")
total_saved = 0
for item in self.watchlist:
code, name = item["code"], item["name"]
try:
rows = fetch_daily_ohlcv(code, today, today,
app_key, app_secret, base_url, mock=mock)
saved = store_candles(self.db, code, rows)
total_saved += saved
logger.info(f" [{code}] {name}: {saved}봉 저장")
time.sleep(0.3)
except Exception as e:
logger.error(f" [{code}] 캔들 자동 수집 오류: {e}")
logger.info(f"✅ 자동 캔들 수집 완료 (총 {total_saved}봉 저장)")
self._notify(
"📊 일봉 자동 수집 완료",
f"{today} 종가 기준 {len(self.watchlist)}종목 수집 ({total_saved}봉 저장)"
)
def run(self):
logger.info(f"\n{'='*50}")
logger.info(f" {self.BOT_NAME} 시작 | 모의={self.mock}")
logger.info(f" 종목: {[i['name'] for i in self.watchlist]}")
logger.info(f"{'='*50}\n")
self._notify("🚀 봇 시작", f"종목 {len(self.watchlist)}개 모니터링 시작")
_candles_fetched_date = "" # 오늘 자동수집 완료 날짜 (중복 방지)
while True:
try:
now = datetime.now()
t = now.hour * 100 + now.minute
# ── 장 마감 후 자동 일봉 수집 (15:35~15:50, 하루 1회) ──
today_str = now.strftime("%Y-%m-%d")
if (now.weekday() < 5
and 1535 <= t <= 1550
and _candles_fetched_date != today_str):
self._fetch_today_candles()
_candles_fetched_date = today_str
if not self._is_market_hour():
logger.info("⏳ 장 외 시간 대기...")
time.sleep(60)
continue
for item in self.watchlist:
code = item["code"]
name = item["name"]
try:
cfg = get_stock_config(self.db, code)
pi = self.get_price(code) # price info dict
if not pi:
continue
price = pi["price"]
# DB 일봉 기반 컨텍스트 신호 (ATH·연도고점·52주 고저)
stored = get_stored_candles(self.db, code)
ctx = _calc_context_signals(stored, price)
rsi = self._get_rsi(code, cfg, price)
pos = self._get_position(code)
# ── 실시간 MA / ATR / Peak 계산 ──────────────────────
is_uptrend = False
is_deadcross = False
peak_price = price
atr_val = price * 0.05 # fallback: 현재가의 5%
if len(stored) >= 30:
_cl = [float(c["close"]) for c in stored]
_hi = [float(c["high"]) for c in stored]
_lo = [float(c["low"]) for c in stored]
# 오늘 실시간 봉 추가 (당일 고·저·종가로 지표 갱신)
_cl.append(price); _hi.append(pi["day_high"])
_lo.append(pi["day_low"])
maf_p = int(cfg.get("ma_fast", 20))
mas_p = int(cfg.get("ma_slow", 60))
atr_p = int(cfg.get("atr_period", 14))
tf = float(cfg.get("trend_filter", 1.0)) >= 0.5
maf_a = _ma_series(_cl, maf_p)
mas_a = _ma_series(_cl, mas_p)
atr_a = _atr_series(_hi, _lo, _cl, atr_p)
mf = maf_a[-1]; ms = mas_a[-1]
mfp = maf_a[-2] if len(maf_a) > 1 else mf
msp = mas_a[-2] if len(mas_a) > 1 else ms
if atr_a[-1] is not None:
atr_val = atr_a[-1]
if tf and mf and ms:
is_uptrend = mf > ms
is_deadcross = (mf < ms and mfp is not None
and msp is not None and mfp >= msp)
# ── 보유 시 진입일 이후 최고가(Peak) 추적 ────────────
if pos:
buy_d = str(pos.get("buy_date", ""))[:10]
if buy_d:
post = [c for c in stored if str(c["candle_date"])[:10] >= buy_d]
highs = [float(c["high"]) for c in post] + [pi["day_high"], price]
peak_price = max(highs) if highs else price
logger.info(
f"[{code}] {name} | "
f"현재가:{price:,.0f}({pi['change_rate']:+.2f}%) | "
f"ATH낙폭:{ctx['ath_drop_pct']:.1f}% "
f"연도:{ctx['year_drop_pct']:.1f}% | "
f"RSI:{f'{rsi:.1f}' if rsi else 'N/A'} | "
f"추세:{'상승' if is_uptrend else '하락/횡보'} | "
f"ATR:{atr_val:,.0f} | "
f"포지션:{'있음(' + pos['status'] + ')' if pos else '없음'}"
)
if pos:
self._try_sell(code, name, price, rsi, cfg,
is_deadcross=is_deadcross,
peak_price=peak_price,
atr_val=atr_val)
else:
self._try_buy(code, name, price, rsi, cfg, ctx,
is_uptrend=is_uptrend)
except Exception as e:
logger.error(f"[{code}] 처리 중 오류: {e}")
time.sleep(3)
sleep_sec = random.uniform(540, 660) # 9~11분 랜덤 대기
logger.info(f"💤 {sleep_sec/60:.1f}분 후 다음 체크...")
time.sleep(sleep_sec)
except KeyboardInterrupt:
logger.info("🛑 사용자 중단")
self._notify("🛑 봇 중단", "사용자가 종료했습니다.")
break
except Exception as e:
logger.error(f"메인 루프 오류: {e}")
time.sleep(60)
self.db.close()
# ─────────────────────────────────────────────────────────────────────────────
# CLI: 캔들 수집 / 백테스트 실행
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="홀딩 전략 봇")
sub = parser.add_subparsers(dest="cmd")
p_fetch = sub.add_parser("fetch", help="일봉 캔들 수집")
p_fetch.add_argument("--start", default="2023-01-01", help="시작일 YYYY-MM-DD")
p_fetch.add_argument("--end", default=datetime.now().strftime("%Y-%m-%d"))
p_bt = sub.add_parser("backtest", help="백테스트 실행")
p_bt.add_argument("--code", default="", help="종목코드 (공백=전체)")
p_bt.add_argument("--start", default="2023-01-01")
p_bt.add_argument("--end", default=datetime.now().strftime("%Y-%m-%d"))
sub.add_parser("live", help="실매매 봇 시작")
args = parser.parse_args()
if args.cmd == "live":
# 실매매 봇 시작 — DB는 HoldingTrader 내부에서 관리
trader = HoldingTrader()
trader.run()
sys.exit(0)
db = TradeDB()
ensure_holding_tables(db)
if args.cmd == "fetch":
items = load_watchlist()
app_key, app_secret, base_url, mock = _get_kis_token(db)
for item in items:
code, name = item["code"], item["name"]
logger.info(f" [{code}] {name} 일봉 수집 ({args.start} ~ {args.end})")
rows = fetch_daily_ohlcv(code, args.start, args.end, app_key, app_secret, base_url, mock=mock)
saved = store_candles(db, code, rows)
logger.info(f"{saved}봉 저장 완료")
time.sleep(0.5)
elif args.cmd == "backtest":
items = load_watchlist()
if args.code:
items = [i for i in items if i["code"] == args.code]
for item in items:
code = item["code"]
candles = get_stored_candles(db, code, args.start, args.end)
cfg = get_stock_config(db, code)
result = run_backtest(candles, cfg)
s = result.get("summary", {})
print(f"\n[{code}] {item['name']}")
print(f" 거래:{s.get('total_trades')} | 승률:{s.get('win_rate')}% | "
f"손익:{s.get('total_pnl'):+,}원 | PF:{s.get('profit_factor')} | "
f"MDD:{s.get('max_drawdown'):,}")
else:
parser.print_help()
db.close()