File MAEStrategy.py of Package fint-bot
"""
Усовершенствованная MAE (Moving Average Envelope) Strategy для реальной торговли
"""
from __future__ import annotations
from typing import Optional, Dict, Any
import numpy as np
from dataclasses import dataclass
from datetime import datetime
from tinkoff.invest import Candle, CandleInterval, OrderDirection, MarketDataResponse, OrderType
from robotlib.strategy import TradeStrategyBase, TradeStrategyParams, StrategyDecision, RobotTradeOrder
from robotlib.money import Money
from robotlib.vizualization import Visualizer
@dataclass
class PositionInfo:
"""Информация о текущей позиции"""
entry_time: datetime
entry_price: float
quantity: int
direction: OrderDirection
stop_loss: float = 0.0
take_profit: float = 0.0
@dataclass
class TradingState:
"""Состояние торговой сессии"""
last_signal_time: Optional[datetime] = None
consecutive_losses: int = 0
consecutive_wins: int = 0
daily_pnl: float = 0.0
current_position: Optional[PositionInfo] = None
signals_count: Dict[str, int] = None
def __post_init__(self):
if self.signals_count is None:
self.signals_count = {'buy': 0, 'sell': 0, 'skip': 0}
class AdvancedMAEStrategy(TradeStrategyBase):
"""Продвинутая стратегия на основе скользящих средних и конвертов для реальной торговли"""
strategy_id = "advanced_mae_strategy"
candle_subscription_interval = CandleInterval.CANDLE_INTERVAL_5_MIN
def __init__(self,
fast_window: int = 9,
slow_window: int = 21,
envelope_percent: float = 0.015,
atr_period: int = 14,
risk_per_trade: float = 0.02,
reward_ratio: float = 2.0,
max_consecutive_losses: int = 3,
daily_loss_limit: float = 0.05,
min_trade_interval: int = 15,
visualizer: Optional[Visualizer] = None):
self.fast_window = fast_window
self.slow_window = slow_window
self.envelope_percent = envelope_percent
self.atr_period = atr_period
self.risk_per_trade = risk_per_trade
self.reward_ratio = reward_ratio
self.max_consecutive_losses = max_consecutive_losses
self.daily_loss_limit = daily_loss_limit
self.min_trade_interval = min_trade_interval
self.visualizer = visualizer
self.instrument_info = None
self.prices = []
self.highs = []
self.lows = []
self.state = TradingState()
self.daily_start_balance = 0.0
self._last_trading_day = None
def load_instrument_info(self, instrument_info):
"""Загрузить информацию об инструменте"""
self.instrument_info = instrument_info
def load_candles(self, candles: list[Candle]) -> None:
"""Загрузить исторические свечи"""
for candle in candles:
price = self._quotation_to_float(candle.close)
high = self._quotation_to_float(candle.high)
low = self._quotation_to_float(candle.low)
self.prices.append(price)
self.highs.append(high)
self.lows.append(low)
def _quotation_to_float(self, quotation) -> float:
"""Конвертация Quotation в float"""
return quotation.units + quotation.nano / (10 ** 9)
def _calculate_atr(self) -> float:
"""Расчет Average True Range для определения волатильности"""
if len(self.prices) < self.atr_period + 1:
return 0.0
tr_values = []
for i in range(1, min(len(self.prices), len(self.highs), len(self.lows))):
high = self.highs[i]
low = self.lows[i]
prev_close = self.prices[i-1]
tr1 = high - low
tr2 = abs(high - prev_close)
tr3 = abs(low - prev_close)
tr_values.append(max(tr1, tr2, tr3))
if len(tr_values) >= self.atr_period:
return np.mean(tr_values[-self.atr_period:])
return 0.0
def _calculate_dynamic_envelope(self, ma: float, atr: float) -> tuple:
"""Динамический расчет конверта на основе волатильности"""
if atr > 0 and ma > 0:
# Адаптивный конверт: базовый % + поправка на волатильность
volatility_ratio = atr / ma
dynamic_percent = self.envelope_percent + volatility_ratio * 0.5
dynamic_percent = min(dynamic_percent, 0.05) # Ограничение 5%
else:
dynamic_percent = self.envelope_percent
upper_envelope = ma * (1 + dynamic_percent)
lower_envelope = ma * (1 - dynamic_percent)
return upper_envelope, lower_envelope, dynamic_percent
def _calculate_position_size(self, price: float, stop_loss: float, balance: float) -> int:
"""Расчет размера позиции на основе управления капиталом"""
if stop_loss == price:
return 0
risk_amount = balance * self.risk_per_trade
risk_per_share = abs(price - stop_loss)
if risk_per_share == 0:
return 0
# Количество акций, которые можно купить с учетом риска
shares = int(risk_amount / risk_per_share)
# Учет минимального лота
if self.instrument_info and hasattr(self.instrument_info, 'lot'):
lot_size = self.instrument_info.lot
shares = (shares // lot_size) * lot_size
# Минимальная позиция - 1 лот
if shares < (self.instrument_info.lot if self.instrument_info else 1):
return 0
return shares
def _check_trading_conditions(self, current_time: datetime, params: TradeStrategyParams) -> tuple:
"""Проверка условий для торговли"""
reasons = []
# 1. Проверка дневного лимита убытков
if self.daily_start_balance > 0 and self.state.daily_pnl < -self.daily_start_balance * self.daily_loss_limit:
reasons.append(f"Достигнут дневной лимит убытков: {self.state.daily_pnl:.2f}")
return False, reasons
# 2. Проверка последовательных убытков
if self.state.consecutive_losses >= self.max_consecutive_losses:
reasons.append(f"Достигнут лимит последовательных убытков: {self.state.consecutive_losses}")
return False, reasons
# 3. Проверка времени (не торговать в первые и последние 15 минут сессии)
hour = current_time.hour
minute = current_time.minute
# Московская биржа (пример: 10:00-18:45)
if (hour == 10 and minute < 15) or (hour == 18 and minute > 30):
reasons.append("Торговля в начале/конце сессии ограничена")
return False, reasons
# 4. Проверка минимального баланса
if params.currency_balance < 1000: # Минимальный баланс 1000 руб
reasons.append(f"Недостаточный баланс: {params.currency_balance:.2f}")
return False, reasons
return True, reasons
def _update_trading_state(self, pnl_change: float = 0.0):
"""Обновление состояния торговли"""
self.state.daily_pnl += pnl_change
if pnl_change > 0:
self.state.consecutive_wins += 1
self.state.consecutive_losses = 0
elif pnl_change < 0:
self.state.consecutive_losses += 1
self.state.consecutive_wins = 0
# Сброс дневной статистики при новом дне
current_day = datetime.now().date()
if self._last_trading_day != current_day:
self._last_trading_day = current_day
self.state.daily_pnl = 0.0
self.state.consecutive_losses = 0
self.state.consecutive_wins = 0
self.daily_start_balance = 0.0
def decide(self, market_data: MarketDataResponse, params: TradeStrategyParams) -> StrategyDecision:
"""Принять решение на основе рыночных данных"""
if not market_data.candle:
return StrategyDecision()
candle = market_data.candle
current_time = candle.time
price = self._quotation_to_float(candle.close)
high = self._quotation_to_float(candle.high)
low = self._quotation_to_float(candle.low)
# Сохраняем начальный баланс для дневного лимита
if self.daily_start_balance == 0 and params.currency_balance > 0:
self.daily_start_balance = params.currency_balance
# Обновляем историю цен
self.prices.append(price)
self.highs.append(high)
self.lows.append(low)
# Сохраняем только последние N значений для экономии памяти
max_history = max(self.slow_window * 3, 500)
if len(self.prices) > max_history:
self.prices = self.prices[-max_history:]
self.highs = self.highs[-max_history:]
self.lows = self.lows[-max_history:]
# Проверяем условия для торговли
can_trade, reasons = self._check_trading_conditions(current_time, params)
if not can_trade:
if reasons and len(reasons) > 0:
print(f"[{current_time.strftime('%H:%M')}] Пропуск сделки: {reasons[0]}")
return StrategyDecision()
# Рассчитываем индикаторы
if len(self.prices) < self.slow_window:
return StrategyDecision()
# Быстрая и медленная MA
fast_ma = np.mean(self.prices[-self.fast_window:])
slow_ma = np.mean(self.prices[-self.slow_window:])
# Определение тренда
trend_direction = 1 if fast_ma > slow_ma else -1
# Рассчитываем ATR для динамического конверта
atr = self._calculate_atr()
# Конверт для текущей MA (используем fast_ma для более быстрой реакции)
upper, lower, dynamic_percent = self._calculate_dynamic_envelope(fast_ma, atr)
# Визуализация
if self.visualizer:
self.visualizer.add_price(current_time, price)
decision = StrategyDecision()
# Проверка наличия текущей позиции
if self.state.current_position:
# Управление существующей позицией
decision = self._manage_current_position(
current_time, price, params,
self.state.current_position, upper, lower
)
else:
# Поиск новой позиции
decision = self._find_new_position(
current_time, price, params, trend_direction,
upper, lower, fast_ma, atr
)
# Обновляем график
if self.visualizer:
self.visualizer.update_plot()
return decision
def _manage_current_position(self, current_time: datetime, price: float,
params: TradeStrategyParams, position: PositionInfo,
upper: float, lower: float) -> StrategyDecision:
"""Управление существующей позицией (стоп-лосс, тейк-профит, трейлинг)"""
# Расчет PnL
if position.direction == OrderDirection.ORDER_DIRECTION_BUY:
pnl = (price - position.entry_price) * position.quantity
else:
pnl = (position.entry_price - price) * position.quantity
# Проверка стоп-лосса
if position.direction == OrderDirection.ORDER_DIRECTION_BUY:
if price <= position.stop_loss:
# Сработал стоп-лосс
print(f"[{current_time.strftime('%H:%M')}] СТОП-ЛОСС! Позиция закрыта с убытком: {pnl:.2f}")
self._update_trading_state(pnl)
self.state.signals_count['sell'] += 1
self.state.current_position = None
if self.visualizer:
self.visualizer.add_sell(current_time)
return StrategyDecision(
robot_trade_order=RobotTradeOrder(
quantity=position.quantity,
direction=OrderDirection.ORDER_DIRECTION_SELL,
order_type=OrderType.ORDER_TYPE_MARKET
)
)
else:
if price >= position.stop_loss:
# Сработал стоп-лосс для шорта
print(f"[{current_time.strftime('%H:%M')}] СТОП-ЛОСС! Позиция закрыта с убытком: {pnl:.2f}")
self._update_trading_state(pnl)
self.state.signals_count['buy'] += 1
self.state.current_position = None
if self.visualizer:
self.visualizer.add_buy(current_time)
return StrategyDecision(
robot_trade_order=RobotTradeOrder(
quantity=position.quantity,
direction=OrderDirection.ORDER_DIRECTION_BUY,
order_type=OrderType.ORDER_TYPE_MARKET
)
)
# Проверка тейк-профита
if position.direction == OrderDirection.ORDER_DIRECTION_BUY:
if price >= position.take_profit:
# Сработал тейк-профит
print(f"[{current_time.strftime('%H:%M')}] ТЕЙК-ПРОФИТ! Позиция закрыта с прибылью: {pnl:.2f}")
self._update_trading_state(pnl)
self.state.signals_count['sell'] += 1
self.state.current_position = None
if self.visualizer:
self.visualizer.add_sell(current_time)
return StrategyDecision(
robot_trade_order=RobotTradeOrder(
quantity=position.quantity,
direction=OrderDirection.ORDER_DIRECTION_SELL,
order_type=OrderType.ORDER_TYPE_MARKET
)
)
else:
if price <= position.take_profit:
# Сработал тейк-профит для шорта
print(f"[{current_time.strftime('%H:%M')}] ТЕЙК-ПРОФИТ! Позиция закрыта с прибылью: {pnl:.2f}")
self._update_trading_state(pnl)
self.state.signals_count['buy'] += 1
self.state.current_position = None
if self.visualizer:
self.visualizer.add_buy(current_time)
return StrategyDecision(
robot_trade_order=RobotTradeOrder(
quantity=position.quantity,
direction=OrderDirection.ORDER_DIRECTION_BUY,
order_type=OrderType.ORDER_TYPE_MARKET
)
)
# Трейлинг-стоп для длинных позиций
if position.direction == OrderDirection.ORDER_DIRECTION_BUY and price > position.entry_price:
# Поднимаем стоп-лосс при движении в нашу пользу
new_stop = max(position.stop_loss, price * 0.98) # 2% от текущей цены
if new_stop > position.stop_loss:
position.stop_loss = new_stop
print(f"[{current_time.strftime('%H:%M')}] Трейлинг-стоп обновлен: {position.stop_loss:.2f}")
# Трейлинг-стоп для коротких позиций
if position.direction == OrderDirection.ORDER_DIRECTION_SELL and price < position.entry_price:
# Опускаем стоп-лосс при движении в нашу пользу
new_stop = min(position.stop_loss, price * 1.02) # 2% от текущей цены
if new_stop < position.stop_loss:
position.stop_loss = new_stop
print(f"[{current_time.strftime('%H:%M')}] Трейлинг-стоп обновлен: {position.stop_loss:.2f}")
return StrategyDecision()
def _find_new_position(self, current_time: datetime, price: float,
params: TradeStrategyParams, trend_direction: int,
upper: float, lower: float, ma: float, atr: float) -> StrategyDecision:
"""Поиск новой позиции для входа"""
# Проверка минимального интервала между сигналами
if self.state.last_signal_time:
time_diff = (current_time - self.state.last_signal_time).total_seconds() / 60
if time_diff < self.min_trade_interval:
self.state.signals_count['skip'] += 1
return StrategyDecision()
# Определяем волатильность
is_volatile = atr > (ma * 0.02) # ATR > 2% от цены
# Логика входа
entry_signal = None
stop_loss = 0.0
take_profit = 0.0
signal_reason = ""
if price <= lower and trend_direction == 1: # Цена у нижней границы + восходящий тренд
# Сигнал на покупку
entry_signal = OrderDirection.ORDER_DIRECTION_BUY
stop_loss = price * 0.98 # Стоп-лосс 2% ниже цены входа
take_profit = price * (1 + self.reward_ratio * 0.02) # Тейк-профит с соотношением 2:1
signal_reason = f"BUY: цена {price:.2f} ≤ ниж.конверт {lower:.2f}, тренд вверх"
# В условиях высокой волатильности увеличиваем стоп
if is_volatile:
stop_loss = price * 0.97 # 3% стоп при высокой волатильности
signal_reason += f" (выс.волатильность, стоп {stop_loss:.2f})"
elif price >= upper and trend_direction == -1: # Цена у верхней границы + нисходящий тренд
# Сигнал на продажу
entry_signal = OrderDirection.ORDER_DIRECTION_SELL
stop_loss = price * 1.02 # Стоп-лосс 2% выше цены входа
take_profit = price * (1 - self.reward_ratio * 0.02) # Тейк-профит с соотношением 2:1
signal_reason = f"SELL: цена {price:.2f} ≥ верх.конверт {upper:.2f}, тренд вниз"
# В условиях высокой волатильности увеличиваем стоп
if is_volatile:
stop_loss = price * 1.03 # 3% стоп при высокой волатильности
signal_reason += f" (выс.волатильность, стоп {stop_loss:.2f})"
if entry_signal:
# Расчет размера позиции
position_size = self._calculate_position_size(price, stop_loss, params.currency_balance)
if position_size > 0:
# Проверяем возможность исполнения
if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
cost = position_size * price
if cost > params.currency_balance:
self.state.signals_count['skip'] += 1
print(f"[{current_time.strftime('%H:%M')}] Недостаточно средств для покупки")
return StrategyDecision()
else:
if position_size > params.instrument_balance:
self.state.signals_count['skip'] += 1
print(f"[{current_time.strftime('%H:%M')}] Недостаточно бумаг для продажи")
return StrategyDriver()
# Создаем информацию о позиции
self.state.current_position = PositionInfo(
entry_time=current_time,
entry_price=price,
quantity=position_size,
direction=entry_signal,
stop_loss=stop_loss,
take_profit=take_profit
)
self.state.last_signal_time = current_time
signal_type = 'buy' if entry_signal == OrderDirection.ORDER_DIRECTION_BUY else 'sell'
self.state.signals_count[signal_type] += 1
# Визуализация
if self.visualizer:
if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
self.visualizer.add_buy(current_time)
else:
self.visualizer.add_sell(current_time)
print(f"[{current_time.strftime('%H:%M')}] {signal_reason}")
print(f" Размер позиции: {position_size}, Стоп: {stop_loss:.2f}, Тейк: {take_profit:.2f}")
return StrategyDecision(
robot_trade_order=RobotTradeOrder(
quantity=position_size,
direction=entry_signal,
order_type=OrderType.ORDER_TYPE_MARKET
)
)
else:
print(f"[{current_time.strftime('%H:%M')}] Размер позиции 0 (риск {self.risk_per_trade*100}% от баланса)")
self.state.signals_count['skip'] += 1
return StrategyDecision()
def decide_by_candle(self, candle: Candle, params: TradeStrategyParams) -> StrategyDecision:
"""Упрощенная версия для бэктестирования"""
# Для бэктеста используем упрощенную логику
price = self._quotation_to_float(candle.close)
current_time = candle.time
self.prices.append(price)
# Сохраняем начальный баланс
if self.daily_start_balance == 0 and params.currency_balance > 0:
self.daily_start_balance = params.currency_balance
if len(self.prices) < self.slow_window:
return StrategyDecision()
# Быстрая и медленная MA
fast_ma = np.mean(self.prices[-self.fast_window:])
slow_ma = np.mean(self.prices[-self.slow_window:])
# Определение тренда
trend_direction = 1 if fast_ma > slow_ma else -1
# Конверты
upper = fast_ma * (1 + self.envelope_percent)
lower = fast_ma * (1 - self.envelope_percent)
# Управление существующей позицией
if self.state.current_position:
return self._manage_current_position(
current_time, price, params,
self.state.current_position, upper, lower
)
# Поиск новой позиции
entry_signal = None
if price <= lower and trend_direction == 1:
entry_signal = OrderDirection.ORDER_DIRECTION_BUY
elif price >= upper and trend_direction == -1:
entry_signal = OrderDirection.ORDER_DIRECTION_SELL
if entry_signal:
# Упрощенный расчет позиции для бэктеста
if self.instrument_info:
if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
max_quantity = int(params.currency_balance / (price * self.instrument_info.lot))
quantity = min(self.instrument_info.lot, max_quantity) if max_quantity > 0 else 0
else:
quantity = min(self.instrument_info.lot, params.instrument_balance)
else:
quantity = 1 if (entry_signal == OrderDirection.ORDER_DIRECTION_BUY and params.currency_balance > price) \
else (1 if params.instrument_balance > 0 else 0)
if quantity > 0:
# Расчет стоп-лосса и тейк-профита
if entry_signal == OrderDirection.ORDER_DIRECTION_BUY:
stop_loss = price * 0.98
take_profit = price * (1 + self.reward_ratio * 0.02)
else:
stop_loss = price * 1.02
take_profit = price * (1 - self.reward_ratio * 0.02)
self.state.current_position = PositionInfo(
entry_time=current_time,
entry_price=price,
quantity=quantity,
direction=entry_signal,
stop_loss=stop_loss,
take_profit=take_profit
)
return StrategyDecision(
robot_trade_order=RobotTradeOrder(
quantity=quantity,
direction=entry_signal,
order_type=OrderType.ORDER_TYPE_MARKET
)
)
return StrategyDecision()
def get_trading_stats(self) -> Dict[str, Any]:
"""Получение статистики торговли"""
position_info = None
if self.state.current_position:
position = self.state.current_position
position_info = {
'direction': 'BUY' if position.direction == OrderDirection.ORDER_DIRECTION_BUY else 'SELL',
'entry_price': position.entry_price,
'quantity': position.quantity,
'stop_loss': position.stop_loss,
'take_profit': position.take_profit,
'entry_time': position.entry_time.strftime('%H:%M')
}
return {
'signals': self.state.signals_count,
'consecutive_losses': self.state.consecutive_losses,
'consecutive_wins': self.state.consecutive_wins,
'daily_pnl': self.state.daily_pnl,
'has_position': self.state.current_position is not None,
'position_info': position_info,
'current_balance': self.daily_start_balance + self.state.daily_pnl if self.daily_start_balance > 0 else 0
}
class MAEStrategy(AdvancedMAEStrategy):
"""Совместимость с существующим кодом"""
def __init__(self, window: int = 20, envelope_percent: float = 0.02, visualizer: Optional[Visualizer] = None):
# Конвертируем старые параметры в новые
super().__init__(
fast_window=max(5, window // 2),
slow_window=window,
envelope_percent=envelope_percent,
visualizer=visualizer
)
def decide(self, market_data: MarketDataResponse, params: TradeStrategyParams) -> StrategyDecision:
"""Переопределяем для логирования"""
decision = super().decide(market_data, params)
# Логируем сигналы для совместимости
if decision.robot_trade_order:
direction = "ПОКУПКА" if decision.robot_trade_order.direction == OrderDirection.ORDER_DIRECTION_BUY else "ПРОДАЖА"
print(f"[MAE] Сигнал на {direction}: {decision.robot_trade_order.quantity} лотов")
return decision