# Source code for coherence.upnp.core.service

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright (C) 2006 Fluendo, S.A. (www.fluendo.com).
# Copyright 2006, Frank Scholz <coherence@beebits.net>
# Copyright 2018, Pol Canelles <canellestudi@gmail.com>

'''
Services
========

This module contains several classes related to services:

:class:`Service`
----------------

Object representing a device's service.

:class:`ServiceServer`
----------------------

A Service's server.

:class:`scpdXML`
----------------

A `twisted.web.resource.Resource` representing xml's data for SCPD.

.. note:: SCPD is a Service Control Point Definition, for defining the actions
          offered by the various services in a UPnP's network.

:class:`ServiceControl`
-----------------------

Object to control service's SOAP actions.
'''

import os
import time
from urllib.parse import urlparse

from lxml import etree
from twisted.internet import defer, reactor
from twisted.internet import task
from twisted.python import failure, util
from twisted.web import static

from eventdispatcher import EventDispatcher, Property

from coherence import log
from coherence.upnp.core import action
from coherence.upnp.core import event
from coherence.upnp.core import utils
from coherence.upnp.core import variable
from coherence.upnp.core.event import EventSubscriptionServer
from coherence.upnp.core.soap_proxy import SOAPProxy
from coherence.upnp.core.soap_service import errorCode
from coherence.upnp.core.xml_constants import UPNP_SERVICE_NS

# Registry of the services holding an event subscription, keyed by their
# subscription id (sid). Filled by :func:`subscribe` and emptied by
# :func:`unsubscribe`.
subscribers = {}

# XML namespace of the UPnP eventing ``propertyset``/``property`` elements.
NS_UPNP_ORG_EVENT_1_0 = 'urn:schemas-upnp-org:event-1-0'


def subscribe(service):
    '''
    Register `service` in the module-level :data:`subscribers` registry.

    The service is stored under the subscription id returned by its
    `get_sid()` method; an entry already stored under that id is replaced.
    '''
    sid = service.get_sid()
    subscribers[sid] = service
def unsubscribe(service):
    '''
    Drop `service` from the module-level :data:`subscribers` registry.

    The entry is looked up by the subscription id returned by the service's
    `get_sid()` method. Nothing happens when no such entry exists.
    '''
    sid = service.get_sid()
    subscribers.pop(sid, None)
