File storage.py of Package PersistenceOS

'''
FILE          : storage.py
PROJECT       : PersistenceOS
COPYRIGHT     : (c) 2024 PersistenceOS Team
AUTHOR        : PersistenceOS Team
PACKAGE       : PersistenceOS
LICENSE       : MIT
PURPOSE       : Storage API router for PersistenceOS
'''

import os
import sys
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime

# Add the parent directory to Python path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    from fastapi import APIRouter, HTTPException, Depends, status
    from fastapi.responses import JSONResponse
except ImportError as e:
    print(f"FastAPI not available: {e}")
    # Create a mock router for development
    class MockRouter:
        def __init__(self, *args, **kwargs):
            pass
        def get(self, *args, **kwargs):
            def decorator(func):
                return func
            return decorator
        def post(self, *args, **kwargs):
            def decorator(func):
                return func
            return decorator
    APIRouter = MockRouter
    HTTPException = Exception
    Depends = lambda x: x
    status = type('Status', (), {'HTTP_500_INTERNAL_SERVER_ERROR': 500})()
    JSONResponse = dict

try:
    from utils.storage import storage_manager, get_storage_info, get_pools, get_usage
except ImportError:
    # Fallback for when utils are not available
    print("Storage utils not available, using mock functions")
    
    def get_storage_info():
        return {"error": "Storage utils not available"}
    
    def get_pools():
        return []
    
    def get_usage():
        return []
    
    class MockStorageManager:
        def get_storage_summary(self):
            return get_storage_info()
        
        def get_all_pools(self):
            return get_pools()
        
        def get_disk_usage(self):
            return get_usage()
    
    storage_manager = MockStorageManager()

logger = logging.getLogger("persistenceos.api.storage")

# Create storage router
storage_router = APIRouter(
    prefix="/api/storage",
    tags=["storage"],
    responses={404: {"description": "Not found"}}
)


def get_current_user_optional():
    """Mock authentication for development"""
    return {"username": "root", "roles": ["admin"]}


@storage_router.get("/")
async def list_storage():
    """
    Get comprehensive storage information
    
    Returns:
        Dict with storage devices, pools, and usage information
    """
    try:
        logger.info("Getting comprehensive storage information")
        
        # Get storage summary from storage manager
        storage_info = storage_manager.get_storage_summary()
        
        # Add some computed fields for the frontend
        pools = storage_info.get("pools", [])
        usage = storage_info.get("usage", [])
        
        # Calculate total storage statistics
        total_size = 0
        total_used = 0
        
        for usage_item in usage:
            try:
                # Parse size strings (e.g., "20G" -> 20)
                size_str = usage_item.get("size", "0")
                used_str = usage_item.get("used", "0")
                
                # Simple parsing - could be enhanced
                if size_str.endswith('G'):
                    total_size += float(size_str[:-1])
                if used_str.endswith('G'):
                    total_used += float(used_str[:-1])
            except (ValueError, AttributeError):
                continue
        
        # Add summary statistics
        storage_info["summary"] = {
            "total_pools": len(pools),
            "total_size_gb": total_size,
            "total_used_gb": total_used,
            "usage_percent": round((total_used / total_size * 100) if total_size > 0 else 0, 1)
        }
        
        logger.info(f"Retrieved storage info: {len(pools)} pools, {len(usage)} usage entries")
        return storage_info
        
    except Exception as e:
        logger.error(f"Error getting storage information: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve storage information: {str(e)}"
        )


