Source code for coherence.base

# -*- coding: utf-8 -*-

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2006,2007,2008 Frank Scholz <coherence@beebits.net>
# Copyright 2018, Pol Canelles <canellestudi@gmail.com>

'''
Base
====

The core of the project. Holds the class :class:`Coherence` intended to be used
to manage all the resources of the project. Also contains some other classes
which are vital to the project.

:class:`SimpleRoot`
-------------------

A web resource representing a web site. Used to build the contents browser for
our instance of a :class:`WebServer` or :class:`WebServerUi`.

:class:`WebServer`
------------------

A class which takes care of dealing with the web representation of the running
:class:`Coherence`'s instance. This is the default webserver used.

:class:`WebServerUi`
--------------------

The default web server, :class:`WebServer`, can be replaced by this class which
will do the same thing as the default web server, but with a more polished
interface.

:class:`Plugins`
----------------

Manage all the available plugins for the Cohen3 project.

:class:`Coherence`
------------------

The Main class of the Cohen3 project. The Coherence class controls all the
servers initialization depending on the configuration passed.

'''

import copy
import logging
import os
import socket
import traceback

from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet import endpoints
from twisted.internet.tcp import CannotListenError
from twisted.web import resource, static
from twisted.python.util import sibpath

from eventdispatcher import (
    EventDispatcher, ListProperty, DictProperty, Property)

from coherence import __version__
from coherence import log
from coherence.upnp.core.device import Device, RootDevice
from coherence.upnp.core.msearch import MSearch
from coherence.upnp.core.ssdp import SSDPServer
from coherence.upnp.core.utils import to_string
from coherence.upnp.core.utils import Site
from coherence.upnp.core.utils import get_ip_address, get_host_address
from coherence.upnp.devices.control_point import ControlPoint
from coherence.upnp.devices.media_renderer import MediaRenderer
from coherence.upnp.devices.media_server import MediaServer

__import_devices__ = ControlPoint, MediaServer, MediaRenderer

try:
    import pkg_resources
except ImportError:
    pkg_resources = None


