File statistics.py of Package fint-bot

from __future__ import annotations

import datetime
import logging
import os
import pickle
import uuid

from abc import ABC, abstractmethod
from dataclasses import asdict
from typing import Any

import pandas as pd

from tinkoff.invest import OrderState, Instrument, OrderDirection, Quotation, MoneyValue, OrderExecutionReportStatus, \
    OrderType

from robotlib.money import Money


class TradeStatisticsAnalyzer:
    """Track order states and the position / money balance they produce.

    Every order state passed to ``add_trade`` is stored under its
    ``order_id``. ``positions`` is adjusted by the executed lots and
    ``money`` by the total order amount: a buy adds lots and subtracts
    money, any other direction does the opposite.
    """

    # Execution statuses that get_pending_orders() treats as still pending.
    PENDING_ORDER_STATUSES = [
        OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_NEW,
        OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_PARTIALLYFILL
    ]

    trades: dict[str, OrderState]  # latest known state of each order, keyed by order_id
    positions: int
    money: float
    instrument_info: Instrument
    logger: logging.Logger

    def __init__(self, positions: int, money: float, instrument_info: Instrument, logger: logging.Logger):
        """Start with the given balances and an empty trade history."""
        self.trades = {}
        self.positions = positions
        self.money = money
        self.instrument_info = instrument_info
        self.logger = logger

    def add_trade(self, trade: OrderState) -> None:
        """Register a new order state or an update of a known order.

        For an ``order_id`` that is already stored, only the difference to
        the stored state (executed lots and total amount) is applied, and a
        missing direction is copied from the stored state onto ``trade``.
        A new order without a direction is logged and ignored.
        """
        self.logger.debug(f'Updating balance. Current state: [positions={self.positions} money={self.money}]. '
                          f'trade: {trade}')

        previous = self.trades.get(trade.order_id)
        if previous is not None:
            # Keep the direction of the stored order if the update lacks one.
            if getattr(trade, 'direction', None) is None:
                trade.direction = previous.direction

            lots_delta = trade.lots_executed - previous.lots_executed
            amount_delta = (self.convert_from_quotation(trade.total_order_amount)
                            - self.convert_from_quotation(previous.total_order_amount))
        else:
            # Without a direction the sign of the balance change is unknown.
            if getattr(trade, 'direction', None) is None:
                self.logger.warning(f'Trade {trade.order_id} has no direction')
                return

            lots_delta = trade.lots_executed
            amount_delta = self.convert_from_quotation(trade.total_order_amount)

        # NOTE(review): every direction other than ORDER_DIRECTION_BUY is
        # treated as a sell, including an "unspecified" value if the schema
        # uses one instead of None - confirm this is intended.
        sign = 1 if trade.direction == OrderDirection.ORDER_DIRECTION_BUY else -1
        self.positions += lots_delta * sign
        self.money -= amount_delta * sign

        self.trades[trade.order_id] = trade
        self.logger.debug(f'Updating balance. New state: [positions={self.positions} money={self.money}]')

    def cancel_order(self, order_id: str):
        """Forget the stored order; unknown ids are ignored.

        ``positions`` and ``money`` are left untouched.
        """
        self.trades.pop(order_id, None)

    def get_positions(self) -> int:
        """Return the current position counter."""
        return self.positions

    def get_money(self) -> float:
        """Return the current money balance."""
        return self.money

    def get_pending_orders(self) -> list[OrderState]:
        """Return stored orders whose status is in ``PENDING_ORDER_STATUSES``."""
        return [trade for trade in self.trades.values() if trade.execution_report_status in self.PENDING_ORDER_STATUSES]

    def save_to_file(self, filename: str) -> None:
        """Pickle this analyzer to ``filename``, creating parent directories.

        A bare file name (no directory component) is written to the current
        working directory.
        """
        # os.path.dirname() is '' for a bare file name and os.makedirs('')
        # raises FileNotFoundError, so only create a directory when one is named.
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(filename, 'wb') as file:
            pickle.dump(obj=self, file=file, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def load_from_file(filename: str) -> TradeStatisticsAnalyzer:
        """Load an analyzer previously written by ``save_to_file``.

        SECURITY: unpickling can execute arbitrary code. Only load files
        that this application created itself; never load untrusted input.
        """
        with open(filename, 'rb') as file:
            return pickle.load(file)

    @staticmethod
    def convert_from_quotation(amount: Quotation | MoneyValue) -> float | None:
        """Convert a units/nano pair to float; ``None`` is passed through."""
        if amount is None:
            return None
        return amount.units + amount.nano / (10 ** 9)

    @staticmethod
    def _nested_money_to_float(value: Any) -> float:
        """Convert a ``{'units': ..., 'nano': ...}`` dict to a number.

        Anything that is not a dict yields 0.
        """
        return value['units'] + value['nano'] / (10 ** 9) if isinstance(value, dict) else 0

    def add_backtest_trade(self, quantity: int, price: Quotation, direction: OrderDirection):
        """Register a synthetic, fully filled market order from a backtest.

        Nothing is recorded for a zero ``quantity``. The order gets a random
        UUID as its id and zero commissions.
        """
        if quantity == 0:
            return

        # Use the instrument currency; fall back to RUB if the attribute is absent.
        currency = getattr(self.instrument_info, 'currency', 'RUB')

        price_money = MoneyValue(currency, price.units, price.nano)
        zero_money = MoneyValue(currency, 0, 0)

        self.add_trade(OrderState(
            order_id=str(uuid.uuid4()),
            execution_report_status=OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_FILL,
            lots_requested=quantity,
            lots_executed=quantity,
            initial_order_price=price_money,
            executed_order_price=price_money,
            total_order_amount=(Money(price) * quantity).to_money_value(currency),
            average_position_price=price_money,
            initial_commission=zero_money,
            executed_commission=zero_money,
            figi=self.instrument_info.figi,
            direction=direction,
            initial_security_price=price_money,
            stages=[],
            service_commission=zero_money,
            currency=currency,
            order_type=OrderType.ORDER_TYPE_MARKET,
            order_date=datetime.datetime.now()
        ))

    def get_report(self, processors: list[TradeStatisticsProcessorBase] | None = None,
                   calculators: list[TradeStatisticsCalculatorBase] | None = None)\
            -> tuple[dict[str, Any], pd.DataFrame]:
        """Build a statistics report from the stored trades.

        The trades are turned into a DataFrame (one row per order), the
        ``average_position_price`` and ``total_order_amount`` columns are
        flattened to numbers and a ``sign`` column is derived from
        ``direction``. Each processor then transforms the frame in order,
        and each calculator contributes entries to the statistics dict.

        Returns:
            ``(stats, df)``. Both are empty when there are no trades or when
            any step raises; in the latter case the error is logged.
        """
        if not self.trades:
            return {}, pd.DataFrame()

        try:
            df = pd.DataFrame([asdict(trade) for trade in self.trades.values()])

            # asdict() leaves money values as nested dicts; flatten them.
            for column in ('average_position_price', 'total_order_amount'):
                if column in df.columns:
                    df[column] = df[column].apply(self._nested_money_to_float)
            if 'direction' in df.columns:
                # Maps direction 1 -> +1 and 2 -> -1; assumes these are the
                # buy / sell enum values - TODO confirm.
                df['sign'] = 3 - df['direction'] * 2

            for processor in processors or []:
                df = processor.process(df)

            stats = {}
            for calculator in calculators or []:
                stats.update(calculator.calculate(df))

            return stats, df

        except Exception as e:
            # Best-effort reporting: keep the traceback in the log instead of
            # propagating the failure to the caller.
            self.logger.exception(f"Error generating report: {e}")
            return {}, pd.DataFrame()


class TradeStatisticsProcessorBase(ABC):
    """Abstract interface of a statistics processor.

    A processor takes the trades DataFrame and returns a DataFrame.
    """

    @abstractmethod
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform ``df`` and return the resulting frame."""
        raise NotImplementedError


class TradeStatisticsCalculatorBase(ABC):
    """Abstract interface of a statistics calculator.

    A calculator takes the trades DataFrame and returns a dict of named
    statistics.
    """

    @abstractmethod
    def calculate(self, df: pd.DataFrame) -> dict[str, Any]:
        """Compute statistics from ``df`` and return them keyed by name."""
        raise NotImplementedError


class BalanceProcessor(TradeStatisticsProcessorBase):
    """Processor that adds running balance columns to the trades frame."""

    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add cumulative ``balance`` and ``instrument_balance`` columns.

        ``balance`` is the negated running sum of
        ``total_order_amount * sign``; ``instrument_balance`` is the running
        sum of ``lots_executed * sign``. Each column is added only when its
        inputs are present. The frame is modified in place and returned.
        """
        # Both derived columns need 'sign'; without it there is nothing to add.
        if df.empty or 'sign' not in df.columns:
            return df

        direction_sign = df['sign']
        if 'total_order_amount' in df.columns:
            signed_amount = df['total_order_amount'] * direction_sign
            df['balance'] = -signed_amount.cumsum()
        if 'lots_executed' in df.columns:
            signed_lots = df['lots_executed'] * direction_sign
            df['instrument_balance'] = signed_lots.cumsum()
        return df


class BalanceCalculator(TradeStatisticsCalculatorBase):
    """Calculator of final balances, maximum loss and income."""

    def calculate(self, df: pd.DataFrame) -> dict[str, Any]:
        """Summarise the balance columns of ``df``.

        Returns a dict with the keys ``final_balance``, ``max_loss``,
        ``final_instrument_balance`` and ``income``. A value stays 0 when
        the frame is empty or the column it is derived from is missing:

        * ``final_balance`` - last value of ``balance``
        * ``max_loss`` - negated minimum of ``balance``
        * ``final_instrument_balance`` - last value of ``instrument_balance``
        * ``income`` - ``final_balance`` plus ``final_instrument_balance``
          times the last ``average_position_price``
        """
        summary = {
            'final_balance': 0,
            'max_loss': 0,
            'final_instrument_balance': 0,
            'income': 0
        }
        if df.empty:
            return summary

        available = df.columns

        if 'balance' in available:
            balance = df['balance']
            summary['final_balance'] = float(balance.iloc[-1])
            summary['max_loss'] = -float(balance.min())

        if 'instrument_balance' in available:
            summary['final_instrument_balance'] = float(df['instrument_balance'].iloc[-1])

        if 'average_position_price' in available:
            # Value the remaining instrument balance at the last row's price.
            last_price = float(df['average_position_price'].iloc[-1])
            summary['income'] = summary['final_balance'] + summary['final_instrument_balance'] * last_price

        return summary
openSUSE Build Service is sponsored by