Files
upbit_trader/upbit_short_ver2.py
2026-03-13 04:37:58 +09:00

1624 lines
67 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
upbit_short_ver2.py — 업비트 단타 봇 Ver2
=============================================================================
Ver1 대비 개선 사항:
1. [핵심] 비동기 매수/매도 (Async Threading)
- wait_for_fill(20초 블로킹)을 별도 Worker 스레드로 실행
- 메인 루프(2초 매도 감시)가 단 1ms도 멈추지 않음
- _buying / _selling set으로 중복 처리 완전 방지
2. MariaDB 연동 (upbit_quant_db)
- SQLite quant_bot.db → MariaDB 192.168.0.141:3306/upbit_quant_db
- pymysql DictCursor 기반, 자동 재접속 포함
3. 기존 아키텍처 100% 유지
- UpbitWSPriceCache (WebSocket 실시간 체결가 캐시)
- UpbitClient (REST API, JWT 인증)
- 매수/매도 로직 완전 동일
[전략: 개미털기(눌림목) 꼬리봉 잡기]
매수: 3분봉 기준
- 낙폭(MIN_DROP_RATE) + 회복률(MIN/MAX_RECOVERY_RATIO)
- 망치봉 꼬리(TAIL_RATIO_MIN, TAIL_PCT_MIN)
- RSI < RSI_OVERHEAT_THRESHOLD
매도:
- 어깨매도: 고점 대비 SHOULDER_CUT_PCT 하락
- ATR 스캘핑: 본절 복귀 or 고점 ATR 낙폭
- 목표가 달성 (TAKE_PROFIT_PCT)
- 손절 (STOP_LOSS_PCT)
- 원화 한도 손절 (MAX_LOSS_PER_TRADE_KRW)
[실행]
python3 upbit_short_ver2.py
"""
import json
import time
import uuid
import random
import hashlib
import logging
import datetime
import threading
from datetime import datetime as dt
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlencode
import pandas as pd
import requests
try:
import pymysql
import pymysql.cursors
_PYMYSQL_AVAILABLE = True
except ImportError:
_PYMYSQL_AVAILABLE = False
try:
import websocket
_WS_AVAILABLE = True
except ImportError:
_WS_AVAILABLE = False
logging.warning("⚠️ websocket-client 미설치 → pip install websocket-client")
# ============================================================
# 로깅 설정
# ============================================================
logging.basicConfig(
format='[%(asctime)s] %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO,
)
logger = logging.getLogger("UpbitShortV2")
LOG_RED = "\033[91m"
LOG_YELLOW = "\033[93m"
LOG_GREEN = "\033[92m"
LOG_CYAN = "\033[96m"
LOG_RESET = "\033[0m"
SCRIPT_DIR = Path(__file__).resolve().parent
# ============================================================
# MariaDB 연결 설정
# ============================================================
_DB_HOST = "192.168.0.141"
_DB_PORT = 3306
_DB_USER = "jae"
_DB_PASS = "1234"
_DB_NAME = "upbit_quant_db"
# ============================================================
# MariaDB 기반 UpbitDB (SQLite UpbitDB 대체)
# ============================================================
class UpbitDB:
"""
upbit_quant_db MariaDB 연동 클래스
- env_config : 전략 설정값 읽기
- kv_store : Upbit API 키 저장/읽기
- active_trades : 현재 보유 포지션 (재시작 복원용)
- trade_history : 매매 기록 (손익 통계)
- target_candidates: 스캔 후보 목록
[멀티스레드 안전]
- threading.Lock()으로 _execute() 직렬화
- pymysql ping(reconnect=True)으로 자동 재접속
"""
def __init__(self):
if not _PYMYSQL_AVAILABLE:
raise RuntimeError("pymysql 미설치 → pip install pymysql")
self._lock = threading.Lock()
self._conn = None
self._connect()
self._init_upbit_keys()
def _connect(self):
"""pymysql 연결"""
self._conn = pymysql.connect(
host=_DB_HOST, port=_DB_PORT,
user=_DB_USER, password=_DB_PASS,
database=_DB_NAME,
charset="utf8mb4",
autocommit=True,
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=10,
read_timeout=30,
write_timeout=30,
)
logger.debug(f"✅ MariaDB 연결: {_DB_HOST}/{_DB_NAME}")
def _execute(self, sql: str, params=None):
"""스레드 안전 SQL 실행 (자동 재접속 포함)"""
with self._lock:
try:
self._conn.ping(reconnect=True)
except Exception:
self._connect()
cur = self._conn.cursor()
cur.execute(sql, params or ())
return cur
def _init_upbit_keys(self):
"""kv_store에 Upbit API 키 슬롯 초기화"""
for k, v in [
("UPBIT_ACCESS_KEY", ""),
("UPBIT_SECRET_KEY", ""),
("UPBIT_SCAN_INTERVAL_SEC", "60"),
("UPBIT_BUY_TOP_N", "2"),
]:
self._execute(
"INSERT IGNORE INTO kv_store (k, v) VALUES (%s, %s)", (k, v)
)
# ---- env_config 읽기 ----------------------------------------
def get_latest_env(self) -> dict:
"""env_config 최신 row → {컬럼명: 값} 딕셔너리"""
cur = self._execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1")
row = cur.fetchone()
return dict(row) if row else {}
# ---- kv_store -----------------------------------------------
def get_kv(self, key: str, default: str = "") -> str:
cur = self._execute("SELECT v FROM kv_store WHERE k = %s", (key,))
row = cur.fetchone()
return row["v"] if row and row["v"] else default
def set_kv(self, key: str, value: str):
self._execute(
"INSERT INTO kv_store (k, v) VALUES (%s, %s) "
"ON DUPLICATE KEY UPDATE v = VALUES(v)",
(key, value),
)
# ---- active_trades ------------------------------------------
def load_active_trades(self) -> List[dict]:
"""재시작 시 보유 포지션 복원 (HOLDING 상태만)"""
cur = self._execute(
"SELECT * FROM active_trades WHERE status = 'HOLDING'"
)
return cur.fetchall() or []
def upsert_trade(self, trade: dict):
"""
매수 체결 즉시 active_trades에 원자적 저장
[즉시 저장 원칙] 이벤트 발생 즉시 DB에 저장 → 재시작 시 포지션 복원 가능
"""
now = dt.now().strftime("%Y-%m-%d %H:%M:%S")
self._execute("""
INSERT INTO active_trades (
code, name, strategy,
avg_buy_price, current_price,
stop_price, target_price, max_price, atr_entry,
target_qty, current_qty, total_invested,
status, buy_date, updated_at,
rsi, volume_ratio, tail_length_pct
) VALUES (
%(code)s, %(name)s, %(strategy)s,
%(avg_buy_price)s, %(current_price)s,
%(stop_price)s, %(target_price)s, %(max_price)s, %(atr_entry)s,
%(target_qty)s, %(current_qty)s, %(total_invested)s,
%(status)s, %(buy_date)s, %(updated_at)s,
%(rsi)s, %(volume_ratio)s, %(tail_length_pct)s
)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
strategy = VALUES(strategy),
avg_buy_price = VALUES(avg_buy_price),
current_price = VALUES(current_price),
stop_price = VALUES(stop_price),
target_price = VALUES(target_price),
max_price = VALUES(max_price),
atr_entry = VALUES(atr_entry),
target_qty = VALUES(target_qty),
current_qty = VALUES(current_qty),
total_invested = VALUES(total_invested),
status = VALUES(status),
buy_date = VALUES(buy_date),
updated_at = VALUES(updated_at),
rsi = VALUES(rsi),
volume_ratio = VALUES(volume_ratio),
tail_length_pct = VALUES(tail_length_pct)
""", {**trade, "updated_at": now})
def update_trade_max_price(self, code: str, current_price: float, max_price: float):
"""매도 체크 루프에서 고점 갱신 시 즉시 저장"""
now = dt.now().strftime("%Y-%m-%d %H:%M:%S")
self._execute(
"UPDATE active_trades SET current_price=%s, max_price=%s, updated_at=%s WHERE code=%s",
(current_price, max_price, now, code),
)
def close_trade(self, code: str, sell_price: float, sell_reason: str):
"""
매도 체결 즉시: active_trades 제거 + trade_history 추가 (원자적)
[즉시 저장 원칙] 이벤트 발생 즉시 저장 → 재시작 시 중복 매도 방지
"""
now = dt.now().strftime("%Y-%m-%d %H:%M:%S")
cur = self._execute("SELECT * FROM active_trades WHERE code = %s", (code,))
row = cur.fetchone()
if not row:
return
buy_price = float(row["avg_buy_price"] or 0)
qty = float(row["current_qty"] or 0)
buy_date_str = str(row.get("buy_date", ""))
profit_rate = (sell_price - buy_price) / buy_price * 100 if buy_price else 0
realized_pnl = (sell_price - buy_price) * qty
hold_minutes = 0
try:
buy_dt_obj = dt.strptime(buy_date_str[:19], "%Y-%m-%d %H:%M:%S")
hold_minutes = int((dt.now() - buy_dt_obj).total_seconds() / 60)
except Exception:
pass
self._execute("""
INSERT INTO trade_history (
code, name, strategy,
buy_price, sell_price, qty,
profit_rate, realized_pnl, hold_minutes,
buy_date, sell_date, sell_reason,
rsi, volume_ratio, tail_length_pct
) VALUES (%s,%s,%s, %s,%s,%s, %s,%s,%s, %s,%s,%s, %s,%s,%s)
""", (
code, row.get("name", ""), row.get("strategy", ""),
buy_price, sell_price, qty,
profit_rate, realized_pnl, hold_minutes,
buy_date_str, now, sell_reason,
row.get("rsi"), row.get("volume_ratio"), row.get("tail_length_pct"),
))
self._execute("DELETE FROM active_trades WHERE code = %s", (code,))
# ---- target_candidates --------------------------------------
def upsert_candidate(self, code: str, name: str, score: float, price: float):
"""스캔 후보 이벤트 즉시 저장"""
now = dt.now().strftime("%Y-%m-%d %H:%M:%S")
self._execute("""
INSERT INTO target_candidates (code, name, score, price, scan_time, updated_at)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name=VALUES(name), score=VALUES(score), price=VALUES(price),
scan_time=VALUES(scan_time), updated_at=VALUES(updated_at)
""", (code, name, score, price, now, now))
def clear_old_candidates(self, hours: int = 2):
"""오래된 후보 정리"""
cutoff = (dt.now() - datetime.timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
self._execute("DELETE FROM target_candidates WHERE updated_at < %s", (cutoff,))
# ---- Half-Kelly 승률 계산 -----------------------------------
def calculate_half_kelly(self, lookback: int = 50) -> float:
"""최근 N건 거래 기준 Half-Kelly 비율 (5~50% 클리핑)"""
cur = self._execute(
"SELECT profit_rate FROM trade_history ORDER BY id DESC LIMIT %s", (lookback,)
)
rows = cur.fetchall() or []
if not rows:
return 0.25
profits = [float(r["profit_rate"] or 0) for r in rows]
wins = [p for p in profits if p > 0]
if not wins:
return 0.05
win_rate = len(wins) / len(profits)
avg_win = sum(wins) / len(wins)
losses = [abs(p) for p in profits if p < 0]
avg_loss = sum(losses) / len(losses) if losses else 1.0
b = avg_win / avg_loss if avg_loss else 1.0
kelly = (b * win_rate - (1 - win_rate)) / b if b else 0
return max(0.05, min(0.5, kelly / 2))
def close(self):
try:
self._conn.close()
except Exception:
pass
# ============================================================
# 전역 DB & env 헬퍼 (하드코딩 금지 — 모든 수치는 DB에서 로드)
# ============================================================
_db: Optional[UpbitDB] = None
def _env_raw(key: str, default: str = "") -> str:
env = _db.get_latest_env() if _db else {}
val = env.get(key)
if not val:
val = _db.get_kv(key) if _db else ""
if not val:
val = default
if isinstance(val, str) and "#" in val:
val = val.split("#")[0].strip()
return str(val) if val is not None else str(default)
def get_env_float(key: str, default: float) -> float:
try:
return float(_env_raw(key, str(default)))
except (ValueError, TypeError):
return default
def get_env_int(key: str, default: int) -> int:
try:
return int(float(_env_raw(key, str(default))))
except (ValueError, TypeError):
return default
def get_env_bool(key: str, default: bool = False) -> bool:
return _env_raw(key, str(default)).lower() in ("true", "1", "yes")
def get_env_str(key: str, default: str = "") -> str:
return _env_raw(key, default)
# ============================================================
# Mattermost 알림
# ============================================================
def send_mm(msg: str):
"""Mattermost 알림 전송 (실패 시 로그만 남기고 봇 계속 실행)"""
try:
server = get_env_str("MM_SERVER_URL", "")
token = get_env_str("MM_BOT_TOKEN_", "")
channel = get_env_str("MATTERMOST_CHANNEL", "upbit")
if not server or not token:
return
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
r = requests.get(f"{server}/api/v4/channels/name/{channel}", headers=headers, timeout=5)
if r.status_code != 200:
return
channel_id = r.json().get("id", "")
requests.post(
f"{server}/api/v4/posts",
headers=headers,
json={"channel_id": channel_id, "message": msg},
timeout=5,
)
except Exception as e:
logger.debug(f"MM 발송 실패: {e}")
# ============================================================
# WebSocket 실시간 체결가 캐시 (ver1과 동일, 그대로 유지)
# ============================================================
class UpbitWSPriceCache:
"""
Upbit WebSocket 실시간 체결가 캐시
- URL : wss://api.upbit.com/websocket/v1
- 타입 : trade (체결 틱) → SIMPLE 포맷
- 캐시 : {market: {price, volume, timestamp}}
- 자동 재연결: 끊기면 5초 후 재시도
- 구독 갱신: subscribe()/unsubscribe() 호출 즉시 서버에 반영
[SIMPLE 포맷 주요 필드]
cd : 마켓코드 (KRW-BTC 등)
tp : 체결가 (trade price)
tv : 체결량 (trade volume)
tms : 체결 타임스탬프 (ms)
"""
WS_URL = "wss://api.upbit.com/websocket/v1"
def __init__(self):
self._cache: Dict[str, dict] = {}
self._lock = threading.Lock()
self._codes = set()
self._ws: Optional["websocket.WebSocketApp"] = None
self._thread: Optional[threading.Thread] = None
self._stop = threading.Event()
self.is_active = False
# ---- 구독 관리 ----
def subscribe(self, *markets: str):
"""마켓 추가 구독 → 즉시 서버에 재구독 메시지 전송"""
with self._lock:
before = len(self._codes)
self._codes.update(markets)
changed = len(self._codes) != before
if changed and self._ws and self.is_active:
self._send_subscribe()
def unsubscribe(self, *markets: str):
"""마켓 구독 해제 → 즉시 서버에 재구독 메시지 전송"""
with self._lock:
self._codes.difference_update(markets)
for m in markets:
self._cache.pop(m, None)
if self._ws and self.is_active:
self._send_subscribe()
def _send_subscribe(self):
"""현재 구독 목록으로 WebSocket 구독 메시지 전송"""
with self._lock:
codes = list(self._codes)
if not codes or not self._ws:
return
payload = json.dumps([
{"ticket": str(uuid.uuid4())},
{"type": "trade", "codes": codes, "isOnlyRealtime": True},
{"format": "SIMPLE"},
])
try:
self._ws.send(payload)
logger.info(f"[WS] 구독 갱신: {len(codes)}개 마켓")
except Exception as e:
logger.warning(f"[WS] 구독 전송 실패: {e}")
# ---- 가격 조회 ----
def get_price(self, market: str, max_age_sec: float = 8.0) -> Optional[dict]:
"""
캐시에서 현재 체결가 반환
- max_age_sec 이내 수신분만 유효 (오래된 건 REST fallback 유도)
"""
with self._lock:
data = self._cache.get(market)
if not data:
return None
if time.time() - data["timestamp"] > max_age_sec:
return None
return data
# ---- WebSocket 콜백 ----
def _on_open(self, ws):
self.is_active = True
logger.info("[WS] Upbit WebSocket 연결 완료")
self._send_subscribe()
def _on_message(self, ws, raw):
"""체결 틱 수신 → price_cache 갱신"""
try:
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
data = json.loads(raw)
market = data.get("cd")
price = data.get("tp")
volume = data.get("tv")
if market and price:
with self._lock:
self._cache[market] = {
"price": float(price),
"volume": float(volume) if volume else 0.0,
"timestamp": time.time(),
}
except Exception as e:
logger.debug(f"[WS] 메시지 파싱 오류: {e}")
def _on_error(self, ws, error):
logger.warning(f"[WS] 오류: {error}")
self.is_active = False
def _on_close(self, ws, code, msg):
logger.warning(f"[WS] 연결 종료 (code={code})")
self.is_active = False
# ---- 시작/종료 ----
def start(self, initial_markets: list = None):
"""WebSocket 백그라운드 스레드 시작"""
if not _WS_AVAILABLE:
logger.error("websocket-client 미설치 → WebSocket 비활성화")
return
if initial_markets:
with self._lock:
self._codes.update(initial_markets)
self._stop.clear()
self._thread = threading.Thread(target=self._run_loop, daemon=True, name="UpbitWS")
self._thread.start()
logger.info("[WS] WebSocket 스레드 시작")
def stop(self):
self._stop.set()
if self._ws:
try:
self._ws.close()
except Exception:
pass
self.is_active = False
logger.info("[WS] WebSocket 종료")
def _run_loop(self):
"""WebSocket 자동 재연결 루프 (5초 간격)"""
while not self._stop.is_set():
try:
self._ws = websocket.WebSocketApp(
self.WS_URL,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
)
# ping/pong으로 120초 idle timeout 방지
self._ws.run_forever(ping_interval=30, ping_timeout=10)
except Exception as e:
logger.warning(f"[WS] run_forever 예외: {e}")
finally:
self.is_active = False
if not self._stop.is_set():
logger.info("[WS] 5초 후 재연결...")
time.sleep(5)
# ============================================================
# Upbit REST API 클라이언트 (ver1과 동일)
# ============================================================
class UpbitClient:
"""
Upbit REST API 클라이언트
- Exchange API (주문/계좌): JWT 인증 필요
- Quotation API (시세/캔들): 인증 불필요
- HTTP 429 자동 재시도 (점진적 대기)
[레이트리밋]
- 주문 API: 분당 200회
- 시세 API: 분당 600회
→ 요청 간 0.12초 갭 유지 (안전 마진)
"""
BASE = "https://api.upbit.com/v1"
def __init__(self, access_key: str, secret_key: str):
self.access_key = access_key
self.secret_key = secret_key
self._last_t = 0.0
self._last_order_error = None
def _throttle(self, gap: float = 0.12):
"""요청 간격 조절 (레이트리밋 방지)"""
elapsed = time.time() - self._last_t
if elapsed < gap:
time.sleep(gap - elapsed)
self._last_t = time.time()
# ---- JWT 인증 ----
def _make_jwt(self, query_string: str = "") -> str:
try:
import jwt as pyjwt
except ImportError:
raise RuntimeError("PyJWT 필요: pip install PyJWT")
payload = {"access_key": self.access_key, "nonce": str(uuid.uuid4())}
if query_string:
qh = hashlib.sha512(query_string.encode()).hexdigest()
payload["query_hash"] = qh
payload["query_hash_alg"] = "SHA512"
return pyjwt.encode(payload, self.secret_key, algorithm="HS256")
def _auth_headers(self, qs: str = "") -> dict:
return {"Authorization": f"Bearer {self._make_jwt(qs)}"}
# ---- 공통 GET / POST ----
def _get(self, path: str, params: dict = None, auth: bool = False, retries: int = 5) -> Optional[dict]:
self._throttle()
url = f"{self.BASE}{path}"
qs = urlencode(params, doseq=True) if params else ""
hdrs = self._auth_headers(qs) if auth else {}
for attempt in range(retries):
try:
r = requests.get(url, params=params, headers=hdrs, timeout=10)
if r.status_code == 429:
wait = 1.0 + attempt
logger.warning(f"⏳ REST 429 → {wait:.0f}초 대기 (attempt {attempt+1})")
time.sleep(wait)
continue
if r.status_code == 200:
return r.json()
logger.warning(f"[REST] GET {path}{r.status_code}: {r.text[:200]}")
return None
except requests.RequestException as e:
wait = (2 ** attempt) + random.uniform(0.3, 1.0)
logger.warning(f"⚠️ 네트워크 오류 ({attempt+1}/{retries}): {e}{wait:.1f}")
time.sleep(wait)
return None
def _post(self, path: str, body: dict, retries: int = 3) -> Optional[dict]:
self._throttle()
url = f"{self.BASE}{path}"
qs = urlencode(body, doseq=True)
hdrs = {**self._auth_headers(qs), "Content-Type": "application/json"}
for attempt in range(retries):
try:
r = requests.post(url, json=body, headers=hdrs, timeout=10)
if r.status_code == 429:
wait = 3.0 + attempt * 2
logger.warning(f"⏳ REST 429 → {wait:.0f}초 대기 (attempt {attempt+1})")
time.sleep(wait)
continue
data = r.json()
if r.status_code in (200, 201):
return data
self._last_order_error = data
logger.warning(f"[REST] POST {path}{r.status_code}: {data}")
return None
except requests.RequestException as e:
wait = (2 ** attempt) + random.uniform(0.3, 1.0)
logger.warning(f"⚠️ 네트워크 오류 ({attempt+1}/{retries}): {e}")
time.sleep(wait)
return None
# ---- 계좌 ----
def get_accounts(self) -> list:
return self._get("/accounts", auth=True) or []
def get_krw_balance(self) -> float:
"""가용 KRW 잔고 반환"""
for acc in self.get_accounts():
if acc.get("currency") == "KRW":
return float(acc.get("balance", 0))
return 0.0
# ---- 시세 (Quotation) ----
def get_market_list(self) -> list:
return self._get("/market/all") or []
def get_ticker(self, markets: list) -> list:
return self._get("/ticker", params={"markets": ",".join(markets)}) or []
def get_candles_minutes(self, market: str, unit: int = 3, count: int = 50) -> list:
"""
분봉 조회 (unit: 1,3,5,10,15,30,60,240)
응답: 최신 봉이 인덱스 0 (시간 역순) → candles_to_df()에서 정렬
"""
return self._get(
f"/candles/minutes/{unit}",
params={"market": market, "count": count}
) or []
def get_candles_days(self, market: str, count: int = 30) -> list:
return self._get("/candles/days", params={"market": market, "count": count}) or []
# ---- 주문 ----
def order_market_buy(self, market: str, price_krw: float) -> Optional[str]:
"""
시장가 매수 주문 (금액 기준)
Returns: 주문 UUID → wait_for_fill()에 전달
"""
body = {
"market": market,
"side": "bid",
"price": str(int(price_krw)),
"ord_type": "price",
}
result = self._post("/orders", body)
return result.get("uuid") if result else None
def order_market_sell(self, market: str, volume: float) -> Optional[str]:
"""
시장가 매도 주문 (수량 기준)
Returns: 주문 UUID → wait_for_fill()에 전달
"""
body = {
"market": market,
"side": "ask",
"volume": str(volume),
"ord_type": "market",
}
result = self._post("/orders", body)
return result.get("uuid") if result else None
def wait_for_fill(self, order_uuid: str, timeout_sec: int = 20, poll: float = 0.5) -> Optional[dict]:
"""
★ 체결 확인 (폴링)
- state == 'done'이 될 때까지 REST 폴링
- 실제 체결가(avg_price) / 체결량(filled_volume) 반환
- timeout_sec 초 내 미체결 시 None 반환
Upbit 주문 상태:
wait : 대기 (미체결)
watch : 예약 (예약주문)
done : 완료 (체결 완료)
cancel : 취소
"""
deadline = time.time() + timeout_sec
while time.time() < deadline:
data = self._get("/order", params={"uuid": order_uuid}, auth=True)
if not data:
time.sleep(poll)
continue
state = data.get("state")
if state == "done":
trades = data.get("trades", [])
if trades:
total_vol = sum(float(t["volume"]) for t in trades)
total_cost = sum(float(t["price"]) * float(t["volume"]) for t in trades)
avg_price = total_cost / total_vol if total_vol else 0.0
paid_fee = sum(float(t.get("funds", 0)) * 0.0005 for t in trades)
else:
avg_price = float(data.get("avg_price") or data.get("price", 0))
total_vol = float(data.get("executed_volume", 0))
paid_fee = float(data.get("paid_fee", 0))
return {
"avg_price": avg_price,
"filled_volume": total_vol,
"paid_fee": paid_fee,
}
if state == "cancel":
logger.warning(f"[체결확인] 주문 취소됨: {order_uuid}")
return None
time.sleep(poll)
logger.warning(f"[체결확인] 타임아웃 ({timeout_sec}s): {order_uuid}")
return None
def cancel_order(self, order_uuid: str) -> bool:
hdrs = self._auth_headers(f"uuid={order_uuid}")
try:
r = requests.delete(
f"{self.BASE}/order",
params={"uuid": order_uuid},
headers=hdrs, timeout=10
)
return r.status_code == 200
except Exception:
return False
# ============================================================
# 지표 계산 유틸
# ============================================================
def candles_to_df(candles: list) -> pd.DataFrame:
"""
Upbit 분봉/일봉 리스트 → DataFrame (시간 오름차순)
Upbit 응답은 최신 봉이 인덱스 0 (역순) → sort 필수
"""
if not candles:
return pd.DataFrame()
df = pd.DataFrame(candles).rename(columns={
"opening_price": "open",
"high_price": "high",
"low_price": "low",
"trade_price": "close",
"candle_acc_trade_volume": "volume",
"candle_date_time_kst": "datetime",
})
for col in ("open", "high", "low", "close", "volume"):
if col in df.columns:
df[col] = df[col].astype(float)
if "datetime" in df.columns:
df = df.sort_values("datetime").reset_index(drop=True)
return df
def calc_rsi(close: pd.Series, period: int = 14) -> float:
"""RSI 계산 (마지막 값 반환). 데이터 부족 시 50 반환"""
if len(close) < period + 1:
return 50.0
delta = close.diff()
gain = delta.where(delta > 0, 0.0).rolling(period).mean()
loss = (-delta.where(delta < 0, 0.0)).rolling(period).mean()
rs = gain / loss.replace(0, float("nan"))
rsi = 100 - (100 / (1 + rs))
val = rsi.iloc[-1]
return float(val) if not pd.isna(val) else 50.0
def calc_atr(df: pd.DataFrame, period: int = 14) -> float:
"""ATR (Average True Range) 계산. 데이터 부족 시 0 반환"""
if df.empty or len(df) < period + 1:
return 0.0
h, l, c = df["high"], df["low"], df["close"]
tr = pd.concat(
[(h - l), (h - c.shift(1)).abs(), (l - c.shift(1)).abs()], axis=1
).max(axis=1)
val = tr.rolling(period).mean().iloc[-1]
return float(val) if not pd.isna(val) else 0.0
# ============================================================
# 단타 트레이더 Ver2 (핵심 개선: 비동기 매수/매도)
# ============================================================
class UpbitShortTraderV2:
"""
Upbit 단타 트레이더 Ver2 (개미털기/눌림목 전략)
[Ver2 핵심 변경사항]
★ 비동기 매수/매도 (Non-blocking Thread)
- 기존 Ver1: execute_buy/sell → wait_for_fill(20초) → 메인 루프 블로킹
- Ver2: 매수/매도를 Worker 스레드로 던지고 메인 루프는 즉시 계속
- _buying / _selling set으로 중복 처리 완전 방지
[아키텍처 다이어그램]
┌─────────────────────────────────────────────┐
│ WebSocket 스레드 (daemon) │
│ → trade 체결 틱 수신 → price_cache 업데이트 │
└───────────────────┬─────────────────────────┘
│ (캐시 읽기)
┌───────────────────▼─────────────────────────┐
│ 메인 루프 (2초 주기) ← 절대 블로킹 없음! │
│ ① 매도 신호 체크 → _selling 미포함 종목만 │
│ → Worker 스레드 생성 (비동기) │
│ ② 매수 스캔 (60초마다) → _buying 미포함 만 │
│ → Worker 스레드 생성 (비동기) │
└───────────────────┬─────────────────────────┘
│ (스레드로 던짐)
┌───────────────────▼─────────────────────────┐
│ Worker 스레드 (daemon, 개수 제한 없음) │
│ → wait_for_fill(UUID 폴링, 최대 20초) │
│ → DB 저장 (즉시, 원자적) │
└─────────────────────────────────────────────┘
"""
def __init__(self):
global _db
_db = UpbitDB()
self.db = _db
# API 키 (kv_store 우선, env_config fallback)
access_key = self.db.get_kv("UPBIT_ACCESS_KEY") or get_env_str("UPBIT_ACCESS_KEY", "")
secret_key = self.db.get_kv("UPBIT_SECRET_KEY") or get_env_str("UPBIT_SECRET_KEY", "")
if not access_key or not secret_key:
logger.warning("⚠️ Upbit API 키 미설정 → DB kv_store 에 UPBIT_ACCESS_KEY/UPBIT_SECRET_KEY 저장 필요")
self.client = UpbitClient(access_key, secret_key)
self.ws_cache = UpbitWSPriceCache()
# 보유 포지션 메모리 {code: holding_dict}
self.holdings: Dict[str, dict] = {}
# ★ Ver2 핵심: 비동기 처리 중 종목 추적
self._selling: set = set() # 매도 워커 스레드 처리 중인 종목
self._buying: set = set() # 매수 워커 스레드 처리 중인 종목
self._trade_lock = threading.Lock() # holdings 딕셔너리 동시 접근 보호
# 재진입 쿨다운 {code: timestamp}
self.recently_sold: Dict[str, float] = {}
# 설정 로드
self._load_settings()
# DB에서 보유 포지션 복원 (재시작 시)
self._restore_holdings()
def _load_settings(self):
"""DB env_config에서 전략 파라미터 로드 (모두 get_env_* 사용 — 하드코딩 금지)"""
self.stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.02)
self.take_profit_pct = get_env_float("TAKE_PROFIT_PCT", 0.05)
self.max_stocks = get_env_int("MAX_STOCKS", 5)
self.min_drop_rate = get_env_float("MIN_DROP_RATE", 0.03)
self.min_recovery_ratio = get_env_float("MIN_RECOVERY_RATIO", 0.30)
self.rsi_overheat = get_env_float("RSI_OVERHEAT_THRESHOLD", 78.0)
self.tail_ratio_min = get_env_float("TAIL_RATIO_MIN", 1.5)
self.tail_pct_min = get_env_float("TAIL_PCT_MIN", 0.003)
self.shoulder_cut_pct = get_env_float("SHOULDER_CUT_PCT", 0.03)
self.shoulder_min_high = get_env_float("SHOULDER_MIN_HIGH_PCT", 0.01)
self.scalp_up_mult = get_env_float("SCALP_ATR_UP_MULT", 1.0)
self.scalp_down_mult = get_env_float("SCALP_ATR_DOWN_MULT", 0.2)
self.scalp_drop_mult = get_env_float("SCALP_ATR_DROP_MULT", 1.0)
self.reentry_cooldown = get_env_int("REENTRY_COOLDOWN_SEC", 300)
self.round_trip_cost = get_env_float("ROUND_TRIP_COST_PCT", 0.001)
logger.info(
f"⚙️ 설정 로드 | 손절={self.stop_loss_pct*100:.1f}% "
f"| 익절={self.take_profit_pct*100:.1f}% "
f"| 최대종목={self.max_stocks}"
)
def _restore_holdings(self):
"""재시작 시 DB active_trades → 메모리 복원 + WS 재구독"""
rows = self.db.load_active_trades()
for r in rows:
code = r["code"]
self.holdings[code] = {
"name": r["name"],
"buy_price": float(r["avg_buy_price"] or 0),
"qty": float(r["current_qty"] or 0),
"buy_time": str(r.get("buy_date", "")),
"max_price": float(r.get("max_price") or r["avg_buy_price"] or 0),
"stop_price": float(r.get("stop_price") or 0) or
float(r["avg_buy_price"] or 0) * (1 + self.stop_loss_pct),
"target_price": float(r.get("target_price") or 0) or
float(r["avg_buy_price"] or 0) * (1 + self.take_profit_pct),
"atr_entry": float(r.get("atr_entry") or 0),
"rsi": float(r.get("rsi") or 50.0),
"volume_ratio": float(r.get("volume_ratio") or 1.0),
"tail_length_pct": float(r.get("tail_length_pct") or 0.0),
}
if rows:
logger.info(f"♻️ 보유 포지션 복원: {list(self.holdings.keys())}")
# --------------------------------------------------------
# 매수 스캔 (REST 캔들 기반, 60초 주기)
# --------------------------------------------------------
def scan_buy_candidates(self, market_list: list) -> list:
"""
전체 KRW 마켓 순회 → 개미털기(눌림목) 조건 필터링
[조건 순서]
1. 낙폭: 시가 대비 저가 낙폭 ≥ MIN_DROP_RATE
2. 회복률: 저가→현재가 회복률 [MIN_RECOVERY_RATIO ~ MAX_RECOVERY_RATIO]
3. 꼬리봉: 밑꼬리/몸통 비율 ≥ TAIL_RATIO_MIN, 꼬리% ≥ TAIL_PCT_MIN
4. 고점 추격 방지: 현재가 < 당일고점 × HIGH_PRICE_CHASE_THRESHOLD
5. RSI: < RSI_OVERHEAT_THRESHOLD
6. 점수 계산 후 내림차순 정렬
"""
candidates = []
for market in market_list:
with self._trade_lock:
if market in self.holdings:
continue
if market in self._buying:
continue
# 재진입 쿨다운
if market in self.recently_sold:
if time.time() - self.recently_sold[market] < self.reentry_cooldown:
continue
try:
# 분봉 단위: DB 설정에서 동적 로드 (기본 3분봉)
# MA60 계산을 위해 최소 80봉 확보 (50→80으로 상향)
candle_unit = get_env_int("UPBIT_CANDLE_UNIT", 3)
candles = self.client.get_candles_minutes(market, unit=candle_unit, count=80)
if not candles or len(candles) < 20:
continue
df = candles_to_df(candles)
current_price = df["close"].iloc[-1]
# =======================================================
# 🛡️ [추세선 필터] 역배열(하락 추세) 종목 필터링
# MA20 < MA60 인 역배열 종목은 꼬리잡기 진입 금지
# → 세력이 코인을 버리고 있는 구간이면 꼬리를 잡아도 반등 없이 추가 하락
# -------------------------------------------------------
ma20 = df["close"].rolling(20).mean().iloc[-1]
ma60 = df["close"].rolling(60).mean().iloc[-1]
if pd.isna(ma20) or pd.isna(ma60):
# 봉 수 부족으로 MA 계산 불가 → 스킵
continue
if ma20 < ma60:
logger.info(
f"{LOG_YELLOW}[탈락-역배열] {market}: "
f"MA20({ma20:,.0f}) < MA60({ma60:,.0f}){LOG_RESET}"
)
continue
# =======================================================
day_open = df["open"].iloc[0]
day_high = df["high"].max()
valid_lows = df["low"][df["low"] > 0]
day_low = float(valid_lows.min()) if not valid_lows.empty else df["low"].min()
day_range = day_high - day_low
# [1] 낙폭 필터
drop_rate = (day_open - day_low) / day_open if day_open > 0 else 0
if drop_rate < self.min_drop_rate:
logger.info(f"{LOG_YELLOW}[탈락-낙폭] {market}: {drop_rate*100:.1f}% < {self.min_drop_rate*100:.1f}%{LOG_RESET}")
continue
# [2] 회복률 필터
recovery = (current_price - day_low) / day_range if day_range > 0 else 0
max_rec = get_env_float("MAX_RECOVERY_RATIO", 0.8)
if not (self.min_recovery_ratio <= recovery <= max_rec):
logger.info(f"{LOG_YELLOW}[탈락-회복] {market}: {recovery*100:.1f}%{LOG_RESET}")
continue
# [3] 망치봉 꼬리 계산 (마지막 봉 기준)
last = df.iloc[-1]
c_open, c_close = last["open"], last["close"]
c_low, c_high = last["low"], last["high"]
body_top = max(c_open, c_close)
body_bottom = min(c_open, c_close)
body_len = max(body_top - body_bottom, 1.0)
tail_len = max(body_bottom - c_low, 0.0)
tail_ratio = tail_len / body_len
tail_pct = tail_len / c_low if c_low > 0 else 0
# 마지막 봉에 꼬리가 없으면 최근 5봉 재탐색 (장외 도지 대응)
if tail_len <= 0 and len(df) >= 2:
for idx in range(len(df) - 2, max(-1, len(df) - 6), -1):
c = df.iloc[idx]
bt = max(c["open"], c["close"])
bb = min(c["open"], c["close"])
bl = max(bt - bb, 1.0)
tl = max(bb - c["low"], 0.0)
if tl > 0:
tail_ratio = tl / bl
tail_pct = tl / c["low"] if c["low"] > 0 else 0
tail_len = tl
break
if tail_ratio < self.tail_ratio_min or tail_pct < self.tail_pct_min:
logger.info(f"{LOG_YELLOW}[탈락-꼬리] {market}: 비율={tail_ratio:.2f}, pct={tail_pct*100:.2f}%{LOG_RESET}")
continue
# [4] 고점 추격 방지
hc_thr = get_env_float("HIGH_PRICE_CHASE_THRESHOLD", 0.96)
if current_price >= day_high * hc_thr:
logger.info(f"{LOG_YELLOW}[탈락-고점추격] {market}{LOG_RESET}")
continue
# [5] RSI 과열 방지
rsi = calc_rsi(df["close"])
if rsi >= self.rsi_overheat:
logger.info(f"{LOG_YELLOW}[탈락-RSI] {market}: {rsi:.1f}{LOG_RESET}")
continue
# [6] 거래량 비율
avg_vol = df["volume"].mean()
last_vol = df["volume"].iloc[-1]
volume_ratio = last_vol / avg_vol if avg_vol > 0 else 1.0
# [7] ATR / 점수
atr = calc_atr(df)
score = get_env_float("TAIL_SCORE_BASE", 5.0) + \
tail_ratio * get_env_float("TAIL_SCORE_RATIO_MULT", 2.0)
candidates.append({
"code": market,
"name": market.replace("KRW-", ""),
"price": current_price,
"score": score,
"atr": atr,
"rsi": rsi,
"volume_ratio": volume_ratio,
"tail_ratio": tail_ratio,
"tail_pct": tail_pct,
"recovery": recovery,
"drop_rate": drop_rate,
})
logger.info(
f"{LOG_GREEN}🎯 [후보] {market} | 가격:{current_price:,.0f} "
f"| 꼬리:{tail_ratio:.1f}x | RSI:{rsi:.1f} | 점수:{score:.1f}{LOG_RESET}"
)
# 후보 DB 즉시 저장 (이벤트 발생 즉시 저장 원칙)
self.db.upsert_candidate(market, market.replace("KRW-", ""), score, current_price)
except Exception as e:
logger.debug(f"[스캔] {market} 오류: {e}")
# 마켓 간 레이트리밋 방지
time.sleep(random.uniform(0.12, 0.20))
candidates.sort(key=lambda x: x["score"], reverse=True)
return candidates
# --------------------------------------------------------
# 매도 신호 체크 (WebSocket 캐시 기반, 2초 주기)
# --------------------------------------------------------
def check_sell_signals(self) -> list:
"""
보유 포지션 매도 신호 체크
- 가격 소스: WebSocket 캐시 우선 (max_age=8초), 만료 시 REST fallback
- 이미 _selling에 있는 종목은 체크 스킵 (중복 매도 방지)
[매도 우선순위]
1. 어깨매도 : 고점 대비 SHOULDER_CUT_PCT 이상 하락 (수익 보존)
2. ATR 스캘핑: 고점→본절 복귀 또는 고점→ATR 하락
3. 목표가 달성
4. 손절 (% 또는 원화 기준)
"""
if not self.holdings:
return []
signals = []
min_hold = get_env_float("MIN_HOLD_AFTER_BUY_SEC", 10.0)
with self._trade_lock:
holdings_snapshot = dict(self.holdings)
for code, h in holdings_snapshot.items():
# ★ 이미 매도 처리 중인 종목 스킵 (비동기 중복 방지)
if code in self._selling:
continue
try:
# 최소 보유 시간 체크
if h.get("buy_time"):
try:
buy_dt = dt.strptime(h["buy_time"][:19], "%Y-%m-%d %H:%M:%S")
if (dt.now() - buy_dt).total_seconds() < min_hold:
continue
except Exception:
pass
# ★ 현재가: WebSocket 캐시 우선 → REST fallback
ws_data = self.ws_cache.get_price(code, max_age_sec=8.0)
if ws_data:
current_price = ws_data["price"]
_src = "WS"
else:
tickers = self.client.get_ticker([code])
if not tickers:
continue
current_price = float(tickers[0]["trade_price"])
_src = "REST"
if current_price <= 0:
continue
buy_price = h["buy_price"]
qty = h["qty"]
max_price = h.get("max_price", buy_price)
# 고점 갱신 (즉시 DB 저장)
if current_price > max_price:
max_price = current_price
with self._trade_lock:
if code in self.holdings:
self.holdings[code]["max_price"] = max_price
self.db.update_trade_max_price(code, current_price, max_price)
profit_pct = (current_price - buy_price) / buy_price if buy_price else 0
profit_krw = (current_price - buy_price) * qty
atr = h.get("atr_entry", 0) or buy_price * 0.01
stop_price = h.get("stop_price", buy_price * (1 + self.stop_loss_pct))
target_price = h.get("target_price", buy_price * (1 + self.take_profit_pct))
sell_reason = None
# [1] 어깨매도: 고점이 진입가 대비 충분히 올랐을 때만 적용
drop_from_high = (max_price - current_price) / max_price if max_price > 0 else 0
high_above_entry = (max_price - buy_price) / buy_price if buy_price > 0 else 0
# 수수료 반영 손익분기 하한 자동 계산
min_high_required = (
(1 + self.round_trip_cost + get_env_float("SHOULDER_MIN_NET_PCT", 0.001)) /
(1 - self.shoulder_cut_pct) - 1
) if (1 - self.shoulder_cut_pct) > 0 else 0.01
eff_shoulder_min = max(self.shoulder_min_high, min_high_required)
if drop_from_high >= self.shoulder_cut_pct and high_above_entry >= eff_shoulder_min:
sell_reason = f"어깨매도(고점-{drop_from_high*100:.1f}%)"
# [2] ATR 스캘핑: 고점 달성 후 본절 근처로 내려오면 매도
if not sell_reason and atr > 0:
if max_price >= buy_price + atr * self.scalp_up_mult and \
current_price <= buy_price + atr * self.scalp_down_mult:
sell_reason = "스캘핑_본절사수"
if not sell_reason and \
current_price < (max_price - atr * self.scalp_drop_mult) and profit_pct > 0:
sell_reason = "스캘핑_익절보존"
# [3] 목표가 달성
if not sell_reason and current_price >= target_price:
sell_reason = f"목표달성({profit_pct*100:+.2f}%)"
# [4] 손절 (% 기준)
if not sell_reason and (current_price <= stop_price or profit_pct <= self.stop_loss_pct):
sell_reason = f"손절({profit_pct*100:.2f}%)"
# [5] 원화 손실 한도 (금액 기준 손절)
max_loss_krw = get_env_float("MAX_LOSS_PER_TRADE_KRW", 50000)
if not sell_reason and profit_krw <= -max_loss_krw:
sell_reason = f"금액손절({profit_krw:+,.0f}원)"
if sell_reason:
signals.append({
"code": code,
"name": h["name"],
"reason": sell_reason,
"profit_pct": profit_pct,
"qty": qty,
"price": current_price,
"_src": _src,
})
logger.info(
f"💸 [매도신호/{_src}] {h['name']}({code}): "
f"{sell_reason} | {profit_pct*100:+.2f}%"
)
except Exception as e:
logger.error(f"[매도체크] {code} 오류: {e}")
return signals
# --------------------------------------------------------
# ★ Ver2 핵심: 비동기 매수 실행 (Worker 스레드)
# --------------------------------------------------------
def _buy_worker(self, candidate: dict):
"""
매수 Worker 스레드 (메인 루프와 독립 실행)
- wait_for_fill(20초 최대) 이 블로킹되어도 메인 루프는 계속 돌아감
- 완료 후 반드시 _buying에서 제거 (finally 보장)
"""
code = candidate["code"]
try:
self._execute_buy_internal(candidate)
except Exception as e:
logger.error(f"[매수워커] {code} 예외: {e}")
finally:
self._buying.discard(code)
def _execute_buy_internal(self, candidate: dict) -> bool:
"""
실제 매수 실행 로직 (Worker 스레드 내에서 실행)
- 주문 → 체결 확인 → holdings & DB 반영 → WS 구독 추가
"""
code = candidate["code"]
name = candidate["name"]
price = candidate["price"]
with self._trade_lock:
if code in self.holdings:
logger.warning(f"⚠️ [{name}] 이미 보유 → 스킵")
return False
if len(self.holdings) >= self.max_stocks:
logger.warning(f"⚠️ 최대 종목 수 초과 ({self.max_stocks})")
return False
cash = self.client.get_krw_balance()
slot = get_env_float("SLOT_MONEY_DEFAULT", 100000)
if cash < slot * 1.05:
logger.warning(f"⚠️ [{name}] 잔고 부족: {cash:,.0f}원 < {slot*1.05:,.0f}")
return False
amount = min(slot, cash * 0.90)
logger.info(f"🟡 [매수주문] {name}({code}) {amount:,.0f}원 시장가")
order_uuid = self.client.order_market_buy(code, amount)
if not order_uuid:
logger.warning(f"❌ [{name}] 매수 주문 실패: {self.client._last_order_error}")
return False
# ★ 체결 확인 (최대 20초 블로킹 → 하지만 이미 별도 스레드이므로 안전)
fill = self.client.wait_for_fill(order_uuid, timeout_sec=20)
if not fill or fill["filled_volume"] <= 0:
logger.warning(f"❌ [{name}] 매수 체결 미확인 (UUID={order_uuid})")
self.client.cancel_order(order_uuid)
return False
filled_price = fill["avg_price"]
filled_volume = fill["filled_volume"]
# ATR 기반 손절가/목표가 계산
atr = candidate.get("atr", filled_price * 0.01)
if atr > 0:
stop_price = filled_price - atr * get_env_float("STOP_ATR_MULTIPLIER_TAIL", 2.5)
target_price = filled_price + atr * get_env_float("TARGET_ATR_MULTIPLIER_TAIL", 7.0)
else:
stop_price = filled_price * (1 + self.stop_loss_pct)
target_price = filled_price * (1 + self.take_profit_pct)
buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S")
# 메모리 포지션 등록 (락 보호)
with self._trade_lock:
self.holdings[code] = {
"name": name,
"buy_price": filled_price,
"qty": filled_volume,
"buy_time": buy_time,
"max_price": filled_price,
"stop_price": stop_price,
"target_price": target_price,
"atr_entry": atr,
"rsi": candidate.get("rsi", 50.0),
"volume_ratio": candidate.get("volume_ratio", 1.0),
"tail_length_pct": candidate.get("tail_pct", 0.0) * 100,
}
# [즉시 저장 원칙] 이벤트 발생 즉시 DB 저장 (재시작 복원 가능)
self.db.upsert_trade({
"code": code,
"name": name,
"strategy": "UPBIT_TAIL_CATCH",
"avg_buy_price": filled_price,
"current_price": filled_price,
"stop_price": stop_price,
"target_price": target_price,
"max_price": filled_price,
"atr_entry": atr,
"target_qty": filled_volume,
"current_qty": filled_volume,
"total_invested": filled_price * filled_volume,
"status": "HOLDING",
"buy_date": buy_time,
"rsi": candidate.get("rsi", 50.0),
"volume_ratio": candidate.get("volume_ratio", 1.0),
"tail_length_pct": candidate.get("tail_pct", 0.0) * 100,
})
# WS 구독 추가 → 이후 매도 체크는 WS 기반
self.ws_cache.subscribe(code)
logger.info(
f"✅ [매수체결] {name}({code}): "
f"{filled_price:,.4f}× {filled_volume:.8f} "
f"| 손절={stop_price:,.4f} / 목표={target_price:,.4f}"
)
send_mm(
f"🟢 **매수체결** {name}({code})\n"
f"체결가: {filled_price:,.4f}원 | 수량: {filled_volume:.8f}\n"
f"손절: {stop_price:,.4f} / 목표: {target_price:,.4f}\n"
f"보유: {len(self.holdings)}종목"
)
return True
# --------------------------------------------------------
# ★ Ver2 핵심: 비동기 매도 실행 (Worker 스레드)
# --------------------------------------------------------
def _sell_worker(self, signal: dict):
"""
매도 Worker 스레드 (메인 루프와 독립 실행)
- wait_for_fill(20초 최대) 이 블로킹되어도 메인 루프는 계속 돌아감
- 완료 후 반드시 _selling에서 제거 (finally 보장)
"""
code = signal["code"]
try:
self._execute_sell_internal(signal)
except Exception as e:
logger.error(f"[매도워커] {code} 예외: {e}")
finally:
self._selling.discard(code)
def _execute_sell_internal(self, signal: dict) -> bool:
"""
실제 매도 실행 로직 (Worker 스레드 내에서 실행)
- 주문 → 체결 확인 → holdings & DB 반영 → WS 구독 해제
"""
code = signal["code"]
name = signal["name"]
qty = signal["qty"]
reason = signal["reason"]
with self._trade_lock:
if code not in self.holdings:
logger.warning(f"⚠️ [{name}] 보유 목록에 없음 → 스킵")
return False
logger.info(f"🟡 [매도주문] {name}({code}) {qty:.8f}개 시장가 | 사유: {reason}")
order_uuid = self.client.order_market_sell(code, qty)
if not order_uuid:
logger.warning(f"❌ [{name}] 매도 주문 실패: {self.client._last_order_error}")
return False
# ★ 체결 확인 (최대 20초 블로킹 → 하지만 이미 별도 스레드이므로 안전)
fill = self.client.wait_for_fill(order_uuid, timeout_sec=20)
if not fill or fill["filled_volume"] <= 0:
logger.warning(f"❌ [{name}] 매도 체결 미확인 (UUID={order_uuid})")
return False
sell_price = fill["avg_price"]
buy_price = None
with self._trade_lock:
if code in self.holdings:
buy_price = self.holdings[code]["buy_price"]
buy_price = buy_price or signal.get("price", sell_price)
profit_pct = (sell_price - buy_price) / buy_price if buy_price else 0
profit_krw = (sell_price - buy_price) * fill["filled_volume"]
fee = fill.get("paid_fee", 0)
# [즉시 저장 원칙] DB 이동 (active_trades → trade_history, 원자적)
self.db.close_trade(code, sell_price, reason)
# 메모리에서 제거 (락 보호)
with self._trade_lock:
self.holdings.pop(code, None)
# 재진입 쿨다운 기록
self.recently_sold[code] = time.time()
# WS 구독 해제 (불필요한 데이터 수신 차단)
self.ws_cache.unsubscribe(code)
logger.info(
f"✅ [매도체결] {name}({code}): "
f"{sell_price:,.4f}× {fill['filled_volume']:.8f} "
f"| {profit_pct*100:+.2f}% / {profit_krw:+,.0f}원 | 사유: {reason}"
)
send_mm(
f"🔴 **매도체결** {name}({code})\n"
f"체결가: {sell_price:,.4f}원 | 사유: {reason}\n"
f"수익률: {profit_pct*100:+.2f}% | 손익: {profit_krw:+,.0f}원 | 수수료: {fee:,.4f}\n"
f"보유: {len(self.holdings)}종목"
)
return True
# --------------------------------------------------------
# 메인 루프
# --------------------------------------------------------
def run(self):
"""
메인 루프 (★ Ver2: 완전 논블로킹)
[흐름]
1. 전체 KRW 마켓 목록 로드
2. WebSocket 스레드 시작 (보유 종목 즉시 구독)
3. 매도 루프 (2초 주기) → WS 캐시 기반, 비동기 Worker
4. 매수 스캔 (UPBIT_SCAN_INTERVAL_SEC 주기) → REST 캔들, 비동기 Worker
"""
logger.info("🚀 Upbit 단타 트레이더 V2 시작! (비동기 매수/매도)")
# ── 시작 시 계좌 잔고 조회 ──────────────────────────────────────────
try:
accounts = self.client.get_accounts()
krw_balance = 0.0
coin_holdings = []
for acc in accounts:
cur = acc.get("currency", "")
bal = float(acc.get("balance", 0))
locked = float(acc.get("locked", 0))
avg_buy = float(acc.get("avg_buy_price", 0))
if cur == "KRW":
krw_balance = bal + locked
elif bal + locked > 0 and avg_buy > 0:
coin_holdings.append({
"coin": cur,
"qty": bal + locked,
"avg": avg_buy,
"eval": (bal + locked) * avg_buy, # 매수 기준 평가금
})
total_eval = krw_balance + sum(c["eval"] for c in coin_holdings)
logger.info(f"💰 ── 계좌 현황 ──────────────────────────────")
logger.info(f"💰 KRW 가용 잔고 : {krw_balance:>15,.0f}")
for c in coin_holdings:
logger.info(
f"💰 {c['coin']:<10} 보유 {c['qty']:.4f} "
f"평단 {c['avg']:,.0f} 평가 {c['eval']:,.0f}"
)
logger.info(f"💰 총 평가금액 : {total_eval:>15,.0f}")
logger.info(f"💰 ─────────────────────────────────────────")
except Exception as e:
logger.warning(f"⚠️ 계좌 조회 실패: {e}")
# KRW 마켓 목록 로드
all_markets = [
m["market"] for m in self.client.get_market_list()
if m["market"].startswith("KRW-")
]
logger.info(f"📋 KRW 마켓 {len(all_markets)}개 로드")
# WebSocket 시작 (보유 종목 즉시 구독)
self.ws_cache.start(initial_markets=list(self.holdings.keys()))
time.sleep(2) # WS 연결 안정화 대기
# last_scan_time을 현재 시각으로 초기화 → 시작 직후 즉시 스캔 방지
# (scan_interval 만큼 대기 후 첫 스캔 실행)
last_scan_time = time.time()
scan_interval_sec = get_env_int("UPBIT_SCAN_INTERVAL_SEC", 60)
# Heartbeat: 마지막으로 로그 찍은 시각 (30초마다 대기 상황 출력)
last_heartbeat = time.time()
# 스캔 전용 스레드 중복 실행 방지 플래그
_scan_running = False
_scan_lock = threading.Lock()
def _scan_worker():
"""
매수 스캔을 별도 스레드에서 실행 → 메인 매도 감시 루프 블로킹 없음
scan_buy_candidates 는 243개 마켓 × REST API 호출이라 수십 초 소요됨
"""
nonlocal _scan_running
try:
with self._trade_lock:
current_holdings = len(self.holdings)
active_buying = len(self._buying)
if current_holdings + active_buying >= self.max_stocks:
return # 슬롯 없으면 스캔 생략
logger.info(f"🔍 매수 스캔 시작 ({len(all_markets)}개 마켓)")
self.db.clear_old_candidates()
candidates = self.scan_buy_candidates(all_markets)
logger.info(f"🎯 후보 {len(candidates)}개 발견")
top_n = get_env_int("UPBIT_BUY_TOP_N", 2)
for cand in candidates[:top_n]:
code = cand["code"]
with self._trade_lock:
already_hold = code in self.holdings
if already_hold or code in self._buying or code in self._selling:
continue
if len(self.holdings) + len(self._buying) >= self.max_stocks:
break
# 매수 직전 WS 최신가로 가격 갱신 (슬리피지 최소화)
ws_p = self.ws_cache.get_price(code, max_age_sec=5.0)
if ws_p:
cand["price"] = ws_p["price"]
self._buying.add(code)
# ★ 비동기 매수 Worker 스레드 (메인 루프 블로킹 없음)
t = threading.Thread(
target=self._buy_worker, args=(cand,), daemon=True, name=f"Buy-{code}"
)
t.start()
except Exception as e:
logger.exception(f"❌ 스캔 스레드 예외: {e}")
finally:
with _scan_lock:
_scan_running = False
try:
while True:
now = time.time()
# ① 매도 신호 체크 (2초 주기, WS 캐시 기반) — 최우선 실행
sell_signals = self.check_sell_signals()
for sig in sell_signals:
code = sig["code"]
if code in self._selling:
continue # 이미 매도 처리 중
self._selling.add(code)
# ★ 비동기 매도 Worker 스레드 생성 (메인 루프 블로킹 없음)
t = threading.Thread(
target=self._sell_worker, args=(sig,), daemon=True, name=f"Sell-{code}"
)
t.start()
# ② 매수 스캔 — scan_interval 주기, 별도 스레드로 실행 (비블로킹)
scan_interval_sec = get_env_int("UPBIT_SCAN_INTERVAL_SEC", 60)
if now - last_scan_time >= scan_interval_sec:
last_scan_time = now
with _scan_lock:
if not _scan_running:
# 이전 스캔이 끝나지 않았으면 이번 주기 스킵 (중복 방지)
_scan_running = True
t = threading.Thread(target=_scan_worker, daemon=True, name="Scan")
t.start()
else:
logger.info("⏭️ 이전 스캔 진행 중 — 이번 주기 스킵")
# ③ Heartbeat — 30초마다 현재 상태 출력 (로그 무음 방지)
heartbeat_interval = get_env_int("UPBIT_HEARTBEAT_SEC", 30)
if now - last_heartbeat >= heartbeat_interval:
last_heartbeat = now
next_scan_in = max(0, scan_interval_sec - (now - last_scan_time))
with self._trade_lock:
h_count = len(self.holdings)
try:
krw = self.client.get_krw_balance()
except Exception:
krw = -1
logger.info(
f"💓 [Heartbeat] 보유={h_count}종목 | "
f"KRW={krw:,.0f}원 | "
f"매수중={len(self._buying)} | 매도중={len(self._selling)} | "
f"다음스캔={next_scan_in:.0f}초 후"
)
# ④ 2초 대기 (매도 감시 루프 주기)
time.sleep(2)
except KeyboardInterrupt:
logger.info("⛔ 사용자 종료 요청")
except Exception as e:
logger.exception(f"❌ 메인 루프 예외: {e}")
send_mm(f"🚨 **Upbit V2 봇 비정상 종료**: {e}")
finally:
self.ws_cache.stop()
logger.info("🛑 Upbit 단타 트레이더 V2 종료")
# ============================================================
# 진입점
# ============================================================
if __name__ == "__main__":
trader = UpbitShortTraderV2()
trader.run()