class Service(EventDispatcher, log.LogAble):
    '''
    This class represents a Device's service.
    Emits events which will be received by class
    :class:`~coherence.upnp.core.device.Device`.

    .. versionchanged:: 0.9.0

        * Migrated from louie/dispatcher to EventDispatcher
        * The emitted events changed:

            - Coherence.UPnP.Service.detection_completed =>
              service_detection_completed
            - Coherence.UPnP.Service.detection_failed =>
              service_detection_failed
            - Coherence.UPnP.DeviceClient.Service.Event.processed =>
              service_event_processed
            - Coherence.UPnP.DeviceClient.Service.notified =>
              service_notified

        * changed class variable :attr:`detection_completed` to benefit
          from the EventDispatcher's properties

    .. note::
        This class initializes some events outside this class. This is done
        this way to make easier to make connections between this service
        and the module :mod:`~coherence.dbus_service`, which uses some
        events triggered by
        :class:`~coherence.upnp.core.variable.StateVariable`. The mentioned
        events are (old => new):

            - Coherence.UPnP.StateVariable.changed =>
              state_variable_changed
            - Coherence.UPnP.StateVariable.{var name}.changed =>
              state_variable_{var name}_changed

    .. warning::
        This class is special regarding EventDispatcher, because some
        events are initialized outside this class by the class
        :class:`~coherence.upnp.core.variable.StateVariable`.
    '''

    logCategory = 'service_client'

    detection_completed = Property(False)
    '''
    To know whenever the service detection has completed. Defaults to `False`
    and it will be set automatically to `True` by the class method
    :meth:`parse_actions`.
    '''

    def __init__(self, service_type, service_id, location, control_url,
                 event_sub_url, presentation_url, scpd_url, device):
        '''
        Store the service description and start fetching its SCPD.

        Args:
            service_type (str): the UPnP service type urn
            service_id (str): the id of the service
            location (str or bytes): the location of the device description,
                used to compute :attr:`url_base`
            control_url (str or bytes): the control url of the service
            event_sub_url (str or bytes): the event subscription url
            presentation_url (str or bytes): the presentation url
            scpd_url (str or bytes): the url of the service description
            device (object): the device this service belongs to

        The urls are stored as bytes; an empty or `None` url is stored
        as `None`.
        '''
        log.LogAble.__init__(self)
        EventDispatcher.__init__(self)
        self.register_event(
            'service_detection_completed',
            'service_detection_failed',
            'service_event_processed',
            'service_notified',
        )

        self.debug('Service.__init__: ...')

        self.service_type = service_type
        self.id = service_id
        # The original tested `location` instead of `control_url` here,
        # which broke as soon as both were not of the same type.
        self.control_url = self._to_ascii_bytes(control_url)
        self.event_sub_url = self._to_ascii_bytes(event_sub_url)
        self.presentation_url = self._to_ascii_bytes(presentation_url)
        self.scpd_url = self._to_ascii_bytes(scpd_url)
        self.device = device
        self._actions = {}
        self._variables = {0: {}}
        self._var_subscribers = {}
        self.subscription_id = None
        self.timeout = 0
        self.event_connection = None
        self.last_time_updated = None
        self.client = None
        # Filled by parse_actions once the description has been received.
        self.scpdXML = None
        # The pending call of renew_subscription, see set_timeout.
        self.renew_subscription_call = None

        self.info('\t- parsing ...')
        if isinstance(location, bytes):
            location = location.decode('utf-8')
        parsed = urlparse(location)
        self.url_base = f'{parsed[0]}://{parsed[1]}'.encode('ascii')

        self.parse_actions()
        self.info(f'{device.friendly_name} {self.service_type} '
                  f'{self.id} initialized')

    @staticmethod
    def _to_ascii_bytes(url):
        '''Return `url` as bytes, or `None` for an empty/missing str url.'''
        if isinstance(url, bytes):
            return url
        return url.encode('ascii') if url else None

    def as_tuples(self):
        '''
        Describe the service as a list of `(label, value)` tuples.

        For the urls the value is itself a tuple holding the url as found in
        the description, the fully qualified url and, optionally, a flag
        which is passed through untouched. Entries whose value is `None`,
        `'None'` or `b''` are left out.
        '''
        rows = []
        skipped = [None, 'None', b'']

        def resolve(item):
            # A callable is called, the name of an attribute of this
            # service is replaced by that attribute, the rest is literal.
            if callable(item):
                return item()
            if isinstance(item, str) and hasattr(self, item):
                return getattr(self, item)
            return item

        def append(name, attribute):
            try:
                if isinstance(attribute, tuple):
                    first = attribute[0]
                    # Decode only real bytes: the original decoded before
                    # looking at the type, so callables and missing urls
                    # always ended up in the error branch below.
                    if isinstance(first, bytes):
                        first = first.decode('utf-8')
                    v1 = resolve(first)
                    if v1 in skipped:
                        return
                    v2 = resolve(attribute[1])
                    if v2 in skipped:
                        return
                    if len(attribute) > 2:
                        rows.append((name, (v1, v2, attribute[2])))
                    else:
                        rows.append((name, (v1, v2)))
                    return
                v = resolve(attribute)
                if v not in skipped:
                    rows.append((name, v))
            except Exception as e:
                # Best effort: an entry which cannot be computed is logged
                # and left out, the remaining entries are still returned.
                self.error(f'Service.as_tuples.append: {e}')
                import traceback
                self.debug(traceback.format_exc())

        location = self.device.get_location()
        rows.append(('Location', (location, location)))
        append('URL base', self.device.get_urlbase)
        rows.append(('UDN', self.device.get_id()))
        rows.append(('Type', self.service_type))
        rows.append(('ID', self.id))
        append('Service Description URL', (
            self.scpd_url,
            lambda: self.device.make_fullyqualified(self.scpd_url)))
        append('Control URL', (
            self.control_url,
            lambda: self.device.make_fullyqualified(self.control_url),
            False))
        append('Event Subscription URL', (
            self.event_sub_url,
            lambda: self.device.make_fullyqualified(self.event_sub_url),
            False))
        return rows

    def as_dict(self):
        '''Return the service type and its actions (as dicts) in a dict.'''
        return {
            'type': self.service_type,
            'actions': [a.as_dict() for a in self._actions.values()],
        }

    def __repr__(self):
        return f'Service {self.service_type} {self.id}'

    def _get_client(self, name):
        '''Return a `SOAPProxy` set up to call the action `name`.'''
        self.debug(f'Service._get_client: {name}')
        url = self.get_control_url()
        self.debug(f'\t- url: {url}')
        namespace = self.get_type()
        action = f'{namespace}#{name}'
        self.debug(f'\t- action: {action}')
        client = SOAPProxy(url, namespace=('u', namespace),
                           soapaction=action)
        self.debug(f'\t- client: {client}')
        return client

    def remove(self):
        '''
        Release what the service holds: the pending subscription renewal,
        the event connection, the subscription, the actions and the state
        variables.
        '''
        self.info(f'removal of {self.device.friendly_name} '
                  f'{self.service_type} {self.id}')
        call = getattr(self, 'renew_subscription_call', None)
        # Only a call which is still pending can be cancelled.
        if call is not None and call.active():
            call.cancel()
        if self.event_connection is not None:
            self.event_connection.teardown()
        if self.subscription_id is not None:
            self.unsubscribe()
        for name, action in self._actions.items():
            self.debug(f'remove {name} {action}')
        self._actions.clear()
        # The (now empty) dict of every instance is kept.
        for variables in self._variables.values():
            variables.clear()

    def get_device(self):
        '''Return the device this service belongs to.'''
        return self.device

    def get_type(self):
        '''Return the service type.'''
        return self.service_type

    def set_timeout(self, timeout):
        '''
        Store the subscription `timeout` (in seconds) and (re)schedule the
        renewal of the subscription.

        The renewal takes place 30 seconds before the timeout. A timeout of
        30 seconds or less cannot be renewed that early (a negative delay is
        refused by the reactor), in that case the renewal happens after half
        of the timeout, but never sooner than in one second.
        '''
        self.info(f'set timeout for {self.device.friendly_name}/'
                  f'{self.service_type} to {int(timeout):d}')
        self.timeout = timeout
        seconds = int(self.timeout)
        delay = seconds - 30 if seconds > 30 else max(seconds // 2, 1)
        call = getattr(self, 'renew_subscription_call', None)
        if call is not None and call.active():
            call.reset(delay)
            self.info(f'reset renew subscription call for '
                      f'{self.device.friendly_name}/{self.service_type} to '
                      f'{delay:d}')
        else:
            # No call yet, or the previous one was already called/cancelled.
            self.renew_subscription_call = reactor.callLater(
                delay, self.renew_subscription)
            self.info(f'starting renew subscription call for '
                      f'{self.device.friendly_name}/{self.service_type} to '
                      f'{delay:d}')

    def get_timeout(self):
        '''Return the subscription timeout.'''
        return self.timeout

    def get_id(self):
        '''Return the service id.'''
        return self.id

    def get_sid(self):
        '''Return the subscription id, `None` when not subscribed.'''
        return self.subscription_id

    def set_sid(self, sid):
        '''
        Store the subscription id and, unless it is `None`, register the
        service in the module-level registry of subscribers.
        '''
        self.info(f'set subscription id for {self.device.friendly_name}/'
                  f'{self.service_type} to {sid}')
        self.subscription_id = sid
        if sid is not None:
            subscribe(self)
            self.debug(f'add subscription for {self.id}')

    def get_actions(self):
        '''Return the dict of the service's actions, keyed by name.'''
        return self._actions

    def get_scpdXML(self):
        '''
        Return the service description as received, `None` as long as
        it has not been received.
        '''
        return self.scpdXML

    def get_action(self, name):
        '''Return the action `name`, `None` when it is not implemented.'''
        return self._actions.get(name)

    def get_state_variables(self, instance):
        '''
        Return the dict of the state variables of `instance`, `None` when
        there is no such instance.
        '''
        return self._variables.get(int(instance))

    def get_state_variable(self, name, instance=0):
        '''
        Return the state variable `name` of `instance`, `None` when the
        instance or the variable does not exist.
        '''
        variables = self.get_state_variables(instance)
        return variables.get(name) if variables else None

    def get_control_url(self):
        '''Return the fully qualified control url.'''
        return self.device.make_fullyqualified(self.control_url)

    def get_event_sub_url(self):
        '''Return the fully qualified event subscription url.'''
        return self.device.make_fullyqualified(self.event_sub_url)

    def get_presentation_url(self):
        '''Return the fully qualified presentation url.'''
        return self.device.make_fullyqualified(self.presentation_url)

    def get_scpd_url(self):
        '''Return the fully qualified url of the service description.'''
        return self.device.make_fullyqualified(self.scpd_url)

    def get_base_url(self):
        '''Return the fully qualified url of `'.'`.'''
        return self.device.make_fullyqualified('.')

    def subscribe(self):
        '''Subscribe to the events of the service.'''
        self.debug(f'subscribe {self.id}')
        event.subscribe(self)

    def unsubscribe(self):
        '''
        Cancel the event subscription.

        Returns:
            The deferred of the unsubscribe request. Once it succeeded the
            service is removed from the registry of subscribers and its
            subscription id is reset to `None`.
        '''
        def remove_it(r, sid):
            self.debug(f'remove subscription for {self.id}')
            unsubscribe(self)
            self.subscription_id = None

        self.debug(f'unsubscribe {self.id}')
        d = event.unsubscribe(self)
        d.addCallback(remove_it, self.get_sid())
        return d

    def subscribe_for_variable(self, var_name, instance=0,
                               callback=None, signal=False):
        '''
        Get `callback` called whenever the state variable changes.

        Args:
            var_name (str): the name of the state variable
            instance: the instance the variable belongs to
            callback (callable): nothing is done without a callback
            signal (bool): if `True` the callback is called right away with
                the variable and bound to the variable's event
                `state_variable_changed`, otherwise it is handed over to
                the variable's `subscribe` method.
        '''
        # `instance` used to be ignored, the lookup was always done in
        # instance 0.
        state_var = self.get_state_variable(var_name, instance)
        if not state_var or callback is None:
            return
        if signal:
            callback(state_var)
            state_var.bind(state_variable_changed=callback)
        else:
            state_var.subscribe(callback)

    def renew_subscription(self):
        '''Renew the event subscription.'''
        self.info('renew_subscription')
        event.subscribe(self)

    def process_event(self, event):
        '''
        Update the state variables from a received event.

        `event` maps the names of the evented variables to their new values
        and carries the received data in its attribute `raw`. The value of
        a `LastChange` variable is an xml document which is parsed to update
        the variables of the instances listed there.

        The event `service_event_processed` is dispatched for every
        variable of the event and `service_notified` once, after the
        very first event.
        '''
        self.info(f'process event {self} {event}')
        for var_name, var_value in list(event.items()):
            self._update_state_variable(var_name, 0, var_value)
            if var_name == 'LastChange':
                self.info('we have a LastChange event')
                self._process_last_change(var_value)
            self.dispatch_event(
                'service_event_processed',
                self, (var_name, var_value, event.raw))
        if self.last_time_updated is None:
            # The clients (e.g. media_server_client) check for last time
            # to detect whether service detection is complete so we need to
            # set it here and now to avoid a potential race condition
            self.last_time_updated = time.time()
            self.dispatch_event(
                'service_notified', sender=self.device, service=self)
            self.info(f'send signal '
                      f'Coherence.UPnP.DeviceClient.Service.notified for '
                      f'{self}')
        self.last_time_updated = time.time()

    def _process_last_change(self, xml):
        '''Update the state variables listed in a LastChange document.'''
        tree = etree.fromstring(xml)
        root_ns, _, _ = tree.tag[1:].partition('}')
        for instance in tree.findall(f'{{{root_ns}}}InstanceID'):
            instance_id = instance.attrib['val']
            self.info(f'instance_id {instance} {instance_id}')
            for var in instance:
                if not isinstance(var.tag, str):
                    # comments and processing instructions
                    continue
                self.info(f'var {var}')
                namespace_uri, _, tag = var.tag[1:].rpartition('}')
                val = var.attrib.get('val')
                self.info(f'{namespace_uri} {tag} {val}')
                if val is None:
                    self.warning(
                        f'{self} ignoring {tag}, it has no val attribute')
                    continue
                if len(var.attrib) > 1:
                    self.info(f'Extended StateVariable '
                              f'{var.tag} - {var.attrib}')
                    if var.attrib.get('channel', 'Master') != 'Master':
                        # TODO handle attributes that
                        #  them selves have multiple instances
                        self.info(
                            f'Skipping update to {var.tag} its not '
                            f'for master channel {var.attrib}')
                        continue
                if self._update_state_variable(tag, instance_id, val):
                    self.info(f'updated var {var}')

    def _update_state_variable(self, name, instance, value):
        '''
        Update the state variable `name` of `instance`, returns `False`
        (and logs an error) when there is no such variable.
        '''
        try:
            variables = self.get_state_variables(instance)
        except (TypeError, ValueError):
            variables = None
        if not variables:
            # TODO Create instance ?
            self.error(
                f'{self} update failed (not self.get_'
                f'state_variables(instance_id)) '
                f'{instance}')
            return False
        state_var = variables.get(name)
        if state_var is None:
            # TODO Create instance StateVariable?
            # SONOS stuff
            self.error(
                f'{self} update failed (not self.get_'
                f'state_variables(instance_id).'
                f'has_key(tag)) {name}')
            return False
        state_var.update(value)
        return True

    def parse_actions(self):
        '''
        Fetch the service description and create the actions and the state
        variables described there.

        Once this is done :attr:`detection_completed` is set to `True` and
        the event `service_detection_completed` is dispatched. The event
        `service_detection_failed` is dispatched when the description could
        not be retrieved.
        '''
        self.debug('Service.parse_actions: ...')
        ns = UPNP_SERVICE_NS
        scpd_url = self.get_scpd_url()

        def gotPage(x):
            data, headers = x
            # lxml wants bytes for a document with an encoding declaration.
            # utf-8 is used (ascii before) so a description with non-ASCII
            # characters does not raise an UnicodeEncodeError.
            self.scpdXML = \
                data.encode('utf-8') if isinstance(data, str) else data
            try:
                tree = etree.fromstring(self.scpdXML)
            except Exception as e:
                self.warning(f'Invalid service description received from '
                             f'{scpd_url}: {e}')
                return

            for action_node in tree.findall(f'.//{{{ns}}}action'):
                name = action_node.findtext(f'{{{ns}}}name')
                arguments = [
                    action.Argument(
                        argument.findtext(f'{{{ns}}}name'),
                        argument.findtext(f'{{{ns}}}direction'),
                        argument.findtext(f'{{{ns}}}relatedStateVariable'))
                    for argument in action_node.findall(
                        f'.//{{{ns}}}argument')]
                self._actions[name] = action.Action(
                    self, name, 'n/a', arguments)

            for var_node in tree.findall(f'.//{{{ns}}}stateVariable'):
                send_events = var_node.attrib.get('sendEvents', 'yes')
                name = var_node.findtext(f'{{{ns}}}name')
                data_type = var_node.findtext(f'{{{ns}}}dataType')
                values = [
                    allowed.text for allowed in var_node.findall(
                        f'.//{{{ns}}}allowedValue')]
                state_var = variable.StateVariable(
                    self, name, 'n/a', 0, send_events, data_type, values)
                # we need to do this here, as there we don't get there our
                # {urn:schemas-beebits-net:service-1-0}X_withVendorDefines
                # attribute there
                state_var.has_vendor_values = True
                self._variables[0][name] = state_var

            self.detection_completed = True
            self.dispatch_event('service_detection_completed',
                                sender=self.device, device=self.device)
            self.info(
                f'send signal Coherence.UPnP.Service.detection_'
                f'completed for {self}')

        def gotError(reason, url):
            self.warning(f'error requesting {url}')
            self.info(f'failure {reason}')
            self.dispatch_event('service_detection_failed',
                                self.device, device=self.device)

        d = utils.getPage(scpd_url)
        d.addCallbacks(gotPage, gotError, errbackArgs=(scpd_url,))
# The state variables which are not evented on every change, keyed by
# service type. ServiceServer checks them periodically and sends their
# accumulated changes to the subscribers.
moderated_variables = {
    'urn:schemas-upnp-org:service:AVTransport:2': [
        'LastChange',
    ],
    'urn:schemas-upnp-org:service:AVTransport:1': [
        'LastChange',
    ],
    'urn:schemas-upnp-org:service:ContentDirectory:2': [
        'SystemUpdateID',
        'ContainerUpdateIDs',
    ],
    'urn:schemas-upnp-org:service:ContentDirectory:1': [
        'SystemUpdateID',
        'ContainerUpdateIDs',
    ],
    'urn:schemas-upnp-org:service:RenderingControl:2': [
        'LastChange',
    ],
    'urn:schemas-upnp-org:service:RenderingControl:1': [
        'LastChange',
    ],
    'urn:schemas-upnp-org:service:ScheduledRecording:1': [
        'LastChange',
    ],
}
class ServiceServer(log.LogAble):
    '''
    The server side of a service: it holds the actions and the state
    variables of the service, keeps track of the event subscribers and
    sends them the notifications about changed state variables.

    The constructor calls `self.putChild`, which this class does not define,
    so it has to be combined with a class providing it.
    '''

    logCategory = 'service_server'

    def __init__(self, id, version, backend):
        '''
        Args:
            id (str): the id of the service, e.g. `AVTransport`
            version (int): the version of the service
            backend (object): the backend, which is searched for the
                `upnp_{action name}` callbacks of the actions

        Raises:
            LookupError: see :meth:`init_var_and_actions`
        '''
        log.LogAble.__init__(self)
        self.id = id
        self.version = version
        self.backend = backend
        self.debug(f'ServiceServer.__init__: {id} '
                   f'[version: {version}, backend: {backend}]')
        # A subclass may provide its own namespaces.
        if getattr(self, 'namespace', None) is None:
            self.namespace = 'schemas-upnp-org'
        if getattr(self, 'id_namespace', None) is None:
            self.id_namespace = 'upnp-org'

        self.service_type = \
            f'urn:{self.namespace}:service:{id}:{int(version):d}'
        self.debug(f'\t-service_type: {self.service_type}')

        self.scpdXML = None
        self.scpd_url = b'scpd.xml'
        self.control_url = b'control'
        self.subscription_url = b'subscribe'
        # The default namespace of the LastChange document.
        self.event_metadata = {
            'AVTransport': 'urn:schemas-upnp-org:metadata-1-0/AVT/',
            'RenderingControl': 'urn:schemas-upnp-org:metadata-1-0/RCS/',
            'ScheduledRecording': 'urn:schemas-upnp-org:av:srs-event',
        }.get(id, '')

        self._actions = {}
        self._variables = {0: {}}
        self._subscribers = {}
        self._pending_notifications = {}

        self.implementation = None
        self.last_change = None
        self.init_var_and_actions()

        if 'LastChange' in moderated_variables.get(self.service_type, ()):
            self.last_change = self._variables[0].get('LastChange')

        self.debug(f'ServiceServer.__init__: putChild '
                   f'{self.subscription_url} ...wait')
        self.putChild(  # pylint: disable=no-member
            self.subscription_url, EventSubscriptionServer(self))
        self.debug(f'ServiceServer.__init__: putChild '
                   f'{self.subscription_url} => OK')

        self.check_subscribers_loop = task.LoopingCall(self.check_subscribers)
        self.check_subscribers_loop.start(120.0, now=False)

        self.check_moderated_loop = None
        if self.service_type in moderated_variables:
            self.check_moderated_loop = task.LoopingCall(
                self.check_moderated_variables)
            self.check_moderated_loop.start(0.5, now=False)

    def _release(self):
        '''Disconnect and forget the notifications still under way.'''
        for p in list(self._pending_notifications.values()):
            p.disconnect()
        self._pending_notifications = {}

    def get_action(self, action_name):
        '''Return the action, `None` when it is not implemented.'''
        return self._actions.get(action_name)

    def get_actions(self):
        '''Return the dict of the actions, keyed by name.'''
        return self._actions

    def get_variables(self):
        '''
        Return the state variables as a dict which maps the instances to
        a dict of their variables, keyed by name.
        '''
        return self._variables

    def get_subscribers(self):
        '''Return the dict of the subscribers, keyed by subscription id.'''
        return self._subscribers

    def rm_notification(self, result, d):
        '''
        Forget the notification which belongs to the deferred `d`.

        Used as callback and errback of `d`. A notification which is not
        known (anymore), e.g. after :meth:`_release`, is ignored.
        '''
        self._pending_notifications.pop(d, None)

    def _send_notification(self, subscriber, xml):
        '''Send `xml` to `subscriber` and keep track of the request.'''
        d, p = event.send_notification(subscriber, xml)
        self._pending_notifications[d] = p
        d.addBoth(self.rm_notification, d)

    @staticmethod
    def _as_text(data):
        '''Return `data` as str, lxml refuses non-ASCII bytes as text.'''
        return data.decode('utf-8') if isinstance(data, bytes) else data

    def new_subscriber(self, subscriber):
        '''
        Send the values of all the evented state variables to a new
        subscriber and register it.

        `subscriber` is a dict, its items `seq` and `sid` are used here.
        The subscriber is not registered when the service has no evented
        state variable.
        '''
        notify = []
        for vdict in list(self._variables.values()):
            notify += [v for v in list(vdict.values()) if v.send_events]

        self.info(f'new_subscriber {subscriber} {notify}')
        if not notify:
            return

        root = etree.Element(f'{{{NS_UPNP_ORG_EVENT_1_0}}}propertyset',
                             nsmap={'e': NS_UPNP_ORG_EVENT_1_0})
        evented_variables = 0
        for n in notify:
            e = etree.SubElement(root, f'{{{NS_UPNP_ORG_EVENT_1_0}}}property')
            if n.name == 'LastChange':
                # The very first event has to carry every variable.
                text = self.build_last_change_event(
                    n.instance, force=subscriber['seq'] == 0)
                if text is not None:
                    etree.SubElement(e, n.name).text = self._as_text(text)
                    evented_variables += 1
            else:
                etree.SubElement(e, n.name).text = str(n.value)
                evented_variables += 1

        if evented_variables > 0:
            xml = etree.tostring(root, encoding='utf-8', pretty_print=True)
            self._send_notification(subscriber, xml)
        self._subscribers[subscriber['sid']] = subscriber

    def get_id(self):
        '''Return the id of the service.'''
        return self.id

    def get_type(self):
        '''Return the service type.'''
        return self.service_type

    def create_new_instance(self, instance):
        '''
        Create (or replace) the state variables of `instance` as copies of
        the state variables of the instance 0.
        '''
        copies = self._variables[instance] = {}
        for v in list(self._variables[0].values()):
            copy = variable.StateVariable(
                v.service, v.name, v.implementation, instance, v.send_events,
                v.data_type, v.allowed_values)
            copies[v.name] = copy
            copy.has_vendor_values = v.has_vendor_values
            copy.default_value = v.default_value
            # copy.value = v.default_value  # FIXME
            copy.old_value = v.old_value
            copy.value = v.value
            copy.dependant_variable = v.dependant_variable

    def remove_instance(self, instance):
        '''
        Remove the state variables of `instance`. The instance 0 is
        never removed.

        Raises:
            KeyError: when there is no such instance
        '''
        if instance == 0:
            return
        del self._variables[instance]

    def set_variable(self, instance, variable_name, value, default=False):
        '''
        Set the value of a state variable and notify the subscribers.

        Args:
            instance: the instance the variable belongs to
            variable_name (str): the name of the state variable
            value: the new value, or a deferred which fires with it
            default (bool): if `True` the value becomes the default value
                of the variable too

        The subscribers are notified right away when the variable is
        evented and not moderated. An unknown variable is ignored.
        '''
        def process_value(result):
            state_var.update(result)
            if default:
                state_var.default_value = state_var.value
            if state_var.send_events and not state_var.moderated \
                    and len(self._subscribers) > 0:
                xml = self.build_single_notification(
                    instance, variable_name, state_var.value)
                for s in list(self._subscribers.values()):
                    self._send_notification(s, xml)

        try:
            state_var = self._variables[int(instance)][variable_name]
            if isinstance(value, defer.Deferred):
                value.addCallback(process_value)
            else:
                process_value(value)
        except KeyError:
            pass

    def get_variable(self, variable_name, instance=0):
        '''
        Return the state variable, `None` when the instance or the variable
        does not exist.
        '''
        return self._variables.get(int(instance), {}).get(variable_name)

    def build_single_notification(self, instance, variable_name, value):
        '''
        Return the propertyset document (bytes) which carries the value of
        one state variable. `instance` is not used.
        '''
        root = etree.Element(f'{{{NS_UPNP_ORG_EVENT_1_0}}}propertyset',
                             nsmap={'e': NS_UPNP_ORG_EVENT_1_0})
        e = etree.SubElement(root, f'{{{NS_UPNP_ORG_EVENT_1_0}}}property')
        etree.SubElement(e, variable_name).text = str(value)
        return etree.tostring(root, encoding='utf-8', pretty_print=True)

    def build_last_change_event(self, instance=0, force=False):
        '''
        Return the LastChange document (bytes) with the state variables
        which were updated since the last call, `None` when there is none.

        Args:
            instance: not used, the variables of all the instances are
                reported
            force (bool): report the variables whether they were updated
                or not

        The variables `LastChange`, `A_ARG_TYPE_*` and those flagged as
        never evented are left out. The reported variables are no longer
        flagged as updated afterwards.
        '''
        got_one = False
        root = etree.Element('Event', nsmap={None: self.event_metadata})
        for instance_id, vdict in list(self._variables.items()):
            e = etree.SubElement(root, 'InstanceID')
            e.attrib['val'] = str(instance_id)
            for state_var in list(vdict.values()):
                if state_var.name == 'LastChange' \
                        or state_var.name[0:11] == 'A_ARG_TYPE_' \
                        or state_var.never_evented:
                    continue
                if not (state_var.updated or force):
                    continue
                s = etree.SubElement(e, state_var.name)
                s.attrib['val'] = str(state_var.value)
                state_var.updated = False
                got_one = True
                if state_var.dependant_variable is not None:
                    dependants = \
                        state_var.dependant_variable.get_allowed_values()
                    if dependants is not None and len(dependants) > 0:
                        s.attrib['channel'] = dependants[0]
        if got_one:
            return etree.tostring(root, encoding='utf-8', pretty_print=True)
        return None

    def propagate_notification(self, notify):
        '''
        Send the values of some state variables to all the subscribers.

        Args:
            notify: a list of state variables or a single
                :class:`~coherence.upnp.core.variable.StateVariable`
        '''
        # A single variable has to be wrapped before its length is asked
        # for, the original did it the other way around.
        if isinstance(notify, variable.StateVariable):
            notify = [notify, ]
        if not self._subscribers or not notify:
            return

        root = etree.Element(f'{{{NS_UPNP_ORG_EVENT_1_0}}}propertyset',
                             nsmap={'e': NS_UPNP_ORG_EVENT_1_0})
        evented_variables = 0
        for n in notify:
            e = etree.SubElement(root, f'{{{NS_UPNP_ORG_EVENT_1_0}}}property')
            if n.name == 'LastChange':
                text = self.build_last_change_event(instance=n.instance)
                if text is not None:
                    etree.SubElement(e, n.name).text = self._as_text(text)
                    evented_variables += 1
            else:
                etree.SubElement(e, n.name).text = str(n.value)
                evented_variables += 1
                if n.dependant_variable is not None:
                    dependants = n.dependant_variable.get_allowed_values()
                    if dependants is not None and len(dependants) > 0:
                        e.attrib['channel'] = dependants[0]

        if evented_variables == 0:
            return
        xml = etree.tostring(root, encoding='utf-8', pretty_print=True)
        for s in list(self._subscribers.values()):
            self._send_notification(s, xml)

    def check_subscribers(self):
        '''
        Remove the subscribers whose subscription has expired.

        The timeout of a subscriber has the form `Second-{seconds}`, 86400
        seconds are assumed for everything else.
        '''
        now = time.time()
        for sid, s in list(self._subscribers.items()):
            timeout = 86400
            if s['timeout'].startswith('Second-'):
                try:
                    timeout = int(s['timeout'][len('Second-'):])
                except ValueError:
                    # e.g. 'Second-infinite', keep the default
                    pass
            if now > s['created'] + timeout:
                # The original only deleted its local name (`del s`), so
                # no subscriber was ever removed.
                self.info(f'subscription {sid} of {self.id} has expired')
                del self._subscribers[sid]

    def check_moderated_variables(self):
        '''
        Send the moderated state variables which were updated to the
        subscribers. Called periodically for the services listed in
        `moderated_variables`.
        '''
        if not self._subscribers:
            return
        names = moderated_variables.get(self.get_type(), ())
        notify = []
        for name in names:
            for vdict in list(self._variables.values()):
                # An instance may lack the variable; an exception here
                # would stop the LoopingCall running this method.
                state_var = vdict.get(name)
                if state_var is not None and state_var.updated:
                    state_var.updated = False
                    notify.append(state_var)
        self.propagate_notification(notify)

    def is_variable_moderated(self, name):
        '''Tell whether the variable `name` is moderated in this service.'''
        return name in moderated_variables.get(self.get_type(), ())

    def simulate_notification(self):
        '''Set the variable `CurrentConnectionIDs` of instance 0 to '0'.'''
        self.info(f'simulate_notification for {self.id}')
        self.set_variable(0, 'CurrentConnectionIDs', '0')

    def get_scpdXML(self):
        '''Return the service description, it is built on first use.'''
        if self.scpdXML is None:
            self.scpdXML = scpdXML(self).build_xml()
        return self.scpdXML

    def register_vendor_variable(self, name, implementation='optional',
                                 instance=0, evented='no',
                                 data_type='string',
                                 dependant_variable=None,
                                 default_value=None,
                                 allowed_values=None,
                                 has_vendor_values=False,
                                 allowed_value_range=None,
                                 moderated=False):
        '''
        Enables a backend to add an own, vendor defined,
        :class:`coherence.upnp.core.variable.StateVariable` to the service.

        Args:
            name (str): the name of the new StateVariable
            implementation (str): either 'optional' or 'required'
            instance: the instance number of the service that variable
                should be assigned to, usually 0. The instance has to exist
                already.
            evented (str): boolean as string 'yes' 'no' or the special
                keyword 'never' if the variable doesn't show up in a
                LastChange event too
            data_type (str): `string`, `boolean`, `bin.base64` or various
                number formats
            dependant_variable (object): the name of another StateVariable
                that depends on this one
            default_value (object): the value this StateVariable should
                have by default when created for another instance of in
                the service
            allowed_values (list): a list of values this StateVariable
                can have
            has_vendor_values (bool): if there are values outside the
                allowed_values list too
            allowed_value_range (dict): a dict of 'minimum','maximum'
                and 'step' values
            moderated (bool): True if this StateVariable should only be
                emitted via a LastChange event

        Returns:
            A new variable of class
            :class:`coherence.upnp.core.variable.StateVariable`
        '''
        # FIXME
        # we should raise an Exception when there as a
        # StateVariable with that name already
        never_evented = evented == 'never'
        send_events = 'no' if never_evented else evented
        new_variable = variable.StateVariable(
            self, name, implementation, instance, send_events, data_type,
            allowed_values)
        if default_value is None:
            new_variable.default_value = ''
        else:
            new_variable.default_value = \
                new_variable.old_value = new_variable.value = default_value

        new_variable.dependant_variable = dependant_variable
        new_variable.has_vendor_values = has_vendor_values
        new_variable.allowed_value_range = allowed_value_range
        new_variable.moderated = moderated
        if never_evented:
            new_variable.never_evented = True
        self._variables.get(instance)[name] = new_variable
        return new_variable

    def _find_callback(self, name):
        '''
        Return the `upnp_{name}` callback of the backend or, failing that,
        of the service itself; `None` when there is none.
        '''
        callback = getattr(self.backend, f'upnp_{name}', None)
        if callback is None:
            callback = getattr(self, f'upnp_{name}', None)
        return callback

    def _missing_callback_accepted(self, name, implementation):
        '''
        Decide what happens to an action which needs a callback but has
        none: `False` is returned for an optional action (it is disabled),
        `LookupError` is raised when the implementation of the service is
        'required' (or not set at all), otherwise `True` is returned.
        '''
        if implementation == 'optional':
            self.info(f'{self.id} has a missing callback for '
                      f'{implementation} action {name}, action disabled')
            return False
        if getattr(self, 'implementation', 'required') == 'required':
            self.warning(
                f'{self.id} has a missing callback for '
                f'{implementation} action {name}, service disabled')
            raise LookupError('missing callback')
        return True

    def register_vendor_action(self, name, implementation, arguments=None,
                               needs_callback=True):
        '''
        Enables a backend to add an own, vendor defined, Action to the
        service.

        Args:
            name (str): the name of the new Action
            implementation (str): either 'optional' or 'required'
            arguments (list): a C{list} if argument C{tuples},
                like (name,direction,relatedStateVariable)
            needs_callback (bool): this Action needs a method in the
                backend or service class

        Returns:
            An action of class :class:`coherence.upnp.core.action.Action`,
            `None` when an optional action got disabled, as its callback
            is missing.

        Raises:
            LookupError: when the callback is missing, the action is not
                optional and the service's implementation is 'required'
        '''
        # FIXME: we should raise an Exception when there as an Action
        # with that name already we should raise an Exception when there
        # is no related StateVariable for an Argument
        callback = self._find_callback(name)
        if needs_callback and callback is None:
            # we have one or more 'A_ARG_TYPE_'
            # variables issue a warning for now
            if not self._missing_callback_accepted(name, implementation):
                return None

        # `arguments` defaults to None, which cannot be iterated.
        arguments_list = [
            action.Argument(argument[0], argument[1].lower(), argument[2])
            for argument in arguments or ()]

        new_action = action.Action(self, name, implementation,
                                   arguments_list)
        self._actions[name] = new_action
        if callback is not None:
            new_action.set_callback(callback)
            self.info(f'Add callback {callback} for {self.id}/{name}')
        return new_action

    def init_var_and_actions(self):
        '''
        Create the actions and the state variables (of instance 0) from the
        service description found in the directory
        `xml-service-descriptions`, next to this module.

        The backend's attributes `vendor_value_defaults` and
        `vendor_range_defaults`, if any, are read as dicts keyed by service
        id and then by variable name; they replace the allowed values and
        the allowed value ranges of the variables.

        Raises:
            LookupError: when an action needs a callback which is missing,
                the action is not optional and the service's implementation
                is 'required'
        '''
        beebits = '{urn:schemas-beebits-net:service-1-0}'
        desc_file = util.sibpath(
            __file__, os.path.join('xml-service-descriptions',
                                   f'{self.id}{int(self.version):d}.xml'))
        tree = etree.parse(desc_file)

        for action_node in tree.findall('.//action'):
            name = action_node.findtext('name')
            implementation = 'required'
            needs_callback = action_node.attrib.get(
                f'{beebits}X_needs_backend') is not None
            optional = action_node.find('Optional')
            if optional is not None:
                implementation = 'optional'
                if optional.attrib.get(
                        f'{beebits}X_needs_backend') is not None:
                    needs_callback = True

            arguments = []
            for argument in action_node.findall('.//argument'):
                arg_name = argument.findtext('name')
                arg_direction = argument.findtext('direction')
                arg_state_var = argument.findtext('relatedStateVariable')
                arguments.append(
                    action.Argument(arg_name, arg_direction, arg_state_var))
                if (arg_state_var or '').startswith('A_ARG_TYPE_') \
                        and arg_direction == 'out':
                    needs_callback = True

            callback = self._find_callback(name)
            if needs_callback and callback is None:
                # we have one or more 'A_ARG_TYPE_'
                # variables issue a warning for now
                if not self._missing_callback_accepted(name, implementation):
                    continue

            new_action = action.Action(self, name, implementation, arguments)
            self._actions[name] = new_action
            if callback is not None:
                new_action.set_callback(callback)
                self.info(f'Add callback {callback} for {self.id}/{name}')

        service_value_defaults = None
        backend_vendor_value_defaults = getattr(
            self.backend, 'vendor_value_defaults', None)
        if backend_vendor_value_defaults:
            service_value_defaults = \
                backend_vendor_value_defaults.get(self.id, None)

        service_range_defaults = None
        backend_vendor_range_defaults = getattr(
            self.backend, 'vendor_range_defaults', None)
        if backend_vendor_range_defaults:
            service_range_defaults = \
                backend_vendor_range_defaults.get(self.id)

        instance = 0
        variables = self._variables[instance]
        for var_node in tree.findall('.//stateVariable'):
            name = var_node.findtext('name')
            # The original looked at `action_node`, the last action of the
            # loop above, instead of the state variable itself.
            implementation = 'required'
            if var_node.find('Optional') is not None:
                implementation = 'optional'

            send_events_node = var_node.find('sendEventsAttribute')
            send_events = var_node.findtext('sendEventsAttribute')
            data_type = var_node.findtext('dataType')
            values = [allowed.text
                      for allowed in var_node.findall('.//allowedValue')]
            state_var = variable.StateVariable(
                self, name, implementation, instance, send_events,
                data_type, values)
            variables[name] = state_var

            dependant_variable = var_node.findtext(
                f'{beebits}X_dependantVariable')
            if dependant_variable:
                # the name only, it is resolved once all variables exist
                state_var.dependant_variable = dependant_variable
            default_value = var_node.findtext('defaultValue')
            if default_value:
                state_var.set_default_value(default_value)
            if send_events_node is not None:
                never_evented = send_events_node.attrib.get(
                    f'{beebits}X_no_means_never')
                if never_evented is not None:
                    state_var.set_never_evented(never_evented)

            allowed_value_list = var_node.find('allowedValueList')
            if allowed_value_list is not None:
                vendor_values = allowed_value_list.attrib.get(
                    f'{beebits}X_withVendorDefines')
                if service_value_defaults:
                    variable_value_defaults = \
                        service_value_defaults.get(name, None)
                    if variable_value_defaults:
                        self.info(f'overwriting {name} default value '
                                  f'with {variable_value_defaults}')
                        state_var.set_allowed_values(variable_value_defaults)
                if vendor_values is not None:
                    state_var.has_vendor_values = True

            allowed_value_range = var_node.find('allowedValueRange')
            # Same meaning as the truth test of the element used before
            # (an element without children is false), without relying on it.
            if allowed_value_range is not None \
                    and len(allowed_value_range) > 0:
                vendor_values = allowed_value_range.attrib.get(
                    f'{beebits}X_withVendorDefines')
                value_range = {}
                for e in list(allowed_value_range):
                    value_range[e.tag] = e.text
                    if vendor_values is None or not service_range_defaults:
                        continue
                    variable_range_defaults = \
                        service_range_defaults.get(name)
                    if (variable_range_defaults is not None
                            and variable_range_defaults.get(
                                e.tag) is not None):
                        self.info(
                            f'overwriting {name} attribute {e.tag} with'
                            f' {str(variable_range_defaults[e.tag])}')
                        value_range[e.tag] = variable_range_defaults[e.tag]
                    elif e.text is None:
                        self.info(f'missing vendor definition for '
                                  f'{name}, attribute {e.tag}')
                state_var.set_allowed_value_range(**value_range)
                if vendor_values is not None:
                    state_var.has_vendor_values = True
            elif service_range_defaults:
                variable_range_defaults = service_range_defaults.get(name)
                if variable_range_defaults is not None:
                    state_var.set_allowed_value_range(
                        **variable_range_defaults)
                    state_var.has_vendor_values = True

        # Replace the names of the dependant variables by the variables.
        for v in list(variables.values()):
            if isinstance(v.dependant_variable, str):
                v.dependant_variable = variables.get(v.dependant_variable)
class scpdXML(static.Data, log.LogAble):
    '''
    A `twisted.web.static.Data` resource holding the SCPD xml of a
    service server.

    The xml is not created at initialization: it is built by
    :meth:`build_xml` the first time the resource is rendered and kept in
    `self.data` afterwards.
    '''
    logCategory = 'service_scpdxml'

    def __init__(self, server, control=None):
        '''
        Args:
            server (object): The service server to describe, its
                `_actions` and `_variables` are read by :meth:`build_xml`.
            control (object): Stored into `self.control`.
        '''
        log.LogAble.__init__(self)
        self.debug(f'scpdXML.__init: {server} [{control}]')
        self.service_server = server
        self.control = control
        # start with an empty document, it will be filled on first render
        static.Data.__init__(self, b'', 'text/xml')

    def render(self, request):
        '''
        Build the xml if it is still missing and let `static.Data`
        render it.
        '''
        xml_missing = self.data in [None, b'']
        if xml_missing:
            self.data = self.build_xml()
        return static.Data.render(self, request)

    @staticmethod
    def _append_action(action_list, act):
        '''Add to `action_list` the `action` node describing `act`.'''
        action_node = etree.SubElement(action_list, 'action')
        etree.SubElement(action_node, 'name').text = act.get_name()
        argument_list = etree.SubElement(action_node, 'argumentList')
        for arg in act.get_arguments_list():
            arg_node = etree.SubElement(argument_list, 'argument')
            etree.SubElement(arg_node, 'name').text = arg.get_name()
            etree.SubElement(arg_node, 'direction').text = \
                arg.get_direction()
            etree.SubElement(arg_node, 'relatedStateVariable').text = \
                arg.get_state_variable()

    @staticmethod
    def _append_state_variable(state_table, var):
        '''Add to `state_table` the `stateVariable` node for `var`.'''
        var_node = etree.SubElement(state_table, 'stateVariable')
        var_node.attrib['sendEvents'] = 'yes' if var.send_events else 'no'
        etree.SubElement(var_node, 'name').text = var.name
        etree.SubElement(var_node, 'dataType').text = var.data_type

        # vendor defined values are not listed
        if not var.has_vendor_values and len(var.allowed_values):
            value_list = etree.SubElement(var_node, 'allowedValueList')
            for allowed in var.allowed_values:
                etree.SubElement(value_list, 'allowedValue').text = allowed

        value_range = var.allowed_value_range
        if value_range is None or len(value_range) == 0:
            return
        # the range is only written when none of its values is missing
        if all(value is not None for value in value_range.values()):
            range_node = etree.SubElement(var_node, 'allowedValueRange')
            for tag, value in value_range.items():
                etree.SubElement(range_node, tag).text = str(value)

    def build_xml(self):
        '''
        Create the SCPD document from the actions and the state variables
        (those for instance 0) of the service server.

        Returns:
            The utf-8 encoded xml, including the xml declaration.
        '''
        self.debug(f'scpdXML.build_xml: {self.service_server}')
        scpd = etree.Element(
            'scpd', nsmap={None: 'urn:schemas-upnp-org:service-1-0'})

        spec_version = etree.SubElement(scpd, 'specVersion')
        etree.SubElement(spec_version, 'major').text = '1'
        etree.SubElement(spec_version, 'minor').text = '0'

        action_list = etree.SubElement(scpd, 'actionList')
        for act in self.service_server._actions.values():
            self._append_action(action_list, act)

        state_table = etree.SubElement(scpd, 'serviceStateTable')
        for var in self.service_server._variables[0].values():
            self._append_state_variable(state_table, var)

        return etree.tostring(scpd, encoding='utf-8',
                              xml_declaration=True,
                              pretty_print=True)
from twisted.python.util import OrderedDict
class ServiceControl(log.LogAble):
    '''
    Object to control service's SOAP actions.

    .. note:: The methods of this class read `self.actions`,
              `self.variables` and `self.service`, which are not defined
              here and are expected to be supplied by whatever class
              combines this one (hence the pylint `no-member` marks).
    '''

    def get_action_results(self, result, action, instance):
        '''
        check for out arguments
        if yes:

            - check if there are related ones to StateVariables with
              non `A_ARG_TYPE_` prefix:
              if yes:

                - check if there is a call plugin method for this action:

                    - if yes: update StateVariable values with call result
                    - if no:  get StateVariable values and add them
                      to result dict

        Args:
            result (dict): The result from an action, it is updated in
                place when the action has no callback.
            action (object): An instance of class
                :class:`coherence.upnp.core.action.Action`
            instance (object): The key used to select the set of state
                variables inside `self.variables`.

        Returns:
            `result` itself when it is empty, otherwise an `OrderedDict`
            with the values of `result` sorted like the out arguments
            of the action.
        '''
        self.debug(f'get_action_results {action.name} {result}')
        r = result
        notify = []
        has_callback = action.get_callback() is not None
        for argument in action.get_out_arguments():
            # NOTE(review): the prefix is tested on the argument's name,
            # whereas the description parsing tests the name of the
            # related state variable...to be confirmed which is intended
            if argument.name[0:11] == 'A_ARG_TYPE_':
                continue
            variable = \
                self.variables[instance][  # pylint: disable=no-member
                    argument.get_state_variable()]
            if has_callback:
                variable.update(r[argument.name])
                if variable.send_events == 'yes' and \
                        not variable.moderated:
                    notify.append(variable)
            else:
                r[argument.name] = variable.value

        # notify only once, when all the variables has been updated
        self.service.propagate_notification(  # pylint: disable=no-member
            notify)

        if len(r) == 0:
            return r

        ordered_result = OrderedDict()
        for argument in action.get_out_arguments():
            ordered_result[argument.name] = r[argument.name]
        return ordered_result

    def soap__generic(self, *args, **kwargs):
        '''
        Generic UPnP service control method, which will be used
        if no soap_ACTIONNAME method in the server service control
        class can be found

        The name of the action is taken from the keyword argument
        `soap_methodName`. The keyword arguments starting with `X_` are
        not validated, all the others must match exactly the in arguments
        of the action.

        Returns:
            A `twisted.python.failure.Failure` holding `errorCode(401)`
            when the action is unknown (or `soap_methodName` is missing),
            or `errorCode(402)` when an argument is missing or is not
            valid. Otherwise a `Deferred` which fires with the outcome of
            :meth:`get_action_results`.
        '''
        try:
            action = self.actions[  # pylint: disable=no-member
                kwargs['soap_methodName']]
        except KeyError:
            return failure.Failure(errorCode(401))

        # a missing or malformed InstanceID falls back to the instance 0
        try:
            instance = int(kwargs['InstanceID'])
        except (KeyError, TypeError, ValueError):
            instance = 0

        self.info(f'soap__generic {action} {__name__} {kwargs}')
        self.debug(f'\t- action.name {action.name}')
        del kwargs['soap_methodName']

        if kwargs.get('X_UPnPClient') == 'XBox':
            if action.name == 'Browse' and 'ContainerID' in kwargs:
                # XXX: THIS IS SICK
                kwargs['ObjectID'] = kwargs.pop('ContainerID')

        # work on a copy, the matched arguments are removed from the list
        # and this must never alter the arguments held by the action
        missing_arguments = list(action.get_in_arguments())
        for arg_name in kwargs:
            if arg_name.startswith('X_'):
                continue
            matched = next(
                (a for a in missing_arguments if arg_name == a.get_name()),
                None)
            if matched is None:
                self.critical(
                    f'argument {arg_name} not valid for action {action.name}')
                return failure.Failure(errorCode(402))
            missing_arguments.remove(matched)

        if len(missing_arguments) > 0:
            args_names = [a.get_name() for a in missing_arguments]
            self.critical(f'argument {args_names} '
                          f'missing for action {action.name}')
            return failure.Failure(errorCode(402))

        def callit(*args, **kwargs):
            # an action without callback returns an empty result, which
            # will be filled by get_action_results
            callback = action.get_callback()
            if callback is not None:
                return callback(**kwargs)
            return {}

        def got_error(x):
            self.info('soap__generic error during call processing')
            return x

        # call plugin method for this action
        d = defer.maybeDeferred(callit, *args, **kwargs)
        d.addCallback(self.get_action_results, action, instance)
        d.addErrback(got_error)
        return d