@storage_router.get("/pools")
async def get_storage_pools():
    """
    Get storage pools information
    
    Returns:
        Dict with storage pools data
    """
    try:
        logger.info("Getting storage pools information")
        
        pools = storage_manager.get_all_pools()
        
        # Enhance pool data with additional information
        enhanced_pools = []
        for pool in pools:
            enhanced_pool = pool.copy()
            
            # Add computed fields
            enhanced_pool["id"] = f"{pool.get('type', 'unknown')}-{pool.get('name', 'unnamed')}"
            enhanced_pool["health_status"] = pool.get("status", "unknown")
            enhanced_pool["created_at"] = datetime.now().isoformat()  # Mock data
            
            # Add size information if available
            if pool.get("type") == "btrfs":
                enhanced_pool["features"] = ["snapshots", "compression", "deduplication"]
            elif pool.get("type") == "xfs":
                enhanced_pool["features"] = ["high_performance", "large_files"]
            
            enhanced_pools.append(enhanced_pool)
        
        result = {
            "pools": enhanced_pools,
            "count": len(enhanced_pools),
            "timestamp": datetime.now().isoformat()
        }
        
        logger.info(f"Retrieved {len(enhanced_pools)} storage pools")
        return result
        
    except Exception as e:
        logger.error(f"Error getting storage pools: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve storage pools: {str(e)}"
        )


@storage_router.get("/usage")
async def get_storage_usage():
    """
    Get storage usage information
    
    Returns:
        Dict with storage usage data
    """
    try:
        logger.info("Getting storage usage information")
        
        usage_data = storage_manager.get_disk_usage()
        
        # Enhance usage data
        enhanced_usage = []
        for usage in usage_data:
            enhanced_item = usage.copy()
            
            # Parse percentage
            percent_str = usage.get("use_percent", "0%")
            try:
                enhanced_item["usage_percent_numeric"] = float(percent_str.rstrip('%'))
            except (ValueError, AttributeError):
                enhanced_item["usage_percent_numeric"] = 0
            
            # Add status based on usage
            usage_percent = enhanced_item["usage_percent_numeric"]
            if usage_percent < 70:
                enhanced_item["status"] = "healthy"
            elif usage_percent < 90:
                enhanced_item["status"] = "warning"
            else:
                enhanced_item["status"] = "critical"
            
            enhanced_usage.append(enhanced_item)
        
        result = {
            "usage": enhanced_usage,
            "count": len(enhanced_usage),
            "timestamp": datetime.now().isoformat()
        }
        
        logger.info(f"Retrieved usage data for {len(enhanced_usage)} devices")
        return result
        
    except Exception as e:
        logger.error(f"Error getting storage usage: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve storage usage: {str(e)}"
        )


@storage_router.get("/devices")
async def get_storage_devices():
    """
    Get storage devices information
    
    Returns:
        Dict with storage devices data
    """
    try:
        logger.info("Getting storage devices information")
        
        devices = storage_manager.get_block_devices() if hasattr(storage_manager, 'get_block_devices') else []
        
        result = {
            "devices": devices,
            "count": len(devices),
            "timestamp": datetime.now().isoformat()
        }
        
        logger.info(f"Retrieved {len(devices)} storage devices")
        return result
        
    except Exception as e:
        logger.error(f"Error getting storage devices: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve storage devices: {str(e)}"
        )


@storage_router.get("/health")
async def storage_health_check():
    """
    Storage subsystem health check
    
    Returns:
        Dict with storage health status
    """
    try:
        logger.info("Performing storage health check")
        
        # Check if storage utilities are working
        pools = storage_manager.get_all_pools()
        usage = storage_manager.get_disk_usage()
        
        # Basic health checks
        health_status = "healthy"
        issues = []
        
        # Check for high disk usage
        for usage_item in usage:
            try:
                percent_str = usage_item.get("use_percent", "0%")
                usage_percent = float(percent_str.rstrip('%'))
                
                if usage_percent > 90:
                    health_status = "critical"
                    issues.append(f"High disk usage on {usage_item.get('device', 'unknown')}: {percent_str}")
                elif usage_percent > 80:
                    if health_status == "healthy":
                        health_status = "warning"
                    issues.append(f"Elevated disk usage on {usage_item.get('device', 'unknown')}: {percent_str}")
            except (ValueError, AttributeError):
                continue
        
        result = {
            "status": health_status,
            "pools_count": len(pools),
            "devices_count": len(usage),
            "issues": issues,
            "timestamp": datetime.now().isoformat()
        }
        
        logger.info(f"Storage health check completed: {health_status}")
        return result
        
    except Exception as e:
        logger.error(f"Error during storage health check: {e}")
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }


# Export the router
__all__ = ["storage_router"]
openSUSE Build Service is sponsored by