File iptv.obscpio of Package NeoIPTV
07070100293DF1000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001100000000iptv/controllers07070100293E05000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000000C00000000iptv/models07070100293E13000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000000B00000000iptv/views07070100293E4A000081A4000003E8000003E80000000167703FED00000379000000000000003100000000000000000000001100000000iptv/__main__.pyimport locale
import os
import sys
from PyQt6.QtWidgets import QApplication
from iptv.models.channel_manager import ChannelManager
from iptv.models.database.channel import Channel
from iptv.views.main_window import MainWindow
os.environ["LC_NUMERIC"] = "C"
locale.setlocale(locale.LC_NUMERIC, 'C')
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
def initialize_channels():
channel_manager = ChannelManager.get_instance()
channels = channel_manager.get_channels()
print(f"Loaded {len(channels)} channels.")
return channels
def main():
# Create the database
Channel.create_table()
initialize_channels()
# Create the application
app = QApplication(sys.argv)
# Create the main window
window = MainWindow()
window.show()
# Run the application's event loop
sys.exit(app.exec())
if __name__ == "__main__":
main()
07070100293E4F000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000001100000000iptv/__init__.py07070100293E50000081A4000003E8000003E80000000167703FED000004D0000000000000003100000000000000000000001200000000iptv/event_bus.pyfrom PyQt6.QtCore import QObject, pyqtSignal
class EventBus(QObject):
"""EventBus to manage global signals."""
channel_url_changed = pyqtSignal(str)
volume_changed = pyqtSignal(int)
mute_toggled = pyqtSignal(bool)
playlist_toggle = pyqtSignal()
fullscreen_toggle = pyqtSignal()
channels_updated = pyqtSignal()
def __init__(self):
super().__init__()
def emit_channel_url(self, url):
"""Emit a signal when the channel URL changes."""
self.channel_url_changed.emit(url)
def emit_volume(self, volume):
"""Emit a signal when the volume changes."""
self.volume_changed.emit(volume)
def emit_mute(self, muted):
"""Emit a signal when mute state changes."""
self.mute_toggled.emit(muted)
def emit_playlist_toggle(self):
"""Emit a signal to toggle playlist visibility."""
self.playlist_toggle.emit()
def emit_fullscreen_toggle(self):
"""Emit a signal to toggle fullscreen mode."""
self.fullscreen_toggle.emit()
def emit_channels_updated(self):
"""Emit a signal when channels are updated."""
self.channels_updated.emit()
# Global EventBus instance
event_bus = EventBus()
07070100293DF9000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001800000000iptv/controllers/thread07070100293DF2000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000001D00000000iptv/controllers/__init__.py07070100293DF8000081A4000003E8000003E80000000167703FED00000DF5000000000000003100000000000000000000002100000000iptv/controllers/download_m3u.pyimport logging
import re
import requests
from PyQt6.QtCore import QThread, pyqtSignal
from ipytv import playlist
from iptv.models.database.channel import Channel
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class DownloadM3U(QThread):
finished = pyqtSignal(object)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
""" This function will be executed in a separate thread. """
try:
logger.debug(f"Starting download from URL: {self.url}")
# Download the content of the M3U URL
response = requests.get(self.url, timeout=30) # Added timeout to prevent hanging
response.raise_for_status() # This will raise an error if the download fails
logger.debug(f"Successfully downloaded M3U content from {self.url}")
# Load the M3U content from the downloaded URL
m3u_playlist = playlist.loadf(response.text) # Using loadf to load the M3U content
logger.debug(f"Loaded M3U playlist with {len(m3u_playlist)} entries")
valid_channels = []
invalid_channels = []
for entry in m3u_playlist:
# Access the attributes of the IPTVChannel object directly
channel_url = entry.url
if self.is_valid_iptv_stream(channel_url):
valid_channels.append(channel_url)
Channel.insert_channel(entry)
logger.debug(f"Valid channel found: {channel_url}")
else:
invalid_channels.append(channel_url)
logger.debug(f"Invalid channel found: {channel_url}")
# Emit the result to be processed in the main UI
self.finished.emit((valid_channels, invalid_channels))
logger.info(
f"Finished processing. Valid channels: {len(valid_channels)}, Invalid channels: {len(invalid_channels)}")
except requests.exceptions.RequestException as e:
logger.error(f"Request error while downloading M3U URL: {e}")
self.finished.emit(None) # Emit None if there was an error
except Exception as e:
logger.error(f"Unexpected error during M3U processing: {e}")
self.finished.emit(None)
def is_valid_iptv_stream(self, uri):
""" Verifies if the URI is a valid IPTV stream. """
# Basic validation to ensure the URI is an HTTP(s) stream
if re.match(r'^(http|https)://', uri):
if uri.endswith('.m3u8') or uri.endswith('.ts') or re.search(r'rtmp://', uri):
return True
return False
def add_channels_to_db(channels):
""" Add valid channels to the database. """
session = Session()
try:
for channel_url in channels:
existing_channel = session.query(Channel).filter_by(url=channel_url).first()
if existing_channel:
print(f"Channel {channel_url} already exists in the database.")
else:
channel = Channel(url=channel_url)
session.add(channel)
session.commit()
print(f"Successfully added {len(channels)} channels to the database.")
except Exception as e:
session.rollback()
print(f"Error adding channels to the database: {e}")
finally:
session.close()
07070100293E03000081A4000003E8000003E80000000167703FED00000EFF000000000000003100000000000000000000001C00000000iptv/controllers/helpers.pyimport asyncio
import random
import time
from concurrent.futures import ThreadPoolExecutor
import aiohttp
import requests
from iptv.models.database.channel import Channel
def process_channel_entry(entry_data):
"""
Processes a single channel's data from the playlist entry and inserts it into the database.
"""
# Ensure the required fields exist in the entry_data
required_fields = {"url"}
if not required_fields.issubset(entry_data.keys()):
return "Error: Missing required fields in entry data."
# Check for duplicate channels by URL
if Channel.get_channel_by_url(entry_data["url"]):
return False
Channel.insert_channel(entry_data)
return True
def is_url_responsive(channel, timeout=5):
"""
Checks if a channel's URL is responsive within the specified timeout period.
:param channel: The channel object that contains the URL to test.
:param timeout: Timeout in seconds for the HTTP request (default is 5 seconds).
:return: True if the URL is responsive (status code 200), False otherwise.
"""
try:
# Perform a HEAD request to check the URL without downloading the content
response = requests.head(channel.url, timeout=timeout)
# Check if the response code indicates success (200)
return response.status_code == 200
except requests.RequestException as e:
# If there's any exception (timeout, connection error, etc.), the channel is considered offline
print(f"Error checking channel '{getattr(channel, 'name', 'Unknown')}': {e}")
return False
def filter_responsive_channels(channels):
"""
Filters out non-responsive channels by checking their URLs in parallel,
and updates the 'tuned' status of non-responsive channels.
:param channels: List of channels.
:return: List of responsive channels.
"""
# Limiting the max number of threads to 100
batch_size = 100
max_workers = min(batch_size, len(channels))
# Process channels in batches of 100
for i in range(0, len(channels), batch_size):
batch = channels[i:i + batch_size]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(check_channel, batch))
return [channel for channel in results if channel is not None]
def check_channel(channel):
"""
Checks if a channel is responsive and updates its 'tuned' status.
:param channel: The channel to test.
:return: The channel if responsive, None if not.
"""
if not is_url_responsive(channel):
Channel.update_channel(channel.id, {"tuned": False})
return None
else:
Channel.update_channel(channel.id, {"tuned": True})
# Introduce a small delay between requests
time.sleep(random.uniform(0.1, 0.5))
return channel
async def check_channel_async(channel, session):
"""
Asynchronously checks if a channel is responsive and updates its 'tuned' status.
:param channel: The channel to test.
:param session: The aiohttp session to make the HTTP request.
:return: None
"""
async with session.get(channel.url) as response:
if response.status != 200:
Channel.update_channel(channel.id, {"tuned": False})
else:
Channel.update_channel(channel.id, {"tuned": True})
async def filter_responsive_channels_async(channels):
"""
Filters out non-responsive channels asynchronously by checking their URLs in parallel,
and updates the 'tuned' status of non-responsive channels.
:param channels: List of channels.
:return: None
"""
async with aiohttp.ClientSession() as session:
tasks = []
for channel in channels:
tasks.append(check_channel_async(channel, session))
await asyncio.gather(*tasks)
07070100293E04000081A4000003E8000003E80000000167703FED000009B1000000000000003100000000000000000000002600000000iptv/controllers/player_controller.pyimport requests
from PyQt6.QtCore import QThread, pyqtSignal
from iptv.event_bus import event_bus
def is_valid_url(url):
""" Validate if the URL is accessible. """
try:
response = requests.head(url, timeout=5)
return response.status_code == 200
except requests.exceptions.RequestException:
return False
class PlayerController(QThread):
playback_started = pyqtSignal()
playback_error = pyqtSignal(str)
playback_status_changed = pyqtSignal(str)
channel_url_changed = pyqtSignal(str)
def __init__(self, player, url):
super().__init__()
self.player = player
self.url = url
event_bus.channel_url_changed.connect(self.update_video_url)
event_bus.volume_changed.connect(self.update_volume)
event_bus.mute_toggled.connect(self.toggle_mute)
self.channel_url_changed.connect(self.update_video_url)
def run(self):
""" Play the video in the background. """
try:
if is_valid_url(self.url):
# Start playback in the background
self.player.play(self.url)
self.player.observe_property('playback-status', self.on_playback_status_change)
self.playback_started.emit()
else:
self.playback_error.emit("Invalid URL")
except Exception as e:
self.playback_error.emit(f"Error: {str(e)}")
def stop_video(self):
""" Stop the current video playback. """
try:
self.player.stop()
self.playback_status_changed.emit("stopped")
except Exception as e:
self.playback_error.emit(f"Error: {str(e)}")
def on_playback_status_change(self, name, value):
""" Called when playback status changes (playing, paused). """
self.playback_status_changed.emit(value)
def update_video_url(self, new_url: str):
""" Handle a change in video URL."""
if is_valid_url(new_url):
print(f"Channel valid: {new_url}")
self.url = new_url
self.player.play(new_url)
self.playback_status_changed.emit("playing")
else:
print(f"Channel URL is not valid or accessible: {new_url}")
def update_volume(self, volume: int):
""" Handle a change in volume."""
self.player.volume = volume
def toggle_mute(self, muted: bool):
""" Handle mute toggle."""
self.player.mute = muted
07070100293DFF000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000002400000000iptv/controllers/thread/__init__.py07070100293E00000081A4000003E8000003E80000000167703FED00000851000000000000003100000000000000000000002A00000000iptv/controllers/thread/channel_tuning.pyimport random
import time
from concurrent.futures import ThreadPoolExecutor
from PyQt6.QtCore import QThread, pyqtSignal
from iptv.controllers.helpers import is_url_responsive
from iptv.models.database.channel import Channel
def check_channel(channel):
"""
Checks if a channel is responsive and updates its status in the database.
"""
if not is_url_responsive(channel):
Channel.update_channel(channel.id, {"tuned": False})
else:
Channel.update_channel(channel.id, {"tuned": True})
return channel
class ChannelTuningThread(QThread):
"""
Thread class to handle the channel tuning process in background.
Emits progress updates during the process.
"""
progress_updated = pyqtSignal(int) # Signal to update progress
tuning_finished = pyqtSignal() # Signal when the tuning process is finished
def __init__(self, channels):
super().__init__()
self.channels = channels
def run(self):
"""
This method runs in a separate thread.
It processes the channels in batches and updates their 'tuned' status.
"""
total_channels = len(self.channels)
batch_size = 100 # Process channels in batches of 100
max_workers = min(batch_size, len(self.channels)) # Limiting the max workers
# Process channels in batches
for i in range(0, total_channels, batch_size):
batch = self.channels[i:i + batch_size]
# Using ThreadPoolExecutor to process the batch in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(check_channel, batch))
# Emit progress update after each batch
progress = int((i + batch_size) / total_channels * 100) # Calculate progress
self.progress_updated.emit(progress)
# Simulate some delay between batches (for demonstration)
time.sleep(random.uniform(0.1, 0.5)) # Random delay between 100ms and 500ms
# Emit finished signal after all batches are processed
self.tuning_finished.emit()
07070100293E01000081A4000003E8000003E80000000167703FED000007F5000000000000003100000000000000000000002700000000iptv/controllers/thread/file_loader.pyimport random
import time
from concurrent.futures import ThreadPoolExecutor
from PyQt6.QtCore import QThread, pyqtSignal
from ipytv import playlist
from iptv.controllers.helpers import process_channel_entry
class FileLoaderThread(QThread):
"""
Thread class to handle loading channels from a file in the background.
Emits progress updates during the process.
"""
progress_signal = pyqtSignal(int)
completed_signal = pyqtSignal()
error_signal = pyqtSignal(str)
def __init__(self, file_path):
super().__init__()
self.file_path = file_path
def run(self):
"""
Reads the file and processes the channels in batches.
"""
try:
# Load the playlist file using ipytv
pl = playlist.loadf(self.file_path)
if not pl.get_channels():
raise ValueError("The file is invalid or cannot be parsed.")
# Extract the channel data from the playlist
channels = [{"name": entry.name, "url": entry.url} for entry in pl.get_channels()]
total_channels = len(channels)
batch_size = 100
max_workers = min(batch_size, total_channels)
# Process channels in batches
for i in range(0, total_channels, batch_size):
batch = channels[i:i + batch_size]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
list(executor.map(process_channel_entry, batch))
# Emit progress update after each batch
progress = int((i + len(batch)) / total_channels * 100)
self.progress_signal.emit(progress)
# Simulate some delay between batches
time.sleep(random.uniform(0.1, 0.3))
# Emit completion signal after all batches are processed
self.completed_signal.emit()
except Exception as e:
# Emit an error signal if an exception occurs
self.error_signal.emit(str(e))
07070100293E02000081A4000003E8000003E80000000167703FED00000800000000000000003100000000000000000000002600000000iptv/controllers/thread/url_loader.pyimport random
import time
from concurrent.futures import ThreadPoolExecutor
from PyQt6.QtCore import QThread, pyqtSignal
from ipytv import playlist
from iptv.controllers.helpers import process_channel_entry
class URLLoaderThread(QThread):
"""
Thread class to handle loading channels from a URL in the background.
Emits progress updates during the process.
"""
progress_signal = pyqtSignal(int)
completed_signal = pyqtSignal()
error_signal = pyqtSignal(str)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
"""
Loads the playlist from the provided URL and processes the channels in batches.
"""
try:
# Load the playlist from the URL using ipytv
pl = playlist.loadu(self.url)
if not pl.get_channels():
raise ValueError("The URL is invalid or cannot be parsed.")
# Extract the channel data from the playlist
channels = [{"name": entry.name, "url": entry.url} for entry in pl.get_channels()]
total_channels = len(channels)
batch_size = 100
max_workers = min(batch_size, total_channels)
# Process channels in batches
for i in range(0, total_channels, batch_size):
batch = channels[i:i + batch_size]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
list(executor.map(process_channel_entry, batch))
# Emit progress update after each batch
progress = int((i + len(batch)) / total_channels * 100)
self.progress_signal.emit(progress)
# Simulate some delay between batches
time.sleep(random.uniform(0.1, 0.3))
# Emit completion signal after all batches are processed
self.completed_signal.emit()
except Exception as e:
# Emit an error signal if an exception occurs
self.error_signal.emit(str(e))
07070100293E07000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001500000000iptv/models/database07070100293E06000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000001800000000iptv/models/__init__.py07070100293E12000081A4000003E8000003E80000000167703FED00000B8C000000000000003100000000000000000000001F00000000iptv/models/channel_manager.pyfrom .database.channel import Channel
class ChannelManager:
_instance = None
_channels = None
_current_channel = None
@classmethod
def get_instance(cls):
""" Get the singleton instance of the ChannelManager """
if cls._instance is None:
cls._instance = ChannelManager()
return cls._instance
def load_channels(self):
""" Load channels from the database into the singleton instance """
if self._channels is None:
print("Loading channels from database...")
self._channels = Channel.get_all_channels()
return self._channels
def refresh(self):
""" Force the instance to reload all data from the database """
self._channels = None
self._current_channel = None
self.load_channels()
def get_channels(self):
""" Get the loaded channels, or load them if not loaded yet """
return self._channels if self._channels else self.load_channels()
def get_current_channel(self):
""" Get the current channel object """
if self._channels and len(self._channels) > 0:
if self._current_channel is None:
self._current_channel = self._channels[0]
return self._current_channel
else:
print("No channels loaded")
return None
def set_current_channel(self, channel):
""" Set the current channel by passing the channel object """
if channel in self._channels:
self._current_channel = channel
else:
self._current_channel = None
def get_next_channel(self):
""" Get the next channel in the list, or the first channel if at the end """
if self._channels and self._current_channel:
current_channel = self._current_channel
current_position = self._channels.index(current_channel)
next_position = (current_position + 1) % len(self._channels)
self._current_channel = self._channels[next_position]
return self._current_channel
return None
def get_previous_channel(self):
""" Get the previous channel in the list, or the last channel if at the start """
if self._channels and self._current_channel:
current_channel = self._current_channel
current_position = self._channels.index(current_channel)
prev_position = (current_position - 1) % len(self._channels)
self._current_channel = self._channels[prev_position]
return self._current_channel
return None
def get_first_channel(self):
""" Get the first channel in the list and set it as the current channel """
if not self._channels:
print("No channels available")
return None
if self._current_channel is None:
self._current_channel = self._channels[0]
return self._current_channel
07070100293E08000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000002100000000iptv/models/database/__init__.py07070100293E0D000081A4000003E8000003E80000000167703FED00001CD9000000000000003100000000000000000000002000000000iptv/models/database/channel.pyimport json
from .connection import create_connection, close_connection
class Channel:
def __init__(self, id, name, url, duration, attributes=None, extras=None):
"""
Simplified constructor that only handles the necessary fields:
- id (int): Unique identifier for the channel (automatically generated by the database).
- name (str): Name of the channel.
- url (str): URL of the channel (e.g., .m3u8 format).
- duration (str): Duration of the channel (e.g., "-1").
- attributes (dict): Additional attributes in dictionary format.
- extras (list): List of additional options (e.g., specific VLC settings).
"""
self.id = id
self.name = name
self.url = url
self.duration = duration
self.attributes = attributes or {}
self.extras = extras or []
def __repr__(self):
return f"<Channel(name={self.name}, url={self.url})>"
def to_dict(self):
""" Converts the channel to a dictionary in the required JSON format. """
return {
"name": self.name,
"duration": str(self.duration),
"url": self.url,
"attributes": self.attributes,
"extras": self.extras
}
@staticmethod
def create_table():
""" Creates the channels table in the database if it doesn't exist. """
conn = create_connection("iptv.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
url TEXT NOT NULL,
duration TEXT,
attributes TEXT,
extras TEXT,
tuned BOOLEAN DEFAULT 1,
favorite BOOLEAN DEFAULT 0,
last BOOLEAN DEFAULT 0
)
""")
conn.commit()
close_connection(conn)
@staticmethod
def insert_channel(channel_data):
"""Inserts a new channel into the database using dynamic columns based on the provided object (dictionary or class)."""
if isinstance(channel_data, dict):
fields = channel_data
elif hasattr(channel_data, '__dict__'):
fields = vars(channel_data)
else:
raise TypeError("Provided data must be either a dictionary or an object with attributes.")
filtered_fields = {k: v for k, v in fields.items() if v is not None}
if 'attributes' in filtered_fields and isinstance(filtered_fields['attributes'], (dict, list)):
filtered_fields['attributes'] = json.dumps(filtered_fields['attributes'])
if 'extras' in filtered_fields and isinstance(filtered_fields['extras'], (dict, list)):
filtered_fields['extras'] = json.dumps(filtered_fields['extras'])
columns = ', '.join(filtered_fields.keys())
placeholders = ', '.join(['?'] * len(filtered_fields))
values = tuple(filtered_fields.values())
sql_query = f"""
INSERT INTO channels ({columns})
VALUES ({placeholders})
"""
conn = create_connection("iptv.db")
cursor = conn.cursor()
cursor.execute(sql_query, values)
conn.commit()
close_connection(conn)
@staticmethod
def get_all_channels():
""" Retrieves all channels from the database that are marked as 'tuned' (tuned = 1). """
conn = create_connection("iptv.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM channels WHERE tuned = 1")
rows = cursor.fetchall()
conn.close()
return [
Channel(
id=row[0],
name=row[1],
url=row[2],
duration=row[3],
attributes=json.loads(row[4]) if row[4] else {}, # attributes
extras=json.loads(row[5]) if row[5] else [] # extras
)
for row in rows
]
@staticmethod
def get_all_channels_without_filters():
""" Retrieves all channels from the database. """
conn = create_connection("iptv.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM channels")
rows = cursor.fetchall()
close_connection(conn)
# Convert the results to Channel objects and deserialize attributes and extras.
return [
Channel(
id=row[0], # id
name=row[1], # name
url=row[2], # url
duration=row[3], # duration
attributes=json.loads(row[4]) if row[4] else {}, # attributes
extras=json.loads(row[5]) if row[5] else [] # extras
)
for row in rows
]
@staticmethod
def get_channel_by_id(channel_id):
""" Retrieves a channel by its ID. """
conn = create_connection("iptv.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM channels WHERE id=?", (channel_id,))
row = cursor.fetchone()
close_connection(conn)
if row:
return Channel(
id=row[0], # id
name=row[1], # name
url=row[2], # url
duration=row[3], # duration
attributes=json.loads(row[4]) if row[4] else {}, # attributes
extras=json.loads(row[5]) if row[5] else [] # extras
)
return None
@staticmethod
def get_channel_by_url(channel_url):
""" Retrieves a channel by its URL. """
conn = create_connection("iptv.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM channels WHERE url=?", (channel_url,))
row = cursor.fetchone()
close_connection(conn)
if row:
return Channel(
id=row[0], # id
name=row[1], # name
url=row[2], # url
duration=row[3], # duration
attributes=json.loads(row[4]) if row[4] else {}, # attributes
extras=json.loads(row[5]) if row[5] else [] # extras
)
return None
@staticmethod
def update_channel(channel_id, update_data):
""" Updates a channel in the database with new values. """
if not isinstance(update_data, dict):
print("Error: The update_data parameter must be a dictionary.")
return
if not update_data:
print("No fields to update.")
return
# Create the UPDATE query
set_clause = ", ".join([f"{field} = ?" for field in update_data])
values = list(update_data.values()) + [channel_id]
conn = create_connection("iptv.db")
cursor = conn.cursor()
# Execute the UPDATE query
cursor.execute(f"UPDATE channels SET {set_clause} WHERE id = ?", tuple(values))
conn.commit()
close_connection(conn)
@staticmethod
def delete_channel(channel_id):
""" Deletes a channel from the database by its ID. """
conn = create_connection("iptv.db")
cursor = conn.cursor()
cursor.execute("DELETE FROM channels WHERE id=?", (channel_id,))
conn.commit()
close_connection(conn)
print(f"Channel with ID {channel_id} deleted successfully.")
07070100293E0E000081A4000003E8000003E80000000167703FED00000199000000000000003100000000000000000000002300000000iptv/models/database/connection.pyimport sqlite3
from sqlite3 import Error
def create_connection(db_file):
""" Create a connection to the SQLite database """
conn = None
try:
conn = sqlite3.connect(db_file)
except Error as e:
print(f"Error while connecting to the database: {e}")
return conn
def close_connection(conn):
""" Close the connection to the database """
if conn:
conn.close()
07070100293E14000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001400000000iptv/views/controls07070100293E24000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001000000000iptv/views/list07070100293E2A000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001100000000iptv/views/video07070100293E35000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001300000000iptv/views/dialogs07070100293E30000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000001700000000iptv/views/__init__.py07070100293E31000081A4000003E8000003E80000000167703FED000009C5000000000000003100000000000000000000001A00000000iptv/views/main_window.pyfrom PyQt6.QtWidgets import QMainWindow, QVBoxLayout, QWidget, QHBoxLayout
from .controls.controls import Controls
from .list.playlist import Playlist
from .video.player import Player
from iptv.event_bus import event_bus
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
# Set up the main window
self.setWindowTitle("Neo IPTV")
event_bus.playlist_toggle.connect(self.toggle_playlist)
event_bus.fullscreen_toggle.connect(self.toggle_fullscreen)
# Create instances
self.video_player = Player()
self.controls = Controls()
self.playlist = Playlist()
# Center the window on the screen
self.center_window()
# Setup the interface (UI)
self.setup_ui()
def center_window(self):
# Resize window
self.resize(800, 600)
# Get the screen's available geometry (size)
screen_geometry = self.screen().availableGeometry()
# Calculate the center position based on the screen size and window size
center_x = (screen_geometry.width() - self.width()) // 2
center_y = (screen_geometry.height() - self.height()) // 2
# Move the window to the center of the screen
self.move(center_x, center_y)
def setup_ui(self):
# Create a central widget
window = QWidget(self)
self.setCentralWidget(window)
# Main layout for the window
main_layout = QHBoxLayout()
main_layout.setContentsMargins(0, 0, 0, 0)
video_player_layout = QVBoxLayout()
video_player_layout.setContentsMargins(0, 0, 0, 15)
# Add the video player and controls to the video player layout
video_player_layout.addWidget(self.video_player, stretch=1)
video_player_layout.addWidget(self.controls)
# Add the video player layout and playlist to the main layout
main_layout.addLayout(video_player_layout, stretch=85)
main_layout.addWidget(self.playlist)
# Set the main layout for the window
window.setLayout(main_layout)
def toggle_fullscreen(self):
""" Toggle the fullscreen mode of the window. """
if self.isFullScreen():
self.showNormal()
else:
self.showFullScreen()
def toggle_playlist(self):
""" Emit the event to toggle the visibility of the playlist. """
self.playlist.setVisible(not self.playlist.isVisible())
self.layout().update()
07070100293E15000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000002000000000iptv/views/controls/__init__.py07070100293E16000081A4000003E8000003E80000000167703FED00000670000000000000003100000000000000000000002200000000iptv/views/controls/navigation.pyfrom PyQt6.QtGui import QIcon
from PyQt6.QtWidgets import QWidget, QHBoxLayout, QPushButton
from iptv.event_bus import event_bus
from iptv.models.channel_manager import ChannelManager
class Navigation(QWidget):
def __init__(self):
super().__init__()
# Main layout
layout = QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
# Navigation buttons (Back and Next)
self.back_button = QPushButton()
self.next_button = QPushButton()
# Add icons to the buttons
self.back_button.setIcon(QIcon.fromTheme("media-seek-backward"))
self.next_button.setIcon(QIcon.fromTheme("media-seek-forward"))
# Add tooltips to buttons
self.back_button.setToolTip("Go to previous channel")
self.next_button.setToolTip("Go to next channel")
# Connect buttons to their respective functions
self.back_button.clicked.connect(self.go_to_previous_channel)
self.next_button.clicked.connect(self.go_to_next_channel)
layout.addWidget(self.back_button)
layout.addWidget(self.next_button)
self.setLayout(layout)
def go_to_previous_channel(self):
""" Navigate to the previous channel """
previous_channel = ChannelManager.get_instance().get_previous_channel()
if previous_channel:
event_bus.channel_url_changed.emit(previous_channel.url)
def go_to_next_channel(self):
""" Navigate to the next channel """
next_channel = ChannelManager.get_instance().get_next_channel()
if next_channel:
event_bus.channel_url_changed.emit(next_channel.url)
07070100293E17000081A4000003E8000003E80000000167703FED0000076A000000000000003100000000000000000000001E00000000iptv/views/controls/volume.pyfrom PyQt6.QtCore import Qt
from PyQt6.QtGui import QIcon
from PyQt6.QtWidgets import QWidget, QSlider, QPushButton, QHBoxLayout, QSizePolicy
from iptv.event_bus import event_bus
class Volume(QWidget):
def __init__(self):
super().__init__()
self.last_volume = None
# Main layout
layout = QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
# Mute button
self.mute_button = QPushButton()
self.mute_button.setIcon(QIcon.fromTheme("audio-volume-high"))
self.mute_button.setToolTip("Mute")
self.mute_button.clicked.connect(self.toggle_mute)
layout.addWidget(self.mute_button)
# Volume slider
self.volume_slider = QSlider(Qt.Orientation.Horizontal)
self.volume_slider.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
self.volume_slider.setRange(0, 100)
self.volume_slider.setValue(100) # Default volume is 100
self.volume_slider.valueChanged.connect(self.change_volume)
layout.addWidget(self.volume_slider)
self.setLayout(layout)
def change_volume(self, value):
""" Emit signal when volume changes. """
if value != 0:
self.last_volume = value
event_bus.emit_volume(value)
def toggle_mute(self):
""" Emit signal when mute button is clicked. """
if self.volume_slider.value() == 0:
self.volume_slider.setValue(self.last_volume if self.last_volume else 100)
event_bus.emit_mute(False)
else:
# Mute: Set volume to 0
self.volume_slider.setValue(0)
event_bus.emit_mute(True)
# Update mute button icon
muted = self.volume_slider.value() == 0
self.mute_button.setIcon(
QIcon.fromTheme("audio-volume-muted" if muted else "audio-volume-high")
)
07070100293E18000081A4000003E8000003E80000000167703FED000002FE000000000000003100000000000000000000002100000000iptv/views/controls/resources.pyfrom PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel, QListView
from PyQt6.QtCore import QStringListModel
class Resources(QWidget):
def __init__(self):
super().__init__()
# Main layout
layout = QVBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
# Resources list (example with resource names)
self.resources_list = QListView(self)
self.resources_model = QStringListModel(["Image 1", "Image 2", "Video 1", "Audio 1"])
self.resources_list.setModel(self.resources_model)
layout.addWidget(self.resources_list)
# Additional information label
self.info_label = QLabel("Resources Info Here", self)
layout.addWidget(self.info_label)
self.setLayout(layout)
07070100293E21000081A4000003E8000003E80000000167703FED0000063B000000000000003100000000000000000000002000000000iptv/views/controls/controls.pyfrom PyQt6.QtWidgets import QWidget, QHBoxLayout, QVBoxLayout, QSpacerItem, QSizePolicy
from .navigation import Navigation
from .settings import Settings
from .toggle_buttones import ToggleButtons
from .volume import Volume
class Controls(QWidget):
def __init__(self):
super().__init__()
# Create the main layout
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
# Create a container for both navigation and volume control elements
self.frame = QWidget(self)
layout.addWidget(self.frame)
# Create a horizontal layout for navigation and volume
frame_layout = QHBoxLayout(self.frame)
frame_layout.setContentsMargins(15, 0, 15, 0)
# Instantiate the Navigation and Volume controls and add them to the layout
self.setting = Settings()
self.toggle_buttons = ToggleButtons()
self.navigation = Navigation()
self.volume = Volume()
# Create a spacer
spacer = QSpacerItem(0, 0, QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Minimum)
# Add navigation and volume controls to the horizontal layout (side by side)
frame_layout.addWidget(self.navigation)
frame_layout.addWidget(self.toggle_buttons)
frame_layout.addItem(spacer)
frame_layout.addWidget(self.volume)
frame_layout.addWidget(self.setting)
# Set the layout for the parent widget (Controls)
self.setLayout(layout)
# Adjust layout to ensure everything scales correctly
self.frame.setLayout(frame_layout)
07070100293E22000081A4000003E8000003E80000000167703FED00000773000000000000003100000000000000000000002700000000iptv/views/controls/toggle_buttones.pyfrom PyQt6.QtGui import QIcon
from PyQt6.QtWidgets import QWidget, QHBoxLayout, QPushButton
from iptv.event_bus import event_bus
class ToggleButtons(QWidget):
def __init__(self):
super().__init__()
# Main layout
layout = QHBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
# Buttons for controlling the video player (Show/Hide Playlist and Toggle Fullscreen)
self.toggle_playlist_button = QPushButton()
self.fullscreen_button = QPushButton()
# Add icons to the buttons
self.toggle_playlist_button.setIcon(QIcon.fromTheme("folder-open"))
self.fullscreen_button.setIcon(QIcon.fromTheme("view-fullscreen"))
# Add tooltips to buttons
self.toggle_playlist_button.setToolTip("Show/Hide Playlist")
self.fullscreen_button.setToolTip("Toggle Fullscreen")
# Connect buttons to respective functions
self.toggle_playlist_button.clicked.connect(self.toggle_playlist)
self.fullscreen_button.clicked.connect(self.toggle_fullscreen)
layout.addWidget(self.toggle_playlist_button)
layout.addWidget(self.fullscreen_button)
self.setLayout(layout)
def toggle_playlist(self):
""" Emit the event to toggle the visibility of the playlist. """
event_bus.emit_playlist_toggle()
# Toggle the icon of the playlist button
current_icon = "folder-open" if self.toggle_playlist_button.icon().name() == "folder-new" else "folder-new"
self.toggle_playlist_button.setIcon(QIcon.fromTheme(current_icon))
def toggle_fullscreen(self):
""" Toggle the fullscreen mode of the player. """
event_bus.emit_fullscreen_toggle()
current_icon = "view-restore" if self.fullscreen_button.icon().name() == "view-fullscreen" else "view-fullscreen"
self.fullscreen_button.setIcon(QIcon.fromTheme(current_icon))
07070100293E23000081A4000003E8000003E80000000167703FED000003E8000000000000003100000000000000000000002000000000iptv/views/controls/settings.pyfrom PyQt6.QtGui import QIcon
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QPushButton
from iptv.views.dialogs.setting import SettingsDialog
def show_settings_dialog(self):
""" Opens the 'Settings' dialog when the button is clicked """
settings_dialog = SettingsDialog()
settings_dialog.exec()
class Settings(QWidget):
""" QWidget with a button that opens the Settings dialog """
def __init__(self):
super().__init__()
# Main layout for the QWidget
layout = QVBoxLayout()
# Create a button with an icon that opens the Settings dialog
self.gear_button = QPushButton()
self.gear_button.setIcon(QIcon.fromTheme("preferences-system"))
self.gear_button.setToolTip("Click to open settings menu")
self.gear_button.clicked.connect(show_settings_dialog)
# Add the button to the layout
layout.addWidget(self.gear_button)
self.setLayout(layout)
self.setWindowTitle("Main Widget")
07070100293E25000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000001C00000000iptv/views/list/__init__.py07070100293E29000081A4000003E8000003E80000000167703FED00000B1F000000000000003100000000000000000000001C00000000iptv/views/list/playlist.pyfrom PyQt6.QtCore import Qt
from PyQt6.QtGui import QStandardItem, QStandardItemModel
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QListView, QAbstractItemView, QLabel
from iptv.event_bus import event_bus
from iptv.models.channel_manager import ChannelManager
class Playlist(QWidget):
def __init__(self):
super().__init__()
# Main layout
layout = QVBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.setStyleSheet("border: none;")
# Label to show the count of channels
self.channel_count_label = QLabel("Channels count: 0", self)
layout.addWidget(self.channel_count_label)
# Create the list view and model
self.playlist = QListView(self)
self.playlist.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
# Create the item model (QStandardItemModel)
self.playlist_model = QStandardItemModel()
self.playlist.setModel(self.playlist_model)
# Initialize the playlist with current channels
self.load_channels()
# Connect the selection signal of the list to a slot
self.playlist.selectionModel().selectionChanged.connect(self.on_channel_selected)
# Connect to the EventBus signal for channel updates
event_bus.channels_updated.connect(self.on_channels_updated)
# Add the list view to the layout
layout.addWidget(self.playlist)
self.setLayout(layout)
def load_channels(self):
""" Load channels from the ChannelManager and update the playlist """
# Clear the model
self.playlist_model.clear()
# Get channels from the ChannelManager singleton
channels = ChannelManager.get_instance().get_channels()
# Update the label with the count of channels
self.update_channel_count_label(len(channels))
# Add the channels to the model
for channel in channels:
item = QStandardItem(channel.name) # Use the channel's name
item.setData(channel.url, Qt.ItemDataRole.UserRole) # Store the URL as additional data
self.playlist_model.appendRow(item)
def update_channel_count_label(self, count):
""" Update the label to display the count of channels """
self.channel_count_label.setText(f"Channels count: {count}")
def on_channel_selected(self):
""" Handle the selection of a channel from the playlist and emit its URL """
selected_index = self.playlist.selectedIndexes()
if selected_index:
selected_item = selected_index[0]
url = selected_item.data(Qt.ItemDataRole.UserRole)
event_bus.channel_url_changed.emit(url)
def on_channels_updated(self):
""" Handle the channels_updated signal from the EventBus """
self.load_channels()
07070100293E2B000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000001D00000000iptv/views/video/__init__.py07070100293E2C000081A4000003E8000003E80000000167703FED00000AE0000000000000003100000000000000000000001B00000000iptv/views/video/player.pyfrom PyQt6.QtCore import Qt, QTimer
from PyQt6.QtWidgets import QWidget, QLabel, QVBoxLayout, QMessageBox
from iptv.controllers.player_controller import PlayerController
from mpv import MPV
from iptv.models.channel_manager import ChannelManager
class Player(QWidget):
def __init__(self):
super().__init__()
# Main layout
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
# Create a QLabel for displaying video (MPV will render into this widget)
self.loading_label = QLabel("Channel content will be displayed here", self)
self.loading_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.loading_label)
# Initialize the MPV player
self.player = MPV(
wid=str(int(self.winId())),
log_handler=self.log_output,
)
# Set the layout for the widget
self.setLayout(layout)
channel = ChannelManager.get_instance().get_current_channel()
# Start the video with the given URL
if channel is None:
print("No channels available or the channels are not loaded correctly.")
else:
self.start_video(channel.url)
def start_video(self, url):
""" Start the video in a separate thread using the controller. """
self.controller = PlayerController(self.player, url)
# Connect signals to corresponding methods
self.controller.playback_started.connect(self.hide_loading_label)
self.controller.playback_error.connect(self.show_error_message)
self.controller.playback_status_changed.connect(self.on_playback_status_change)
self.controller.start() # Start the background thread for playback
def on_playback_status_change(self, status):
""" Hide or show the loading label based on playback status. """
if status == "playing":
QTimer.singleShot(0, self.hide_loading_label)
elif status == "paused" or status == "stopped":
QTimer.singleShot(0, self.show_loading_label)
def hide_loading_label(self):
""" Hide the loading label. """
self.loading_label.setVisible(False)
def show_loading_label(self):
""" Show the loading label. """
self.loading_label.setVisible(True)
def log_output(self, level, prefix, message):
""" Handle logs from MPV (optional for debugging). """
print(f"[{level}] {message}")
def show_error_message(self, message):
""" Show an error message box in case of issues. """
error_dialog = QMessageBox(self)
error_dialog.setWindowTitle("Error")
error_dialog.setText(message)
error_dialog.setIcon(QMessageBox.Icon.Critical)
error_dialog.exec()
07070100293E39000041ED000003E8000003E8000000016770400500000000000000000000003100000000000000000000001800000000iptv/views/dialogs/tabs07070100293E47000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000001F00000000iptv/views/dialogs/__init__.py07070100293E48000081A4000003E8000003E80000000167703FED000014FB000000000000003100000000000000000000002400000000iptv/views/dialogs/load_channels.pyimport sys
from PyQt6.QtCore import QObject, QThread, pyqtSignal
from PyQt6.QtGui import QTextCursor
from PyQt6.QtWidgets import (
QVBoxLayout,
QPushButton,
QLineEdit,
QDialog,
QLabel,
QHBoxLayout,
QTextEdit,
QFileDialog
)
class StreamToTextEdit(QObject):
""" Redirects print() output to a QTextEdit widget """
def __init__(self, text_edit):
super().__init__()
self.text_edit = text_edit
def write(self, text):
""" Writes the text to the QTextEdit if it's still valid """
if self.text_edit and not self.text_edit.isDeleted():
cursor = self.text_edit.textCursor()
cursor.movePosition(QTextCursor.MoveOperation.End) # Move cursor to the end
cursor.insertText(text) # Insert the text at the cursor position
self.text_edit.setTextCursor(cursor) # Update the text cursor
self.text_edit.ensureCursorVisible() # Ensure the cursor is visible
def flush(self):
""" Required method for print() compatibility """
pass
class DownloadM3UThread(QThread):
""" Thread for downloading and processing the M3U channels """
new_message = pyqtSignal(str) # Signal to send messages to QTextEdit
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
""" This runs when the thread starts """
try:
self.new_message.emit(f"Starting download from URL: {self.url}\n")
# Simulate download and channel processing
# The real download would happen here using `DownloadM3U`
self.new_message.emit(f"Processing M3U from URL: {self.url}\n")
# Simulate valid and invalid channels for demonstration
valid_channels = ["Channel 1", "Channel 2"]
invalid_channels = ["Channel X"]
self.new_message.emit(f"Valid channels: {len(valid_channels)}\n")
self.new_message.emit(f"Invalid channels: {len(invalid_channels)}\n")
# Emit results (simulated)
self.new_message.emit("Download finished.\n")
except Exception as e:
self.new_message.emit(f"Error: {str(e)}\n")
class LoadChannelsDialog(QDialog):
""" Dialog for loading IPTV channels """
def __init__(self):
super().__init__()
self.setWindowTitle("Load Channels")
self.setFixedSize(400, 350)
# Layout for the dialog
layout = QVBoxLayout()
# Add a label
label = QLabel("Select an M3U file or URL to load channels.", self)
layout.addWidget(label)
# Add buttons
self.file_button = QPushButton("Select M3U File", self)
self.url_button = QPushButton("Enter M3U URL", self)
# Connect buttons to corresponding actions
self.file_button.clicked.connect(self.load_m3u_file)
self.url_button.clicked.connect(self.show_url_input)
# Add buttons to layout
layout.addWidget(self.file_button)
layout.addWidget(self.url_button)
# QTextEdit to show terminal output
self.text_area = QTextEdit(self)
self.text_area.setReadOnly(True) # Make it read-only so users can't edit it
layout.addWidget(self.text_area)
self.setLayout(layout)
# Redirect print() to QTextEdit
sys.stdout = StreamToTextEdit(self.text_area)
def show_url_input(self):
""" Show the input field and 'Load' button for the M3U URL """
# Hide the buttons
self.file_button.setVisible(False)
self.url_button.setVisible(False)
# Create the URL input and load button
self.url_input = QLineEdit(self)
self.url_input.setPlaceholderText("Enter M3U URL")
self.url_input.setClearButtonEnabled(True)
self.load_button = QPushButton("Load", self)
self.load_button.clicked.connect(self.load_url)
# Layout for the new components
url_layout = QHBoxLayout()
url_layout.addWidget(self.url_input)
url_layout.addWidget(self.load_button)
# Add the new layout to the dialog layout
layout = self.layout()
layout.addLayout(url_layout)
def load_url(self):
""" Handle the URL loading action """
url = self.url_input.text().strip()
if url:
self.on_url_entered()
self.accept()
def on_url_entered(self):
""" Processes the entered URL and loads the M3U data in a separate thread. """
url = self.url_input.text().strip()
if url:
print(f"Starting download from URL: {url}")
self.download_thread = DownloadM3UThread(url)
self.download_thread.new_message.connect(self.append_message)
self.download_thread.start()
self.url_input.setVisible(False)
def append_message(self, message):
""" This function is called each time the thread sends a message """
print(message)
def load_m3u_file(self):
""" Opens a dialog to load an M3U file and process it. """
file_path, _ = QFileDialog.getOpenFileName(
self,
"Select M3U File",
"",
"M3U Files (*.m3u);;All Files (*)",
options=QFileDialog.Option.ReadOnly
)
if file_path:
self.process_m3u_file(file_path)
07070100293E49000081A4000003E8000003E80000000167703FED000005F4000000000000003100000000000000000000001E00000000iptv/views/dialogs/setting.pyfrom PyQt6.QtWidgets import QDialog, QVBoxLayout, QTabWidget, QWidget, QPushButton, QLabel
from iptv.views.dialogs.tabs.advance import AdvanceTab
from iptv.views.dialogs.tabs.channels import ChannelTab
from iptv.views.dialogs.tabs.image import ImageTab
from iptv.views.dialogs.tabs.network import NetworkTab
from iptv.views.dialogs.tabs.sound import SoundTab
class SettingsDialog(QDialog):
""" Dialog for the application settings with tabs """
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Settings")
self.setFixedSize(700, 400)
# Create the main layout for the dialog
main_layout = QVBoxLayout()
# Create the QTabWidget to hold different settings sections
self.tab_widget = QTabWidget(self)
# Create tabs
self.advance_tab = AdvanceTab()
self.channel_tab = ChannelTab()
self.image_tab = ImageTab()
self.network_tab = NetworkTab()
self.sound_tab = SoundTab()
# Add tabs to the QTabWidget
# self.tab_widget.addTab(self.advance_tab, "Settings")
# self.tab_widget.addTab(self.image_tab, "Image")
# self.tab_widget.addTab(self.sound_tab, "Sound")
# self.tab_widget.addTab(self.network_tab, "Network")
self.tab_widget.addTab(self.channel_tab, "Channels")
# Add the QTabWidget to the main layout
main_layout.addWidget(self.tab_widget)
# Set the layout for the dialog
self.setLayout(main_layout)
07070100293E41000081A4000003E8000003E80000000167703FED00000000000000000000003100000000000000000000002400000000iptv/views/dialogs/tabs/__init__.py07070100293E42000081A4000003E8000003E80000000167703FED000001B2000000000000003100000000000000000000002300000000iptv/views/dialogs/tabs/advance.pyfrom PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel
class AdvanceTab(QWidget):
def __init__(self):
super().__init__()
# Create a QVBoxLayout
layout = QVBoxLayout()
# Create a QLabel
label = QLabel("Hello, this is a advance label!", self)
# Add the label to the layout
layout.addWidget(label)
# Set the layout for the widget
self.setLayout(layout)
07070100293E43000081A4000003E8000003E80000000167703FED000001AE000000000000003100000000000000000000002100000000iptv/views/dialogs/tabs/image.pyfrom PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel
class ImageTab(QWidget):
def __init__(self):
super().__init__()
# Create a QVBoxLayout
layout = QVBoxLayout()
# Create a QLabel
label = QLabel("Hello, this is a image label!", self)
# Add the label to the layout
layout.addWidget(label)
# Set the layout for the widget
self.setLayout(layout)
07070100293E44000081A4000003E8000003E80000000167703FED000001B2000000000000003100000000000000000000002300000000iptv/views/dialogs/tabs/network.pyfrom PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel
class NetworkTab(QWidget):
def __init__(self):
super().__init__()
# Create a QVBoxLayout
layout = QVBoxLayout()
# Create a QLabel
label = QLabel("Hello, this is a network label!", self)
# Add the label to the layout
layout.addWidget(label)
# Set the layout for the widget
self.setLayout(layout)
07070100293E45000081A4000003E8000003E80000000167703FED000001AE000000000000003100000000000000000000002100000000iptv/views/dialogs/tabs/sound.pyfrom PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel
class SoundTab(QWidget):
def __init__(self):
super().__init__()
# Create a QVBoxLayout
layout = QVBoxLayout()
# Create a QLabel
label = QLabel("Hello, this is a sound label!", self)
# Add the label to the layout
layout.addWidget(label)
# Set the layout for the widget
self.setLayout(layout)
07070100293E46000081A4000003E8000003E80000000167703FED00002FD3000000000000003100000000000000000000002400000000iptv/views/dialogs/tabs/channels.pyfrom PyQt6.QtCore import Qt
from PyQt6.QtWidgets import (
QWidget,
QVBoxLayout,
QLabel,
QPushButton,
QSpacerItem,
QSizePolicy,
QProgressBar,
QFileDialog,
QMessageBox,
QLineEdit,
QHBoxLayout
)
from iptv.controllers.thread.channel_tuning import ChannelTuningThread
from iptv.controllers.thread.file_loader import FileLoaderThread
from iptv.controllers.thread.url_loader import URLLoaderThread
from iptv.event_bus import event_bus
from iptv.models.database.channel import Channel
class ChannelTab(QWidget):
def __init__(self):
super().__init__()
# Main layout
layout = QVBoxLayout()
layout.setContentsMargins(10, 10, 10, 10)
# Label at the top
self.label = QLabel("Manage your IPTV Channels", self)
self.label.setAlignment(Qt.AlignmentFlag.AlignTop)
layout.addWidget(self.label)
# Layout for buttons (vertical alignment)
button_layout = QVBoxLayout()
# Buttons to load channels from file and URL
self.load_file_button = QPushButton("Load Channels from File", self)
self.load_url_button = QPushButton("Load Channels from URL", self)
self.tune_button = QPushButton("Tune Channels", self)
# Connect buttons to their respective methods
self.load_file_button.clicked.connect(self.load_channels_from_file)
self.load_url_button.clicked.connect(self.toggle_url_input)
self.tune_button.clicked.connect(self.start_tuning)
# Add buttons to the layout
button_layout.addWidget(self.load_file_button)
button_layout.addWidget(self.load_url_button)
button_layout.addWidget(self.tune_button)
# Add the button layout to the main layout
layout.addLayout(button_layout)
# URL input section (hidden by default)
self.url_input_layout = QHBoxLayout()
self.url_input = QLineEdit(self)
self.url_input.setPlaceholderText("Enter URL here...")
self.url_input.setVisible(False)
self.load_url_confirm_button = QPushButton("Load", self)
self.load_url_confirm_button.setVisible(False)
self.load_url_confirm_button.clicked.connect(self.load_channels_from_url)
self.url_input_layout.addWidget(self.url_input)
self.url_input_layout.addWidget(self.load_url_confirm_button)
layout.addLayout(self.url_input_layout)
# Waiting label (initially hidden)
self.wait_label = QLabel("", self)
self.wait_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.wait_label.setVisible(False)
layout.addWidget(self.wait_label)
# Progress bar (initially hidden)
self.progress_bar = QProgressBar(self)
self.progress_bar.setRange(0, 100)
self.progress_bar.setValue(0)
self.progress_bar.setVisible(False)
layout.addWidget(self.progress_bar)
# Message label (initially hidden)
self.message_label = QLabel("", self)
self.message_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.message_label.setVisible(False)
layout.addWidget(self.message_label)
# Add a spacer
spacer = QSpacerItem(20, 40, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Expanding)
layout.addItem(spacer)
# Set the layout for the widget
self.setLayout(layout)
self.file_loader_thread = None
def toggle_url_input(self):
"""
Toggles the visibility of the URL input and confirm button when 'Load Channels from URL' is clicked.
"""
is_visible = not self.url_input.isVisible()
self.url_input.setVisible(is_visible)
self.load_url_confirm_button.setVisible(is_visible)
def start_tuning(self):
"""
This function is called when the 'Tune Channels' button is pressed.
It triggers the tuning process in a separate thread to avoid UI blocking.
"""
confirm = QMessageBox.question(
self,
"Confirm Load",
f"Do you want to tune the channels? This will take some time.",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if confirm != QMessageBox.StandardButton.Yes:
return
# Show waiting message and progress bar
self.wait_label.setText("Please wait while tuning the channels...")
self.wait_label.setVisible(True)
# Hide the buttons while tuning
self.load_file_button.setEnabled(False)
self.load_url_button.setEnabled(False)
self.tune_button.setEnabled(False)
# Show progress bar
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# Show waiting message
self.message_label.setText("""
The channels are being tuned, please wait. <br><br>
This process will only hide the channels that are currently not responsive until they are tuned again. <br>
It will not hide channels that are inaccessible due to other reasons. <br>
Please note that some channels may not be active 24/7 and could disappear during a tuning. <br><br>
To ensure the process completes successfully, avoid closing the settings window, as it may cancel the tuning.
""")
self.message_label.setVisible(True)
# Create and start the tuning thread
channels = Channel.get_all_channels_without_filters()
self.tuning_thread = ChannelTuningThread(channels)
# Connect signals for progress updates and when finished
self.tuning_thread.progress_updated.connect(self.update_progress)
self.tuning_thread.finished.connect(self.on_tuning_finished)
# Start the tuning thread
self.tuning_thread.start()
def update_progress(self, progress):
"""
This function updates the progress bar during the tuning process.
:param progress: The current progress (between 0 and 100).
"""
self.progress_bar.setValue(progress)
def on_tuning_finished(self):
"""
This function is called when the tuning process has finished.
It updates the UI to reflect the completion of the tuning process.
"""
self.wait_label.setText("Channels tuned successfully!")
self.wait_label.setVisible(True)
self.message_label.setText("")
self.message_label.setVisible(False)
# Hide the progress bar
self.progress_bar.setVisible(False)
# Re-enable buttons after the tuning process is finished
self.load_file_button.setEnabled(True)
self.load_url_button.setEnabled(True)
self.tune_button.setEnabled(True)
# Emit the channels_updated signal
event_bus.emit_channels_updated()
def load_channels_from_file(self):
"""
Opens a file dialog to select an M3U or M3U8 file, and starts the loading process.
"""
file_path, _ = QFileDialog.getOpenFileName(
self,
"Select M3U/M3U8 File",
"",
"Playlist Files (*.m3u *.m3u8);;All Files (*)"
)
if not file_path:
return
confirm = QMessageBox.question(
self,
"Confirm Load",
f"Do you want to load channels from '{file_path}'?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if confirm != QMessageBox.StandardButton.Yes:
return
self.file_loader_thread = FileLoaderThread(file_path)
self.file_loader_thread.progress_signal.connect(self.update_progress)
self.file_loader_thread.completed_signal.connect(self.on_file_loading_complete)
self.file_loader_thread.error_signal.connect(self.on_file_loading_error)
self.wait_label.setText("Loading channels from file...")
self.wait_label.setVisible(True)
self.load_file_button.setEnabled(False)
self.load_url_button.setEnabled(False)
self.tune_button.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# Show waiting message
self.message_label.setText("Channels will be loaded from the archive, please wait.")
self.message_label.setVisible(True)
self.file_loader_thread.start()
def on_file_loading_complete(self):
"""
Handles actions after the file has been successfully loaded.
"""
QMessageBox.information(self, "Success", "Channels loaded successfully!")
self.wait_label.setText("Channels loaded successfully!")
self.wait_label.setVisible(True)
self.message_label.setText("")
self.message_label.setVisible(False)
self.load_file_button.setEnabled(True)
self.load_url_button.setEnabled(True)
self.tune_button.setEnabled(True)
self.progress_bar.setVisible(False)
# Emit the channels_updated signal
event_bus.emit_channels_updated()
def on_file_loading_error(self, error_message):
"""
Handles errors that occur during the file loading process.
"""
QMessageBox.critical(self, "Error", f"An error occurred: {error_message}")
self.load_file_button.setEnabled(True)
self.load_url_button.setEnabled(True)
self.tune_button.setEnabled(True)
self.progress_bar.setVisible(False)
def load_channels_from_url(self):
"""
Handles the loading of channels from the entered URL.
"""
url = self.url_input.text().strip()
if not url:
QMessageBox.warning(self, "Input Error", "Please enter a valid URL.")
return
confirm = QMessageBox.question(
self,
"Confirm Load",
f"Do you want to load channels from the URL: '{url}'?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if confirm != QMessageBox.StandardButton.Yes:
return
self.url_input.setEnabled(False)
self.load_url_confirm_button.setEnabled(False)
self.url_loader_thread = URLLoaderThread(url)
self.url_loader_thread.progress_signal.connect(self.update_progress)
self.url_loader_thread.completed_signal.connect(self.on_url_loading_complete)
self.url_loader_thread.error_signal.connect(self.on_url_loading_error)
self.wait_label.setText("Loading channels from URL...")
self.wait_label.setVisible(True)
self.load_file_button.setEnabled(False)
self.load_url_button.setEnabled(False)
self.tune_button.setEnabled(False)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
# Show waiting message
self.message_label.setText("Channels will be loaded from the URL, please wait.")
self.message_label.setVisible(True)
self.url_loader_thread.start()
def on_url_loading_complete(self):
"""
Handles actions after the channels have been successfully loaded from the URL.
"""
QMessageBox.information(self, "Success", "Channels loaded successfully!")
self.url_input.setEnabled(True)
self.load_url_confirm_button.setEnabled(True)
self.wait_label.setText("Channels loaded successfully!")
self.wait_label.setVisible(True)
self.message_label.setText("")
self.message_label.setVisible(False)
self.load_file_button.setEnabled(True)
self.load_url_button.setEnabled(True)
self.tune_button.setEnabled(True)
self.progress_bar.setVisible(False)
event_bus.emit_channels_updated()
def on_url_loading_error(self, error_message):
"""
Handles errors that occur during the URL loading process.
"""
QMessageBox.critical(self, "Error", f"An error occurred: {error_message}")
self.url_input.setEnabled(True)
self.load_url_confirm_button.setEnabled(True)
self.load_file_button.setEnabled(True)
self.load_url_button.setEnabled(True)
self.tune_button.setEnabled(True)
self.progress_bar.setVisible(False)
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!