Source code for coherence.upnp.core.DIDLLite

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2005, Tim Potter <tpot@samba.org>
# Copyright 2006, Frank Scholz <coherence@beebits.net>
# Copyright 2018, Pol Canelles <canellestudi@gmail.com>

'''
DIDLLite
========

Different classes and methods used to represent atomic content objects for
DIDL (Digital Item Declaration Language)

.. note:: For more information about DIDL you can check the
          `wikipedia entry <https://en.wikipedia.org/wiki/Digital_Item>`_.

:class:`Resources`
------------------
A list of resources, always sorted after an append

:class:`Resource`
-----------------
An object representing a resource.

:class:`PlayContainerResource`
------------------------------
An object representing a DLNA play container resource.

:class:`Object`
---------------
The root class of the entire content directory class hierarchy.

:class:`~coherence.upnp.core.DIDLLite.Item`
-------------------------------------------
A class used to represent atomic (non-container) content objects.

:class:`ImageItem`
------------------
An item class which represents a image.

:class:`Photo`
--------------
An item class which represents a photo.

:class:`AudioItem`
------------------
An item class which represents a audio.

:class:`MusicTrack`
-------------------
An item class which represents a music track.

:class:`AudioBroadcast`
-----------------------
An item class which represents a audio broadcast.

:class:`AudioBook`
------------------
An item class which represents a audio book.

:class:`VideoItem`
------------------
An item class which represents a video.

:class:`Movie`
--------------
An item class which represents a movie.

:class:`VideoBroadcast`
-----------------------
An item class which represents a video broadcast.

:class:`MusicVideoClip`
-----------------------
An item class which represents a music video clip.

:class:`PlayListItem`
---------------------
An item class which represents a play list.

:class:`TextItem`
-----------------
An item class which represents a text.

:class:`~coherence.upnp.core.DIDLLite.Container`
------------------------------------------------
An object that can contain other objects.

:class:`Person`
---------------
An container class which represents a Person.

:class:`MusicArtist`
--------------------
An container class which represents a Music Artist.

:class:`PlaylistContainer`
--------------------------
An container class which represents a Play List.

:class:`Album`
--------------
An container class which represents a generic Album.

:class:`MusicAlbum`
-------------------
An container class which represents a Music Album.

:class:`PhotoAlbum`
-------------------
An container class which represents a Photo Album.

:class:`Genre`
--------------
An container class which represents a generic genre.

:class:`MusicGenre`
-------------------
An container class which represents a Music genre.

:class:`MovieGenre`
-------------------
An container class which represents a Movie genre.

:class:`StorageSystem`
----------------------
An container class which represents a Storage System.

:class:`StorageVolume`
----------------------
An container class which represents a Storage Volume.

:class:`StorageFolder`
----------------------
An container class which represents a Storage Folder.

:class:`DIDLElement`
--------------------
Our element for DIDL (Digital Item Declaration Language).
'''
# TODO: use more XPath expressions in fromElement() methods

import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from functools import cmp_to_key

from lxml import etree

from coherence import log
from coherence.upnp.core import utils
from . import xml_constants


