# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php
# a backend
# Copyright 2007, Frank Scholz <coherence@beebits.net>
# Copyright 2008, Jean-Michel Sizun <jm.sizun@free.fr>
import re
from coherence.backend import BackendItem, Container, AbstractBackendStore
from coherence.upnp.core import DIDLLite
from coherence.upnp.core.DIDLLite import classChooser, Resource
from coherence.upnp.core.utils import getPage
[docs]class PlaylistItem(BackendItem):
logCategory = 'playlist_store'
def __init__(self, title, stream_url, mimetype, **kwargs):
BackendItem.__init__(self)
self.name = title
self.stream_url = stream_url
self.mimetype = mimetype
self.url = stream_url
self.item = kwargs.get('item', None)
self.parent = kwargs.get('parent', None)
self.update_id = kwargs.get('update_id', 0)
[docs] def get_id(self):
return self.storage_id
[docs] def get_item(self):
if self.item is None:
upnp_id = self.get_id()
upnp_parent_id = self.parent.get_id()
if self.mimetype.startswith('video/'):
item = DIDLLite.VideoItem(upnp_id, upnp_parent_id, self.name)
else:
item = DIDLLite.AudioItem(upnp_id, upnp_parent_id, self.name)
# what to do with MMS:// feeds?
protocol = 'http-get'
if self.stream_url.startswith('rtsp://'):
protocol = 'rtsp-rtp-udp'
res = Resource(self.stream_url,
f'{protocol}:*:{self.mimetype}:*')
res.size = None
item.res.append(res)
self.item = item
return self.item
[docs] def get_url(self):
return self.url
[docs]class PlaylistStore(AbstractBackendStore):
'''
.. versionchanged:: 0.9.0
Migrated from louie/dispatcher to EventDispatcher
'''
logCategory = 'playlist_store'
implements = ['MediaServer']
wmc_mapping = {'16': 1000}
description = (
'Playlist',
'exposes the list of video/audio streams from a m3u playlist (e.g. '
'web TV listings published by french ISPs such as Free, SFR...).',
None)
options = [
{'option': 'name', 'text': 'Server Name:', 'type': 'string',
'default': 'my media',
'help': 'the name under this MediaServer '
'shall show up with on other UPnP clients'},
{'option': 'version', 'text': 'UPnP Version:', 'type': 'int',
'default': 2, 'enum': (2, 1),
'help': 'the highest UPnP version this MediaServer shall support',
'level': 'advance'},
{'option': 'uuid', 'text': 'UUID Identifier:', 'type': 'string',
'help': 'the unique (UPnP) identifier for this MediaServer, '
'usually automatically set',
'level': 'advance'},
{'option': 'playlist_url', 'text': 'Playlist file URL:',
'type': 'string', 'help': 'URL to the playlist file (M3U).'},
]
playlist_url = None
def __init__(self, server, **kwargs):
AbstractBackendStore.__init__(self, server, **kwargs)
self.playlist_url = self.config.get(
'playlist_url',
'https://mafreebox.freebox.fr/freeboxtv/playlist.m3u'
)
self.name = self.config.get('name', 'playlist')
self.init_completed = True
def __repr__(self):
return self.__class__.__name__
[docs] def append(self, obj, parent):
if isinstance(obj, str):
mimetype = 'directory'
else:
mimetype = obj['mimetype']
UPnPClass = classChooser(mimetype)
id = self.getnextID()
update = False
if hasattr(self, 'update_id'):
update = True
item = PlaylistItem(
id, obj, mimetype,
parent=parent,
storageid=parent,
upnpclass=UPnPClass,
update=update)
self.store[id] = item
self.store[id].store = self
if hasattr(self, 'update_id'):
self.update_id += 1
if self.server:
self.server.content_directory_server.set_variable(
0, 'SystemUpdateID', self.update_id)
if parent:
value = (parent.get_id(), parent.get_update_id())
if self.server:
self.server.content_directory_server.set_variable(
0, 'ContainerUpdateIDs', value)
if mimetype == 'directory':
return self.store[id]
return None
[docs] def upnp_init(self):
self.current_connection_id = None
if self.server:
self.server.connection_manager_server.set_variable(
0,
'SourceProtocolInfo',
['rtsp-rtp-udp:*:video/mpeg:*',
'http-get:*:video/mpeg:*',
'rtsp-rtp-udp:*:audio/mpeg:*',
'http-get:*:audio/mpeg:*'],
default=True)
rootItem = Container(None, self.name)
self.set_root_item(rootItem)
return self.retrievePlaylistItems(self.playlist_url, rootItem)
[docs] def retrievePlaylistItems(self, url, parent_item):
def gotPlaylist(playlist):
self.info('got playlist')
items = []
if playlist:
content, header = playlist
if isinstance(content, bytes):
content = content.decode('utf-8')
lines = content.splitlines().__iter__()
line = next(lines)
while line is not None:
self.debug(line)
if re.search('#EXTINF', line):
channel = re.match('#EXTINF:.*,(.*)', line).group(1)
mimetype = 'video/mpeg'
self.info('\t- channel found: [%r] => %r' % (
mimetype, channel))
line = next(lines)
while re.search('#EXTVLCOPT', line):
option = re.match('#EXTVLCOPT:(.*)', line).group(1)
if option == 'no-video':
mimetype = 'audio/mpeg'
line = next(lines)
url = line
item = PlaylistItem(channel, url, mimetype)
parent_item.add_child(item)
items.append(item)
try:
line = next(lines)
except StopIteration:
line = None
return items
def gotError(error):
self.warning(f'Unable to retrieve playlist: {url}')
print(f'Error: {error}')
return None
d = getPage(url)
d.addCallback(gotPlaylist)
d.addErrback(gotError)
return d