Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
system:homeautomation:home-assistant:unstable
python-PyChromecast
_service:obs_scm:pychromecast-14.0.1.obscpio
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:pychromecast-14.0.1.obscpio of Package python-PyChromecast
07070100000000000041ED00000000000000000000000365F9EF9000000000000000000000000000000000000000000000001C00000000pychromecast-14.0.1/.github07070100000001000081A400000000000000000000000165F9EF9000000108000000000000000000000000000000000000002B00000000pychromecast-14.0.1/.github/dependabot.ymlversion: 2 updates: - package-ecosystem: "github-actions" directory: "/" schedule: interval: daily open-pull-requests-limit: 10 - package-ecosystem: pip directory: "/" schedule: interval: weekly open-pull-requests-limit: 10 07070100000002000081A400000000000000000000000165F9EF9000000092000000000000000000000000000000000000003000000000pychromecast-14.0.1/.github/release-drafter.ymlcategories: - title: "⬆️ Dependencies" collapse-after: 1 labels: - "dependencies" template: | ## What's Changed $CHANGES 07070100000003000041ED00000000000000000000000265F9EF9000000000000000000000000000000000000000000000002600000000pychromecast-14.0.1/.github/workflows07070100000004000081A400000000000000000000000165F9EF900000033D000000000000000000000000000000000000003800000000pychromecast-14.0.1/.github/workflows/pythonpublish.yml# This workflows will upload a Python Package using Twine when a release is created # For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries name: Upload Python Package on: release: types: [published] jobs: deploy: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4.1.2 - name: Set up Python uses: actions/setup-python@v5.0.0 with: python-version: '3.11' - name: Install dependencies run: | python -m pip install --upgrade pip pip install build wheel twine - name: Build and publish env: TWINE_USERNAME: __token__ TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} run: | python -m build twine upload dist/* 07070100000005000081A400000000000000000000000165F9EF900000014C000000000000000000000000000000000000003A00000000pychromecast-14.0.1/.github/workflows/release-drafter.ymlname: Release Drafter on: push: branches: - master jobs: update_release_draft: runs-on: ubuntu-latest steps: # Drafts your next Release notes as Pull Requests are merged into "master" - uses: release-drafter/release-drafter@v6.0.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} 07070100000006000081A400000000000000000000000165F9EF9000000496000000000000000000000000000000000000002F00000000pychromecast-14.0.1/.github/workflows/test.yml# This workflow will install Python dependencies, run tests and lint with a single version of Python # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions name: Run Tests on: push: branches: [master] pull_request: branches: [master] jobs: build: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4.1.2 - name: Set up Python 3.11 uses: actions/setup-python@v5.0.0 with: python-version: 3.11 - name: Install dependencies run: | pip install -r requirements.txt pip install -r requirements-test.txt - name: Check formatting with black run: | black examples pychromecast --check --diff - name: Lint with flake8 run: | flake8 --exclude cast_channel_pb2.py,authority_keys_pb2.py,logging_pb2.py examples pychromecast - name: Lint with mypy run: | mypy examples pychromecast - name: Lint with pylint run: | pylint examples pychromecast - name: Lint with rstcheck run: | rstcheck README.rst 07070100000007000081A400000000000000000000000165F9EF9000000242000000000000000000000000000000000000001F00000000pychromecast-14.0.1/.gitignore# Hide sublime text stuff *.sublime-project *.sublime-workspace # Hide some OS X stuff .DS_Store .AppleDouble .LSOverride Icon # Thumbnails ._* # GITHUB Proposed Python stuff: *.py[cod] # C extensions *.so # Packages *.egg *.egg-info dist build eggs parts bin var sdist develop-eggs .installed.cfg lib lib64 # Build files README.rst # Installer logs pip-log.txt # Unit test / coverage reports .coverage .tox nosetests.xml # Translations *.mo # Mr Developer .mr.developer.cfg .project .pydevproject # pyenv .python-version # Python venv .venv venv # VS Code .vscode 07070100000008000081A400000000000000000000000165F9EF900000043B000000000000000000000000000000000000001C00000000pychromecast-14.0.1/LICENSEThe MIT License (MIT) Copyright (c) 2013 Paulus Schoutsen Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 07070100000009000081A400000000000000000000000165F9EF900000006C000000000000000000000000000000000000002000000000pychromecast-14.0.1/MANIFEST.ininclude README.rst include LICENSE include requirements.txt graft pychromecast recursive-exclude * *.py[co] 0707010000000A000081A400000000000000000000000165F9EF9000002273000000000000000000000000000000000000001F00000000pychromecast-14.0.1/README.rstpychromecast |Build Status| =========================== .. |Build Status| image:: https://travis-ci.org/balloob/pychromecast.svg?branch=master :target: https://travis-ci.org/balloob/pychromecast Library for Python 3.11+ to communicate with the Google Chromecast. It currently supports: - Auto discovering connected Chromecasts on the network - Start the default media receiver and play any online media - Control playback of current playing media - Implement Google Chromecast api v2 - Communicate with apps via channels - Easily extendable to add support for unsupported namespaces - Multi-room setups with Audio cast devices *Check out* `Home Assistant <https://home-assistant.io>`_ *for a ready-made solution using PyChromecast for controlling and automating your Chromecast or Cast-enabled device like Google Home.* Dependencies ------------ PyChromecast depends on the Python packages requests, protobuf and zeroconf. Make sure you have these dependencies installed using ``pip install -r requirements.txt`` How to use ---------- .. code:: python >> import time >> import pychromecast >> import zeroconf >> # Create a browser which prints the friendly name of found chromecast devices >> zconf = zeroconf.Zeroconf() >> browser = pychromecast.CastBrowser(pychromecast.SimpleCastListener(lambda uuid, service: print(browser.devices[uuid].friendly_name)), zconf) >> browser.start_discovery() >> # Shut down discovery >> pychromecast.discovery.stop_discovery(browser) >> # Discover and connect to chromecasts named Living Room >> chromecasts, browser = pychromecast.get_listed_chromecasts(friendly_names=["Living Room"]) >> [cc.cast_info.friendly_name for cc in chromecasts] ['Living Room'] >> # Discover and connect to more than one device >> chromecasts, browser = pychromecast.get_listed_chromecasts(friendly_names=["Living Room","Bed Room","Kitchen"]) >> [cc.device.friendly_name for cc in chromecasts] ["Living Room","Bed Room","Kitchen"] >> # If you are seeing less devices get discovered than expected add the below parameter. You can lessen or extend the timeout as needed. >> chromecasts, browser = pychromecast.get_listed_chromecasts(friendly_names=["Living Room","Bed Room","Kitchen"],discovery_timeout=30) >> [cc.device.friendly_name for cc in chromecasts] ["Living Room","Bed Room","Kitchen"] >> cast = chromecasts[0] >> # Start worker thread and wait for cast device to be ready >> cast.wait() >> print(cast.cast_info) CastInfo(services={ServiceInfo(type='mdns', data='Chromecast-Audio-42feced1d94238232fba92623e2682f3._googlecast._tcp.local.')}, uuid=UUID('42feced1-d942-3823-2fba-92623e2682f3'), model_name='Chromecast Audio', friendly_name='Living room', host='192.168.0.189', port=8009, cast_type='audio', manufacturer='Google Inc.') >> print(cast.status) CastStatus(is_active_input=True, is_stand_by=False, volume_level=1.0, volume_muted=False, app_id='CC1AD845', display_name='Default Media Receiver', namespaces=['urn:x-cast:com.google.cast.player.message', 'urn:x-cast:com.google.cast.media'], session_id='CCA39713-9A4F-34A6-A8BF-5D97BE7ECA5C', transport_id='web-9', status_text='') >> mc = cast.media_controller >> mc.play_media('http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4', 'video/mp4') >> mc.block_until_active() >> print(mc.status) MediaStatus(current_time=42.458322, content_id='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4', content_type='video/mp4', duration=596.474195, stream_type='BUFFERED', idle_reason=None, media_session_id=1, playback_rate=1, player_state='PLAYING', supported_media_commands=15, volume_level=1, volume_muted=False) >> mc.pause() >> time.sleep(5) >> mc.play() >> # Shut down discovery >> pychromecast.discovery.stop_discovery(browser) Adding support for extra namespaces ----------------------------------- Each app that runs on the Chromecast supports namespaces. They specify a JSON-based mini-protocol. This is used to communicate between the Chromecast and your phone/browser and now Python. Support for extra namespaces is added by using controllers. To add your own namespace to a current chromecast instance you will first have to define your controller. Example of a minimal controller: .. code:: python from pychromecast.controllers import BaseController class MyController(BaseController): def __init__(self): super(MyController, self).__init__( "urn:x-cast:my.super.awesome.namespace") def receive_message(self, message, data): print("Wow, I received this message: {}".format(data)) return True # indicate you handled this message def request_beer(self): self.send_message({'request': 'beer'}) After you have defined your controller you will have to add an instance to a Chromecast object: `cast.register_handler(MyController())`. When a message is received with your namespace it will be routed to your controller. For more options see the `BaseController`_. For an example of a fully implemented controller see the `MediaController`_. .. _BaseController: https://github.com/balloob/pychromecast/blob/master/pychromecast/controllers/__init__.py .. _MediaController: https://github.com/balloob/pychromecast/blob/master/pychromecast/controllers/media.py Exploring existing namespaces ------------------------------- So you've got PyChromecast running and decided it is time to add support to your favorite app. No worries, the following instructions will have you covered in exploring the possibilities. The following instructions require the use of the `Google Chrome browser`_ and the `Google Cast plugin`_. * In Chrome, go to `chrome://net-export/` * Select 'Include raw bytes (will include cookies and credentials)' * Click 'Start Logging to Disk' * Open a new tab, browse to your favorite application on the web that has Chromecast support and start casting. * Go back to the tab that is capturing events and click on stop. * Open https://netlog-viewer.appspot.com/ and select your event log file. * Browse to https://netlog-viewer.appspot.com/#events&q=type:SOCKET, and find the socket that has familiar JSON data. (For me, it's usually the second or third from the top.) * Go through the results and collect the JSON that is exchanged. * Now write a controller that is able to mimic this behavior :-) .. _Google Chrome Browser: https://www.google.com/chrome/ .. _Google Cast Plugin: https://chrome.google.com/webstore/detail/google-cast/boadgeojelhgndaghljhdicfkmllpafd Ignoring CEC Data ----------------- The Chromecast typically reports whether it is the active input on the device to which it is connected. This value is stored inside a cast object in the following property. .. code:: python cast.status.is_active_input Some Chromecast users have reported CEC incompatibilities with their media center devices. These incompatibilities may sometimes cause this active input value to be reported improperly. This active input value is typically used to determine if the Chromecast is idle. PyChromecast is capable of ignoring the active input value when determining if the Chromecast is idle in the instance that the Chromecast is returning erroneous values. To ignore this CEC detection data in PyChromecast, append a `Linux style wildcard`_ formatted string to the IGNORE\_CEC list in PyChromecast like in the example below. .. code:: python pychromecast.IGNORE_CEC.append('*') # Ignore CEC on all devices pychromecast.IGNORE_CEC.append('Living Room') # Ignore CEC on Chromecasts named Living Room Networking requirements ----------------------- Pychromecast relies on mDNS to discover cast devices. The mDNS protocol relies on multicast UDP on port 5353 which comes with several implications for discovery to work: - Multicast UDP must be forwarded by WiFI routers; some WiFi routers are known to drop multicast UDP traffic. - The device running pychromecast must allow both inbound and outbound traffic on port 5353. - The device running pychromecast must be on the same subnet as the cast devices because mDNS packets are not routed across subnets. If not all of these conditions are met, discovery will not work. In cases where these conditions are impossible to meet, it's possible to pass a list of known IP-addresses or host names to the discovery functions. Thanks ------ I would like to thank `Fred Clift`_ for laying the socket client ground work. Without him it would not have been possible! .. _Linux style wildcard: http://tldp.org/LDP/GNU-Linux-Tools-Summary/html/x11655.htm .. _Fred Clift: https://github.com/minektur 0707010000000B000041ED00000000000000000000000265F9EF9000000000000000000000000000000000000000000000002800000000pychromecast-14.0.1/chromecast_protobuf0707010000000C000081A400000000000000000000000165F9EF900000009D000000000000000000000000000000000000003200000000pychromecast-14.0.1/chromecast_protobuf/README.mdThese files were imported from https://chromium.googlesource.com/chromium/src.git/+/master/extensions/common/api/cast_channel to generate the \_pb2.py-files.0707010000000D000081A400000000000000000000000165F9EF9000000199000000000000000000000000000000000000003D00000000pychromecast-14.0.1/chromecast_protobuf/authority_keys.proto// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. syntax = "proto2"; option optimize_for = LITE_RUNTIME; package extensions.api.cast_channel.proto; message AuthorityKeys { message Key { required bytes fingerprint = 1; required bytes public_key = 2; } repeated Key keys = 1; } 0707010000000E000081A400000000000000000000000165F9EF9000000BC0000000000000000000000000000000000000003B00000000pychromecast-14.0.1/chromecast_protobuf/cast_channel.proto// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. syntax = "proto2"; option optimize_for = LITE_RUNTIME; package extensions.api.cast_channel; message CastMessage { // Always pass a version of the protocol for future compatibility // requirements. enum ProtocolVersion { CASTV2_1_0 = 0; } required ProtocolVersion protocol_version = 1; // source and destination ids identify the origin and destination of the // message. They are used to route messages between endpoints that share a // device-to-device channel. // // For messages between applications: // - The sender application id is a unique identifier generated on behalf of // the sender application. // - The receiver id is always the the session id for the application. // // For messages to or from the sender or receiver platform, the special ids // 'sender-0' and 'receiver-0' can be used. // // For messages intended for all endpoints using a given channel, the // wildcard destination_id '*' can be used. required string source_id = 2; required string destination_id = 3; // This is the core multiplexing key. All messages are sent on a namespace // and endpoints sharing a channel listen on one or more namespaces. The // namespace defines the protocol and semantics of the message. required string namespace = 4; // Encoding and payload info follows. // What type of data do we have in this message. enum PayloadType { STRING = 0; BINARY = 1; } required PayloadType payload_type = 5; // Depending on payload_type, exactly one of the following optional fields // will always be set. optional string payload_utf8 = 6; optional bytes payload_binary = 7; } enum SignatureAlgorithm { UNSPECIFIED = 0; RSASSA_PKCS1v15 = 1; RSASSA_PSS = 2; } enum HashAlgorithm { SHA1 = 0; SHA256 = 1; } // Messages for authentication protocol between a sender and a receiver. message AuthChallenge { optional SignatureAlgorithm signature_algorithm = 1 [default = RSASSA_PKCS1v15]; optional bytes sender_nonce = 2; optional HashAlgorithm hash_algorithm = 3 [default = SHA1]; } message AuthResponse { required bytes signature = 1; required bytes client_auth_certificate = 2; repeated bytes intermediate_certificate = 3; optional SignatureAlgorithm signature_algorithm = 4 [default = RSASSA_PKCS1v15]; optional bytes sender_nonce = 5; optional HashAlgorithm hash_algorithm = 6 [default = SHA1]; optional bytes crl = 7; } message AuthError { enum ErrorType { INTERNAL_ERROR = 0; NO_TLS = 1; // The underlying connection is not TLS SIGNATURE_ALGORITHM_UNAVAILABLE = 2; } required ErrorType error_type = 1; } message DeviceAuthMessage { // Request fields optional AuthChallenge challenge = 1; // Response fields optional AuthResponse response = 2; optional AuthError error = 3; } 0707010000000F000081A400000000000000000000000165F9EF900000142A000000000000000000000000000000000000003600000000pychromecast-14.0.1/chromecast_protobuf/logging.proto// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. syntax = "proto2"; option optimize_for = LITE_RUNTIME; package extensions.api.cast_channel.proto; enum EventType { EVENT_TYPE_UNKNOWN = 0; CAST_SOCKET_CREATED = 1; READY_STATE_CHANGED = 2; CONNECTION_STATE_CHANGED = 3; READ_STATE_CHANGED = 4; WRITE_STATE_CHANGED = 5; ERROR_STATE_CHANGED = 6; CONNECT_FAILED = 7; TCP_SOCKET_CONNECT = 8; // Logged with RV. TCP_SOCKET_SET_KEEP_ALIVE = 9; SSL_CERT_WHITELISTED = 10; SSL_SOCKET_CONNECT = 11; // Logged with RV. SSL_INFO_OBTAINED = 12; DER_ENCODED_CERT_OBTAIN = 13; // Logged with RV. RECEIVED_CHALLENGE_REPLY = 14; AUTH_CHALLENGE_REPLY = 15; CONNECT_TIMED_OUT = 16; SEND_MESSAGE_FAILED = 17; MESSAGE_ENQUEUED = 18; // Message SOCKET_WRITE = 19; // Logged with RV. MESSAGE_WRITTEN = 20; // Message SOCKET_READ = 21; // Logged with RV. MESSAGE_READ = 22; // Message SOCKET_CLOSED = 25; SSL_CERT_EXCESSIVE_LIFETIME = 26; CHANNEL_POLICY_ENFORCED = 27; TCP_SOCKET_CONNECT_COMPLETE = 28; // Logged with RV. SSL_SOCKET_CONNECT_COMPLETE = 29; // Logged with RV. SSL_SOCKET_CONNECT_FAILED = 30; // Logged with RV. SEND_AUTH_CHALLENGE_FAILED = 31; // Logged with RV. AUTH_CHALLENGE_REPLY_INVALID = 32; PING_WRITE_ERROR = 33; // Logged with RV. } enum ChannelAuth { // SSL over TCP. SSL = 1; // SSL over TCP with challenge and receiver signature verification. SSL_VERIFIED = 2; } enum ReadyState { READY_STATE_NONE = 1; READY_STATE_CONNECTING = 2; READY_STATE_OPEN = 3; READY_STATE_CLOSING = 4; READY_STATE_CLOSED = 5; } enum ConnectionState { CONN_STATE_UNKNOWN = 1; CONN_STATE_TCP_CONNECT = 2; CONN_STATE_TCP_CONNECT_COMPLETE = 3; CONN_STATE_SSL_CONNECT = 4; CONN_STATE_SSL_CONNECT_COMPLETE = 5; CONN_STATE_AUTH_CHALLENGE_SEND = 6; CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE = 7; CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE = 8; CONN_STATE_START_CONNECT = 9; // Terminal states follow. CONN_STATE_FINISHED = 100; CONN_STATE_ERROR = 101; CONN_STATE_TIMEOUT = 102; } enum ReadState { READ_STATE_UNKNOWN = 1; READ_STATE_READ = 2; READ_STATE_READ_COMPLETE = 3; READ_STATE_DO_CALLBACK = 4; READ_STATE_HANDLE_ERROR = 5; READ_STATE_ERROR = 100; // Terminal state. } enum WriteState { WRITE_STATE_UNKNOWN = 1; WRITE_STATE_WRITE = 2; WRITE_STATE_WRITE_COMPLETE = 3; WRITE_STATE_DO_CALLBACK = 4; WRITE_STATE_HANDLE_ERROR = 5; // Terminal states follow. WRITE_STATE_ERROR = 100; WRITE_STATE_IDLE = 101; } enum ErrorState { CHANNEL_ERROR_NONE = 1; CHANNEL_ERROR_CHANNEL_NOT_OPEN = 2; CHANNEL_ERROR_AUTHENTICATION_ERROR = 3; CHANNEL_ERROR_CONNECT_ERROR = 4; CHANNEL_ERROR_SOCKET_ERROR = 5; CHANNEL_ERROR_TRANSPORT_ERROR = 6; CHANNEL_ERROR_INVALID_MESSAGE = 7; CHANNEL_ERROR_INVALID_CHANNEL_ID = 8; CHANNEL_ERROR_CONNECT_TIMEOUT = 9; CHANNEL_ERROR_UNKNOWN = 10; } enum ChallengeReplyErrorType { CHALLENGE_REPLY_ERROR_NONE = 1; CHALLENGE_REPLY_ERROR_PEER_CERT_EMPTY = 2; CHALLENGE_REPLY_ERROR_WRONG_PAYLOAD_TYPE = 3; CHALLENGE_REPLY_ERROR_NO_PAYLOAD = 4; CHALLENGE_REPLY_ERROR_PAYLOAD_PARSING_FAILED = 5; CHALLENGE_REPLY_ERROR_MESSAGE_ERROR = 6; CHALLENGE_REPLY_ERROR_NO_RESPONSE = 7; CHALLENGE_REPLY_ERROR_FINGERPRINT_NOT_FOUND = 8; CHALLENGE_REPLY_ERROR_CERT_PARSING_FAILED = 9; CHALLENGE_REPLY_ERROR_CERT_NOT_SIGNED_BY_TRUSTED_CA = 10; CHALLENGE_REPLY_ERROR_CANNOT_EXTRACT_PUBLIC_KEY = 11; CHALLENGE_REPLY_ERROR_SIGNED_BLOBS_MISMATCH = 12; CHALLENGE_REPLY_ERROR_TLS_CERT_VALIDITY_PERIOD_TOO_LONG = 13; CHALLENGE_REPLY_ERROR_TLS_CERT_VALID_START_DATE_IN_FUTURE = 14; CHALLENGE_REPLY_ERROR_TLS_CERT_EXPIRED = 15; CHALLENGE_REPLY_ERROR_CRL_INVALID = 16; CHALLENGE_REPLY_ERROR_CERT_REVOKED = 17; } message SocketEvent { // Required optional EventType type = 1; optional int64 timestamp_micros = 2; optional string details = 3; optional int32 net_return_value = 4; optional string message_namespace = 5; optional ReadyState ready_state = 6; optional ConnectionState connection_state = 7; optional ReadState read_state = 8; optional WriteState write_state = 9; optional ErrorState error_state = 10; optional ChallengeReplyErrorType challenge_reply_error_type = 11; // No longer used. optional int32 nss_error_code = 12; } message AggregatedSocketEvent { optional int32 id = 1; optional int32 endpoint_id = 2; optional ChannelAuth channel_auth_type = 3; repeated SocketEvent socket_event = 4; optional int64 bytes_read = 5; optional int64 bytes_written = 6; } message Log { // Each AggregatedSocketEvent represents events recorded for a socket. repeated AggregatedSocketEvent aggregated_socket_event = 1; // Number of socket log entries evicted by the logger due to size constraints. optional int32 num_evicted_aggregated_socket_events = 2; // Number of event log entries evicted by the logger due to size constraints. optional int32 num_evicted_socket_events = 3; } 07070100000010000041ED00000000000000000000000265F9EF9000000000000000000000000000000000000000000000001D00000000pychromecast-14.0.1/examples07070100000011000081A400000000000000000000000165F9EF900000008E000000000000000000000000000000000000002900000000pychromecast-14.0.1/examples/__init__.py"""Example scripts. Scripts need to be invoked with the -m flag, e.g.: python3 -m examples.youtube_example --cast "Kitchen" --show-debug """ 07070100000012000081A400000000000000000000000165F9EF90000009D4000000000000000000000000000000000000003300000000pychromecast-14.0.1/examples/bbciplayer_example.py""" Example on how to use the BBC iPlayer Controller """ # pylint: disable=invalid-name import argparse import sys from time import sleep import json import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the name of your Chromecast CAST_NAME = "Lounge Video" # Note: Media ID is NOT the 8 digit alpha-numeric in the URL # it can be found by right clicking the playing video on the web interface # e.g. https://www.bbc.co.uk/iplayer/episode/b09w7fd9/bitz-bob-series-1-1-castle-makeover shows: # "2908kbps | dash (mf_cloudfront_dash_https) # b09w70r2 | 960x540" MEDIA_ID = "b09w70r2" IS_LIVE = False METADATA = { "metadatatype": 0, "title": "Bitz & Bob", "subtitle": "Castle Makeover", "images": [{"url": "https://ichef.bbci.co.uk/images/ic/1280x720/p07j4m3r.jpg"}], } parser = argparse.ArgumentParser( description="Example on how to use the BBC iPlayer Controller to play an media stream." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--media_id", help='MediaID (default: "%(default)s")', default=MEDIA_ID ) parser.add_argument( "--metadata", help='Metadata (default: "%(default)s")', default=json.dumps(METADATA) ) parser.add_argument( "--is_live", help="Show 'live' and no current/end timestamps on UI", action="store_true", default=IS_LIVE, ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.media_id}"') app_name = "bbciplayer" app_data = { "media_id": args.media_id, "is_live": args.is_live, "metadata": json.loads(args.metadata), } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 07070100000013000081A400000000000000000000000165F9EF9000000B13000000000000000000000000000000000000003200000000pychromecast-14.0.1/examples/bbcsounds_example.py""" Example on how to use the BBC Sounds Controller """ # pylint: disable=invalid-name import argparse import sys from time import sleep import json import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the name of your Chromecast CAST_NAME = "Lounge Video" # Note: # Media ID for live programs can be found in the URL # e.g. for https://www.bbc.co.uk/sounds/play/live:bbc_radio_one, the media ID is bbc_radio_one # Media ID for non-live programs is NOT the 8 digit alpha-numeric in the URL # it can be found by right clicking the playing video on the web interface # e.g. https://www.bbc.co.uk/sounds/play/m0015vch shows: # "96kbps | dash (mf_akamai_nonbidi_dash_https) # m0015vcg", the media ID is m0015vcg MEDIA_ID = "bbc_radio_one" DEFAULT_MEDIA_ID_IS_LIVE = True IS_LIVE = False METADATA = { "metadatatype": 0, "title": "Radio 1", "images": [ { "url": "https://sounds.files.bbci.co.uk/2.3.0/networks/bbc_radio_one/background_1280x720.png" } ], } parser = argparse.ArgumentParser( description="Example on how to use the BBC Sounds Controller to play an media stream." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--media_id", help='MediaID (default: "%(default)s")', default=MEDIA_ID ) parser.add_argument( "--metadata", help='Metadata (default: "%(default)s")', default=json.dumps(METADATA) ) parser.add_argument( "--is_live", help="Show 'live' and no current/end timestamps on UI", action="store_true", default=IS_LIVE, ) args = parser.parse_args() # Set live if playing the default if args.media_id == MEDIA_ID: args.is_live = DEFAULT_MEDIA_ID_IS_LIVE configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.media_id}"') app_name = "bbcsounds" app_data = { "media_id": args.media_id, "is_live": args.is_live, "metadata": json.loads(args.metadata), } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 07070100000014000081A400000000000000000000000165F9EF90000007C8000000000000000000000000000000000000003300000000pychromecast-14.0.1/examples/bubbleupnp_example.py""" Example on how to use the BubbleUPNP Controller to play an URL. """ # pylint: disable=invalid-name import argparse import sys from time import sleep import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Kitchen speaker" # Change to an audio or video url MEDIA_URL = ( "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" ) MEDIA_TYPE = "video/mp4" parser = argparse.ArgumentParser( description="Example on how to use the BubbleUPNP Controller to play an URL." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--url", help='Media url (default: "%(default)s")', default=MEDIA_URL ) parser.add_argument( "--media-type", help='Media type (default: "%(default)s")', default=MEDIA_TYPE ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = list(chromecasts)[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.url}"') app_name = "bubbleupnp" app_data = { "media_id": args.url, "media_type": args.media_type, "stream_type": "LIVE", } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 07070100000015000081A400000000000000000000000165F9EF90000005A6000000000000000000000000000000000000002700000000pychromecast-14.0.1/examples/common.py"""Common helpers and utilities shared by examples.""" import argparse import logging import zeroconf def add_log_arguments(parser: argparse.ArgumentParser) -> None: """Add arguments to control logging to the parser.""" parser.add_argument("--show-debug", help="Enable debug log", action="store_true") parser.add_argument( "--show-discovery-debug", help="Enable discovery debug log", action="store_true" ) parser.add_argument( "--show-zeroconf-debug", help="Enable zeroconf debug log", action="store_true" ) def configure_logging(args: argparse.Namespace) -> None: """Configure logging according to command line arguments.""" fmt = "%(asctime)s %(levelname)s (%(threadName)s) [%(name)s] %(message)s" datefmt = "%Y-%m-%d %H:%M:%S" default_log_level = logging.DEBUG if args.show_debug else logging.INFO logging.basicConfig(format=fmt, datefmt=datefmt, level=default_log_level) if args.show_debug: logging.getLogger("pychromecast.dial").setLevel(logging.INFO) logging.getLogger("pychromecast.discovery").setLevel(logging.INFO) if args.show_discovery_debug: logging.getLogger("pychromecast.dial").setLevel(logging.DEBUG) logging.getLogger("pychromecast.discovery").setLevel(logging.DEBUG) if args.show_zeroconf_debug: print("Zeroconf version: " + zeroconf.__version__) logging.getLogger("zeroconf").setLevel(logging.DEBUG) 07070100000016000081A400000000000000000000000165F9EF9000000B23000000000000000000000000000000000000003100000000pychromecast-14.0.1/examples/dashcast_example.py""" Example that shows how the DashCast controller can be used. """ # pylint: disable=invalid-name import argparse import sys import time import threading import pychromecast from pychromecast.controllers import dashcast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" parser = argparse.ArgumentParser( description="Example that shows how the DashCast controller can be used." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() d = dashcast.DashCastController() cast.register_handler(d) print() print(cast.cast_info) time.sleep(1) print() print(cast.status) print() print(cast.media_controller.status) print() if not cast.is_idle: print("Killing current running app") cast.quit_app() t = 5.0 while cast.status.app_id is not None and t > 0: # type: ignore[union-attr] time.sleep(0.1) t = t - 0.1 time.sleep(1) requests_handled = threading.Event() def _first_request_handled(msg_sent: bool, _response: dict | None) -> None: """Request to load first URL handled, load the second URL.""" if not msg_sent: print("Failed to load first URL") print("Loaded 1st URL, loading 2nd URL") d.load_url("https://home-assistant.io/", callback_function=_second_request_handled) def _second_request_handled(msg_sent: bool, _response: dict | None) -> None: """Request to load second URL handled.""" if not msg_sent: print("Failed to load second URL") print("Loaded 2nd URL") requests_handled.set() # Test that the callback chain works. This should send a message to # load the first url, but immediately after send a message load the # second url. warning_message = "If you see this on your TV then something is broken" print("Loading 1st URL") d.load_url( "https://home-assistant.io/? " + warning_message, callback_function=_second_request_handled, ) print("Waiting for callbacks") requests_handled.wait() # If debugging, sleep after running so we can see any error messages. if args.show_debug: time.sleep(10) # Shut down discovery browser.stop_discovery() 07070100000017000081A400000000000000000000000165F9EF9000000724000000000000000000000000000000000000003F00000000pychromecast-14.0.1/examples/default_media_receiver_example.py""" Example on how to use the Default Media Controller app to play an URL. """ # pylint: disable=invalid-name import argparse import sys from time import sleep import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Kitchen speaker" # Change to an audio or video url MEDIA_URL = ( "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" ) parser = argparse.ArgumentParser( description="Example on how to use the Home Asssitant Media Controller to play an URL." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--url", help='Media url (default: "%(default)s")', default=MEDIA_URL ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = list(chromecasts)[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.url}"') app_name = "default_media_receiver" app_data = { "media_id": args.url, } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 07070100000018000081A400000000000000000000000165F9EF9000000A3F000000000000000000000000000000000000003200000000pychromecast-14.0.1/examples/discovery_example.py""" Example that shows how to receive updates on discovered chromecasts. """ # pylint: disable=invalid-name import argparse import sys import time from uuid import UUID import zeroconf import pychromecast from pychromecast import CastInfo from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") parser = argparse.ArgumentParser( description="Example on how to receive updates on discovered chromecasts." ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) parser.add_argument( "--force-zeroconf", help="Zeroconf will be used even if --known-host is present", action="store_true", ) add_log_arguments(parser) parser.add_argument( "--verbose", help="Full display of discovered devices", action="store_true" ) args = parser.parse_args() configure_logging(args) def list_devices() -> None: """Print a list of known devices.""" print("Currently known cast devices:") for service in browser.services.values(): print( f" '{service.friendly_name}' ({service.model_name}) @ {service.host}:{service.port} uuid: {service.uuid}" ) if args.verbose: print(f" service: {service}") class MyCastListener(pychromecast.discovery.AbstractCastListener): """Listener for discovering chromecasts.""" def add_cast(self, uuid: UUID, service: str) -> None: """Called when a new cast has beeen discovered.""" print( f"Found cast device '{browser.services[uuid].friendly_name}' with UUID {uuid}" ) list_devices() def remove_cast(self, uuid: UUID, service: str, cast_info: CastInfo) -> None: """Called when a cast has beeen lost (MDNS info expired or host down).""" print(f"Lost cast device '{cast_info.friendly_name}' with UUID {uuid}") list_devices() def update_cast(self, uuid: UUID, service: str) -> None: """Called when a cast has beeen updated (MDNS info renewed or changed).""" print( f"Updated cast device '{browser.services[uuid].friendly_name}' with UUID {uuid}" ) list_devices() if args.known_host and not args.force_zeroconf: zconf = None else: zconf = zeroconf.Zeroconf() browser = pychromecast.discovery.CastBrowser(MyCastListener(), zconf, args.known_host) browser.start_discovery() try: while True: time.sleep(1) except KeyboardInterrupt: pass # Shut down discovery browser.stop_discovery() 07070100000019000081A400000000000000000000000165F9EF9000000478000000000000000000000000000000000000003300000000pychromecast-14.0.1/examples/discovery_example2.py""" Example that shows how to list all available chromecasts. """ # pylint: disable=invalid-name import argparse import sys import pychromecast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") parser = argparse.ArgumentParser( description="Example that shows how to list all available chromecasts." ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--verbose", help="Full display of discovered devices", action="store_true" ) args = parser.parse_args() configure_logging(args) devices, browser = pychromecast.discovery.discover_chromecasts( known_hosts=args.known_host ) # Shut down discovery browser.stop_discovery() print(f"Discovered {len(devices)} device(s):") for device in devices: print( f" '{device.friendly_name}' ({device.model_name}) @ {device.host}:{device.port} uuid: {device.uuid}" ) if args.verbose: print(f" service: {device}") 0707010000001A000081A400000000000000000000000165F9EF9000000665000000000000000000000000000000000000003300000000pychromecast-14.0.1/examples/discovery_example3.py""" Example that shows how to list chromecasts matching on name or uuid. """ # pylint: disable=invalid-name import argparse import sys from uuid import UUID import pychromecast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") parser = argparse.ArgumentParser( description="Example that shows how to list chromecasts matching on name or uuid." ) parser.add_argument("--cast", help='Name of wanted cast device")', default=None) parser.add_argument("--uuid", help="UUID of wanted cast device", default=None) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--verbose", help="Full display of discovered devices", action="store_true" ) args = parser.parse_args() configure_logging(args) if args.cast is None and args.uuid is None: print("Need to supply `cast` or `uuid`") sys.exit(1) friendly_names = [] if args.cast: friendly_names.append(args.cast) uuids = [] if args.uuid: uuids.append(UUID(args.uuid)) devices, browser = pychromecast.discovery.discover_listed_chromecasts( friendly_names=friendly_names, uuids=uuids, known_hosts=args.known_host ) # Shut down discovery browser.stop_discovery() print(f"Discovered {len(devices)} device(s):") for device in devices: print( f" '{device.friendly_name}' ({device.model_name}) @ {device.host}:{device.port} uuid: {device.uuid}" ) if args.verbose: print(f" service: {device}") 0707010000001B000081A400000000000000000000000165F9EF90000003EC000000000000000000000000000000000000003000000000pychromecast-14.0.1/examples/get_chromecasts.py""" Example that shows how to connect to all chromecasts. """ # pylint: disable=invalid-name import argparse import sys import pychromecast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") parser = argparse.ArgumentParser( description="Example on how to connect to all chromecasts." ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) args = parser.parse_args() configure_logging(args) casts, browser = pychromecast.get_chromecasts(known_hosts=args.known_host) # Shut down discovery as we don't care about updates browser.stop_discovery() if len(casts) == 0: print("No Devices Found") sys.exit(1) print("Found cast devices:") for cast in casts: print( f' "{cast.name}" on mDNS/host service {cast.cast_info.services} with UUID:{cast.uuid}' ) 0707010000001C000081A400000000000000000000000165F9EF900000071E000000000000000000000000000000000000003C00000000pychromecast-14.0.1/examples/homeassistant_media_example.py""" Example on how to use the Home Assistant Media app to play an URL. """ # pylint: disable=invalid-name import argparse import sys from time import sleep import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Kitchen speaker" # Change to an audio or video url MEDIA_URL = ( "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" ) parser = argparse.ArgumentParser( description="Example on how to use the Home Asssitant Media Controller to play an URL." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--url", help='Media url (default: "%(default)s")', default=MEDIA_URL ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = list(chromecasts)[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.url}"') app_name = "homeassistant_media" app_data = { "media_id": args.url, } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 0707010000001D000081A400000000000000000000000165F9EF900000075E000000000000000000000000000000000000002E00000000pychromecast-14.0.1/examples/media_enqueue.py""" Example on how to use queuing with Media Controller """ # pylint: disable=invalid-name import argparse import sys import time import pychromecast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" # Change to an audio or video url MEDIA_URLS = [ "https://www.bensound.com/bensound-music/bensound-jazzyfrenchy.mp3", "https://audio.guim.co.uk/2020/08/14-65292-200817TIFXR.mp3", ] parser = argparse.ArgumentParser( description="Example on how to use the Media Controller with a queue." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}"') cast.media_controller.play_media(MEDIA_URLS[0], "audio/mp3") # Wait for Chromecast to start playing while cast.media_controller.status.player_state != "PLAYING": time.sleep(0.1) # Queue next items for URL in MEDIA_URLS[1:]: print("Enqueuing...") cast.media_controller.play_media(URL, "audio/mp3", enqueue=True) for URL in MEDIA_URLS[1:]: time.sleep(5) print("Skipping...") cast.media_controller.queue_next() # Shut down discovery browser.stop_discovery() 0707010000001E000081A400000000000000000000000165F9EF900000089A000000000000000000000000000000000000002E00000000pychromecast-14.0.1/examples/media_example.py""" Example on how to use the Media Controller to play an URL. """ # pylint: disable=invalid-name import argparse import sys import time import pychromecast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" # Change to an audio or video url MEDIA_URL = "https://www.bensound.com/bensound-music/bensound-jazzyfrenchy.mp3" parser = argparse.ArgumentParser( description="Example on how to use the Media Controller to play an URL." ) add_log_arguments(parser) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) parser.add_argument( "--url", help='Media url (default: "%(default)s")', default=MEDIA_URL ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.url}"') cast.media_controller.play_media(args.url, "audio/mp3") # Wait for player_state PLAYING player_state = None t = 30.0 has_played = False while True: try: if player_state != cast.media_controller.status.player_state: player_state = cast.media_controller.status.player_state print("Player state:", player_state) if player_state == "PLAYING": has_played = True if cast.socket_client.is_connected and has_played and player_state != "PLAYING": has_played = False cast.media_controller.play_media(args.url, "audio/mp3") time.sleep(0.1) t = t - 0.1 except KeyboardInterrupt: break # Shut down discovery browser.stop_discovery() 0707010000001F000081A400000000000000000000000165F9EF9000000A12000000000000000000000000000000000000002F00000000pychromecast-14.0.1/examples/media_example2.py""" Example on how to use the Media Controller. """ # pylint: disable=invalid-name import argparse import sys import time import pychromecast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" # Change to an audio or video url MEDIA_URL = ( "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" ) parser = argparse.ArgumentParser( description="Example on how to use the Media Controller." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) parser.add_argument( "--show-status-only", help="Show status, then exit", action="store_true" ) add_log_arguments(parser) parser.add_argument( "--url", help='Media url (default: "%(default)s")', default=MEDIA_URL ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print() print(cast.cast_info) time.sleep(1) print() print(cast.status) print() print(cast.media_controller.status) print() if args.show_status_only: sys.exit() if not cast.is_idle: print("Killing current running app") cast.quit_app() t = 5.0 while cast.status.app_id is not None and t > 0: # type: ignore[union-attr] time.sleep(0.1) t = t - 0.1 print(f'Playing media "{args.url}"') cast.play_media(args.url, "video/mp4") t = 0 while True: try: t += 1 if t > 10 and t % 3 == 0: print("Media status", cast.media_controller.status) if t == 15: print("Sending pause command") cast.media_controller.pause() elif t == 20: print("Sending play command") cast.media_controller.play() elif t == 25: print("Sending stop command") cast.media_controller.stop() elif t == 32: cast.quit_app() break time.sleep(1) except KeyboardInterrupt: break # Shut down discovery browser.stop_discovery() 07070100000020000081A400000000000000000000000165F9EF9000000988000000000000000000000000000000000000003200000000pychromecast-14.0.1/examples/multizone_example.py""" Example on how to use the Multizone (Audio Group) Controller """ # pylint: disable=invalid-name import argparse import sys import time import pychromecast from pychromecast.controllers.multizone import ( MultizoneController, MultiZoneControllerListener, ) from pychromecast.socket_client import ConnectionStatus, ConnectionStatusListener from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the name of your Chromecast CAST_NAME = "Whole house" parser = argparse.ArgumentParser( description="Example on how to use the Multizone Controller to track groupp members." ) parser.add_argument( "--cast", help='Name of speaker group (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) args = parser.parse_args() configure_logging(args) class MyConnectionStatusListener(ConnectionStatusListener): """ConnectionStatusListener""" def __init__(self, _mz: MultizoneController): self._mz = _mz def new_connection_status(self, status: ConnectionStatus) -> None: if status.status == "CONNECTED": self._mz.update_members() class MyMultiZoneControllerListener(MultiZoneControllerListener): """MultiZoneControllerListener""" def multizone_member_added(self, group_uuid: str) -> None: print(f"New member: {group_uuid}") def multizone_member_removed(self, group_uuid: str) -> None: print(f"Removed member: {group_uuid}") def multizone_status_received(self) -> None: print(f"Members: {mz.members}") chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Add listeners mz = MultizoneController(cast.uuid) mz.register_listener(MyMultiZoneControllerListener()) cast.register_handler(mz) cast.register_connection_listener(MyConnectionStatusListener(mz)) # Start socket client's worker thread and wait for initial status update cast.wait() while True: try: time.sleep(1) except KeyboardInterrupt: break # Shut down discovery browser.stop_discovery() 07070100000021000081A400000000000000000000000165F9EF900000083F000000000000000000000000000000000000003100000000pychromecast-14.0.1/examples/nrkradio_example.py""" Example on how to use the NRK Radio Controller """ # pylint: disable=invalid-name import argparse import sys from time import sleep import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" # Note: Media ID can be found in the URL, e.g: # For the live channel https://radio.nrk.no/direkte/p1, the media ID is p1 # For the podcast https://radio.nrk.no/podkast/tazte_priv/l_8457deb0-4f2c-4ef3-97de-b04f2c6ef314, # the media ID is l_8457deb0-4f2c-4ef3-97de-b04f2c6ef314 # For the on-demand program https://radio.nrk.no/serie/radiodokumentaren/sesong/201011/MDUP01004510, # the media id is MDUP01004510 MEDIA_ID = "l_8457deb0-4f2c-4ef3-97de-b04f2c6ef314" parser = argparse.ArgumentParser( description="Example on how to use the NRK Radio Controller to play a media stream." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--media_id", help='MediaID (default: "%(default)s")', default=MEDIA_ID ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.media_id}"') app_name = "nrkradio" app_data = { "media_id": args.media_id, } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 07070100000022000081A400000000000000000000000165F9EF90000007D7000000000000000000000000000000000000002E00000000pychromecast-14.0.1/examples/nrktv_example.py""" Example on how to use the NRK TV Controller """ # pylint: disable=invalid-name import argparse import sys from time import sleep import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" # Note: Media ID for live programs can be found in the URL # e.g. for https://tv.nrk.no/direkte/nrk1, the media ID is nrk1 # Media ID for non-live programs can be found by clicking the share button # e.g. https://tv.nrk.no/serie/uti-vaar-hage/sesong/2/episode/2 shows: # "https://tv.nrk.no/se?v=OUHA43000207", the media ID is OUHA43000207 MEDIA_ID = "OUHA43000207" parser = argparse.ArgumentParser( description="Example on how to use the NRK TV Controller to play a media stream." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--media_id", help='MediaID (default: "%(default)s")', default=MEDIA_ID ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.media_id}"') app_name = "nrktv" app_data = { "media_id": args.media_id, } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 07070100000023000081A400000000000000000000000165F9EF90000015A6000000000000000000000000000000000000003300000000pychromecast-14.0.1/examples/plex_multi_example.py""" Examples of the Plex controller playing on a Chromecast. DEMO TYPES: * simple: Picks the first item it finds in your libray and plays it. * list: Creates a list of items from your library and plays them. * playqueue: Creates a playqueue and plays it. * playlist: Creates a playlist, plays it, then deletes it. All demos with the exception of 'simple' can use startItem. startItem lets you start playback anywhere in the list of items. turning this option on will pick an item in the middle of the list to start from. This demo uses features that require the latest Python-PlexAPI pip install plexapi """ # pylint: disable=invalid-name from __future__ import annotations import argparse import sys from typing import Any from plexapi.server import PlexServer # type: ignore[import-untyped] import pychromecast from pychromecast.controllers.plex import PlexController from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast. CAST_NAME = "Office TV" # Replace with your own Plex URL, including port. PLEX_URL = "http://192.168.1.3:32400" # Replace with your Plex token. See link below on how to find it: # https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token/ PLEX_TOKEN = "Y0urT0k3nH3rE" # Library of items to pick from for tests. Use "episode", "movie", or "track". PLEX_LIBRARY = "episode" # The demo type you'd like to run. # Options are "single", "list", "playqueue", or "playlist" DEMO_TYPE = "playqueue" # If demo type is anything other than "single", # make this True to see a demo of startItem. START_ITEM = True parser = argparse.ArgumentParser( description="How to play media items, lists, playQueues, " "and playlists to a Chromecast device." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s").', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--url", help='URL of your Plex Server (default: "%(default)s").', default=PLEX_URL ) parser.add_argument( "--library", help="The library you'd like to test: episode, movie, or track (default: '%(default)s').", default=PLEX_LIBRARY, ) parser.add_argument("--token", help="Your Plex token.", default=PLEX_TOKEN) parser.add_argument( "--demo", help="The demo you'd like to run: single, list, playqueue, or playlist (default: '%(default)s').", default=DEMO_TYPE, ) parser.add_argument( "--startitem", help="If demo type is anything other than 'single', set to True to see a demo of startItem (default: '%(default)s').", default=START_ITEM, ) args = parser.parse_args() configure_logging(args) startItem = None def media_info(_cast: pychromecast.Chromecast, _media: Any, items: Any) -> None: """Print media info.""" print(f"Cast Device: {_cast.name}") print(f"Media Type: {type(_media)}") print(f"Media Items: {items}") def start_item_info(_media: Any) -> None: """Print item info.""" if args.startitem: print(f"Starting From: {_media}") chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) cast = next((cc for cc in chromecasts if cc.name == args.cast), None) if not cast: print(f"No chromecast with name '{args.cast}' found.") foundCasts = ", ".join( [cc.name or "<unknown>" for cc in pychromecast.get_chromecasts()[0]] ) print(f"Chromecasts found: {foundCasts}") sys.exit(1) plex_server = PlexServer(args.url, args.token) # Create a list of 5 items from the selected library. libraryItems = plex_server.library.search( libtype=args.library, sort="addedAt:desc", limit=5 ) if args.demo == "single": # Use a single item as media. media = libraryItems[0] media_info(cast, media, libraryItems[0]) elif args.demo == "list": # Use the unaltered list as media. media = libraryItems # Set starting position to the 2nd item if startItem demo. startItem = libraryItems[1] if args.startitem else None # Print info media_info(cast, libraryItems, libraryItems) start_item_info(libraryItems[1]) elif args.demo == "playqueue": # Convert list into a playqueue for media. media = plex_server.createPlayQueue(libraryItems) # Set starting position to the 3rd item if startItem demo. startItem = libraryItems[2] if args.startitem else None # Print info media_info(cast, media, media.items) start_item_info(libraryItems[2]) elif args.demo == "playlist": # Convert list into a playlist for media. media = plex_server.createPlaylist("pychromecast test playlist", libraryItems) # Set starting position to the 4th item if startItem demo. startItem = libraryItems[3] if args.startitem else None # Print info media_info(cast, media, media.items()) start_item_info(libraryItems[2]) plex_c = PlexController() cast.register_handler(plex_c) cast.wait() # Plays the media item, list, playlist, or playqueue. # If startItem = None it is ignored and playback starts at first item, # otherwise playback starts at the position of the media item given. plex_c.block_until_playing(media, startItem=startItem) if getattr(media, "TYPE", None) == "playlist": media.delete() # Shut down discovery browser.stop_discovery() 07070100000024000081A400000000000000000000000165F9EF90000006E6000000000000000000000000000000000000003A00000000pychromecast-14.0.1/examples/set_playback_rate_example.py""" Example changing the playback rate. """ # pylint: disable=invalid-name import argparse import sys import time import pychromecast from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" # Change to an audio or video url MEDIA_URL = ( "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" ) parser = argparse.ArgumentParser( description="Example on how to use the Media Controller." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--url", help='Media url (default: "%(default)s")', default=MEDIA_URL ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Playing media "{args.url}"') cast.play_media(args.url, "video/mp4") print("Waiting for media session to be active") cast.media_controller.block_until_active() SPEEDS = [0.5, 0.75, 1, 1.25, 1.5, 1.75, 2] for speed in SPEEDS: time.sleep(10) print(f"Setting playback rate to {speed}") cast.media_controller.set_playback_rate(speed) # Shut down discovery browser.stop_discovery() 07070100000025000081A400000000000000000000000165F9EF9000000EB7000000000000000000000000000000000000003200000000pychromecast-14.0.1/examples/shaka_drm_example.py""" Example on how to use the Shaka Controller to play an URL. """ # pylint: disable=invalid-name import argparse import sys from time import sleep import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room" # Change to an audio or video url # Sample DRM request from https://reference.dashif.org/dash.js/latest/samples/drm/clearkey.html MEDIA_URL = "https://media.axprod.net/TestVectors/v7-MultiDRM-SingleKey/Manifest_1080p_ClearKey.mpd" parser = argparse.ArgumentParser( description="Example on how to use the Shaka Controller to play an URL with DRM." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--url", help='Media url (default: "%(default)s")', default=MEDIA_URL ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = list(chromecasts)[0] # Start socket client's worker thread and wait for initial status update cast.wait() print(f'Found chromecast with name "{args.cast}", attempting to play "{args.url}"') # Take customData from shaka player appData object sent in init message to chromecast app_name = "shaka" app_data = { "media_id": args.url, "media_type": "", "stream_type": "LIVE", "media_info": { "customData": { "asset": { "name": "Custom DRM Video", "shortName": "", "iconUri": "", "manifestUri": "https://media.axprod.net/TestVectors/v7-MultiDRM-SingleKey/Manifest_1080p_ClearKey.mpd", "source": "Custom", "focus": False, "disabled": False, "extraText": [], "extraThumbnail": [], "certificateUri": None, "description": None, "isFeatured": False, "drm": ["No DRM protection"], "features": ["VOD"], "licenseServers": {"__type__": "map"}, "licenseRequestHeaders": {"__type__": "map"}, "requestFilter": None, "responseFilter": None, "clearKeys": {"__type__": "map"}, "extraConfig": { "drm": { "clearKeys": { "nrQFDeRLSAKTLifXUIPiZg": "FmY0xnWCPCNaSpRG-tUuTQ" } } }, "adTagUri": None, "imaVideoId": None, "imaAssetKey": None, "imaContentSrcId": None, "imaManifestType": None, "mediaTailorUrl": None, "mediaTailorAdsParams": None, "mimeType": None, "mediaPlaylistFullMimeType": None, "storedProgress": 1, "storedContent": None, } } }, } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) browser.stop_discovery() 07070100000026000081A400000000000000000000000165F9EF9000000AFA000000000000000000000000000000000000003800000000pychromecast-14.0.1/examples/simple_listener_example.py""" Example showing how to create a simple Chromecast event listener for device and media status events """ # pylint: disable=invalid-name import argparse import sys import time import pychromecast from pychromecast.controllers.media import MediaStatus, MediaStatusListener from pychromecast.controllers.receiver import CastStatusListener from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the friendly name of your Chromecast CAST_NAME = "Living Room Speaker" class MyCastStatusListener(CastStatusListener): """Cast status listener""" def __init__(self, name: str | None, cast: pychromecast.Chromecast) -> None: self.name = name self.cast = cast def new_cast_status(self, status: pychromecast.CastStatus) -> None: print("[", time.ctime(), " - ", self.name, "] status chromecast change:") print(status) class MyMediaStatusListener(MediaStatusListener): """Status media listener""" def __init__(self, name: str | None, cast: pychromecast.Chromecast) -> None: self.name = name self.cast = cast def new_media_status(self, status: MediaStatus) -> None: print("[", time.ctime(), " - ", self.name, "] status media change:") print(status) def load_media_failed(self, queue_item_id: int, error_code: int) -> None: print( "[", time.ctime(), " - ", self.name, "] load media failed for queue item id: ", queue_item_id, " with code: ", error_code, ) parser = argparse.ArgumentParser( description="Example on how to create a simple Chromecast event listener." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) chromecast = chromecasts[0] # Start socket client's worker thread and wait for initial status update chromecast.wait() listenerCast = MyCastStatusListener(chromecast.name, chromecast) chromecast.register_status_listener(listenerCast) listenerMedia = MyMediaStatusListener(chromecast.name, chromecast) chromecast.media_controller.register_status_listener(listenerMedia) input("Listening for Chromecast events...\n\n") # Shut down discovery browser.stop_discovery() 07070100000027000081A400000000000000000000000165F9EF90000004F3000000000000000000000000000000000000002E00000000pychromecast-14.0.1/examples/supla_example.py""" Example on how to use the Supla Controller """ # pylint: disable=invalid-name import logging from time import sleep import sys import requests from bs4 import BeautifulSoup import pychromecast from pychromecast import quick_play # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the name of your Chromecast CAST_NAME = "Kitchen Speaker" # Change to the video id of the YouTube video # video id is the last part of the url http://youtube.com/watch?v=video_id PROGRAM = "aamulypsy" result = requests.get(f"https://www.supla.fi/ohjelmat/{PROGRAM}", timeout=10) soup = BeautifulSoup(result.content) MEDIA_ID = soup.select('a[title*="Koko Shitti"]')[0]["href"].split("/")[-1] # type: ignore[union-attr] print(MEDIA_ID) logging.basicConfig(level=logging.DEBUG) chromecasts, browser = pychromecast.get_listed_chromecasts(friendly_names=[CAST_NAME]) if not chromecasts: print(f'No chromecast with name "{CAST_NAME}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() app_name = "supla" app_data = { "media_id": MEDIA_ID, } quick_play.quick_play(cast, app_name, app_data) sleep(10) 07070100000028000081A400000000000000000000000165F9EF9000000BD0000000000000000000000000000000000000003200000000pychromecast-14.0.1/examples/yleareena_example.py""" Example on how to use the Yle Areena Controller """ # pylint: disable=invalid-name, import-outside-toplevel, too-many-locals import argparse import sys from time import sleep import pychromecast from pychromecast import quick_play from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the name of your Chromecast CAST_NAME = "My Chromecast" parser = argparse.ArgumentParser( description="Example on how to use the Yle Areena Controller." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument("--program", help="Areena Program ID", default="1-50649659") parser.add_argument("--audio_language", help="audio_language", default="") parser.add_argument("--text_language", help="text_language", default="off") args = parser.parse_args() configure_logging(args) def get_kaltura_id(program_id: str) -> str: """ Dive into the yledl internals and fetch the kaltura player id. This can be used with Chromecast """ # yledl is not available in CI, silence import warnings from yledl.extractors import extractor_factory # type: ignore[import-untyped] from yledl.ffprobe import NullProbe # type: ignore[import-untyped] from yledl.http import HttpClient # type: ignore[import-untyped] from yledl.io import IOContext # type: ignore[import-untyped] from yledl.localization import TranslationChooser # type: ignore[import-untyped] from yledl.titleformatter import TitleFormatter # type: ignore[import-untyped] title_formatter = TitleFormatter() language_chooser = TranslationChooser("fin") httpclient = HttpClient(IOContext()) url = f"https://areena.yle.fi/{program_id}" ffprobe = NullProbe() extractor = extractor_factory( url, language_chooser, httpclient, title_formatter, ffprobe ) pid = extractor.program_id_from_url(url) info = extractor.program_info_for_pid(pid, url, title_formatter, ffprobe) kaltura_id: str = info.media_id.split("-")[-1] return kaltura_id chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() app_name = "yleareena" app_data = { "media_id": get_kaltura_id(args.program), "audio_lang": args.audio_language, "text_lang": args.text_language, } quick_play.quick_play(cast, app_name, app_data) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) # Shut down discovery browser.stop_discovery() 07070100000029000081A400000000000000000000000165F9EF900000069D000000000000000000000000000000000000003000000000pychromecast-14.0.1/examples/youtube_example.py""" Example on how to use the YouTube Controller """ # pylint: disable=invalid-name import argparse import sys from time import sleep import pychromecast from pychromecast.controllers.youtube import YouTubeController from .common import add_log_arguments, configure_logging # Enable deprecation warnings etc. if not sys.warnoptions: import warnings warnings.simplefilter("default") # Change to the name of your Chromecast CAST_NAME = "Living Room TV" # Change to the video id of the YouTube video # video id is the last part of the url http://youtube.com/watch?v=video_id VIDEO_ID = "dQw4w9WgXcQ" parser = argparse.ArgumentParser( description="Example on how to use the Youtube Controller." ) parser.add_argument( "--cast", help='Name of cast device (default: "%(default)s")', default=CAST_NAME ) parser.add_argument( "--known-host", help="Add known host (IP), can be used multiple times", action="append", ) add_log_arguments(parser) parser.add_argument( "--videoid", help='Youtube video ID (default: "%(default)s")', default=VIDEO_ID ) args = parser.parse_args() configure_logging(args) chromecasts, browser = pychromecast.get_listed_chromecasts( friendly_names=[args.cast], known_hosts=args.known_host ) if not chromecasts: print(f'No chromecast with name "{args.cast}" discovered') sys.exit(1) cast = chromecasts[0] # Start socket client's worker thread and wait for initial status update cast.wait() yt = YouTubeController() cast.register_handler(yt) yt.play_video(VIDEO_ID) # If debugging, sleep after running so we can see any error messages. if args.show_debug: sleep(10) # Shut down discovery browser.stop_discovery() 0707010000002A000081A400000000000000000000000165F9EF900000020B000000000000000000000000000000000000001F00000000pychromecast-14.0.1/fabfile.pyimport os from fabric.decorators import task from fabric.operations import local @task def build(): """ Builds the distribution files """ if not os.path.exists("build"): os.mkdir("build") local("date >> build/log") local("python setup.py sdist >> build/log") local("python setup.py bdist_wheel >> build/log") @task def release(): """ Uploads files to PyPi to create a new release. Note: Requires that files have been built first """ local("twine upload dist/*") 0707010000002B000081A400000000000000000000000165F9EF900000024E000000000000000000000000000000000000001D00000000pychromecast-14.0.1/mypy.ini[mypy] python_version = 3.11 show_error_codes = true follow_imports = silent local_partial_types = true strict_equality = true no_implicit_optional = true warn_incomplete_stub = true warn_redundant_casts = true warn_unused_configs = true warn_unused_ignores = true enable_error_code = ignore-without-code, redundant-self, truthy-iterable extra_checks = false check_untyped_defs = true disallow_incomplete_defs = true disallow_subclassing_any = true disallow_untyped_calls = true disallow_untyped_decorators = true disallow_untyped_defs = true warn_return_any = true warn_unreachable = true 0707010000002C000041ED00000000000000000000000465F9EF9000000000000000000000000000000000000000000000002100000000pychromecast-14.0.1/pychromecast0707010000002D000081A400000000000000000000000165F9EF9000004E2B000000000000000000000000000000000000002D00000000pychromecast-14.0.1/pychromecast/__init__.py""" PyChromecast: remote control your Chromecast """ from __future__ import annotations from collections.abc import Callable import logging import fnmatch from threading import Event import threading from typing import TYPE_CHECKING, Literal, cast, overload from uuid import UUID import zeroconf from .config import * # noqa: F403 from .error import * # noqa: F403 from . import socket_client from .discovery import ( # noqa: F401 DISCOVER_TIMEOUT, CastBrowser, CastListener, # Deprecated SimpleCastListener, discover_chromecasts, start_discovery, stop_discovery, ) from .dial import get_cast_type from .const import CAST_TYPE_CHROMECAST, REQUEST_TIMEOUT from .controllers.media import STREAM_TYPE_BUFFERED, MediaController # noqa: F401 from .controllers.receiver import CastStatus, CastStatusListener from .error import NotConnected, RequestTimeout from .models import CastInfo, HostServiceInfo, MDNSServiceInfo from .response_handler import WaitResponse __all__ = ("get_chromecasts", "Chromecast") IDLE_APP_ID = "E8C28D3C" IGNORE_CEC: list[str] = [] _LOGGER = logging.getLogger(__name__) def get_chromecast_from_host( host: tuple[str, int, UUID, str | None, str | None], tries: int | None = None, retry_wait: float | None = None, timeout: float | None = None, ) -> Chromecast: """Creates a Chromecast object from a zeroconf host.""" # Build device status from the mDNS info, this information is # the primary source and the remaining will be fetched # later on. ip_address, port, uuid, model_name, friendly_name = host _LOGGER.debug("get_chromecast_from_host %s", host) port = port or 8009 services: set[HostServiceInfo | MDNSServiceInfo] = { HostServiceInfo(ip_address, port) } cast_info = CastInfo( services, uuid, model_name, friendly_name, ip_address, port, None, None ) return Chromecast( cast_info=cast_info, tries=tries, timeout=timeout, retry_wait=retry_wait, ) # Alias for backwards compatibility _get_chromecast_from_host = get_chromecast_from_host # pylint: disable=invalid-name def get_chromecast_from_cast_info( cast_info: CastInfo, zconf: zeroconf.Zeroconf | None, tries: int | None = None, retry_wait: float | None = None, timeout: float | None = None, ) -> Chromecast: """Creates a Chromecast object from a zeroconf service.""" _LOGGER.debug("get_chromecast_from_cast_info %s", cast_info) return Chromecast( cast_info=cast_info, tries=tries, timeout=timeout, retry_wait=retry_wait, zconf=zconf, ) # Alias for backwards compatibility _get_chromecast_from_service = ( # pylint: disable=invalid-name get_chromecast_from_cast_info ) def get_listed_chromecasts( friendly_names: list[str] | None = None, uuids: list[UUID] | None = None, tries: int | None = None, retry_wait: float | None = None, timeout: float | None = None, discovery_timeout: float = DISCOVER_TIMEOUT, zeroconf_instance: zeroconf.Zeroconf | None = None, known_hosts: list[str] | None = None, ) -> tuple[list[Chromecast], CastBrowser]: """ Searches the network for chromecast devices matching a list of friendly names or a list of UUIDs. Returns a tuple of: A list of Chromecast objects matching the criteria, or an empty list if no matching chromecasts were found. A service browser to keep the Chromecast mDNS data updated. When updates are (no longer) needed, call browser.stop_discovery(). To only discover chromecast devices without connecting to them, use discover_listed_chromecasts instead. :param friendly_names: A list of wanted friendly names :param uuids: A list of wanted uuids :param tries: passed to get_chromecasts :param retry_wait: passed to get_chromecasts :param timeout: passed to get_chromecasts :param discovery_timeout: A floating point number specifying the time to wait devices matching the criteria have been found. :param zeroconf_instance: An existing zeroconf instance. """ cc_list: dict[UUID, Chromecast] = {} def add_callback(uuid: UUID, _service: str) -> None: _LOGGER.debug( "Found chromecast %s (%s)", browser.devices[uuid].friendly_name, uuid ) def get_chromecast_from_uuid(uuid: UUID) -> Chromecast: return get_chromecast_from_cast_info( browser.devices[uuid], zconf=zconf, tries=tries, retry_wait=retry_wait, timeout=timeout, ) friendly_name = browser.devices[uuid].friendly_name try: if uuids and uuid in uuids: if uuid not in cc_list: cc_list[uuid] = get_chromecast_from_uuid(uuid) uuids.remove(uuid) if friendly_names and friendly_name in friendly_names: if uuid not in cc_list: cc_list[uuid] = get_chromecast_from_uuid(uuid) friendly_names.remove(friendly_name) if not friendly_names and not uuids: discover_complete.set() except ChromecastConnectionError: # noqa: F405 pass discover_complete = Event() zconf = zeroconf_instance or zeroconf.Zeroconf() browser = CastBrowser(SimpleCastListener(add_callback), zconf, known_hosts) browser.start_discovery() # Wait for the timeout or found all wanted devices discover_complete.wait(discovery_timeout) return (list(cc_list.values()), browser) @overload def get_chromecasts( tries: int | None = None, retry_wait: float | None = None, timeout: float | None = None, blocking: Literal[True] = True, callback: Callable[[Chromecast], None] | None = None, zeroconf_instance: zeroconf.Zeroconf | None = None, known_hosts: list[str] | None = None, ) -> tuple[list[Chromecast], CastBrowser]: ... @overload def get_chromecasts( tries: int | None = None, retry_wait: float | None = None, timeout: float | None = None, *, blocking: Literal[False], callback: Callable[[Chromecast], None] | None = None, zeroconf_instance: zeroconf.Zeroconf | None = None, known_hosts: list[str] | None = None, ) -> CastBrowser: ... def get_chromecasts( # pylint: disable=too-many-locals tries: int | None = None, retry_wait: float | None = None, timeout: float | None = None, blocking: bool = True, callback: Callable[[Chromecast], None] | None = None, zeroconf_instance: zeroconf.Zeroconf | None = None, known_hosts: list[str] | None = None, ) -> tuple[list[Chromecast], CastBrowser] | CastBrowser: """ Searches the network for chromecast devices and creates a Chromecast object for each discovered device. Returns a tuple of: A list of Chromecast objects, or an empty list if no matching chromecasts were found. A service browser to keep the Chromecast mDNS data updated. When updates are (no longer) needed, call browser.stop_discovery(). To only discover chromecast devices without connecting to them, use discover_chromecasts instead. Parameters tries, timeout, retry_wait and blocking_app_launch controls the behavior of the created Chromecast instances. :param tries: Number of retries to perform if the connection fails. None for infinite retries. :param timeout: A floating point number specifying the socket timeout in seconds. None means to use the default which is 30 seconds. :param retry_wait: A floating point number specifying how many seconds to wait between each retry. None means to use the default which is 5 seconds. :param blocking: If True, returns a list of discovered chromecast devices. If False, triggers a callback for each discovered chromecast, and returns a function which can be executed to stop discovery. :param callback: Callback which is triggered for each discovered chromecast when blocking = False. :param zeroconf_instance: An existing zeroconf instance. """ if blocking: # Thread blocking chromecast discovery devices, browser = discover_chromecasts(known_hosts=known_hosts) cc_list: list[Chromecast] = [] for device in devices: try: cc_list.append( get_chromecast_from_cast_info( device, browser.zc, tries=tries, retry_wait=retry_wait, timeout=timeout, ) ) except ChromecastConnectionError: # noqa: F405 pass return (cc_list, browser) # Callback based chromecast discovery if not callable(callback): raise ValueError("Nonblocking discovery requires a callback function.") known_uuids: set[UUID] = set() def add_callback(uuid: UUID, _service: str) -> None: """Called when zeroconf has discovered a new chromecast.""" if uuid in known_uuids: return if TYPE_CHECKING: assert callback is not None try: callback( get_chromecast_from_cast_info( browser.devices[uuid], zconf=zconf, tries=tries, retry_wait=retry_wait, timeout=timeout, ) ) known_uuids.add(uuid) except ChromecastConnectionError: # noqa: F405 pass zconf = zeroconf_instance or zeroconf.Zeroconf() browser = CastBrowser(SimpleCastListener(add_callback), zconf, known_hosts) browser.start_discovery() return browser # pylint: disable=too-many-instance-attributes, too-many-public-methods class Chromecast(CastStatusListener): """ Class to interface with a ChromeCast. :param cast_info: CastInfo with information for the device. :param tries: Number of retries to perform if the connection fails. None for infinite retries. :param timeout: A floating point number specifying the socket timeout in seconds. None means to use the default which is 30 seconds. :param retry_wait: A floating point number specifying how many seconds to wait between each retry. None means to use the default which is 5 seconds. :param zconf: A zeroconf instance, needed if a the services if cast info includes mDNS services. The zeroconf instance may be obtained from the browser returned by pychromecast.start_discovery(). """ def __init__( self, cast_info: CastInfo, *, tries: int | None = None, timeout: float | None = None, retry_wait: float | None = None, zconf: zeroconf.Zeroconf | None = None, ): self.logger = logging.getLogger(__name__) if not cast_info.cast_type: cast_info = get_cast_type(cast_info, zconf) if TYPE_CHECKING: # get_cast_type is guaranteed to return a CastInfo with a non-None cast_type assert cast_info.cast_type is not None self.cast_info = cast_info self.status: CastStatus | None = None self.status_event = threading.Event() self.socket_client = socket_client.SocketClient( cast_type=cast_info.cast_type, tries=tries, timeout=timeout, retry_wait=retry_wait, services=cast_info.services, zconf=zconf, ) receiver_controller = self.socket_client.receiver_controller receiver_controller.register_status_listener(self) # Forward these methods self.set_volume = receiver_controller.set_volume self.set_volume_muted = receiver_controller.set_volume_muted self.play_media = self.socket_client.media_controller.play_media self.register_handler = self.socket_client.register_handler self.unregister_handler = self.socket_client.unregister_handler self.register_status_listener = receiver_controller.register_status_listener self.register_launch_error_listener = ( receiver_controller.register_launch_error_listener ) self.register_connection_listener = ( self.socket_client.register_connection_listener ) @property def ignore_cec(self) -> bool: """Returns whether the CEC data should be ignored.""" return self.cast_info.friendly_name is not None and any( fnmatch.fnmatchcase(self.cast_info.friendly_name, pattern) for pattern in IGNORE_CEC ) @property def is_idle(self) -> bool: """Returns if there is currently an app running.""" return ( self.status is None or self.app_id in (None, IDLE_APP_ID) or ( self.cast_type == CAST_TYPE_CHROMECAST and not self.status.is_active_input and not self.ignore_cec ) ) @property def uuid(self) -> UUID: """Returns the unique UUID of the Chromecast device.""" return self.cast_info.uuid @property def name(self) -> str | None: """ Returns the friendly name set for the Chromecast device. This is the name that the end-user chooses for the cast device. """ return self.cast_info.friendly_name @property def uri(self) -> str: """Returns the device URI (ip:port)""" return f"{self.socket_client.host}:{self.socket_client.port}" @property def model_name(self) -> str: """Returns the model name of the Chromecast device.""" if TYPE_CHECKING: # get_cast_type is guaranteed to return a CastInfo with a non-None model assert self.cast_info.model_name is not None return self.cast_info.model_name @property def cast_type(self) -> str: """ Returns the type of the Chromecast device. This is one of CAST_TYPE_CHROMECAST for regular Chromecast device, CAST_TYPE_AUDIO for Chromecast devices that only support audio and CAST_TYPE_GROUP for virtual a Chromecast device that groups together two or more cast (Audio for now) devices. :rtype: str """ if TYPE_CHECKING: # get_cast_type is guaranteed to return a CastInfo with a non-None cast_type assert self.cast_info.cast_type is not None return self.cast_info.cast_type @property def app_id(self) -> str | None: """Returns the current app_id.""" return self.status.app_id if self.status else None @property def app_display_name(self) -> str | None: """Returns the name of the current running app.""" return self.status.display_name if self.status else None @property def media_controller(self) -> MediaController: """Returns the media controller.""" return self.socket_client.media_controller def new_cast_status(self, status: CastStatus) -> None: """Called when a new status received from the Chromecast.""" self.status = status if status: self.status_event.set() def start_app( self, app_id: str, force_launch: bool = False, timeout: float = REQUEST_TIMEOUT ) -> None: """Start an app on the Chromecast.""" self.logger.info("Starting app %s", app_id) response_handler = WaitResponse(timeout, f"start app {app_id}") self.socket_client.receiver_controller.launch_app( app_id, force_launch=force_launch, callback_function=response_handler.callback, ) response_handler.wait_response() def quit_app(self, timeout: float = REQUEST_TIMEOUT) -> None: """Tells the Chromecast to quit current app_id.""" self.logger.info("Quitting current app") response_handler = WaitResponse(timeout, "quit app") self.socket_client.receiver_controller.stop_app( callback_function=response_handler.callback ) response_handler.wait_response() def volume_up(self, delta: float = 0.1, timeout: float = REQUEST_TIMEOUT) -> float: """Increment volume by 0.1 (or delta) unless it is already maxed. Returns the new volume. """ if delta <= 0: raise ValueError(f"volume delta must be greater than zero, not {delta}") if not self.status: raise NotConnected return self.set_volume(self.status.volume_level + delta, timeout=timeout) def volume_down( self, delta: float = 0.1, timeout: float = REQUEST_TIMEOUT ) -> float: """Decrement the volume by 0.1 (or delta) unless it is already 0. Returns the new volume. """ if delta <= 0: raise ValueError(f"volume delta must be greater than zero, not {delta}") if not self.status: raise NotConnected return self.set_volume(self.status.volume_level - delta, timeout=timeout) def wait(self, timeout: float | None = None) -> None: """ Waits until the cast device is ready for communication. The device is ready as soon a status message has been received. If the worker thread is not already running, it will be started. If the status has already been received then the method returns immediately. :param timeout: a floating point number specifying a timeout for the operation in seconds (or fractions thereof). Or None to block forever. """ if not self.socket_client.is_alive(): self.socket_client.start() ready = self.status_event.wait(timeout=timeout) if not ready: raise RequestTimeout("wait", cast(float, timeout)) def disconnect(self, timeout: float | None = None) -> None: """ Disconnects the chromecast and waits for it to terminate. :param timeout: A floating point number specifying a timeout for the operation in seconds (or fractions thereof). Or None to block forever. Set to 0 to not block. """ self.socket_client.disconnect() self.join(timeout=timeout) def join(self, timeout: float | None = None) -> None: """ Blocks the thread of the caller until the chromecast connection is stopped. :param timeout: a floating point number specifying a timeout for the operation in seconds (or fractions thereof). Or None to block forever. """ self.socket_client.join(timeout=timeout) if self.socket_client.is_alive(): raise TimeoutError("join", timeout) def start(self) -> None: """ Start the chromecast connection's worker thread. """ self.socket_client.start() def __del__(self) -> None: try: self.socket_client.stop.set() except AttributeError: pass def __repr__(self) -> str: return ( f"Chromecast({self.socket_client.host!r}, port={self.socket_client.port!r}, " f"cast_info={self.cast_info!r})" ) def __unicode__(self) -> str: return ( f"Chromecast({self.socket_client.host}, {self.socket_client.port}, " f"{self.cast_info.friendly_name}, {self.cast_info.model_name}, " f"{self.cast_info.manufacturer})" ) 0707010000002E000081A400000000000000000000000165F9EF90000005B4000000000000000000000000000000000000002B00000000pychromecast-14.0.1/pychromecast/config.py""" Data and methods to retrieve app specific configuration """ import json from typing import cast import requests APP_BACKDROP = "E8C28D3C" APP_YOUTUBE = "233637DE" APP_MEDIA_RECEIVER = "CC1AD845" APP_PLEX = "06ee44ee-e7e3-4249-83b6-f5d0b6f07f34_1" APP_DASHCAST = "84912283" APP_HOMEASSISTANT_LOVELACE = "A078F6B0" APP_HOMEASSISTANT_MEDIA = "B45F4572" APP_SUPLA = "A41B766D" APP_YLEAREENA = "A9BCCB7C" APP_BUBBLEUPNP = "3927FA74" APP_BBCSOUNDS = "D350F6A9" APP_BBCIPLAYER = "5E81F6DB" APP_SHAKA = "07AEE832" APP_NRKTV = "3AEDF8D1" APP_NRKRADIO = "A49874B1" def get_possible_app_ids() -> list[str]: """Returns all possible app ids.""" try: req = requests.get( "https://clients3.google.com/cast/chromecast/device/baseconfig", timeout=10, ) data = json.loads(req.text[4:]) return cast( list[str], [app["app_id"] for app in data["applications"]] + data["enabled_app_ids"], ) except ValueError: # If json fails to parse return [] def get_app_config(app_id: str) -> dict: """Get specific configuration for 'app_id'.""" try: req = requests.get( f"https://clients3.google.com/cast/chromecast/device/app?a={app_id}", timeout=10, ) return cast(dict, json.loads(req.text[4:])) if req.status_code == 200 else {} except ValueError: # If json fails to parse return {} 0707010000002F000081A400000000000000000000000165F9EF90000009EF000000000000000000000000000000000000002A00000000pychromecast-14.0.1/pychromecast/const.py""" Chromecast constants """ # Regular chromecast, supports video/audio CAST_TYPE_CHROMECAST = "cast" # Cast Audio device, supports only audio CAST_TYPE_AUDIO = "audio" # Cast Audio group device, supports only audio CAST_TYPE_GROUP = "group" # Default command timeout REQUEST_TIMEOUT = 10.0 MF_CANTON = "Canton Elektronik GmbH + Co. KG" MF_GOOGLE = "Google Inc." MF_HARMAN = "HARMAN International Industries" MF_JBL = "JBL" MF_LENOVO = "LENOVO" MF_LG = "LG" MF_MARSHALL = "Marshall" MF_NVIDIA = "NVIDIA" MF_PHILIPS = "Philips" MF_PIONEER = "Pioneer" MF_SONY = "Sony" MF_SVS = "SVS" MF_VIZIO = "Vizio" MF_WNC = "wnc" MF_XIAOMI = "Xiaomi" CAST_TYPES = { "chromecast audio": (CAST_TYPE_AUDIO, MF_GOOGLE), "chromecast": (CAST_TYPE_CHROMECAST, MF_GOOGLE), "chromecast hd": (CAST_TYPE_CHROMECAST, MF_GOOGLE), "chromecast ultra": (CAST_TYPE_CHROMECAST, MF_GOOGLE), "eureka dongle": (CAST_TYPE_CHROMECAST, MF_GOOGLE), "google cast group": (CAST_TYPE_GROUP, MF_GOOGLE), "google home mini": (CAST_TYPE_AUDIO, MF_GOOGLE), "google home": (CAST_TYPE_AUDIO, MF_GOOGLE), "google nest hub max": (CAST_TYPE_CHROMECAST, MF_GOOGLE), "google nest hub": (CAST_TYPE_CHROMECAST, MF_GOOGLE), "google nest mini": (CAST_TYPE_AUDIO, MF_GOOGLE), "nest audio": (CAST_TYPE_AUDIO, MF_GOOGLE), "nest wifi point": (CAST_TYPE_AUDIO, MF_GOOGLE), "bravia 4k vh2": (CAST_TYPE_CHROMECAST, MF_SONY), "C4A": (CAST_TYPE_AUDIO, MF_SONY), "JBL Link 10": (CAST_TYPE_AUDIO, MF_JBL), "JBL Link 20": (CAST_TYPE_AUDIO, MF_JBL), "JBL Link 300": (CAST_TYPE_AUDIO, MF_JBL), "JBL Link 500": (CAST_TYPE_AUDIO, MF_JBL), "JBL Link Portable": (CAST_TYPE_AUDIO, MF_HARMAN), "lenovocd-24502f": (CAST_TYPE_AUDIO, MF_LENOVO), "Lenovo Smart Display 7": (CAST_TYPE_CHROMECAST, MF_LENOVO), "LG WK7 ThinQ Speaker": (CAST_TYPE_AUDIO, MF_LG), "marshall stanmore ii": (CAST_TYPE_AUDIO, MF_MARSHALL), "mitv-mssp2": (CAST_TYPE_CHROMECAST, MF_XIAOMI), "Pioneer VSX-831": (CAST_TYPE_AUDIO, MF_PIONEER), "Pioneer VSX-1131": (CAST_TYPE_AUDIO, MF_PIONEER), "Pioneer VSX-LX305": (CAST_TYPE_AUDIO, MF_PIONEER), "shield android tv": (CAST_TYPE_CHROMECAST, MF_NVIDIA), "Smart Soundbar 10": (CAST_TYPE_AUDIO, MF_CANTON), "Stream TV": (CAST_TYPE_CHROMECAST, MF_WNC), "SVS Pro SoundBase": (CAST_TYPE_AUDIO, MF_SVS), "TPM191E": (CAST_TYPE_CHROMECAST, MF_PHILIPS), "V705-H3": (CAST_TYPE_CHROMECAST, MF_VIZIO), } MESSAGE_TYPE = "type" REQUEST_ID = "requestId" SESSION_ID = "sessionId" 07070100000030000041ED00000000000000000000000265F9EF9000000000000000000000000000000000000000000000002D00000000pychromecast-14.0.1/pychromecast/controllers07070100000031000081A400000000000000000000000165F9EF9000001B31000000000000000000000000000000000000003900000000pychromecast-14.0.1/pychromecast/controllers/__init__.py""" Provides controllers to handle specific namespaces in Chromecast communication. """ from __future__ import annotations import abc from functools import partial import logging from typing import TYPE_CHECKING, Any, Protocol from ..error import UnsupportedNamespace, ControllerNotRegistered from ..generated.cast_channel_pb2 import ( # pylint: disable=no-name-in-module CastMessage, ) from ..response_handler import CallbackType, chain_on_success if TYPE_CHECKING: from ..socket_client import SocketClient class SendMessageFunc(Protocol): """Protocol for SocketClient's send message functions.""" def __call__( self, namespace: str, message: Any, *, inc_session_id: bool = False, callback_function: CallbackType | None = None, no_add_request_id: bool = False, ) -> None: ... class BaseController(abc.ABC): """ABC for namespace controllers.""" def __init__( self, namespace: str, supporting_app_id: str | None = None, target_platform: bool = False, app_must_match: bool = False, ) -> None: """ Initialize the controller. namespace: the namespace this controller will act on supporting_app_id: app to be launched if app is running with unsupported namespace. target_platform: set to True if you target the platform instead of current app. app_must_match: set to True if the app should be launched even if the namespace is supported by another app. """ self.app_must_match = app_must_match self.namespace = namespace self.supporting_app_id = supporting_app_id self.target_platform = target_platform self._socket_client: SocketClient | None = None self._message_func: SendMessageFunc | None = None self.logger = logging.getLogger(__name__) @property def is_active(self) -> bool: """True if the controller is connected to a socket client and the Chromecast is running an app that supports this controller.""" return ( self._socket_client is not None and self.namespace in self._socket_client.app_namespaces ) def launch( self, *, callback_function: CallbackType | None = None, force_launch: bool = False, ) -> None: """If set, launches app related to the controller.""" if self.supporting_app_id is None: self.logger.debug( "%s: Can't launch app with no supporting app_id", self.__class__.__name__, ) if callback_function: callback_function(False, None) return if self._socket_client is None: if callback_function: callback_function(False, None) raise ControllerNotRegistered self._socket_client.receiver_controller.launch_app( self.supporting_app_id, force_launch=force_launch, callback_function=callback_function, ) def registered(self, socket_client: SocketClient) -> None: """Called when a controller is registered.""" self._socket_client = socket_client if self.target_platform: self._message_func = self._socket_client.send_platform_message else: self._message_func = self._socket_client.send_app_message def unregistered(self) -> None: """Called when a controller is unregistered.""" self._message_func = None def channel_connected(self) -> None: """Called when a channel has been openend that supports the namespace of this controller.""" def channel_disconnected(self) -> None: """Called when a channel is disconnected.""" def send_message( self, data: Any, *, inc_session_id: bool = False, callback_function: CallbackType | None = None, no_add_request_id: bool = False, ) -> None: """ Send a message on this namespace to the Chromecast. Ensures app is loaded. Will raise a NotConnected exception if not connected. """ if self._socket_client is None: if callback_function: callback_function(False, None) raise ControllerNotRegistered receiver_ctrl = self._socket_client.receiver_controller if not self.target_platform and ( self.namespace not in self._socket_client.app_namespaces or (self.app_must_match and receiver_ctrl.app_id != self.supporting_app_id) ): if self.supporting_app_id is not None: self.launch( callback_function=chain_on_success( partial( self.send_message_nocheck, data, inc_session_id=inc_session_id, no_add_request_id=no_add_request_id, ), callback_function, ) ) return if callback_function: callback_function(False, None) raise UnsupportedNamespace( f"Namespace {self.namespace} is not supported by running application." ) self.send_message_nocheck( data, inc_session_id=inc_session_id, callback_function=callback_function, no_add_request_id=no_add_request_id, ) def send_message_nocheck( self, data: Any, *, inc_session_id: bool = False, callback_function: CallbackType | None = None, no_add_request_id: bool = False, ) -> None: """Send a message.""" if TYPE_CHECKING: assert self._message_func self._message_func( self.namespace, data, inc_session_id=inc_session_id, callback_function=callback_function, no_add_request_id=no_add_request_id, ) def receive_message(self, _message: CastMessage, _data: dict) -> bool: """ Called when a message is received that matches the namespace. Returns boolean indicating if message was handled. data is message.payload_utf8 interpreted as a JSON dict. """ return False def tear_down(self) -> None: """Called when we are shutting down.""" self._socket_client = None self._message_func = None class QuickPlayController(BaseController, abc.ABC): """ABC for controller which supports quick play.""" @abc.abstractmethod def quick_play(self, *, media_id: str, timeout: float, **kwargs: Any) -> None: """Quick Play support for a controller.""" 07070100000032000081A400000000000000000000000165F9EF900000063A000000000000000000000000000000000000003B00000000pychromecast-14.0.1/pychromecast/controllers/bbciplayer.py""" Controller to interface with BBC iPlayer. """ # Note: Media ID is NOT the 8 digit alpha-numeric in the URL # it can be found by right clicking the playing video on the web interface # e.g. https://www.bbc.co.uk/iplayer/episode/b09w7fd9/bitz-bob-series-1-1-castle-makeover shows: # "2908kbps | dash (mf_cloudfront_dash_https) # b09w70r2 | 960x540" import logging from typing import Any from .media import STREAM_TYPE_BUFFERED, STREAM_TYPE_LIVE, BaseMediaPlayer from ..config import APP_BBCIPLAYER APP_NAMESPACE = "urn:x-cast:com.google.cast.media" class BbcIplayerController(BaseMediaPlayer): """Controller to interact with BBC iPlayer namespace.""" def __init__(self) -> None: super().__init__(APP_BBCIPLAYER) self.logger = logging.getLogger(__name__) def quick_play( self, *, media_id: str, timeout: float, is_live: bool = False, metadata: dict[str, Any] | None = None, **kwargs: Any, ) -> None: """Quick Play helper for BBC iPlayer media.""" stream_type = STREAM_TYPE_LIVE if is_live else STREAM_TYPE_BUFFERED metadata_default = {"metadataType": 0, "title": ""} if metadata is None: metadata = metadata_default subtitle = metadata.pop("subtitle", "") super().quick_play( media_id=media_id, media_type=None, stream_type=stream_type, metadata=metadata, media_info={"customData": {"secondary_title": subtitle}}, timeout=timeout, **kwargs, ) 07070100000033000081A400000000000000000000000165F9EF9000000513000000000000000000000000000000000000003A00000000pychromecast-14.0.1/pychromecast/controllers/bbcsounds.py""" Controller to interface with BBC Sounds. """ # Media ID can be found in the URL # e.g. https://www.bbc.co.uk/sounds/live:bbc_radio_one import logging from typing import Any from .media import STREAM_TYPE_BUFFERED, STREAM_TYPE_LIVE, BaseMediaPlayer from ..config import APP_BBCSOUNDS APP_NAMESPACE = "urn:x-cast:com.google.cast.media" class BbcSoundsController(BaseMediaPlayer): """Controller to interact with BBC Sounds namespace.""" def __init__(self) -> None: super().__init__(APP_BBCSOUNDS) self.logger = logging.getLogger(__name__) # pylint: disable-next=arguments-differ def quick_play( self, *, media_id: str, timeout: float, is_live: bool = False, metadata: dict[str, Any] | None = None, **kwargs: Any, ) -> Any: """Quick Play helper for BBC Sounds media""" stream_type = STREAM_TYPE_LIVE if is_live else STREAM_TYPE_BUFFERED metadata_default = {"metadataType": 0, "title": ""} if metadata is None: metadata = metadata_default super().quick_play( media_id=media_id, media_type=None, stream_type=stream_type, metadata=metadata, timeout=timeout, **kwargs, ) 07070100000034000081A400000000000000000000000165F9EF9000000156000000000000000000000000000000000000003B00000000pychromecast-14.0.1/pychromecast/controllers/bubbleupnp.py""" Simple Controller to use BubbleUPNP as a media controller. """ from .media import BaseMediaPlayer from ..config import APP_BUBBLEUPNP class BubbleUPNPController(BaseMediaPlayer): """Controller to interact with BubbleUPNP app namespace.""" def __init__(self) -> None: super().__init__(supporting_app_id=APP_BUBBLEUPNP) 07070100000035000081A400000000000000000000000165F9EF900000091D000000000000000000000000000000000000003900000000pychromecast-14.0.1/pychromecast/controllers/dashcast.py""" Controller to interface with the DashCast app namespace. """ from __future__ import annotations from ..config import APP_DASHCAST from ..generated.cast_channel_pb2 import ( # pylint: disable=no-name-in-module CastMessage, ) from ..response_handler import chain_on_success from . import CallbackType, BaseController APP_NAMESPACE = "urn:x-cast:com.madmod.dashcast" class DashCastController(BaseController): """Controller to interact with DashCast app namespace.""" def __init__( self, appNamespace: str = APP_NAMESPACE, appId: str = APP_DASHCAST ) -> None: super().__init__(appNamespace, appId) def receive_message(self, _message: CastMessage, _data: dict) -> bool: """ Called when a load complete message is received. This is currently un-used by this controller. It is implemented so that we don't get "Message unhandled" warnings. In the future it might be used to update a public status object like the media controller does. """ # Indicate that the message was successfully handled. return True def load_url( self, url: str, *, force: bool = False, reload_seconds: float = 0, callback_function: CallbackType | None = None, ) -> None: """ Starts loading a URL with an optional reload time in seconds. Setting force to True may load pages which block iframe embedding, but will prevent reload from working and will cause calls to load_url() to reload the app. """ def launch_callback(*, callback_function: CallbackType | None) -> None: """Loads requested URL after app launched.""" should_reload = not force and reload_seconds not in (0, None) reload_milliseconds = 0 if not should_reload else reload_seconds * 1000 msg = { "url": url, "force": force, "reload": should_reload, "reload_time": reload_milliseconds, } self.send_message( msg, inc_session_id=True, callback_function=callback_function ) self.launch( callback_function=chain_on_success(launch_callback, callback_function) ) 07070100000036000081A400000000000000000000000165F9EF9000001F04000000000000000000000000000000000000003E00000000pychromecast-14.0.1/pychromecast/controllers/homeassistant.py""" Controller to interface with Home Assistant """ from collections.abc import Callable from functools import partial import logging import threading from typing import Any from ..config import APP_HOMEASSISTANT_LOVELACE from ..error import PyChromecastError from ..generated.cast_channel_pb2 import ( # pylint: disable=no-name-in-module CastMessage, ) from ..response_handler import chain_on_success from . import CallbackType, BaseController APP_NAMESPACE = "urn:x-cast:com.nabucasa.hast" DEFAULT_HASS_CONNECT_TIMEOUT = 30 # Error codes sent in receiver_error messages ERR_CONNECTION_FAILED = 1 ERR_AUTHENTICATION_FAILED = 2 ERR_CONNECTION_LOST = 3 ERR_HASS_URL_MISSING = 4 ERR_NO_HTTPS = 5 ERR_WRONG_INSTANCE = 20 ERR_NOT_CONNECTED = 21 ERR_FETCH_CONFIG_FAILED = 22 _LOGGER = logging.getLogger(__name__) class HomeAssistantController(BaseController): """Controller to interact with Home Assistant.""" def __init__( self, *, hass_url: str, hass_uuid: str, client_id: str | None, refresh_token: str, unregister: Callable[[], None], app_namespace: str = APP_NAMESPACE, app_id: str = APP_HOMEASSISTANT_LOVELACE, hass_connect_timeout: float = DEFAULT_HASS_CONNECT_TIMEOUT, ) -> None: _LOGGER.debug("HomeAssistantController.__init__") super().__init__(app_namespace, app_id) self.hass_url = hass_url self.hass_uuid = hass_uuid self.client_id = client_id self.refresh_token = refresh_token self.unregister = unregister self.hass_connect_timeout = hass_connect_timeout # { # connected: boolean; # showDemo: boolean; # hassUrl?: string; # lovelacePath?: string | number | null; # urlPath?: string | null; # } self.status: dict | None = None self._hass_connecting_event = threading.Event() self._hass_connecting_event.set() self._on_connect: list[CallbackType] = [] @property def hass_connected(self) -> bool: """Return if connected to Home Assistant.""" return ( self.status is not None and self.status["connected"] and self.status["hassUrl"] == self.hass_url and self.status["hassUUID"] == self.hass_uuid ) def channel_connected(self) -> None: """Called when a channel has been openend that supports the namespace of this controller.""" _LOGGER.debug("HomeAssistantController.channel_connected") self.get_status() def channel_disconnected(self) -> None: """Called when a channel is disconnected.""" _LOGGER.debug("HomeAssistantController.channel_disconnected") self.status = None self._hass_connecting_event.set() def receive_message(self, _message: CastMessage, data: dict) -> bool: """Called when a message is received.""" if data.get("type") == "receiver_status": _LOGGER.debug("HomeAssistantController.receive_message %s", data) if data["hassUrl"] != self.hass_url or data["hassUUID"] != self.hass_uuid: self.logger.info("Received status for another instance") self.unregister() return True was_connected = self.hass_connected self.status = data if was_connected or not self.hass_connected: _LOGGER.debug( "HomeAssistantController.receive_message already connected" ) return True # We just got connected, call the callbacks. self._hass_connecting_event.set() self._call_on_connect_callbacks(True) return True if data.get("type") == "receiver_error": _LOGGER.debug("HomeAssistantController.receive_message %s", data) if data.get("error_code") == ERR_WRONG_INSTANCE: self.logger.info("Received ERR_WRONG_INSTANCE") self.unregister() return True return False def _call_on_connect_callbacks(self, msg_sent: bool) -> None: """Call on connect callbacks.""" _LOGGER.debug("HomeAssistantController._call_on_connect_callbacks %s", msg_sent) while self._on_connect: self._on_connect.pop()(msg_sent, None) def _connect_hass(self, callback_function: CallbackType) -> None: """Connect to Home Assistant and call the provided callback.""" _LOGGER.debug("HomeAssistantController._connect_hass") self._on_connect.append(callback_function) if not self._hass_connecting_event.is_set(): _LOGGER.debug( "HomeAssistantController._connect_hass _hass_connecting_event not set" ) return self._hass_connecting_event.clear() try: self.send_message( { "type": "connect", "refreshToken": self.refresh_token, "clientId": self.client_id, "hassUrl": self.hass_url, "hassUUID": self.hass_uuid, } ) except Exception: # pylint: disable=broad-except _LOGGER.debug( "HomeAssistantController._connect_hass failed to send connect message" ) self._hass_connecting_event.set() self._call_on_connect_callbacks(False) raise self._hass_connecting_event.wait(self.hass_connect_timeout) try: if not self._hass_connecting_event.is_set(): self.logger.warning("_connect_hass failed for %s", self.hass_url) self._call_on_connect_callbacks(False) raise PyChromecastError() # pylint: disable=broad-exception-raised finally: self._hass_connecting_event.set() def show_demo(self) -> None: """Show the demo.""" self.send_message({"type": "show_demo"}) def get_status(self, *, callback_function: CallbackType | None = None) -> None: """Get status of Home Assistant Cast.""" _LOGGER.debug("HomeAssistantController.get_status") self._send_connected_message( { "type": "get_status", "hassUrl": self.hass_url, "hassUUID": self.hass_uuid, }, callback_function=callback_function, ) def show_lovelace_view( self, view_path: str | int | None, url_path: str | None = None, *, callback_function: CallbackType | None = None, ) -> None: """Show a Lovelace UI.""" _LOGGER.debug("HomeAssistantController.show_lovelace_view") self._send_connected_message( { "type": "show_lovelace_view", "hassUrl": self.hass_url, "hassUUID": self.hass_uuid, "viewPath": view_path, "urlPath": url_path, }, callback_function=callback_function, ) def _send_connected_message( self, data: dict[str, Any], callback_function: CallbackType | None ) -> None: """Send a message to a connected Home Assistant Cast""" _LOGGER.debug("HomeAssistantController._send_connected_message %s", data) if self.hass_connected: _LOGGER.debug( "HomeAssistantController._send_connected_message already connected" ) self.send_message_nocheck(data, callback_function=callback_function) return _LOGGER.debug( "HomeAssistantController._send_connected_message not yet connected" ) self._connect_hass( chain_on_success( partial(self.send_message_nocheck, data), callback_function ) ) 07070100000037000081A400000000000000000000000165F9EF9000000196000000000000000000000000000000000000004400000000pychromecast-14.0.1/pychromecast/controllers/homeassistant_media.py""" Simple Controller to use the Home Assistant Media Player Cast App as a media controller. """ from ..config import APP_HOMEASSISTANT_MEDIA from .media import BaseMediaPlayer class HomeAssistantMediaController(BaseMediaPlayer): """Controller to interact with HomeAssistantMedia app namespace.""" def __init__(self) -> None: super().__init__(supporting_app_id=APP_HOMEASSISTANT_MEDIA) 07070100000038000081A400000000000000000000000165F9EF9000006D9A000000000000000000000000000000000000003600000000pychromecast-14.0.1/pychromecast/controllers/media.py""" Provides a controller for controlling the default media players on the Chromecast. """ import abc from datetime import datetime from dataclasses import dataclass import logging import threading from typing import Any from ..config import APP_MEDIA_RECEIVER from ..const import MESSAGE_TYPE from ..error import ControllerNotRegistered from ..generated.cast_channel_pb2 import ( # pylint: disable=no-name-in-module CastMessage, ) from ..response_handler import WaitResponse from . import CallbackType, QuickPlayController STREAM_TYPE_UNKNOWN = "UNKNOWN" STREAM_TYPE_BUFFERED = "BUFFERED" STREAM_TYPE_LIVE = "LIVE" MEDIA_PLAYER_STATE_PLAYING = "PLAYING" MEDIA_PLAYER_STATE_BUFFERING = "BUFFERING" MEDIA_PLAYER_STATE_PAUSED = "PAUSED" MEDIA_PLAYER_STATE_IDLE = "IDLE" MEDIA_PLAYER_STATE_UNKNOWN = "UNKNOWN" TYPE_EDIT_TRACKS_INFO = "EDIT_TRACKS_INFO" TYPE_GET_STATUS = "GET_STATUS" TYPE_LOAD = "LOAD" TYPE_LOAD_FAILED = "LOAD_FAILED" TYPE_QUEUE_INSERT = "QUEUE_INSERT" TYPE_MEDIA_STATUS = "MEDIA_STATUS" TYPE_PAUSE = "PAUSE" TYPE_PLAY = "PLAY" TYPE_QUEUE_NEXT = "QUEUE_NEXT" TYPE_QUEUE_PREV = "QUEUE_PREV" TYPE_QUEUE_UPDATE = "QUEUE_UPDATE" TYPE_SEEK = "SEEK" TYPE_SET_PLAYBACK_RATE = "SET_PLAYBACK_RATE" TYPE_STOP = "STOP" METADATA_TYPE_GENERIC = 0 METADATA_TYPE_MOVIE = 1 METADATA_TYPE_TVSHOW = 2 METADATA_TYPE_MUSICTRACK = 3 METADATA_TYPE_PHOTO = 4 # From www.gstatic.com/cast/sdk/libs/caf_receiver/v3/cast_receiver_framework.js CMD_SUPPORT_PAUSE = 1 CMD_SUPPORT_SEEK = 2 CMD_SUPPORT_STREAM_VOLUME = 4 CMD_SUPPORT_STREAM_MUTE = 8 # ALL_BASIC_MEDIA = PAUSE | SEEK | VOLUME | MUTE | EDIT_TRACKS | PLAYBACK_RATE CMD_SUPPORT_ALL_BASIC_MEDIA = 12303 CMD_SUPPORT_QUEUE_NEXT = 64 CMD_SUPPORT_QUEUE_PREV = 128 CMD_SUPPORT_QUEUE_SHUFFLE = 256 CMD_SUPPORT_QUEUE_REPEAT_ALL = 1024 CMD_SUPPORT_QUEUE_REPEAT_ONE = 2048 CMD_SUPPORT_QUEUE_REPEAT = 3072 CMD_SUPPORT_SKIP_AD = 512 CMD_SUPPORT_EDIT_TRACKS = 4096 CMD_SUPPORT_PLAYBACK_RATE = 8192 CMD_SUPPORT_LIKE = 16384 CMD_SUPPORT_DISLIKE = 32768 CMD_SUPPORT_FOLLOW = 65536 CMD_SUPPORT_UNFOLLOW = 131072 CMD_SUPPORT_STREAM_TRANSFER = 262144 # Legacy? CMD_SUPPORT_SKIP_FORWARD = 16 CMD_SUPPORT_SKIP_BACKWARD = 32 # From https://developers.google.com/cast/docs/web_receiver/error_codes MEDIA_PLAYER_ERROR_CODES: dict[int | None, str] = { 100: "MEDIA_UNKNOWN", 101: "MEDIA_ABORTED", 102: "MEDIA_DECODE", 103: "MEDIA_NETWORK", 104: "MEDIA_SRC_NOT_SUPPORTED", 110: "SOURCE_BUFFER_FAILURE", 201: "MEDIAKEYS_NETWORK", 202: "MEDIAKEYS_UNSUPPORTED", 203: "MEDIAKEYS_WEBCRYPTO", 301: "SEGMENT_NETWORK", 311: "HLS_NETWORK_MASTER_PLAYLIST", 312: "HLS_NETWORK_PLAYLIST", 313: "HLS_NETWORK_NO_KEY_RESPONSE", 314: "HLS_NETWORK_KEY_LOAD", 315: "HLS_NETWORK_INVALID_SEGMENT", 316: "HLS_SEGMENT_PARSING", 321: "DASH_NETWORK", 322: "DASH_NO_INIT", 331: "SMOOTH_NETWORK", 332: "SMOOTH_NO_MEDIA_DATA", 411: "HLS_MANIFEST_MASTER", 412: "HLS_MANIFEST_PLAYLIST", 421: "DASH_MANIFEST_NO_PERIODS", 422: "DASH_MANIFEST_NO_MIMETYPE", 423: "DASH_INVALID_SEGMENT_INFO", 431: "SMOOTH_MANIFEST", } @dataclass(frozen=True) class MediaImage: """Media image metadata container.""" url: str | None height: int | None width: int | None _LOGGER = logging.getLogger(__name__) class MediaStatus: """Class to hold the media status.""" def __init__(self) -> None: self.current_time = 0.0 self.content_id: str | None = None self.content_type: str | None = None self.duration: float | None = None self.stream_type = STREAM_TYPE_UNKNOWN self.idle_reason: str | None = None self.media_session_id: int | None = None self.playback_rate = 1.0 self.player_state = MEDIA_PLAYER_STATE_UNKNOWN self.supported_media_commands = 0 self.volume_level = 1.0 self.volume_muted = False self.media_custom_data: dict = {} self.media_metadata: dict = {} self.subtitle_tracks: dict = {} self.current_subtitle_tracks: list = [] self.last_updated: datetime | None = None @property def adjusted_current_time(self) -> float | None: """Returns calculated current seek time of media in seconds""" if ( self.current_time is not None and self.last_updated is not None and self.player_state == MEDIA_PLAYER_STATE_PLAYING ): # Add time since last update return ( self.current_time + self.playback_rate * (datetime.utcnow() - self.last_updated).total_seconds() ) # Not playing, return last reported seek time return self.current_time @property def metadata_type(self) -> int | None: """Type of meta data.""" return self.media_metadata.get("metadataType") @property def player_is_playing(self) -> bool: """Return True if player is PLAYING.""" return self.player_state in ( MEDIA_PLAYER_STATE_PLAYING, MEDIA_PLAYER_STATE_BUFFERING, ) @property def player_is_paused(self) -> bool: """Return True if player is PAUSED.""" return self.player_state == MEDIA_PLAYER_STATE_PAUSED @property def player_is_idle(self) -> bool: """Return True if player is IDLE.""" return self.player_state == MEDIA_PLAYER_STATE_IDLE @property def media_is_generic(self) -> bool: """Return True if media status represents generic media.""" return self.metadata_type == METADATA_TYPE_GENERIC @property def media_is_tvshow(self) -> bool: """Return True if media status represents a tv show.""" return self.metadata_type == METADATA_TYPE_TVSHOW @property def media_is_movie(self) -> bool: """Return True if media status represents a movie.""" return self.metadata_type == METADATA_TYPE_MOVIE @property def media_is_musictrack(self) -> bool: """Return True if media status represents a musictrack.""" return self.metadata_type == METADATA_TYPE_MUSICTRACK @property def media_is_photo(self) -> bool: """Return True if media status represents a photo.""" return self.metadata_type == METADATA_TYPE_PHOTO @property def stream_type_is_buffered(self) -> bool: """Return True if stream type is BUFFERED.""" return self.stream_type == STREAM_TYPE_BUFFERED @property def stream_type_is_live(self) -> bool: """Return True if stream type is LIVE.""" return self.stream_type == STREAM_TYPE_LIVE @property def title(self) -> str | None: """Return title of media.""" return self.media_metadata.get("title") @property def series_title(self) -> str | None: """Return series title if available.""" return self.media_metadata.get("seriesTitle") @property def season(self) -> int | None: """Return season if available.""" return self.media_metadata.get("season") @property def episode(self) -> int | None: """Return episode if available.""" return self.media_metadata.get("episode") @property def artist(self) -> str | None: """Return artist if available.""" return self.media_metadata.get("artist") @property def album_name(self) -> str | None: """Return album name if available.""" return self.media_metadata.get("albumName") @property def album_artist(self) -> str | None: """Return album artist if available.""" return self.media_metadata.get("albumArtist") @property def track(self) -> int | None: """Return track number if available.""" return self.media_metadata.get("track") @property def images(self) -> list[MediaImage]: """Return a list of MediaImage objects for this media.""" return [ MediaImage(item.get("url"), item.get("height"), item.get("width")) for item in self.media_metadata.get("images", []) ] @property def supports_pause(self) -> bool: """True if PAUSE is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_PAUSE) @property def supports_seek(self) -> bool: """True if SEEK is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_SEEK) @property def supports_stream_volume(self) -> bool: """True if STREAM_VOLUME is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_STREAM_VOLUME) @property def supports_stream_mute(self) -> bool: """True if STREAM_MUTE is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_STREAM_MUTE) @property def supports_skip_forward(self) -> bool: """True if SKIP_FORWARD is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_SKIP_FORWARD) @property def supports_skip_backward(self) -> bool: """True if SKIP_BACKWARD is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_SKIP_BACKWARD) @property def supports_queue_next(self) -> bool: """True if QUEUE_NEXT is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_QUEUE_NEXT) @property def supports_queue_prev(self) -> bool: """True if QUEUE_PREV is supported.""" return bool(self.supported_media_commands & CMD_SUPPORT_QUEUE_PREV) def update(self, data: dict) -> None: """New data will only contain the changed attributes.""" if not data.get("status", []): return status_data = data["status"][0] media_data = status_data.get("media") or {} if not media_data and "extendedStatus" in status_data: media_data = status_data["extendedStatus"].get("media") or {} volume_data = status_data.get("volume", {}) self.current_time = status_data.get("currentTime", self.current_time) self.content_id = media_data.get("contentId", self.content_id) self.content_type = media_data.get("contentType", self.content_type) self.duration = media_data.get("duration", self.duration) self.stream_type = media_data.get("streamType", self.stream_type) # Clear idle reason if not set in the message self.idle_reason = status_data.get("idleReason", None) self.media_session_id = status_data.get("mediaSessionId", self.media_session_id) self.playback_rate = status_data.get("playbackRate", self.playback_rate) self.player_state = status_data.get("playerState", self.player_state) self.supported_media_commands = status_data.get( "supportedMediaCommands", self.supported_media_commands ) self.volume_level = volume_data.get("level", self.volume_level) self.volume_muted = volume_data.get("muted", self.volume_muted) self.media_custom_data = media_data.get("customData", self.media_custom_data) self.media_metadata = media_data.get("metadata", self.media_metadata) self.subtitle_tracks = media_data.get("tracks", self.subtitle_tracks) self.current_subtitle_tracks = status_data.get( "activeTrackIds", self.current_subtitle_tracks ) self.last_updated = datetime.utcnow() def __repr__(self) -> str: info = { "metadata_type": self.metadata_type, "title": self.title, "series_title": self.series_title, "season": self.season, "episode": self.episode, "artist": self.artist, "album_name": self.album_name, "album_artist": self.album_artist, "track": self.track, "subtitle_tracks": self.subtitle_tracks, "images": self.images, "supports_pause": self.supports_pause, "supports_seek": self.supports_seek, "supports_stream_volume": self.supports_stream_volume, "supports_stream_mute": self.supports_stream_mute, "supports_skip_forward": self.supports_skip_forward, "supports_skip_backward": self.supports_skip_backward, } info.update(self.__dict__) return f"<MediaStatus {info}>" class MediaStatusListener(abc.ABC): """Listener for receiving media status events.""" @abc.abstractmethod def new_media_status(self, status: MediaStatus) -> None: """Updated media status.""" @abc.abstractmethod def load_media_failed(self, queue_item_id: int, error_code: int) -> None: """Called when load media failed. queue_item_id is the id of the queue item which failed to load """ class BaseMediaPlayer(QuickPlayController): """Mixin class for apps which can play media using the default media namespace.""" def __init__(self, supporting_app_id: str, app_must_match: bool = True) -> None: super().__init__( "urn:x-cast:com.google.cast.media", supporting_app_id=supporting_app_id, app_must_match=app_must_match, ) def play_media( # pylint: disable=too-many-locals self, url: str, content_type: str, *, title: str | None = None, thumb: str | None = None, current_time: float | None = None, autoplay: bool = True, stream_type: str = STREAM_TYPE_LIVE, metadata: dict | None = None, subtitles: str | None = None, subtitles_lang: str = "en-US", subtitles_mime: str = "text/vtt", subtitle_id: int = 1, enqueue: bool = False, media_info: dict | None = None, callback_function: CallbackType | None = None, ) -> None: """ Plays media on the Chromecast. Start default media receiver if not already started. Parameters: url: str - url of the media. content_type: str - mime type. Example: 'video/mp4'. title: str - title of the media. thumb: str - thumbnail image url. current_time: float - Seconds since beginning of content. If the content is live content, and position is not specifed, the stream will start at the live position autoplay: bool - whether the media will automatically play. stream_type: str - describes the type of media artifact as one of the following: "NONE", "BUFFERED", "LIVE". subtitles: str - url of subtitle file to be shown on chromecast. subtitles_lang: str - language for subtitles. subtitles_mime: str - mimetype of subtitles. subtitle_id: int - id of subtitle to be loaded. enqueue: bool - if True, enqueue the media instead of play it. media_info: dict - additional MediaInformation attributes not explicitly listed. metadata: dict - media metadata object, one of the following: GenericMediaMetadata, MovieMediaMetadata, TvShowMediaMetadata, MusicTrackMediaMetadata, PhotoMediaMetadata. Docs: https://developers.google.com/cast/docs/reference/messages#MediaData https://developers.google.com/cast/docs/reference/web_receiver/cast.framework.messages.MediaInformation """ self._send_start_play_media( url, content_type, title, thumb, current_time, autoplay, stream_type, metadata, subtitles, subtitles_lang, subtitles_mime, subtitle_id, enqueue, media_info, callback_function=callback_function, ) def _send_start_play_media( # pylint: disable=too-many-locals self, url: str, content_type: str, title: str | None, thumb: str | None, current_time: float | None, autoplay: bool, stream_type: str, metadata: dict | None, subtitles: str | None, subtitles_lang: str, subtitles_mime: str, subtitle_id: int, enqueue: bool, media_info: dict | None, callback_function: CallbackType | None, ) -> None: media_info = media_info or {} media = { "contentId": url, "streamType": stream_type, "contentType": content_type, "metadata": metadata or {}, **media_info, } if title: media["metadata"]["title"] = title if thumb: media["metadata"]["thumb"] = thumb if "images" not in media["metadata"]: media["metadata"]["images"] = [] media["metadata"]["images"].append({"url": thumb}) # Need to set metadataType if not specified # https://developers.google.com/cast/docs/reference/messages#MediaInformation if media["metadata"] and "metadataType" not in media["metadata"]: media["metadata"]["metadataType"] = METADATA_TYPE_GENERIC if subtitles: sub_msg = [ { "trackId": subtitle_id, "trackContentId": subtitles, "language": subtitles_lang, "subtype": "SUBTITLES", "type": "TEXT", "trackContentType": subtitles_mime, "name": f"{subtitles_lang} - {subtitle_id} Subtitle", } ] media["tracks"] = sub_msg media["textTrackStyle"] = { "backgroundColor": "#FFFFFF00", "edgeType": "OUTLINE", "edgeColor": "#000000FF", } if enqueue: if self._socket_client is None: raise ControllerNotRegistered status = self._socket_client.media_controller.status msg: dict[str, Any] = { "mediaSessionId": status.media_session_id, "items": [ { "media": media, "autoplay": True, "startTime": 0, "preloadTime": 0, } ], MESSAGE_TYPE: TYPE_QUEUE_INSERT, } else: msg = { "media": media, MESSAGE_TYPE: TYPE_LOAD, } if current_time is not None: msg["currentTime"] = current_time msg["autoplay"] = autoplay msg["customData"] = {} if subtitles: msg["activeTrackIds"] = [subtitle_id] self.send_message(msg, inc_session_id=True, callback_function=callback_function) def quick_play(self, *, media_id: str, timeout: float, **kwargs: Any) -> None: """Quick Play""" media_type = kwargs.pop("media_type", "video/mp4") response_handler = WaitResponse(timeout, f"quick play {media_id}") self.play_media( media_id, media_type, **kwargs, callback_function=response_handler.callback ) response_handler.wait_response() class MediaController(BaseMediaPlayer): """Controller to interact with Google media namespace.""" def __init__(self) -> None: super().__init__( supporting_app_id=APP_MEDIA_RECEIVER, app_must_match=False, ) self.media_session_id = 0 self.status = MediaStatus() self.session_active_event = threading.Event() self._status_listeners: list[MediaStatusListener] = [] def channel_connected(self) -> None: """Called when media channel is connected. Will update status.""" self.update_status() def channel_disconnected(self) -> None: """Called when a media channel is disconnected. Will erase status.""" self.status = MediaStatus() self._fire_status_changed() def receive_message(self, _message: CastMessage, data: dict) -> bool: """Called when a media message is received.""" if data[MESSAGE_TYPE] == TYPE_MEDIA_STATUS: self._process_media_status(data) return True if data[MESSAGE_TYPE] == TYPE_LOAD_FAILED: self._process_load_failed(data) return True return False def register_status_listener(self, listener: MediaStatusListener) -> None: """Register a listener for new media statuses. A new status will call listener.new_media_status(status)""" self._status_listeners.append(listener) def update_status(self, *, callback_function: CallbackType | None = None) -> None: """Send message to update the status.""" self.send_message( {MESSAGE_TYPE: TYPE_GET_STATUS}, callback_function=callback_function ) def _send_command( self, command: dict, callback_function: CallbackType | None ) -> None: """Send a command to the Chromecast on media channel.""" if self.status is None or self.status.media_session_id is None: self.logger.warning( "%s command requested but no session is active.", command[MESSAGE_TYPE] ) if callback_function: callback_function(False, None) return command["mediaSessionId"] = self.status.media_session_id self.send_message( command, callback_function=callback_function, inc_session_id=True ) def play(self, timeout: float = 10.0) -> None: """Send the PLAY command.""" response_handler = WaitResponse(timeout, "play") self._send_command({MESSAGE_TYPE: TYPE_PLAY}, response_handler.callback) response_handler.wait_response() def pause(self, timeout: float = 10.0) -> None: """Send the PAUSE command.""" response_handler = WaitResponse(timeout, "pause") self._send_command({MESSAGE_TYPE: TYPE_PAUSE}, response_handler.callback) response_handler.wait_response() def stop(self, timeout: float = 10.0) -> None: """Send the STOP command.""" response_handler = WaitResponse(timeout, "stop") self._send_command({MESSAGE_TYPE: TYPE_STOP}, response_handler.callback) response_handler.wait_response() def rewind(self, timeout: float = 10.0) -> None: """Starts playing the media from the beginning.""" self.seek(0, timeout) def skip(self, timeout: float = 10.0) -> None: """Skips rest of the media. Values less then -5 behaved flaky.""" if not self.status.duration or self.status.duration < 5: return self.seek(int(self.status.duration) - 5, timeout) def seek(self, position: float, timeout: float = 10.0) -> None: """Seek the media to a specific location.""" response_handler = WaitResponse(timeout, f"seek {position}") self._send_command( { MESSAGE_TYPE: TYPE_SEEK, "currentTime": position, "resumeState": "PLAYBACK_START", }, response_handler.callback, ) response_handler.wait_response() def set_playback_rate(self, playback_rate: float, timeout: float = 10.0) -> None: """Set the playback rate. 1.0 is regular time, 0.5 is slow motion.""" response_handler = WaitResponse(timeout, "set playback rate") self._send_command( { MESSAGE_TYPE: TYPE_SET_PLAYBACK_RATE, "playbackRate": playback_rate, }, response_handler.callback, ) response_handler.wait_response() def queue_next(self, timeout: float = 10.0) -> None: """Send the QUEUE_NEXT command.""" response_handler = WaitResponse(timeout, "queue next") self._send_command( {MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": 1}, response_handler.callback ) response_handler.wait_response() def queue_prev(self, timeout: float = 10.0) -> None: """Send the QUEUE_PREV command.""" response_handler = WaitResponse(timeout, "queue prev") self._send_command( {MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": -1}, response_handler.callback ) response_handler.wait_response() def enable_subtitle(self, track_id: int, timeout: float = 10.0) -> None: """Enable specific text track.""" response_handler = WaitResponse(timeout, "enable subtitle") self._send_command( {MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": [track_id]}, response_handler.callback, ) response_handler.wait_response() def disable_subtitle(self, timeout: float = 10.0) -> None: """Disable subtitle.""" response_handler = WaitResponse(timeout, "disable subtitle") self._send_command( {MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": []}, response_handler.callback, ) response_handler.wait_response() def block_until_active(self, timeout: float | None = None) -> None: """ Blocks thread until the media controller session is active on the chromecast. The media controller only accepts playback control commands when a media session is active. If a session is already active then the method returns immediately. :param timeout: a floating point number specifying a timeout for the operation in seconds (or fractions thereof). Or None to block forever. """ self.session_active_event.wait(timeout=timeout) def _process_media_status(self, data: dict) -> None: """Processes a STATUS message.""" self.status.update(data) self.logger.debug("Media:Updated status %s", self.status) # Update session active threading event if self.status.media_session_id is None: self.session_active_event.clear() else: self.session_active_event.set() self._fire_status_changed() def _process_load_failed(self, data: dict) -> None: """Processes a LOAD_FAILED message.""" queue_item_id: int | None = data.get("itemId") error_code: int | None = data.get("detailedErrorCode") self.logger.debug( "Media:Load failed with code %s(%s) for queue item id %s", error_code, MEDIA_PLAYER_ERROR_CODES.get(error_code, "unknown code"), queue_item_id, ) if queue_item_id is None or error_code is None: self.logger.debug("Media:Not firing load failed") return self._fire_load_failed(queue_item_id, error_code) def _fire_status_changed(self) -> None: """Tells listeners of a changed status.""" for listener in self._status_listeners: try: listener.new_media_status(self.status) except Exception: # pylint: disable=broad-except _LOGGER.exception("Exception thrown when calling media status callback") def _fire_load_failed(self, queue_item_id: int, error_code: int) -> None: """Tells listeners of a changed status.""" for listener in self._status_listeners: try: listener.load_media_failed(queue_item_id, error_code) except Exception: # pylint: disable=broad-except _LOGGER.exception("Exception thrown when calling load failed callback") def tear_down(self) -> None: """Called when controller is destroyed.""" super().tear_down() self._status_listeners = [] class DefaultMediaReceiverController(BaseMediaPlayer): """Controller to force media to play with the default media receiver.""" def __init__(self) -> None: super().__init__(supporting_app_id=APP_MEDIA_RECEIVER) 07070100000039000081A400000000000000000000000165F9EF9000003213000000000000000000000000000000000000003A00000000pychromecast-14.0.1/pychromecast/controllers/multizone.py""" Controller to monitor audio group members. """ from __future__ import annotations import abc import logging from typing import TYPE_CHECKING, TypedDict from uuid import UUID from . import BaseController from .media import MediaController, MediaStatusListener, MediaStatus from .receiver import CastStatus, CastStatusListener from ..const import MESSAGE_TYPE # pylint: disable-next=no-name-in-module from ..generated.cast_channel_pb2 import CastMessage from ..socket_client import ( CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED, CONNECTION_STATUS_LOST, ConnectionStatus, ConnectionStatusListener, ) if TYPE_CHECKING: from .. import Chromecast _LOGGER = logging.getLogger(__name__) MULTIZONE_NAMESPACE = "urn:x-cast:com.google.cast.multizone" TYPE_CASTING_GROUPS = "CASTING_GROUPS" TYPE_DEVICE_ADDED = "DEVICE_ADDED" TYPE_DEVICE_UPDATED = "DEVICE_UPDATED" TYPE_DEVICE_REMOVED = "DEVICE_REMOVED" TYPE_GET_CASTING_GROUPS = "GET_CASTING_GROUPS" TYPE_GET_STATUS = "GET_STATUS" TYPE_MULTIZONE_STATUS = "MULTIZONE_STATUS" TYPE_SESSION_UPDATED = "PLAYBACK_SESSION_UPDATED" class GroupInfo(TypedDict): """Chromecast connection and listener for a group.""" chromecast: Chromecast listener: Listener class GroupMemberInfo(TypedDict): """Group memberships and listener for a group.""" group_memberships: set[str] listeners: list[MultiZoneManagerListener] class MultiZoneControllerListener(abc.ABC): """Listener for receiving audio group events.""" @abc.abstractmethod def multizone_member_added(self, group_uuid: str) -> None: """The cast has been added to group identified by group_uuid.""" @abc.abstractmethod def multizone_member_removed(self, group_uuid: str) -> None: """The cast has been removed from group identified by group_uuid.""" @abc.abstractmethod def multizone_status_received(self) -> None: """Multizone status has been updated.""" class MultiZoneManagerListener(abc.ABC): """Listener for receiving audio group events for a cast device.""" @abc.abstractmethod def added_to_multizone(self, group_uuid: str) -> None: """The cast has been added to group identified by group_uuid.""" @abc.abstractmethod def removed_from_multizone(self, group_uuid: str) -> None: """The cast has been removed from group identified by group_uuid.""" @abc.abstractmethod def multizone_new_media_status( self, group_uuid: str, media_status: MediaStatus ) -> None: """The group identified by group_uuid, of which the cast is a member, has new media status.""" @abc.abstractmethod def multizone_new_cast_status( self, group_uuid: str, cast_status: CastStatus ) -> None: """The group identified by group_uuid, of which the cast is a member, has new status.""" class Listener( CastStatusListener, ConnectionStatusListener, MediaStatusListener, MultiZoneControllerListener, ): """Callback handler.""" def __init__( self, group_cast: Chromecast, casts: dict[str, GroupMemberInfo] ) -> None: """Initialize the listener.""" self._casts = casts group_cast.register_status_listener(self) group_cast.media_controller.register_status_listener(self) group_cast.register_connection_listener(self) self._mz = MultizoneController(group_cast.uuid) self._mz.register_listener(self) self._group_uuid = str(group_cast.uuid) group_cast.register_handler(self._mz) def new_cast_status(self, status: CastStatus) -> None: """Handle reception of a new CastStatus.""" casts = self._casts group_members = self._mz.members for member_uuid in group_members: if member_uuid not in casts: continue for listener in list(casts[member_uuid]["listeners"]): listener.multizone_new_cast_status(self._group_uuid, status) def new_media_status(self, status: MediaStatus) -> None: """Handle reception of a new MediaStatus.""" casts = self._casts group_members = self._mz.members for member_uuid in group_members: if member_uuid not in casts: continue for listener in list(casts[member_uuid]["listeners"]): listener.multizone_new_media_status(self._group_uuid, status) def load_media_failed(self, queue_item_id: int, error_code: int) -> None: """Called when load media failed.""" def new_connection_status(self, status: ConnectionStatus) -> None: """Handle reception of a new ConnectionStatus.""" if status.status == CONNECTION_STATUS_CONNECTED: self._mz.update_members() if status.status in ( CONNECTION_STATUS_DISCONNECTED, CONNECTION_STATUS_LOST, ): self._mz.reset_members() def multizone_member_added(self, group_uuid: str) -> None: """Handle added audio group member.""" casts = self._casts if group_uuid not in casts: casts[group_uuid] = {"listeners": [], "group_memberships": set()} casts[group_uuid]["group_memberships"].add(self._group_uuid) for listener in list(casts[group_uuid]["listeners"]): listener.added_to_multizone(self._group_uuid) def multizone_member_removed(self, group_uuid: str) -> None: """Handle removed audio group member.""" casts = self._casts if group_uuid not in casts: casts[group_uuid] = {"listeners": [], "group_memberships": set()} casts[group_uuid]["group_memberships"].discard(self._group_uuid) for listener in list(casts[group_uuid]["listeners"]): listener.removed_from_multizone(self._group_uuid) def multizone_status_received(self) -> None: """Handle reception of audio group status.""" class MultizoneManager: """Manage audio groups.""" def __init__(self) -> None: # Protect self._casts because it will be accessed from callbacks from # the casts' socket_client thread self._casts: dict[str, GroupMemberInfo] = {} self._groups: dict[str, GroupInfo] = {} def add_multizone(self, group_cast: Chromecast) -> None: """Start managing a group""" self._groups[str(group_cast.uuid)] = { "chromecast": group_cast, "listener": Listener(group_cast, self._casts), } def remove_multizone(self, group_uuid: UUID) -> None: """Stop managing a group""" group_uuid_str = str(group_uuid) group = self._groups.pop(group_uuid_str, None) # Inform all group members that they are no longer members if group is not None: group["listener"]._mz.reset_members() # pylint: disable=protected-access for member in self._casts.values(): member["group_memberships"].discard(group_uuid_str) def register_listener( self, member_uuid: UUID, listener: MultiZoneManagerListener ) -> None: """Register a listener for audio group changes of cast uuid. On update will call: listener.added_to_multizone(group_uuid) The cast has been added to group uuid listener.removed_from_multizone(group_uuid) The cast has been removed from group uuid listener.multizone_new_media_status(group_uuid, media_status) The group uuid, of which the cast is a member, has new status listener.multizone_new_cast_status(group_uuid, cast_status) The group uuid, of which the cast is a member, has new status """ member_uuid_str = str(member_uuid) if member_uuid_str not in self._casts: self._casts[member_uuid_str] = {"listeners": [], "group_memberships": set()} self._casts[member_uuid_str]["listeners"].append(listener) def deregister_listener( self, member_uuid: UUID, listener: MultiZoneManagerListener ) -> None: """Deregister listener for audio group changes of cast uuid.""" self._casts[str(member_uuid)]["listeners"].remove(listener) def get_multizone_memberships(self, member_uuid: UUID) -> list[str]: """Return a list of audio groups in which cast member_uuid is a member""" return list(self._casts[str(member_uuid)]["group_memberships"]) def get_multizone_mediacontroller(self, group_uuid: UUID) -> MediaController: """Get mediacontroller of a group""" return self._groups[str(group_uuid)]["chromecast"].media_controller class MultizoneController(BaseController): """Controller to monitor audio group members.""" def __init__(self, uuid: UUID) -> None: self._members: dict[str, str] = {} self._status_listeners: list[MultiZoneControllerListener] = [] self._uuid = str(uuid) super().__init__(MULTIZONE_NAMESPACE, target_platform=True) def _add_member(self, uuid: str, name: str) -> None: if uuid not in self._members: self._members[uuid] = name _LOGGER.debug( "(%s) Added member %s(%s), members: %s", self._uuid, uuid, name, self._members, ) for listener in list(self._status_listeners): listener.multizone_member_added(uuid) def _remove_member(self, uuid: str) -> None: name = self._members.pop(uuid, "<Unknown>") _LOGGER.debug( "(%s) Removed member %s(%s), members: %s", self._uuid, uuid, name, self._members, ) for listener in list(self._status_listeners): listener.multizone_member_removed(uuid) def register_listener(self, listener: MultiZoneControllerListener) -> None: """Register a listener for audio group changes. On update will call: listener.multizone_member_added(uuid) listener.multizone_member_removed(uuid) listener.multizone_status_received() """ self._status_listeners.append(listener) @property def members(self) -> list[str]: """Return a list of audio group members.""" return list(self._members.keys()) def reset_members(self) -> None: """Reset audio group members.""" for uuid in list(self._members): self._remove_member(uuid) def update_members(self) -> None: """Update audio group members.""" self.send_message({MESSAGE_TYPE: TYPE_GET_STATUS}) def get_casting_groups(self) -> None: """Send GET_CASTING_GROUPS message.""" self.send_message({MESSAGE_TYPE: TYPE_GET_CASTING_GROUPS}) def receive_message( # pylint: disable=too-many-return-statements self, _message: CastMessage, data: dict ) -> bool: """Called when a multizone message is received.""" if data[MESSAGE_TYPE] == TYPE_DEVICE_ADDED: uuid = data["device"]["deviceId"] name = data["device"]["name"] self._add_member(uuid, name) return True if data[MESSAGE_TYPE] == TYPE_DEVICE_REMOVED: uuid = data["deviceId"] self._remove_member(uuid) return True if data[MESSAGE_TYPE] == TYPE_DEVICE_UPDATED: uuid = data["device"]["deviceId"] name = data["device"]["name"] self._add_member(uuid, name) return True if data[MESSAGE_TYPE] == TYPE_MULTIZONE_STATUS: members = data["status"]["devices"] members = {member["deviceId"]: member["name"] for member in members} removed_members = list(set(self._members.keys()) - set(members.keys())) added_members = list(set(members.keys()) - set(self._members.keys())) _LOGGER.debug( "(%s) Added members %s, Removed members: %s", self._uuid, added_members, removed_members, ) for uuid in removed_members: self._remove_member(uuid) for uuid in added_members: self._add_member(uuid, members[uuid]) for listener in list(self._status_listeners): listener.multizone_status_received() return True if data[MESSAGE_TYPE] == TYPE_SESSION_UPDATED: # A temporary group has been formed return True if data[MESSAGE_TYPE] == TYPE_CASTING_GROUPS: # Answer to GET_CASTING_GROUPS return True return False def tear_down(self) -> None: """Called when controller is destroyed.""" super().tear_down() self._status_listeners = [] 0707010000003A000081A400000000000000000000000165F9EF90000002FC000000000000000000000000000000000000003900000000pychromecast-14.0.1/pychromecast/controllers/nrkradio.py""" Controller to interface with NRK Radio. """ # Note: Media ID can be found in the URL, e.g: # For the live channel https://radio.nrk.no/direkte/p1, the media ID is p1 # For the podcast https://radio.nrk.no/podkast/tazte_priv/l_8457deb0-4f2c-4ef3-97de-b04f2c6ef314, # the Media ID is l_8457deb0-4f2c-4ef3-97de-b04f2c6ef314 # For the on-demand program https://radio.nrk.no/serie/radiodokumentaren/sesong/201011/MDUP01004510, # the media id is MDUP01004510 from .media import BaseMediaPlayer from ..config import APP_NRKRADIO APP_NAMESPACE = "urn:x-cast:com.google.cast.media" class NrkRadioController(BaseMediaPlayer): """Controller to interact with NRK Radio.""" def __init__(self) -> None: super().__init__(supporting_app_id=APP_NRKRADIO) 0707010000003B000081A400000000000000000000000165F9EF90000002A7000000000000000000000000000000000000003600000000pychromecast-14.0.1/pychromecast/controllers/nrktv.py""" Controller to interface with NRK TV. """ # Note: Media ID for live programs can be found in the URL # e.g. for https://tv.nrk.no/direkte/nrk1, the media ID is nrk1 # Media ID for non-live programs can be found by clicking the share button # e.g. https://tv.nrk.no/serie/uti-vaar-hage/sesong/2/episode/2 shows: # "https://tv.nrk.no/se?v=OUHA43000207", the media ID is OUHA43000207 from .media import BaseMediaPlayer from ..config import APP_NRKTV APP_NAMESPACE = "urn:x-cast:com.google.cast.media" class NrkTvController(BaseMediaPlayer): """Controller to interact with NRK TV.""" def __init__(self) -> None: super().__init__(supporting_app_id=APP_NRKTV) 0707010000003C000081A400000000000000000000000165F9EF90000053BA000000000000000000000000000000000000003500000000pychromecast-14.0.1/pychromecast/controllers/plex.py""" Controller to interface with the Plex-app. """ from __future__ import annotations from copy import deepcopy from functools import partial import json import threading from typing import TYPE_CHECKING, Any, cast from urllib.parse import urlparse from . import CallbackType, BaseController from .media import MediaStatus from ..const import MESSAGE_TYPE from ..error import ControllerNotRegistered, RequestFailed from ..generated.cast_channel_pb2 import ( # pylint: disable=no-name-in-module CastMessage, ) from ..response_handler import chain_on_success if TYPE_CHECKING: from plexapi.base import Playable # type: ignore[import-untyped] from plexapi.media import Media # type: ignore[import-untyped] from plexapi.playqueue import PlayQueue # type: ignore[import-untyped] from plexapi.server import PlexServer # type: ignore[import-untyped] STREAM_TYPE_UNKNOWN = "UNKNOWN" STREAM_TYPE_BUFFERED = "BUFFERED" STREAM_TYPE_LIVE = "LIVE" SEEK_KEY = "currentTime" TYPE_PLAY = "PLAY" TYPE_PAUSE = "PAUSE" TYPE_STOP = "STOP" TYPE_STEPFORWARD = "STEPFORWARD" TYPE_STEPBACKWARD = "STEPBACK" TYPE_PREVIOUS = "PREVIOUS" TYPE_NEXT = "NEXT" TYPE_LOAD = "LOAD" TYPE_DETAILS = "SHOWDETAILS" TYPE_SEEK = "SEEK" TYPE_MEDIA_STATUS = "MEDIA_STATUS" TYPE_GET_STATUS = "GET_STATUS" TYPE_EDIT_TRACKS_INFO = "EDIT_TRACKS_INFO" def media_to_chromecast_command( # pylint: disable=invalid-name, too-many-locals, protected-access media: Playable | None = None, type: str = "LOAD", # pylint: disable=redefined-builtin requestId: int = 1, offset: int = 0, directPlay: bool = True, directStream: bool = True, subtitleSize: int = 100, audioBoost: int = 100, transcoderVideo: bool = True, transcoderVideoRemuxOnly: bool = False, transcoderAudio: bool = True, isVerifiedHostname: bool = True, contentType: str = "video", myPlexSubscription: bool = True, contentId: str | None = None, streamType: str = STREAM_TYPE_BUFFERED, port: int = 32400, protocol: str = "http", address: str | None = None, username: str | None = None, autoplay: bool = True, currentTime: int = 0, playQueue: PlayQueue | None = None, playQueueID: int | None = None, startItem: Media | None = None, version: str = "1.10.1.4602", **kwargs: Any, ) -> dict[str, Any]: """Create the message that chromecast requires. Use pass of plexapi media object or set all the needed kwargs manually. See the code for what to set. Args: media (None, optional): a :class:`~plexapi.base.Playable type (str): Default LOAD, SHOWDETAILS. requestId (int): The requestId, Chromecasts may use this. offset (int): Offset of the playback in seconds. directPlay (bool): Default True directStream (bool): Default True subtitleSize (int): Set the subtitle size, possibly only 100 & 200. audioBoost (int): Default 100 transcoderVideo (bool): Default True transcoderVideoRemuxOnly (bool): Default False transcoderAudio (bool): Default True isVerifiedHostname (bool): Default True contentType (str): Default 'video', 'audio' myPlexSubscription (bool): True if user has a PlexPass. contentId (str): The key Chromecasts use to start playback. streamType (str): Default BUFFERED, LIVE port (int): PMS port address (str): PMS host, without scheme. username (None): Username of the user that started playback. autoplay (bool): Auto play after the video is done. currentTime (int): Set playback from this time. default 0 version (str): PMS version. Default 1.10.1.4602 startItem (:class:`~plexapi.media.Media`, optional): Media item in list/playlist/playqueue where playback should start. Overrides existing startItem for playqueues if set. **kwargs: To allow overrides, this will be merged with the rest of the msg. Returns: dict: Returs a dict formatted correctly to start playback on a Chromecast. """ if media is not None: # Lets set some params for the user if they use plexapi. server: PlexServer = ( media[0]._server if isinstance(media, list) else media._server ) server_url = urlparse(server._baseurl) protocol = server_url.scheme address = server_url.hostname port = server_url.port machineIdentifier = server.machineIdentifier token = server._token username = server.myPlexUsername myPlexSubscription = server.myPlexSubscription if getattr(media, "TYPE", None) == "playqueue": if startItem: media = media.items else: playQueue = media if playQueue is None: playQueue = server.createPlayQueue(media, startItem=startItem) playQueueID = playQueue.playQueueID contentId = playQueue.selectedItem.key contentType = playQueue.items[0].listType version = server.version # Chromecasts seem to start playback 5 seconds before the offset. if offset != 0: currentTime = offset msg = { "type": type, "requestId": requestId, "media": { "contentId": contentId, "streamType": streamType, "contentType": contentType, "customData": { "offset": offset, "directPlay": directPlay, "directStream": directStream, "subtitleSize": subtitleSize, "audioBoost": audioBoost, "server": { "machineIdentifier": machineIdentifier, "transcoderVideo": transcoderVideo, "transcoderVideoRemuxOnly": transcoderVideoRemuxOnly, "transcoderAudio": transcoderAudio, "version": version, "myPlexSubscription": myPlexSubscription, "isVerifiedHostname": isVerifiedHostname, "protocol": protocol, "address": address, "port": port, "accessToken": token, "user": {"username": username}, }, "containerKey": f"/playQueues/{playQueueID}?own=1&window=200", }, "autoplay": autoplay, "currentTime": currentTime, "activeTrackIds": None, }, } # Allow passing of kwargs to the dict. msg.update(kwargs) return msg class PlexMediaStatus(MediaStatus): """Class to hold the media status.""" @property def episode_title(self) -> str | None: """Return episode title.""" return self.media_metadata.get("subtitle") # The episode_title property is added to MediaStatus objects @property # type: ignore[misc] def episode_title(self: PlexMediaStatus) -> str | None: """Return episode title.""" return self.media_metadata.get("subtitle") class PlexController(BaseController): # pylint: disable=too-many-public-methods """Controller to interact with Plex namespace.""" def __init__(self) -> None: super().__init__("urn:x-cast:plex", "9AC194DC") self.app_id = "9AC194DC" self.namespace = "urn:x-cast:plex" self.request_id = 0 self.play_media_event = threading.Event() self._last_play_msg: dict[str, Any] = {} def _send_cmd( self, msg: dict, namespace: str | None = None, inc_session_id: bool = False, callback_function: CallbackType | None = None, inc: bool = True, ) -> None: # pylint: disable=too-many-arguments """Wrapper for the commands. Args: msg (dict): The actual command that will be sent. namespace (None, optional): What namespace should be used to send this. inc_session_id (bool, optional): Include session ID. callback_function (None, optional): If callback is provided it is executed after the command. inc (bool, optional): Increase the requestsId. """ self.logger.debug( "Sending msg %r %s %s %s %s.", msg, namespace, inc_session_id, callback_function, inc, ) if inc: self._inc_request() if namespace: old = self.namespace try: self.namespace = namespace self.send_message( msg, inc_session_id=inc_session_id, callback_function=callback_function, ) finally: self.namespace = old else: self.send_message( msg, inc_session_id=inc_session_id, callback_function=callback_function ) def _inc_request(self) -> int: # Is this getting passed to Plex? self.request_id += 1 return self.request_id def channel_connected(self) -> None: """Updates status when a media channel is connected.""" self.update_status() def receive_message(self, _message: CastMessage, data: dict) -> bool: """Called when a message from Plex to our controller is received. I haven't seen any message for it, but lets keep for for now. I have done minimal testing. Args: message (dict): Description data (dict): message.payload_utf8 interpreted as a JSON dict. Returns: bool: True if the message is handled. """ if data[MESSAGE_TYPE] == TYPE_MEDIA_STATUS: self.logger.debug("(PlexController) MESSAGE RECEIVED: %r.", data) return True return False def update_status(self, *, callback_function: CallbackType | None = None) -> None: """Send message to update status.""" self.send_message( {MESSAGE_TYPE: TYPE_GET_STATUS}, callback_function=callback_function ) def stop(self) -> None: """Send stop command.""" self._send_cmd({MESSAGE_TYPE: TYPE_STOP}) def pause(self) -> None: """Send pause command.""" self._send_cmd({MESSAGE_TYPE: TYPE_PAUSE}) def play(self) -> None: """Send play command.""" self._send_cmd({MESSAGE_TYPE: TYPE_PLAY}) def previous(self) -> None: """Send previous command.""" self._send_cmd({MESSAGE_TYPE: TYPE_PREVIOUS}) def next(self) -> None: """Send next command.""" self._send_cmd({MESSAGE_TYPE: TYPE_NEXT}) def seek(self, position: int, resume_state: str = "PLAYBACK_START") -> None: """Send seek command. Args: position (int): Offset in seconds. resume_state (str, default): PLAYBACK_START """ self._send_cmd( {MESSAGE_TYPE: TYPE_SEEK, SEEK_KEY: position, "resumeState": resume_state} ) def rewind(self) -> None: """Rewind back to the start.""" self.seek(0) def set_volume(self, percent: float) -> float: """Set the volume in percent (1-100). Args: percent (int): Percent of volume to be set. """ if self._socket_client is None: raise ControllerNotRegistered return self._socket_client.receiver_controller.set_volume(percent / 100) def volume_up(self, delta: float = 0.1) -> float: """Increment volume by 0.1 (or delta) unless at max. Returns the new volume. """ if delta <= 0: raise ValueError(f"volume delta must be greater than zero, not {delta}") return self.set_volume(self.status.volume_level + delta) def volume_down(self, delta: float = 0.1) -> float: """Decrement the volume by 0.1 (or delta) unless at 0. Returns the new volume. """ if delta <= 0: raise ValueError(f"volume delta must be greater than zero, not {delta}") return self.set_volume(self.status.volume_level - delta) def mute(self, status: bool | None = None) -> None: """Toggle muting of audio. Args: status (None, optional): Override for on/off. """ if self._socket_client is None: raise ControllerNotRegistered if status is None: status = not self.status.volume_muted self._socket_client.receiver_controller.set_volume_muted(status) def show_media(self, media: Playable | None = None, **kwargs: Any) -> None: """Show media item's info on screen.""" msg = media_to_chromecast_command( media, type=TYPE_DETAILS, requestId=self._inc_request(), **kwargs ) def callback(msg_sent: bool, _response: dict | None) -> None: if not msg_sent: raise RequestFailed("PlexController.show_media") self.launch( callback_function=chain_on_success( partial(self._send_cmd, msg, inc_session_id=True, inc=False), callback ) ) def quit_app(self) -> None: """Quit the Plex app.""" if self._socket_client is None: raise ControllerNotRegistered self._socket_client.receiver_controller.stop_app() @property def status(self) -> PlexMediaStatus: """Get the Chromecast's playing status. Returns: pychromecast.controllers.media.MediaStatus: Slightly modified status with patched method for episode_title. """ if self._socket_client is None: raise ControllerNotRegistered status = self._socket_client.media_controller.status status.episode_title = episode_title # type: ignore[attr-defined] return cast(PlexMediaStatus, status) def _reset_playback(self, offset: float | None = None) -> None: """Reset playback. Args: offset (None, optional): Start playback from this offset in seconds, otherwise playback will start from current time. """ if self._last_play_msg: offset_now = self.status.adjusted_current_time msg = deepcopy(self._last_play_msg) msg["media"]["customData"]["offset"] = ( offset_now if offset is None else offset ) msg["current_time"] = offset_now self._send_cmd( msg, namespace="urn:x-cast:com.google.cast.media", inc_session_id=True, inc=False, ) else: self.logger.debug( "Can not reset the stream, _last_play_msg " "was not set with _send_start_play." ) def _send_start_play(self, media: Playable | None = None, **kwargs: Any) -> None: """Helper to send a playback command. Args: media (None, optional): :class:`~plexapi.base.Playable **kwargs: media_to_chromecast_command docs string. """ msg = media_to_chromecast_command( media, requestiId=self._inc_request(), **kwargs ) self.logger.debug("Create command: \n%r\n", json.dumps(msg, indent=4)) self._last_play_msg = msg self._send_cmd( msg, namespace="urn:x-cast:com.google.cast.media", inc_session_id=True, inc=False, ) def block_until_playing( self, media: Playable | None = None, timeout: float | None = None, **kwargs: Any ) -> None: """Block until media is playing, typically useful in a script. Another way to do the same is to check if the controller is_active or by using self.status.player_state. Args: media (None, optional): Can also be :class:`~plexapi.base.Playable if not, you need to fill out all the kwargs. timeout (None, int): default None **kwargs: See media_to_chromecast_command docs string. """ # In case media isnt playing. self.play_media_event.clear() self.play_media(media, **kwargs) self.play_media_event.wait(timeout) self.play_media_event.clear() def play_media(self, media: Playable | None = None, **kwargs: Any) -> None: """Start playback on the Chromecast. Args: media (None, optional): Can also be :class:`~plexapi.base.Playable if not, you need to fill out all the kwargs. **kwargs: See media_to_chromecast_command docs string. """ self.play_media_event.clear() def app_launched_callback(msg_sent: bool, _response: dict | None) -> None: if not msg_sent: raise RequestFailed("PlexController.play_media") try: self._send_start_play(media, **kwargs) finally: self.play_media_event.set() self.launch(callback_function=app_launched_callback) def join(self, timeout: float | None = None) -> None: """Join the thread.""" if self._socket_client is None: raise ControllerNotRegistered self._socket_client.join(timeout=timeout) def disconnect(self, timeout: float | None = None) -> None: """Disconnect the controller. :param timeout: A floating point number specifying a timeout for the operation in seconds (or fractions thereof). Or None to block forever. Set to 0 to not block. """ if self._socket_client is None: raise ControllerNotRegistered self._socket_client.disconnect() self.join(timeout=timeout) # pylint: disable=too-many-public-methods class PlexApiController(PlexController): """A controller that can use PlexAPI.""" def __init__(self, pms: PlexServer) -> None: super().__init__() self.pms = pms def _get_current_media(self) -> tuple[Any, Any, Any]: """Get current media_item, media, & part for PMS.""" # Note: The cast to str below was added when adding type annotations. We may # need to instead add error handling, checking the content_id is valid and so on key = int(cast(str, self.status.content_id).split("/")[-1]) media_item = self.pms.fetchItem(key).reload() media_idx = self.status.media_custom_data.get("mediaIndex", 0) part_idx = self.status.media_custom_data.get("partIndex", 0) media = media_item.media[media_idx] part = media.parts[part_idx] return media_item, media, part def _change_track( self, track: Any, type_: str = "subtitle", reset_playback: bool = True ) -> None: """Sets a new default audio/subtitle track. Args: track (None): The chosen track. type_ (str): The type of track. reset_playback (bool, optional): Reset playback after the track has been changed. Raises: ValueError: If type isn't subtitle or audio. """ item, _, part = self._get_current_media() if type_ == "subtitle": method = part.subtitleStreams() default = part.setDefaultSubtitleStream elif type_ == "audio": method = part.audioStreams() default = part.setDefaultAudioStream else: raise ValueError("Set type parameter as subtitle or audio.") for track_ in method: if track in (track_.index, track_.language, track_.languageCode): self.logger.debug("Change %s to %s.", type_, track) default(track_) break item.reload() if reset_playback: self._reset_playback() def enable_audiotrack(self, audio: str) -> None: """Enable an audiotrack. Args: audio (str): Can be index, language or languageCode. """ self._change_track(audio, "audio") def disable_subtitle(self) -> None: """Disable a subtitle track.""" ( _, __, part, ) = self._get_current_media() part.resetDefaultSubtitleStream() self._reset_playback() def enable_subtitle(self, subtitle: str) -> None: """Enable a subtitle track. Args: subtitle (str): Can be index, language or languageCode. """ self._change_track(subtitle) def play_media(self, media: Playable | None = None, **kwargs: Any) -> None: """Start playback on the Chromecast. Args: media (None, optional): Can also be :class:`~plexapi.base.Playable if not, you need to fill out all the kwargs. **kwargs: See media_to_chromecast_command docs string. `version` is set to the version of the PMS reported by the API by default. """ args = {"version": self.pms.version} args.update(kwargs) super().play_media(media, **args) 0707010000003D000081A400000000000000000000000165F9EF9000002CA1000000000000000000000000000000000000003900000000pychromecast-14.0.1/pychromecast/controllers/receiver.py""" Provides a controller for controlling the default media players on the Chromecast. """ import abc from dataclasses import dataclass from functools import partial from ..const import ( CAST_TYPE_AUDIO, CAST_TYPE_CHROMECAST, CAST_TYPE_GROUP, MESSAGE_TYPE, REQUEST_ID, REQUEST_TIMEOUT, SESSION_ID, ) from ..generated.cast_channel_pb2 import ( # pylint: disable=no-name-in-module CastMessage, ) from ..response_handler import WaitResponse, chain_on_success from . import BaseController, CallbackType APP_ID = "appId" ERROR_REASON = "reason" NS_RECEIVER = "urn:x-cast:com.google.cast.receiver" TYPE_GET_STATUS = "GET_STATUS" TYPE_RECEIVER_STATUS = "RECEIVER_STATUS" TYPE_LAUNCH = "LAUNCH" TYPE_LAUNCH_ERROR = "LAUNCH_ERROR" LAUNCH_CANCELLED = "CANCELLED" VOLUME_CONTROL_TYPE_ATTENUATION = "attenuation" VOLUME_CONTROL_TYPE_FIXED = "fixed" VOLUME_CONTROL_TYPE_MASTER = "master" @dataclass(frozen=True) class CastStatus: """Cast status container.""" is_active_input: bool | None is_stand_by: bool | None volume_level: float volume_muted: bool app_id: str | None display_name: str | None namespaces: list[str] session_id: str | None transport_id: str | None status_text: str icon_url: str | None volume_control_type: str @dataclass(frozen=True) class LaunchFailure: """Launch failure container.""" reason: str | None app_id: str | None request_id: int | None class CastStatusListener(abc.ABC): """Listener for receiving cast status events.""" @abc.abstractmethod def new_cast_status(self, status: CastStatus) -> None: """Updated cast status.""" class LaunchErrorListener(abc.ABC): """Listener for receiving launch error events.""" @abc.abstractmethod def new_launch_error(self, status: LaunchFailure) -> None: """Launch error.""" class ReceiverController(BaseController): """ Controller to interact with the Chromecast platform. :param cast_type: Type of Chromecast device. """ def __init__(self, cast_type: str = CAST_TYPE_CHROMECAST) -> None: super().__init__(NS_RECEIVER, target_platform=True) self.status: CastStatus | None = None self.launch_failure: LaunchFailure | None = None self.cast_type = cast_type self._status_listeners: list[CastStatusListener] = [] self._launch_error_listeners: list[LaunchErrorListener] = [] def disconnected(self) -> None: """Called when disconnected. Will erase status.""" self.logger.info("Receiver:channel_disconnected") self.status = None @property def app_id(self) -> str | None: """Convenience method to retrieve current app id.""" return self.status.app_id if self.status else None def receive_message(self, _message: CastMessage, data: dict) -> bool: """ Called when a receiver message is received. data is message.payload_utf8 interpreted as a JSON dict. """ if data[MESSAGE_TYPE] == TYPE_RECEIVER_STATUS: self._process_get_status(data) return True if data[MESSAGE_TYPE] == TYPE_LAUNCH_ERROR: self._process_launch_error(data) return True return False def register_status_listener(self, listener: CastStatusListener) -> None: """Register a status listener for when a new Chromecast status has been received. Listeners will be called with listener.new_cast_status(status)""" self._status_listeners.append(listener) def register_launch_error_listener(self, listener: LaunchErrorListener) -> None: """Register a listener for when a new launch error message has been received. Listeners will be called with listener.new_launch_error(launch_failure)""" self._launch_error_listeners.append(listener) def update_status( self, *, callback_function: CallbackType | None = None, ) -> None: """Sends a message to the Chromecast to update the status.""" self.logger.debug("Receiver:Updating status") self.send_message( {MESSAGE_TYPE: TYPE_GET_STATUS}, callback_function=callback_function ) def launch_app( self, app_id: str, *, force_launch: bool = False, callback_function: CallbackType | None = None, ) -> None: """Launches an app on the Chromecast. Will only launch if it is not currently running unless force_launch=True.""" if not force_launch and self.status is None: self.update_status( callback_function=chain_on_success( partial(self._send_launch_message, app_id, force_launch), callback_function, ) ) else: self._send_launch_message(app_id, force_launch, callback_function) def _send_launch_message( self, app_id: str, force_launch: bool, callback_function: CallbackType | None, *, retry_on_cancelled_error: bool = True, ) -> None: if force_launch or self.app_id != app_id: self.logger.info("Receiver:Launching app %s", app_id) self.launch_failure = None def handle_launch_response(msg_sent: bool, response: dict | None) -> None: if ( msg_sent # pylint: disable=too-many-boolean-expressions and response and response.get(MESSAGE_TYPE) == TYPE_LAUNCH_ERROR and response.get(ERROR_REASON) == LAUNCH_CANCELLED and not self._launch_error_listeners and retry_on_cancelled_error ): self.logger.info( "Receiver:Launching app %s failed, retrying once", app_id ) self._send_launch_message( app_id, force_launch, callback_function, retry_on_cancelled_error=False, ) return if not callback_function: return if not msg_sent or not response: callback_function(False, response) return if response[MESSAGE_TYPE] == TYPE_RECEIVER_STATUS: callback_function(True, response) return callback_function(False, response) self.send_message( {MESSAGE_TYPE: TYPE_LAUNCH, APP_ID: app_id}, callback_function=handle_launch_response, ) else: self.logger.info("Not launching app %s - already running", app_id) if callback_function: callback_function(True, None) def stop_app( self, *, callback_function: CallbackType | None = None, ) -> None: """Stops the current running app on the Chromecast.""" self.logger.info("Receiver:Stopping current app '%s'", self.app_id) return self.send_message( {MESSAGE_TYPE: "STOP"}, inc_session_id=True, callback_function=callback_function, ) def set_volume(self, volume: float, timeout: float = REQUEST_TIMEOUT) -> float: """Allows to set volume. Should be value between 0..1. Returns the new volume. """ volume = min(max(0, volume), 1) self.logger.info("Receiver:setting volume to %.2f", volume) response_handler = WaitResponse(timeout, "set volume") self.send_message( {MESSAGE_TYPE: "SET_VOLUME", "volume": {"level": volume}}, callback_function=response_handler.callback, ) response_handler.wait_response() return volume def set_volume_muted(self, muted: bool, timeout: float = REQUEST_TIMEOUT) -> None: """Allows to mute volume.""" response_handler = WaitResponse(timeout, "mute volume") self.send_message( {MESSAGE_TYPE: "SET_VOLUME", "volume": {"muted": muted}}, callback_function=response_handler.callback, ) response_handler.wait_response() @staticmethod def _parse_status(data: dict, cast_type: str) -> CastStatus: """ Parses a STATUS message and returns a CastStatus object. :type data: dict :param cast_type: Type of Chromecast. :rtype: CastStatus """ status_data: dict = data.get("status", {}) volume_data: dict = status_data.get("volume", {}) try: app_data: dict = status_data["applications"][0] except (KeyError, IndexError): app_data = {} is_audio = cast_type in (CAST_TYPE_AUDIO, CAST_TYPE_GROUP) status = CastStatus( data.get("isActiveInput", None if is_audio else False), data.get("isStandBy", None if is_audio else True), volume_data.get("level", 1.0), volume_data.get("muted", False), app_data.get(APP_ID), app_data.get("displayName"), [item["name"] for item in app_data.get("namespaces", [])], app_data.get(SESSION_ID), app_data.get("transportId"), app_data.get("statusText", ""), app_data.get("iconUrl"), volume_data.get("controlType", VOLUME_CONTROL_TYPE_ATTENUATION), ) return status def _process_get_status(self, data: dict) -> None: """Processes a received STATUS message and notifies listeners.""" status = self._parse_status(data, self.cast_type) self.status = status self.logger.debug("Received status: %s", self.status) for listener in self._status_listeners: try: listener.new_cast_status(self.status) except Exception: # pylint: disable=broad-except self.logger.exception( "Exception thrown when calling cast status listener" ) @staticmethod def _parse_launch_error(data: dict) -> LaunchFailure: """ Parses a LAUNCH_ERROR message and returns a LaunchFailure object. :type data: dict :rtype: LaunchFailure """ return LaunchFailure( data.get(ERROR_REASON, None), data.get(APP_ID), data.get(REQUEST_ID) ) def _process_launch_error(self, data: dict) -> None: """ Processes a received LAUNCH_ERROR message and notifies listeners. """ launch_failure = self._parse_launch_error(data) self.launch_failure = launch_failure self.logger.debug("Launch status: %s", launch_failure) for listener in self._launch_error_listeners: try: listener.new_launch_error(launch_failure) except Exception: # pylint: disable=broad-except self.logger.exception( "Exception thrown when calling launch error listener" ) def tear_down(self) -> None: """Called when controller is destroyed.""" super().tear_down() self.status = None self.launch_failure = None self._status_listeners = [] 0707010000003E000081A400000000000000000000000165F9EF900000013D000000000000000000000000000000000000003600000000pychromecast-14.0.1/pychromecast/controllers/shaka.py""" Simple Controller to use Shaka as a media controller. """ from ..config import APP_SHAKA from .media import BaseMediaPlayer class ShakaController(BaseMediaPlayer): """Controller to interact with Shaka app namespace.""" def __init__(self) -> None: super().__init__(supporting_app_id=APP_SHAKA) 0707010000003F000081A400000000000000000000000165F9EF9000000727000000000000000000000000000000000000003600000000pychromecast-14.0.1/pychromecast/controllers/supla.py""" Controller to interface with Supla. """ import logging from typing import Any from . import CallbackType, QuickPlayController from ..config import APP_SUPLA from ..response_handler import WaitResponse APP_NAMESPACE = "urn:x-cast:fi.ruutu.chromecast" # pylint: disable=too-many-instance-attributes class SuplaController(QuickPlayController): """Controller to interact with Supla namespace.""" def __init__(self) -> None: super().__init__(APP_NAMESPACE, APP_SUPLA) self.logger = logging.getLogger(__name__) def play_media( self, media_id: str, *, is_live: bool = False, callback_function: CallbackType | None = None, ) -> None: """ Play Supla media """ msg = { "type": "load", "mediaId": media_id, "currentTime": 0, "isLive": is_live, "isAtLiveMoment": False, "bookToken": "", "sample": True, "fw_site": "Supla", "Sanoma_adkv": "", "prerollAdsPlayed": True, "supla": True, "nextInSequenceList": 0, "playbackRate": 1, "env": "prod", } self.send_message( msg, inc_session_id=False, callback_function=callback_function, no_add_request_id=True, ) def quick_play( self, *, media_id: str, timeout: float, is_live: bool = False, **kwargs: Any ) -> None: """Quick Play""" response_handler = WaitResponse(timeout, f"supla quick play {media_id}") self.play_media( media_id, is_live=is_live, **kwargs, callback_function=response_handler.callback, ) response_handler.wait_response() 07070100000040000081A400000000000000000000000165F9EF90000009CD000000000000000000000000000000000000003A00000000pychromecast-14.0.1/pychromecast/controllers/yleareena.py""" Controller to interface with the Yle Areena app namespace. """ from typing import Any from . import CallbackType from .media import BaseMediaPlayer, STREAM_TYPE_BUFFERED, TYPE_LOAD, MESSAGE_TYPE from ..config import APP_YLEAREENA from ..response_handler import WaitResponse class YleAreenaController(BaseMediaPlayer): """Controller to interact with Yle Areena app namespace.""" def __init__(self) -> None: super().__init__(supporting_app_id=APP_YLEAREENA) def play_areena_media( # pylint: disable=too-many-locals self, kaltura_id: str, *, audio_language: str = "", text_language: str = "off", current_time: float = 0, autoplay: bool = True, stream_type: str = STREAM_TYPE_BUFFERED, callback_function: CallbackType | None = None, ) -> None: """ Play media with the entry id "kaltura_id". This value can be found by loading a page on Areena, e.g. https://areena.yle.fi/1-50097921 And finding the kaltura player which has an id of yle-kaltura-player3430579305188-29-0_whwjqpry In this case the kaltura id is 0_whwjqpry """ msg = { "media": { "streamType": stream_type, "customData": { "mediaInfo": {"entryId": kaltura_id}, "audioLanguage": audio_language, "textLanguage": text_language, }, }, MESSAGE_TYPE: TYPE_LOAD, "currentTime": current_time, "autoplay": autoplay, "customData": {}, "textTrackStyle": { "foregroundColor": "#FFFFFFFF", "backgroundColor": "#000000FF", "fontScale": 1, "fontFamily": "sans-serif", }, } self.send_message(msg, inc_session_id=True, callback_function=callback_function) def quick_play( self, *, media_id: str, timeout: float, audio_lang: str = "", text_lang: str = "off", **kwargs: Any, ) -> None: """Quick Play""" response_handler = WaitResponse(timeout, f"yleareena quick play {media_id}") self.play_areena_media( media_id, audio_language=audio_lang, text_language=text_lang, **kwargs, callback_function=response_handler.callback, ) response_handler.wait_response() 07070100000041000081A400000000000000000000000165F9EF9000001886000000000000000000000000000000000000003800000000pychromecast-14.0.1/pychromecast/controllers/youtube.py""" Controller to interface with the YouTube-app. Use the media controller to play, pause etc. """ import logging import threading from typing import Any, cast from casttube import YouTubeSession # type: ignore[import-untyped] from casttube.YouTubeSession import HEADERS # type: ignore[import-untyped] import requests from . import QuickPlayController from ..const import MESSAGE_TYPE from ..error import RequestTimeout from ..generated.cast_channel_pb2 import ( # pylint: disable=no-name-in-module CastMessage, ) from ..config import APP_YOUTUBE YOUTUBE_NAMESPACE = "urn:x-cast:com.google.youtube.mdx" TYPE_GET_SCREEN_ID = "getMdxSessionStatus" TYPE_STATUS = "mdxSessionStatus" ATTR_SCREEN_ID = "screenId" _LOGGER = logging.getLogger(__name__) class TimeoutYouTubeSession(YouTubeSession): # type: ignore[misc] """A youtube session with timeout.""" def __init__(self, screen_id: str, timeout: float) -> None: """Initialize.""" super().__init__(screen_id) self.__timeout = timeout def _do_post( self, url: Any, data: Any = None, params: Any = None, headers: Any = None, session_request: Any = False, ) -> Any: """ Calls requests.post with custom headers, increments RID(request id) on every post. will raise if response is not 200 :param url:(str) request url :param data: (dict) the POST body :param params:(dict) POST url params :param headers:(dict) Additional headers for the request :param session_request:(bool) True to increment session request counter(req_count) :return: POST response """ if headers: headers = {**HEADERS, **headers} else: headers = HEADERS response = requests.post( url, headers=headers, data=data, params=params, timeout=self.__timeout ) # 404 resets the sid, session counters # 400 in session probably means bad sid # If user did a bad request (eg. remove an non-existing video from queue) # bind restores the session. if response.status_code in (404, 400) and session_request: self._bind() response.raise_for_status() if session_request: self._req_count += 1 self._rid += 1 return response class YouTubeController(QuickPlayController): """Controller to interact with Youtube.""" _session: YouTubeSession _screen_id: str | None = None def __init__(self, timeout: float = 10) -> None: super().__init__(YOUTUBE_NAMESPACE, APP_YOUTUBE) self.status_update_event = threading.Event() self._timeout = timeout def start_session_if_none(self) -> None: """ Starts a session it is not yet initialized. """ if not (self._screen_id and self._session): self.update_screen_id() self._session = TimeoutYouTubeSession( screen_id=cast(str, self._screen_id), timeout=self._timeout ) def play_video(self, video_id: str, playlist_id: str | None = None) -> None: """ Play video(video_id) now. This ignores the current play queue order. :param video_id: YouTube video id(http://youtube.com/watch?v=video_id) :param playlist_id: youtube.com/watch?v=video_id&list=playlist_id """ self.start_session_if_none() self._session.play_video(video_id, playlist_id) def add_to_queue(self, video_id: str) -> None: """ Add video(video_id) to the end of the play queue. :param video_id: YouTube video id(http://youtube.com/watch?v=video_id) """ self.start_session_if_none() self._session.add_to_queue(video_id) def play_next(self, video_id: str) -> None: """ Play video(video_id) after the currently playing video. :param video_id: YouTube video id(http://youtube.com/watch?v=video_id) """ self.start_session_if_none() self._session.play_next(video_id) def remove_video(self, video_id: str) -> None: """ Remove video(videoId) from the queue. :param video_id: YouTube video id(http://youtube.com/watch?v=video_id) """ self.start_session_if_none() self._session.remove_video(video_id) def clear_playlist(self) -> None: """ Clear the entire video queue """ self.start_session_if_none() self._session.clear_playlist() def update_screen_id(self) -> None: """ Sends a getMdxSessionStatus to get the screenId and waits for response. This function is blocking If connected we should always get a response (send message will launch app if it is not running). """ self.status_update_event.clear() self.send_message({MESSAGE_TYPE: TYPE_GET_SCREEN_ID}) status = self.status_update_event.wait(10) if not status: _LOGGER.warning("Failed to update screen_id") self.status_update_event.clear() def receive_message(self, _message: CastMessage, data: dict) -> bool: """Called when a message is received.""" if data[MESSAGE_TYPE] == TYPE_STATUS: # Ignore the type error until validation of messages has been implemented self._process_status(data.get("data")) # type: ignore[arg-type] return True return False def _process_status(self, status: dict) -> None: """Process latest status update.""" self._screen_id = status.get(ATTR_SCREEN_ID) self.status_update_event.set() def quick_play( self, *, media_id: str, timeout: float, playlist_id: str | None = None, enqueue: bool = False, **kwargs: Any, ) -> None: """Quick Play""" self._timeout = timeout try: if enqueue: self.add_to_queue(media_id, **kwargs) else: self.play_video(media_id, playlist_id=playlist_id, **kwargs) except requests.Timeout as exc: raise RequestTimeout(f"youtube quick play {media_id}", timeout) from exc 07070100000042000081A400000000000000000000000165F9EF90000028B9000000000000000000000000000000000000002900000000pychromecast-14.0.1/pychromecast/dial.py""" Implements the DIAL-protocol to communicate with the Chromecast """ from __future__ import annotations from dataclasses import dataclass import json import logging import socket import ssl import urllib.request from uuid import UUID from typing import Any import zeroconf from .const import CAST_TYPE_AUDIO, CAST_TYPE_CHROMECAST, CAST_TYPE_GROUP from .error import ZeroConfInstanceRequired from .models import ZEROCONF_ERRORS, CastInfo, HostServiceInfo, MDNSServiceInfo XML_NS_UPNP_DEVICE = "{urn:schemas-upnp-org:device-1-0}" FORMAT_BASE_URL_HTTP = "http://{}:8008" FORMAT_BASE_URL_HTTPS = "https://{}:8443" _LOGGER = logging.getLogger(__name__) def get_host_from_service( service: HostServiceInfo | MDNSServiceInfo, zconf: zeroconf.Zeroconf | None ) -> tuple[str | None, int | None, zeroconf.ServiceInfo | None]: """Resolve host and port from service.""" service_info = None if isinstance(service, HostServiceInfo): return (service.host, service.port, None) try: if not zconf: raise ZeroConfInstanceRequired service_info = zconf.get_service_info("_googlecast._tcp.local.", service.name) if service_info: _LOGGER.debug( "get_info_from_service resolved service %s to service_info %s", service, service_info, ) else: _LOGGER.debug( "get_info_from_service failed to resolve service %s", service, ) except ZEROCONF_ERRORS: # We do not catch zeroconf.NotRunningException as it's # an unrecoverable error. _LOGGER.debug("get_info_from_service raised:", exc_info=True) return _get_host_from_zc_service_info(service_info) + (service_info,) def _get_host_from_zc_service_info( service_info: zeroconf.ServiceInfo | None, ) -> tuple[str | None, int | None]: """Get hostname or IP + port from zeroconf service_info.""" host = None port = None if service_info and service_info.port: if len(service_info.addresses) > 0: host = socket.inet_ntoa(service_info.addresses[0]) elif service_info.server is not None: host = service_info.server.lower() if host is not None: port = service_info.port return (host, port) def _get_status( services: set[HostServiceInfo | MDNSServiceInfo], zconf: zeroconf.Zeroconf | None, path: str, secure: bool, timeout: float, context: ssl.SSLContext | None, ) -> tuple[str | None, Any]: """Query a cast device via http(s).""" for service in services.copy(): host, _, _ = get_host_from_service(service, zconf) if host: _LOGGER.debug("Resolved service %s to %s", service, host) break headers = {"content-type": "application/json"} if secure: url = FORMAT_BASE_URL_HTTPS.format(host) + path else: url = FORMAT_BASE_URL_HTTP.format(host) + path has_context = bool(context) if secure and not has_context: context = get_ssl_context() req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=timeout, context=context) as response: data = response.read() return (host, json.loads(data.decode("utf-8"))) def get_ssl_context() -> ssl.SSLContext: """Create an SSL context.""" context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE return context def get_cast_type( cast_info: CastInfo, zconf: zeroconf.Zeroconf | None = None, timeout: float = 30, context: ssl.SSLContext | None = None, ) -> CastInfo: """Add cast type and manufacturer to a CastInfo instance.""" cast_type = CAST_TYPE_CHROMECAST manufacturer = "Unknown manufacturer" if cast_info.port != 8009: cast_type = CAST_TYPE_GROUP manufacturer = "Google Inc." else: host: str | None = "<unknown>" try: display_supported = True host, status = _get_status( cast_info.services, zconf, "/setup/eureka_info?params=device_info,name", True, timeout, context, ) if "device_info" in status: device_info = status["device_info"] capabilities = device_info.get("capabilities", {}) display_supported = capabilities.get("display_supported", True) manufacturer = device_info.get("manufacturer", manufacturer) if not display_supported: cast_type = CAST_TYPE_AUDIO _LOGGER.debug("cast type: %s, manufacturer: %s", cast_type, manufacturer) except ( urllib.error.HTTPError, urllib.error.URLError, OSError, ValueError, ) as err: _LOGGER.warning( "Failed to determine cast type for host %s (%s) (services:%s)", host, err, cast_info.services, ) cast_type = CAST_TYPE_CHROMECAST return CastInfo( cast_info.services, cast_info.uuid, cast_info.model_name, cast_info.friendly_name, cast_info.host, cast_info.port, cast_type, manufacturer, ) def get_device_info( # pylint: disable=too-many-locals host: str, services: set[HostServiceInfo | MDNSServiceInfo] | None = None, zconf: zeroconf.Zeroconf | None = None, timeout: float = 30, context: ssl.SSLContext | None = None, ) -> DeviceStatus | None: """Return a filled in DeviceStatus object for the specified device.""" try: if services is None: services = {HostServiceInfo(host, 8009)} # Try connection with SSL first, and if it fails fall back to non-SSL try: _, status = _get_status( services, zconf, "/setup/eureka_info?params=device_info,name", True, timeout / 2, context, ) except (urllib.error.HTTPError, urllib.error.URLError): _, status = _get_status( services, zconf, "/setup/eureka_info?params=device_info,name", False, timeout / 2, context, ) cast_type = CAST_TYPE_CHROMECAST display_supported = True friendly_name = status.get("name", "Unknown Chromecast") manufacturer = "Unknown manufacturer" model_name = "Unknown model name" multizone_supported = False udn = None if "device_info" in status: device_info = status["device_info"] capabilities = device_info.get("capabilities", {}) display_supported = capabilities.get("display_supported", True) multizone_supported = capabilities.get("multizone_supported", True) friendly_name = device_info.get("name", friendly_name) model_name = device_info.get("model_name", model_name) manufacturer = device_info.get("manufacturer", manufacturer) udn = device_info.get("ssdp_udn", None) else: udn = status.get("ssdp_udn", None) if not display_supported: cast_type = CAST_TYPE_AUDIO uuid = None if udn: uuid = UUID(udn.replace("-", "")) return DeviceStatus( friendly_name, model_name, manufacturer, uuid, cast_type, multizone_supported, ) except (urllib.error.HTTPError, urllib.error.URLError, OSError, ValueError): return None def _get_group_info(host: str, group: Any) -> MultizoneInfo: """Parse group JSON data and return a MultizoneInfo instance.""" name = group.get("name", "Unknown group name") udn = group.get("uuid", None) uuid = None if udn: uuid = UUID(udn.replace("-", "")) elected_leader = group.get("elected_leader", "") elected_leader_split = elected_leader.rsplit(":", 1) leader_host = None leader_port = None if elected_leader == "self" and "cast_port" in group: leader_host = host leader_port = group["cast_port"] elif len(elected_leader_split) == 2: # The port in the URL is not useful, but we can scan the host leader_host = elected_leader_split[0] return MultizoneInfo(name, uuid, leader_host, leader_port) def get_multizone_status( host: str, services: set[HostServiceInfo | MDNSServiceInfo] | None = None, zconf: zeroconf.Zeroconf | None = None, timeout: float = 30, context: ssl.SSLContext | None = None, ) -> MultizoneStatus | None: """Return a filled in MultizoneStatus object for the specified device.""" try: if services is None: services = {HostServiceInfo(host, 8009)} _, status = _get_status( services, zconf, "/setup/eureka_info?params=multizone", True, timeout, context, ) dynamic_groups = [] if "multizone" in status and "dynamic_groups" in status["multizone"]: for group in status["multizone"]["dynamic_groups"]: dynamic_groups.append(_get_group_info(host, group)) groups = [] if "multizone" in status and "groups" in status["multizone"]: for group in status["multizone"]["groups"]: groups.append(_get_group_info(host, group)) return MultizoneStatus(dynamic_groups, groups) except (urllib.error.HTTPError, urllib.error.URLError, OSError, ValueError): return None @dataclass(frozen=True) class MultizoneInfo: """Multizone info container.""" friendly_name: str uuid: UUID | None host: str | None port: int | None @dataclass(frozen=True) class MultizoneStatus: """Multizone status container.""" dynamic_groups: list[MultizoneInfo] groups: list[MultizoneInfo] @dataclass(frozen=True) class DeviceStatus: """Device status container.""" friendly_name: str model_name: str manufacturer: str uuid: UUID | None cast_type: str multizone_supported: bool 07070100000043000081A400000000000000000000000165F9EF9000006ACA000000000000000000000000000000000000002E00000000pychromecast-14.0.1/pychromecast/discovery.py"""Discovers Chromecasts on the network using mDNS/zeroconf.""" from __future__ import annotations import abc from collections.abc import Callable import functools import itertools import logging import ssl import threading import time from uuid import UUID import zeroconf from .const import CAST_TYPE_AUDIO, CAST_TYPE_GROUP, CAST_TYPES, MF_GOOGLE from .dial import get_device_info, get_multizone_status, get_ssl_context from .models import ZEROCONF_ERRORS, CastInfo, HostServiceInfo, MDNSServiceInfo DISCOVER_TIMEOUT = 5 # Models matching this list will only be polled once by the HostBrowser HOST_BROWSER_BLOCKED_MODEL_PREFIXES = [ "HK", # Harman Kardon speakers crash if polled: https://github.com/home-assistant/core/issues/52020 "JBL", # JBL speakers crash if polled: https://github.com/home-assistant/core/issues/52020 ] _LOGGER = logging.getLogger(__name__) class AbstractCastListener(abc.ABC): """Listener for discovering chromecasts.""" @abc.abstractmethod def add_cast(self, uuid: UUID, service: str) -> None: """A cast has been discovered. uuid: The cast's uuid, this is the dictionary key to find the chromecast metadata in CastBrowser.devices. service: First known MDNS service name or host:port """ @abc.abstractmethod def remove_cast(self, uuid: UUID, service: str, cast_info: CastInfo) -> None: """A cast has been removed, meaning there are no longer any known services. uuid: The cast's uuid service: Last valid MDNS service name or host:port cast_info: CastInfo for the service to aid cleanup """ @abc.abstractmethod def update_cast(self, uuid: UUID, service: str) -> None: """A cast has been updated. uuid: The cast's uuid service: MDNS service name or host:port """ def _is_blocked_from_host_browser( item: str, block_list: list[str], item_type: str ) -> bool: for blocked_prefix in block_list: if item.startswith(blocked_prefix): _LOGGER.debug("%s %s is blocked from host based polling", item_type, item) return True return False def _is_model_blocked_from_host_browser(model: str) -> bool: return _is_blocked_from_host_browser( model, HOST_BROWSER_BLOCKED_MODEL_PREFIXES, "Model" ) class SimpleCastListener(AbstractCastListener): """Helper for backwards compatibility.""" def __init__( self, add_callback: Callable[[UUID, str], None] | None = None, remove_callback: Callable[[UUID, str, CastInfo], None] | None = None, update_callback: Callable[[UUID, str], None] | None = None, ): self._add_callback = add_callback self._remove_callback = remove_callback self._update_callback = update_callback def add_cast(self, uuid: UUID, service: str) -> None: if self._add_callback: self._add_callback(uuid, service) def remove_cast(self, uuid: UUID, service: str, cast_info: CastInfo) -> None: if self._remove_callback: self._remove_callback(uuid, service, cast_info) def update_cast(self, uuid: UUID, service: str) -> None: if self._update_callback: self._update_callback(uuid, service) class ZeroConfListener(zeroconf.ServiceListener): """Listener for ZeroConf service browser.""" def __init__( self, cast_listener: AbstractCastListener, devices: dict[UUID, CastInfo], host_browser: HostBrowser, lock: threading.Lock, ) -> None: self._cast_listener = cast_listener self._devices = devices self._host_browser = host_browser self._services_lock = lock def remove_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None: """Called by zeroconf when an mDNS service is lost.""" _LOGGER.debug("remove_service %s, %s", type_, name) cast_info = None device_removed = False uuid = None service_info = MDNSServiceInfo(name) # Lock because the HostBrowser may also add or remove items with self._services_lock: for uuid, info_for_uuid in self._devices.items(): if service_info in info_for_uuid.services: cast_info = info_for_uuid info_for_uuid.services.remove(service_info) if len(info_for_uuid.services) == 0: device_removed = True break if not cast_info: _LOGGER.debug("remove_service unknown %s, %s", type_, name) return if device_removed: self._cast_listener.remove_cast(uuid, name, cast_info) else: self._cast_listener.update_cast(uuid, name) def update_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None: """Called by zeroconf when an mDNS service is updated.""" _LOGGER.debug("update_service %s, %s", type_, name) self._add_update_service(zc, type_, name, self._cast_listener.update_cast) def add_service(self, zc: zeroconf.Zeroconf, type_: str, name: str) -> None: """Called by zeroconf when an mDNS service is discovered.""" _LOGGER.debug("add_service %s, %s", type_, name) self._add_update_service(zc, type_, name, self._cast_listener.add_cast) # pylint: disable-next=too-many-locals def _add_update_service( self, zconf: zeroconf.Zeroconf, typ: str, name: str, callback: Callable[[UUID, str], None], ) -> None: """Add or update a service.""" service = None tries = 0 if name.endswith("_sub._googlecast._tcp.local."): _LOGGER.debug("_add_update_service ignoring %s, %s", typ, name) return while service is None and tries < 4: try: service = zconf.get_service_info(typ, name) except ZEROCONF_ERRORS: # If the zeroconf fails to receive the necessary data we abort # adding the service # We do not catch zeroconf.NotRunningException as it's # an unrecoverable error. _LOGGER.debug( "get_info_from_service failed to resolve service %s", service, ) break tries += 1 if not service: _LOGGER.debug("_add_update_service failed to add %s, %s", typ, name) return if service.port is None: _LOGGER.debug("_add_update_service port is None") return def get_value(key: str) -> str | None: """Retrieve value and decode to UTF-8.""" value = service.properties.get(key.encode("utf-8")) # zeroconf would keep str version of cached items, this check # can be removed if we pin zeroconf to a version where this is # removed. if value is None or isinstance(value, str): # type: ignore[unreachable] return value return value.decode("utf-8") addresses = service.parsed_addresses() host = addresses[0] if addresses else service.server if host is None: _LOGGER.debug( "_add_update_service failed to get host for %s, %s", typ, name ) return # Store the host, in case mDNS stops working self._host_browser.add_hosts([host]) friendly_name = get_value("fn") model_name = get_value("md") or "Unknown model name" uuid_str = get_value("id") if not uuid_str: _LOGGER.debug( "_add_update_service failed to get uuid for %s, %s", typ, name ) return # Ignore incorrect UUIDs from third-party Chromecast emulators try: uuid = UUID(uuid_str) except ValueError: _LOGGER.debug( "_add_update_service failed due to bad uuid for %s, %s, model %s", typ, name, model_name, ) return service_info = MDNSServiceInfo(name) # Lock because the HostBrowser may also add or remove items with self._services_lock: cast_type: str | None manufacturer: str | None if service.port != 8009: cast_type = CAST_TYPE_GROUP manufacturer = MF_GOOGLE else: cast_type, manufacturer = CAST_TYPES.get( model_name.lower(), (None, None) ) if uuid not in self._devices: self._devices[uuid] = CastInfo( {service_info}, uuid, model_name, friendly_name, host, service.port, cast_type, manufacturer, ) else: # Update stored information services = self._devices[uuid].services services.add(service_info) self._devices[uuid] = CastInfo( services, uuid, model_name, friendly_name, host, service.port, cast_type, manufacturer, ) callback(uuid, name) class HostStatus: """Status of known host.""" def __init__(self) -> None: self.failcount = 0 self.no_polling = False HOSTLISTENER_CYCLE_TIME = 30 HOSTLISTENER_MAX_FAIL = 5 class HostBrowser(threading.Thread): """Repeateadly poll a set of known hosts.""" def __init__( self, cast_listener: AbstractCastListener, devices: dict[UUID, CastInfo], lock: threading.Lock, ) -> None: super().__init__(daemon=True) self._cast_listener = cast_listener self._devices = devices self._known_hosts: dict[str, HostStatus] = {} self._next_update = time.time() self._services_lock = lock self._start_requested = False self._context: ssl.SSLContext | None = None self.stop = threading.Event() def add_hosts(self, known_hosts: list[str]) -> None: """Add a list of known hosts to the set.""" for host in known_hosts: if host not in self._known_hosts: _LOGGER.debug("Addded host %s", host) self._known_hosts[host] = HostStatus() def update_hosts(self, known_hosts: list[str] | None) -> None: """Update the set of known hosts. Note: Removed hosts will no longer be polled, but services of any associated cast devices will not be purged. """ if known_hosts is None: known_hosts = [] self.add_hosts(known_hosts) for host in list(self._known_hosts.keys()): if host not in known_hosts: _LOGGER.debug("Removed host %s", host) self._known_hosts.pop(host) def run(self) -> None: """Start worker thread.""" _LOGGER.debug("HostBrowser thread started") self._context = get_ssl_context() try: while not self.stop.is_set(): self._poll_hosts() self._next_update += HOSTLISTENER_CYCLE_TIME self.stop.wait(max(self._next_update - time.time(), 0)) except Exception: # pylint: disable=broad-except _LOGGER.exception("Unhandled exception in worker thread") raise _LOGGER.debug("HostBrowser thread done") def _poll_hosts(self) -> None: # Iterate over a copy because other threads may modify the known_hosts list known_hosts = list(self._known_hosts.keys()) for host in known_hosts: devices: list[tuple[int, str, str, UUID, str, str]] = [] uuids: list[UUID] = [] if self.stop.is_set(): break try: hoststatus = self._known_hosts[host] except KeyError: # The host has been removed by another thread continue if hoststatus.no_polling: # This host should not be polled continue device_status = get_device_info(host, timeout=30, context=self._context) if not device_status: hoststatus.failcount += 1 if hoststatus.failcount == HOSTLISTENER_MAX_FAIL: # We can't contact the host, drop all its devices and UUIDs self._update_devices(host, devices, uuids) hoststatus.failcount = min( hoststatus.failcount, HOSTLISTENER_MAX_FAIL + 1 ) continue if not device_status.uuid: _LOGGER.debug("host %s does not report UUID", host) continue if ( device_status.cast_type != CAST_TYPE_AUDIO or _is_model_blocked_from_host_browser(device_status.model_name) ): # Polling causes frame drops on some Android TVs, # https://github.com/home-assistant/core/issues/55435 # Keep polling audio chromecasts to detect new speaker groups, but # exclude some devices which crash when polled # Note: This will not work well the IP is recycled to another cast # device. hoststatus.no_polling = True # We got device_status, try to get multizone status, then update devices hoststatus.failcount = 0 devices.append( ( 8009, device_status.friendly_name, device_status.model_name, device_status.uuid, device_status.cast_type, device_status.manufacturer, ) ) uuids.append(device_status.uuid) multizone_status = ( get_multizone_status(host, context=self._context) if device_status.multizone_supported else None ) if multizone_status: for group in itertools.chain( multizone_status.dynamic_groups, multizone_status.groups ): # Note: This is currently (2021-02) not working for dynamic_groups, the # ports of dynamic groups are not present in the eureka_info reply. if group.host and group.host not in self._known_hosts: self.add_hosts([group.host]) if group.port is None or group.uuid is None or group.host != host: continue devices.append( ( group.port, group.friendly_name, "Google Cast Group", group.uuid, CAST_TYPE_GROUP, "Google Inc.", ) ) uuids.append(group.uuid) self._update_devices(host, devices, uuids) def _update_devices( self, host: str, devices: list[tuple[int, str, str, UUID, str, str]], host_uuids: list[UUID], ) -> None: callbacks: list[Callable[[], None]] = [] # Lock because the ZeroConfListener may also add or remove items with self._services_lock: for ( port, friendly_name, model_name, uuid, cast_type, manufacturer, ) in devices: self._add_host_service( host, port, friendly_name, model_name, uuid, callbacks, cast_type, manufacturer, ) for uuid in self._devices: for service in self._devices[uuid].services.copy(): if ( isinstance(service, HostServiceInfo) and service.host == host and uuid not in host_uuids ): self._remove_host_service(host, uuid, callbacks) # Handle callbacks after releasing the lock for callback in callbacks: callback() def _add_host_service( self, host: str, port: int, friendly_name: str, model_name: str, uuid: UUID, callbacks: list[Callable[[], None]], cast_type: str, manufacturer: str, ) -> None: service_info = HostServiceInfo(host, port) callback = self._cast_listener.add_cast if uuid in self._devices: callback = self._cast_listener.update_cast cast_info = self._devices[uuid] if ( service_info in cast_info.services and cast_info.model_name == model_name and cast_info.friendly_name == friendly_name ): # No changes, return return if uuid not in self._devices: self._devices[uuid] = CastInfo( {service_info}, uuid, model_name, friendly_name, host, port, cast_type, manufacturer, ) else: # Update stored information services = self._devices[uuid].services services.add(service_info) self._devices[uuid] = CastInfo( services, uuid, model_name, friendly_name, host, port, cast_type, manufacturer, ) name = f"{host}:{port}" _LOGGER.debug( "Host %s (%s) up, adding or updating host based service", name, uuid ) callbacks.append(functools.partial(callback, uuid, name)) def _remove_host_service( self, host: str, uuid: UUID, callbacks: list[Callable[[], None]], ) -> None: if uuid not in self._devices: return info_for_uuid = self._devices[uuid] for service in info_for_uuid.services: if isinstance(service, HostServiceInfo) and service.host == host: info_for_uuid.services.remove(service) port = service.port name = f"{host}:{port}" _LOGGER.debug( "Host %s down or no longer handles uuid %s, removing host based service", name, uuid, ) if len(info_for_uuid.services) == 0: callbacks.append( functools.partial( self._cast_listener.remove_cast, uuid, name, info_for_uuid ) ) else: callbacks.append( functools.partial(self._cast_listener.update_cast, uuid, name) ) break class CastBrowser: """Discover Chromecasts on the network. When a Chromecast is found, cast_listener.add_cast is called When a Chromecast is updated, cast_listener.update_cast is called When a Chromecast is lost, the cast_listener.remove_cast is called A shared zeroconf instance can be passed as zeroconf_instance. If no instance is passed, a new instance will be created. """ def __init__( self, cast_listener: AbstractCastListener, zeroconf_instance: zeroconf.Zeroconf | None = None, known_hosts: list[str] | None = None, ) -> None: self._cast_listener = cast_listener self.zc = zeroconf_instance # pylint: disable=invalid-name self._zc_browser: zeroconf.ServiceBrowser | None = None self.devices: dict[UUID, CastInfo] = {} self.services = self.devices # For backwards compatibility self._services_lock = threading.Lock() self.host_browser = HostBrowser( self._cast_listener, self.devices, self._services_lock ) self.zeroconf_listener = ZeroConfListener( self._cast_listener, self.devices, self.host_browser, self._services_lock ) if known_hosts: self.host_browser.add_hosts(known_hosts) @property def count(self) -> int: """Number of discovered cast devices.""" return len(self.devices) def set_zeroconf_instance(self, zeroconf_instance: zeroconf.Zeroconf) -> None: """Set zeroconf_instance.""" if self.zc: return self.zc = zeroconf_instance def start_discovery(self) -> None: """ This method will start discovering chromecasts on separate threads. When a chromecast is discovered, callback will be called with the discovered chromecast's zeroconf name. This is the dictionary key to find the chromecast metadata in CastBrowser.devices. A shared zeroconf instance can be passed as zeroconf_instance. If no instance is passed, a new instance will be created. """ if self.zc: self._zc_browser = zeroconf.ServiceBrowser( self.zc, "_googlecast._tcp.local.", self.zeroconf_listener, ) self.host_browser.start() def stop_discovery(self) -> None: """Stop the chromecast discovery threads.""" if self._zc_browser: try: self._zc_browser.cancel() except RuntimeError: # Throws if called from service callback when joining the zc browser thread pass self._zc_browser.zc.close() self.host_browser.stop.set() self.host_browser.join() class CastListener(CastBrowser): """Backwards compatible helper class. Deprecated as of February 2021, will be removed in June 2024. """ def __init__( self, add_callback: Callable[[UUID, str], None] | None = None, remove_callback: Callable[[UUID, str, CastInfo], None] | None = None, update_callback: Callable[[UUID, str], None] | None = None, ): _LOGGER.info( "CastListener is deprecated and will be removed in June 2024, update to use CastBrowser instead" ) listener = SimpleCastListener(add_callback, remove_callback, update_callback) super().__init__(listener) def start_discovery( cast_browser: CastBrowser, zeroconf_instance: zeroconf.Zeroconf ) -> CastBrowser: """Start discovering chromecasts on the network. Deprecated as of February 2021, will be removed in June 2024. """ _LOGGER.info( "start_discovery is deprecated and will be removed in June 2024, call CastBrowser.start_discovery() instead" ) cast_browser.set_zeroconf_instance(zeroconf_instance) cast_browser.start_discovery() return cast_browser def stop_discovery(cast_browser: CastBrowser) -> None: """Stop the chromecast discovery threads. Deprecated as of February 2021, will be removed in June 2024. """ _LOGGER.info( "stop_discovery is deprecated and will be removed in June 2024, call CastBrowser.stop_discovery() instead" ) cast_browser.stop_discovery() def discover_chromecasts( max_devices: int | None = None, timeout: float = DISCOVER_TIMEOUT, zeroconf_instance: zeroconf.Zeroconf | None = None, known_hosts: list[str] | None = None, ) -> tuple[list[CastInfo], CastBrowser]: """ Discover chromecasts on the network. Deprecated as of February 2021, will be removed in June 2024. Returns a tuple of: A list of chromecast devices, or an empty list if no chromecasts were found. A service browser to keep the Chromecast mDNS data updated. When updates are (no longer) needed, call browser.stop_discovery(). :param zeroconf_instance: An existing zeroconf instance. """ _LOGGER.info( "discover_chromecasts is deprecated and will be removed in June 2024, update to use CastBrowser instead." ) def add_callback(_uuid: UUID, _service: str) -> None: """Called when a new chromecast has been discovered.""" if max_devices is not None and browser.count >= max_devices: discover_complete.set() discover_complete = threading.Event() zconf = zeroconf_instance or zeroconf.Zeroconf() browser = CastBrowser(SimpleCastListener(add_callback), zconf, known_hosts) browser.start_discovery() # Wait for the timeout or the maximum number of devices discover_complete.wait(timeout) return (list(browser.devices.values()), browser) def discover_listed_chromecasts( friendly_names: list[str] | None = None, uuids: list[UUID] | None = None, discovery_timeout: float = DISCOVER_TIMEOUT, zeroconf_instance: zeroconf.Zeroconf | None = None, known_hosts: list[str] | None = None, ) -> tuple[list[CastInfo], CastBrowser]: """ Searches the network for chromecast devices matching a list of friendly names or a list of UUIDs. Returns a tuple of: A list of chromecast devices matching the criteria, or an empty list if no matching chromecasts were found. A service browser to keep the Chromecast mDNS data updated. When updates are (no longer) needed, call browser.stop_discovery(). :param friendly_names: A list of wanted friendly names :param uuids: A list of wanted uuids :param discovery_timeout: A floating point number specifying the time to wait devices matching the criteria have been found. :param zeroconf_instance: An existing zeroconf instance. """ cc_list: dict[UUID, CastInfo] = {} def add_callback(uuid: UUID, service: str) -> None: _LOGGER.debug("Got cast %s, %s", uuid, service) cast_info = browser.devices[uuid] friendly_name = cast_info.friendly_name if uuids and uuid in uuids: cc_list[uuid] = browser.devices[uuid] uuids.remove(uuid) if friendly_names and friendly_name in friendly_names: cc_list[uuid] = browser.devices[uuid] friendly_names.remove(friendly_name) if not friendly_names and not uuids: discover_complete.set() discover_complete = threading.Event() zconf = zeroconf_instance or zeroconf.Zeroconf() browser = CastBrowser(SimpleCastListener(add_callback), zconf, known_hosts) browser.start_discovery() # Wait for the timeout or found all wanted devices discover_complete.wait(discovery_timeout) return (list(cc_list.values()), browser) 07070100000044000081A400000000000000000000000165F9EF90000005E8000000000000000000000000000000000000002A00000000pychromecast-14.0.1/pychromecast/error.py""" Errors to be used by PyChromecast. """ class PyChromecastError(Exception): """Base error for PyChromecast.""" class ChromecastConnectionError(PyChromecastError): """When a connection error occurs within PyChromecast.""" class PyChromecastStopped(PyChromecastError): """Raised when a command is invoked while the Chromecast's socket_client is stopped. """ class NotConnected(PyChromecastError): """ Raised when a command is invoked while not connected to a Chromecast. """ class UnsupportedNamespace(PyChromecastError): """ Raised when trying to send a message with a namespace that is not supported by the current running app. """ class ControllerNotRegistered(PyChromecastError): """ Raised when trying to interact with a controller while it is not registered with a ChromeCast object. """ class RequestFailed(PyChromecastError): """Raised when a request failed to complete.""" MSG = "Failed to execute {request}." def __init__(self, request: str) -> None: super().__init__(self.MSG.format(request=request)) class RequestTimeout(PyChromecastError): """Raised when a request timed out.""" MSG = "Execution of {request} timed out after {timeout} s." def __init__(self, request: str, timeout: float) -> None: super().__init__(self.MSG.format(request=request, timeout=timeout)) class ZeroConfInstanceRequired(PyChromecastError): """Raised when a zeroconf instance is required.""" 07070100000045000041ED00000000000000000000000265F9EF9000000000000000000000000000000000000000000000002B00000000pychromecast-14.0.1/pychromecast/generated07070100000046000081A400000000000000000000000165F9EF9000000592000000000000000000000000000000000000004100000000pychromecast-14.0.1/pychromecast/generated/authority_keys_pb2.py# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: authority_keys.proto # Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14\x61uthority_keys.proto\x12!extensions.api.cast_channel.proto\"\x83\x01\n\rAuthorityKeys\x12\x42\n\x04keys\x18\x01 \x03(\x0b\x32\x34.extensions.api.cast_channel.proto.AuthorityKeys.Key\x1a.\n\x03Key\x12\x13\n\x0b\x66ingerprint\x18\x01 \x02(\x0c\x12\x12\n\npublic_key\x18\x02 \x02(\x0c\x42\x02H\x03') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'authority_keys_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'H\003' _globals['_AUTHORITYKEYS']._serialized_start=60 _globals['_AUTHORITYKEYS']._serialized_end=191 _globals['_AUTHORITYKEYS_KEY']._serialized_start=145 _globals['_AUTHORITYKEYS_KEY']._serialized_end=191 # @@protoc_insertion_point(module_scope) 07070100000047000081A400000000000000000000000165F9EF90000003C0000000000000000000000000000000000000004200000000pychromecast-14.0.1/pychromecast/generated/authority_keys_pb2.pyifrom google.protobuf.internal import containers as _containers from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor class AuthorityKeys(_message.Message): __slots__ = ("keys",) class Key(_message.Message): __slots__ = ("fingerprint", "public_key") FINGERPRINT_FIELD_NUMBER: _ClassVar[int] PUBLIC_KEY_FIELD_NUMBER: _ClassVar[int] fingerprint: bytes public_key: bytes def __init__(self, fingerprint: _Optional[bytes] = ..., public_key: _Optional[bytes] = ...) -> None: ... KEYS_FIELD_NUMBER: _ClassVar[int] keys: _containers.RepeatedCompositeFieldContainer[AuthorityKeys.Key] def __init__(self, keys: _Optional[_Iterable[_Union[AuthorityKeys.Key, _Mapping]]] = ...) -> None: ... 07070100000048000081A400000000000000000000000165F9EF9000001039000000000000000000000000000000000000003F00000000pychromecast-14.0.1/pychromecast/generated/cast_channel_pb2.py# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: cast_channel.proto # Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x63\x61st_channel.proto\x12\x1b\x65xtensions.api.cast_channel\"\xe3\x02\n\x0b\x43\x61stMessage\x12R\n\x10protocol_version\x18\x01 \x02(\x0e\x32\x38.extensions.api.cast_channel.CastMessage.ProtocolVersion\x12\x11\n\tsource_id\x18\x02 \x02(\t\x12\x16\n\x0e\x64\x65stination_id\x18\x03 \x02(\t\x12\x11\n\tnamespace\x18\x04 \x02(\t\x12J\n\x0cpayload_type\x18\x05 \x02(\x0e\x32\x34.extensions.api.cast_channel.CastMessage.PayloadType\x12\x14\n\x0cpayload_utf8\x18\x06 \x01(\t\x12\x16\n\x0epayload_binary\x18\x07 \x01(\x0c\"!\n\x0fProtocolVersion\x12\x0e\n\nCASTV2_1_0\x10\x00\"%\n\x0bPayloadType\x12\n\n\x06STRING\x10\x00\x12\n\n\x06\x42INARY\x10\x01\"\xce\x01\n\rAuthChallenge\x12]\n\x13signature_algorithm\x18\x01 \x01(\x0e\x32/.extensions.api.cast_channel.SignatureAlgorithm:\x0fRSASSA_PKCS1v15\x12\x14\n\x0csender_nonce\x18\x02 \x01(\x0c\x12H\n\x0ehash_algorithm\x18\x03 \x01(\x0e\x32*.extensions.api.cast_channel.HashAlgorithm:\x04SHA1\"\xb0\x02\n\x0c\x41uthResponse\x12\x11\n\tsignature\x18\x01 \x02(\x0c\x12\x1f\n\x17\x63lient_auth_certificate\x18\x02 \x02(\x0c\x12 \n\x18intermediate_certificate\x18\x03 \x03(\x0c\x12]\n\x13signature_algorithm\x18\x04 \x01(\x0e\x32/.extensions.api.cast_channel.SignatureAlgorithm:\x0fRSASSA_PKCS1v15\x12\x14\n\x0csender_nonce\x18\x05 \x01(\x0c\x12H\n\x0ehash_algorithm\x18\x06 \x01(\x0e\x32*.extensions.api.cast_channel.HashAlgorithm:\x04SHA1\x12\x0b\n\x03\x63rl\x18\x07 \x01(\x0c\"\xa3\x01\n\tAuthError\x12\x44\n\nerror_type\x18\x01 \x02(\x0e\x32\x30.extensions.api.cast_channel.AuthError.ErrorType\"P\n\tErrorType\x12\x12\n\x0eINTERNAL_ERROR\x10\x00\x12\n\n\x06NO_TLS\x10\x01\x12#\n\x1fSIGNATURE_ALGORITHM_UNAVAILABLE\x10\x02\"\xc6\x01\n\x11\x44\x65viceAuthMessage\x12=\n\tchallenge\x18\x01 \x01(\x0b\x32*.extensions.api.cast_channel.AuthChallenge\x12;\n\x08response\x18\x02 \x01(\x0b\x32).extensions.api.cast_channel.AuthResponse\x12\x35\n\x05\x65rror\x18\x03 \x01(\x0b\x32&.extensions.api.cast_channel.AuthError*J\n\x12SignatureAlgorithm\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\x13\n\x0fRSASSA_PKCS1v15\x10\x01\x12\x0e\n\nRSASSA_PSS\x10\x02*%\n\rHashAlgorithm\x12\x08\n\x04SHA1\x10\x00\x12\n\n\x06SHA256\x10\x01\x42\x02H\x03') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'cast_channel_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'H\003' _globals['_SIGNATUREALGORITHM']._serialized_start=1292 _globals['_SIGNATUREALGORITHM']._serialized_end=1366 _globals['_HASHALGORITHM']._serialized_start=1368 _globals['_HASHALGORITHM']._serialized_end=1405 _globals['_CASTMESSAGE']._serialized_start=52 _globals['_CASTMESSAGE']._serialized_end=407 _globals['_CASTMESSAGE_PROTOCOLVERSION']._serialized_start=335 _globals['_CASTMESSAGE_PROTOCOLVERSION']._serialized_end=368 _globals['_CASTMESSAGE_PAYLOADTYPE']._serialized_start=370 _globals['_CASTMESSAGE_PAYLOADTYPE']._serialized_end=407 _globals['_AUTHCHALLENGE']._serialized_start=410 _globals['_AUTHCHALLENGE']._serialized_end=616 _globals['_AUTHRESPONSE']._serialized_start=619 _globals['_AUTHRESPONSE']._serialized_end=923 _globals['_AUTHERROR']._serialized_start=926 _globals['_AUTHERROR']._serialized_end=1089 _globals['_AUTHERROR_ERRORTYPE']._serialized_start=1009 _globals['_AUTHERROR_ERRORTYPE']._serialized_end=1089 _globals['_DEVICEAUTHMESSAGE']._serialized_start=1092 _globals['_DEVICEAUTHMESSAGE']._serialized_end=1290 # @@protoc_insertion_point(module_scope) 07070100000049000081A400000000000000000000000165F9EF90000014F5000000000000000000000000000000000000004000000000pychromecast-14.0.1/pychromecast/generated/cast_channel_pb2.pyifrom google.protobuf.internal import containers as _containers from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor class SignatureAlgorithm(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () UNSPECIFIED: _ClassVar[SignatureAlgorithm] RSASSA_PKCS1v15: _ClassVar[SignatureAlgorithm] RSASSA_PSS: _ClassVar[SignatureAlgorithm] class HashAlgorithm(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () SHA1: _ClassVar[HashAlgorithm] SHA256: _ClassVar[HashAlgorithm] UNSPECIFIED: SignatureAlgorithm RSASSA_PKCS1v15: SignatureAlgorithm RSASSA_PSS: SignatureAlgorithm SHA1: HashAlgorithm SHA256: HashAlgorithm class CastMessage(_message.Message): __slots__ = ("protocol_version", "source_id", "destination_id", "namespace", "payload_type", "payload_utf8", "payload_binary") class ProtocolVersion(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () CASTV2_1_0: _ClassVar[CastMessage.ProtocolVersion] CASTV2_1_0: CastMessage.ProtocolVersion class PayloadType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () STRING: _ClassVar[CastMessage.PayloadType] BINARY: _ClassVar[CastMessage.PayloadType] STRING: CastMessage.PayloadType BINARY: CastMessage.PayloadType PROTOCOL_VERSION_FIELD_NUMBER: _ClassVar[int] SOURCE_ID_FIELD_NUMBER: _ClassVar[int] DESTINATION_ID_FIELD_NUMBER: _ClassVar[int] NAMESPACE_FIELD_NUMBER: _ClassVar[int] PAYLOAD_TYPE_FIELD_NUMBER: _ClassVar[int] PAYLOAD_UTF8_FIELD_NUMBER: _ClassVar[int] PAYLOAD_BINARY_FIELD_NUMBER: _ClassVar[int] protocol_version: CastMessage.ProtocolVersion source_id: str destination_id: str namespace: str payload_type: CastMessage.PayloadType payload_utf8: str payload_binary: bytes def __init__(self, protocol_version: _Optional[_Union[CastMessage.ProtocolVersion, str]] = ..., source_id: _Optional[str] = ..., destination_id: _Optional[str] = ..., namespace: _Optional[str] = ..., payload_type: _Optional[_Union[CastMessage.PayloadType, str]] = ..., payload_utf8: _Optional[str] = ..., payload_binary: _Optional[bytes] = ...) -> None: ... class AuthChallenge(_message.Message): __slots__ = ("signature_algorithm", "sender_nonce", "hash_algorithm") SIGNATURE_ALGORITHM_FIELD_NUMBER: _ClassVar[int] SENDER_NONCE_FIELD_NUMBER: _ClassVar[int] HASH_ALGORITHM_FIELD_NUMBER: _ClassVar[int] signature_algorithm: SignatureAlgorithm sender_nonce: bytes hash_algorithm: HashAlgorithm def __init__(self, signature_algorithm: _Optional[_Union[SignatureAlgorithm, str]] = ..., sender_nonce: _Optional[bytes] = ..., hash_algorithm: _Optional[_Union[HashAlgorithm, str]] = ...) -> None: ... class AuthResponse(_message.Message): __slots__ = ("signature", "client_auth_certificate", "intermediate_certificate", "signature_algorithm", "sender_nonce", "hash_algorithm", "crl") SIGNATURE_FIELD_NUMBER: _ClassVar[int] CLIENT_AUTH_CERTIFICATE_FIELD_NUMBER: _ClassVar[int] INTERMEDIATE_CERTIFICATE_FIELD_NUMBER: _ClassVar[int] SIGNATURE_ALGORITHM_FIELD_NUMBER: _ClassVar[int] SENDER_NONCE_FIELD_NUMBER: _ClassVar[int] HASH_ALGORITHM_FIELD_NUMBER: _ClassVar[int] CRL_FIELD_NUMBER: _ClassVar[int] signature: bytes client_auth_certificate: bytes intermediate_certificate: _containers.RepeatedScalarFieldContainer[bytes] signature_algorithm: SignatureAlgorithm sender_nonce: bytes hash_algorithm: HashAlgorithm crl: bytes def __init__(self, signature: _Optional[bytes] = ..., client_auth_certificate: _Optional[bytes] = ..., intermediate_certificate: _Optional[_Iterable[bytes]] = ..., signature_algorithm: _Optional[_Union[SignatureAlgorithm, str]] = ..., sender_nonce: _Optional[bytes] = ..., hash_algorithm: _Optional[_Union[HashAlgorithm, str]] = ..., crl: _Optional[bytes] = ...) -> None: ... class AuthError(_message.Message): __slots__ = ("error_type",) class ErrorType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () INTERNAL_ERROR: _ClassVar[AuthError.ErrorType] NO_TLS: _ClassVar[AuthError.ErrorType] SIGNATURE_ALGORITHM_UNAVAILABLE: _ClassVar[AuthError.ErrorType] INTERNAL_ERROR: AuthError.ErrorType NO_TLS: AuthError.ErrorType SIGNATURE_ALGORITHM_UNAVAILABLE: AuthError.ErrorType ERROR_TYPE_FIELD_NUMBER: _ClassVar[int] error_type: AuthError.ErrorType def __init__(self, error_type: _Optional[_Union[AuthError.ErrorType, str]] = ...) -> None: ... class DeviceAuthMessage(_message.Message): __slots__ = ("challenge", "response", "error") CHALLENGE_FIELD_NUMBER: _ClassVar[int] RESPONSE_FIELD_NUMBER: _ClassVar[int] ERROR_FIELD_NUMBER: _ClassVar[int] challenge: AuthChallenge response: AuthResponse error: AuthError def __init__(self, challenge: _Optional[_Union[AuthChallenge, _Mapping]] = ..., response: _Optional[_Union[AuthResponse, _Mapping]] = ..., error: _Optional[_Union[AuthError, _Mapping]] = ...) -> None: ... 0707010000004A000081A400000000000000000000000165F9EF9000001F80000000000000000000000000000000000000003A00000000pychromecast-14.0.1/pychromecast/generated/logging_pb2.py# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: logging.proto # Protobuf Python Version: 4.25.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rlogging.proto\x12!extensions.api.cast_channel.proto\"\xfd\x04\n\x0bSocketEvent\x12:\n\x04type\x18\x01 \x01(\x0e\x32,.extensions.api.cast_channel.proto.EventType\x12\x18\n\x10timestamp_micros\x18\x02 \x01(\x03\x12\x0f\n\x07\x64\x65tails\x18\x03 \x01(\t\x12\x18\n\x10net_return_value\x18\x04 \x01(\x05\x12\x19\n\x11message_namespace\x18\x05 \x01(\t\x12\x42\n\x0bready_state\x18\x06 \x01(\x0e\x32-.extensions.api.cast_channel.proto.ReadyState\x12L\n\x10\x63onnection_state\x18\x07 \x01(\x0e\x32\x32.extensions.api.cast_channel.proto.ConnectionState\x12@\n\nread_state\x18\x08 \x01(\x0e\x32,.extensions.api.cast_channel.proto.ReadState\x12\x42\n\x0bwrite_state\x18\t \x01(\x0e\x32-.extensions.api.cast_channel.proto.WriteState\x12\x42\n\x0b\x65rror_state\x18\n \x01(\x0e\x32-.extensions.api.cast_channel.proto.ErrorState\x12^\n\x1a\x63hallenge_reply_error_type\x18\x0b \x01(\x0e\x32:.extensions.api.cast_channel.proto.ChallengeReplyErrorType\x12\x16\n\x0enss_error_code\x18\x0c \x01(\x05\"\xf4\x01\n\x15\x41ggregatedSocketEvent\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x13\n\x0b\x65ndpoint_id\x18\x02 \x01(\x05\x12I\n\x11\x63hannel_auth_type\x18\x03 \x01(\x0e\x32..extensions.api.cast_channel.proto.ChannelAuth\x12\x44\n\x0csocket_event\x18\x04 \x03(\x0b\x32..extensions.api.cast_channel.proto.SocketEvent\x12\x12\n\nbytes_read\x18\x05 \x01(\x03\x12\x15\n\rbytes_written\x18\x06 \x01(\x03\"\xb1\x01\n\x03Log\x12Y\n\x17\x61ggregated_socket_event\x18\x01 \x03(\x0b\x32\x38.extensions.api.cast_channel.proto.AggregatedSocketEvent\x12,\n$num_evicted_aggregated_socket_events\x18\x02 \x01(\x05\x12!\n\x19num_evicted_socket_events\x18\x03 \x01(\x05*\xc0\x06\n\tEventType\x12\x16\n\x12\x45VENT_TYPE_UNKNOWN\x10\x00\x12\x17\n\x13\x43\x41ST_SOCKET_CREATED\x10\x01\x12\x17\n\x13READY_STATE_CHANGED\x10\x02\x12\x1c\n\x18\x43ONNECTION_STATE_CHANGED\x10\x03\x12\x16\n\x12READ_STATE_CHANGED\x10\x04\x12\x17\n\x13WRITE_STATE_CHANGED\x10\x05\x12\x17\n\x13\x45RROR_STATE_CHANGED\x10\x06\x12\x12\n\x0e\x43ONNECT_FAILED\x10\x07\x12\x16\n\x12TCP_SOCKET_CONNECT\x10\x08\x12\x1d\n\x19TCP_SOCKET_SET_KEEP_ALIVE\x10\t\x12\x18\n\x14SSL_CERT_WHITELISTED\x10\n\x12\x16\n\x12SSL_SOCKET_CONNECT\x10\x0b\x12\x15\n\x11SSL_INFO_OBTAINED\x10\x0c\x12\x1b\n\x17\x44\x45R_ENCODED_CERT_OBTAIN\x10\r\x12\x1c\n\x18RECEIVED_CHALLENGE_REPLY\x10\x0e\x12\x18\n\x14\x41UTH_CHALLENGE_REPLY\x10\x0f\x12\x15\n\x11\x43ONNECT_TIMED_OUT\x10\x10\x12\x17\n\x13SEND_MESSAGE_FAILED\x10\x11\x12\x14\n\x10MESSAGE_ENQUEUED\x10\x12\x12\x10\n\x0cSOCKET_WRITE\x10\x13\x12\x13\n\x0fMESSAGE_WRITTEN\x10\x14\x12\x0f\n\x0bSOCKET_READ\x10\x15\x12\x10\n\x0cMESSAGE_READ\x10\x16\x12\x11\n\rSOCKET_CLOSED\x10\x19\x12\x1f\n\x1bSSL_CERT_EXCESSIVE_LIFETIME\x10\x1a\x12\x1b\n\x17\x43HANNEL_POLICY_ENFORCED\x10\x1b\x12\x1f\n\x1bTCP_SOCKET_CONNECT_COMPLETE\x10\x1c\x12\x1f\n\x1bSSL_SOCKET_CONNECT_COMPLETE\x10\x1d\x12\x1d\n\x19SSL_SOCKET_CONNECT_FAILED\x10\x1e\x12\x1e\n\x1aSEND_AUTH_CHALLENGE_FAILED\x10\x1f\x12 \n\x1c\x41UTH_CHALLENGE_REPLY_INVALID\x10 \x12\x14\n\x10PING_WRITE_ERROR\x10!*(\n\x0b\x43hannelAuth\x12\x07\n\x03SSL\x10\x01\x12\x10\n\x0cSSL_VERIFIED\x10\x02*\x85\x01\n\nReadyState\x12\x14\n\x10READY_STATE_NONE\x10\x01\x12\x1a\n\x16READY_STATE_CONNECTING\x10\x02\x12\x14\n\x10READY_STATE_OPEN\x10\x03\x12\x17\n\x13READY_STATE_CLOSING\x10\x04\x12\x16\n\x12READY_STATE_CLOSED\x10\x05*\x8f\x03\n\x0f\x43onnectionState\x12\x16\n\x12\x43ONN_STATE_UNKNOWN\x10\x01\x12\x1a\n\x16\x43ONN_STATE_TCP_CONNECT\x10\x02\x12#\n\x1f\x43ONN_STATE_TCP_CONNECT_COMPLETE\x10\x03\x12\x1a\n\x16\x43ONN_STATE_SSL_CONNECT\x10\x04\x12#\n\x1f\x43ONN_STATE_SSL_CONNECT_COMPLETE\x10\x05\x12\"\n\x1e\x43ONN_STATE_AUTH_CHALLENGE_SEND\x10\x06\x12+\n\'CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE\x10\x07\x12,\n(CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE\x10\x08\x12\x1c\n\x18\x43ONN_STATE_START_CONNECT\x10\t\x12\x17\n\x13\x43ONN_STATE_FINISHED\x10\x64\x12\x14\n\x10\x43ONN_STATE_ERROR\x10\x65\x12\x16\n\x12\x43ONN_STATE_TIMEOUT\x10\x66*\xa5\x01\n\tReadState\x12\x16\n\x12READ_STATE_UNKNOWN\x10\x01\x12\x13\n\x0fREAD_STATE_READ\x10\x02\x12\x1c\n\x18READ_STATE_READ_COMPLETE\x10\x03\x12\x1a\n\x16READ_STATE_DO_CALLBACK\x10\x04\x12\x1b\n\x17READ_STATE_HANDLE_ERROR\x10\x05\x12\x14\n\x10READ_STATE_ERROR\x10\x64*\xc4\x01\n\nWriteState\x12\x17\n\x13WRITE_STATE_UNKNOWN\x10\x01\x12\x15\n\x11WRITE_STATE_WRITE\x10\x02\x12\x1e\n\x1aWRITE_STATE_WRITE_COMPLETE\x10\x03\x12\x1b\n\x17WRITE_STATE_DO_CALLBACK\x10\x04\x12\x1c\n\x18WRITE_STATE_HANDLE_ERROR\x10\x05\x12\x15\n\x11WRITE_STATE_ERROR\x10\x64\x12\x14\n\x10WRITE_STATE_IDLE\x10\x65*\xdb\x02\n\nErrorState\x12\x16\n\x12\x43HANNEL_ERROR_NONE\x10\x01\x12\"\n\x1e\x43HANNEL_ERROR_CHANNEL_NOT_OPEN\x10\x02\x12&\n\"CHANNEL_ERROR_AUTHENTICATION_ERROR\x10\x03\x12\x1f\n\x1b\x43HANNEL_ERROR_CONNECT_ERROR\x10\x04\x12\x1e\n\x1a\x43HANNEL_ERROR_SOCKET_ERROR\x10\x05\x12!\n\x1d\x43HANNEL_ERROR_TRANSPORT_ERROR\x10\x06\x12!\n\x1d\x43HANNEL_ERROR_INVALID_MESSAGE\x10\x07\x12$\n CHANNEL_ERROR_INVALID_CHANNEL_ID\x10\x08\x12!\n\x1d\x43HANNEL_ERROR_CONNECT_TIMEOUT\x10\t\x12\x19\n\x15\x43HANNEL_ERROR_UNKNOWN\x10\n*\xb0\x06\n\x17\x43hallengeReplyErrorType\x12\x1e\n\x1a\x43HALLENGE_REPLY_ERROR_NONE\x10\x01\x12)\n%CHALLENGE_REPLY_ERROR_PEER_CERT_EMPTY\x10\x02\x12,\n(CHALLENGE_REPLY_ERROR_WRONG_PAYLOAD_TYPE\x10\x03\x12$\n CHALLENGE_REPLY_ERROR_NO_PAYLOAD\x10\x04\x12\x30\n,CHALLENGE_REPLY_ERROR_PAYLOAD_PARSING_FAILED\x10\x05\x12\'\n#CHALLENGE_REPLY_ERROR_MESSAGE_ERROR\x10\x06\x12%\n!CHALLENGE_REPLY_ERROR_NO_RESPONSE\x10\x07\x12/\n+CHALLENGE_REPLY_ERROR_FINGERPRINT_NOT_FOUND\x10\x08\x12-\n)CHALLENGE_REPLY_ERROR_CERT_PARSING_FAILED\x10\t\x12\x37\n3CHALLENGE_REPLY_ERROR_CERT_NOT_SIGNED_BY_TRUSTED_CA\x10\n\x12\x33\n/CHALLENGE_REPLY_ERROR_CANNOT_EXTRACT_PUBLIC_KEY\x10\x0b\x12/\n+CHALLENGE_REPLY_ERROR_SIGNED_BLOBS_MISMATCH\x10\x0c\x12;\n7CHALLENGE_REPLY_ERROR_TLS_CERT_VALIDITY_PERIOD_TOO_LONG\x10\r\x12=\n9CHALLENGE_REPLY_ERROR_TLS_CERT_VALID_START_DATE_IN_FUTURE\x10\x0e\x12*\n&CHALLENGE_REPLY_ERROR_TLS_CERT_EXPIRED\x10\x0f\x12%\n!CHALLENGE_REPLY_ERROR_CRL_INVALID\x10\x10\x12&\n\"CHALLENGE_REPLY_ERROR_CERT_REVOKED\x10\x11\x42\x02H\x03') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'logging_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'H\003' _globals['_EVENTTYPE']._serialized_start=1120 _globals['_EVENTTYPE']._serialized_end=1952 _globals['_CHANNELAUTH']._serialized_start=1954 _globals['_CHANNELAUTH']._serialized_end=1994 _globals['_READYSTATE']._serialized_start=1997 _globals['_READYSTATE']._serialized_end=2130 _globals['_CONNECTIONSTATE']._serialized_start=2133 _globals['_CONNECTIONSTATE']._serialized_end=2532 _globals['_READSTATE']._serialized_start=2535 _globals['_READSTATE']._serialized_end=2700 _globals['_WRITESTATE']._serialized_start=2703 _globals['_WRITESTATE']._serialized_end=2899 _globals['_ERRORSTATE']._serialized_start=2902 _globals['_ERRORSTATE']._serialized_end=3249 _globals['_CHALLENGEREPLYERRORTYPE']._serialized_start=3252 _globals['_CHALLENGEREPLYERRORTYPE']._serialized_end=4068 _globals['_SOCKETEVENT']._serialized_start=53 _globals['_SOCKETEVENT']._serialized_end=690 _globals['_AGGREGATEDSOCKETEVENT']._serialized_start=693 _globals['_AGGREGATEDSOCKETEVENT']._serialized_end=937 _globals['_LOG']._serialized_start=940 _globals['_LOG']._serialized_end=1117 # @@protoc_insertion_point(module_scope) 0707010000004B000081A400000000000000000000000165F9EF9000003434000000000000000000000000000000000000003B00000000pychromecast-14.0.1/pychromecast/generated/logging_pb2.pyifrom google.protobuf.internal import containers as _containers from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor class EventType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () EVENT_TYPE_UNKNOWN: _ClassVar[EventType] CAST_SOCKET_CREATED: _ClassVar[EventType] READY_STATE_CHANGED: _ClassVar[EventType] CONNECTION_STATE_CHANGED: _ClassVar[EventType] READ_STATE_CHANGED: _ClassVar[EventType] WRITE_STATE_CHANGED: _ClassVar[EventType] ERROR_STATE_CHANGED: _ClassVar[EventType] CONNECT_FAILED: _ClassVar[EventType] TCP_SOCKET_CONNECT: _ClassVar[EventType] TCP_SOCKET_SET_KEEP_ALIVE: _ClassVar[EventType] SSL_CERT_WHITELISTED: _ClassVar[EventType] SSL_SOCKET_CONNECT: _ClassVar[EventType] SSL_INFO_OBTAINED: _ClassVar[EventType] DER_ENCODED_CERT_OBTAIN: _ClassVar[EventType] RECEIVED_CHALLENGE_REPLY: _ClassVar[EventType] AUTH_CHALLENGE_REPLY: _ClassVar[EventType] CONNECT_TIMED_OUT: _ClassVar[EventType] SEND_MESSAGE_FAILED: _ClassVar[EventType] MESSAGE_ENQUEUED: _ClassVar[EventType] SOCKET_WRITE: _ClassVar[EventType] MESSAGE_WRITTEN: _ClassVar[EventType] SOCKET_READ: _ClassVar[EventType] MESSAGE_READ: _ClassVar[EventType] SOCKET_CLOSED: _ClassVar[EventType] SSL_CERT_EXCESSIVE_LIFETIME: _ClassVar[EventType] CHANNEL_POLICY_ENFORCED: _ClassVar[EventType] TCP_SOCKET_CONNECT_COMPLETE: _ClassVar[EventType] SSL_SOCKET_CONNECT_COMPLETE: _ClassVar[EventType] SSL_SOCKET_CONNECT_FAILED: _ClassVar[EventType] SEND_AUTH_CHALLENGE_FAILED: _ClassVar[EventType] AUTH_CHALLENGE_REPLY_INVALID: _ClassVar[EventType] PING_WRITE_ERROR: _ClassVar[EventType] class ChannelAuth(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () SSL: _ClassVar[ChannelAuth] SSL_VERIFIED: _ClassVar[ChannelAuth] class ReadyState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () READY_STATE_NONE: _ClassVar[ReadyState] READY_STATE_CONNECTING: _ClassVar[ReadyState] READY_STATE_OPEN: _ClassVar[ReadyState] READY_STATE_CLOSING: _ClassVar[ReadyState] READY_STATE_CLOSED: _ClassVar[ReadyState] class ConnectionState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () CONN_STATE_UNKNOWN: _ClassVar[ConnectionState] CONN_STATE_TCP_CONNECT: _ClassVar[ConnectionState] CONN_STATE_TCP_CONNECT_COMPLETE: _ClassVar[ConnectionState] CONN_STATE_SSL_CONNECT: _ClassVar[ConnectionState] CONN_STATE_SSL_CONNECT_COMPLETE: _ClassVar[ConnectionState] CONN_STATE_AUTH_CHALLENGE_SEND: _ClassVar[ConnectionState] CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE: _ClassVar[ConnectionState] CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE: _ClassVar[ConnectionState] CONN_STATE_START_CONNECT: _ClassVar[ConnectionState] CONN_STATE_FINISHED: _ClassVar[ConnectionState] CONN_STATE_ERROR: _ClassVar[ConnectionState] CONN_STATE_TIMEOUT: _ClassVar[ConnectionState] class ReadState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () READ_STATE_UNKNOWN: _ClassVar[ReadState] READ_STATE_READ: _ClassVar[ReadState] READ_STATE_READ_COMPLETE: _ClassVar[ReadState] READ_STATE_DO_CALLBACK: _ClassVar[ReadState] READ_STATE_HANDLE_ERROR: _ClassVar[ReadState] READ_STATE_ERROR: _ClassVar[ReadState] class WriteState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () WRITE_STATE_UNKNOWN: _ClassVar[WriteState] WRITE_STATE_WRITE: _ClassVar[WriteState] WRITE_STATE_WRITE_COMPLETE: _ClassVar[WriteState] WRITE_STATE_DO_CALLBACK: _ClassVar[WriteState] WRITE_STATE_HANDLE_ERROR: _ClassVar[WriteState] WRITE_STATE_ERROR: _ClassVar[WriteState] WRITE_STATE_IDLE: _ClassVar[WriteState] class ErrorState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () CHANNEL_ERROR_NONE: _ClassVar[ErrorState] CHANNEL_ERROR_CHANNEL_NOT_OPEN: _ClassVar[ErrorState] CHANNEL_ERROR_AUTHENTICATION_ERROR: _ClassVar[ErrorState] CHANNEL_ERROR_CONNECT_ERROR: _ClassVar[ErrorState] CHANNEL_ERROR_SOCKET_ERROR: _ClassVar[ErrorState] CHANNEL_ERROR_TRANSPORT_ERROR: _ClassVar[ErrorState] CHANNEL_ERROR_INVALID_MESSAGE: _ClassVar[ErrorState] CHANNEL_ERROR_INVALID_CHANNEL_ID: _ClassVar[ErrorState] CHANNEL_ERROR_CONNECT_TIMEOUT: _ClassVar[ErrorState] CHANNEL_ERROR_UNKNOWN: _ClassVar[ErrorState] class ChallengeReplyErrorType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () CHALLENGE_REPLY_ERROR_NONE: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_PEER_CERT_EMPTY: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_WRONG_PAYLOAD_TYPE: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_NO_PAYLOAD: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_PAYLOAD_PARSING_FAILED: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_MESSAGE_ERROR: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_NO_RESPONSE: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_FINGERPRINT_NOT_FOUND: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_CERT_PARSING_FAILED: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_CERT_NOT_SIGNED_BY_TRUSTED_CA: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_CANNOT_EXTRACT_PUBLIC_KEY: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_SIGNED_BLOBS_MISMATCH: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_TLS_CERT_VALIDITY_PERIOD_TOO_LONG: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_TLS_CERT_VALID_START_DATE_IN_FUTURE: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_TLS_CERT_EXPIRED: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_CRL_INVALID: _ClassVar[ChallengeReplyErrorType] CHALLENGE_REPLY_ERROR_CERT_REVOKED: _ClassVar[ChallengeReplyErrorType] EVENT_TYPE_UNKNOWN: EventType CAST_SOCKET_CREATED: EventType READY_STATE_CHANGED: EventType CONNECTION_STATE_CHANGED: EventType READ_STATE_CHANGED: EventType WRITE_STATE_CHANGED: EventType ERROR_STATE_CHANGED: EventType CONNECT_FAILED: EventType TCP_SOCKET_CONNECT: EventType TCP_SOCKET_SET_KEEP_ALIVE: EventType SSL_CERT_WHITELISTED: EventType SSL_SOCKET_CONNECT: EventType SSL_INFO_OBTAINED: EventType DER_ENCODED_CERT_OBTAIN: EventType RECEIVED_CHALLENGE_REPLY: EventType AUTH_CHALLENGE_REPLY: EventType CONNECT_TIMED_OUT: EventType SEND_MESSAGE_FAILED: EventType MESSAGE_ENQUEUED: EventType SOCKET_WRITE: EventType MESSAGE_WRITTEN: EventType SOCKET_READ: EventType MESSAGE_READ: EventType SOCKET_CLOSED: EventType SSL_CERT_EXCESSIVE_LIFETIME: EventType CHANNEL_POLICY_ENFORCED: EventType TCP_SOCKET_CONNECT_COMPLETE: EventType SSL_SOCKET_CONNECT_COMPLETE: EventType SSL_SOCKET_CONNECT_FAILED: EventType SEND_AUTH_CHALLENGE_FAILED: EventType AUTH_CHALLENGE_REPLY_INVALID: EventType PING_WRITE_ERROR: EventType SSL: ChannelAuth SSL_VERIFIED: ChannelAuth READY_STATE_NONE: ReadyState READY_STATE_CONNECTING: ReadyState READY_STATE_OPEN: ReadyState READY_STATE_CLOSING: ReadyState READY_STATE_CLOSED: ReadyState CONN_STATE_UNKNOWN: ConnectionState CONN_STATE_TCP_CONNECT: ConnectionState CONN_STATE_TCP_CONNECT_COMPLETE: ConnectionState CONN_STATE_SSL_CONNECT: ConnectionState CONN_STATE_SSL_CONNECT_COMPLETE: ConnectionState CONN_STATE_AUTH_CHALLENGE_SEND: ConnectionState CONN_STATE_AUTH_CHALLENGE_SEND_COMPLETE: ConnectionState CONN_STATE_AUTH_CHALLENGE_REPLY_COMPLETE: ConnectionState CONN_STATE_START_CONNECT: ConnectionState CONN_STATE_FINISHED: ConnectionState CONN_STATE_ERROR: ConnectionState CONN_STATE_TIMEOUT: ConnectionState READ_STATE_UNKNOWN: ReadState READ_STATE_READ: ReadState READ_STATE_READ_COMPLETE: ReadState READ_STATE_DO_CALLBACK: ReadState READ_STATE_HANDLE_ERROR: ReadState READ_STATE_ERROR: ReadState WRITE_STATE_UNKNOWN: WriteState WRITE_STATE_WRITE: WriteState WRITE_STATE_WRITE_COMPLETE: WriteState WRITE_STATE_DO_CALLBACK: WriteState WRITE_STATE_HANDLE_ERROR: WriteState WRITE_STATE_ERROR: WriteState WRITE_STATE_IDLE: WriteState CHANNEL_ERROR_NONE: ErrorState CHANNEL_ERROR_CHANNEL_NOT_OPEN: ErrorState CHANNEL_ERROR_AUTHENTICATION_ERROR: ErrorState CHANNEL_ERROR_CONNECT_ERROR: ErrorState CHANNEL_ERROR_SOCKET_ERROR: ErrorState CHANNEL_ERROR_TRANSPORT_ERROR: ErrorState CHANNEL_ERROR_INVALID_MESSAGE: ErrorState CHANNEL_ERROR_INVALID_CHANNEL_ID: ErrorState CHANNEL_ERROR_CONNECT_TIMEOUT: ErrorState CHANNEL_ERROR_UNKNOWN: ErrorState CHALLENGE_REPLY_ERROR_NONE: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_PEER_CERT_EMPTY: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_WRONG_PAYLOAD_TYPE: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_NO_PAYLOAD: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_PAYLOAD_PARSING_FAILED: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_MESSAGE_ERROR: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_NO_RESPONSE: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_FINGERPRINT_NOT_FOUND: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_CERT_PARSING_FAILED: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_CERT_NOT_SIGNED_BY_TRUSTED_CA: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_CANNOT_EXTRACT_PUBLIC_KEY: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_SIGNED_BLOBS_MISMATCH: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_TLS_CERT_VALIDITY_PERIOD_TOO_LONG: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_TLS_CERT_VALID_START_DATE_IN_FUTURE: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_TLS_CERT_EXPIRED: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_CRL_INVALID: ChallengeReplyErrorType CHALLENGE_REPLY_ERROR_CERT_REVOKED: ChallengeReplyErrorType class SocketEvent(_message.Message): __slots__ = ("type", "timestamp_micros", "details", "net_return_value", "message_namespace", "ready_state", "connection_state", "read_state", "write_state", "error_state", "challenge_reply_error_type", "nss_error_code") TYPE_FIELD_NUMBER: _ClassVar[int] TIMESTAMP_MICROS_FIELD_NUMBER: _ClassVar[int] DETAILS_FIELD_NUMBER: _ClassVar[int] NET_RETURN_VALUE_FIELD_NUMBER: _ClassVar[int] MESSAGE_NAMESPACE_FIELD_NUMBER: _ClassVar[int] READY_STATE_FIELD_NUMBER: _ClassVar[int] CONNECTION_STATE_FIELD_NUMBER: _ClassVar[int] READ_STATE_FIELD_NUMBER: _ClassVar[int] WRITE_STATE_FIELD_NUMBER: _ClassVar[int] ERROR_STATE_FIELD_NUMBER: _ClassVar[int] CHALLENGE_REPLY_ERROR_TYPE_FIELD_NUMBER: _ClassVar[int] NSS_ERROR_CODE_FIELD_NUMBER: _ClassVar[int] type: EventType timestamp_micros: int details: str net_return_value: int message_namespace: str ready_state: ReadyState connection_state: ConnectionState read_state: ReadState write_state: WriteState error_state: ErrorState challenge_reply_error_type: ChallengeReplyErrorType nss_error_code: int def __init__(self, type: _Optional[_Union[EventType, str]] = ..., timestamp_micros: _Optional[int] = ..., details: _Optional[str] = ..., net_return_value: _Optional[int] = ..., message_namespace: _Optional[str] = ..., ready_state: _Optional[_Union[ReadyState, str]] = ..., connection_state: _Optional[_Union[ConnectionState, str]] = ..., read_state: _Optional[_Union[ReadState, str]] = ..., write_state: _Optional[_Union[WriteState, str]] = ..., error_state: _Optional[_Union[ErrorState, str]] = ..., challenge_reply_error_type: _Optional[_Union[ChallengeReplyErrorType, str]] = ..., nss_error_code: _Optional[int] = ...) -> None: ... class AggregatedSocketEvent(_message.Message): __slots__ = ("id", "endpoint_id", "channel_auth_type", "socket_event", "bytes_read", "bytes_written") ID_FIELD_NUMBER: _ClassVar[int] ENDPOINT_ID_FIELD_NUMBER: _ClassVar[int] CHANNEL_AUTH_TYPE_FIELD_NUMBER: _ClassVar[int] SOCKET_EVENT_FIELD_NUMBER: _ClassVar[int] BYTES_READ_FIELD_NUMBER: _ClassVar[int] BYTES_WRITTEN_FIELD_NUMBER: _ClassVar[int] id: int endpoint_id: int channel_auth_type: ChannelAuth socket_event: _containers.RepeatedCompositeFieldContainer[SocketEvent] bytes_read: int bytes_written: int def __init__(self, id: _Optional[int] = ..., endpoint_id: _Optional[int] = ..., channel_auth_type: _Optional[_Union[ChannelAuth, str]] = ..., socket_event: _Optional[_Iterable[_Union[SocketEvent, _Mapping]]] = ..., bytes_read: _Optional[int] = ..., bytes_written: _Optional[int] = ...) -> None: ... class Log(_message.Message): __slots__ = ("aggregated_socket_event", "num_evicted_aggregated_socket_events", "num_evicted_socket_events") AGGREGATED_SOCKET_EVENT_FIELD_NUMBER: _ClassVar[int] NUM_EVICTED_AGGREGATED_SOCKET_EVENTS_FIELD_NUMBER: _ClassVar[int] NUM_EVICTED_SOCKET_EVENTS_FIELD_NUMBER: _ClassVar[int] aggregated_socket_event: _containers.RepeatedCompositeFieldContainer[AggregatedSocketEvent] num_evicted_aggregated_socket_events: int num_evicted_socket_events: int def __init__(self, aggregated_socket_event: _Optional[_Iterable[_Union[AggregatedSocketEvent, _Mapping]]] = ..., num_evicted_aggregated_socket_events: _Optional[int] = ..., num_evicted_socket_events: _Optional[int] = ...) -> None: ... 0707010000004C000081A400000000000000000000000165F9EF900000010F000000000000000000000000000000000000003600000000pychromecast-14.0.1/pychromecast/generated/readme.txtThe files in this directory can be generated by calling protoc -I=chromecast_protobuf/ --python_out=pychromecast/generated/ --pyi_out=pychromecast/generated/ chromecast_protobuf/authority_keys.proto chromecast_protobuf/cast_channel.proto chromecast_protobuf/logging.proto0707010000004D000081A400000000000000000000000165F9EF900000035F000000000000000000000000000000000000002B00000000pychromecast-14.0.1/pychromecast/models.py""" Chromecast types """ from __future__ import annotations import asyncio from dataclasses import dataclass from uuid import UUID import zeroconf ZEROCONF_ERRORS: tuple[type[Exception], ...] = (IOError, asyncio.TimeoutError) if hasattr(zeroconf, "EventLoopBlocked"): # Added in zeroconf 0.37.0 ZEROCONF_ERRORS = (*ZEROCONF_ERRORS, zeroconf.EventLoopBlocked) @dataclass(frozen=True) class CastInfo: """Cast info container.""" services: set[HostServiceInfo | MDNSServiceInfo] uuid: UUID model_name: str | None friendly_name: str | None host: str port: int cast_type: str | None manufacturer: str | None @dataclass(frozen=True) class HostServiceInfo: """Service info container.""" host: str port: int @dataclass(frozen=True) class MDNSServiceInfo: """Service info container.""" name: str 0707010000004E000081A400000000000000000000000165F9EF9000000DCA000000000000000000000000000000000000002F00000000pychromecast-14.0.1/pychromecast/quick_play.py""" Choose a controller and quick play """ from typing import Any from . import Chromecast from .controllers import QuickPlayController from .controllers.bbciplayer import BbcIplayerController from .controllers.bbcsounds import BbcSoundsController from .controllers.bubbleupnp import BubbleUPNPController from .controllers.homeassistant_media import HomeAssistantMediaController from .controllers.media import DefaultMediaReceiverController from .controllers.supla import SuplaController from .controllers.yleareena import YleAreenaController from .controllers.youtube import YouTubeController from .controllers.shaka import ShakaController from .controllers.nrktv import NrkTvController from .controllers.nrkradio import NrkRadioController def quick_play( # pylint:disable=too-many-branches cast: Chromecast, app_name: str, data: dict[str, Any], timeout: float = 30.0, ) -> None: """ Given a Chromecast connection, launch the app `app_name` and start playing media based on parameters defined in `data`. :param cast: Chromecast connection to cast to :param app_name: App name "slug" to cast :param data: Data to send to the app controller. Must contain "media_id", and other values can be passed depending on the controller. :type cast: Chromecast :type app_name: string :type data: dict `data` can contain the following keys: media_id: string (Required) Primary identifier of the media media_type: string Type of the media identified by `media_id`. e.g. "program" if the media is a program name instead of a direct item id. When using a regular media controller (e.g. BubbleUPNP) this should be the content_type ('audio/mp3') enqueue: boolean Enqueue the media to the current playlist, if possible. index: string Play index x of matching media. "random" should also be allowed. audio_lang: string Audio language (3 characters for YleAreena) text_lang: string Subtitle language (3 characters for YleAreena) Youtube-specific: playlist_id: string Youtube playlist id Supla-specific: is_live: boolean Whether the media is a livestream Media controller (BubbleUPNP)-specific: stream_type: string "BUFFERED" or "LIVE" """ controller: QuickPlayController if app_name == "bbciplayer": controller = BbcIplayerController() elif app_name == "bbcsounds": controller = BbcSoundsController() elif app_name == "bubbleupnp": controller = BubbleUPNPController() elif app_name == "default_media_receiver": controller = DefaultMediaReceiverController() elif app_name == "homeassistant_media": controller = HomeAssistantMediaController() elif app_name == "supla": controller = SuplaController() elif app_name == "yleareena": controller = YleAreenaController() elif app_name == "youtube": controller = YouTubeController() elif app_name == "shaka": controller = ShakaController() elif app_name == "nrktv": controller = NrkTvController() elif app_name == "nrkradio": controller = NrkRadioController() else: raise NotImplementedError() cast.register_handler(controller) try: controller.quick_play(**data, timeout=timeout) finally: cast.unregister_handler(controller) 0707010000004F000081A400000000000000000000000165F9EF9000000823000000000000000000000000000000000000003500000000pychromecast-14.0.1/pychromecast/response_handler.py"""Helpers and types related to waiting for a command response.""" from __future__ import annotations from collections.abc import Callable import logging import threading from typing import Protocol from .error import RequestFailed, RequestTimeout _LOGGER = logging.getLogger(__name__) CallbackType = Callable[[bool, dict | None], None] """Signature of optional callback functions supported by methods sending messages. The callback function will be called with a bool indicating if the message was sent and an optional response dict. """ class AcceptsCallbackFunc(Protocol): """A function which accepts a callback_function kwarg.""" def __call__( self, *, callback_function: CallbackType | None, ) -> None: ... class WaitResponse: """Wait for a response.""" msg_sent: bool response: dict | None def __init__(self, timeout: float, request: str) -> None: """Initialize.""" self._event = threading.Event() self._request = request self._timeout = timeout def callback(self, msg_sent: bool, response: dict | None) -> None: """Called when the request is finished.""" self.response = response self.msg_sent = msg_sent self._event.set() def wait_response(self) -> None: """Wait for the request to finish.""" request_completed = self._event.wait(self._timeout) if not request_completed: raise RequestTimeout(self._request, self._timeout) if not self.msg_sent: raise RequestFailed(self._request) def chain_on_success( on_success: AcceptsCallbackFunc, callback_function: CallbackType | None ) -> CallbackType: """Helper to chain callbacks.""" def _callback(msg_sent: bool, response: dict | None) -> None: if not msg_sent: _LOGGER.debug("Not calling on_success %s", on_success) if callback_function: callback_function(msg_sent, response) return on_success(callback_function=callback_function) return _callback 07070100000050000081A400000000000000000000000165F9EF900000A13C000000000000000000000000000000000000003200000000pychromecast-14.0.1/pychromecast/socket_client.py""" Module to interact with the ChromeCast via protobuf-over-socket. Big thanks goes out to Fred Clift <fred@clift.org> who build the first version of this code: https://github.com/minektur/chromecast-python-poc. Without him this would not have been possible. """ # pylint: disable=too-many-lines from __future__ import annotations import abc from dataclasses import dataclass import errno import json import logging import select import socket import ssl import threading import time from collections import defaultdict from struct import pack, unpack import zeroconf from .controllers import CallbackType, BaseController from .controllers.media import MediaController from .controllers.receiver import CastStatus, CastStatusListener, ReceiverController from .const import MESSAGE_TYPE, REQUEST_ID, SESSION_ID from .dial import get_host_from_service from .error import ( ChromecastConnectionError, ControllerNotRegistered, UnsupportedNamespace, NotConnected, PyChromecastStopped, ) # pylint: disable-next=no-name-in-module from .generated.cast_channel_pb2 import CastMessage from .models import HostServiceInfo, MDNSServiceInfo NS_CONNECTION = "urn:x-cast:com.google.cast.tp.connection" NS_HEARTBEAT = "urn:x-cast:com.google.cast.tp.heartbeat" PLATFORM_DESTINATION_ID = "receiver-0" TYPE_PING = "PING" TYPE_PONG = "PONG" TYPE_CONNECT = "CONNECT" TYPE_CLOSE = "CLOSE" TYPE_LOAD = "LOAD" # The socket connection is being setup CONNECTION_STATUS_CONNECTING = "CONNECTING" # The socket connection was complete CONNECTION_STATUS_CONNECTED = "CONNECTED" # The socket connection has been disconnected CONNECTION_STATUS_DISCONNECTED = "DISCONNECTED" # Connecting to socket failed (after a CONNECTION_STATUS_CONNECTING) CONNECTION_STATUS_FAILED = "FAILED" # Failed to resolve service name CONNECTION_STATUS_FAILED_RESOLVE = "FAILED_RESOLVE" # The socket connection was lost and needs to be retried CONNECTION_STATUS_LOST = "LOST" # Check for select poll method SELECT_HAS_POLL = hasattr(select, "poll") HB_PING_TIME = 10 HB_PONG_TIME = 10 POLL_TIME_BLOCKING = 5.0 POLL_TIME_NON_BLOCKING = 0.01 TIMEOUT_TIME = 30.0 RETRY_TIME = 5.0 class InterruptLoop(Exception): """The chromecast has been manually stopped.""" def _dict_from_message_payload(message: CastMessage) -> dict: """Parses a PB2 message as a JSON dict.""" try: data = json.loads(message.payload_utf8) if not isinstance(data, dict): logger = logging.getLogger(__name__) logger.debug( "Non dict json in namespace %s: '%s'", message.namespace, message.payload_utf8, ) return {} return data except ValueError: logger = logging.getLogger(__name__) logger.debug( "Invalid json in namespace %s: '%s'", message.namespace, message.payload_utf8, ) return {} def _message_to_string( message: CastMessage, data: dict | None = None, ) -> str: """Gives a string representation of a PB2 message.""" if data is None: data = _dict_from_message_payload(message) return ( f"Message {message.namespace} from {message.source_id} to " f"{message.destination_id}: {data or message.payload_utf8}" ) @dataclass(frozen=True) class NetworkAddress: """Network address container.""" address: str port: int | None @dataclass(frozen=True) class ConnectionStatus: """Connection status container.""" status: str address: NetworkAddress | None service: HostServiceInfo | MDNSServiceInfo | None class ConnectionStatusListener(abc.ABC): """Listener for receiving connection status events.""" @abc.abstractmethod def new_connection_status(self, status: ConnectionStatus) -> None: """Updated connection status.""" # pylint: disable-next=too-many-instance-attributes class SocketClient(threading.Thread, CastStatusListener): """ Class to interact with a Chromecast through a socket. :param host: The host to connect to. :param port: The port to use when connecting to the device, set to None to use the default of 8009. Special devices such as Cast Groups may return a different port number so we need to use that. :param cast_type: The type of chromecast to connect to, see dial.CAST_TYPE_* for types. :param tries: Number of retries to perform if the connection fails. None for infinite retries. :param timeout: A floating point number specifying the socket timeout in seconds. None means to use the default which is 30 seconds. :param retry_wait: A floating point number specifying how many seconds to wait between each retry. None means to use the default which is 5 seconds. :param services: A list of mDNS services to try to connect to. If present, parameters host and port are ignored and host and port are instead resolved through mDNS. The list of services may be modified, for example if speaker group leadership is handed over. SocketClient will catch modifications to the list when attempting reconnect. :param zconf: A zeroconf instance, needed if a list of services is passed. The zeroconf instance may be obtained from the browser returned by pychromecast.start_discovery(). """ # pylint: disable-next=too-many-arguments def __init__( self, *, cast_type: str, tries: int | None, timeout: float | None, retry_wait: float | None, services: set[HostServiceInfo | MDNSServiceInfo], zconf: zeroconf.Zeroconf | None, ) -> None: super().__init__() self.daemon = True self.logger = logging.getLogger(__name__) self._force_recon = False self.cast_type = cast_type self.fn: str | None = None # pylint:disable=invalid-name self.tries = tries self.timeout = timeout or TIMEOUT_TIME self.retry_wait = retry_wait or RETRY_TIME self.services = services self.zconf = zconf self.host = "unknown" self.port = 8009 self.source_id = "sender-0" self.stop = threading.Event() # socketpair used to interrupt the worker thread self.socketpair = socket.socketpair() self.app_namespaces: list[str] = [] self.destination_id: str | None = None self.session_id: str | None = None self._request_id = 0 self._request_callbacks: dict[int, CallbackType] = {} self._open_channels: list[str] = [] self.connecting = True self.first_connection = True self.socket: socket.socket | ssl.SSLSocket | None = None # dict mapping namespace on Controller objects self._handlers: dict[str, set[BaseController]] = defaultdict(set) self._connection_listeners: list[ConnectionStatusListener] = [] self.receiver_controller = ReceiverController(cast_type) self.media_controller = MediaController() self.heartbeat_controller = HeartbeatController() self.register_handler(self.heartbeat_controller) self.register_handler(ConnectionController()) self.register_handler(self.receiver_controller) self.register_handler(self.media_controller) self.receiver_controller.register_status_listener(self) def initialize_connection( # pylint:disable=too-many-statements, too-many-branches self, ) -> None: """Initialize a socket to a Chromecast, retrying as necessary.""" tries = self.tries if self.socket is not None: self.socket.close() self.socket = None # Make sure nobody is blocking. for callback_function in self._request_callbacks.values(): callback_function(False, None) self.app_namespaces = [] self.destination_id = None self.session_id = None self._request_id = 0 self._request_callbacks = {} self._open_channels = [] self.connecting = True retry_log_fun = self.logger.error # Dict keeping track of individual retry delay for each named service retries: dict[HostServiceInfo | MDNSServiceInfo, dict[str, float]] = {} def mdns_backoff( service: HostServiceInfo | MDNSServiceInfo, retry: dict[str, float], ) -> None: """Exponentional backoff for service name mdns lookups.""" now = time.time() retry["next_retry"] = now + retry["delay"] retry["delay"] = min(retry["delay"] * 2, 300) retries[service] = retry while not self.stop.is_set() and ( tries is None or tries > 0 ): # pylint:disable=too-many-nested-blocks # Prune retries dict retries = { key: retries[key] for key in self.services.copy() if (key is not None and key in retries) } for service in self.services.copy(): now = time.time() retry = retries.get( service, {"delay": self.retry_wait, "next_retry": now} ) if now < retry["next_retry"]: continue try: if self.socket is not None: # If we retry connecting, we need to clean up the socket again self.socket.close() # type: ignore[unreachable] self.socket = None self.socket = new_socket() self.socket.settimeout(self.timeout) self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_CONNECTING, NetworkAddress(self.host, self.port), None, ) ) # Resolve the service name. host = None port = None host, port, service_info = get_host_from_service( service, self.zconf ) if host and port: if service_info: try: # Mypy does not understand that we catch errors, ignore it self.fn = service_info.properties[b"fn"].decode("utf-8") # type: ignore[union-attr] except (AttributeError, KeyError, UnicodeError): pass self.logger.debug( "[%s(%s):%s] Resolved service %s to %s:%s", self.fn or "", self.host, self.port, service, host, port, ) self.host = host self.port = port else: self.logger.debug( "[%s(%s):%s] Failed to resolve service %s", self.fn or "", self.host, self.port, service, ) self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_FAILED_RESOLVE, None, service, ) ) mdns_backoff(service, retry) # If zeroconf fails to receive the necessary data, # try next service continue self.logger.debug( "[%s(%s):%s] Connecting to %s:%s", self.fn or "", self.host, self.port, self.host, self.port, ) self.socket.connect((self.host, self.port)) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE self.socket = context.wrap_socket(self.socket) self.connecting = False self._force_recon = False self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_CONNECTED, NetworkAddress(self.host, self.port), None, ) ) self.receiver_controller.update_status() self.heartbeat_controller.ping() self.heartbeat_controller.reset() if self.first_connection: self.first_connection = False self.logger.debug( "[%s(%s):%s] Connected!", self.fn or "", self.host, self.port, ) else: self.logger.info( "[%s(%s):%s] Connection reestablished!", self.fn or "", self.host, self.port, ) return # OSError raised if connecting to the socket fails, NotConnected raised # if another thread tries - and fails - to send a message before the # calls to receiver_controller and heartbeat_controller. except (OSError, NotConnected) as err: self.connecting = True if self.stop.is_set(): self.logger.error( "[%s(%s):%s] Failed to connect: %s. aborting due to stop signal.", self.fn or "", self.host, self.port, err, ) raise ChromecastConnectionError("Failed to connect") from err self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_FAILED, NetworkAddress(self.host, self.port), None, ) ) retry_log_fun( "[%s(%s):%s] Failed to connect to service %s, retrying in %.1fs", self.fn or "", self.host, self.port, service, retry["delay"], ) mdns_backoff(service, retry) retry_log_fun = self.logger.debug # Only sleep if we have another retry remaining if tries is None or tries > 1: self.logger.debug( "[%s(%s):%s] Not connected, sleeping for %.1fs. Services: %s", self.fn or "", self.host, self.port, self.retry_wait, self.services, ) time.sleep(self.retry_wait) if tries: tries -= 1 self.stop.set() self.logger.error( "[%s(%s):%s] Failed to connect. No retries.", self.fn or "", self.host, self.port, ) raise ChromecastConnectionError("Failed to connect") def disconnect(self) -> None: """Disconnect socket connection to Chromecast device""" self.stop.set() try: # Write to the socket to interrupt the worker thread self.socketpair[1].send(b"x") except socket.error: # The socketpair may already be closed during shutdown, ignore it pass def register_handler(self, handler: BaseController) -> None: """Register a new namespace handler.""" self._handlers[handler.namespace].add(handler) handler.registered(self) def unregister_handler(self, handler: BaseController) -> None: """Register a new namespace handler.""" if ( handler.namespace in self._handlers and handler in self._handlers[handler.namespace] ): self._handlers[handler.namespace].remove(handler) handler.unregistered() def new_cast_status(self, status: CastStatus) -> None: """Called when a new cast status has been received.""" new_channel = self.destination_id != status.transport_id if new_channel and self.destination_id is not None: self.disconnect_channel(self.destination_id) self.app_namespaces = status.namespaces self.destination_id = status.transport_id self.session_id = status.session_id if new_channel and self.destination_id is not None: # If any of the namespaces of the new app are supported # we will automatically connect to it to receive updates for namespace in self.app_namespaces: if namespace in self._handlers: self._ensure_channel_connected(self.destination_id) for handler in set(self._handlers[namespace]): handler.channel_connected() def _gen_request_id(self) -> int: """Generates a unique request id.""" self._request_id += 1 return self._request_id @property def is_connected(self) -> bool: """ Returns True if the client is connected, False if it is stopped (or trying to connect). """ return not self.connecting @property def is_stopped(self) -> bool: """ Returns True if the connection has been stopped, False if it is running. """ return self.stop.is_set() def run(self) -> None: """Connect to the cast and start polling the socket.""" try: self.initialize_connection() except ChromecastConnectionError: self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_DISCONNECTED, NetworkAddress(self.host, self.port), None, ) ) return self.heartbeat_controller.reset() self.logger.debug("Thread started...") while not self.stop.is_set(): try: if self._run_once(timeout=POLL_TIME_BLOCKING) == 1: break except Exception: # pylint: disable=broad-except self._force_recon = True self.logger.exception( "[%s(%s):%s] Unhandled exception in worker thread, attempting reconnect", self.fn or "", self.host, self.port, ) self.logger.debug("Thread done...") # Clean up self._cleanup() def _run_once(self, timeout: float = POLL_TIME_NON_BLOCKING) -> int: """Receive from the socket and handle data.""" # pylint: disable=too-many-branches, too-many-statements, too-many-return-statements try: if not self._check_connection(): return 0 except ChromecastConnectionError: return 1 # A connection has been established at this point by self._check_connection assert self.socket is not None # poll the socket, as well as the socketpair to allow us to be interrupted rlist = [self.socket, self.socketpair[0]] try: if SELECT_HAS_POLL is True: # Map file descriptors to socket objects because select.select does not support fd > 1024 # https://stackoverflow.com/questions/14250751/how-to-increase-filedescriptors-range-in-python-select fd_to_socket = {rlist_item.fileno(): rlist_item for rlist_item in rlist} poll_obj = select.poll() for poll_fd in rlist: poll_obj.register(poll_fd, select.POLLIN) poll_result = poll_obj.poll(timeout * 1000) # timeout in milliseconds can_read = [fd_to_socket[fd] for fd, _status in poll_result] else: can_read, _, _ = select.select(rlist, [], [], timeout) except (ValueError, OSError) as exc: self.logger.error( "[%s(%s):%s] Error in select call: %s", self.fn or "", self.host, self.port, exc, ) self._force_recon = True return 0 # read message from chromecast message = None if self.socket in can_read and not self._force_recon: try: message = self._read_message() except InterruptLoop as exc: if self.stop.is_set(): self.logger.info( "[%s(%s):%s] Stopped while reading message, disconnecting.", self.fn or "", self.host, self.port, ) else: self.logger.error( "[%s(%s):%s] Interruption caught without being stopped: %s", self.fn or "", self.host, self.port, exc, ) return 1 except ssl.SSLError as exc: if exc.errno == ssl.SSL_ERROR_EOF: if self.stop.is_set(): return 1 raise except socket.error: self._force_recon = True self.logger.error( "[%s(%s):%s] Error reading from socket.", self.fn or "", self.host, self.port, ) else: data = _dict_from_message_payload(message) if self.socketpair[0] in can_read: # Clear the socket's buffer self.socketpair[0].recv(128) # If we are stopped after receiving a message we skip the message # and tear down the connection if self.stop.is_set(): return 1 if not message: return 0 # See if any handlers will accept this message self._route_message(message, data) if REQUEST_ID in data and data[REQUEST_ID] in self._request_callbacks: self._request_callbacks.pop(data[REQUEST_ID])(True, data) return 0 def _check_connection(self) -> bool: """ Checks if the connection is active, and if not reconnect :return: True if the connection is active, False if the connection was reset. """ # check if connection is expired reset = False if self._force_recon: self.logger.warning( "[%s(%s):%s] Error communicating with socket, resetting connection", self.fn or "", self.host, self.port, ) reset = True elif self.heartbeat_controller.is_expired(): self.logger.warning( "[%s(%s):%s] Heartbeat timeout, resetting connection", self.fn or "", self.host, self.port, ) reset = True if reset: self.receiver_controller.disconnected() for channel in self._open_channels: self.disconnect_channel(channel) self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_LOST, NetworkAddress(self.host, self.port), None ) ) try: self.initialize_connection() except ChromecastConnectionError: self.stop.set() return False return True def _route_message(self, message: CastMessage, data: dict) -> None: """Route message to any handlers on the message namespace""" # route message to handlers if message.namespace in self._handlers: # debug messages if message.namespace != NS_HEARTBEAT: self.logger.debug( "[%s(%s):%s] Received: %s", self.fn or "", self.host, self.port, _message_to_string(message, data), ) # message handlers for handler in set(self._handlers[message.namespace]): try: handled = handler.receive_message(message, data) if not handled: if data.get(REQUEST_ID) not in self._request_callbacks: self.logger.debug( "[%s(%s):%s] Message unhandled: %s", self.fn or "", self.host, self.port, _message_to_string(message, data), ) except Exception: # pylint: disable=broad-except self.logger.exception( ( "[%s(%s):%s] Exception caught while sending message to " "controller %s: %s" ), self.fn or "", self.host, self.port, type(handler).__name__, _message_to_string(message, data), ) else: self.logger.debug( "[%s(%s):%s] Received unknown namespace: %s", self.fn or "", self.host, self.port, _message_to_string(message, data), ) def _cleanup(self) -> None: """Cleanup open channels and handlers""" for channel in self._open_channels: try: self.disconnect_channel(channel) except Exception: # pylint: disable=broad-except pass for handlers in self._handlers.values(): for handler in set(handlers): try: handler.tear_down() except Exception: # pylint: disable=broad-except pass if self.socket is not None: try: self.socket.close() except Exception: # pylint: disable=broad-except self.logger.exception( "[%s(%s):%s] _cleanup", self.fn or "", self.host, self.port ) self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_DISCONNECTED, NetworkAddress(self.host, self.port), None, ) ) self.socketpair[0].close() self.socketpair[1].close() self.connecting = True def _report_connection_status(self, status: ConnectionStatus) -> None: """Report a change in the connection status to any listeners""" for listener in self._connection_listeners: try: self.logger.debug( "[%s(%s):%s] connection listener: %x (%s) %s", self.fn or "", self.host, self.port, id(listener), type(listener).__name__, status, ) listener.new_connection_status(status) except Exception: # pylint: disable=broad-except self.logger.exception( "[%s(%s):%s] Exception thrown when calling connection listener", self.fn or "", self.host, self.port, ) def _read_bytes_from_socket(self, msglen: int) -> bytes: """Read bytes from the socket.""" # It is a programming error if this is called when we don't have a socket assert self.socket is not None chunks = [] bytes_recd = 0 while bytes_recd < msglen: if self.stop.is_set(): raise InterruptLoop("Stopped while reading from socket") try: chunk = self.socket.recv(min(msglen - bytes_recd, 2048)) if chunk == b"": raise socket.error("socket connection broken") chunks.append(chunk) bytes_recd += len(chunk) except TimeoutError: self.logger.debug( "[%s(%s):%s] timeout in : _read_bytes_from_socket", self.fn or "", self.host, self.port, ) continue return b"".join(chunks) def _read_message(self) -> CastMessage: """Reads a message from the socket and converts it to a message.""" # first 4 bytes is Big-Endian payload length payload_info = self._read_bytes_from_socket(4) read_len = unpack(">I", payload_info)[0] # now read the payload payload = self._read_bytes_from_socket(read_len) message = CastMessage() message.ParseFromString(payload) return message # pylint: disable=too-many-arguments, too-many-branches def send_message( self, destination_id: str, namespace: str, data: dict, *, inc_session_id: bool = False, callback_function: CallbackType | None = None, no_add_request_id: bool = False, force: bool = False, ) -> None: """Send a message to the Chromecast.""" # namespace is a string containing namespace # data is a dict that will be converted to json # wait_for_response only works if we have a request id # If channel is not open yet, connect to it. self._ensure_channel_connected(destination_id) if not no_add_request_id: request_id = self._gen_request_id() data[REQUEST_ID] = request_id if inc_session_id: data[SESSION_ID] = self.session_id msg = CastMessage() msg.protocol_version = msg.CASTV2_1_0 msg.source_id = self.source_id msg.destination_id = destination_id msg.payload_type = CastMessage.STRING msg.namespace = namespace msg.payload_utf8 = json.dumps(data, ensure_ascii=False) # prepend message with Big-Endian 4 byte payload size be_size = pack(">I", msg.ByteSize()) # Log all messages except heartbeat if msg.namespace != NS_HEARTBEAT: self.logger.debug( "[%s(%s):%s] Sending: %s", self.fn or "", self.host, self.port, _message_to_string(msg, data), ) if not force and self.stop.is_set(): if callback_function: callback_function(False, None) raise PyChromecastStopped("Socket client's thread is stopped.") if not self.connecting and not self._force_recon: # We have a socket assert self.socket is not None try: if callback_function: if not no_add_request_id: self._request_callbacks[request_id] = callback_function else: callback_function(True, None) self.socket.sendall(be_size + msg.SerializeToString()) except socket.error: if callback_function: callback_function(False, None) if not no_add_request_id: self._request_callbacks.pop(request_id, None) self._force_recon = True self.logger.info( "[%s(%s):%s] Error writing to socket.", self.fn or "", self.host, self.port, ) else: if callback_function: callback_function(False, None) raise NotConnected(f"Chromecast {self.host}:{self.port} is connecting...") def send_platform_message( self, namespace: str, message: dict, *, inc_session_id: bool = False, callback_function: CallbackType | None = None, no_add_request_id: bool = False, ) -> None: """Helper method to send a message to the platform.""" return self.send_message( PLATFORM_DESTINATION_ID, namespace, message, inc_session_id=inc_session_id, callback_function=callback_function, no_add_request_id=no_add_request_id, ) def send_app_message( self, namespace: str, message: dict, *, inc_session_id: bool = False, callback_function: CallbackType | None = None, no_add_request_id: bool = False, ) -> None: """Helper method to send a message to current running app.""" if namespace not in self.app_namespaces: if callback_function: callback_function(False, None) raise UnsupportedNamespace( f"Namespace {namespace} is not supported by current app. " f"Supported are {', '.join(self.app_namespaces)}" ) if self.destination_id is None: if callback_function: callback_function(False, None) raise NotConnected( "Attempting send a message when destination_id is not set" ) return self.send_message( self.destination_id, namespace, message, inc_session_id=inc_session_id, callback_function=callback_function, no_add_request_id=no_add_request_id, ) def register_connection_listener(self, listener: ConnectionStatusListener) -> None: """Register a connection listener for when the socket connection changes. Listeners will be called with listener.new_connection_status(status)""" self._connection_listeners.append(listener) def _ensure_channel_connected(self, destination_id: str) -> None: """Ensure we opened a channel to destination_id.""" if destination_id not in self._open_channels: self._open_channels.append(destination_id) self.send_message( destination_id, NS_CONNECTION, { MESSAGE_TYPE: TYPE_CONNECT, "origin": {}, "userAgent": "PyChromecast", "senderInfo": { "sdkType": 2, "version": "15.605.1.3", "browserVersion": "44.0.2403.30", "platform": 4, "systemVersion": "Macintosh; Intel Mac OS X10_10_3", "connectionType": 1, }, }, no_add_request_id=True, ) def disconnect_channel(self, destination_id: str) -> None: """Disconnect a channel with destination_id.""" if destination_id in self._open_channels: try: self.send_message( destination_id, NS_CONNECTION, {MESSAGE_TYPE: TYPE_CLOSE, "origin": {}}, no_add_request_id=True, force=True, ) except NotConnected: pass except Exception: # pylint: disable=broad-except self.logger.exception( "[%s(%s):%s] Exception", self.fn or "", self.host, self.port ) self._open_channels.remove(destination_id) self.handle_channel_disconnected() def handle_channel_disconnected(self) -> None: """Handles a channel being disconnected.""" for namespace in self.app_namespaces: if namespace in self._handlers: for handler in set(self._handlers[namespace]): handler.channel_disconnected() self.app_namespaces = [] self.destination_id = None self.session_id = None class ConnectionController(BaseController): """Controller to respond to connection messages.""" def __init__(self) -> None: super().__init__(NS_CONNECTION) def receive_message(self, message: CastMessage, data: dict) -> bool: """ Called when a message is received. data is message.payload_utf8 interpreted as a JSON dict. """ if self._socket_client is None: raise ControllerNotRegistered if self._socket_client.is_stopped: return True if data[MESSAGE_TYPE] == TYPE_CLOSE: # The cast device is asking us to acknowledge closing this channel. self._socket_client.disconnect_channel(message.source_id) # Schedule a status update so that a channel is created. self._socket_client.receiver_controller.update_status() return True return False class HeartbeatController(BaseController): """Controller to respond to heartbeat messages.""" def __init__(self) -> None: super().__init__(NS_HEARTBEAT, target_platform=True) self.last_ping = 0.0 self.last_pong = time.time() def receive_message(self, _message: CastMessage, data: dict) -> bool: """ Called when a heartbeat message is received. data is message.payload_utf8 interpreted as a JSON dict. """ if self._socket_client is None: raise ControllerNotRegistered if self._socket_client.is_stopped: return True if data[MESSAGE_TYPE] == TYPE_PING: try: self._socket_client.send_message( PLATFORM_DESTINATION_ID, self.namespace, {MESSAGE_TYPE: TYPE_PONG}, no_add_request_id=True, ) except PyChromecastStopped: self._socket_client.logger.debug( "Heartbeat error when sending response, " "Chromecast connection has stopped" ) return True if data[MESSAGE_TYPE] == TYPE_PONG: self.reset() return True return False def ping(self) -> None: """Send a ping message.""" if self._socket_client is None: raise ControllerNotRegistered self.last_ping = time.time() try: self.send_message({MESSAGE_TYPE: TYPE_PING}) except NotConnected: self._socket_client.logger.error( "Chromecast is disconnected. Cannot ping until reconnected." ) def reset(self) -> None: """Reset expired counter.""" self.last_pong = time.time() def is_expired(self) -> bool: """Indicates if connection has expired.""" if time.time() - self.last_ping > HB_PING_TIME: self.ping() return (time.time() - self.last_pong) > HB_PING_TIME + HB_PONG_TIME def new_socket() -> socket.socket: """ Create a new socket with OS-specific parameters Try to set SO_REUSEPORT for BSD-flavored systems if it's an option. Catches errors if not. """ new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) new_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: # noinspection PyUnresolvedReferences reuseport = socket.SO_REUSEPORT except AttributeError: pass else: try: new_sock.setsockopt(socket.SOL_SOCKET, reuseport, 1) except (OSError, socket.error) as err: # OSError on python 3, socket.error on python 2 if err.errno != errno.ENOPROTOOPT: raise return new_sock 07070100000051000081A400000000000000000000000165F9EF900000016E000000000000000000000000000000000000001D00000000pychromecast-14.0.1/pylintrc[MAIN] ignore=cast_channel_pb2.py,authority_keys_pb2.py,logging_pb2.py reports=no disable= format, locally-disabled, too-few-public-methods, too-many-arguments, too-many-instance-attributes, too-many-public-methods, duplicate-code, too-many-nested-blocks, [EXCEPTIONS] overgeneral-exceptions=builtins.Exception,pychromecast.error.PyChromecastError 07070100000052000081A400000000000000000000000165F9EF90000004E5000000000000000000000000000000000000002300000000pychromecast-14.0.1/pyproject.toml[build-system] requires = ["setuptools~=65.6", "wheel~=0.37.1"] build-backend = "setuptools.build_meta" [project] name = "PyChromecast" version = "14.0.1" description = "Python module to talk to Google Chromecast." readme = "README.rst" authors = [ {name = "Paulus Schoutsen", email = "paulus@paulusschoutsen.nl"} ] requires-python = ">=3.11.0" dependencies = [ "protobuf>=4.25.1", "zeroconf>=0.25.1", "casttube>=0.2.0", ] classifiers=[ "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 3", "Topic :: Software Development :: Libraries :: Python Modules", ] [project.urls] "Homepage" = "https://github.com/home-assistant-libs/pychromecast" [tool.black] exclude = 'pb2' [tool.rstcheck] # README.rst has embedded python examples which show the interactive interpreter output # that's not valid Python, so we disable the check ignore_languages = ["python"] [tool.setuptools] platforms = ["any"] zip-safe = false include-package-data = true [tool.setuptools.packages.find] include = ["pychromecast*"] [tool.setuptools.package-data] "*" = ["py.typed"] 07070100000053000081A400000000000000000000000165F9EF9000000105000000000000000000000000000000000000002A00000000pychromecast-14.0.1/requirements-test.txtbeautifulsoup4==4.12.3 black==24.3.0 flake8==7.0.0 mypy==1.9.0 PlexAPI==4.15.10 pylint==3.1.0 rstcheck==6.2.0 types-beautifulsoup4==4.12.0.20240229 types-html5lib==1.1.11.20240228 types-protobuf==4.24.0.20240311 types-requests==2.31.0.20240311 yle-dl==20240130 07070100000054000081A400000000000000000000000165F9EF9000000033000000000000000000000000000000000000002500000000pychromecast-14.0.1/requirements.txtcasttube==0.2.1 protobuf==4.25.2 zeroconf==0.131.0 07070100000055000041ED00000000000000000000000265F9EF9000000000000000000000000000000000000000000000001B00000000pychromecast-14.0.1/script07070100000056000081ED00000000000000000000000165F9EF9000000089000000000000000000000000000000000000002300000000pychromecast-14.0.1/script/release#!/bin/sh # Pushes a new version to PyPi. rm -rf dist python3 setup.py sdist bdist_wheel python3 -m twine upload dist/* --skip-existing 07070100000057000081A400000000000000000000000165F9EF900000015D000000000000000000000000000000000000001E00000000pychromecast-14.0.1/setup.cfg[wheel] universal = 1 [flake8] # To work with Black max-line-length = 88 # E501: line too long # W503: Line break occurred before a binary operator # E203: Whitespace before ':' # D202 No blank lines allowed after function docstring # E701 / E704: multiple statements on one line ignore = E501, W503, E203, D202, E701, E704 07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!677 blocks
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor