1009 lines
42 KiB
Python
1009 lines
42 KiB
Python
"""
|
|
KIS Long Watch Bot Ver2 - 위시리스트/늘림목 관찰 전용 리포트 봇
|
|
- 한국투자증권(KIS) Open API 사용 (kis_long_ver1의 KISClientWithOrder 재사용)
|
|
- 매매는 하지 않고, 보유/관심 종목을 모니터링해서 리포트와 뉴스 기반 알림만 전송
|
|
- linux_bot 형식 장기 투자 체크 리포트 (💰📊📈💼🎯🤖 + AI 4줄) 장 시작/마감 전송
|
|
- 수치는 전부 DB/env에서 로드 (하드코딩 금지)
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import random
|
|
import time
|
|
from datetime import datetime as dt, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
import pandas as pd
|
|
|
|
from database import TradeDB
|
|
from news_analyzer import NewsAnalyzer
|
|
from kis_long_ver1 import (
|
|
KISClientWithOrder,
|
|
MattermostBot,
|
|
get_env_float,
|
|
get_env_int,
|
|
get_env_bool,
|
|
get_env_from_db,
|
|
get_kis_daily_chart,
|
|
get_kis_per_eps_peg,
|
|
calculate_volatility,
|
|
MM_CHANNEL_LONG,
|
|
MM_CHANNEL_DEFAULT,
|
|
SCRIPT_DIR,
|
|
db as shared_db,
|
|
)
|
|
|
|
# Gemini AI (4줄 인사이트 생성용)
|
|
try:
|
|
from kis_long_ver1 import gemini_model
|
|
except Exception:
|
|
gemini_model = None
|
|
|
|
# 로깅 설정
|
|
logging.basicConfig(
|
|
format="[%(asctime)s] %(message)s",
|
|
datefmt="%H:%M:%S",
|
|
level=logging.INFO,
|
|
)
|
|
logger = logging.getLogger("KISLongBotV2")
|
|
|
|
|
|
class LongWatchBotV2:
|
|
"""
|
|
위시리스트/보유 종목을 기반으로 장기 관찰 리포트와 뉴스 알림을 보내는 봇.
|
|
- KIS API로 현재가/일봉 차트 조회 (6개월 등, env로 조절)
|
|
- long_term_watchlist.json + LONG 전략 보유 종목 기준
|
|
- 매수/매도 주문은 하지 않음 (완전 관찰 전용)
|
|
"""
|
|
|
|
def __init__(self):
|
|
# DB는 기존 공유 인스턴스를 재사용 (kis_long_ver1과 동일)
|
|
self.db: TradeDB = shared_db
|
|
|
|
# KIS 읽기 전용 클라이언트 (주문 메서드는 사용하지 않음)
|
|
self.client = KISClientWithOrder()
|
|
|
|
# Mattermost
|
|
self.mm = MattermostBot()
|
|
self.mm_channel = MM_CHANNEL_LONG or MM_CHANNEL_DEFAULT
|
|
|
|
# 위시리스트 파일
|
|
self.watchlist_path: Path = SCRIPT_DIR / "long_term_watchlist.json"
|
|
self.watchlist: List[Dict] = self._load_watchlist()
|
|
|
|
# DB에 이미 LONG 보유 중인 종목 (늘림목 포지션)
|
|
self.holdings: Dict[str, Dict] = {}
|
|
active_trades = self.db.get_active_trades(strategy_prefix="LONG")
|
|
for code, trade in active_trades.items():
|
|
qty = trade.get("current_qty", 0) or 0
|
|
if qty <= 0:
|
|
continue
|
|
self.holdings[code] = {
|
|
"code": code,
|
|
"name": trade.get("name", code),
|
|
"avg_price": trade.get("avg_buy_price", 0),
|
|
"qty": qty,
|
|
"first_buy_date": trade.get("buy_date", ""),
|
|
}
|
|
|
|
# 자산·리포트용 상태
|
|
self.current_cash: float = 0
|
|
self.current_total_asset: float = 0
|
|
self.start_day_asset: float = 0
|
|
self.total_deposit: float = get_env_float("TOTAL_DEPOSIT", 0)
|
|
self.today_date: str = dt.now().strftime("%Y-%m-%d")
|
|
|
|
# 뉴스 분석기 (ANTHROPIC_API_KEY 필요, 없으면 내부에서 비활성)
|
|
self.news_analyzer = NewsAnalyzer()
|
|
|
|
# 리포트/뉴스 스케줄 설정 (전부 env/DB에서 로드)
|
|
self.daily_lookback_days: int = get_env_int("LONG_DAILY_LOOKBACK_DAYS", 180)
|
|
self.ma_short_days: int = get_env_int("LONG_MA_SHORT_DAYS", 5)
|
|
self.ma_long_days: int = get_env_int("LONG_MA_LONG_DAYS", 20)
|
|
|
|
self.report_am_hour: int = get_env_int("LONG_REPORT_AM_HOUR", 9)
|
|
self.report_am_min: int = get_env_int("LONG_REPORT_AM_MIN", 5)
|
|
self.report_pm_hour: int = get_env_int("LONG_REPORT_PM_HOUR", 15)
|
|
self.report_pm_min: int = get_env_int("LONG_REPORT_PM_MIN", 35)
|
|
|
|
self.news_enabled: bool = get_env_bool("LONG_NEWS_ENABLED", True)
|
|
self.news_interval_min: int = get_env_int("LONG_NEWS_INTERVAL_MIN", 60)
|
|
self.news_active_start_hour: int = get_env_int("LONG_NEWS_ACTIVE_START_HOUR", 9)
|
|
self.news_active_end_hour: int = get_env_int("LONG_NEWS_ACTIVE_END_HOUR", 23)
|
|
self.news_max_count: int = get_env_int("NEWS_MAX_COUNT", 5)
|
|
|
|
# 종목별 분석 사이 딜레이 (초당 호출 분산용, 기본 1~2초)
|
|
self.analysis_delay_min: float = get_env_float("LONG_ANALYSIS_DELAY_MIN_SEC", 1.0)
|
|
self.analysis_delay_max: float = get_env_float("LONG_ANALYSIS_DELAY_MAX_SEC", 2.0)
|
|
if self.analysis_delay_min < 0:
|
|
self.analysis_delay_min = 0.0
|
|
if self.analysis_delay_max < self.analysis_delay_min:
|
|
self.analysis_delay_max = self.analysis_delay_min
|
|
|
|
# 플래그
|
|
self.morning_report_sent: bool = False
|
|
self.closing_report_sent: bool = False
|
|
|
|
# ------------------------------------------------------------------
|
|
# 기본 데이터 로드/갱신
|
|
# ------------------------------------------------------------------
|
|
def _load_watchlist(self) -> List[Dict]:
|
|
"""long_term_watchlist.json에서 관심 종목 리스트 로드."""
|
|
if not self.watchlist_path.exists():
|
|
logger.warning(f"관심 종목 파일 없음: {self.watchlist_path}")
|
|
return []
|
|
try:
|
|
with open(self.watchlist_path, "r", encoding="utf-8") as f:
|
|
data = f.read().strip()
|
|
if not data:
|
|
return []
|
|
try:
|
|
obj = __import__("json").loads(data)
|
|
except Exception as e:
|
|
logger.error(f"관심 종목 JSON 파싱 실패: {e}")
|
|
return []
|
|
items = obj.get("items", []) if isinstance(obj, dict) else []
|
|
codes = [it.get("code") for it in items]
|
|
logger.info(f"📂 위시리스트 로드 완료: {len(items)}개 (codes={','.join(filter(None, codes))})")
|
|
return items
|
|
except Exception as e:
|
|
logger.error(f"관심 종목 로드 실패: {e}")
|
|
return []
|
|
|
|
def _update_assets(self) -> None:
|
|
"""계좌 평가 정보 갱신 (예수금 + LONG 보유 종목 시가 평가)."""
|
|
try:
|
|
balance = self.client.get_account_evaluation()
|
|
if not balance:
|
|
logger.warning("💵 [롱봇V2] 계좌 평가 응답 없음 → 자산 갱신 스킵")
|
|
return
|
|
|
|
# 예수금 (output2)
|
|
out2 = balance.get("output2")
|
|
if isinstance(out2, list) and out2:
|
|
out2 = out2[0]
|
|
out2 = out2 if isinstance(out2, dict) else {}
|
|
ord_psbl = out2.get("ord_psbl_cash") or out2.get("dnca_tot_amt")
|
|
self.current_cash = float(str(ord_psbl or 0).replace(",", "").strip()) if ord_psbl is not None else 0
|
|
|
|
# 보유 종목 평가액 (output1)
|
|
output1_list = balance.get("output1") or []
|
|
if isinstance(output1_list, dict):
|
|
output1_list = [output1_list]
|
|
holdings_value = 0.0
|
|
for code, h in self.holdings.items():
|
|
evlu = None
|
|
for row in output1_list:
|
|
if (row.get("pdno") or "").strip() == code:
|
|
evlu = float(row.get("evlu_amt", 0) or 0)
|
|
break
|
|
if evlu is not None:
|
|
holdings_value += evlu
|
|
else:
|
|
# 계좌 평가 응답에 없으면 현재가로 대략 평가
|
|
try:
|
|
price_data = self.client.inquire_price(code)
|
|
if price_data:
|
|
p = abs(float(price_data.get("stck_prpr", 0) or 0))
|
|
holdings_value += p * (h.get("qty") or h.get("total_qty") or 0)
|
|
except Exception as e:
|
|
logger.debug(f"보유평가 현재가 조회 실패({code}): {e}")
|
|
|
|
self.current_total_asset = self.current_cash + holdings_value
|
|
except Exception as e:
|
|
logger.error(f"자산 정보 업데이트 실패: {e}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# 개별 종목 목록 헬퍼
|
|
# ------------------------------------------------------------------
|
|
def _get_watch_items(self) -> List[Dict]:
|
|
"""보유(LONG) + 위시리스트를 합친 유니크 종목 목록."""
|
|
items: Dict[str, Dict] = {}
|
|
|
|
for code, h in self.holdings.items():
|
|
items[code] = {
|
|
"code": code,
|
|
"name": h.get("name", code),
|
|
"avg_price": h.get("avg_price", 0),
|
|
"qty": h.get("qty") or h.get("total_qty") or 0,
|
|
"first_buy_date": h.get("first_buy_date", ""),
|
|
"source": "HOLDING",
|
|
}
|
|
|
|
for it in self.watchlist:
|
|
code = (it.get("code") or "").strip()
|
|
if not code:
|
|
continue
|
|
if code not in items:
|
|
items[code] = {
|
|
"code": code,
|
|
"name": it.get("name", code),
|
|
"avg_price": float(it.get("avg_price", 0) or 0),
|
|
"qty": int(it.get("qty", 0) or 0),
|
|
"first_buy_date": it.get("first_buy", ""),
|
|
"source": "WATCHLIST",
|
|
}
|
|
else:
|
|
items[code]["source"] = "HOLDING+WATCH"
|
|
|
|
return list(items.values())
|
|
|
|
# ------------------------------------------------------------------
|
|
# 기존 일봉 분석 (MA·6M 기준 요약용 - 뉴스 필터에서 계속 사용)
|
|
# ------------------------------------------------------------------
|
|
def _fetch_daily_analysis(self, code: str) -> Optional[Dict]:
|
|
"""KIS 일봉 기반 MA·6M 고저 위치 계산 (기존 요약 리포트용)."""
|
|
try:
|
|
max_days = int(self.daily_lookback_days) if self.daily_lookback_days > 0 else 180
|
|
except Exception:
|
|
max_days = 180
|
|
|
|
try:
|
|
df = get_kis_daily_chart(self.client, code, max_days=max_days)
|
|
except Exception as e:
|
|
logger.error(f"일봉 차트 조회 실패({code}): {e}")
|
|
return None
|
|
|
|
if df is None or df.empty:
|
|
logger.warning(f"일봉 데이터 없음({code})")
|
|
return None
|
|
|
|
try:
|
|
if self.ma_short_days > 0:
|
|
df["MA_SHORT"] = df["close"].rolling(window=self.ma_short_days).mean()
|
|
else:
|
|
df["MA_SHORT"] = pd.NA
|
|
if self.ma_long_days > 0:
|
|
df["MA_LONG"] = df["close"].rolling(window=self.ma_long_days).mean()
|
|
else:
|
|
df["MA_LONG"] = pd.NA
|
|
except Exception as e:
|
|
logger.debug(f"MA 계산 실패({code}): {e}")
|
|
|
|
last = df.iloc[-1]
|
|
close = float(last["close"])
|
|
ma_short = float(last.get("MA_SHORT") or 0)
|
|
ma_long = float(last.get("MA_LONG") or 0)
|
|
ma20 = float(last.get("MA20") or 0)
|
|
|
|
hi_6m = float(df["high"].tail(max_days).max())
|
|
lo_6m = float(df["low"].tail(max_days).min())
|
|
|
|
def _pct(a: float, b: float) -> float:
|
|
if b == 0:
|
|
return 0.0
|
|
return (a / b - 1.0) * 100.0
|
|
|
|
return {
|
|
"last_close": close,
|
|
"ma_short": ma_short,
|
|
"ma_long": ma_long,
|
|
"ma20": ma20,
|
|
"pct_vs_ma_short": _pct(close, ma_short) if ma_short > 0 else 0.0,
|
|
"pct_vs_ma_long": _pct(close, ma_long) if ma_long > 0 else 0.0,
|
|
"pct_vs_ma20": _pct(close, ma20) if ma20 > 0 else 0.0,
|
|
"hi_6m": hi_6m,
|
|
"lo_6m": lo_6m,
|
|
"pct_from_6m_hi": _pct(close, hi_6m) if hi_6m > 0 else 0.0,
|
|
"pct_from_6m_lo": _pct(close, lo_6m) if lo_6m > 0 else 0.0,
|
|
}
|
|
|
|
# ------------------------------------------------------------------
|
|
# linux_bot 형식 장기 투자 체크 리포트용 분석 메서드들
|
|
# ------------------------------------------------------------------
|
|
def _is_domestic_code(self, code: str) -> bool:
|
|
"""국내 6자리 종목코드 여부."""
|
|
c = (code or "").strip()
|
|
return len(c) == 6 and c.isdigit()
|
|
|
|
def _compute_rsi(self, df: pd.DataFrame, period: int = 14) -> Optional[float]:
|
|
"""일봉 close 기준 RSI 계산 (RSI = 과매수/과매도 지표, 0~100)."""
|
|
if df is None or df.empty or len(df) < period + 1:
|
|
return None
|
|
try:
|
|
delta = df["close"].diff(1)
|
|
gain = delta.where(delta > 0, 0.0).rolling(window=period).mean()
|
|
loss = (-delta.where(delta < 0, 0.0)).rolling(window=period).mean()
|
|
rs = gain / loss.replace(0, float("nan"))
|
|
rsi = 100 - (100 / (1 + rs))
|
|
val = float(rsi.iloc[-1])
|
|
return val if pd.notna(val) else None
|
|
except Exception as e:
|
|
logger.debug(f"RSI 계산 실패: {e}")
|
|
return None
|
|
|
|
def _get_trade_history_summary(self, code: str, max_trades: int = 10) -> Optional[Dict]:
|
|
"""trade_history에서 해당 종목 최근 N건 승률·평균 수익률·주요 매도사유 요약."""
|
|
try:
|
|
cursor = self.db.conn.execute(
|
|
"""
|
|
SELECT profit_rate, realized_pnl, sell_reason, hold_minutes
|
|
FROM trade_history
|
|
WHERE code = ?
|
|
ORDER BY sell_date DESC
|
|
LIMIT ?
|
|
""",
|
|
(code, max_trades),
|
|
)
|
|
rows = [dict(zip([c[0] for c in cursor.description], r)) for r in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.debug(f"trade_history 조회 실패({code}): {e}")
|
|
return None
|
|
|
|
if not rows:
|
|
return None
|
|
|
|
total = len(rows)
|
|
profits = [float(r.get("profit_rate") or 0) for r in rows]
|
|
wins = len([p for p in profits if p > 0])
|
|
avg_profit = sum(profits) / total if total else 0.0
|
|
|
|
reason_count: Dict[str, int] = {}
|
|
for r in rows:
|
|
reason = (r.get("sell_reason") or "").strip() or "미기록"
|
|
reason_count[reason] = reason_count.get(reason, 0) + 1
|
|
top_reasons = sorted(reason_count.items(), key=lambda x: x[1], reverse=True)[:3]
|
|
|
|
return {
|
|
"total": total,
|
|
"wins": wins,
|
|
"losses": total - wins,
|
|
"avg_profit": avg_profit,
|
|
"top_reasons": top_reasons,
|
|
}
|
|
|
|
def _full_stock_analysis(self, code: str, item: Dict) -> Optional[Dict]:
|
|
"""
|
|
국내 종목 전체 분석:
|
|
가격·수익률·외인/기관·차트(RSI/변동성/정배열)·밸류(PER/PEG)·종합 판정 점수
|
|
"""
|
|
name = item.get("name", code)
|
|
avg_price = float(item.get("avg_price") or 0)
|
|
qty = int(item.get("qty") or 0)
|
|
is_holding = qty > 0 and avg_price > 0
|
|
|
|
# 현재가 조회
|
|
try:
|
|
price_data = self.client.inquire_price(code)
|
|
except Exception as e:
|
|
logger.warning(f"현재가 조회 실패({code}): {e}")
|
|
return {"code": code, "name": name, "error": "현재가 조회 실패"}
|
|
|
|
if not price_data:
|
|
return {"code": code, "name": name, "error": "현재가 조회 실패"}
|
|
|
|
try:
|
|
current_price = abs(float(price_data.get("stck_prpr") or 0))
|
|
prdy_ctrt_raw = (price_data.get("prdy_ctrt") or "0").strip().replace("+", "").replace("%", "")
|
|
try:
|
|
day_change_pct = float(prdy_ctrt_raw)
|
|
except Exception:
|
|
day_change_pct = 0.0
|
|
except Exception as e:
|
|
logger.warning(f"가격 파싱 실패({code}): {e}")
|
|
return {"code": code, "name": name, "error": "가격 파싱 실패"}
|
|
|
|
if current_price <= 0:
|
|
return {"code": code, "name": name, "error": "현재가 없음"}
|
|
|
|
# 수익률 (보유 종목만)
|
|
profit_val = (current_price - avg_price) * qty if is_holding else 0.0
|
|
profit_pct = (current_price - avg_price) / avg_price * 100 if is_holding else 0.0
|
|
|
|
# 일봉 차트
|
|
df = pd.DataFrame()
|
|
try:
|
|
df = get_kis_daily_chart(self.client, code, max_days=max(65, self.daily_lookback_days))
|
|
except Exception as e:
|
|
logger.debug(f"일봉 조회 실패({code}): {e}")
|
|
|
|
# RSI (과매수: 70+, 과매도: 30-)
|
|
rsi = self._compute_rsi(df) if not df.empty else None
|
|
# 변동성 (20일 수익률 표준편차 %)
|
|
volatility = calculate_volatility(df) if not df.empty else None
|
|
# 정배열/역배열 (MA20 > MA60 이면 정배열)
|
|
ma20 = float(df["MA20"].iloc[-1]) if not df.empty and "MA20" in df.columns and pd.notna(df["MA20"].iloc[-1]) else None
|
|
ma60 = float(df["MA60"].iloc[-1]) if not df.empty and "MA60" in df.columns and pd.notna(df["MA60"].iloc[-1]) else None
|
|
if ma20 is not None and ma60 is not None:
|
|
ma_arrangement = "정배열" if ma20 > ma60 else "역배열"
|
|
else:
|
|
ma_arrangement = "데이터없음"
|
|
|
|
# PER / PEG (재무비율 API)
|
|
per, peg = None, None
|
|
try:
|
|
fund = get_kis_per_eps_peg(self.client, code, current_price)
|
|
if fund:
|
|
per = fund.get("per")
|
|
peg = fund.get("peg")
|
|
time.sleep(random.uniform(0.3, 0.8))
|
|
except Exception as e:
|
|
logger.debug(f"PER/PEG 조회 실패({code}): {e}")
|
|
|
|
# 외인/기관 동향 (5일 순매수 합산)
|
|
investor_text = "데이터 없음"
|
|
investor_score = 0
|
|
try:
|
|
trend = self.client.get_investor_trend(code, days=5)
|
|
if trend is not None:
|
|
total_net = trend.get("total_net_buy", 0) or 0
|
|
inv_strong_buy = get_env_float("LONG_INV_STRONG_BUY", 20000.0)
|
|
inv_buy = get_env_float("LONG_INV_BUY", 5000.0)
|
|
inv_strong_sell = get_env_float("LONG_INV_STRONG_SELL", -20000.0)
|
|
inv_sell = get_env_float("LONG_INV_SELL", -5000.0)
|
|
if total_net > inv_strong_buy:
|
|
investor_text = "외인·기관 강한 매수세"
|
|
investor_score = 25
|
|
elif total_net > inv_buy:
|
|
investor_text = "외인·기관 소폭 매수"
|
|
investor_score = 15
|
|
elif total_net < inv_strong_sell:
|
|
investor_text = "외인·기관 강한 매도세"
|
|
investor_score = -25
|
|
elif total_net < inv_sell:
|
|
investor_text = "외인·기관 소폭 매도"
|
|
investor_score = -15
|
|
else:
|
|
investor_text = "외인·기관 중립"
|
|
else:
|
|
# 응답이 None이면 주말/공휴일 등 데이터 미존재
|
|
investor_text = "주말/공휴일 (거래 없음)"
|
|
except Exception as e:
|
|
logger.debug(f"외인/기관 조회 실패({code}): {e}")
|
|
|
|
# 종합 판정 점수 (0~100)
|
|
score = 50 # 기본값
|
|
per_high = get_env_float("PER_HIGH_THRESHOLD", 30.0)
|
|
per_fair = get_env_float("PER_FAIR_THRESHOLD", 15.0)
|
|
per_low = get_env_float("PER_LOW_THRESHOLD", 10.0)
|
|
peg_great = get_env_float("PEG_GREAT_THRESHOLD", 1.0)
|
|
peg_good = get_env_float("PEG_GOOD_THRESHOLD", 1.5)
|
|
peg_bad = get_env_float("PEG_BAD_THRESHOLD", 2.0)
|
|
rsi_oversold = get_env_float("RSI_OVERSOLD_THRESHOLD", 30.0)
|
|
rsi_weak = get_env_float("RSI_WEAK_THRESHOLD", 40.0)
|
|
rsi_overbought = get_env_float("RSI_OVERBOUGHT_THRESHOLD", 70.0)
|
|
|
|
rsi_comment = "적정"
|
|
|
|
if per is not None:
|
|
if per < per_low:
|
|
score += 30
|
|
elif per < per_fair:
|
|
score += 20
|
|
elif per < per_high:
|
|
score += 10
|
|
elif per >= per_high:
|
|
score -= 20
|
|
if peg is not None:
|
|
if peg < peg_great:
|
|
score += 30
|
|
elif peg < peg_good:
|
|
score += 15
|
|
elif peg > peg_bad:
|
|
score -= 20
|
|
if rsi is not None:
|
|
if rsi < rsi_oversold:
|
|
score += 20
|
|
rsi_comment = "과매도"
|
|
elif rsi < rsi_weak:
|
|
score += 10
|
|
rsi_comment = "약세"
|
|
elif rsi >= rsi_overbought:
|
|
score -= 20
|
|
rsi_comment = "과매수"
|
|
elif rsi > 60.0:
|
|
rsi_comment = "과매수 근접"
|
|
|
|
if ma20 is not None and ma60 is not None and ma20 > ma60:
|
|
score += 10
|
|
score += investor_score
|
|
score = max(0, min(100, int(score)))
|
|
|
|
# 판정 라벨 및 한 줄 설명 (기준값 전부 env)
|
|
v_full_score = get_env_float("LONG_VERDICT_FULL_SCORE", 75.0)
|
|
v_full_profit = get_env_float("LONG_VERDICT_FULL_PROFIT_PCT", 50.0)
|
|
v_partial_score = get_env_float("LONG_VERDICT_PARTIAL_SCORE", 60.0)
|
|
v_partial_profit = get_env_float("LONG_VERDICT_PARTIAL_PROFIT_PCT", 20.0)
|
|
v_stop_score = get_env_float("LONG_VERDICT_STOP_SCORE", 55.0)
|
|
v_stop_loss = get_env_float("LONG_VERDICT_STOP_LOSS_PCT", -10.0)
|
|
v_stop_mild = get_env_float("LONG_VERDICT_STOP_LOSS_MILD_PCT", -5.0)
|
|
v_earn_profit = get_env_float("LONG_VERDICT_EARN_PROFIT_MIN", 30.0)
|
|
v_earn_rsi = get_env_float("LONG_VERDICT_EARN_RSI_MIN", 65.0)
|
|
|
|
if score >= v_full_score and profit_pct > v_full_profit:
|
|
verdict_label = "🎯 전량 익절"
|
|
verdict_reason = "고점 도달"
|
|
elif score >= v_partial_score + 10 and profit_pct > v_partial_profit:
|
|
verdict_label = "분할 익절"
|
|
verdict_reason = "수익 일부 실현"
|
|
elif score >= v_partial_score:
|
|
verdict_label = "분할 익절"
|
|
verdict_reason = "수익 일부 실현"
|
|
elif score >= v_stop_score and profit_pct < v_stop_loss:
|
|
verdict_label = "⚠️ 손절 고려 (확률 55%)"
|
|
verdict_reason = "하락세 + 점수 낮음. 손절 후 바닥에서 재진입 검토"
|
|
elif score <= v_stop_score and profit_pct < v_stop_mild:
|
|
verdict_label = "🚨 손절 검토 (확률 65%)"
|
|
verdict_reason = "손실 크고 하락 추세. 추가 손실 가능"
|
|
elif profit_pct > v_earn_profit and rsi is not None and rsi > v_earn_rsi:
|
|
verdict_label = "🎯 익절 권장"
|
|
verdict_reason = "큰 수익 + 하락 전환"
|
|
else:
|
|
verdict_label = "관망"
|
|
verdict_reason = "추가 관찰"
|
|
|
|
return {
|
|
"code": code,
|
|
"name": name,
|
|
"current_price": current_price,
|
|
"day_change_pct": day_change_pct,
|
|
"avg_price": avg_price,
|
|
"qty": qty,
|
|
"profit_val": profit_val,
|
|
"profit_pct": profit_pct,
|
|
"per": per,
|
|
"peg": peg,
|
|
"rsi": rsi,
|
|
"rsi_comment": rsi_comment,
|
|
"volatility": volatility,
|
|
"ma_arrangement": ma_arrangement,
|
|
"investor_text": investor_text,
|
|
"score": score,
|
|
"verdict_label": verdict_label,
|
|
"verdict_reason": verdict_reason,
|
|
}
|
|
|
|
def _minimal_overseas_analysis(self, code: str, item: Dict) -> Optional[Dict]:
|
|
"""해외 종목: 현재가만 조회 후 최소 분석 (PER/RSI/외인 데이터 없음)."""
|
|
name = item.get("name", code)
|
|
avg_price = float(item.get("avg_price") or 0)
|
|
qty = int(item.get("qty") or 0)
|
|
is_holding = qty > 0 and avg_price > 0
|
|
|
|
try:
|
|
price_data = self.client.inquire_price(code)
|
|
except Exception as e:
|
|
logger.warning(f"해외 현재가 조회 실패({code}): {e}")
|
|
return {"code": code, "name": name, "error": "현재가 조회 실패"}
|
|
|
|
if not price_data:
|
|
return {"code": code, "name": name, "error": "현재가 조회 실패"}
|
|
|
|
try:
|
|
current_price = abs(float(price_data.get("stck_prpr") or 0))
|
|
prdy_ctrt_raw = (price_data.get("prdy_ctrt") or "0").strip().replace("+", "").replace("%", "")
|
|
try:
|
|
day_change_pct = float(prdy_ctrt_raw)
|
|
except Exception:
|
|
day_change_pct = 0.0
|
|
except Exception as e:
|
|
return {"code": code, "name": name, "error": "가격 파싱 실패"}
|
|
|
|
if current_price <= 0:
|
|
return {"code": code, "name": name, "error": "현재가 없음"}
|
|
|
|
profit_val = (current_price - avg_price) * qty if is_holding else 0.0
|
|
profit_pct = (current_price - avg_price) / avg_price * 100 if is_holding else 0.0
|
|
|
|
if profit_pct > 50:
|
|
verdict_label, verdict_reason = "🎯 전량 익절", "고점 도달"
|
|
elif profit_pct > 20:
|
|
verdict_label, verdict_reason = "분할 익절", "수익 일부 실현"
|
|
elif profit_pct < -20:
|
|
verdict_label, verdict_reason = "🚨 손절 검토 (확률 65%)", "손실 크고 하락 추세. 추가 손실 가능"
|
|
elif profit_pct < -10:
|
|
verdict_label, verdict_reason = "⚠️ 손절 고려 (확률 55%)", "하락세 + 점수 낮음. 손절 후 바닥에서 재진입 검토"
|
|
elif profit_pct > 30:
|
|
verdict_label, verdict_reason = "🎯 익절 권장", "큰 수익 + 하락 전환"
|
|
else:
|
|
verdict_label, verdict_reason = "관망", "추가 관찰"
|
|
|
|
return {
|
|
"code": code,
|
|
"name": name,
|
|
"current_price": current_price,
|
|
"day_change_pct": day_change_pct,
|
|
"avg_price": avg_price,
|
|
"qty": qty,
|
|
"profit_val": profit_val,
|
|
"profit_pct": profit_pct,
|
|
"per": None,
|
|
"peg": None,
|
|
"rsi": None,
|
|
"volatility": None,
|
|
"ma_arrangement": "데이터없음",
|
|
"investor_text": "데이터 없음 (해외종목 또는 미제공)",
|
|
"score": 50,
|
|
"verdict_label": verdict_label,
|
|
"verdict_reason": verdict_reason,
|
|
}
|
|
|
|
def _format_one_stock_block(
|
|
self,
|
|
item: Dict,
|
|
analysis: Dict,
|
|
ai_text: str,
|
|
currency: str = "KRW",
|
|
) -> str:
|
|
"""
|
|
한 종목의 linux_bot 형식 블록 생성.
|
|
💰 수익률 → 📊 외인/기관 → 📈 차트 → 💼 밸류 → 🎯 판정 → 🤖 AI 분석
|
|
"""
|
|
name = analysis.get("name", item.get("name", ""))
|
|
code = analysis.get("code", item.get("code", ""))
|
|
|
|
if analysis.get("error"):
|
|
return f"{name} ({code}): {analysis['error']}\n"
|
|
|
|
is_krw = currency == "KRW"
|
|
unit = "원" if is_krw else " USD"
|
|
|
|
profit_val = analysis.get("profit_val", 0)
|
|
profit_pct = analysis.get("profit_pct", 0)
|
|
qty = analysis.get("qty", 0)
|
|
avg_price = analysis.get("avg_price", 0)
|
|
current_price = analysis.get("current_price", 0)
|
|
day_change_pct = analysis.get("day_change_pct", 0)
|
|
|
|
# 💰 수익률 (보유 시) or 현재가 (관심만)
|
|
if qty > 0 and avg_price > 0:
|
|
if is_krw:
|
|
line_profit = (
|
|
f"💰 수익률: {profit_val:+,.0f}{unit} | "
|
|
f"보유 {profit_pct:+.1f}% ({profit_val:+,.0f}{unit}) | "
|
|
f"전일대비 {day_change_pct:+.2f}%"
|
|
)
|
|
else:
|
|
line_profit = (
|
|
f"💰 수익률: {profit_val:+.0f}{unit} | "
|
|
f"보유 {profit_pct:+.1f}% ({profit_val:+.0f}{unit}) | "
|
|
f"전일대비 {day_change_pct:+.2f}%"
|
|
)
|
|
else:
|
|
if is_krw:
|
|
line_profit = f"💰 현재가: {current_price:,.0f}{unit} | 전일대비 {day_change_pct:+.2f}%"
|
|
else:
|
|
line_profit = f"💰 현재가: {current_price:.2f}{unit} | 전일대비 {day_change_pct:+.2f}%"
|
|
|
|
# 📊 외인/기관
|
|
line_inv = f"📊 외인/기관: {analysis.get('investor_text', '데이터 없음')}"
|
|
|
|
# 📈 차트: RSI | 변동성 | 정배열/역배열
|
|
rsi = analysis.get("rsi")
|
|
rsi_comment = analysis.get("rsi_comment", "적정")
|
|
vol = analysis.get("volatility")
|
|
arr = analysis.get("ma_arrangement", "")
|
|
|
|
if rsi is not None:
|
|
rsi_str = f"RSI {rsi:.1f} ({rsi_comment})"
|
|
else:
|
|
rsi_str = "RSI -"
|
|
|
|
vol_str = f"{vol:.2f}%" if vol is not None else "-"
|
|
line_chart = f"📈 차트: {rsi_str} | 변동성 {vol_str} | {arr}"
|
|
|
|
# 💼 밸류: PER / PEG (없으면 줄 생략)
|
|
per = analysis.get("per")
|
|
peg = analysis.get("peg")
|
|
per_high = get_env_float("PER_HIGH_THRESHOLD", 30.0)
|
|
per_fair = get_env_float("PER_FAIR_THRESHOLD", 15.0)
|
|
peg_great = get_env_float("PEG_GREAT_THRESHOLD", 1.0)
|
|
line_value = None
|
|
if per is not None:
|
|
per_comment = "높음" if per > per_high else ("적정" if per > per_fair else "저평가")
|
|
line_value = f"💼 밸류: PER {per:.1f} ({per_comment})"
|
|
if peg is not None:
|
|
peg_comment = "저평가" if peg < peg_great else "적정"
|
|
line_value += f" | PEG {peg:.2f} ({peg_comment})"
|
|
|
|
# 🎯 판정
|
|
score = analysis.get("score", 0)
|
|
vlabel = analysis.get("verdict_label", "")
|
|
vreason = analysis.get("verdict_reason", "")
|
|
line_verdict = f"🎯 판정: {score}점 → {vlabel}\n └ {vreason}"
|
|
|
|
# 🤖 AI 분석
|
|
line_ai = "🤖 AI 분석\n" + (ai_text.strip() if ai_text else " (분석 없음)")
|
|
|
|
parts = [f"**{name} ({code})**", line_profit, "", line_inv, ""]
|
|
parts.append(line_chart)
|
|
if line_value:
|
|
parts.append(line_value)
|
|
parts.extend([line_verdict, "", line_ai, ""])
|
|
return "\n".join(parts)
|
|
|
|
def _get_ai_4lines(
|
|
self,
|
|
name: str,
|
|
code: str,
|
|
analysis: Dict,
|
|
trade_summary: Optional[Dict],
|
|
news_summary: str,
|
|
) -> str:
|
|
"""
|
|
Gemini로 종목별 장기 투자 관점 4줄 생성:
|
|
📌 수치/흐름 / 📰 뉴스 관점 / 💡 판단 / ⚠️ 리스크
|
|
"""
|
|
if not gemini_model:
|
|
return " (Gemini 미설정)"
|
|
|
|
try:
|
|
rsi = analysis.get("rsi")
|
|
vol = analysis.get("volatility")
|
|
arr = analysis.get("ma_arrangement", "")
|
|
per = analysis.get("per")
|
|
peg = analysis.get("peg")
|
|
score = analysis.get("score")
|
|
verdict = analysis.get("verdict_label", "")
|
|
profit_pct = analysis.get("profit_pct", 0)
|
|
investor = analysis.get("investor_text", "")
|
|
|
|
trade_str = ""
|
|
if trade_summary:
|
|
t = trade_summary
|
|
trade_str = (
|
|
f"최근 매매: {t['total']}건, 승률 {t['wins']}/{t['total']}, "
|
|
f"평균 수익률 {t['avg_profit']:.1f}%, 주요 사유 {[r[0] for r in t['top_reasons']]}."
|
|
)
|
|
|
|
prompt = f"""다음 종목에 대해 장기 투자 관점으로 **4줄**만 작성하세요. 각 줄은 반드시 지정 접두사로 시작하세요.
|
|
|
|
종목: {name} ({code})
|
|
- RSI: {rsi} | 변동성: {vol}% | 추세: {arr}
|
|
- PER: {per} | PEG: {peg} | 판정: {score}점 → {verdict}
|
|
- 보유 수익률: {profit_pct:.1f}% | 외인/기관: {investor}
|
|
{trade_str}
|
|
뉴스/시장 요약: {news_summary[:500] if news_summary else '없음'}
|
|
|
|
정확히 4줄, 아래 형식으로만 작성(한글):
|
|
📌 수치/흐름: (RSI·변동성·추세 한 문장)
|
|
📰 뉴스 관점: (호재/악재·수급 한 문장)
|
|
💡 판단: (매수/관망/익절/손절 중 하나 + 이유 한 문장)
|
|
⚠️ 리스크: (주요 리스크 한 문장)
|
|
"""
|
|
response = gemini_model.generate_content(prompt)
|
|
if response and response.text:
|
|
return " " + response.text.strip().replace("\n", "\n ")
|
|
except Exception as e:
|
|
logger.warning(f"AI 4줄 분석 실패({code}): {e}")
|
|
return " (AI 분석 생성 실패)"
|
|
|
|
# ------------------------------------------------------------------
|
|
# linux_bot 형식 장기 투자 체크 리포트 (핵심 메서드)
|
|
# ------------------------------------------------------------------
|
|
def send_long_check_report(self, when_label: str = "정기") -> None:
|
|
"""
|
|
linux_bot 형식으로 전 종목 장기 투자 체크 리포트 생성 후 MM 전송.
|
|
종목당 블록: 💰 수익률 → 📊 외인/기관 → 📈 차트 → 💼 밸류 → 🎯 판정 → 🤖 AI 4줄
|
|
"""
|
|
items = self._get_watch_items()
|
|
max_ai = get_env_int("LONG_AI_MAX_STOCKS", 20)
|
|
ai_recent_trades = get_env_int("LONG_AI_RECENT_TRADES", 10)
|
|
|
|
# 뉴스 요약 수집 (AI 프롬프트에 시장 맥락으로 활용)
|
|
news_summary = ""
|
|
try:
|
|
if self.news_analyzer and getattr(self.news_analyzer, "client", None):
|
|
news_list = self.news_analyzer.crawl_naver_finance_news(max_news=3)
|
|
if news_list:
|
|
analysis = self.news_analyzer.analyze_news_with_claude(news_list)
|
|
if analysis and isinstance(analysis, dict):
|
|
raw = analysis.get("summary") or analysis.get("market_outlook") or ""
|
|
news_summary = (raw[:800] if isinstance(raw, str) else str(raw)[:800])
|
|
except Exception as e:
|
|
logger.debug(f"뉴스 요약 수집 실패(무시): {e}")
|
|
|
|
blocks: List[str] = []
|
|
for i, item in enumerate(items):
|
|
code = (item.get("code") or "").strip()
|
|
name = item.get("name", code)
|
|
if not code:
|
|
continue
|
|
|
|
# 국내 vs 해외 분기
|
|
if self._is_domestic_code(code):
|
|
analysis = self._full_stock_analysis(code, item)
|
|
currency = "KRW"
|
|
else:
|
|
analysis = self._minimal_overseas_analysis(code, item)
|
|
currency = "USD"
|
|
|
|
if analysis is None:
|
|
blocks.append(f"{name} ({code}): 분석 실패\n")
|
|
continue
|
|
if analysis.get("error"):
|
|
blocks.append(f"{name} ({code}): {analysis['error']}\n")
|
|
continue
|
|
|
|
# AI 4줄 분석 (상위 max_ai 종목만, Gemini 있을 때)
|
|
trade_summary = self._get_trade_history_summary(code, max_trades=ai_recent_trades)
|
|
if i < max_ai and gemini_model:
|
|
ai_text = self._get_ai_4lines(name, code, analysis, trade_summary, news_summary)
|
|
else:
|
|
ai_text = " (AI 분석 생략)"
|
|
|
|
block = self._format_one_stock_block(item, analysis, ai_text, currency=currency)
|
|
blocks.append(block)
|
|
|
|
# API Rate Limit 방지용 딜레이 (env 기반)
|
|
try:
|
|
if self.analysis_delay_max > 0:
|
|
time.sleep(random.uniform(self.analysis_delay_min, self.analysis_delay_max))
|
|
except Exception:
|
|
pass
|
|
|
|
title = f"📊 장기 투자 체크 (한투 KIS API + AI 분석)"
|
|
separator = "━" * 30
|
|
msg = f"{title}\n{separator}\n\n" + f"\n{separator}\n\n".join(b.strip() for b in blocks if b.strip())
|
|
try:
|
|
self.mm.send(self.mm_channel, msg)
|
|
logger.info(f"📊 롱봇V2 장기 투자 체크 리포트 전송 완료 ({len(blocks)}개 종목)")
|
|
except Exception as e:
|
|
logger.error(f"장기 투자 체크 리포트 전송 실패: {e}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# 뉴스 분석/알림
|
|
# ------------------------------------------------------------------
|
|
def _should_run_news_now(self, now: Optional[dt] = None) -> bool:
|
|
if not self.news_enabled:
|
|
return False
|
|
now = now or dt.now()
|
|
h = now.hour
|
|
# 활동 시간대만 (예: 9~23시)
|
|
if self.news_active_start_hour <= self.news_active_end_hour:
|
|
return self.news_active_start_hour <= h < self.news_active_end_hour
|
|
# 야간 랩어라운드 형태 (예: 20~5시)도 지원
|
|
return h >= self.news_active_start_hour or h < self.news_active_end_hour
|
|
|
|
def _collect_watchlist_names(self) -> Dict[str, str]:
|
|
m: Dict[str, str] = {}
|
|
for it in self._get_watch_items():
|
|
code = it["code"]
|
|
name = it.get("name", code)
|
|
if code:
|
|
m[code] = name
|
|
return m
|
|
|
|
def run_news_cycle_once(self) -> None:
|
|
"""뉴스 한 사이클 (크롤링 → Claude 분석 → watchlist 관련 이슈만 필터링 → MM 전송)."""
|
|
if not self._should_run_news_now():
|
|
return
|
|
if not self.news_analyzer or not getattr(self.news_analyzer, "client", None):
|
|
logger.debug("뉴스 분석 비활성 (API 키/클라이언트 없음)")
|
|
return
|
|
|
|
try:
|
|
logger.info("📰 [롱봇V2] 뉴스 분석 사이클 시작")
|
|
news_list = self.news_analyzer.crawl_naver_finance_news(max_news=self.news_max_count)
|
|
if not news_list:
|
|
logger.info("📰 [롱봇V2] 크롤링된 뉴스 없음")
|
|
return
|
|
|
|
analysis = self.news_analyzer.analyze_news_with_claude(news_list)
|
|
if not analysis:
|
|
logger.info("📰 [롱봇V2] 뉴스 AI 분석 결과 없음")
|
|
return
|
|
|
|
watch_map = self._collect_watchlist_names()
|
|
watch_codes = set(watch_map.keys())
|
|
|
|
related: List[Dict] = []
|
|
for stock in analysis.get("recommended_stocks", []):
|
|
code = (stock.get("code") or "").strip()
|
|
if code and code in watch_codes:
|
|
related.append(
|
|
{
|
|
"code": code,
|
|
"name": watch_map.get(code, stock.get("name", "")),
|
|
"reason": stock.get("reason", ""),
|
|
}
|
|
)
|
|
|
|
mm_msg = self.news_analyzer.format_analysis_for_mattermost(analysis, news_list)
|
|
if not mm_msg:
|
|
logger.info("📰 [롱봇V2] 포맷된 뉴스 메시지 없음")
|
|
return
|
|
|
|
if related:
|
|
extra_lines = ["\n**🎯 위시리스트 관련 종목**"]
|
|
for r in related:
|
|
extra_lines.append(f"- `{r['code']}` {r['name']}: {r['reason']}")
|
|
mm_msg += "\n" + "\n".join(extra_lines)
|
|
else:
|
|
mm_msg += "\n_현재 위시리스트와 직접 매칭된 종목은 없음 (전체 시장 참고용)_"
|
|
|
|
self.mm.send(self.mm_channel, mm_msg)
|
|
logger.info(f"📰 [롱봇V2] 뉴스 분석 알림 전송 완료 (관련 종목 {len(related)}개)")
|
|
except Exception as e:
|
|
logger.error(f"❌ 롱봇V2 뉴스 분석 사이클 실패: {e}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# 메인 루프
|
|
# ------------------------------------------------------------------
|
|
async def _report_scheduler_loop(self) -> None:
|
|
"""장 시작/마감 시각에 맞춰 하루 2회 장기 투자 체크 리포트 전송."""
|
|
logger.info(
|
|
f"📅 롱봇V2 리포트 스케줄러 시작 (오전 {self.report_am_hour:02d}:{self.report_am_min:02d}, "
|
|
f"마감 {self.report_pm_hour:02d}:{self.report_pm_min:02d})"
|
|
)
|
|
# 시작 직후 잔고 API 연타 방지 (숏봇·다른 프로세스와 같은 앱키 공유 시 EGW00201 완화)
|
|
await asyncio.sleep(5)
|
|
last_date = ""
|
|
while True:
|
|
try:
|
|
now = dt.now()
|
|
today = now.strftime("%Y-%m-%d")
|
|
if today != last_date:
|
|
last_date = today
|
|
self.morning_report_sent = False
|
|
self.closing_report_sent = False
|
|
self.today_date = today
|
|
|
|
if (
|
|
not self.morning_report_sent
|
|
and now.hour == self.report_am_hour
|
|
and now.minute == self.report_am_min
|
|
):
|
|
self.send_long_check_report("장 시작")
|
|
self.morning_report_sent = True
|
|
|
|
if (
|
|
not self.closing_report_sent
|
|
and now.hour == self.report_pm_hour
|
|
and now.minute == self.report_pm_min
|
|
):
|
|
self.send_long_check_report("장 마감")
|
|
self.closing_report_sent = True
|
|
|
|
except Exception as e:
|
|
logger.error(f"리포트 스케줄러 루프 에러: {e}")
|
|
|
|
await asyncio.sleep(30)
|
|
|
|
async def _news_loop(self) -> None:
|
|
"""뉴스 감시 루프 (기본 1시간 간격, env로 조절)."""
|
|
if not self.news_enabled:
|
|
logger.info("📰 롱봇V2 뉴스 감시 비활성화 (LONG_NEWS_ENABLED=false)")
|
|
return
|
|
|
|
interval = self.news_interval_min if self.news_interval_min > 0 else 60
|
|
logger.info(f"📰 롱봇V2 뉴스 감시 시작 (interval={interval}분)")
|
|
|
|
while True:
|
|
try:
|
|
if self._should_run_news_now():
|
|
self.run_news_cycle_once()
|
|
else:
|
|
logger.debug("📰 롱봇V2 뉴스 감시: 비활성 시간대 (sleep)")
|
|
except Exception as e:
|
|
logger.error(f"뉴스 감시 루프 에러: {e}")
|
|
|
|
await asyncio.sleep(interval * 60)
|
|
|
|
def run(self) -> None:
|
|
"""동기 진입점: asyncio 루프에서 리포트/뉴스 태스크를 함께 실행."""
|
|
async def _main():
|
|
tasks = [
|
|
asyncio.create_task(self._report_scheduler_loop()),
|
|
asyncio.create_task(self._news_loop()),
|
|
]
|
|
await asyncio.gather(*tasks)
|
|
|
|
try:
|
|
asyncio.run(_main())
|
|
except KeyboardInterrupt:
|
|
logger.info("💤 롱봇V2 종료 (KeyboardInterrupt)")
|
|
except Exception as e:
|
|
logger.error(f"💥 롱봇V2 메인 루프 에러: {e}")
|
|
|
|
|
|
def main():
|
|
logger.info("🚀 KIS Long Watch Bot Ver2 시작")
|
|
bot = LongWatchBotV2()
|
|
bot.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|