Source code for coherence.backends.bbc_storage

# -*- coding: utf-8 -*-

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

from functools import cmp_to_key

# Copyright 2008 Frank Scholz <coherence@beebits.net>
from lxml import etree
from twisted.internet import reactor

from coherence.backend import BackendItem
from coherence.backend import BackendStore
from coherence.upnp.core import DIDLLite
from coherence.upnp.core.utils import getPage

# Key under which the top-level container is stored in BBCStore.store.
ROOT_CONTAINER_ID = 0
# Key of the 'Series' container, which parse_data attaches below the root.
SERIES_CONTAINER_ID = 100


class BBCItem(BackendItem):
    """Backend item describing one audio resource and its DIDL-Lite form.

    The DIDL-Lite object is built lazily on the first call to
    :meth:`get_item` and cached in ``self.item`` afterwards.
    """

    def __init__(self, parent_id, id, title, url):
        """Store the identifiers, title and location of the item.

        :param parent_id: id of the container this item belongs to
        :param id: id of this item
        :param title: stored as ``self.name``
        :param url: stored as ``self.location``
        """
        BackendItem.__init__(self)
        # Identity and placement.
        self.id = id
        self.parent_id = parent_id
        self.name = title
        self.location = url
        # Defaults; callers may overwrite these after construction.
        self.mimetype = 'audio/mpeg'
        self.duration = None
        self.size = None
        self.description = None
        # Cache slot for the DIDL-Lite object, filled by get_item().
        self.item = None

    def get_item(self):
        """Return the DIDL-Lite ``AudioItem``, creating it on first use."""
        if self.item is not None:
            return self.item

        # Publish the object on self right away, then complete it.
        self.item = audio_item = DIDLLite.AudioItem(
            self.id, self.parent_id, self.name)
        audio_item.description = self.description

        resource = DIDLLite.Resource(
            self.location, f'http-get:*:{self.mimetype}:*')
        resource.duration = self.duration
        resource.size = self.size
        audio_item.res.append(resource)
        return self.item
class Container(BackendItem):
    """Backend item that groups child items and exposes a DIDL-Lite container.

    Children are kept in ``self.children`` and are sorted by name lazily,
    the first time :meth:`get_children` is called after a modification.
    """

    def __init__(self, id, store, parent_id, title):
        """Create an empty container.

        :param id: id of this container; also appended to ``store.urlbase``
            to form ``self.url``
        :param store: object providing the ``urlbase`` attribute
        :param parent_id: id of the parent container
        :param title: stored as ``self.name``
        """
        BackendItem.__init__(self)
        self.url = store.urlbase + str(id)
        self.parent_id = parent_id
        self.id = id
        self.name = title
        self.mimetype = 'directory'
        self.update_id = 0
        self.children = []

        self.item = DIDLLite.Container(self.id, self.parent_id, self.name)
        self.item.childCount = 0
        # True while self.children is known to be ordered by name.
        self.sorted = False

    def add_child(self, child):
        """Append *child* and mark the child list as needing a re-sort.

        :param child: item exposing at least a ``name`` attribute
        """
        self.children.append(child)
        self.item.childCount += 1
        self.sorted = False

    def get_children(self, start=0, end=0):
        """Return the children ordered by name, optionally sliced.

        :param start: index of the first child to return
        :param end: index one past the last child to return; ``0`` means
            "up to the end of the list"
        :return: list of child items
        """
        if not self.sorted:
            # Sort in place by name. A missing name (None) is treated as an
            # empty string so it cannot break the comparison with str names.
            self.children.sort(key=lambda child: child.name or '')
            self.sorted = True
        if end != 0:
            return self.children[start:end]
        return self.children[start:]

    def get_child_count(self):
        """Return the number of children currently held."""
        return len(self.children)

    def get_path(self):
        """Return the URL built from the store's ``urlbase`` and the id."""
        return self.url

    def get_item(self):
        """Return the DIDL-Lite container object."""
        return self.item

    def get_name(self):
        """Return the container title."""
        return self.name

    def get_id(self):
        """Return the container id."""
        return self.id
