338 lines
15 KiB
Python
338 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
[1회성] 13시 발송용 AI 추천 수치 스크립트
|
||
- DB env_config 최신 수치 + trade_history 최근 10건 + journalctl 최근 200줄을 참고해
|
||
Gemini로 문제점·수치 추천 분석 후 Mattermost 발송 및 last_ai_recommendations 저장.
|
||
- 실행: python3 ai_recommend_notice.py (cron 0 13 * * * 등으로 13:00 실행 가능)
|
||
"""
|
||
import os
|
||
import re
|
||
import sys
|
||
import json
|
||
import subprocess
|
||
import warnings
|
||
from pathlib import Path
|
||
|
||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||
sys.path.insert(0, str(SCRIPT_DIR))
|
||
|
||
# DB/설정은 프로젝트 DB 사용
|
||
from database import TradeDB, ENV_CONFIG_KEYS
|
||
|
||
warnings.filterwarnings("ignore", message=".*google.*")
|
||
|
||
# Gemini
|
||
GEMINI_MODEL_ID = "gemini-2.5-flash"
|
||
gemini_client = None
|
||
try:
|
||
import google.generativeai as genai
|
||
db = TradeDB(db_path=str(SCRIPT_DIR / "quant_bot.db"))
|
||
latest = db.get_latest_env()
|
||
snap = (latest or {}).get("snapshot") or {}
|
||
GEMINI_API_KEY = (snap.get("GEMINI_API_KEY") or "").strip()
|
||
if GEMINI_API_KEY:
|
||
genai.configure(api_key=GEMINI_API_KEY)
|
||
gemini_client = genai.GenerativeModel(GEMINI_MODEL_ID)
|
||
else:
|
||
db.close()
|
||
print("❌ GEMINI_API_KEY 없음 (DB env_config)")
|
||
sys.exit(1)
|
||
except Exception as e:
|
||
print(f"❌ Gemini/DB 초기화 실패: {e}")
|
||
sys.exit(1)
|
||
|
||
# 단타 봇(kis_short_ver1.py) 코드 기준: env 키가 쓰이는 위치 (AI 추천 시 반드시 참고)
|
||
KIS_SHORT_CODE_REFERENCE = """
|
||
**[단타 봇 kis_short_ver1.py – env 수치 사용 위치 (추천 시 이 봇에만 쓰이는 키만 제안할 것)]**
|
||
|
||
■ reload_config() – DB에서 읽어 봇에 반영
|
||
- STOP_LOSS_PCT: 퍼센트 손절 (profit_pct <= 이 값이면 칼손절, 기본 -0.04)
|
||
- TAKE_PROFIT_PCT: 퍼센트 익절 (profit_pct >= 이 값이면 익절(퍼센트), 기본 0.05)
|
||
- MAX_STOCKS: 최대 보유 종목 수
|
||
- MIN_DROP_RATE: 매수 조건 – 당일 낙폭 하한 (시가 대비 저가 하락률, drop_rate < 이 값이면 탈락-낙폭, 기본 0.03)
|
||
- MIN_RECOVERY_RATIO_SHORT: 매수 조건 – 일봉 회복률 하한 (저가~현재가/당일범위, recovery_pos_day < 이 값이면 탈락-회복률, 기본 0.5). 동일 값이 3분봉 회복률 구간 하한으로도 사용됨(min_recovery_ratio ~ MAX_RECOVERY_RATIO_3M).
|
||
- STOP_ATR_MULTIPLIER_TAIL, TARGET_ATR_MULTIPLIER_TAIL: ATR 손절가/목표가 배율 (stop_price = 매수가 - ATR*배율, target = 매수가 + ATR*배율)
|
||
- MIN_HOLD_HOURS: 24시간 보유 전략의 기준 시간(시간). 이 시간 미만이면 “24시간내” 규칙, 이상이면 “24시간 경과” 규칙 적용.
|
||
- RISK_PCT_PER_TRADE, KELLY_MULTIPLIER, MAX_POSITION_PCT, MIN_POSITION_AMOUNT, USE_KELLY_FORMULA: RiskManager(매수 금액·종목당 상한)
|
||
- SLOT_BASE_AMOUNT_CAP: RiskManager 종목당 원화 상한(0이면 미적용). 매수 금액이 이 값을 넘으면 이 값으로 잘림.
|
||
- ML: USE_ML_SIGNAL, ML_MIN_PROBABILITY
|
||
- TOTAL_DEPOSIT: 리포트용
|
||
|
||
■ 매수 필터 (check_buy_signal_tail_catch) – 순서대로 적용, 하나라도 실패하면 탈락
|
||
- 낙폭: drop_rate = (시가-저가)/시가. drop_rate < MIN_DROP_RATE → 탈락-낙폭
|
||
- 회복률(일봉): recovery_pos_day = (현재가-저가)/(고가-저가). < MIN_RECOVERY_RATIO_SHORT → 탈락-회복률
|
||
- 꼬리: tail_ratio = 꼬리길이/몸통길이, tail_pct = 꼬리길이/저가. TAIL_RATIO_MIN(기본 1.5) 또는 TAIL_PCT_MIN(기본 0.003) 미만 → 탈락-꼬리
|
||
- 3분봉 회복: recovery_pos = (현재가-저가)/(고가-저가) (해당 3분봉 내). MIN_RECOVERY_RATIO_SHORT ~ MAX_RECOVERY_RATIO_3M(기본 0.8) 구간이 아니면 → 탈락-회복3분
|
||
- 피뢰침: HIGH_PRICE_CHASE_THRESHOLD(0.96) – 현재가 >= 당일고가*이 값 → 탈락-피뢰침 고점추격. MAX_DAILY_CHANGE_PCT(20) – (고가-저가)/저가*100 > 이 값 → 탈락-피뢰침 급등주
|
||
- RSI_OVERHEAT_THRESHOLD(78): RSI >= 이 값 → 탈락-RSI
|
||
- MA20: 현재가 < MA20 → 탈락-MA20. 현재가 > MA20*(1+MA20_MAX_ABOVE_PCT/100) → 탈락-MA20초과
|
||
|
||
■ 매도 (check_sell_signals) – 아래 순서대로 적용, 먼저 걸린 사유로 매도
|
||
- [1] SHOULDER_CUT_PCT(0.03): 고점 대비 하락률 >= 이 값 → 어깨매도 (최우선)
|
||
- [2] MAX_LOSS_PER_TRADE_KRW(200000): 원화 손실(profit_val) <= -이 값 → 금액손실컷. 0이면 이 조건 비활성화.
|
||
- [3] SCALP_ATR_UP_MULT, SCALP_ATR_DOWN_MULT, SCALP_ATR_DROP_MULT: ATR 스캘핑 → 스캘핑_본절사수 / 스캘핑_익절보존
|
||
- [4] USE_QUICK_PROFIT_PROTECTION(True): **매수 후 0.5시간(QUICK_PROFIT_PROTECT_HOURS) 이내에만** 적용. 이 시간이 지나면 이 조건은 더 이상 보지 않음. 조건: 고점이 매수가*QUICK_PROFIT_MAX_RATIO(1.005) 이상 올랐는데, 현재가가 매수가*QUICK_PROFIT_CURRENT_MIN(1.0015) 이하로 다시 내려오면 → "💨 작은수익보호" 매도. (24시간 제한과 별개: 0.5시간만 적용, 그 이후는 [5] 24시간 보유 전략이 적용됨)
|
||
- [5] 24시간 보유 전략 (MIN_HOLD_HOURS 기준): **보유 시간이 MIN_HOLD_HOURS(24) 미만**이면: 수익률 > MIN_HOLD_EARLY_TAKE_PCT(0.05) → 익절, 또는 고점이 매수가*MIN_HOLD_HIGH_PCT(1.07) 이상 올랐다가 현재가 <= 고점*MIN_HOLD_DROP_FROM_HIGH(0.97) → 고점→하락 매도. **MIN_HOLD_HOURS 이상**이면: 수익률 > POST_HOLD_TAKE_PCT(0.02) → 익절, 또는 수익 중인데 현재가 < 고점*POST_HOLD_DROP_FROM_HIGH(0.97) → 익절보호.
|
||
- [6] (위에서 모두 걸리지 않았을 때만) 목표가/손절: 현재가 >= target_price → 목표달성. 현재가 <= stop_price(ATR 손절가) → 전략손절. profit_pct <= stop_loss_pct → 칼손절. profit_pct >= take_profit_pct → 익절(퍼센트).
|
||
|
||
※ CONSECUTIVE_LOSS_LIMIT, MAX_PEG, MIN_RECOVERY_RATIO(단타는 MIN_RECOVERY_RATIO_SHORT만 사용)는 이 단타 봇에서 사용하지 않음. MAX_PEG는 kis_long_ver1.py(늘림목 봇) 전용. 추천 시 위에 나온 키만 사용할 것.
|
||
"""
|
||
|
||
# DB 최신 env에서 수치만 (계좌/키/토큰 제외)
|
||
EXCLUDE = {
|
||
"MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY",
|
||
"KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL", "KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK",
|
||
"KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", "KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK",
|
||
"KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL",
|
||
}
|
||
|
||
|
||
def get_env_numeric_snapshot(db_instance):
|
||
"""DB 최신 env에서 계좌/키 제외한 수치만 키=값 줄 단위로 반환."""
|
||
latest = db_instance.get_latest_env()
|
||
if not latest or not latest.get("snapshot"):
|
||
return ""
|
||
snap = latest["snapshot"]
|
||
lines = []
|
||
for k, v in sorted(snap.items()):
|
||
if k in EXCLUDE or v is None:
|
||
continue
|
||
v = (str(v) or "").strip()
|
||
if "#" in v:
|
||
v = v.split("#")[0].strip()
|
||
if not v:
|
||
continue
|
||
lines.append(f"{k}={v}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def get_recent_trades(db_instance, limit=10):
|
||
"""trade_history 최근 N건."""
|
||
out = []
|
||
try:
|
||
cursor = db_instance.conn.execute("""
|
||
SELECT code, name, buy_price, sell_price, qty, profit_rate,
|
||
realized_pnl, strategy, sell_reason, buy_date, sell_date, hold_minutes
|
||
FROM trade_history
|
||
ORDER BY id DESC
|
||
LIMIT ?
|
||
""", (limit,))
|
||
for row in cursor.fetchall():
|
||
out.append({
|
||
"code": row[0], "name": row[1], "buy_price": row[2], "sell_price": row[3],
|
||
"qty": row[4], "profit_rate": row[5], "realized_pnl": row[6], "strategy": row[7],
|
||
"sell_reason": row[8], "buy_date": row[9], "sell_date": row[10], "hold_minutes": row[11] or 0,
|
||
})
|
||
except Exception as e:
|
||
print(f"⚠️ 거래 내역 조회 실패: {e}")
|
||
return out
|
||
|
||
|
||
def get_journalctl_recent(lines=200, unit=None):
|
||
"""journalctl 최근 N줄. unit 있으면 -u unit 적용 (예: kis_short)."""
|
||
cmd = ["journalctl", "-n", str(lines), "-o", "short-iso"]
|
||
if unit:
|
||
cmd = ["journalctl", "-u", unit, "-n", str(lines), "-o", "short-iso"]
|
||
try:
|
||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||
if r.returncode == 0 and r.stdout:
|
||
return r.stdout.strip()
|
||
except Exception as e:
|
||
print(f"⚠️ journalctl 조회 실패: {e}")
|
||
return ""
|
||
|
||
|
||
def save_ai_recommendations_from_text(db_instance, analysis_text):
|
||
"""AI 분석문에서 KEY=값 추천만 추출해 DB last_ai_recommendations에 저장."""
|
||
if not analysis_text or not db_instance:
|
||
return
|
||
valid_keys = set(ENV_CONFIG_KEYS)
|
||
result = []
|
||
for line in analysis_text.splitlines():
|
||
line = line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line)
|
||
if m and m.group(1) in valid_keys:
|
||
result.append(f"{m.group(1)}={m.group(2).strip()}")
|
||
if result:
|
||
db_instance.set_last_ai_recommendations("\n".join(result))
|
||
|
||
|
||
def send_mm(message, db_instance):
|
||
"""Mattermost 발송 (DB env에서 URL/토큰/채널 조회, mm_config.json 채널 ID)."""
|
||
latest = db_instance.get_latest_env()
|
||
snap = (latest or {}).get("snapshot") or {}
|
||
url = (snap.get("MM_SERVER_URL") or "https://mattermost.hoonfam.org").strip().rstrip("/")
|
||
token = (snap.get("MM_BOT_TOKEN_") or "").strip()
|
||
channel_alias = (snap.get("KIS_SHORT_MM_CHANNEL") or snap.get("MATTERMOST_CHANNEL") or "stock").strip()
|
||
if not token:
|
||
print("❌ MM_BOT_TOKEN_ 없음")
|
||
return False
|
||
config_path = SCRIPT_DIR / "mm_config.json"
|
||
channels = {}
|
||
if config_path.exists():
|
||
try:
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
channels = json.load(f).get("channels", {})
|
||
except Exception:
|
||
pass
|
||
channel_id = channels.get(channel_alias)
|
||
if not channel_id:
|
||
print(f"❌ mm_config.json에 채널 alias '{channel_alias}' 없음")
|
||
return False
|
||
import requests
|
||
api_url = f"{url}/api/v4/posts"
|
||
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
||
payload = {"channel_id": channel_id, "message": message}
|
||
try:
|
||
r = requests.post(api_url, headers=headers, json=payload, timeout=10)
|
||
r.raise_for_status()
|
||
return True
|
||
except Exception as e:
|
||
print(f"❌ MM 전송 에러: {e}")
|
||
return False
|
||
|
||
|
||
def main():
|
||
# DB 참고: 수치 스냅샷, 최근 거래, 후보 수
|
||
env_lines = get_env_numeric_snapshot(db)
|
||
recent_trades = get_recent_trades(db, 10)
|
||
db_candidates = db.get_target_candidates()
|
||
candidate_count = len(db_candidates)
|
||
|
||
# journalctl 최근 200줄 (단위는 환경변수 JOURNALCTL_UNIT 있으면 사용, 없으면 전체)
|
||
journal_unit = os.environ.get("JOURNALCTL_UNIT", "kis_short_ver1.service").strip() or None
|
||
journal_log = get_journalctl_recent(200, unit=journal_unit)
|
||
if not journal_log:
|
||
journal_log = "(journalctl 로그 없음)"
|
||
|
||
# 프롬프트: DB + 거래 + journal 로그 반영
|
||
if not recent_trades:
|
||
summary = f"- 유니버스 후보: {candidate_count}개\n- 최근 거래: 없음"
|
||
trades_text = "없음"
|
||
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
|
||
|
||
**현재 상태**
|
||
{summary}
|
||
|
||
**현재 DB 설정 수치 (일부만, 계좌/키 제외)**
|
||
```
|
||
{env_lines}
|
||
```
|
||
|
||
**봇 최근 로그 (journalctl 최근 200줄) – 탈락 사유·API 과부하·매수 시도 등 참고**
|
||
```
|
||
{journal_log[:15000]}
|
||
```
|
||
|
||
{KIS_SHORT_CODE_REFERENCE}
|
||
|
||
**당신의 임무**
|
||
1. 위 설정·후보 수·로그·코드 기준을 보고 문제점 분석 (필터 과도, API 제한, 매수 미발생 원인 등). 추천은 반드시 위 코드 기준에 나온 키만 제안할 것.
|
||
2. **추천**: 반드시 "설정수치=값" 한 줄에 하나만. 이유·주석 붙이지 말 것. DB에 그대로 복붙 적용 가능하게.
|
||
예: MIN_DROP_RATE=0.005
|
||
예: MIN_RECOVERY_RATIO_SHORT=0.25
|
||
예: TAIL_RATIO_MIN=0.5
|
||
3. 예상 효과 한두 줄.
|
||
|
||
**출력 형식 (반드시 준수)**
|
||
## 🔍 문제점
|
||
1. [구체적 문제 1]
|
||
2. [구체적 문제 2]
|
||
|
||
## 💡 수치 추천 (한 줄에 하나, 그대로 DB 적용 가능)
|
||
KEY=value
|
||
...
|
||
|
||
## 📈 예상 효과
|
||
- [효과]
|
||
"""
|
||
else:
|
||
total = len(recent_trades)
|
||
wins = sum(1 for t in recent_trades if (t.get("profit_rate") or 0) > 0)
|
||
losses = total - wins
|
||
win_rate = (wins / total * 100) if total > 0 else 0
|
||
avg_profit = sum(t.get("profit_rate") or 0 for t in recent_trades) / total
|
||
total_pnl = sum(t.get("realized_pnl") or 0 for t in recent_trades)
|
||
avg_hold = sum(t.get("hold_minutes") or 0 for t in recent_trades) / total
|
||
trades_text = ""
|
||
for i, t in enumerate(recent_trades, 1):
|
||
trades_text += f"""
|
||
[거래 {i}] {t['name']} ({t.get('strategy','')})
|
||
- 매수: {t.get('buy_price',0):,.0f}원 × {t.get('qty',0)}주 | 매도: {t.get('sell_price',0):,.0f}원
|
||
- 손익: {t.get('profit_rate',0):+.2f}% ({t.get('realized_pnl',0):,.0f}원) | 보유: {t.get('hold_minutes',0)}분
|
||
- 사유: {t.get('sell_reason','')}
|
||
"""
|
||
summary = f"- 유니버스 후보: {candidate_count}개\n- 최근 거래: {total}건 | 승률: {win_rate:.1f}% ({wins}승 {losses}패)\n- 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:,.0f}원 | 평균 보유: {avg_hold:.0f}분"
|
||
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
|
||
|
||
**현재 상태**
|
||
{summary}
|
||
|
||
**최근 거래 내역**
|
||
{trades_text}
|
||
|
||
**현재 DB 설정 수치 (계좌/키 제외)**
|
||
```
|
||
{env_lines}
|
||
```
|
||
|
||
**봇 최근 로그 (journalctl 최근 200줄) – 탈락 사유·매수 실패·과부하 등 참고**
|
||
```
|
||
{journal_log[:15000]}
|
||
```
|
||
|
||
{KIS_SHORT_CODE_REFERENCE}
|
||
|
||
**당신의 임무**
|
||
1. 거래 내역·설정·로그·코드 기준을 보고 승률/매수 미발생 원인 등 구체적으로 3가지 진단. 추천은 반드시 위 코드 기준에 나온 키만 제안할 것.
|
||
2. **추천**: 반드시 "설정수치=값" 한 줄에 하나만. 이유·주석 붙이지 말 것. DB에 그대로 복붙 적용 가능하게.
|
||
3. 예상 효과 한두 줄.
|
||
|
||
**출력 형식 (반드시 준수)**
|
||
## 🔍 문제점 (승률/매수 원인)
|
||
1. [구체적 문제 1]
|
||
2. [구체적 문제 2]
|
||
3. [구체적 문제 3]
|
||
|
||
## 💡 수치 추천 (한 줄에 하나, 그대로 DB 적용)
|
||
KEY=value
|
||
...
|
||
|
||
## 📈 예상 효과
|
||
- [효과]
|
||
"""
|
||
|
||
# Gemini 호출
|
||
try:
|
||
response = gemini_client.generate_content(prompt)
|
||
analysis = (response.text if hasattr(response, "text") else
|
||
(response.candidates[0].content.parts[0].text if response.candidates else ""))
|
||
except Exception as e:
|
||
print(f"❌ Gemini 생성 실패: {e}")
|
||
db.close()
|
||
sys.exit(1)
|
||
|
||
save_ai_recommendations_from_text(db, analysis)
|
||
|
||
message = f"""🤖 **[13시 AI 추천 수치]** (DB + journalctl 로그 반영)
|
||
|
||
{summary}
|
||
|
||
{analysis}
|
||
|
||
---
|
||
💬 단타 전략 최적화. 추천 줄은 그대로 DB에 적용 가능합니다.
|
||
"""
|
||
if send_mm(message, db):
|
||
print("✅ AI 추천 안내 MM 발송 완료")
|
||
else:
|
||
print("⚠️ MM 발송 실패 (내용은 생성됨, last_ai_recommendations 저장됨)")
|
||
|
||
db.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|