[docs]class SimpleRoot(resource.Resource, log.LogAble): addSlash = True logCategory = 'coherence' def __init__(self, coherence): resource.Resource.__init__(self) log.LogAble.__init__(self) self.coherence = coherence self.putChild(b'styles', static.File(sibpath(__file__, 'web/static/styles'), defaultType='text/css')) self.putChild(b'server-images', static.File(sibpath(__file__, 'web/static/images'), defaultType='text/css'))
[docs] def getChild(self, name, request): self.debug(f'SimpleRoot getChild {name}, {request}') name = to_string(name) if name == 'oob': ''' we have an out-of-band request ''' return static.File( self.coherence.dbus.pinboard[request.args['key'][0]]) if name in ['', None]: return self # at this stage, name should be a device UUID try: return self.coherence.children[name] except KeyError: self.warning(f'Cannot find device for requested name: {name}') request.setResponseCode(404) return \ static.Data( f'<html><p>No device for requested UUID: ' f'{name.encode("ascii")}</p></html>'.encode('ascii'), 'text/html')
[docs] def listchilds(self, uri): uri = to_string(uri) self.info(f'listchilds {uri}') if uri[-1] != '/': uri += '/' cl = [] for child in self.coherence.children: device = self.coherence.get_device_with_id(child) if device is not None: cl.append( f'<li><a href={uri}{child}>' f'{device.get_friendly_device_type()}:' f'{device.get_device_type_version()} ' f'{device.get_friendly_name()}' f'</a></li>') # We put in a blacklist the styles and server-images folders, # in order to avoid to appear into the generated html list blacklist = ['styles', 'server-images'] for c in self.children: c = to_string(c) if c in blacklist: continue cl.append(f'<li><a href={uri}{c}>{c}</a></li>') return ''.join(cl)
[docs] def render(self, request): html = f'''\ <html> <head profile="http://www.w3.org/2005/10/profile"> <title>Cohen3 (SimpleRoot)</title> <link rel="stylesheet" type="text/css" href="/styles/main.css"/> <link rel="icon" type="image/png" href="/server-images/coherence-icon.ico"/> </head> <body> <div class="text-center column col-100 bottom-0"> <h5>Dlna/UPnP framework</h5> <img id="logo-image" src="/server-images/coherence-icon.svg"/> <h5>For the Digital Living</h5> </div> <div class="column col-100"> <h6 class="title-head-lines"> <img class="logo-icon" src="/server-images/coherence-icon.svg"></img> Hosting: </h6> <div class="list"> <ul>{self.listchilds(request.uri)}</ul> </div> </div> </body></html>''' return html.encode('ascii')
[docs]class WebServer(log.LogAble): logCategory = 'webserver' def __init__(self, ui, port, coherence): log.LogAble.__init__(self) self.site = Site(SimpleRoot(coherence)) self.endpoint = endpoints.TCP4ServerEndpoint(reactor, port) self._endpoint_listen(coherence, port)
[docs] def _endpoint_listen(self, coherence, port): self.endpoint_listen = self.endpoint.listen(self.site) def set_listen_port(p): self.endpoint_port = p coherence.web_server_port = port self.warning( f'WebServer on ip ' f'http://{coherence.hostname}:{coherence.web_server_port}' f' ready') def clear(whatever): self.endpoint_listen = None return whatever self.endpoint_listen.addCallback(set_listen_port).addBoth(clear)
[docs]class WebServerUi(WebServer): logCategory = 'webserverui' def __init__(self, port, coherence, unittests=False): log.LogAble.__init__(self) self.coherence = coherence from coherence.web.ui import Web, IWeb, WebUI from twisted.web import server, resource from twisted.python.components import registerAdapter def resource_factory(original): return WebUI(IWeb, original) registerAdapter(resource_factory, Web, resource.IResource) self.web_root_resource = WebUI(coherence) if not unittests: site_cls = server.Site else: from tests.web_utils import DummySite site_cls = DummySite self.site = site_cls(self.web_root_resource) self.endpoint = endpoints.TCP4ServerEndpoint(reactor, port) self._endpoint_listen(coherence, port) self.ws_endpoint = endpoints.TCP4ServerEndpoint(reactor, 9000) self._ws_endpoint_listen(coherence)
[docs] def _endpoint_listen(self, coherence, port): self.endpoint_listen = self.endpoint.listen(self.site) def set_listen_port(p): self.endpoint_port = p coherence.web_server_port = port self.warning( f'WebServerUi on ip ' f'http://{coherence.hostname}:{coherence.web_server_port}' f' ready') def clear(whatever): self.endpoint_listen = None return whatever self.endpoint_listen.addCallback(set_listen_port).addBoth(clear)
[docs] def _ws_endpoint_listen(self, coherence): self.ws_endpoint_listen = self.ws_endpoint.listen( self.web_root_resource.factory) def set_ws_listen_port(p): self.ws_endpoint_port = p def clear_ws(whatever): self.ws_endpoint_listen = None return whatever self.ws_endpoint_listen.addCallback( set_ws_listen_port).addBoth(clear_ws)
[docs]class Plugins(log.LogAble): logCategory = 'plugins' __instance = None # Singleton __initialized = False _valids = ('coherence.plugins.backend.media_server', 'coherence.plugins.backend.media_renderer', 'coherence.plugins.backend.binary_light', 'coherence.plugins.backend.dimmable_light') _plugins = {} def __new__(cls, *args, **kwargs): if cls.__instance is None: cls.__instance = super(Plugins, cls).__new__(cls) cls.__instance.__initialized = False cls.__instance.__cls = cls return cls.__instance def __init__(self, ids=_valids): # initialize only once if self.__initialized: return self.__initialized = True log.LogAble.__init__(self) if not isinstance(ids, (list, tuple)): ids = (ids,) if pkg_resources: for group in ids: for entrypoint in pkg_resources.iter_entry_points(group): # set a placeholder for lazy loading self._plugins[entrypoint.name] = entrypoint else: self.info('no pkg_resources, fallback to simple plugin handling') if len(self._plugins) == 0: self._collect_from_module() def __repr__(self): return str(self._plugins) def __getitem__(self, key): plugin = self._plugins.__getitem__(key) if pkg_resources and isinstance(plugin, pkg_resources.EntryPoint): try: plugin = plugin.load(require=False) except (ImportError, AttributeError, pkg_resources.ResolutionError) as msg: self.warning(f'Can\'t load plugin {plugin.name} ({msg}), ' f'maybe missing dependencies...') self.info(traceback.format_exc()) del self._plugins[key] raise KeyError else: self._plugins[key] = plugin return plugin
[docs] def get(self, key, default=None): try: return self.__getitem__(key) except KeyError: return default
def __setitem__(self, key, value): self._plugins.__setitem__(key, value)
[docs] def set(self, key, value): return self.__setitem__(key, value)
[docs] def keys(self): return list(self._plugins.keys())
[docs] def _collect_from_module(self): from coherence.extern.simple_plugin import Reception reception = Reception( os.path.join(os.path.dirname(__file__), 'backends'), log=self.warning) self.info(reception.guestlist()) for cls in reception.guestlist(): self._plugins[cls.__name__.split('.')[-1]] = cls
[docs]class Coherence(EventDispatcher, log.LogAble): ''' The Main class of the Cohen3 project. The Coherence class controls all the servers initialization depending on the configuration passed. It is also capable of initialize the plugins defined in config variable or by configuration file. It supports the creation of multiple servers at once. **Example of a simple server via plugin AppleTrailersStore**:: from coherence.base import Coherence from coherence.upnp.core.uuid import UUID from twisted.internet import reactor new_uuid = UUID() coherence = Coherence( {'logmode': 'info', 'controlpoint': 'yes', 'plugin': [{'backend': 'AppleTrailersStore', 'name': 'Cohen3 Example FSStore', 'uuid': new_uuid, } ] } ) reactor.run() .. versionchanged:: 0.9.0 * Introduced inheritance from EventDispatcher * The emitted events changed: - Coherence.UPnP.Device.detection_completed => coherence_device_detection_completed - Coherence.UPnP.Device.removed => coherence_device_removed - Coherence.UPnP.RootDevice.removed => coherence_root_device_removed * Changed some variables to benefit from the EventDispatcher's properties: - :attr:`devices` - :attr:`children` - :attr:`_callbacks` - :attr:`active_backends` - :attr:`ctrl` - :attr:`dbus` - :attr:`json` - :attr:`msearch` - :attr:`ssdp_server` - :attr:`transcoder_manager` - :attr:`web_server` ''' __instance = None # Singleton __initialized = False __incarnations = 0 __cls = None logCategory = 'coherence' devices = ListProperty([]) '''A list of the added devices.''' children = DictProperty({}) '''A dict containing the web resources.''' _callbacks = DictProperty({}) '''A dict containing the callbacks, used by the methods :meth:`subscribe` and :meth:`unsubscribe`.''' active_backends = DictProperty({}) '''A dict containing the active backends.''' # Services/Devices ctrl = Property(None) '''A coherence's instance of class :class:`~coherence.upnp.devices.control_point.ControlPoint`. This will be enabled if we request it by config dict or configuration file via keyword *controlpoint = yes*.''' dbus = Property(None) '''A coherence's instance of class :class:`~coherence.dbus_service.DBusPontoon`. This will be enabled if we request it by config dict or configuration file via keyword *use_dbus = yes*.''' json = Property(None) '''A coherence's instance of class :class:`~coherence.json_service.JsonInterface`. This will be enabled if we request it by config dict or configuration file via keyword *json = yes*.''' msearch = Property(None) '''A coherence's instance of class :class:`~coherence.upnp.core.msearch.MSearch`. This is automatically enabled when :class:`Coherence` is initialized''' ssdp_server = Property(None) '''A coherence's instance of class :class:`~coherence.upnp.core.ssdp.SSDPServer`. This is automatically enabled when :class:`Coherence` is initialized''' transcoder_manager = Property(None) '''A coherence's instance of class :class:`~coherence.transcoder.TranscoderManager`. This will be enabled if we request itby config dict or configuration file via keyword *transcoding = yes*.''' web_server = Property(None) '''A coherence's instance of class :class:`WebServer` or :class:`WebServerUi`. We can request our preference by config dict or configuration file. If we use the keyword *web-ui = yes*, then the class :class:`WebServerUi` will be used, otherwise, the enabled web server will be of class :class:`WebServer`.''' def __new__(cls, *args, **kwargs): if cls.__instance is None: cls.__instance = super(Coherence, cls).__new__(cls) cls.__instance.__initialized = False cls.__instance.__incarnations = 0 cls.__instance.__cls = cls cls.__instance.config = kwargs.get('config', {}) cls.__instance.__incarnations += 1 return cls.__instance def __init__(self, config=None): # initialize only once if self.__initialized: return self.__initialized = True # supers log.LogAble.__init__(self) EventDispatcher.__init__(self) self.register_event( 'coherence_device_detection_completed', 'coherence_device_removed', 'coherence_root_device_removed', ) self.config = config or {} self.available_plugins = None self.external_address = None self.urlbase = None self.web_server_port = int(config.get('serverport', 8080)) # initializes log's system, a COHEN_DEBUG environment # variable overwrites all level settings here. try: logmode = config.get('logging').get('level', 'warning') except (KeyError, AttributeError): logmode = config.get('logmode', 'warning') try: subsystems = config.get('logging')['subsystem'] if isinstance(subsystems, dict): subsystems = [subsystems] for subsystem in subsystems: try: if subsystem['active'] == 'no': continue except (KeyError, TypeError): pass self.info(f'setting log-level for subsystem ' f'{subsystem["name"]} to {subsystem["level"]}') logging.getLogger(subsystem['name'].lower()).setLevel( subsystem['level'].upper()) except (KeyError, TypeError): subsystem_log = config.get('subsystem_log', {}) for subsystem, level in list(subsystem_log.items()): logging.getLogger(subsystem.lower()).setLevel(level.upper()) try: logfile = config.get('logging').get('logfile', None) if logfile is not None: logfile = str(logfile) except (KeyError, AttributeError, TypeError): logfile = config.get('logfile', None) log.init(logfile, logmode.upper()) self.warning(f'Coherence UPnP framework version {__version__} ' f'starting [log level: {logmode}]...') network_if = config.get('interface') if network_if: self.hostname = get_ip_address(f'{network_if}') else: try: self.hostname = socket.gethostbyname(socket.gethostname()) except socket.gaierror: self.warning('hostname can\'t be resolved, ' 'maybe a system misconfiguration?') self.hostname = '127.0.0.1' if self.hostname.startswith('127.'): # use interface detection via routing table as last resort def catch_result(hostname): self.hostname = hostname self.setup_part2() d = defer.maybeDeferred(get_host_address) d.addCallback(catch_result) else: self.setup_part2()
[docs] def clear(self): '''We do need this to survive multiple calls to Coherence during trial tests''' self.unbind_all() self.__cls.__instance = None
[docs] def setup_part2(self): '''Initializes the basic and optional services/devices and the enabled plugins (backends).''' self.info(f'running on host: {self.hostname}') if self.hostname.startswith('127.'): self.warning(f'detection of own ip failed, using {self.hostname} ' f'as own address, functionality will be limited') unittest = self.config.get('unittest', 'no') unittest = False if unittest == 'no' else True try: # TODO: add ip/interface bind self.ssdp_server = SSDPServer(test=unittest) except CannotListenError as err: self.error(f'Error starting the SSDP-server: {err}') self.debug('Error starting the SSDP-server', exc_info=True) reactor.stop() return # maybe some devices are already notified, so we enforce # to create the device, if it is not already added...and # then we connect the signals for new detections. for st, usn in self.ssdp_server.root_devices: self.create_device(st, usn) self.ssdp_server.bind(new_device=self.create_device) self.ssdp_server.bind(removed_device=self.remove_device) self.ssdp_server.subscribe('new_device', self.add_device) self.ssdp_server.subscribe('removed_device', self.remove_device) self.msearch = MSearch(self.ssdp_server, test=unittest) reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown, force=True) # Web Server Initialization try: # TODO: add ip/interface bind if self.config.get('web-ui', 'no') != 'yes': self.web_server = WebServer( None, self.web_server_port, self) else: self.web_server = WebServerUi( self.web_server_port, self, unittests=unittest) except CannotListenError: self.error( f'port {self.web_server_port} already in use, aborting!') reactor.stop() return self.urlbase = f'http://{self.hostname}:{self.web_server_port:d}/' # self.renew_service_subscription_loop = \ # task.LoopingCall(self.check_devices) # self.renew_service_subscription_loop.start(20.0, now=False) # Plugins Initialization try: plugins = self.config['plugin'] if isinstance(plugins, dict): plugins = [plugins] except Exception: plugins = None if plugins is None: plugins = self.config.get('plugins', None) if plugins is None: self.info('No plugin defined!') else: if isinstance(plugins, dict): for plugin, arguments in list(plugins.items()): try: if not isinstance(arguments, dict): arguments = {} self.add_plugin(plugin, **arguments) except Exception as msg: self.warning(f'Can\'t enable plugin, {plugin}: {msg}!') self.info(traceback.format_exc()) else: for plugin in plugins: try: if plugin['active'] == 'no': continue except (KeyError, TypeError): pass try: backend = plugin['backend'] arguments = copy.copy(plugin) del arguments['backend'] backend = self.add_plugin(backend, **arguments) if self.writeable_config(): if 'uuid' not in plugin: plugin['uuid'] = str(backend.uuid)[5:] self.config.save() except Exception as msg: self.warning(f'Can\'t enable plugin, {plugin}: {msg}!') self.info(traceback.format_exc()) self.external_address = ':'.join( (self.hostname, str(self.web_server_port))) # Control Point Initialization if self.config.get('controlpoint', 'no') == 'yes' or self.config.get( 'json', 'no') == 'yes': self.ctrl = ControlPoint(self) # Json Interface Initialization if self.config.get('json', 'no') == 'yes': from coherence.json_service import JsonInterface self.json = JsonInterface(self.ctrl) # Transcoder Initialization if self.config.get('transcoding', 'no') == 'yes': from coherence.transcoder import TranscoderManager self.transcoder_manager = TranscoderManager(self) # DBus Initialization if self.config.get('use_dbus', 'no') == 'yes': try: from coherence import dbus_service if self.ctrl is None: self.ctrl = ControlPoint(self) self.ctrl.auto_client_append('InternetGatewayDevice') self.dbus = dbus_service.DBusPontoon(self.ctrl) except Exception as msg: self.warning(f'Unable to activate dbus sub-system: {msg}') self.debug(traceback.format_exc())
[docs] def add_plugin(self, plugin, **kwargs): self.info(f'adding plugin {plugin}') self.available_plugins = Plugins() # TODO clean up this exception concept try: plugin_class = self.available_plugins.get(plugin, None) if plugin_class is None: raise KeyError for device in plugin_class.implements: try: device_class = globals().get(device, None) if device_class is None: raise KeyError self.info(f'Activating {plugin} plugin as {device}...') new_backend = device_class(self, plugin_class, **kwargs) self.active_backends[str(new_backend.uuid)] = new_backend return new_backend except KeyError: self.warning(f'Can\'t enable {plugin} plugin, ' f'sub-system {device} not found!') except Exception as e1: self.exception(f'Can\'t enable {plugin} plugin for ' f'sub-system {device} [exception: {e1}]') self.debug(traceback.format_exc()) except KeyError: self.warning(f'Can\'t enable {plugin} plugin, not found!') except Exception as e2: self.warning(f'Can\'t enable {plugin} plugin, {e2}!') self.debug(traceback.format_exc())
[docs] def remove_plugin(self, plugin): '''Removes a backend from Coherence Args: plugin (object): is the object return by add_plugin or an UUID string. ''' if isinstance(plugin, str): try: plugin = self.active_backends[plugin] except KeyError: self.warning(f'no backend with the uuid {plugin} found') return '' try: del self.active_backends[str(plugin.uuid)] self.info(f'removing plugin {plugin}') plugin.unregister() return plugin.uuid except KeyError: self.warning(f'no backend with the uuid {plugin.uuid} found') return ''
[docs] @staticmethod def writeable_config(): '''Do we have a new-style config file''' return False
[docs] def store_plugin_config(self, uuid, items): '''Find the backend with uuid and store in its the config the key and value pair(s).''' plugins = self.config.get('plugin') if plugins is None: self.warning('storing a plugin config option is only possible' ' with the new config file format') return if isinstance(plugins, dict): plugins = [plugins] uuid = str(uuid) if uuid.startswith('uuid:'): uuid = uuid[5:] for plugin in plugins: try: if plugin['uuid'] == uuid: for k, v in list(items.items()): plugin[k] = v self.config.save() except Exception as e: self.warning(f'Coherence.store_plugin_config: {e}') else: self.info(f'storing plugin config option ' f'for {uuid} failed, plugin not found')
[docs] def receiver(self, signal, *args, **kwargs): pass
[docs] def shutdown(self, force=False): if self.__incarnations > 1 and not force: self.__incarnations -= 1 return if self.dbus: self.dbus.shutdown() self.dbus = None for backend in self.active_backends.values(): backend.unregister() self.active_backends = {} # send service unsubscribe messages if self.web_server is not None: if hasattr(self.web_server, 'endpoint_listen'): if self.web_server.endpoint_listen is not None: self.web_server.endpoint_listen.cancel() self.web_server.endpoint_listen = None if self.web_server.endpoint_port is not None: self.web_server.endpoint_port.stopListening() if hasattr(self.web_server, 'ws_endpoint_listen'): if self.web_server.ws_endpoint_listen is not None: self.web_server.ws_endpoint_listen.cancel() self.web_server.ws_endpoint_listen = None if self.web_server.ws_endpoint_port is not None: self.web_server.ws_endpoint_port.stopListening() try: if hasattr(self.msearch, 'double_discover_loop'): self.msearch.double_discover_loop.stop() if hasattr(self.msearch, 'port'): self.msearch.port.stopListening() if hasattr(self.ssdp_server, 'resend_notify_loop'): self.ssdp_server.resend_notify_loop.stop() if hasattr(self.ssdp_server, 'port'): self.ssdp_server.port.stopListening() # self.renew_service_subscription_loop.stop() except Exception: pass dev_l = [] for root_device in self.get_devices(): if hasattr(root_device, 'root_device_detection_completed'): root_device.unbind( root_device_detection_completed=self.add_device) for device in root_device.get_devices(): dd = device.unsubscribe_service_subscriptions() dd.addCallback(device.remove) dev_l.append(dd) rd = root_device.unsubscribe_service_subscriptions() rd.addCallback(root_device.remove) dev_l.append(rd) def homecleanup(result): # cleans up anything left over self.ssdp_server.unbind(new_device=self.create_device) self.ssdp_server.unbind(removed_device=self.remove_device) self.ssdp_server.shutdown() if self.ctrl: self.ctrl.shutdown() self.warning('Coherence UPnP framework shutdown') return result dl = defer.DeferredList(dev_l) dl.addCallback(homecleanup) return dl
[docs] def check_devices(self): '''Iterate over devices and their embedded ones and renew subscriptions.''' for root_device in self.get_devices(): root_device.renew_service_subscriptions() for device in root_device.get_devices(): device.renew_service_subscriptions()
[docs] def subscribe(self, name, callback): self._callbacks.setdefault(name, []).append(callback)
[docs] def unsubscribe(self, name, callback): callbacks = self._callbacks.get(name, []) if callback in callbacks: callbacks.remove(callback) self._callbacks[name] = callbacks
[docs] def callback(self, name, *args): for callback in self._callbacks.get(name, []): callback(*args)
[docs] def get_device_by_host(self, host): found = [] for device in self.devices: if device.get_host() == host: found.append(device) return found
[docs] def get_device_with_usn(self, usn): found = None for device in self.devices: if device.get_usn() == usn: found = device break return found
[docs] def get_device_with_id(self, device_id): # print(f'get_device_with_id [{type(device_id)}]: {device_id}') found = None for device in self.devices: id = device.get_id() if device_id[:5] != 'uuid:': id = id[5:] if id == device_id: found = device break return found
[docs] def get_devices(self): # print(f'get_devices: {self.devices}') return self.devices
[docs] def get_local_devices(self): # print(f'get_local_devices: ' # f'{[d for d in self.devices if d.manifestation == "local"]}') return [d for d in self.devices if d.manifestation == 'local']
[docs] def get_nonlocal_devices(self): # print(f'get_nonlocal_devices: ' # f'{[d for d in self.devices if d.manifestation == "remote"]}') return [d for d in self.devices if d.manifestation == 'remote']
[docs] def is_device_added(self, infos): ''' Check if the device exists in our list of created :attr:`devices`. Args: infos (dict): Information about the device Returns: True if the device exists in our list of :attr:`devices`, otherwise, returns False. .. versionadded:: 0.9.0 ''' for d in self.devices: if d.st == infos['ST'] and d.usn == infos['USN']: return True return False
[docs] def create_device(self, device_type, infos): if self.is_device_added(infos): self.warning( f'No need to create the device, we already added device: ' f'{infos["ST"]} with usn {infos["USN"]}...!!') return self.info(f'creating {infos["ST"]} {infos["USN"]}') if infos['ST'] == 'upnp:rootdevice': self.info(f'creating upnp:rootdevice {infos["USN"]}') root = RootDevice(infos) root.bind(root_detection_completed=self.add_device) else: self.info(f'creating device/service {infos["USN"]}') root_id = infos['USN'][:-len(infos['ST']) - 2] root = self.get_device_with_id(root_id) # TODO: must check that this is working as expected device = Device(root, udn=infos['UDN'])
[docs] def add_device(self, device, *args): self.info(f'adding device {device.get_id()} {device.get_usn()} ' f'{device.friendly_device_type}') self.devices.append(device) self.dispatch_event( 'coherence_device_detection_completed', device=device)
[docs] def remove_device(self, device_type, infos): self.info(f'removed device {infos["ST"]} {infos["USN"]}') device = self.get_device_with_usn(infos['USN']) if device: self.dispatch_event('coherence_device_removed', infos['USN'], device.client) self.devices.remove(device) device.remove() if infos['ST'] == 'upnp:rootdevice': self.dispatch_event( 'coherence_root_device_removed', infos['USN'], device.client) self.callback('removed_device', infos['ST'], infos['USN'])
[docs] def add_web_resource(self, name, sub): self.children[name] = sub
[docs] def remove_web_resource(self, name): try: del self.children[name] except KeyError: ''' probably the backend init failed ''' pass
[docs] @staticmethod def check_louie(receiver, signal, method='connect'): ''' Check if the connect or disconnect method's arguments are valid in order to automatically convert to EventDispatcher's bind The old valid signals are: - Coherence.UPnP.Device.detection_completed - Coherence.UPnP.RootDevice.detection_completed - Coherence.UPnP.Device.removed - Coherence.UPnP.RootDevice.removed .. versionadded:: 0.9.0 ''' if not callable(receiver): raise Exception('The receiver should be callable in order to use' ' the method {method}') if not signal: raise Exception( f'We need a signal in order to use method {method}') if not (signal.startswith('Coherence.UPnP.Device.') or signal.startswith('Coherence.UPnP.RootDevice.')): raise Exception( 'We need a signal an old signal starting with: ' '"Coherence.UPnP.Device." or "Coherence.UPnP.RootDevice."')
[docs] def connect(self, receiver, signal=None, sender=None, weak=True): ''' Wrapper method around the deprecated method louie.connect. It will check if the passed signal is supported by executing the method :meth:`check_louie`. .. warning:: This will probably be removed at some point, if you use the connect method you should migrate to the new event system EventDispatcher. .. versionchanged:: 0.9.0 Added EventDispatcher's compatibility for some basic signals ''' self.check_louie(receiver, signal, 'connect') if signal.endswith('.detection_completed'): self.bind(coherence_device_detection_completed=receiver) elif signal.endswith('.Device.removed'): self.bind(coherence_device_removed=receiver) elif signal.endswith('.RootDevice.removed'): self.bind(coherence_root_device_removed=receiver) else: raise Exception( f'Unknown signal {signal}, we cannot bind that signal.')
[docs] def disconnect(self, receiver, signal=None, sender=None, weak=True): ''' Wrapper method around the deprecated method louie.disconnect. It will check if the passed signal is supported by executing the method :meth:`check_louie`. .. warning:: This will probably be removed at some point, if you use the disconnect method you should migrate to the new event system EventDispatcher. .. versionchanged:: 0.9.0 Added EventDispatcher's compatibility for some basic signals ''' self.check_louie(receiver, signal, 'disconnect') if signal.endswith('.detected'): self.unbind( coherence_device_detection_completed=receiver) elif signal.endswith('.removed'): self.unbind( control_point_client_removed=receiver) else: raise Exception( f'Unknown signal {signal}, we cannot unbind that signal.')