Source code for coherence.upnp.core.soap_service

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2007 - Frank Scholz <coherence@beebits.net>
# Copyright 2018, Pol Canelles <canellestudi@gmail.com>

'''
Soap Service
============

SOAP is the Simple Object Access Protocol, used for actually calling actions.
This module contains a class :class:`UPnPPublisher` which manages SOAP
services.

:class:`errorCode`
------------------

Custom exception for class :class:`UPnPPublisher`

:class:`UPnPPublisher`
----------------------

A custom implementation of :class:`twisted.web.soap.SOAPPublisher`, but
removing the SOAPpy dependency.
'''

from lxml import etree

from twisted.internet import defer
from twisted.python import failure
from twisted.web import server, resource

from eventdispatcher import EventDispatcher

from coherence import log, SERVER_ID
from coherence.upnp.core import soap_lite
from coherence.upnp.core.utils import parse_with_lxml, to_bytes


class errorCode(Exception):
    '''
    Exception carrying the numeric status of a failed SOAP action.

    :meth:`UPnPPublisher._gotError` looks for this exception type and, when
    it finds one, uses :attr:`status` as the error code of the SOAP fault
    it sends back.

    :param status: the error code to report to the client.
    '''

    def __init__(self, status):
        # Hand the status to the base class so that it ends up in ``args``:
        # ``str(exc)`` then shows the code in log lines, and copy/pickle
        # (which rebuild the instance as ``errorCode(*args)``) keep working.
        super().__init__(status)
        self.status = status
