Source code for coherence.upnp.services.servers.content_directory_server

# -*- coding: utf-8 -*-

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2005, Tim Potter <tpot@samba.org>
# Copyright 2006 John-Mark Gurney <gurney_j@resnet.uoregon.edu>
# Copyright 2006, Frank Scholz <coherence@beebits.net>

'''
Content Directory service
=========================
'''

from twisted.internet import defer
from twisted.python import failure
from twisted.web import resource

from coherence.upnp.core import service
from coherence.upnp.core.DIDLLite import DIDLElement
from coherence.upnp.core.soap_service import UPnPPublisher
from coherence.upnp.core.soap_service import errorCode
from coherence.upnp.core.utils import to_string


class ContentDirectoryControl(service.ServiceControl, UPnPPublisher):
    '''
    Control resource of the ContentDirectory service.

    Combines :class:`service.ServiceControl` with the
    :class:`UPnPPublisher` from ``soap_service`` and keeps references to
    the state variables and actions of the server it was created for.
    '''

    def __init__(self, server):
        '''
        :param server: the service server this control belongs to; it must
            provide ``get_variables()`` and ``get_actions()``.
        '''
        # Each base class is initialised explicitly (no super() chain),
        # ServiceControl first, then UPnPPublisher.
        service.ServiceControl.__init__(self)
        UPnPPublisher.__init__(self)

        # Back-reference to the owning server plus whatever it reports as
        # its variables and actions.
        self.service = server
        self.variables = server.get_variables()
        self.actions = server.get_actions()
