File MAEStrategy.py of Package fint-bot

"""
Усовершенствованная MAE (Moving Average Envelope) Strategy для реальной торговли
"""
from __future__ import annotations

from typing import Optional, Dict, Any
import numpy as np
from dataclasses import dataclass
from datetime import datetime
from tinkoff.invest import Candle, CandleInterval, OrderDirection, MarketDataResponse, OrderType

from robotlib.strategy import TradeStrategyBase, TradeStrategyParams, StrategyDecision, RobotTradeOrder
from robotlib.money import Money
from robotlib.vizualization import Visualizer


@dataclass
class PositionInfo:
    """Информация о текущей позиции"""
    entry_time: datetime
    entry_price: float
    quantity: int
    direction: OrderDirection
    stop_loss: float = 0.0
    take_profit: float = 0.0


@dataclass
class TradingState:
    """Состояние торговой сессии"""
    last_signal_time: Optional[datetime] = None
    consecutive_losses: int = 0
    consecutive_wins: int = 0
    daily_pnl: float = 0.0
    current_position: Optional[PositionInfo] = None
    signals_count: Dict[str, int] = None
    
    def __post_init__(self):
        if self.signals_count is None:
            self.signals_count = {'buy': 0, 'sell': 0, 'skip': 0}


class AdvancedMAEStrategy(TradeStrategyBase):
    """Продвинутая стратегия на основе скользящих средних и конвертов для реальной торговли"""
    strategy_id = "advanced_mae_strategy"
    candle_subscription_interval = CandleInterval.CANDLE_INTERVAL_5_MIN
    
    def __init__(self, 
                 fast_window: int = 9,
                 slow_window: int = 21,
                 envelope_percent: float = 0.015,
                 atr_period: int = 14,
                 risk_per_trade: float = 0.02,
                 reward_ratio: float = 2.0,
                 max_consecutive_losses: int = 3,
                 daily_loss_limit: float = 0.05,
                 min_trade_interval: int = 15,
                 visualizer: Optional[Visualizer] = None):
        
        self.fast_window = fast_window
        self.slow_window = slow_window
        self.envelope_percent = envelope_percent
        self.atr_period = atr_period
        self.risk_per_trade = risk_per_trade
        self.reward_ratio = reward_ratio
        self.max_consecutive_losses = max_consecutive_losses
        self.daily_loss_limit = daily_loss_limit
        self.min_trade_interval = min_trade_interval
        
        self.visualizer = visualizer
        self.instrument_info = None
        self.prices = []
        self.highs = []
        self.lows = []
        self.state = TradingState()
        self.daily_start_balance = 0.0
        self._last_trading_day = None
        
    def load_instrument_info(self, instrument_info):
        """Загрузить информацию об инструменте"""
        self.instrument_info = instrument_info
        
    def load_candles(self, candles: list[Candle]) -> None:
        """Загрузить исторические свечи"""
        for candle in candles:
            price = self._quotation_to_float(candle.close)
            high = self._quotation_to_float(candle.high)
            low = self._quotation_to_float(candle.low)
            
            self.prices.append(price)
            self.highs.append(high)
            self.lows.append(low)
            
    def _quotation_to_float(self, quotation) -> float:
        """Конвертация Quotation в float"""
        return quotation.units + quotation.nano / (10 ** 9)
    
    def _calculate_atr(self) -> float:
        """Расчет Average True Range для определения волатильности"""
        if len(self.prices) < self.atr_period + 1:
            return 0.0
            
        tr_values = []
        for i in range(1, min(len(self.prices), len(self.highs), len(self.lows))):
            high = self.highs[i]
            low = self.lows[i]
            prev_close = self.prices[i-1]
            
            tr1 = high - low
            tr2 = abs(high - prev_close)
            tr3 = abs(low - prev_close)
            tr_values.append(max(tr1, tr2, tr3))
            
        if len(tr_values) >= self.atr_period:
            return np.mean(tr_values[-self.atr_period:])
        return 0.0
    
    def _calculate_dynamic_envelope(self, ma: float, atr: float) -> tuple:
        """Динамический расчет конверта на основе волатильности"""
        if atr > 0 and ma > 0:
            # Адаптивный конверт: базовый % + поправка на волатильность
            volatility_ratio = atr / ma
            dynamic_percent = self.envelope_percent + volatility_ratio * 0.5
            dynamic_percent = min(dynamic_percent, 0.05)  # Ограничение 5%
        else:
            dynamic_percent = self.envelope_percent
            
        upper_envelope = ma * (1 + dynamic_percent)
        lower_envelope = ma * (1 - dynamic_percent)
        
        return upper_envelope, lower_envelope, dynamic_percent
    
    def _calculate_position_size(self, price: float, stop_loss: float, balance: float) -> int:
        """Расчет размера позиции на основе управления капиталом"""
        if stop_loss == price:
            return 0
            
        risk_amount = balance * self.risk_per_trade
        risk_per_share = abs(price - stop_loss)
        
        if risk_per_share == 0:
            return 0
            
        # Количество акций, которые можно купить с учетом риска
        shares = int(risk_amount / risk_per_share)
        
        # Учет минимального лота
        if self.instrument_info and hasattr(self.instrument_info, 'lot'):
            lot_size = self.instrument_info.lot
            shares = (shares // lot_size) * lot_size
        
        # Минимальная позиция - 1 лот
        if shares < (self.instrument_info.lot if self.instrument_info else 1):
            return 0
            
        return shares
    
    def _check_trading_conditions(self, current_time: datetime, params: TradeStrategyParams) -> tuple:
        """Проверка условий для торговли"""
        reasons = []
        
        # 1. Проверка дневного лимита убытков
        if self.daily_start_balance > 0 and self.state.daily_pnl < -self.daily_start_balance * self.daily_loss_limit:
            reasons.append(f"Достигнут дневной лимит убытков: {self.state.daily_pnl:.2f}")
            return False, reasons
        
        # 2. Проверка последовательных убытков
        if self.state.consecutive_losses >= self.max_consecutive_losses:
            reasons.append(f"Достигнут лимит последовательных убытков: {self.state.consecutive_losses}")
            return False, reasons
        
        # 3. Проверка времени (не торговать в первые и последние 15 минут сессии)
        hour = current_time.hour
        minute = current_time.minute
        
        # Московская биржа (пример: 10:00-18:45)
        if (hour == 10 and minute < 15) or (hour == 18 and minute > 30):
            reasons.append("Торговля в начале/конце сессии ограничена")
            return False, reasons
        
        # 4. Проверка минимального баланса
        if params.currency_balance < 1000:  # Минимальный баланс 1000 руб
            reasons.append(f"Недостаточный баланс: {params.currency_balance:.2f}")
            return False, reasons
        
        return True, reasons
    
    def _update_trading_state(self, pnl_change: float = 0.0):
        """Обновление состояния торговли"""
        self.state.daily_pnl += pnl_change
        
        if pnl_change > 0:
            self.state.consecutive_wins += 1
            self.state.consecutive_losses = 0
        elif pnl_change < 0:
            self.state.consecutive_losses += 1
            self.state.consecutive_wins = 0
        
        # Сброс дневной статистики при новом дне
        current_day = datetime.now().date()
        if self._last_trading_day != current_day:
            self._last_trading_day = current_day
            self.state.daily_pnl = 0.0
            self.state.consecutive_losses = 0
            self.state.consecutive_wins = 0
            self.daily_start_balance = 0.0
    
    def decide(self, market_data: MarketDataResponse, params: TradeStrategyParams) -> StrategyDecision:
        """Принять решение на основе рыночных данных"""
        if not market_data.candle:
            return StrategyDecision()
            
        candle = market_data.candle
        current_time = candle.time
        price = self._quotation_to_float(candle.close)
        high = self._quotation_to_float(candle.high)
        low = self._quotation_to_float(candle.low)
        
        # Сохраняем начальный баланс для дневного лимита
        if self.daily_start_balance == 0 and params.currency_balance > 0:
            self.daily_start_balance = params.currency_balance
        
        # Обновляем историю цен
        self.prices.append(price)
        self.highs.append(high)
        self.lows.append(low)
        
        # Сохраняем только последние N значений для экономии памяти
        max_history = max(self.slow_window * 3, 500)
        if len(self.prices) > max_history:
            self.prices = self.prices[-max_history:]
            self.highs = self.highs[-max_history:]
            self.lows = self.lows[-max_history:]
        
        # Проверяем условия для торговли
        can_trade, reasons = self._check_trading_conditions(current_time, params)
        if not can_trade:
            if reasons and len(reasons) > 0:
                print(f"[{current_time.strftime('%H:%M')}] Пропуск сделки: {reasons[0]}")
            return StrategyDecision()
        
        # Рассчитываем индикаторы
        if len(self.prices) < self.slow_window:
            return StrategyDecision()
        
        # Быстрая и медленная MA
        fast_ma = np.mean(self.prices[-self.fast_window:])
        slow_ma = np.mean(self.prices[-self.slow_window:])
        
        # Определение тренда
        trend_direction = 1 if fast_ma > slow_ma else -1
        
        # Рассчитываем ATR для динамического конверта
        atr = self._calculate_atr()
        
        # Конверт для текущей MA (используем fast_ma для более быстрой реакции)
        upper, lower, dynamic_percent = self._calculate_dynamic_envelope(fast_ma, atr)
        
        # Визуализация
        if self.visualizer:
            self.visualizer.add_price(current_time, price)
        
        decision = StrategyDecision()
        
        # Проверка наличия текущей позиции
        if self.state.current_position:
            # Управление существующей позицией
            decision = self._manage_current_position(
                current_time, price, params, 
                self.state.current_position, upper, lower
            )
        else:
            # Поиск новой позиции
            decision = self._find_new_position(
                current_time, price, params, trend_direction,
                upper, lower, fast_ma, atr
            )
        
        # Обновляем график
        if self.visualizer:
            self.visualizer.update_plot()
            
        return decision
    
    def _manage_current_position(self, current_time: datetime, price: float, 
                                params: TradeStrategyParams, position: PositionInfo,
                                upper: float, lower: float) -> StrategyDecision:
        """Управление существующей позицией (стоп-лосс, тейк-профит, трейлинг)"""
        
        # Расчет PnL
        if position.direction == OrderDirection.ORDER_DIRECTION_BUY:
            pnl = (price - position.entry_price) * position.quantity
        else:
            pnl = (position.entry_price - price) * position.quantity
        
        # Проверка стоп-лосса
        if position.direction == OrderDirection.ORDER_DIRECTION_BUY:
            if price <= position.stop_loss:
                # Сработал стоп-лосс
                print(f"[{current_time.strftime('%H:%M')}] СТОП-ЛОСС! Позиция закрыта с убытком: {pnl:.2f}")
                self._update_trading_state(pnl)
                self.state.signals_count['sell'] += 1
                self.state.current_position = None
                
                if self.visualizer:
                    self.visualizer.add_sell(current_time)
                
                return StrategyDecision(
                    robot_trade_order=RobotTradeOrder(
                        quantity=position.quantity,
                        direction=OrderDirection.ORDER_DIRECTION_SELL,
                        order_type=OrderType.ORDER_TYPE_MARKET
                    )
                )
        else:
            if price >= position.stop_loss:
                # Сработал стоп-лосс для шорта
                print(f"[{current_time.strftime('%H:%M')}] СТОП-ЛОСС! Позиция закрыта с убытком: {pnl:.2f}")
                self._update_trading_state(pnl)
                self.state.signals_count['buy'] += 1
                self.state.current_position = None
                
                if self.visualizer:
                    self.visualizer.add_buy(current_time)
                
                return StrategyDecision(
                    robot_trade_order=RobotTradeOrder(
                        quantity=position.quantity,
                        direction=OrderDirection.ORDER_DIRECTION_BUY,
                        order_type=OrderType.ORDER_TYPE_MARKET
                    )
                )
        
        # Проверка тейк-профита
        if position.direction == OrderDirection.ORDER_DIRECTION_BUY:
            if price >= position.take_profit:
                # Сработал тейк-профит
                print(f"[{current_time.strftime('%H:%M')}] ТЕЙК-ПРОФИТ! Позиция закрыта с прибылью: {pnl:.2f}")
                self._update_trading_state(pnl)
                self.state.signals_count['sell'] += 1
                self.state.current_position = None
                
                if self.visualizer:
                    self.visualizer.add_sell(current_time)
                
                return StrategyDecision(
                    robot_trade_order=RobotTradeOrder(
                        quantity=position.quantity,
                        direction=OrderDirection.ORDER_DIRECTION_SELL,
                        order_type=OrderType.ORDER_TYPE_MARKET
                    )
                )
        else:
            if price <= position.take_profit:
                # Сработал тейк-профит для шорта
                print(f"[{current_time.strftime('%H:%M')}] ТЕЙК-ПРОФИТ! Позиция закрыта с прибылью: {pnl:.2f}")
                self._update_trading_state(pnl)
                self.state.signals_count['buy'] += 1
                self.state.current_position = None
                
                if self.visualizer:
                    self.visualizer.add_buy(current_time)
                
                return StrategyDecision(
                    robot_trade_order=RobotTradeOrder(
                        quantity=position.quantity,
                        direction=OrderDirection.ORDER_DIRECTION_BUY,
                        order_type=OrderType.ORDER_TYPE_MARKET
                    )
                )
        
        # Трейлинг-стоп для длинных позиций
        if position.direction == OrderDirection.ORDER_DIRECTION_BUY and price > position.entry_price:
            # Поднимаем стоп-лосс при движении в нашу пользу
            new_stop = max(position.stop_loss, price * 0.98)  # 2% от текущей цены
            if new_stop > position.stop_loss:
                position.stop_loss = new_stop
                print(f"[{current_time.strftime('%H:%M')}] Трейлинг-стоп обновлен: {position.stop_loss:.2f}")
        
        # Трейлинг-стоп для коротких позиций
        if position.direction == OrderDirection.ORDER_DIRECTION_SELL and price < position.entry_price:
            # Опускаем стоп-лосс при движении в нашу пользу
            new_stop = min(position.stop_loss, price * 1.02)  # 2% от текущей цены
            if new_stop < position.stop_loss:
                position.stop_loss = new_stop
                print(f"[{current_time.strftime('%H:%M')}] Трейлинг-стоп обновлен: {position.stop_loss:.2f}")
        
        return StrategyDecision()
    
    def _find_new_position(self, current_time: datetime, price: float, 
                          params: TradeStrategyParams, trend_direction: int,
                          upper: float, lower: float, ma: float, atr: float) -> StrategyDecision:
        """Поиск новой позиции для входа"""
        
        # Проверка минимального интервала между сигналами
        if self.state.last_signal_time:
            time_diff = (current_time - self.state.last_signal_time).total_seconds() / 60
            if time_diff < self.min_trade_interval:
                self.state.signals_count['skip'] += 1
                return StrategyDecision()
        
        # Определяем волатильность
        is_volatile = atr > (ma * 0.02)  # ATR > 2% от цены
        
        # Логика входа
        entry_signal = None
        stop_loss = 0.0
        take_profit = 0.0
        signal_reason = ""
        
        if price <= lower and trend_direction == 1:  # Цена у нижней границы + восходящий тренд
            # Сигнал на покупку
            entry_signal = OrderDirection.ORDER_DIRECTION_BUY
            stop_loss = price * 0.98  # Стоп-лосс 2% ниже цены входа
            take_profit = price * (1 + self.reward_ratio * 0.02)  # Тейк-профит с соотношением 2:1
            signal_reason = f"BUY: цена {price:.2f} ≤ ниж.конверт {lower:.2f}, тренд вверх"
            
            # В условиях высокой волатильности увеличиваем стоп
            if is_volatile:
                stop_loss = price * 0.97  # 3% стоп при высокой волатильности
                signal_reason += f" (выс.волатильность, стоп {stop_loss:.2f})"
                
        elif price >= upper and trend_direction == -1:  # Цена у верхней границы + нисходящий тренд
            # Сигнал на продажу
            entry_signal = OrderDirection.ORDER_DIRECTION_SELL
            stop_loss = price * 1.02  # Стоп-лосс 2% выше цены входа
            take_profit = price * (1 - self.reward_ratio * 0.02)  # Тейк-профит с соотношением 2:1
            signal_reason = f"SELL: цена {price:.2f} ≥ верх.конверт {upper:.2f}, тренд вниз"
            
            # В условиях высокой волатильности увеличиваем стоп
            if is_volatile:
                stop_loss = price * 1.03  # 3% стоп при высокой волатильности
                signal_reason += f" (выс.волатильность, стоп {stop_loss:.2f})"
        
        if entry_signal:
            # Расчет размера позиции
            position_size = self._calculate_position_size(price, stop_loss, params.currency_balance)
            
            if position_size > 0:
                # Проверяем возможность исполнения
                if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
                    cost = position_size * price
                    if cost > params.currency_balance:
                        self.state.signals_count['skip'] += 1
                        print(f"[{current_time.strftime('%H:%M')}] Недостаточно средств для покупки")
                        return StrategyDecision()
                else:
                    if position_size > params.instrument_balance:
                        self.state.signals_count['skip'] += 1
                        print(f"[{current_time.strftime('%H:%M')}] Недостаточно бумаг для продажи")
                        return StrategyDriver()
                
                # Создаем информацию о позиции
                self.state.current_position = PositionInfo(
                    entry_time=current_time,
                    entry_price=price,
                    quantity=position_size,
                    direction=entry_signal,
                    stop_loss=stop_loss,
                    take_profit=take_profit
                )
                
                self.state.last_signal_time = current_time
                signal_type = 'buy' if entry_signal == OrderDirection.ORDER_DIRECTION_BUY else 'sell'
                self.state.signals_count[signal_type] += 1
                
                # Визуализация
                if self.visualizer:
                    if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
                        self.visualizer.add_buy(current_time)
                    else:
                        self.visualizer.add_sell(current_time)
                
                print(f"[{current_time.strftime('%H:%M')}] {signal_reason}")
                print(f"    Размер позиции: {position_size}, Стоп: {stop_loss:.2f}, Тейк: {take_profit:.2f}")
                
                return StrategyDecision(
                    robot_trade_order=RobotTradeOrder(
                        quantity=position_size,
                        direction=entry_signal,
                        order_type=OrderType.ORDER_TYPE_MARKET
                    )
                )
            else:
                print(f"[{current_time.strftime('%H:%M')}] Размер позиции 0 (риск {self.risk_per_trade*100}% от баланса)")
        
        self.state.signals_count['skip'] += 1
        return StrategyDecision()
    
    def decide_by_candle(self, candle: Candle, params: TradeStrategyParams) -> StrategyDecision:
        """Упрощенная версия для бэктестирования"""
        # Для бэктеста используем упрощенную логику
        price = self._quotation_to_float(candle.close)
        current_time = candle.time
        self.prices.append(price)
        
        # Сохраняем начальный баланс
        if self.daily_start_balance == 0 and params.currency_balance > 0:
            self.daily_start_balance = params.currency_balance
        
        if len(self.prices) < self.slow_window:
            return StrategyDecision()
        
        # Быстрая и медленная MA
        fast_ma = np.mean(self.prices[-self.fast_window:])
        slow_ma = np.mean(self.prices[-self.slow_window:])
        
        # Определение тренда
        trend_direction = 1 if fast_ma > slow_ma else -1
        
        # Конверты
        upper = fast_ma * (1 + self.envelope_percent)
        lower = fast_ma * (1 - self.envelope_percent)
        
        # Управление существующей позицией
        if self.state.current_position:
            return self._manage_current_position(
                current_time, price, params, 
                self.state.current_position, upper, lower
            )
        
        # Поиск новой позиции
        entry_signal = None
        
        if price <= lower and trend_direction == 1:
            entry_signal = OrderDirection.ORDER_DIRECTION_BUY
        elif price >= upper and trend_direction == -1:
            entry_signal = OrderDirection.ORDER_DIRECTION_SELL
        
        if entry_signal:
            # Упрощенный расчет позиции для бэктеста
            if self.instrument_info:
                if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
                    max_quantity = int(params.currency_balance / (price * self.instrument_info.lot))
                    quantity = min(self.instrument_info.lot, max_quantity) if max_quantity > 0 else 0
                else:
                    quantity = min(self.instrument_info.lot, params.instrument_balance)
            else:
                quantity = 1 if (entry_signal == OrderDirection.ORDER_DIRECTION_BUY and params.currency_balance > price) \
                            else (1 if params.instrument_balance > 0 else 0)
            
            if quantity > 0:
                # Расчет стоп-лосса и тейк-профита
                if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
                    stop_loss = price * 0.98
                    take_profit = price * (1 + self.reward_ratio * 0.02)
                else:
                    stop_loss = price * 1.02
                    take_profit = price * (1 - self.reward_ratio * 0.02)
                
                self.state.current_position = PositionInfo(
                    entry_time=current_time,
                    entry_price=price,
                    quantity=quantity,
                    direction=entry_signal,
                    stop_loss=stop_loss,
                    take_profit=take_profit
                )
                
                return StrategyDecision(
                    robot_trade_order=RobotTradeOrder(
                        quantity=quantity,
                        direction=entry_signal,
                        order_type=OrderType.ORDER_TYPE_MARKET
                    )
                )
                
        return StrategyDecision()
    
    def get_trading_stats(self) -> Dict[str, Any]:
        """Получение статистики торговли"""
        position_info = None
        if self.state.current_position:
            position = self.state.current_position
            position_info = {
                'direction': 'BUY' if position.direction == OrderDirection.ORDER_DIRECTION_BUY else 'SELL',
                'entry_price': position.entry_price,
                'quantity': position.quantity,
                'stop_loss': position.stop_loss,
                'take_profit': position.take_profit,
                'entry_time': position.entry_time.strftime('%H:%M')
            }
        
        return {
            'signals': self.state.signals_count,
            'consecutive_losses': self.state.consecutive_losses,
            'consecutive_wins': self.state.consecutive_wins,
            'daily_pnl': self.state.daily_pnl,
            'has_position': self.state.current_position is not None,
            'position_info': position_info,
            'current_balance': self.daily_start_balance + self.state.daily_pnl if self.daily_start_balance > 0 else 0
        }


class MAEStrategy(AdvancedMAEStrategy):
    """Совместимость с существующим кодом"""
    def __init__(self, window: int = 20, envelope_percent: float = 0.02, visualizer: Optional[Visualizer] = None):
        # Конвертируем старые параметры в новые
        super().__init__(
            fast_window=max(5, window // 2),
            slow_window=window,
            envelope_percent=envelope_percent,
            visualizer=visualizer
        )
    
    def decide(self, market_data: MarketDataResponse, params: TradeStrategyParams) -> StrategyDecision:
        """Переопределяем для логирования"""
        decision = super().decide(market_data, params)
        
        # Логируем сигналы для совместимости
        if decision.robot_trade_order:
            direction = "ПОКУПКА" if decision.robot_trade_order.direction == OrderDirection.ORDER_DIRECTION_BUY else "ПРОДАЖА"
            print(f"[MAE] Сигнал на {direction}: {decision.robot_trade_order.quantity} лотов")
        
        return decision
openSUSE Build Service is sponsored by