class UPnPPublisher(EventDispatcher, resource.Resource, log.LogAble):
    '''
    Based upon twisted.web.soap.SOAPPublisher and extracted to remove the
    SOAPpy dependency.

    UPnP requires headers and OUT parameters to be returned in a slightly
    different way than the SOAPPublisher class does.

    Incoming actions are dispatched to methods named ``soap_<ActionName>``,
    falling back to ``soap__generic`` when no dedicated method exists (see
    :meth:`lookupFunction`).

    .. versionchanged:: 0.9.0

        * Migrated from louie/dispatcher to EventDispatcher
        * The emitted events changed:

            - UPnPTest.Control.Client.CommandReceived =>
              control_client_command_received
    '''

    logCategory = 'soap'
    isLeaf = 1
    # Charset announced in the Content-type header and used to encode
    # textual responses; ``None`` omits the charset from the header.
    encoding = 'UTF-8'
    envelope_attrib = None

    def __init__(self, *args, **kwargs):
        EventDispatcher.__init__(self)
        self.register_event('control_client_command_received')

    def _sendResponse(self, request, response, status=200):
        '''
        Write *response* to *request* and finish the request.

        :param request: the request object being answered.
        :param response: the SOAP document, as ``bytes`` or ``str``.
        :param status: 200 for success; any other value is sent as
            HTTP 500 (the callers in this class put the specific error
            code into the body via ``soap_lite.build_soap_error``).
        '''
        self.debug(f'_sendResponse {status} {response}')
        request.setResponseCode(200 if status == 200 else 500)

        if self.encoding is not None:
            mimeType = f'text/xml; charset="{self.encoding}"'
        else:
            mimeType = 'text/xml'

        # Encode text with the charset we announce and measure the encoded
        # payload, so that Content-length always matches the bytes written.
        if isinstance(response, bytes):
            payload = response
        else:
            payload = response.encode(self.encoding or 'ascii')

        request.setHeader(b'Content-type', mimeType.encode('ascii'))
        request.setHeader(b'Content-length', to_bytes(len(payload)))
        request.setHeader(b'EXT', b'')
        request.setHeader(b'SERVER', SERVER_ID.encode('ascii'))
        request.write(payload)
        request.finish()

    def _methodNotFound(self, request, methodName):
        '''Answer *request* with a SOAP error carrying code 401.'''
        response = soap_lite.build_soap_error(401)
        self._sendResponse(request, response, status=401)

    def _gotResult(self, result, request, methodName, ns):
        '''
        Callback for a successful action: wrap *result* into a SOAP
        response for *methodName* in namespace *ns* and send it.
        '''
        self.debug(f'_gotResult {result} {request} {methodName} {ns}')
        response = soap_lite.build_soap_call(
            methodName, result, ns=ns, is_response=True)
        self._sendResponse(request, response)

    def _gotError(self, failure, request, methodName, ns):
        '''
        Errback for a failed action: send a SOAP error to the client.

        An :class:`errorCode` failure supplies its own status; any other
        exception is reported as 500 and its traceback is printed.
        '''
        self.info(f'_gotError {failure} {failure.value}')
        e = failure.value
        status = 500
        if isinstance(e, errorCode):
            status = e.status
        else:
            failure.printTraceback()
        response = soap_lite.build_soap_error(status)
        self._sendResponse(request, response, status=status)

    def lookupFunction(self, functionName):
        '''
        Find the handler for the SOAP action *functionName*.

        Looks for an attribute ``soap_<functionName>`` and falls back to
        ``soap__generic``.

        :return: ``(handler, useKeywords)`` where ``useKeywords`` is the
            handler's ``useKeywords`` attribute (``False`` when absent),
            or ``(None, None)`` when no handler exists.
        '''
        function = getattr(self, f'soap_{functionName}', None)
        if not function:
            function = getattr(self, 'soap__generic', None)
        if function:
            return function, getattr(function, 'useKeywords', False)
        return None, None

    def render(self, request):
        '''
        Handle a SOAP command.

        Reads the request body, emits ``control_client_command_received``
        with the headers and raw data, parses the SOAP envelope, decodes
        the action arguments and calls the matching handler. The answer is
        written asynchronously by :meth:`_gotResult` / :meth:`_gotError`.

        :return: ``server.NOT_DONE_YET``, or a short HTML snippet when the
            request carries no content at all.
        '''
        data = request.content.read()
        headers = request.getAllHeaders()
        self.info(f'soap_request: {headers}')

        # allow external check of data
        self.dispatch_event(
            'control_client_command_received', headers, data)

        if data == b'':
            return b'<p>No content to show</p>'

        try:
            tree = etree.fromstring(data)
        except Exception:
            self.warning(
                'UPnPPublisher.render: error on parsing soap result, probably'
                ' has encoding declaration, trying with another method...')
            tree = parse_with_lxml(data, encoding='utf-8')

        body = None
        if tree is not None:
            body = tree.find(
                '{http://schemas.xmlsoap.org/soap/envelope/}Body')
        if body is None or len(body) == 0:
            # No Body, or a Body without an action element: there is no
            # action to invoke, so answer with the same fault that is used
            # for an unknown action instead of crashing.
            self._methodNotFound(request, None)
            return server.NOT_DONE_YET

        method = body[0]
        methodName = method.tag
        ns = None
        if methodName.startswith('{') and methodName.rfind('}') > 1:
            # Tag has the form '{namespace}local'; split on the last brace
            # so a '}' inside the namespace cannot break the unpacking.
            ns, methodName = methodName[1:].rsplit('}', 1)

        args = []
        kwargs = {}
        for child in method:
            value = self.decode_result(child)
            kwargs[child.tag] = value
            args.append(value)

        if b'text/xml' not in headers.get(b'content-type', b''):
            self._gotError(
                failure.Failure(errorCode(415)), request, methodName, ns)
            return server.NOT_DONE_YET

        self.debug(f'headers: {headers}')

        l_function, _ = self.lookupFunction(methodName)
        if not l_function:
            self._methodNotFound(request, methodName)
            return server.NOT_DONE_YET

        keywords = {'soap_methodName': methodName}

        # Tag requests of some known clients; later matches override
        # earlier ones, and SOAP arguments are merged in last.
        user_agent = headers.get(b'user-agent', b'')
        if user_agent.startswith(b'Xbox/'):
            keywords['X_UPnPClient'] = 'XBox'
        if (b'x-av-client-info' in headers and
                headers[b'x-av-client-info'].find(b'"PLAYSTATION3') > 0):
            keywords['X_UPnPClient'] = 'PLAYSTATION3'
        if user_agent.startswith(b'Philips-Software-WebClient/4.32'):
            keywords['X_UPnPClient'] = 'Philips-TV'

        keywords.update((str(k), v) for k, v in kwargs.items())
        self.info(f'call {methodName} {keywords}')

        if hasattr(l_function, 'useKeywords'):
            d = defer.maybeDeferred(l_function, **keywords)
        else:
            # NOTE(review): the SOAP arguments are passed both positionally
            # and by keyword here; confirm this is what handlers without
            # ``useKeywords`` expect.
            d = defer.maybeDeferred(l_function, *args, **keywords)

        d.addCallback(self._gotResult, request, methodName, ns)
        d.addErrback(self._gotError, request, methodName, ns)
        return server.NOT_DONE_YET

    def decode_result(self, element):
        '''
        Convert the text of an argument *element* to a Python value.

        The ``{http://www.w3.org/1999/XMLSchema-instance}type`` attribute
        selects the conversion (an ``xsd:`` prefix is stripped first):
        ``integer``/``int`` -> :class:`int`, ``float``/``double`` ->
        :class:`float`, ``boolean`` -> ``True`` only for the text
        ``'true'``. Anything else yields the element text, or ``''`` when
        the element has no text.
        '''
        xsi_type = element.get(
            '{http://www.w3.org/1999/XMLSchema-instance}type')
        if xsi_type is not None:
            try:
                prefix, local = xsi_type.split(':')
            except ValueError:
                # not of the form 'prefix:local'; use the value unchanged
                pass
            else:
                if prefix == 'xsd':
                    xsi_type = local

        if xsi_type in ('integer', 'int'):
            return int(element.text)
        if xsi_type in ('float', 'double'):
            return float(element.text)
        if xsi_type == 'boolean':
            return element.text == 'true'

        return element.text or ''