# File statistics.py of package fint-bot
from __future__ import annotations

import datetime
import logging
import os
import pickle
import uuid
from abc import ABC, abstractmethod
from dataclasses import asdict
from typing import Any

import pandas as pd
from tinkoff.invest import (
    Instrument,
    MoneyValue,
    OrderDirection,
    OrderExecutionReportStatus,
    OrderState,
    OrderType,
    Quotation,
)

from robotlib.money import Money
class TradeStatisticsAnalyzer:
    """Accumulate order states and keep a running position / cash balance.

    Every order passed to :meth:`add_trade` is stored by ``order_id``; the
    ``positions`` and ``money`` counters are adjusted by the executed lots and
    the total order amount (buys add lots and subtract money, anything else
    does the opposite). :meth:`get_report` turns the stored orders into a
    ``pandas.DataFrame`` and runs optional processors and calculators over it.
    """

    # Execution statuses for which an order is reported by get_pending_orders().
    PENDING_ORDER_STATUSES = [
        OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_NEW,
        OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_PARTIALLYFILL,
    ]

    trades: dict[str, OrderState]
    positions: int
    money: float
    instrument_info: Instrument
    logger: logging.Logger

    def __init__(self, positions: int, money: float, instrument_info: Instrument, logger: logging.Logger):
        """Start with the given balances and an empty trade history.

        Args:
            positions: Initial instrument balance, in lots.
            money: Initial cash balance.
            instrument_info: Instrument the statistics are collected for.
            logger: Logger used for debug / warning / error messages.
        """
        self.trades = {}
        self.positions = positions
        self.money = money
        self.instrument_info = instrument_info
        self.logger = logger

    @staticmethod
    def _direction_missing(trade: OrderState) -> bool:
        """Return True when *trade* carries no usable direction."""
        direction = getattr(trade, 'direction', None)
        # NOTE(review): order-state updates built from API messages are
        # expected to carry ORDER_DIRECTION_UNSPECIFIED rather than None when
        # the field is absent, so both are treated as "missing".
        return direction is None or direction == OrderDirection.ORDER_DIRECTION_UNSPECIFIED

    @staticmethod
    def _direction_sign(direction) -> int:
        """Return +1 for a buy and -1 for any other direction."""
        return 1 if direction == OrderDirection.ORDER_DIRECTION_BUY else -1

    @staticmethod
    def _money_dict_to_float(value) -> float:
        """Convert a ``{'units': ..., 'nano': ...}`` dict to a float, else 0."""
        if isinstance(value, dict):
            return value['units'] + value['nano'] / (10 ** 9)
        return 0

    def add_trade(self, trade: OrderState) -> None:
        """Register a new order state or an update of a known order.

        For a known ``order_id`` only the difference to the previously stored
        state is applied to the balances. If the incoming state has no
        direction, the direction of the stored state is reused; a brand-new
        order without a direction is logged and ignored.

        Args:
            trade: Order state to add; its ``direction`` may be filled in
                from the previously stored state.
        """
        self.logger.debug(f'Updating balance. Current state: [positions={self.positions} money={self.money}]. '
                          f'trade: {trade}')

        previous = self.trades.get(trade.order_id)
        if self._direction_missing(trade):
            if previous is None:
                self.logger.warning(f'Trade {trade.order_id} has no direction')
                return
            # Keep the direction of the earlier state of the same order.
            trade.direction = previous.direction

        sign = self._direction_sign(trade.direction)

        # Compute both deltas before touching any state so that a bad input
        # cannot leave positions and money out of sync. A missing amount is
        # counted as zero.
        lots_delta = trade.lots_executed
        amount_delta = self.convert_from_quotation(trade.total_order_amount) or 0.0
        if previous is not None:
            lots_delta -= previous.lots_executed
            amount_delta -= self.convert_from_quotation(previous.total_order_amount) or 0.0

        self.positions += lots_delta * sign
        self.money -= amount_delta * sign
        self.trades[trade.order_id] = trade

        self.logger.debug(f'Updating balance. New state: [positions={self.positions} money={self.money}]')

    def cancel_order(self, order_id: str):
        """Forget the order with the given id, if it is known.

        Only the stored order state is removed; ``positions`` and ``money``
        are left as they are.
        """
        self.trades.pop(order_id, None)

    def get_positions(self) -> int:
        """Return the current instrument balance."""
        return self.positions

    def get_money(self) -> float:
        """Return the current cash balance."""
        return self.money

    def get_pending_orders(self) -> list[OrderState]:
        """Return stored orders whose status is in ``PENDING_ORDER_STATUSES``."""
        return [trade for trade in self.trades.values() if trade.execution_report_status in self.PENDING_ORDER_STATUSES]

    def save_to_file(self, filename: str) -> None:
        """Pickle this analyzer to *filename*, creating its directory if needed.

        Args:
            filename: Target path. May be a bare file name, in which case the
                file is written to the current working directory.
        """
        directory = os.path.dirname(filename)
        # os.makedirs('') raises FileNotFoundError, so skip it for bare names.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(filename, 'wb') as file:
            pickle.dump(obj=self, file=file, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def load_from_file(filename: str) -> TradeStatisticsAnalyzer:
        """Load an analyzer previously written by :meth:`save_to_file`.

        SECURITY: unpickling executes arbitrary code embedded in the file.
        Only load files that this application wrote itself.
        """
        with open(filename, 'rb') as file:
            return pickle.load(file)

    @staticmethod
    def convert_from_quotation(amount: Quotation | MoneyValue) -> float | None:
        """Convert an object with ``units`` / ``nano`` fields to a float.

        Returns:
            ``units + nano / 1e9``, or ``None`` when *amount* is ``None``.
        """
        if amount is None:
            return None
        return amount.units + amount.nano / (10 ** 9)

    def add_backtest_trade(self, quantity: int, price: Quotation, direction: OrderDirection):
        """Add a synthetic, fully executed market order (used by backtests).

        Args:
            quantity: Number of lots; a zero quantity is ignored.
            price: Price per lot.
            direction: Direction of the order.
        """
        if quantity == 0:
            return

        # Use the instrument's currency, falling back to RUB when it has none.
        currency = getattr(self.instrument_info, 'currency', 'RUB')
        price_money = MoneyValue(currency, price.units, price.nano)
        zero_money = MoneyValue(currency, 0, 0)

        self.add_trade(OrderState(
            order_id=str(uuid.uuid4()),
            execution_report_status=OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_FILL,
            lots_requested=quantity,
            lots_executed=quantity,
            initial_order_price=price_money,
            executed_order_price=price_money,
            total_order_amount=(Money(price) * quantity).to_money_value(currency),
            average_position_price=price_money,
            initial_commission=zero_money,
            executed_commission=zero_money,
            figi=self.instrument_info.figi,
            direction=direction,
            initial_security_price=price_money,
            stages=[],
            service_commission=zero_money,
            currency=currency,
            order_type=OrderType.ORDER_TYPE_MARKET,
            # NOTE(review): naive local timestamp; confirm whether callers
            # compare it with timezone-aware dates.
            order_date=datetime.datetime.now(),
        ))

    def get_report(self, processors: list[TradeStatisticsProcessorBase] | None = None,
                   calculators: list[TradeStatisticsCalculatorBase] | None = None) \
            -> tuple[dict[str, Any], pd.DataFrame]:
        """Build a trades DataFrame and compute statistics over it.

        Args:
            processors: Applied in order; each receives the DataFrame and
                returns the DataFrame handed to the next step.
            calculators: Each returns a dict that is merged into the stats.

        Returns:
            ``(stats, df)``. Both are empty when there are no trades or when
            any step raises (the error is logged, not propagated).
        """
        if not self.trades:
            return {}, pd.DataFrame()

        try:
            df = pd.DataFrame([asdict(trade) for trade in self.trades.values()])

            # Flatten nested money dicts into plain numbers.
            for column in ('average_position_price', 'total_order_amount'):
                if column in df.columns:
                    df[column] = df[column].apply(self._money_dict_to_float)

            if 'direction' in df.columns:
                # Same sign convention as add_trade(): +1 buy, -1 otherwise.
                df['sign'] = df['direction'].apply(self._direction_sign)

            for processor in processors or []:
                df = processor.process(df)

            stats: dict[str, Any] = {}
            for calculator in calculators or []:
                stats.update(calculator.calculate(df))

            return stats, df
        except Exception as e:
            # Deliberate best-effort: report generation must not crash the bot.
            self.logger.error(f"Error generating report: {e}", exc_info=True)
            return {}, pd.DataFrame()
class TradeStatisticsProcessorBase(ABC):
    """Base class for statistics processors."""

    @abstractmethod
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Take the trades DataFrame and return a DataFrame.

        Concrete subclasses must override this method.
        """
        raise NotImplementedError
class TradeStatisticsCalculatorBase(ABC):
    """Base class for statistics calculators."""

    @abstractmethod
    def calculate(self, df: pd.DataFrame) -> dict[str, Any]:
        """Compute statistics from the trades DataFrame.

        Returns:
            A mapping from statistic name to value.

        Concrete subclasses must override this method.
        """
        raise NotImplementedError()
class BalanceProcessor(TradeStatisticsProcessorBase):
    """Processor that adds running balance columns to the trades DataFrame."""

    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``balance`` and ``instrument_balance`` columns in place.

        ``balance`` is the negated cumulative sum of
        ``total_order_amount * sign``; ``instrument_balance`` is the
        cumulative sum of ``lots_executed * sign``. A column is only added
        when its inputs are present; the same DataFrame object is returned.
        """
        # Both derived columns need 'sign'; nothing to do without it.
        if df.empty or 'sign' not in df.columns:
            return df

        signs = df['sign']
        available = df.columns

        if 'total_order_amount' in available:
            signed_amounts = df['total_order_amount'] * signs
            df['balance'] = -signed_amounts.cumsum()

        if 'lots_executed' in available:
            signed_lots = df['lots_executed'] * signs
            df['instrument_balance'] = signed_lots.cumsum()

        return df
class BalanceCalculator(TradeStatisticsCalculatorBase):
    """Calculator of summary balance figures."""

    def calculate(self, df: pd.DataFrame) -> dict[str, Any]:
        """Summarise the ``balance`` / ``instrument_balance`` columns.

        Returns:
            A dict with the keys ``final_balance``, ``max_loss``,
            ``final_instrument_balance`` and ``income``. Every value is 0
            when *df* is empty or the column it is derived from is missing.
        """
        final_balance = 0
        max_loss = 0
        final_instrument_balance = 0
        income = 0

        if not df.empty:
            columns = df.columns

            if 'balance' in columns:
                balance = df['balance']
                final_balance = float(balance.iloc[-1])
                # NOTE(review): this is negative when the balance never drops
                # below zero; confirm whether it should be clamped at 0.
                max_loss = -float(balance.min())

            if 'instrument_balance' in columns:
                final_instrument_balance = float(df['instrument_balance'].iloc[-1])

                if 'average_position_price' in columns:
                    # Value the remaining position at the price of the last row.
                    final_price = float(df['average_position_price'].iloc[-1])
                    income = final_balance + final_instrument_balance * final_price

        return {
            'final_balance': final_balance,
            'max_loss': max_loss,
            'final_instrument_balance': final_instrument_balance,
            'income': income,
        }