# File system.py of Package PersistenceOS

'''
FILE          : system.py
PROJECT       : PersistenceOS
COPYRIGHT     : (c) 2024 PersistenceOS Team
AUTHOR        : PersistenceOS Team
PACKAGE       : PersistenceOS
LICENSE       : MIT
PURPOSE       : System utility functions for gathering system information
'''

import os
import platform
import socket
import logging
import subprocess
import time
from typing import Dict, List, Any, Tuple, Optional
import json

# Import third-party libraries if available.
# psutil is optional: PSUTIL_AVAILABLE records whether the import succeeded so
# the functions in this module can choose between psutil and their
# stdlib / command-line fallbacks.
try:
    import psutil
    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False
    
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Constants
# Divisor used to convert byte counts into the "*_gb" values (1024**3 bytes).
BYTES_IN_GB = 1024 * 1024 * 1024
# Number of decimal digits passed to round() for the derived values.
DEFAULT_PRECISION = 2


def get_hostname() -> str:
    """
    Get system hostname

    Returns:
        Hostname as reported by socket.gethostname()
    """
    hostname = socket.gethostname()
    return hostname


def get_ip_addresses() -> Dict[str, str]:
    """
    Collect the IPv4 address of each network interface

    psutil is consulted first when it is installed. If that produces no
    entries (or psutil is missing), the address the hostname resolves to is
    reported under the key "primary"; if that lookup fails as well, the
    result is {"localhost": "127.0.0.1"}.

    Returns:
        Dict of interface names and IP addresses
    """
    found = {}

    if PSUTIL_AVAILABLE:
        try:
            for nic, nic_addrs in psutil.net_if_addrs().items():
                for entry in nic_addrs:
                    if entry.family != socket.AF_INET:
                        continue
                    # A later IPv4 entry on the same interface overwrites
                    # an earlier one.
                    found[nic] = entry.address
        except Exception as e:
            logger.error(f"Error getting IP addresses with psutil: {e}")

    # psutil produced something usable: no fallback needed
    if found:
        return found

    # Resolve the hostname via the socket module instead
    try:
        found["primary"] = socket.gethostbyname(socket.gethostname())
    except Exception as e:
        logger.error(f"Error getting IP address with socket: {e}")
        found["localhost"] = "127.0.0.1"

    return found


def get_primary_ip() -> str:
    """
    Pick the address that best represents this machine

    The first address from get_ip_addresses() that is not in 127.0.0.0/8
    wins. If every address is a loopback one, the first address is returned;
    with no addresses at all the answer is "127.0.0.1".

    Returns:
        Primary IP address as string
    """
    candidates = list(get_ip_addresses().values())

    for candidate in candidates:
        if candidate.startswith("127."):
            continue
        return candidate

    # Only loopback addresses (or nothing) were found
    return candidates[0] if candidates else "127.0.0.1"


def get_system_info() -> Dict[str, Any]:
    """
    Assemble a summary of the host: identity, platform and uptime

    Returns:
        Dict with hostname, OS/kernel/platform strings, architecture,
        processor, Python version, IP addresses and the keys produced by
        get_uptime()
    """
    summary = {}
    summary["hostname"] = get_hostname()
    summary["os"] = platform.system()
    summary["kernel"] = platform.release()
    summary["platform"] = platform.platform()
    summary["architecture"] = platform.machine()
    # platform.processor() may return an empty string
    summary["processor"] = platform.processor() or "Unknown"
    summary["python_version"] = platform.python_version()
    summary["primary_ip"] = get_primary_ip()
    summary["ip_addresses"] = get_ip_addresses()

    # Merge the uptime keys in, if any were produced
    summary.update(get_uptime() or {})

    return summary


def get_uptime() -> Dict[str, Any]:
    """
    Report how long the system has been running

    With psutil the boot time is read and broken down into days, hours and
    minutes. Otherwise the raw output of the `uptime` command is returned
    unparsed. If neither works, only {"uptime": "Unknown"} is returned.

    Returns:
        Dict with uptime information
    """
    if PSUTIL_AVAILABLE:
        try:
            booted_at = psutil.boot_time()
            elapsed = int(time.time() - booted_at)

            # Split the elapsed seconds into days / hours / minutes
            days, within_day = divmod(elapsed, 86400)
            hours, within_hour = divmod(within_day, 3600)
            minutes = within_hour // 60

            booted_at_text = time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(booted_at)
            )
            return {
                "uptime_seconds": elapsed,
                "uptime": f"{days} days, {hours} hours, {minutes} minutes",
                "boot_time": booted_at,
                "boot_time_formatted": booted_at_text
            }
        except Exception as e:
            logger.error(f"Error getting uptime with psutil: {e}")

    # No psutil result: ask the `uptime` command instead
    try:
        completed = subprocess.run(["uptime"], capture_output=True, text=True, check=False)
        if completed.returncode == 0:
            raw_output = completed.stdout.strip()
            # The command output is passed through as-is, not parsed
            return {"uptime_raw": raw_output, "uptime": "Unknown format"}
    except Exception as e:
        logger.error(f"Error getting uptime with command: {e}")

    return {"uptime": "Unknown"}


def get_cpu_info() -> Dict[str, Any]:
    """
    Get CPU information

    Always reports "architecture" and "processor" from the platform module.
    When psutil is available, core counts, frequencies (None when psutil
    cannot determine them) and usage percentages are added. When
    /proc/cpuinfo exists, the first "model name" entry is added as "model".

    Note that the two cpu_percent() calls each block for 0.1 seconds.

    Returns:
        Dict with CPU information
    """
    cpu_info = {
        "architecture": platform.machine(),
        "processor": platform.processor() or "Unknown"
    }

    if PSUTIL_AVAILABLE:
        try:
            # Query the frequency once: repeated calls are wasteful and each
            # check/use pair could otherwise observe different results.
            freq = psutil.cpu_freq()
            cpu_info.update({
                "physical_cores": psutil.cpu_count(logical=False),
                "logical_cores": psutil.cpu_count(logical=True),
                "current_frequency": freq.current if freq else None,
                "min_frequency": freq.min if freq else None,
                "max_frequency": freq.max if freq else None,
                "usage_percent": psutil.cpu_percent(interval=0.1),
                "usage_per_core": psutil.cpu_percent(interval=0.1, percpu=True)
            })
        except Exception as e:
            logger.error(f"Error getting CPU info with psutil: {e}")

    # Try to get CPU info from /proc/cpuinfo on Linux
    try:
        if os.path.exists("/proc/cpuinfo"):
            with open("/proc/cpuinfo", "r") as f:
                for line in f:
                    if "model name" in line:
                        # Split on the first colon only, so a model string
                        # that itself contains ":" is kept whole.
                        cpu_info["model"] = line.split(":", 1)[1].strip()
                        break
    except Exception as e:
        logger.error(f"Error getting CPU model from /proc/cpuinfo: {e}")

    return cpu_info


def get_memory_info() -> Dict[str, Any]:
    """
    Get memory information

    With psutil, virtual memory figures (total, available, used, percent and
    their *_gb variants) plus a nested "swap" dict are reported. If psutil
    yields nothing and /proc/meminfo exists, total/available/free/used/percent
    are derived from that file instead, so callers can rely on "available"
    being present on either path whenever the kernel exposes MemAvailable.

    Returns:
        Dict with memory information (empty if no source could be read)
    """
    memory_info = {}

    if PSUTIL_AVAILABLE:
        try:
            # Get virtual memory info
            virtual_memory = psutil.virtual_memory()
            memory_info.update({
                "total": virtual_memory.total,
                "available": virtual_memory.available,
                "used": virtual_memory.used,
                "percent": virtual_memory.percent,
                "total_gb": round(virtual_memory.total / BYTES_IN_GB, DEFAULT_PRECISION),
                "available_gb": round(virtual_memory.available / BYTES_IN_GB, DEFAULT_PRECISION),
                "used_gb": round(virtual_memory.used / BYTES_IN_GB, DEFAULT_PRECISION)
            })

            # Get swap memory info
            swap_memory = psutil.swap_memory()
            memory_info["swap"] = {
                "total": swap_memory.total,
                "used": swap_memory.used,
                "free": swap_memory.free,
                "percent": swap_memory.percent,
                "total_gb": round(swap_memory.total / BYTES_IN_GB, DEFAULT_PRECISION),
                "used_gb": round(swap_memory.used / BYTES_IN_GB, DEFAULT_PRECISION),
                "free_gb": round(swap_memory.free / BYTES_IN_GB, DEFAULT_PRECISION)
            }
        except Exception as e:
            logger.error(f"Error getting memory info with psutil: {e}")

    # Try to get memory info from /proc/meminfo on Linux
    try:
        if os.path.exists("/proc/meminfo") and not memory_info:
            mem_info = {}
            with open("/proc/meminfo", "r") as f:
                for line in f:
                    if ":" in line:
                        key, value = line.split(":", 1)
                        mem_info[key.strip()] = value.strip()

            # Values look like "16318480 kB": take the number and convert
            # it to bytes.
            if "MemTotal" in mem_info:
                memory_info["total"] = int(mem_info["MemTotal"].split()[0]) * 1024
                memory_info["total_gb"] = round(memory_info["total"] / BYTES_IN_GB, DEFAULT_PRECISION)

            # Mirror the "available" keys of the psutil path; the field is
            # absent on older kernels, hence the membership check.
            if "MemAvailable" in mem_info:
                memory_info["available"] = int(mem_info["MemAvailable"].split()[0]) * 1024
                memory_info["available_gb"] = round(memory_info["available"] / BYTES_IN_GB, DEFAULT_PRECISION)

            if "MemFree" in mem_info:
                memory_info["free"] = int(mem_info["MemFree"].split()[0]) * 1024
                memory_info["free_gb"] = round(memory_info["free"] / BYTES_IN_GB, DEFAULT_PRECISION)

            # "used" here is simply total minus free, so it includes
            # buffers and page cache.
            if "MemTotal" in mem_info and "MemFree" in mem_info:
                memory_info["used"] = memory_info["total"] - memory_info["free"]
                memory_info["used_gb"] = round(memory_info["used"] / BYTES_IN_GB, DEFAULT_PRECISION)
                memory_info["percent"] = round(memory_info["used"] / memory_info["total"] * 100, DEFAULT_PRECISION)
    except Exception as e:
        logger.error(f"Error getting memory info from /proc/meminfo: {e}")

    return memory_info