[docs]def qname(tag, ns=None): if not ns: return tag return f'{{{ns}}}{tag}'
[docs]def is_audio(mimetype): '''Checks for type audio, expects a mimetype or an UPnP protocolInfo.''' test = mimetype.split(':') if len(test) == 4: mimetype = test[2] if mimetype == 'application/ogg': return True if mimetype.startswith('audio/'): return True return False
[docs]def is_video(mimetype): '''Checks for type video, expects a mimetype or an UPnP protocolInfo.''' test = mimetype.split(':') if len(test) == 4: mimetype = test[2] if mimetype.startswith('video/'): return True return False
[docs]class Resources(list): '''A list of resources, always sorted after an append.''' def __init__(self): super(Resources, self).__init__() self.sort(key=cmp_to_key(self.p_sort))
[docs] def append(self, value): list.append(self, value) self.sort(key=cmp_to_key(self.p_sort))
[docs] def p_sort(self, x, y): '''We want the following order: http-get is always at the beginning, rtsp-rtp-udp the second and anything else after that.''' if x.protocolInfo is None: return 1 if y.protocolInfo is None: return -1 x_protocol = x.protocolInfo.split(':')[0] y_protocol = y.protocolInfo.split(':')[0] x_protocol = x_protocol.lower() y_protocol = y_protocol.lower() if x_protocol == y_protocol: return 0 if x_protocol == 'http-get': return -1 if x_protocol == 'rtsp-rtp-udp' and y_protocol == 'http-get': return 1 if x_protocol == 'rtsp-rtp-udp' and y_protocol != 'http-get': return -1 return 1
[docs] def get_matching(self, local_protocol_infos, protocol_type=None): result = [] if not isinstance(local_protocol_infos, list): local_protocol_infos = [local_protocol_infos] for res in self: if res.importUri is not None: continue # print('res', res.protocolInfo, res.data) remote_protocol, remote_network, remote_content_format, _ = \ res.protocolInfo.split(':') # print('remote', remote_protocol, # remote_network,remote_content_format) if (protocol_type is not None and remote_protocol.lower() != protocol_type.lower()): continue for protocol_info in local_protocol_infos: local_protocol, local_network, local_content_format, _ = \ protocol_info.split(':') # print('local', local_protocol, # local_network,local_content_format) if (remote_protocol == local_protocol or remote_protocol == '*' or local_protocol == '*') and \ (remote_network == local_network or remote_network == '*' or local_network == '*') and \ (remote_content_format.startswith( local_content_format) or remote_content_format == '*' or local_content_format == '*'): result.append(res) return result
[docs]def classChooser(mimetype, sub=None): if mimetype == 'root': return Container if mimetype == 'item': return Item if mimetype == 'directory': if sub == 'music': return MusicAlbum return Container else: if mimetype.find('image/') == 0: return Photo if mimetype.find('audio/') == 0: if sub == 'music': # FIXME: this is stupid return MusicTrack return AudioItem if mimetype.find('video/') == 0: return VideoItem if mimetype == 'application/ogg': if sub == 'music': # FIXME: this is stupid return MusicTrack return AudioItem if mimetype == 'application/x-flac': if sub == 'music': # FIXME: this is stupid return MusicTrack return AudioItem return None
simple_dlna_tags = ['DLNA.ORG_OP=01', # operations parameter 'DLNA.ORG_PS=1', # play speed parameter 'DLNA.ORG_CI=0', # transcoded parameter 'DLNA.ORG_FLAGS=01100000000000000000000000000000']
[docs]def build_dlna_additional_info(content_format, does_playcontainer=False): additional_info = ['*'] if content_format == 'audio/mpeg': additional_info = ['DLNA.ORG_PN=MP3'] + simple_dlna_tags if content_format == 'audio/ms-wma': additional_info = ['DLNA.ORG_PN=WMABASE'] + simple_dlna_tags if content_format == 'image/jpeg': dlna_tags = simple_dlna_tags[:] dlna_tags[3] = 'DLNA.ORG_FLAGS=00900000000000000000000000000000' additional_info = ['DLNA.ORG_PN=JPEG_LRG'] + dlna_tags if content_format == 'image/png': dlna_tags = simple_dlna_tags[:] dlna_tags[3] = 'DLNA.ORG_FLAGS=00900000000000000000000000000000' additional_info = ['DLNA.ORG_PN=PNG_LRG'] + dlna_tags if content_format == 'video/mpeg': additional_info = ['DLNA.ORG_PN=MPEG_PS_PAL'] + simple_dlna_tags if content_format == 'video/mpegts': additional_info = ['DLNA.ORG_PN=MPEG_TS_PAL'] + simple_dlna_tags content_format = 'video/mpeg' if content_format in ['video/mp4', 'video/x-m4a']: additional_info = \ ['DLNA.ORG_PN=AVC_TS_BL_CIF15_AAC'] + simple_dlna_tags if content_format in ['video/x-msvideo', 'video/avi', 'video/divx']: # additional_info = ';'.join( # ['DLNA.ORG_PN=MPEG4_P2_MP4_SP_AAC']+simple_dlna_tags) additional_info = ['*'] if content_format == 'video/x-ms-wmv': additional_info = ['DLNA.ORG_PN=WMV_BASE'] + simple_dlna_tags if content_format == '*': additional_info = simple_dlna_tags if does_playcontainer: i = 0 for part in additional_info: if part.startswith('DLNA.ORG_FLAGS'): _, bits = part.split('=') bits = int(bits, 16) bits |= 0x10000000000000000000000000000000 additional_info[i] = f'DLNA.ORG_FLAGS={bits:.32x}' break i += 1 return ';'.join(additional_info)
[docs]class Resource(object): '''An object representing a resource.''' def __init__(self, data=None, protocol_info=None): if not isinstance(data, bytes): data = utils.to_bytes(data) self.data = data self.protocolInfo = protocol_info self.bitrate = None self.size = None self.duration = None self.nrAudioChannels = None self.resolution = None self.importUri = None if self.protocolInfo is not None: protocol, network, content_format, additional_info = \ self.protocolInfo.split(':') if additional_info == '*': self.protocolInfo = ':'.join([protocol, network, content_format, build_dlna_additional_info( content_format)]) elif additional_info == '#': self.protocolInfo = ':'.join([protocol, network, content_format, '*'])
[docs] def get_additional_info(self, upnp_client=''): protocol, network, content_format, additional_info = \ self.protocolInfo.split(':') if upnp_client in ('XBox', 'Philips-TV',): # we don't need the DLNA tags there, # and maybe they irritate these poor things anyway additional_info = '*' elif upnp_client in ('PLAYSTATION3',): if content_format.startswith('video/'): additional_info = '*' a_list = additional_info.split(';') for part in a_list: if part == 'DLNA.ORG_PS=1': a_list.remove(part) break additional_info = ';'.join(a_list) return additional_info
[docs] def toElement(self, **kwargs): root = etree.Element('res') if kwargs.get('upnp_client', '') in ('XBox',): protocol, network, content_format, additional_info = \ self.protocolInfo.split(':') if content_format in ['video/divx', 'video/x-msvideo']: content_format = 'video/avi' if content_format == 'audio/x-wav': content_format = 'audio/wav' additional_info = self.get_additional_info( upnp_client=kwargs.get('upnp_client', '')) root.attrib['protocolInfo'] = ':'.join( (protocol, network, content_format, additional_info)) else: protocol, network, content_format, additional_info = \ self.protocolInfo.split(':') if content_format == 'video/x-msvideo': content_format = 'video/divx' additional_info = self.get_additional_info( upnp_client=kwargs.get('upnp_client', '')) root.attrib['protocolInfo'] = ':'.join( (protocol, network, content_format, additional_info)) root.text = self.data if self.bitrate is not None: root.attrib['bitrate'] = str(self.bitrate) if self.size is not None: root.attrib['size'] = str(self.size) if self.duration is not None: root.attrib['duration'] = self.duration if self.nrAudioChannels is not None: root.attrib['nrAudioChannels'] = self.nrAudioChannels if self.resolution is not None: root.attrib['resolution'] = self.resolution if self.importUri is not None: root.attrib['importUri'] = self.importUri return root
[docs] def fromElement(self, elt): self.protocolInfo = elt.attrib['protocolInfo'] self.data = elt.text self.bitrate = elt.attrib.get('bitrate') self.size = elt.attrib.get('size') self.duration = elt.attrib.get('duration', None) self.resolution = elt.attrib.get('resolution', None) self.importUri = elt.attrib.get('importUri', None)
[docs] def toString(self, **kwargs): return etree.tostring(self.toElement(**kwargs), encoding='utf-8')
[docs] @classmethod def fromString(cls, _string): instance = cls() elt = etree.fromstring(_string) instance.fromElement(elt) return instance
[docs] def transcoded(self, format): protocol, network, content_format, additional_info = \ self.protocolInfo.split(':') dlna_tags = simple_dlna_tags[:] # dlna_tags[1] = 'DLNA.ORG_OP=00' dlna_tags[2] = 'DLNA.ORG_CI=1' if format == 'mp3': if content_format == 'audio/mpeg': return None content_format = 'audio/mpeg' dlna_pn = 'DLNA.ORG_PN=MP3' elif format == 'lpcm': dlna_pn = 'DLNA.ORG_PN=LPCM' content_format = 'audio/L16;rate=44100;channels=2' elif format == 'mpegts': if content_format == 'video/mpeg': return None # 'DLNA.ORG_PN=MPEG_TS_SD_EU' # FIXME - don't forget HD dlna_pn = 'DLNA.ORG_PN=MPEG_PS_PAL' content_format = 'video/mpeg' else: return None additional_info = ';'.join([dlna_pn] + dlna_tags) new_protocol_info = ':'.join( (protocol, network, content_format, additional_info)) new_res = Resource(self.data + f'/transcoded/{format}', new_protocol_info) new_res.size = None new_res.duration = self.duration new_res.resolution = self.resolution return new_res
[docs]class PlayContainerResource(Resource): '''An object representing a DLNA play container resource.''' def __init__(self, udn, sid='urn:upnp-org:serviceId:ContentDirectory', cid=None, fid=None, fii=0, sc='', md=0, protocol_info=None): Resource.__init__(self) if cid is None: raise AttributeError('missing Container Id') if fid is None: raise AttributeError('missing first Child Id') self.protocolInfo = protocol_info args = ['sid=' + urllib.parse.quote(sid), 'cid=' + urllib.parse.quote(str(cid)), 'fid=' + urllib.parse.quote(str(fid)), 'fii=' + urllib.parse.quote(str(fii)), 'sc=' + urllib.parse.quote(''), 'md=' + urllib.parse.quote(str(0))] self.data = 'dlna-playcontainer://' + \ urllib.parse.quote(str(udn)) \ + '?' + '&'.join(args) if self.protocolInfo is None: self.protocolInfo = 'http-get:*:*:*'
[docs]class Object(log.LogAble): '''The root class of the entire content directory class hierarchy.''' logCategory = 'didllite' elementName = None upnp_class = 'object' creator = None res = None writeStatus = None date = None albumArtURI = None artist = None genre = None genres = None album = None originalTrackNumber = None description = None longDescription = None refID = None server_uuid = None def __init__(self, id=None, parentID=None, title=None, restricted=False, creator=None): log.LogAble.__init__(self) self.id = id self.parentID = parentID self.title = title self.creator = creator self.restricted = restricted self.res = Resources()
[docs] def checkUpdate(self): return self
[docs] def toElement(self, **kwargs): root = etree.Element(self.elementName) root.attrib['id'] = str(self.id) root.attrib['parentID'] = str(self.parentID) etree.SubElement(root, qname('title', xml_constants.DC_NS)).text = \ self.title if kwargs.get('upnp_client', '') != 'XBox': if self.refID: root.attrib['refID'] = str(self.refID) if kwargs.get('requested_id', None): if kwargs.get('requested_id') == '0': t = root.find(qname('title', xml_constants.DC_NS)) t.text = 'root' # if kwargs.get('requested_id') != '0' and # kwargs.get('requested_id') != root.attrib['id']: if kwargs.get('requested_id') != root.attrib['id']: if kwargs.get('upnp_client', '') != 'XBox': root.attrib['refID'] = root.attrib['id'] r_id = kwargs.get('requested_id') root.attrib['id'] = r_id r_id = r_id.split('@', 1) try: root.attrib['parentID'] = r_id[1] except IndexError: pass if kwargs.get('upnp_client', '') != 'XBox': self.info(f'Changing ID from {root.attrib["refID"]} to ' f'{root.attrib["id"]}, with parentID ' f'{root.attrib["parentID"]}') else: self.info( f'Changing ID from {self.id} to {root.attrib["id"]}, ' f'with parentID {root.attrib["parentID"]}') elif kwargs.get('parent_container', None): if (kwargs.get('parent_container') != '0' and kwargs.get('parent_container') != root.attrib['parentID']): if kwargs.get('upnp_client', '') != 'XBox': root.attrib['refID'] = root.attrib['id'] root.attrib['id'] = '@'.join( (root.attrib['id'], kwargs.get('parent_container'))) root.attrib['parentID'] = kwargs.get('parent_container') if kwargs.get('upnp_client', '') != 'XBox': self.info( f'Changing ID from {root.attrib["refID"]} to ' f'{root.attrib["id"]}, with parentID from ' f'{self.parentID} to {root.attrib["parentID"]}') else: self.info( f'Changing ID from {self.id} to {root.attrib["id"]}, ' f'with parentID from {self.parentID} to ' f'{root.attrib["parentID"]}') etree.SubElement( root, qname('class', xml_constants.UPNP_NS)).text = self.upnp_class if kwargs.get('upnp_client', '') == 'XBox': u = root.find(qname('class', xml_constants.UPNP_NS)) if kwargs.get('parent_container', None) is not None and u.text.startswith( 'object.container'): if kwargs.get('parent_container') in ('14', '15', '16'): u.text = 'object.container.storageFolder' if self.upnp_class == 'object.container': u.text = 'object.container.storageFolder' if self.restricted: root.attrib['restricted'] = '1' else: root.attrib['restricted'] = '0' if self.creator is not None: etree.SubElement(root, qname( 'creator', xml_constants.DC_NS)).text = self.creator if self.writeStatus is not None: etree.SubElement(root, qname( 'writeStatus', xml_constants.UPNP_NS)).text = self.writeStatus if self.date is not None: if isinstance(self.date, datetime): etree.SubElement(root, qname( 'date', xml_constants.DC_NS)).text = self.date.isoformat() else: etree.SubElement(root, qname( 'date', xml_constants.DC_NS)).text = self.date else: etree.SubElement(root, qname( 'date', xml_constants.DC_NS)).text = utils.datefaker().isoformat() if self.albumArtURI is not None: e = etree.SubElement( root, qname('albumArtURI', xml_constants.UPNP_NS)) e.text = self.albumArtURI e.attrib[qname( 'profileID', xml_constants.DLNA_NS)] = 'JPEG_TN' if self.artist is not None: etree.SubElement( root, qname( 'artist', xml_constants.UPNP_NS)).text = self.artist if self.genre is not None: etree.SubElement( root, qname( 'genre', xml_constants.UPNP_NS)).text = self.genre if self.genres is not None: for genre in self.genres: etree.SubElement( root, qname( 'genre', xml_constants.UPNP_NS)).text = genre if self.originalTrackNumber is not None: etree.SubElement( root, qname('originalTrackNumber', xml_constants.UPNP_NS)).text = \ str(self.originalTrackNumber) if self.description is not None: etree.SubElement( root, qname('description', xml_constants.DC_NS)).text = \ self.description if self.longDescription is not None: etree.SubElement( root, qname('longDescription', xml_constants.UPNP_NS)).text = \ self.longDescription if self.server_uuid is not None: etree.SubElement( root, qname('server_uuid', xml_constants.UPNP_NS)).text = \ self.server_uuid return root
[docs] def toString(self, **kwargs): return etree.tostring(self.toElement(**kwargs), encoding='utf-8').decode('utf-8')
[docs] def fromElement(self, elt): # TODO: # * creator # * writeStatus self.elementName = elt.tag self.id = elt.attrib.get('id', None) self.parentID = elt.attrib.get('parentID', None) self.refID = elt.attrib.get('refID', None) if elt.attrib.get('restricted', None) in [ 1, 'true', 'True', '1', 'yes', 'Yes']: self.restricted = True else: self.restricted = False for child in elt.getchildren(): if child.tag.endswith('title'): self.title = child.text elif child.tag.endswith('albumArtURI'): self.albumArtURI = child.text elif child.tag.endswith('originalTrackNumber'): self.originalTrackNumber = int(child.text) elif child.tag.endswith('description'): self.description = child.text elif child.tag.endswith('longDescription'): self.longDescription = child.text elif child.tag.endswith('artist'): self.artist = child.text elif child.tag.endswith('genre'): if self.genre is not None: if self.genres is None: self.genres = [self.genre, ] self.genres.append(child.text) self.genre = child.text elif child.tag.endswith('album'): self.album = child.text elif child.tag.endswith('class'): self.upnp_class = child.text elif child.tag.endswith('server_uuid'): self.server_uuid = child.text elif child.tag.endswith('res'): res = Resource.fromString(etree.tostring(child)) self.res.append(res)
[docs] @classmethod def fromString(cls, data): instance = cls() elt = etree.fromstring(data) instance.fromElement(elt) return instance
[docs]class Item(Object): '''A class used to represent atomic (non-container) content objects.''' upnp_class = Object.upnp_class + '.item' elementName = 'item' refID = None director = None actors = [] language = None def __init__(self, *args, **kwargs): Object.__init__(self, *args, **kwargs)
[docs] def toElement(self, **kwargs): root = Object.toElement(self, **kwargs) if self.director is not None: etree.SubElement( root, qname('director', xml_constants.UPNP_NS)).text = \ self.director if self.refID is not None: etree.SubElement(root, 'refID').text = self.refID if len(self.actors) > 0: for actor in self.actors: etree.SubElement( root, qname('actor', xml_constants.DC_NS)).text = actor if self.language is not None: etree.SubElement( root, qname('language', xml_constants.DC_NS)).text = \ self.language if kwargs.get('transcoding', False): res = self.res.get_matching(['*:*:*:*'], protocol_type='http-get') if len(res) > 0 and is_audio(res[0].protocolInfo): old_res = res[0] if kwargs.get('upnp_client', '') == 'XBox': transcoded_res = old_res.transcoded('mp3') if transcoded_res is not None: root.append(transcoded_res.toElement(**kwargs)) else: root.append(old_res.toElement(**kwargs)) else: for res in self.res: root.append(res.toElement(**kwargs)) transcoded_res = old_res.transcoded('lpcm') if transcoded_res is not None: root.append(transcoded_res.toElement(**kwargs)) elif len(res) > 0 and is_video(res[0].protocolInfo): old_res = res[0] for res in self.res: root.append(res.toElement(**kwargs)) transcoded_res = old_res.transcoded('mpegts') if transcoded_res is not None: root.append(transcoded_res.toElement(**kwargs)) else: for res in self.res: root.append(res.toElement(**kwargs)) else: for res in self.res: root.append(res.toElement(**kwargs)) return root
[docs] def fromElement(self, elt): Object.fromElement(self, elt) for child in elt.getchildren(): if child.tag.endswith('refID'): self.refID = child.text elif child.tag.endswith('director'): self.director = child.text
[docs]class ImageItem(Item): upnp_class = Item.upnp_class + '.imageItem' rating = None storageMedium = None publisher = None rights = None
[docs] def toElement(self, **kwargs): root = Item.toElement(self, **kwargs) if self.rating is not None: etree.SubElement( root, qname('rating', xml_constants.UPNP_NS)).text = \ str(self.rating) if self.storageMedium is not None: etree.SubElement( root, qname('storageMedium', xml_constants.UPNP_NS)).text = \ self.storageMedium if self.publisher is not None: etree.SubElement( root, qname('publisher', xml_constants.DC_NS)).text = \ self.publisher if self.rights is not None: etree.SubElement( root, qname('rights', xml_constants.DC_NS)).text = self.rights return root
[docs]class Photo(ImageItem): upnp_class = ImageItem.upnp_class + '.photo' album = None
[docs] def toElement(self, **kwargs): root = ImageItem.toElement(self, **kwargs) if self.album is not None: etree.SubElement( root, qname('album', xml_constants.UPNP_NS)).text = self.album return root
[docs]class AudioItem(Item): '''A piece of content that when rendered generates some audio.''' upnp_class = Item.upnp_class + '.audioItem' publisher = None language = None relation = None rights = None valid_keys = ['genre', 'description', 'longDescription', 'publisher', 'language', 'relation', 'rights', 'albumArtURI']
[docs] def toElement(self, **kwargs): root = Item.toElement(self, **kwargs) if self.publisher is not None: etree.SubElement( root, qname('publisher', xml_constants.DC_NS)).text = \ self.publisher if self.language is not None: etree.SubElement( root, qname('language', xml_constants.DC_NS)).text = \ self.language if self.relation is not None: etree.SubElement( root, qname('relation', xml_constants.DC_NS)).text = \ self.relation if self.rights is not None: etree.SubElement( root, qname('rights', xml_constants.DC_NS)).text = self.rights return root
[docs] def fromElement(self, elt): Item.fromElement(self, elt) for child in elt.getchildren(): tag = child.tag val = child.text if tag in self.valid_keys: setattr(self, tag, val)
[docs]class MusicTrack(AudioItem): '''A discrete piece of audio that should be interpreted as music.''' upnp_class = AudioItem.upnp_class + '.musicTrack' album = None playlist = None storageMedium = None contributor = None
[docs] def toElement(self, **kwargs): root = AudioItem.toElement(self, **kwargs) if self.album is not None: etree.SubElement( root, qname('album', xml_constants.UPNP_NS)).text = self.album if self.playlist is not None: etree.SubElement( root, qname('playlist', xml_constants.UPNP_NS)).text = \ self.playlist if self.storageMedium is not None: etree.SubElement( root, qname('storageMedium', xml_constants.UPNP_NS)).text = \ self.storageMedium if self.contributor is not None: etree.SubElement( root, qname('contributor', xml_constants.DC_NS)).text = \ self.contributor return root
[docs]class AudioBroadcast(AudioItem): upnp_class = AudioItem.upnp_class + '.audioBroadcast'
[docs]class AudioBook(AudioItem): upnp_class = AudioItem.upnp_class + '.audioBook'
[docs]class VideoItem(Item): upnp_class = Item.upnp_class + '.videoItem' valid_attrs = dict(producer=xml_constants.UPNP_NS, rating=xml_constants.UPNP_NS, publisher=xml_constants.DC_NS, relation=xml_constants.DC_NS)
[docs] def toElement(self, **kwargs): root = Item.toElement(self, **kwargs) for attr_name, ns in self.valid_attrs.items(): value = getattr(self, attr_name, None) if value: self.debug(f'Setting value {{{ns}}}{attr_name}={value}') etree.SubElement(root, qname(attr_name, ns)).text = value return root
[docs] def fromElement(self, elt): Item.fromElement(self, elt) for child in elt.getchildren(): tag = child.tag val = child.text if tag in list(self.valid_attrs.keys()): setattr(self, tag, val)
[docs]class Movie(VideoItem): upnp_class = VideoItem.upnp_class + '.movie' def __init__(self, *args, **kwargs): VideoItem.__init__(self, *args, **kwargs) self.valid_attrs.update( dict( storageMedium=xml_constants.UPNP_NS, DVDRegionCode=xml_constants.UPNP_NS, channelName=xml_constants.UPNP_NS, scheduledStartTime=xml_constants.UPNP_NS, sccheduledEndTime=xml_constants.UPNP_NS))
[docs]class VideoBroadcast(VideoItem): upnp_class = VideoItem.upnp_class + '.videoBroadcast'
[docs]class MusicVideoClip(VideoItem): upnp_class = VideoItem.upnp_class + '.musicVideoClip'
[docs]class PlaylistItem(Item): upnp_class = Item.upnp_class + '.playlistItem'
[docs]class TextItem(Item): upnp_class = Item.upnp_class + '.textItem'
[docs]class Container(Object): '''An object that can contain other objects.''' upnp_class = Object.upnp_class + '.container' elementName = 'container' childCount = None createClass = None searchable = None def __init__(self, id=None, parent_id=None, title=None, restricted=False, creator=None): Object.__init__(self, id, parent_id, title, restricted, creator) self.searchClass = []
[docs] def toElement(self, **kwargs): root = Object.toElement(self, **kwargs) if self.childCount is not None: root.attrib['childCount'] = str(self.childCount) if self.createClass is not None: etree.SubElement( root, qname('createclass', xml_constants.UPNP_NS)).text = \ self.createClass if not isinstance(self.searchClass, (list, tuple)): self.searchClass = [self.searchClass] for i in self.searchClass: sc = etree.SubElement( root, qname('searchClass', xml_constants.UPNP_NS)) sc.attrib['includeDerived'] = '1' sc.text = i if self.searchable is not None: if self.searchable in (1, '1', True, 'true', 'True'): root.attrib['searchable'] = '1' else: root.attrib['searchable'] = '0' for res in self.res: root.append(res.toElement(**kwargs)) return root
[docs] def fromElement(self, elt): Object.fromElement(self, elt) v = elt.attrib.get('childCount', None) if v is not None: self.childCount = int(v) # self.searchable = int(elt.attrib.get('searchable','0')) self.searchable = \ elt.attrib.get('searchable', '0') in [1, 'True', 'true', '1'] self.searchClass = [] for child in elt.getchildren(): if child.tag.endswith('createclass'): self.createClass = child.text elif child.tag.endswith('searchClass'): self.searchClass.append(child.text)
[docs]class Person(Container): upnp_class = Container.upnp_class + '.person'
[docs]class MusicArtist(Person): upnp_class = Person.upnp_class + '.musicArtist'
[docs]class PlaylistContainer(Container): upnp_class = Container.upnp_class + '.playlistContainer'
[docs]class Album(Container): upnp_class = Container.upnp_class + '.album'
[docs]class MusicAlbum(Album): upnp_class = Album.upnp_class + '.musicAlbum'
[docs]class PhotoAlbum(Album): upnp_class = Album.upnp_class + '.photoAlbum'
[docs]class Genre(Container): upnp_class = Container.upnp_class + '.genre'
[docs]class MusicGenre(Genre): upnp_class = Genre.upnp_class + '.musicGenre'
[docs]class MovieGenre(Genre): upnp_class = Genre.upnp_class + '.movieGenre'
[docs]class StorageSystem(Container): upnp_class = Container.upnp_class + '.storageSystem'
[docs]class StorageVolume(Container): upnp_class = Container.upnp_class + '.storageVolume'
[docs]class StorageFolder(Container): upnp_class = Container.upnp_class + '.storageFolder'
[docs]class DIDLElement(log.LogAble): logCategory = 'didllite' def __init__(self, upnp_client='', parent_container=None, requested_id=None, transcoding=False): log.LogAble.__init__(self) self.element = etree.Element( 'DIDL-Lite', nsmap={None: xml_constants.DIDLLITE_NS, 'dc': xml_constants.DC_NS, 'upnp': xml_constants.UPNP_NS}) self._items = [] self.upnp_client = upnp_client self.parent_container = parent_container self.requested_id = requested_id self.transcoding = transcoding
[docs] def addContainer(self, id, parent_id, title, restricted=False): e = Container(id, parent_id, title, restricted, creator='') self.element.append(e.toElement())
[docs] def addItem(self, item): self.element.append(item.toElement( upnp_client=self.upnp_client, parent_container=self.parent_container, requested_id=self.requested_id, transcoding=self.transcoding)) self._items.append(item)
[docs] def rebuild(self): self.element.clear() for item in self._items: self.element.append(item.toElement( upnp_client=self.upnp_client, parent_container=self.parent_container, requested_id=self.requested_id, transcoding=self.transcoding))
[docs] def numItems(self): return len(self._items)
[docs] def getItems(self): return self._items
[docs] def toString(self): # sigh - having that optional preamble here # breaks some of the older ContentDirectoryClients return etree.tostring(self.element, encoding='utf-8', pretty_print=True).decode('utf-8')
[docs] def get_upnp_class(self, name): try: return upnp_classes[name]() except KeyError: self.warning(f'upnp_class {name} not found, trying fallback') parts = name.split('.') parts.pop() while len(parts) > 1: try: return upnp_classes['.'.join(parts)]() except KeyError: parts.pop() self.warning(f'WTF - no fallback for upnp_class {name} found ?!?') return None
[docs] @classmethod def fromString(cls, data): instance = cls() elt = etree.fromstring(data) for node in elt.getchildren(): upnp_class_name = node.findtext( '{urn:schemas-upnp-org:metadata-1-0/upnp/}class') upnp_class = instance.get_upnp_class(upnp_class_name.strip()) new_node = upnp_class.fromString(etree.tostring(node)) instance.addItem(new_node) return instance
upnp_classes = {'object': Object, 'object.item': Item, 'object.item.imageItem': ImageItem, 'object.item.imageItem.photo': Photo, 'object.item.audioItem': AudioItem, 'object.item.audioItem.musicTrack': MusicTrack, 'object.item.audioItem.audioBroadcast': AudioBroadcast, 'object.item.audioItem.audioBook': AudioBook, 'object.item.videoItem': VideoItem, 'object.item.videoItem.movie': Movie, 'object.item.videoItem.videoBroadcast': VideoBroadcast, 'object.item.videoItem.musicVideoClip': MusicVideoClip, 'object.item.playlistItem': PlaylistItem, 'object.item.textItem': TextItem, 'object.container': Container, 'object.container.person': Person, 'object.container.person.musicArtist': MusicArtist, 'object.container.playlistContainer': PlaylistContainer, 'object.container.album': Album, 'object.container.album.musicAlbum': MusicAlbum, 'object.container.album.photoAlbum': PhotoAlbum, 'object.container.genre': Genre, 'object.container.genre.musicGenre': MusicGenre, 'object.container.genre.movieGenre': MovieGenre, 'object.container.storageSystem': StorageSystem, 'object.container.storageVolume': StorageVolume, 'object.container.storageFolder': StorageFolder, } if __name__ == '__main__': res = Resources() res.append(Resource('1', 'file:*:*:*')) res.append(Resource('2', 'rtsp-rtp-udp:*:*:*')) res.append(Resource('3', None)) res.append(Resource('4', 'internal:*:*:*')) res.append(Resource('5', 'http-get:*:*:*')) res.append(Resource('6', 'something:*:*:*')) res.append(Resource('7', 'http-get:*:*:*')) for r in res: print(r.data, r.protocolInfo)