'''
FILE : storage.py
PROJECT : PersistenceOS
COPYRIGHT : (c) 2024 PersistenceOS Team
AUTHOR : PersistenceOS Team
PACKAGE : PersistenceOS
LICENSE : MIT
PURPOSE : Storage API router for PersistenceOS
'''
import os
import sys
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
# Add the parent directory to Python path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
    from fastapi import APIRouter, HTTPException, Depends, status
    from fastapi.responses import JSONResponse
except ImportError as e:
    print(f"FastAPI not available: {e}")

    # Create a mock router for development
    class MockRouter:
        def __init__(self, *args, **kwargs):
            pass

        def get(self, *args, **kwargs):
            def decorator(func):
                return func
            return decorator

        def post(self, *args, **kwargs):
            def decorator(func):
                return func
            return decorator

    # The mock HTTPException must accept the keyword arguments used by the
    # handlers below; a bare Exception would raise TypeError on
    # status_code=/detail=.
    class HTTPException(Exception):
        def __init__(self, status_code=500, detail=""):
            super().__init__(detail)
            self.status_code = status_code
            self.detail = detail

    APIRouter = MockRouter
    Depends = lambda x: x
    status = type('Status', (), {'HTTP_500_INTERNAL_SERVER_ERROR': 500})()
    JSONResponse = dict
try:
    from utils.storage import storage_manager, get_storage_info, get_pools, get_usage
except ImportError:
    # Fallback for when utils are not available
    print("Storage utils not available, using mock functions")

    def get_storage_info():
        return {"error": "Storage utils not available"}

    def get_pools():
        return []

    def get_usage():
        return []

    class MockStorageManager:
        def get_storage_summary(self):
            return get_storage_info()

        def get_all_pools(self):
            return get_pools()

        def get_disk_usage(self):
            return get_usage()

    storage_manager = MockStorageManager()
logger = logging.getLogger("persistenceos.api.storage")
# Create storage router
storage_router = APIRouter(
    prefix="/api/storage",
    tags=["storage"],
    responses={404: {"description": "Not found"}},
)
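
# Routes exposed by this router (all under the /api/storage prefix):
#   GET /api/storage/         - combined summary with pools, usage, and totals
#   GET /api/storage/pools    - storage pools
#   GET /api/storage/usage    - per-device usage
#   GET /api/storage/devices  - block devices (if the manager supports it)
#   GET /api/storage/health   - subsystem health check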
def get_current_user_optional():
    """Mock authentication for development"""
    return {"username": "root", "roles": ["admin"]}
@storage_router.get("/")
async def list_storage():
"""
Get comprehensive storage information
Returns:
Dict with storage devices, pools, and usage information
"""
try:
logger.info("Getting comprehensive storage information")
# Get storage summary from storage manager
storage_info = storage_manager.get_storage_summary()
# Add some computed fields for the frontend
pools = storage_info.get("pools", [])
usage = storage_info.get("usage", [])
# Calculate total storage statistics
total_size = 0
total_used = 0
for usage_item in usage:
try:
# Parse size strings (e.g., "20G" -> 20)
size_str = usage_item.get("size", "0")
used_str = usage_item.get("used", "0")
# Simple parsing - could be enhanced
if size_str.endswith('G'):
total_size += float(size_str[:-1])
if used_str.endswith('G'):
total_used += float(used_str[:-1])
except (ValueError, AttributeError):
continue
# Add summary statistics
storage_info["summary"] = {
"total_pools": len(pools),
"total_size_gb": total_size,
"total_used_gb": total_used,
"usage_percent": round((total_used / total_size * 100) if total_size > 0 else 0, 1)
}
logger.info(f"Retrieved storage info: {len(pools)} pools, {len(usage)} usage entries")
return storage_info
except Exception as e:
logger.error(f"Error getting storage information: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to retrieve storage information: {str(e)}"
)
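
# The "G"-only parsing above mirrors the inline comment ("could be enhanced").
# A more general helper might look like this sketch; it is not wired into the
# endpoint, and the suffix set is an assumption about df/lsblk-style
# human-readable sizes produced by the underlying utilities.
def _parse_size_gb_sketch(size_str: str) -> float:
    """Best-effort conversion of a human-readable size string to gigabytes."""
    multipliers = {"K": 1.0 / 1024 ** 2, "M": 1.0 / 1024, "G": 1.0, "T": 1024.0}
    try:
        suffix = size_str[-1].upper()
        if suffix in multipliers:
            return float(size_str[:-1]) * multipliers[suffix]
        return float(size_str) / 1024 ** 3  # plain number: assume raw bytes
    except (ValueError, AttributeError, IndexError):
        return 0.0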
@storage_router.get("/pools")
async def get_storage_pools():
"""
Get storage pools information
Returns:
Dict with storage pools data
"""
try:
logger.info("Getting storage pools information")
pools = storage_manager.get_all_pools()
# Enhance pool data with additional information
enhanced_pools = []
for pool in pools:
enhanced_pool = pool.copy()
# Add computed fields
enhanced_pool["id"] = f"{pool.get('type', 'unknown')}-{pool.get('name', 'unnamed')}"
enhanced_pool["health_status"] = pool.get("status", "unknown")
enhanced_pool["created_at"] = datetime.now().isoformat() # Mock data
# Add size information if available
if pool.get("type") == "btrfs":
enhanced_pool["features"] = ["snapshots", "compression", "deduplication"]
elif pool.get("type") == "xfs":
enhanced_pool["features"] = ["high_performance", "large_files"]
enhanced_pools.append(enhanced_pool)
result = {
"pools": enhanced_pools,
"count": len(enhanced_pools),
"timestamp": datetime.now().isoformat()
}
logger.info(f"Retrieved {len(enhanced_pools)} storage pools")
return result
except Exception as e:
logger.error(f"Error getting storage pools: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to retrieve storage pools: {str(e)}"
)
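
# Example request against a running instance (host/port are deployment
# assumptions; only the path is defined by this router):
#   curl http://localhost:8000/api/storage/pools
# Response shape, per the handler above:
#   {"pools": [{"name": ..., "type": ..., "id": ..., "features": [...], ...}],
#    "count": <int>, "timestamp": "<ISO 8601>"}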
@storage_router.get("/usage")
async def get_storage_usage():
"""
Get storage usage information
Returns:
Dict with storage usage data
"""
try:
logger.info("Getting storage usage information")
usage_data = storage_manager.get_disk_usage()
# Enhance usage data
enhanced_usage = []
for usage in usage_data:
enhanced_item = usage.copy()
# Parse percentage
percent_str = usage.get("use_percent", "0%")
try:
enhanced_item["usage_percent_numeric"] = float(percent_str.rstrip('%'))
except (ValueError, AttributeError):
enhanced_item["usage_percent_numeric"] = 0
# Add status based on usage
usage_percent = enhanced_item["usage_percent_numeric"]
if usage_percent < 70:
enhanced_item["status"] = "healthy"
elif usage_percent < 90:
enhanced_item["status"] = "warning"
else:
enhanced_item["status"] = "critical"
enhanced_usage.append(enhanced_item)
result = {
"usage": enhanced_usage,
"count": len(enhanced_usage),
"timestamp": datetime.now().isoformat()
}
logger.info(f"Retrieved usage data for {len(enhanced_usage)} devices")
return result
except Exception as e:
logger.error(f"Error getting storage usage: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to retrieve storage usage: {str(e)}"
)
@storage_router.get("/devices")
async def get_storage_devices():
"""
Get storage devices information
Returns:
Dict with storage devices data
"""
try:
logger.info("Getting storage devices information")
devices = storage_manager.get_block_devices() if hasattr(storage_manager, 'get_block_devices') else []
result = {
"devices": devices,
"count": len(devices),
"timestamp": datetime.now().isoformat()
}
logger.info(f"Retrieved {len(devices)} storage devices")
return result
except Exception as e:
logger.error(f"Error getting storage devices: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to retrieve storage devices: {str(e)}"
)
@storage_router.get("/health")
async def storage_health_check():
"""
Storage subsystem health check
Returns:
Dict with storage health status
"""
try:
logger.info("Performing storage health check")
# Check if storage utilities are working
pools = storage_manager.get_all_pools()
usage = storage_manager.get_disk_usage()
# Basic health checks
health_status = "healthy"
issues = []
# Check for high disk usage
for usage_item in usage:
try:
percent_str = usage_item.get("use_percent", "0%")
usage_percent = float(percent_str.rstrip('%'))
if usage_percent > 90:
health_status = "critical"
issues.append(f"High disk usage on {usage_item.get('device', 'unknown')}: {percent_str}")
elif usage_percent > 80:
if health_status == "healthy":
health_status = "warning"
issues.append(f"Elevated disk usage on {usage_item.get('device', 'unknown')}: {percent_str}")
except (ValueError, AttributeError):
continue
result = {
"status": health_status,
"pools_count": len(pools),
"devices_count": len(usage),
"issues": issues,
"timestamp": datetime.now().isoformat()
}
logger.info(f"Storage health check completed: {health_status}")
return result
except Exception as e:
logger.error(f"Error during storage health check: {e}")
return {
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
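
# Note that, unlike the other endpoints, the health check reports failures
# in-band (status "error") instead of raising HTTPException, so monitoring
# probes always receive a parsable body. Illustrative shape (device name and
# percentage are made-up example values):
#   {"status": "warning", "pools_count": 2, "devices_count": 3,
#    "issues": ["Elevated disk usage on /dev/sda1: 85%"],
#    "timestamp": "<ISO 8601>"}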
# Export the router
__all__ = ["storage_router"]
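
# Minimal wiring sketch showing how this router could be mounted; the app
# module below is hypothetical and PersistenceOS's real entry point may differ.
#
#   from fastapi import FastAPI
#   from storage import storage_router
#
#   app = FastAPI()
#   app.include_router(storage_router)
#
# Then serve with, e.g.:  uvicorn main:app --host 0.0.0.0 --port 8000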