Source code for coherence.upnp.devices.control_point

# -*- coding: utf-8 -*-

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2006-2010, Frank Scholz <dev@coherence-project.org>
# Copyright 2018, Pol Canelles <canellestudi@gmail.com>

'''
Control Point
=============

:class:`DeviceQuery`
--------------------

A convenient class that allow us to create request queries to control point.

:class:`ControlPoint`
---------------------

Takes care of managing the different devices detected by our instance
of class :class:`~coherence.base.Coherence`.

:class:`XMLRPC`
---------------

A resource that implements XML-RPC.

.. note::
    XML-RPC is a remote procedure call (RPC) protocol which uses XML to encode
    its calls and HTTP as a transport mechanism. "XML-RPC" also refers
    generically to the use of XML for remote procedure call, independently
    of the specific protocol.

.. seealso::
    XML-RPC information extracted from: https://en.wikipedia.org/wiki/XML-RPC
'''

import traceback

from twisted.internet import reactor
from twisted.web import xmlrpc, client

from eventdispatcher import EventDispatcher, Property, ListProperty

from coherence import log
from coherence.upnp.core import service
from coherence.upnp.core.event import EventServer
from coherence.upnp.devices.internet_gateway_device_client import \
    InternetGatewayDeviceClient
from coherence.upnp.devices.media_renderer_client import MediaRendererClient
from coherence.upnp.devices.media_server_client import MediaServerClient


