File system.py of Package PersistenceOS
'''
FILE : system.py
PROJECT : PersistenceOS
COPYRIGHT : (c) 2024 PersistenceOS Team
AUTHOR : PersistenceOS Team
PACKAGE : PersistenceOS
LICENSE : MIT
PURPOSE : System utility functions for gathering system information
'''
import os
import platform
import socket
import logging
import subprocess
import time
from typing import Dict, List, Any, Tuple, Optional
import json
# Import third-party libraries if available
try:
import psutil
PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False
logger = logging.getLogger(__name__)
# Constants
BYTES_IN_GB = 1024 * 1024 * 1024
DEFAULT_PRECISION = 2
def get_hostname() -> str:
"""Get system hostname"""
return socket.gethostname()
def get_ip_addresses() -> Dict[str, str]:
"""
Get all IP addresses for the system
Returns:
Dict of interface names and IP addresses
"""
ip_addresses = {}
if PSUTIL_AVAILABLE:
# Use psutil if available
try:
for interface, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == socket.AF_INET:
ip_addresses[interface] = addr.address
except Exception as e:
logger.error(f"Error getting IP addresses with psutil: {e}")
# Fall back to socket if psutil failed or isn't available
if not ip_addresses:
try:
hostname = socket.gethostname()
ip_addresses["primary"] = socket.gethostbyname(hostname)
except Exception as e:
logger.error(f"Error getting IP address with socket: {e}")
ip_addresses["localhost"] = "127.0.0.1"
return ip_addresses
def get_primary_ip() -> str:
"""
Get primary IP address of the system
Returns:
Primary IP address as string
"""
addresses = get_ip_addresses()
# Try to get a non-loopback address first
for interface, ip in addresses.items():
if not ip.startswith("127."):
return ip
# Fall back to first available address or localhost
return next(iter(addresses.values()), "127.0.0.1")
def get_system_info() -> Dict[str, Any]:
"""
Get basic system information
Returns:
Dict with system information
"""
info = {
"hostname": get_hostname(),
"os": platform.system(),
"kernel": platform.release(),
"platform": platform.platform(),
"architecture": platform.machine(),
"processor": platform.processor() or "Unknown",
"python_version": platform.python_version(),
"primary_ip": get_primary_ip(),
"ip_addresses": get_ip_addresses()
}
# Add uptime if available
uptime_info = get_uptime()
if uptime_info:
info.update(uptime_info)
return info
def get_uptime() -> Dict[str, Any]:
"""
Get system uptime information
Returns:
Dict with uptime information
"""
if PSUTIL_AVAILABLE:
try:
# Get boot time from psutil
boot_time = psutil.boot_time()
uptime_seconds = int(time.time() - boot_time)
# Convert to days, hours, minutes
days, remainder = divmod(uptime_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
return {
"uptime_seconds": uptime_seconds,
"uptime": f"{days} days, {hours} hours, {minutes} minutes",
"boot_time": boot_time,
"boot_time_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(boot_time))
}
except Exception as e:
logger.error(f"Error getting uptime with psutil: {e}")
# Try to get uptime from `uptime` command
try:
result = subprocess.run(["uptime"], capture_output=True, text=True, check=False)
if result.returncode == 0:
return {
"uptime_raw": result.stdout.strip(),
"uptime": "Unknown format" # We don't parse the output here
}
except Exception as e:
logger.error(f"Error getting uptime with command: {e}")
return {"uptime": "Unknown"}
def get_cpu_info() -> Dict[str, Any]:
"""
Get CPU information
Returns:
Dict with CPU information
"""
cpu_info = {
"architecture": platform.machine(),
"processor": platform.processor() or "Unknown"
}
if PSUTIL_AVAILABLE:
try:
cpu_info.update({
"physical_cores": psutil.cpu_count(logical=False),
"logical_cores": psutil.cpu_count(logical=True),
"current_frequency": psutil.cpu_freq().current if psutil.cpu_freq() else None,
"min_frequency": psutil.cpu_freq().min if psutil.cpu_freq() else None,
"max_frequency": psutil.cpu_freq().max if psutil.cpu_freq() else None,
"usage_percent": psutil.cpu_percent(interval=0.1),
"usage_per_core": psutil.cpu_percent(interval=0.1, percpu=True)
})
except Exception as e:
logger.error(f"Error getting CPU info with psutil: {e}")
# Try to get CPU info from /proc/cpuinfo on Linux
try:
if os.path.exists("/proc/cpuinfo"):
with open("/proc/cpuinfo", "r") as f:
for line in f:
if "model name" in line:
cpu_info["model"] = line.split(":")[1].strip()
break
except Exception as e:
logger.error(f"Error getting CPU model from /proc/cpuinfo: {e}")
return cpu_info
def get_memory_info() -> Dict[str, Any]:
"""
Get memory information
Returns:
Dict with memory information
"""
memory_info = {}
if PSUTIL_AVAILABLE:
try:
# Get virtual memory info
virtual_memory = psutil.virtual_memory()
memory_info.update({
"total": virtual_memory.total,
"available": virtual_memory.available,
"used": virtual_memory.used,
"percent": virtual_memory.percent,
"total_gb": round(virtual_memory.total / BYTES_IN_GB, DEFAULT_PRECISION),
"available_gb": round(virtual_memory.available / BYTES_IN_GB, DEFAULT_PRECISION),
"used_gb": round(virtual_memory.used / BYTES_IN_GB, DEFAULT_PRECISION)
})
# Get swap memory info
swap_memory = psutil.swap_memory()
memory_info["swap"] = {
"total": swap_memory.total,
"used": swap_memory.used,
"free": swap_memory.free,
"percent": swap_memory.percent,
"total_gb": round(swap_memory.total / BYTES_IN_GB, DEFAULT_PRECISION),
"used_gb": round(swap_memory.used / BYTES_IN_GB, DEFAULT_PRECISION),
"free_gb": round(swap_memory.free / BYTES_IN_GB, DEFAULT_PRECISION)
}
except Exception as e:
logger.error(f"Error getting memory info with psutil: {e}")
# Try to get memory info from /proc/meminfo on Linux
try:
if os.path.exists("/proc/meminfo") and not memory_info:
mem_info = {}
with open("/proc/meminfo", "r") as f:
for line in f:
if ":" in line:
key, value = line.split(":", 1)
mem_info[key.strip()] = value.strip()
if "MemTotal" in mem_info:
memory_info["total"] = int(mem_info["MemTotal"].split()[0]) * 1024
memory_info["total_gb"] = round(memory_info["total"] / BYTES_IN_GB, DEFAULT_PRECISION)
if "MemFree" in mem_info:
memory_info["free"] = int(mem_info["MemFree"].split()[0]) * 1024
memory_info["free_gb"] = round(memory_info["free"] / BYTES_IN_GB, DEFAULT_PRECISION)
if "MemTotal" in mem_info and "MemFree" in mem_info:
memory_info["used"] = memory_info["total"] - memory_info["free"]
memory_info["used_gb"] = round(memory_info["used"] / BYTES_IN_GB, DEFAULT_PRECISION)
memory_info["percent"] = round(memory_info["used"] / memory_info["total"] * 100, DEFAULT_PRECISION)
except Exception as e:
logger.error(f"Error getting memory info from /proc/meminfo: {e}")
return memory_info
def get_disk_info() -> Dict[str, Any]:
"""
Get disk information
Returns:
Dict with disk information
"""
disk_info = {}
if PSUTIL_AVAILABLE:
try:
# Get disk partitions
partitions = psutil.disk_partitions()
disk_info["partitions"] = []
for partition in partitions:
# Skip special filesystems
if partition.fstype in ["squashfs", "tmpfs", "devtmpfs"]:
continue
try:
usage = psutil.disk_usage(partition.mountpoint)
disk_info["partitions"].append({
"device": partition.device,
"mountpoint": partition.mountpoint,
"fstype": partition.fstype,
"total": usage.total,
"used": usage.used,
"free": usage.free,
"percent": usage.percent,
"total_gb": round(usage.total / BYTES_IN_GB, DEFAULT_PRECISION),
"used_gb": round(usage.used / BYTES_IN_GB, DEFAULT_PRECISION),
"free_gb": round(usage.free / BYTES_IN_GB, DEFAULT_PRECISION)
})
except PermissionError:
# Skip partitions we don't have access to
logger.warning(f"Permission denied for disk {partition.mountpoint}")
except Exception as e:
logger.error(f"Error getting disk usage for {partition.mountpoint}: {e}")
# Get root partition usage
try:
root_usage = psutil.disk_usage("/")
disk_info["root"] = {
"total": root_usage.total,
"used": root_usage.used,
"free": root_usage.free,
"percent": root_usage.percent,
"total_gb": round(root_usage.total / BYTES_IN_GB, DEFAULT_PRECISION),
"used_gb": round(root_usage.used / BYTES_IN_GB, DEFAULT_PRECISION),
"free_gb": round(root_usage.free / BYTES_IN_GB, DEFAULT_PRECISION)
}
except Exception as e:
logger.error(f"Error getting root disk usage: {e}")
except Exception as e:
logger.error(f"Error getting disk info with psutil: {e}")
# Fall back to df command
if not disk_info:
try:
result = subprocess.run(["df", "-h"], capture_output=True, text=True, check=False)
if result.returncode == 0:
disk_info["df_output"] = result.stdout.strip()
except Exception as e:
logger.error(f"Error running df command: {e}")
return disk_info
def get_network_info() -> Dict[str, Any]:
"""
Get network information
Returns:
Dict with network information
"""
network_info = {
"hostname": get_hostname(),
"primary_ip": get_primary_ip(),
"interfaces": {}
}
if PSUTIL_AVAILABLE:
try:
# Get network interfaces
net_if_addrs = psutil.net_if_addrs()
net_if_stats = psutil.net_if_stats()
for interface, addrs in net_if_addrs.items():
# Skip loopback interfaces
if interface.startswith("lo"):
continue
# Get interface stats
stats = net_if_stats.get(interface, None)
is_up = stats.isup if stats else False
# Add interface to network info
network_info["interfaces"][interface] = {
"status": "up" if is_up else "down",
"addresses": []
}
# Add speed and duplex if available
if stats:
if stats.speed:
network_info["interfaces"][interface]["speed"] = stats.speed
if hasattr(stats, "duplex"):
network_info["interfaces"][interface]["duplex"] = stats.duplex
# Add addresses
for addr in addrs:
if addr.family == socket.AF_INET:
network_info["interfaces"][interface]["addresses"].append({
"family": "IPv4",
"address": addr.address,
"netmask": addr.netmask,
"broadcast": addr.broadcast
})
elif addr.family == socket.AF_INET6:
network_info["interfaces"][interface]["addresses"].append({
"family": "IPv6",
"address": addr.address,
"netmask": addr.netmask,
"broadcast": addr.broadcast
})
elif hasattr(psutil, "AF_LINK") and addr.family == psutil.AF_LINK:
network_info["interfaces"][interface]["mac"] = addr.address
# Get network IO counters
try:
net_io_counters = psutil.net_io_counters(pernic=True)
for interface, counters in net_io_counters.items():
if interface in network_info["interfaces"]:
network_info["interfaces"][interface]["io"] = {
"bytes_sent": counters.bytes_sent,
"bytes_recv": counters.bytes_recv,
"packets_sent": counters.packets_sent,
"packets_recv": counters.packets_recv,
"errin": counters.errin,
"errout": counters.errout,
"dropin": counters.dropin,
"dropout": counters.dropout
}
except Exception as e:
logger.error(f"Error getting network IO counters: {e}")
except Exception as e:
logger.error(f"Error getting network info with psutil: {e}")
# Fall back to ip command
if not network_info["interfaces"]:
try:
result = subprocess.run(["ip", "addr"], capture_output=True, text=True, check=False)
if result.returncode == 0:
network_info["ip_addr_output"] = result.stdout.strip()
except Exception as e:
logger.error(f"Error running ip addr command: {e}")
return network_info
def get_full_system_info() -> Dict[str, Any]:
"""
Get comprehensive system information
Returns:
Dict with complete system information
"""
return {
"system": get_system_info(),
"cpu": get_cpu_info(),
"memory": get_memory_info(),
"disk": get_disk_info(),
"network": get_network_info(),
"timestamp": int(time.time())
}
def to_human_readable_size(size_bytes: int, decimal_places: int = 2) -> str:
"""
Convert bytes to human readable format
Args:
size_bytes: Size in bytes
decimal_places: Number of decimal places
Returns:
Human readable size string
"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
if size_bytes < 1024.0 or unit == 'PB':
break
size_bytes /= 1024.0
return f"{size_bytes:.{decimal_places}f} {unit}"