class ContentDirectoryServer(service.ServiceServer, resource.Resource):
    '''
    Server side of the UPnP ContentDirectory service.

    The instance is a web resource with two children, ``scpd.xml`` and
    ``control``. Browse actions are answered by :meth:`upnp_Browse`, which
    looks objects up in the backend and returns them as DIDL-Lite.
    '''

    logCategory = 'content_directory_server'

    def __init__(self, device, backend=None, transcoding=False):
        '''
        :param device: the owning device; its ``version`` is passed on to
            the service server and its ``backend`` is used when no backend
            is given explicitly.
        :param backend: the backend to browse, or None to fall back to
            ``device.backend``.
        :param transcoding: handed unchanged to every
            :class:`DIDLElement` created by :meth:`upnp_Browse`.
        '''
        self.device = device
        self.transcoding = transcoding
        if backend is None:
            backend = self.device.backend

        # Both bases are initialised explicitly, Resource first.
        resource.Resource.__init__(self)
        service.ServiceServer.__init__(
            self, 'ContentDirectory', self.device.version, backend)

        self.control = ContentDirectoryControl(self)
        self.putChild(b'scpd.xml', service.scpdXML(self, self.control))
        self.putChild(b'control', self.control)

        # Initial values of the two update-tracking variables.
        # NOTE(review): the leading 0 is presumably the service instance
        # number - confirm against ServiceServer.set_variable.
        self.set_variable(0, 'SystemUpdateID', 0)
        self.set_variable(0, 'ContainerUpdateIDs', '')

    def listchilds(self, uri):
        '''
        Build the ``<li>`` entries linking to every child resource.

        :param uri: the URI the links are relative to (as received from
            the request, converted with ``to_string``).
        :return: a string of concatenated
            ``<li><a href="uri/child">child</a></li>`` elements; empty when
            there are no children.

        The URI comes straight from the client, so both it and the child
        names are HTML-escaped and the ``href`` value is quoted.
        '''
        # Imported locally: ``html`` is not among the module level imports.
        from html import escape

        base = escape(to_string(uri), quote=True)
        entries = []
        for child in self.children:
            name = escape(to_string(child), quote=True)
            entries.append(f'<li><a href="{base}/{name}">{name}</a></li>')
        return ''.join(entries)

    def render(self, request):
        '''
        Render a small HTML index page listing the child resources.

        :param request: the incoming request; only ``request.uri`` is read.
        :return: the page as ASCII bytes. Characters outside ASCII are
            emitted as XML character references instead of raising
            ``UnicodeEncodeError``.
        '''
        page = f'''\
        <html>
        <head>
            <title>Cohen3 (ContentDirectoryServer)</title>
            <link rel="stylesheet" type="text/css" href="/styles/main.css" />
        </head>
        <h5>
            <img class="logo-icon" src="/server-images/coherence-icon.svg">
            </img>Root of the ContentDirectory</h5>
        <div class="list"><ul>{self.listchilds(request.uri)}</ul></div>
        </html>'''
        return page.encode('ascii', 'xmlcharrefreplace')

    def upnp_Browse(self, *args, **kwargs):
        '''
        Handle the Browse action.

        Keyword arguments read:

        - ``ObjectID``: the object to browse; when missing, ``ContainerID``
          is tried and finally ``0`` is used.
        - ``BrowseFlag``: ``'BrowseDirectChildren'`` lists the children of
          the object, any other value returns the object itself.
        - ``StartingIndex``, ``RequestedCount``: converted with ``int()``
          and used to select the range of children.
        - ``Filter``, ``SortCriteria``: must be present but are not
          evaluated.
        - ``X_UPnPClient`` (optional): passed to the DIDL element; the
          value ``'XBox'`` additionally enables the backend's
          ``wmc_mapping`` lookup.

        :return: a dict with the keys ``Result``, ``TotalMatches``,
            ``NumberReturned`` and ``UpdateID``, or a Deferred firing with
            such a dict, or ``failure.Failure(errorCode(701))`` when the
            backend returns None for the requested id.
        :raises KeyError: if ``BrowseFlag``, ``Filter``, ``StartingIndex``,
            ``RequestedCount`` or ``SortCriteria`` is missing.
        :raises ValueError: if ``StartingIndex`` or ``RequestedCount``
            cannot be converted to int.
        '''
        try:
            ObjectID = kwargs['ObjectID']
        except KeyError:
            self.debug(
                'hmm, a Browse action and no ObjectID argument? '
                'An XBox maybe?')
            try:
                ObjectID = kwargs['ContainerID']
            except KeyError:
                ObjectID = 0
        BrowseFlag = kwargs['BrowseFlag']
        # Filter and SortCriteria are not used below; the lookups are kept
        # so a request lacking them still fails with KeyError as before.
        Filter = kwargs['Filter']
        StartingIndex = int(kwargs['StartingIndex'])
        RequestedCount = int(kwargs['RequestedCount'])
        SortCriteria = kwargs['SortCriteria']
        parent_container = None
        requested_id = None

        # ``item`` is read by the nested helpers below through the closure.
        item = None
        total = 0
        items = []

        if BrowseFlag == 'BrowseDirectChildren':
            parent_container = str(ObjectID)
        else:
            requested_id = str(ObjectID)

        self.info(f'upnp_Browse request {ObjectID} {BrowseFlag} '
                  f'{StartingIndex} {RequestedCount}')
        # self.debug(f'\t- kwargs: {kwargs}')

        didl = DIDLElement(upnp_client=kwargs.get('X_UPnPClient', ''),
                           requested_id=requested_id,
                           parent_container=parent_container,
                           transcoding=self.transcoding)

        def got_error(r):
            # Pass the failure through unchanged.
            return r

        def process_result(result, total=None, found_item=None):
            '''
            Put ``result`` into the DIDL element and build the response.

            For BrowseDirectChildren ``result`` is an iterable of objects
            providing ``get_item``; otherwise it is added as a single item.
            '''
            if result is None:
                result = []
            if BrowseFlag == 'BrowseDirectChildren':
                cl = []

                def process_items(result, tm):
                    # ``result`` comes from a DeferredList: each entry is
                    # indexed as (flag, value) and only entries with a
                    # truthy flag are added to the response.
                    if result is None:
                        result = []
                    for i in result:
                        if i[0]:
                            didl.addItem(i[1])

                    return build_response(tm)

                for i in result:
                    d = defer.maybeDeferred(i.get_item)
                    cl.append(d)

                if found_item is not None:
                    # The total is taken from the browsed object itself,
                    # which may answer asynchronously.
                    def got_child_count(count):
                        dl = defer.DeferredList(cl)
                        dl.addCallback(process_items, count)
                        return dl

                    d = defer.maybeDeferred(found_item.get_child_count)
                    d.addCallback(got_child_count)

                    return d
                elif total is None:
                    total = item.get_child_count()

                dl = defer.DeferredList(cl)
                dl.addCallback(process_items, total)
                return dl
            else:
                didl.addItem(result)
                total = 1

            return build_response(total)

        def build_response(tm):
            '''Assemble the response dict with ``tm`` as TotalMatches.'''
            r = {'Result': didl.toString(),
                 'TotalMatches': tm,
                 'NumberReturned': didl.numItems()}

            # NOTE(review): ``item`` is the enclosing variable; when the
            # backend returned a Deferred it still refers to that Deferred
            # here, not to the resolved object - confirm this is intended.
            if hasattr(item, 'update_id'):
                r['UpdateID'] = item.update_id
            elif hasattr(self.backend, 'update_id'):
                r['UpdateID'] = self.backend.update_id  # FIXME
            else:
                r['UpdateID'] = 0

            return r

        def proceed(result):
            '''Fetch the children of, or the item for, the found object.'''
            if BrowseFlag == 'BrowseDirectChildren':
                # NOTE(review): with RequestedCount == 0 the end index
                # equals StartingIndex, whereas the list branch below
                # treats 0 as "no upper limit" - verify against the
                # backends' get_children contract.
                d = defer.maybeDeferred(
                    result.get_children,
                    StartingIndex, StartingIndex + RequestedCount)
            else:
                d = defer.maybeDeferred(result.get_item)

            d.addCallback(process_result, found_item=result)
            d.addErrback(got_error)
            return d

        root_id = ObjectID

        wmc_mapping = getattr(self.backend, 'wmc_mapping', None)
        if kwargs.get('X_UPnPClient', '') == 'XBox' and \
                wmc_mapping is not None and ObjectID in wmc_mapping:
            # fake a Windows Media Connect Server
            root_id = wmc_mapping[ObjectID]
            if callable(root_id):
                item = root_id()
                if item is not None:
                    if isinstance(item, list):
                        total = len(item)
                        # RequestedCount is already an int at this point.
                        if RequestedCount == 0:
                            items = item[StartingIndex:]
                        else:
                            items = item[StartingIndex:
                                         StartingIndex + RequestedCount]
                        return process_result(items, total=total)
                    else:
                        if isinstance(item, defer.Deferred):
                            item.addCallback(proceed)
                            return item
                        else:
                            return proceed(item)

            item = self.backend.get_by_id(root_id)
            if item is None:
                # For a mapped id an unknown object yields an empty
                # result instead of an error.
                return process_result([], total=0)

            if isinstance(item, defer.Deferred):
                item.addCallback(proceed)
                return item
            else:
                return proceed(item)

        item = self.backend.get_by_id(root_id)
        if item is None:
            # Error 701 - presumably "no such object"; confirm against
            # soap_service.errorCode.
            return failure.Failure(errorCode(701))

        if isinstance(item, defer.Deferred):
            item.addCallback(proceed)
            return item
        else:
            return proceed(item)