[docs]class DeviceQuery(EventDispatcher): ''' .. versionchanged:: 0.9.0 * Introduced inheritance from EventDispatcher * Changed class variable :attr:`fired` to benefit from the EventDispatcher's properties ''' fired = Property(False) def __init__(self, type, pattern, callback, timeout=0, oneshot=True): EventDispatcher.__init__(self) self.type = type self.pattern = pattern self.callback = callback self.timeout = timeout self.oneshot = oneshot if self.type == 'uuid' and self.pattern.startswith('uuid:'): self.pattern = self.pattern[5:] if isinstance(self.callback, str): # print(f'DeviceQuery: register event {self.callback}') self.register_event(self.callback)
[docs] def fire(self, device): if callable(self.callback): self.callback(device) elif isinstance(self.callback, str): self.dispatch_event(self.callback, device=device) self.fired = True
[docs] def check(self, device): if self.fired and self.oneshot: return if (self.type == 'host' and device.host == self.pattern): self.fire(device) elif (self.type == 'friendly_name' and device.friendly_name == self.pattern): self.fire(device) elif (self.type == 'uuid' and device.get_uuid() == self.pattern): self.fire(device)
[docs]class ControlPoint(EventDispatcher, log.LogAble): ''' .. versionchanged:: 0.9.0 * Introduced inheritance from EventDispatcher, emitted events changed: - Coherence.UPnP.ControlPoint.{client.device_type}.detected => control_point_client_detected' - Coherence.UPnP.ControlPoint.{client.device_type}.removed => control_point_client_removed * Changed class variable :attr:`queries` to benefit from the EventDispatcher's properties .. warning:: Be aware that some events are removed, with the new dispatcher we remove the detection for specific device type in flavour of a global detection. ''' logCategory = 'controlpoint' queries = ListProperty([]) def __init__(self, coherence, auto_client=None): log.LogAble.__init__(self) EventDispatcher.__init__(self) self.register_event( 'control_point_client_detected', 'control_point_client_removed', ) if not auto_client: auto_client = ['MediaServer', 'MediaRenderer'] self.coherence = coherence self.auto_client = auto_client self.coherence.bind( coherence_device_detection_completed=self.check_device, coherence_device_removed=self.remove_client) self.info('Coherence UPnP ControlPoint starting...') self.event_server = EventServer(self) self.coherence.add_web_resource('RPC2', XMLRPC(self)) for device in self.get_devices(): self.info(f'ControlPoint [check device]: {device}') self.check_device(device)
[docs] def shutdown(self): for device in self.get_devices(): self.coherence.unbind( coherence_device_detection_completed=self.check_device, coherence_device_removed=self.remove_client ) if device.client is not None: device.client.unbind( detection_completed=self.completed)
[docs] def auto_client_append(self, device_type): if device_type in self.auto_client: return self.auto_client.append(device_type) for device in self.get_devices(): self.check_device(device)
[docs] def browse(self, device): self.info(f'ControlPoint.browse: {device}') device = self.coherence.get_device_with_usn(device.get_usn()) if not device: return self.check_device(device)
[docs] def process_queries(self, device): for query in self.queries: query.check(device)
[docs] def add_query(self, query): for device in self.get_devices(): query.check(device) if not query.fired and query.timeout == 0: query.callback(None) else: self.queries.append(query)
[docs] @staticmethod def check_louie(receiver, signal, method='connect'): ''' Check if the connect or disconnect method's arguments are valid in order to automatically convert to EventDispatcher's bind method. The old valid signals are: - Coherence.UPnP.ControlPoint.MediaServer.detected - Coherence.UPnP.ControlPoint.MediaServer.removed - Coherence.UPnP.ControlPoint.MediaRenderer.detected - Coherence.UPnP.ControlPoint.MediaRenderer.removed - Coherence.UPnP.ControlPoint.InternetGatewayDevice.detected - Coherence.UPnP.ControlPoint.InternetGatewayDevice.removed .. versionadded:: 0.9.0 ''' if not callable(receiver): raise Exception('The receiver should be callable in order to use' ' the method {method}') if not signal: raise Exception( f'We need a signal in order to use method {method}') if not signal.startswith('Coherence.UPnP.ControlPoint.'): raise Exception('We need a signal an old signal starting with: ' '"Coherence.UPnP.ControlPoint."')
[docs] def connect(self, receiver, signal=None, sender=None, weak=True): ''' Wrapper method around the deprecated method louie.connect. It will check if the passed signal is supported by executing the method :meth:`check_louie`. .. warning:: This will probably be removed at some point, if you use the connect method you should consider to migrate to the new event system EventDispatcher. .. versionchanged:: 0.9.0 Added EventDispatcher's compatibility for some basic signals ''' self.check_louie(receiver, signal, 'connect') if signal.endswith('.detected'): self.coherence.bind( coherence_device_detection_completed=receiver) elif signal.endswith('.removed'): self.bind( control_point_client_removed=receiver) else: raise Exception( f'Unknown signal {signal}, we cannot bind that signal.')
[docs] def disconnect(self, receiver, signal=None, sender=None, weak=True): ''' Wrapper method around the deprecated method louie.disconnect. It will check if the passed signal is supported by executing the method :meth:`check_louie`. .. warning:: This will probably be removed at some point, if you use the disconnect method you should migrate to the new event system EventDispatcher. .. versionchanged:: 0.9.0 Added EventDispatcher's compatibility for some basic signals ''' self.check_louie(receiver, signal, 'disconnect') if signal.endswith('.detected'): self.coherence.unbind( coherence_device_detection_completed=receiver) elif signal.endswith('.removed'): self.unbind( control_point_client_removed=receiver) else: raise Exception( f'Unknown signal {signal}, we cannot unbind that signal.')
[docs] def get_devices(self): return self.coherence.get_devices()
[docs] def get_device_with_id(self, id): return self.coherence.get_device_with_id(id)
[docs] def get_device_by_host(self, host): return self.coherence.get_device_by_host(host)
[docs] def check_device(self, device): if device.client is None: self.info(f'found device {device.get_friendly_name()} of type ' f'{device.get_device_type()} - {device.client}') short_type = device.get_friendly_device_type() if short_type in self.auto_client and short_type is not None: self.info( f'identified {short_type} {device.get_friendly_name()}') if short_type == 'MediaServer': client = MediaServerClient(device) if short_type == 'MediaRenderer': client = MediaRendererClient(device) if short_type == 'InternetGatewayDevice': client = InternetGatewayDeviceClient(device) client.bind(detection_completed=self.completed) client.coherence = self.coherence device.set_client(client) if device.client.detection_completed: self.completed(device.client) self.process_queries(device)
[docs] def completed(self, client, *args): self.info(f'sending signal Coherence.UPnP.ControlPoint.' f'{client.device_type}.detected {client.device.udn}') self.dispatch_event( 'control_point_client_detected', client=client, udn=client.device.udn)
[docs] def remove_client(self, udn, client): self.dispatch_event( 'control_point_client_removed', udn=udn) self.info(f'removed {client.device_type} ' f'{client.device.get_friendly_name()}') client.remove()
[docs] def propagate(self, event): self.info(f'propagate: {event}') if event.get_sid() in service.subscribers: try: service.subscribers[event.get_sid()].process_event(event) except Exception as msg: self.debug(msg) self.debug(traceback.format_exc()) pass
[docs] def put_resource(self, url, path): def got_result(result): print(result) def got_error(result): print('error', result) try: f = open(path) data = f.read() f.close() headers = { b'Content-Type': b'application/octet-stream', b'Content-Length': bytes(str(len(data)), encoding='utf-8') } df = client.getPage( url, method=b'POST', headers=headers, postdata=data) df.addCallback(got_result) df.addErrback(got_error) return df except IOError: pass
[docs]class XMLRPC(xmlrpc.XMLRPC): def __init__(self, control_point): xmlrpc.XMLRPC.__init__(self) self.control_point = control_point self.allowNone = True
[docs] def xmlrpc_list_devices(self): print('list_devices') r = [] for device in self.control_point.get_devices(): # print(device.get_friendly_name(), device.get_service_type(), # device.get_location(), device.get_id()) d = {'friendly_name': device.get_friendly_name(), 'device_type': device.get_device_type(), 'location': str(device.get_location()), 'id': str(device.get_id())} r.append(d) return r
[docs] def xmlrpc_mute_device(self, device_id): print('mute') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.rendering_control.set_mute(desired_mute=1) return 'Ok' return 'Error'
[docs] def xmlrpc_unmute_device(self, device_id): print('unmute', device_id) device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.rendering_control.set_mute(desired_mute=0) return 'Ok' return 'Error'
[docs] def xmlrpc_set_volume(self, device_id, volume): print('set volume') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.rendering_control.set_volume(desired_volume=volume) return 'Ok' return 'Error'
[docs] def xmlrpc_play(self, device_id): print('play') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.av_transport.play() return 'Ok' return 'Error'
[docs] def xmlrpc_pause(self, device_id): print('pause') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.av_transport.pause() return 'Ok' return 'Error'
[docs] def xmlrpc_stop(self, device_id): print('stop') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.av_transport.stop() return 'Ok' return 'Error'
[docs] def xmlrpc_next(self, device_id): print('next') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() next(client.av_transport) return 'Ok' return 'Error'
[docs] def xmlrpc_previous(self, device_id): print('previous') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.av_transport.previous() return 'Ok' return 'Error'
[docs] def xmlrpc_set_av_transport_uri(self, device_id, uri): print('set_av_transport_uri') device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.av_transport.set_av_transport_uri(current_uri=uri) return 'Ok' return 'Error'
[docs] def xmlrpc_create_object(self, device_id, container_id, arguments): print('create_object', arguments) device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.content_directory.create_object(container_id, arguments) return 'Ok' return 'Error'
[docs] def xmlrpc_import_resource(self, device_id, source_uri, destination_uri): print('import_resource', source_uri, destination_uri) device = self.control_point.get_device_with_id(device_id) if device is not None: client = device.get_client() client.content_directory.import_resource(source_uri, destination_uri) return 'Ok' return 'Error'
[docs] def xmlrpc_put_resource(self, url, path): print('put_resource', url, path) self.control_point.put_resource(url, path) return 'Ok'
[docs] def xmlrpc_ping(self): print('ping') return 'Ok'
[docs]def startXMLRPC(control_point, port): from twisted.web import server r = XMLRPC(control_point) print(f'XMLRPC-API on port {port:d} ready') reactor.listenTCP(port, server.Site(r))
if __name__ == '__main__': config = {} config['logmode'] = 'warning' config['serverport'] = 30020 from coherence.base import Coherence ctrl = ControlPoint(Coherence(config), auto_client=[]) def show_devices(): print('show_devices') for d in ctrl.get_devices(): print(d, d.get_id()) def the_result(r): print('result', r, r.get_id()) def query_devices(): print('query_devices') ctrl.add_query(DeviceQuery('host', '192.168.0.1', the_result)) def query_devices2(): print('query_devices with timeout') ctrl.add_query( DeviceQuery('host', '192.168.0.1', the_result, timeout=10, oneshot=False)) def stop_reactor(*args): reactor.stop() print('Stoped reactor successfully') reactor.callLater(2, show_devices) reactor.callLater(3, query_devices) reactor.callLater(4, query_devices2) reactor.callLater(5, ctrl.add_query, DeviceQuery( 'friendly_name', 'Coherence Test Content', the_result, timeout=10, oneshot=False)) reactor.callLater(6, stop_reactor) reactor.run()