class BBCStore(BackendStore):
    """MediaServer backend store filled from the BBC 'uriplay' feed.

    The feed at :attr:`rss_url` is fetched and parsed into a tree of
    :class:`Container` and :class:`BBCItem` objects kept in ``self.store``,
    keyed by integer id. A new fetch is scheduled after every attempt.
    """

    implements = ['MediaServer']
    rss_url = 'http://open.bbc.co.uk/rad/uriplay/availablecontent'

    def __init__(self, server, *args, **kwargs):
        """Set up the store and start the first feed download.

        :param server: passed on to ``BackendStore.__init__``
        :param kwargs: ``name`` (default ``'BBC'``) is the root container
            title; ``refresh`` (default ``1``) is multiplied by 3600 to get
            the delay in seconds between two feed downloads
        """
        BackendStore.__init__(self, server, **kwargs)
        self.name = kwargs.get('name', 'BBC')
        self.refresh = int(kwargs.get('refresh', 1)) * (60 * 60)

        self.next_id = 1000
        self.update_id = 0
        self.last_updated = None
        self.store = {}

        d = self.update_data()
        d.addCallback(self.init_completed)

    def get_next_id(self):
        """Increment the id counter and return the new value."""
        self.next_id += 1
        return self.next_id

    def get_by_id(self, id):
        """Look up a stored object by id.

        :param id: ``int``, ``str`` or ``bytes``; for ``str``/``bytes``
            everything from the first ``'@'`` on is ignored
        :return: the stored object, or ``None`` when the id is not an
            integer or is not present in the store
        """
        if isinstance(id, str):
            id = id.split('@', 1)[0]
        elif isinstance(id, bytes):
            id = id.decode('utf-8').split('@', 1)[0]
        try:
            return self.store[int(id)]
        except (ValueError, KeyError):
            pass
        return None

    def upnp_init(self):
        """Set ``SourceProtocolInfo`` on the connection manager, if any."""
        if self.server:
            self.server.connection_manager_server.set_variable(
                0, 'SourceProtocolInfo',
                ['http-get:*:audio/mpeg:DLNA.ORG_PN=MP3;DLNA.ORG_OP=11'
                 ';DLNA.ORG_FLAGS=01700000000000000000000000000000',
                 'http-get:*:audio/mpeg:*'])

    def update_data(self):
        """Fetch and parse the feed, then schedule the next refresh.

        :return: the deferred returned by ``getPage`` with the parsing and
            rescheduling callbacks attached
        """

        def fail(f):
            print('fail', f)
            return f

        dfr = getPage(self.rss_url)
        dfr.addCallback(etree.fromstring)
        dfr.addErrback(fail)
        dfr.addCallback(self.parse_data)
        dfr.addErrback(fail)
        # Runs on success and on failure alike, so refreshing never stops.
        dfr.addBoth(self.queue_update)
        return dfr

    @staticmethod
    def _format_duration(seconds):
        """Format a number of seconds as ``H:MM:SS``."""
        # Integer arithmetic is required: the ':d' format codes below reject
        # the floats that true division ('/') would produce.
        minutes, seconds = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)
        return f'{hours:d}:{minutes:02d}:{seconds:02d}'

    def parse_data(self, root):
        """Rebuild ``self.store`` from the parsed feed.

        A series container is created for a brand the first time one of its
        locations is encountered; each location becomes a :class:`BBCItem`
        inside that container.

        :param root: root element of the parsed feed document
        """
        # Namespace prefixes in ElementTree '{uri}' notation.
        po = '{http://purl.org/ontology/po/}'
        uriplay = '{http://uriplay.org/elements/}'
        dc = '{http://purl.org/dc/elements/1.1/}'
        rdf = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}'

        self.store = {
            ROOT_CONTAINER_ID:
                Container(ROOT_CONTAINER_ID, self, -1, self.name),
            SERIES_CONTAINER_ID:
                Container(SERIES_CONTAINER_ID, self,
                          ROOT_CONTAINER_ID, 'Series')
        }
        self.store[ROOT_CONTAINER_ID].add_child(
            self.store[SERIES_CONTAINER_ID])

        for brand in root.findall(f'./{po}Brand'):
            # Container of this brand; created lazily so that brands
            # without any location produce no empty container.
            first = None
            for episode in brand.findall(f'*/{po}Episode'):
                for version in episode.findall(f'*/{po}Version'):
                    duration = self._format_duration(
                        version.find(f'./{uriplay}publishedDuration').text)
                    for manifestation in version.findall(
                            f'./{uriplay}manifestedAs'):
                        encoding = manifestation.find(
                            f'*/{uriplay}dataContainerFormat')
                        size = manifestation.find(f'*/{uriplay}dataSize')
                        for location in manifestation.findall(
                                f'*/*/{uriplay}Location'):
                            uri = location.find(f'./{uriplay}uri')
                            uri = uri.attrib[f'{rdf}resource']
                            if first is None:
                                id = self.get_next_id()
                                self.store[id] = Container(
                                    id, self, SERIES_CONTAINER_ID,
                                    brand.find(f'./{dc}title').text)
                                self.store[SERIES_CONTAINER_ID].add_child(
                                    self.store[id])
                                first = self.store[id]

                            item = BBCItem(
                                first.id, self.get_next_id(),
                                episode.find(f'./{dc}title').text, uri)
                            first.add_child(item)
                            item.mimetype = encoding.text
                            item.duration = duration
                            item.size = int(size.text) * 1024
                            item.description = episode.find(
                                f'./{dc}description').text

        self.update_id += 1
        # if self.server:
        #     self.server.content_directory_server.set_variable(
        #         0, 'SystemUpdateID', self.update_id)
        #     value = (ROOT_CONTAINER_ID, self.container.update_id)
        #     self.server.content_directory_server.set_variable(
        #         0, 'ContainerUpdateIDs', value)

    def queue_update(self, error_or_failure):
        """Schedule the next :meth:`update_data` call.

        :param error_or_failure: result or failure of the previous callback
            in the chain; ignored
        """
        reactor.callLater(self.refresh, self.update_data)