249 lines
11 KiB
Python
249 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
구글 트렌드 괴리율 분석 모듈
|
|
- 검색량 급증 vs 주가 미반영 = 단타 기회 포착
|
|
- 단타 전략에 활용: 검색량이 폭발하는데 주가는 아직 안 오른 종목
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
import random # 랜덤 딜레이를 위한 모듈 추가
|
|
from typing import Dict, Optional, List
|
|
import math
|
|
|
|
logger = logging.getLogger("TrendDivergence")
|
|
|
|
# pandas 임포트 (NaN 체크용)
|
|
try:
|
|
import pandas as pd
|
|
PANDAS_AVAILABLE = True
|
|
except ImportError:
|
|
PANDAS_AVAILABLE = False
|
|
pd = None
|
|
|
|
# yfinance 임포트 (의존성 충돌 대비)
|
|
try:
|
|
import yfinance as yf
|
|
YFINANCE_AVAILABLE = True
|
|
except ImportError:
|
|
YFINANCE_AVAILABLE = False
|
|
logger.warning("⚠️ yfinance 미설치! pip install 'yfinance<0.2.40'")
|
|
except Exception as e:
|
|
YFINANCE_AVAILABLE = False
|
|
logger.warning(f"⚠️ yfinance 임포트 실패 (의존성 충돌 가능): {e}")
|
|
|
|
# pytrends 임포트
|
|
try:
|
|
from pytrends.request import TrendReq
|
|
PTRENDS_AVAILABLE = True
|
|
except ImportError:
|
|
PTRENDS_AVAILABLE = False
|
|
logger.warning("⚠️ pytrends 미설치! pip install pytrends")
|
|
|
|
|
|
class TrendDivergenceAnalyzer:
|
|
"""구글 트렌드 괴리율 분석기"""
|
|
|
|
def __init__(self):
|
|
self.pytrends = None
|
|
self.rate_limit_hit = False # 429 에러 발생 시 플래그
|
|
self.last_request_time = 0 # 마지막 요청 시간 추적
|
|
if PTRENDS_AVAILABLE:
|
|
try:
|
|
# 구글의 봇 차단을 우회하기 위해 최신 크롬 브라우저처럼 User-Agent(신분증) 위장
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
|
}
|
|
# requests_args 매개변수를 통해 헤더 정보 함께 전송
|
|
self.pytrends = TrendReq(hl='ko-KR', tz=540, requests_args={'headers': headers})
|
|
logger.info("✅ 구글 트렌드 API 초기화 완료 (User-Agent 우회 적용)")
|
|
except Exception as e:
|
|
logger.error(f"❌ 구글 트렌드 초기화 실패: {e}")
|
|
self.pytrends = None
|
|
|
|
def get_trend_score(self, keyword: str, stock_code: str = None) -> Optional[float]:
|
|
"""
|
|
검색량 급증 점수 계산 (0.0 ~ 1.0)
|
|
|
|
:param keyword: 검색 키워드 (예: "삼성전자", "갤럭시")
|
|
:param stock_code: 종목 코드 (예: "005930.KS") - 주가 비교용
|
|
:return: 괴리율 점수 (None이면 데이터 없음)
|
|
"""
|
|
if not self.pytrends:
|
|
return None
|
|
|
|
if not keyword or not keyword.strip():
|
|
return None
|
|
|
|
try:
|
|
# ✅ Rate Limit 방지: 429 에러 발생 시 더 긴 대기
|
|
if self.rate_limit_hit:
|
|
wait_time = random.uniform(60.0, 120.0) # 1~2분 대기
|
|
logger.warning(f"⏳ Rate Limit 복구 대기 중... ({wait_time:.0f}초)")
|
|
time.sleep(wait_time)
|
|
self.rate_limit_hit = False # 재시도
|
|
|
|
# ✅ 요청 간 최소 간격 보장 (5초 이상)
|
|
current_time = time.time()
|
|
if self.last_request_time > 0:
|
|
elapsed = current_time - self.last_request_time
|
|
if elapsed < 5.0:
|
|
sleep_time = 5.0 - elapsed + random.uniform(1.0, 3.0)
|
|
time.sleep(sleep_time)
|
|
|
|
# 1. 구글 트렌드 데이터 가져오기 (최근 7일)
|
|
# ✅ 개선: timeframe을 'now 7-d'로 변경 (최신 데이터), geo='KR' 추가
|
|
self.pytrends.build_payload([keyword], cat=0, timeframe='now 7-d', geo='KR')
|
|
|
|
# ✅ 개선: 네트워크 타임아웃 방지 (5~8초 랜덤 대기로 증가)
|
|
time.sleep(random.uniform(5.0, 8.0))
|
|
|
|
self.last_request_time = time.time() # 요청 시간 기록
|
|
trend_data = self.pytrends.interest_over_time()
|
|
|
|
if trend_data.empty:
|
|
logger.debug(f"트렌드 데이터 없음: {keyword}")
|
|
return None
|
|
|
|
if keyword not in trend_data.columns:
|
|
logger.debug(f"키워드 컬럼 없음: {keyword}")
|
|
return None
|
|
|
|
# 데이터가 충분한지 확인 (최소 3일 필요)
|
|
if len(trend_data) < 3:
|
|
logger.debug(f"데이터 부족 ({len(trend_data)}일): {keyword}")
|
|
return None
|
|
|
|
# 최근 3일 평균 vs 지난 7일 평균 비교
|
|
recent_avg = trend_data[keyword].tail(3).mean()
|
|
long_term_avg = trend_data[keyword].mean()
|
|
|
|
# NaN 체크 (pandas 있으면 pd.isna, 없으면 수동 체크)
|
|
if PANDAS_AVAILABLE and pd is not None:
|
|
if pd.isna(recent_avg) or pd.isna(long_term_avg):
|
|
logger.debug(f"NaN 값 발견: {keyword}")
|
|
return None
|
|
else:
|
|
# pandas 없으면 간단한 체크 (NaN은 float('nan')이므로 자기 자신과 비교)
|
|
if recent_avg is None or long_term_avg is None:
|
|
logger.debug(f"None 값 발견: {keyword}")
|
|
return None
|
|
if isinstance(recent_avg, float) and math.isnan(recent_avg):
|
|
logger.debug(f"NaN 값 발견 (recent_avg): {keyword}")
|
|
return None
|
|
if isinstance(long_term_avg, float) and math.isnan(long_term_avg):
|
|
logger.debug(f"NaN 값 발견 (long_term_avg): {keyword}")
|
|
return None
|
|
|
|
if long_term_avg == 0:
|
|
logger.debug(f"장기 평균 0: {keyword}")
|
|
return None
|
|
|
|
# 검색량 급증 비율
|
|
surge_ratio = recent_avg / long_term_avg if long_term_avg > 0 else 1.0
|
|
|
|
# 2. 주가 데이터와 비교 (괴리율 계산)
|
|
price_change_pct = 0.0
|
|
if stock_code and YFINANCE_AVAILABLE:
|
|
try:
|
|
stock = yf.Ticker(stock_code)
|
|
hist = stock.history(period="7d")
|
|
if not hist.empty and len(hist) >= 2:
|
|
# 최근 3일 평균 vs 지난 7일 평균
|
|
recent_price = hist['Close'].tail(3).mean()
|
|
long_price = hist['Close'].mean()
|
|
price_change_pct = ((recent_price - long_price) / long_price) * 100
|
|
except Exception as e:
|
|
logger.debug(f"주가 조회 실패 ({stock_code}): {e}")
|
|
elif stock_code and not YFINANCE_AVAILABLE:
|
|
logger.debug(f"yfinance 미사용 - 주가 비교 스킵 ({stock_code})")
|
|
|
|
# 3. 괴리율 점수 계산
|
|
# 검색량이 2배 이상 급증했는데 주가는 5% 미만 오름 = 괴리율 발생
|
|
if surge_ratio >= 2.0 and price_change_pct < 5.0:
|
|
# 괴리율이 클수록 점수 높음 (최대 1.0)
|
|
divergence_score = min(1.0, (surge_ratio - 1.0) * 0.3)
|
|
logger.info(f"🔍 [{keyword}] 검색량 {surge_ratio:.1f}배 급증, 주가 {price_change_pct:.1f}% → 괴리율 점수 {divergence_score:.2f}")
|
|
return divergence_score
|
|
else:
|
|
return 0.0
|
|
|
|
except Exception as e:
|
|
error_str = str(e)
|
|
# ✅ 개선: 400 에러는 데이터 없음으로 간주 (로그 레벨 낮춤)
|
|
if "400" in error_str or "Bad Request" in error_str:
|
|
logger.debug(f"검색량 부족 (400): {keyword}")
|
|
return None
|
|
# ✅ 개선: 429 에러는 Rate Limit으로 처리 (더 긴 대기 필요)
|
|
elif "429" in error_str or "Too Many Requests" in error_str:
|
|
self.rate_limit_hit = True
|
|
logger.warning(f"⚠️ Rate Limit 도달 (429): {keyword} - 구글 트렌드 분석 일시 중단")
|
|
# 429 에러 발생 시 나머지 분석 중단
|
|
return None
|
|
else:
|
|
logger.error(f"❌ 트렌드 분석 실패 ({keyword}): {e}")
|
|
return None
|
|
|
|
def analyze_stocks(self, stock_list: List[Dict]) -> Dict[str, float]:
|
|
"""
|
|
종목 리스트에 대한 트렌드 괴리율 점수 계산
|
|
|
|
:param stock_list: [{'code': '005930', 'name': '삼성전자'}, ...]
|
|
:return: {code: score} 딕셔너리
|
|
"""
|
|
results = {}
|
|
|
|
if not self.pytrends:
|
|
logger.warning("⚠️ 구글 트렌드 API 사용 불가")
|
|
return results
|
|
|
|
if not stock_list or not isinstance(stock_list, list):
|
|
logger.debug("종목 리스트가 비어있거나 잘못된 형식")
|
|
return results
|
|
|
|
try:
|
|
# ✅ 개선: 최대 5개로 제한 (429 에러 방지를 위해 더 줄임)
|
|
max_stocks = 5 if not self.rate_limit_hit else 0
|
|
if max_stocks == 0:
|
|
logger.warning("⚠️ Rate Limit 상태 - 구글 트렌드 분석 스킵")
|
|
return results
|
|
|
|
for idx, stock in enumerate(stock_list[:max_stocks], 1):
|
|
# Rate Limit 발생 시 즉시 중단
|
|
if self.rate_limit_hit:
|
|
logger.warning(f"⚠️ Rate Limit 발생 - {idx-1}개 분석 후 중단")
|
|
break
|
|
|
|
if not isinstance(stock, dict):
|
|
continue
|
|
|
|
code = stock.get('code', '')
|
|
name = stock.get('name', '')
|
|
|
|
if not name or not code:
|
|
continue
|
|
|
|
# 한국 주식 티커 형식
|
|
ticker = f"{code}.KS" if len(str(code)) == 6 else None
|
|
|
|
# 종목명으로 검색량 분석
|
|
try:
|
|
logger.debug(f"🔍 [{idx}/{max_stocks}] {name} 분석 중...")
|
|
score = self.get_trend_score(keyword=name, stock_code=ticker)
|
|
|
|
if score is not None and score > 0:
|
|
results[code] = score
|
|
except Exception as e:
|
|
logger.debug(f"종목 분석 실패 ({name} {code}): {e}")
|
|
continue
|
|
|
|
# get_trend_score 내부에서 이미 딜레이 처리하므로 여기서는 추가 딜레이 불필요
|
|
except Exception as e:
|
|
logger.error(f"❌ 종목 리스트 분석 중 에러: {e}")
|
|
|
|
# ✅ 개선: 분석 완료 후 충분한 쿨다운 (다음 배치까지 여유)
|
|
if results:
|
|
logger.info(f"✅ 트렌드 괴리율 발견: {len(results)}개 종목")
|
|
|
|
return results
|