def get_disk_info() -> Dict[str, Any]:
    """
    Describe mounted partitions and the root filesystem

    With psutil, "partitions" holds one entry per mounted partition
    (squashfs, tmpfs and devtmpfs are left out) and "root" holds the usage
    of "/". If psutil contributes nothing, the text output of `df -h` is
    returned under "df_output".

    Returns:
        Dict with disk information
    """
    skipped_fstypes = ("squashfs", "tmpfs", "devtmpfs")

    def summarize(usage) -> Dict[str, Any]:
        # Raw byte counts plus their rounded gigabyte equivalents
        return {
            "total": usage.total,
            "used": usage.used,
            "free": usage.free,
            "percent": usage.percent,
            "total_gb": round(usage.total / BYTES_IN_GB, DEFAULT_PRECISION),
            "used_gb": round(usage.used / BYTES_IN_GB, DEFAULT_PRECISION),
            "free_gb": round(usage.free / BYTES_IN_GB, DEFAULT_PRECISION)
        }

    disk_info = {}

    if PSUTIL_AVAILABLE:
        try:
            mounted = psutil.disk_partitions()
            entries = []
            disk_info["partitions"] = entries

            for part in mounted:
                # Special filesystems are not reported
                if part.fstype in skipped_fstypes:
                    continue

                try:
                    entry = {
                        "device": part.device,
                        "mountpoint": part.mountpoint,
                        "fstype": part.fstype
                    }
                    entry.update(summarize(psutil.disk_usage(part.mountpoint)))
                    entries.append(entry)
                except PermissionError:
                    # Mount points we may not read are skipped
                    logger.warning(f"Permission denied for disk {part.mountpoint}")
                except Exception as e:
                    logger.error(f"Error getting disk usage for {part.mountpoint}: {e}")

            # Usage of the root filesystem
            try:
                disk_info["root"] = summarize(psutil.disk_usage("/"))
            except Exception as e:
                logger.error(f"Error getting root disk usage: {e}")
        except Exception as e:
            logger.error(f"Error getting disk info with psutil: {e}")

    if disk_info:
        return disk_info

    # Nothing from psutil: capture `df -h` output instead
    try:
        completed = subprocess.run(["df", "-h"], capture_output=True, text=True, check=False)
        if completed.returncode == 0:
            disk_info["df_output"] = completed.stdout.strip()
    except Exception as e:
        logger.error(f"Error running df command: {e}")

    return disk_info


def get_network_info() -> Dict[str, Any]:
    """
    Describe the network configuration of the host

    The result always carries "hostname", "primary_ip" and "interfaces".
    With psutil, every interface whose name does not start with "lo" gets an
    entry with its status, optional speed/duplex, its IPv4/IPv6 addresses,
    its MAC address and its IO counters. If no interface was recorded, the
    text output of `ip addr` is stored under "ip_addr_output".

    Returns:
        Dict with network information
    """
    interfaces = {}
    network_info = {
        "hostname": get_hostname(),
        "primary_ip": get_primary_ip(),
        "interfaces": interfaces
    }

    if PSUTIL_AVAILABLE:
        try:
            addrs_by_nic = psutil.net_if_addrs()
            stats_by_nic = psutil.net_if_stats()

            for nic, nic_addrs in addrs_by_nic.items():
                # Interfaces named "lo..." are treated as loopback and skipped
                if nic.startswith("lo"):
                    continue

                nic_stats = stats_by_nic.get(nic)
                entry = {
                    "status": "up" if (nic_stats and nic_stats.isup) else "down",
                    "addresses": []
                }
                interfaces[nic] = entry

                # Speed is only recorded when non-zero
                if nic_stats:
                    if nic_stats.speed:
                        entry["speed"] = nic_stats.speed
                    if hasattr(nic_stats, "duplex"):
                        entry["duplex"] = nic_stats.duplex

                for addr in nic_addrs:
                    if addr.family == socket.AF_INET:
                        family_label = "IPv4"
                    elif addr.family == socket.AF_INET6:
                        family_label = "IPv6"
                    else:
                        # Link-layer entries carry the MAC address
                        if hasattr(psutil, "AF_LINK") and addr.family == psutil.AF_LINK:
                            entry["mac"] = addr.address
                        continue

                    entry["addresses"].append({
                        "family": family_label,
                        "address": addr.address,
                        "netmask": addr.netmask,
                        "broadcast": addr.broadcast
                    })

            # Attach per-interface IO counters to the recorded interfaces
            try:
                for nic, counters in psutil.net_io_counters(pernic=True).items():
                    if nic not in interfaces:
                        continue
                    interfaces[nic]["io"] = {
                        "bytes_sent": counters.bytes_sent,
                        "bytes_recv": counters.bytes_recv,
                        "packets_sent": counters.packets_sent,
                        "packets_recv": counters.packets_recv,
                        "errin": counters.errin,
                        "errout": counters.errout,
                        "dropin": counters.dropin,
                        "dropout": counters.dropout
                    }
            except Exception as e:
                logger.error(f"Error getting network IO counters: {e}")
        except Exception as e:
            logger.error(f"Error getting network info with psutil: {e}")

    # No interface recorded: capture `ip addr` output instead
    if not interfaces:
        try:
            completed = subprocess.run(["ip", "addr"], capture_output=True, text=True, check=False)
            if completed.returncode == 0:
                network_info["ip_addr_output"] = completed.stdout.strip()
        except Exception as e:
            logger.error(f"Error running ip addr command: {e}")

    return network_info


def get_full_system_info() -> Dict[str, Any]:
    """
    Gather every section of system information into one report

    Returns:
        Dict with the keys "system", "cpu", "memory", "disk" and "network"
        (each the result of the matching get_*_info function) plus
        "timestamp", the current Unix time in whole seconds
    """
    # Sections are collected in this order; the timestamp is taken last
    collectors = (
        ("system", get_system_info),
        ("cpu", get_cpu_info),
        ("memory", get_memory_info),
        ("disk", get_disk_info),
        ("network", get_network_info),
    )

    report = {section: collect() for section, collect in collectors}
    report["timestamp"] = int(time.time())
    return report


def to_human_readable_size(size_bytes: int, decimal_places: int = 2) -> str:
    """
    Convert bytes to human readable format

    The value is divided by 1024 until its magnitude drops below 1024 or the
    largest unit (PB) is reached. Negative values are scaled by magnitude and
    keep their sign, so -2048 becomes "-2.00 KB".

    Args:
        size_bytes: Size in bytes
        decimal_places: Number of decimal places

    Returns:
        Human readable size string
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')

    # Work on a copy so the parameter keeps its original meaning
    value = size_bytes
    unit = units[0]
    for unit in units:
        # Compare the magnitude so negative sizes are scaled as well;
        # PB is the last unit, so anything larger stays expressed in PB.
        if abs(value) < 1024.0 or unit == units[-1]:
            break
        value /= 1024.0

    return f"{value:.{decimal_places}f} {unit}"
# openSUSE Build Service is sponsored by