# Source code for coherence.backends.elisa_storage
# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php
# Copyright 2006, Frank Scholz <coherence@beebits.net>
from twisted.internet import reactor
from twisted.python import failure
from twisted.spread import pb
from coherence.backend import Backend
from coherence.upnp.core.DIDLLite import classChooser, Container, Resource, \
DIDLElement
from coherence.upnp.core.soap_service import errorCode
class ElisaMediaStore(Backend):
    '''
    This is a backend to the Elisa Media DB

    Elisa needs to expose two methods

    .. code-block:: python

        get_root_id(media_type)
            if media_type == '*'
                this returns the root id of the media collection
            if media_type == 'audio'
                this returns the root id of the audio collection

        get_item_by_id(id)
            this returns a dict with the following keys:
            id = id in the media db
            parent_id = parent_id in the media db
            name = title, album name or basename
            mimetype = 'directory' or real mimetype
            children = list of objects for which this item is the parent
            location = filesystem path if item is a file
            cover = url by which the cover image can be retrieved (OPTIONAL)
            size = in bytes (OPTIONAL)

    .. note::
        The remote calls issued by this class are ``get_cache_manager``
        on the Perspective Broker root object, followed by
        ``get_media_root_id`` / ``get_media_node_with_id`` on the returned
        cache manager. ``location`` is read as a mapping with the keys
        ``internal`` and ``external``.

    .. versionchanged:: 0.9.0

        * Migrated from louie/dispatcher to EventDispatcher
        * Introduced :class:`~coherence.backend.Backend`'s inheritance
    '''

    implements = ['MediaServer']

    def __init__(self, server, **kwargs):
        '''
        Set up the backend and start asking Elisa for the root id.

        Recognised keyword arguments:

        * ``name``: name of the store (default ``'Elisa'``)
        * ``host``: host running Elisa (default ``'127.0.0.1'``)
        * ``urlbase``: base url, a trailing ``/`` is appended when missing
          (default ``''``)

        Any other keyword argument (e.g. ``ignore_patterns``) is only
        forwarded to :class:`~coherence.backend.Backend` and is otherwise
        unused here.
        '''
        Backend.__init__(self, server, **kwargs)
        self.name = kwargs.get('name', 'Elisa')
        self.host = kwargs.get('host', '127.0.0.1')
        self.urlbase = kwargs.get('urlbase', '')

        # endswith() also copes with the empty default, for which indexing
        # the last character raised an IndexError.
        if not self.urlbase.endswith('/'):
            self.urlbase += '/'

        self.server = server
        self.update_id = 0
        self.root_id = 0
        self.store = {}
        self.get_root_id()

    def __repr__(self):
        return 'Elisa storage'

    def get_store(self):
        '''
        Open a Perspective Broker connection to ``self.host`` on port 8789.

        Returns the deferred of ``PBClientFactory.getRootObject()``.
        '''
        factory = pb.PBClientFactory()
        factory.noisy = False
        reactor.connectTCP(self.host, 8789, factory)
        return factory.getRootObject()

    def get_by_id(self, id):
        '''
        Return the entry of ``self.store`` stored under ``int(id)``.

        ``None`` is returned when *id* cannot be converted to an integer
        (including ``None`` itself) or when no such entry exists.
        '''
        try:
            return self.store[int(id)]
        except (TypeError, ValueError, KeyError):
            return None

    def get_root_id(self, media_type='audio'):
        ''' ask Elisa to tell us the id of the top item
            representing the media_type == 'something' collection '''
        store = self.get_store()
        dfr = store.addCallback(
            lambda root: root.callRemote('get_cache_manager'))
        dfr.addCallback(
            lambda cache_mgr: cache_mgr.callRemote(
                'get_media_root_id', media_type))
        # NOTE(review): set_root_id is not defined in this class; presumably
        # it is inherited or was lost from this file - confirm, otherwise
        # this line raises AttributeError during __init__.
        dfr.addCallback(self.set_root_id)

    def upnp_init(self):
        '''Publish the supported source protocols, if a server is set.'''
        if self.server:
            self.server.connection_manager_server.set_variable(
                0, 'SourceProtocolInfo',
                [f'internal:{self.host}:*:*', 'http-get:*:audio/mpeg:*'])

    def upnp_Browse(self, *args, **kwargs):
        '''
        Handle the UPnP ``Browse`` action by querying Elisa.

        Required keyword arguments: ``ObjectID``, ``BrowseFlag``,
        ``Filter``, ``StartingIndex`` and ``RequestedCount`` (the last two
        are converted with ``int()``). ``SortCriteria`` is accepted but
        ignored, results are never sorted. An ``ObjectID`` of ``0`` or
        ``'0'`` is mapped to ``self.root_id``.

        Returns a deferred firing with a dict holding the keys ``Result``
        (the DIDL-Lite document as string), ``TotalMatches``,
        ``NumberReturned`` and ``UpdateID``. The deferred fails with
        ``errorCode(701)`` when the cache manager cannot be obtained or
        when Elisa returns ``None`` for the requested node.
        '''
        ObjectID = kwargs['ObjectID']
        BrowseFlag = kwargs['BrowseFlag']
        Filter = kwargs['Filter']
        StartingIndex = int(kwargs['StartingIndex'])
        RequestedCount = int(kwargs['RequestedCount'])

        def build_upnp_item(elisa_item):
            '''Turn an Elisa item dict into a DIDL-Lite object, or None
            when no UPnP class matches the item's mimetype.'''
            UPnPClass = classChooser(elisa_item['mimetype'])
            if not UPnPClass:
                return None

            upnp_item = UPnPClass(elisa_item['id'],
                                  elisa_item['parent_id'],
                                  elisa_item['name'])

            if isinstance(upnp_item, Container):
                upnp_item.childCount = len(elisa_item.get('children', []))
                if Filter:
                    upnp_item.searchable = True
                    upnp_item.searchClass = ('object',)
                return upnp_item

            location = elisa_item['location']
            internal_url = location.get('internal')
            external_url = location.get('external')

            # 'size' and 'cover' are optional; a missing dict key raises
            # KeyError (not IndexError), so look them up with get().
            size = elisa_item.get('size')
            cover = elisa_item.get('cover', '')
            if cover != '':
                upnp_item.albumArtURI = cover

            res = Resource(internal_url, f'internal:{self.host}:*:*')
            res.size = size
            upnp_item.res.append(res)

            res = Resource(external_url,
                           f'http-get:*:{elisa_item["mimetype"]}:*')
            res.size = size
            upnp_item.res.append(res)
            return upnp_item

        def got_result(elisa_item):
            '''Build the Browse response out of the node sent by Elisa.'''
            if elisa_item is None:
                # nothing to describe: report it like the other failures
                # of this action instead of dying with an AttributeError
                return failure.Failure(errorCode(701))

            didl = DIDLElement()
            # stays 0 when neither branch below has anything to add
            total = 0

            if BrowseFlag == 'BrowseDirectChildren':
                children = elisa_item.get('children', [])
                # a RequestedCount of 0 means "everything from the start"
                if RequestedCount == 0:
                    wanted = children[StartingIndex:]
                else:
                    wanted = children[
                        StartingIndex:StartingIndex + RequestedCount]
                for child in wanted:
                    if child is None:
                        continue
                    item = build_upnp_item(child)
                    if item:
                        didl.addItem(item)
                total = len(children)
            elif elisa_item:
                item = build_upnp_item(elisa_item)
                if item:
                    didl.addItem(item)
                total = 1

            r = {'Result': didl.toString(), 'TotalMatches': total,
                 'NumberReturned': didl.numItems()}

            # read the attribute from the object that was tested for it
            if hasattr(elisa_item, 'update_id'):
                r['UpdateID'] = elisa_item.update_id
            else:
                r['UpdateID'] = self.update_id
            return r

        def errback(r):
            return failure.Failure(errorCode(701))

        node_id = ObjectID
        # control points send the root ObjectID as the string '0'
        if node_id in (0, '0'):
            node_id = self.root_id

        store = self.get_store()
        dfr = store.addCallback(
            lambda root: root.callRemote('get_cache_manager'))
        dfr.addErrback(errback)
        dfr.addCallback(
            lambda cache_mgr: cache_mgr.callRemote(
                'get_media_node_with_id', node_id))
        dfr.addCallback(got_result)
        return dfr
if __name__ == '__main__':

    def main():
        '''Browse node 23 of a local Elisa store and print the outcome.'''

        def got_result(result):
            print(result)

        def got_error(err):
            # report the problem instead of leaving an unhandled failure
            print(err.getErrorMessage())

        f = ElisaMediaStore(None,
                            name='My Elisa Store',
                            host='localhost',
                            urlbase='http://localhost/'
                            )

        dfr = f.upnp_Browse(BrowseFlag='BrowseDirectChildren',
                            RequestedCount=0,
                            StartingIndex=0,
                            ObjectID=23,
                            SortCriteria='*',
                            Filter='')
        dfr.addCallbacks(got_result, got_error)
        # addBoth: the reactor has to be stopped after a failed browse too,
        # otherwise the script never terminates
        dfr.addBoth(lambda _: reactor.stop())

    # give the reactor a moment to start before issuing the request
    reactor.callLater(0.1, main)
    reactor.run()