# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php
# Backend to retrieve the video streams from Shoutcast TV
# Copyright 2007, Frank Scholz <coherence@beebits.net>
# Copyright 2008,2009 Jean-Michel Sizun <jmDOTsizunATfreeDOTfr>
from functools import cmp_to_key
from twisted.internet import reactor
from twisted.web import server
from coherence import log
from coherence.backend import BackendStore, BackendItem
from coherence.upnp.core import DIDLLite
from coherence.upnp.core import utils
# Identifier under which the top-level container is stored.
ROOT_CONTAINER_ID = 0

# Web service queried for the XML listing of the Shoutcast TV stations.
SHOUTCAST_WS_URL = (
    'http://www.shoutcast.com/sbin/newtvlister.phtml?'
    'service=winamp2&no_compress=1'
)

# Template of a station's playlist URL; ``%s`` receives the station id.
SHOUTCAST_TUNEIN_URL = (
    'http://www.shoutcast.com/sbin/tunein-tvstation.pls?id=%s'
)

# MIME type assigned to every station and announced in SourceProtocolInfo.
VIDEO_MIMETYPE = 'video/x-nsv'
class ProxyStream(utils.ReverseProxyUriResource, log.LogAble):
    """Reverse-proxy resource relaying a Shoutcast TV stream.

    The resource is created with the URL of a station playlist.  On the
    first request that playlist is downloaded, the target of its ``File1=``
    entry is remembered in ``stream_url`` and the request is then handed to
    ``utils.ReverseProxyUriResource.render``.  Later requests reuse the
    remembered stream URL.
    """

    logCategory = 'itv'
    # Stream URL taken from the playlist; None until the playlist was parsed.
    stream_url = None

    def __init__(self, uri):
        """Initialise logging and the proxy base class.

        :param uri: URL of the playlist describing the stream.
        """
        log.LogAble.__init__(self)
        self.stream_url = None
        utils.ReverseProxyUriResource.__init__(self, uri)

    def requestFinished(self, result):
        """Close the proxied connection once a request is over.

        According to the original note, ``self.connection`` is set in
        ``utils.ReverseProxyResource.render``.

        :param result: value passed along by the triggering deferred; unused.
        """
        self.info('ProxyStream requestFinished')
        # This method is also called from the playlist error paths in
        # render(), i.e. possibly before any connection attribute was
        # assigned, hence the guarded lookup.
        connection = getattr(self, 'connection', None)
        if connection is not None:
            connection.transport.loseConnection()

    def render(self, request):
        """Serve *request*, resolving the playlist first when necessary.

        :param request: the incoming web request.
        :returns: ``server.NOT_DONE_YET`` while the playlist is being
            fetched, otherwise whatever
            ``utils.ReverseProxyUriResource.render`` returns.
        """
        if self.stream_url is None:
            def got_playlist(result):
                if result is None:
                    self.warning(
                        'Error to retrieve playlist - nothing retrieved')
                    return self.requestFinished(result)

                playlist = result[0]
                # NOTE(review): the page body may be delivered as bytes on
                # Python 3 - decode so the text matching below works for
                # both bytes and str payloads.
                if isinstance(playlist, bytes):
                    playlist = playlist.decode('utf-8', 'replace')

                # splitlines() + strip() keep a CRLF line ending from
                # leaking a trailing '\r' into the stream URL.
                for line in playlist.splitlines():
                    if line.startswith('File1='):
                        candidate = line[6:].split(';')[0].strip()
                        if candidate:
                            self.stream_url = candidate
                        break

                if self.stream_url is None:
                    self.warning(
                        'Error to retrieve playlist - '
                        'inconsistent playlist file')
                    return self.requestFinished(result)

                # Point the request at the real stream and render again;
                # this time the stream URL is known.
                request.uri = self.stream_url
                return self.render(request)

            def got_error(error):
                # NOTE(review): the request is left open when the playlist
                # cannot be fetched - confirm whether it should be finished.
                self.warning(error)
                return None

            playlist_url = self.uri
            d = utils.getPage(playlist_url, timeout=20)
            d.addCallbacks(got_playlist, got_error)
            return server.NOT_DONE_YET

        if request.clientproto == 'HTTP/1.1':
            self.connection = request.getHeader('connection')
            if self.connection:
                tokens = list(map(str.lower, self.connection.split(' ')))
                if 'close' in tokens:
                    d = request.notifyFinish()
                    d.addBoth(self.requestFinished)
        else:
            # Any other protocol version: always clean up when the request
            # finishes.
            d = request.notifyFinish()
            d.addBoth(self.requestFinished)
        return utils.ReverseProxyUriResource.render(self, request)
class Container(BackendItem):
    """Directory-like backend item holding an ordered list of children.

    The DIDL-Lite representation is created once in the constructor and its
    ``childCount`` is kept up to date by :meth:`add_child`.
    """

    def __init__(self, id, store, parent_id, title):
        """Create the container.

        :param id: identifier of this container; appended to the store's
            ``urlbase`` to build :attr:`url`.
        :param store: owning store, must provide ``urlbase``.
        :param parent_id: identifier of the parent container.
        :param title: display name of the container.
        """
        BackendItem.__init__(self)
        self.url = store.urlbase + str(id)
        self.parent_id = parent_id
        self.id = id
        self.name = title
        self.mimetype = 'directory'
        self.update_id = 0
        self.children = []
        self.store = store
        self.item = DIDLLite.Container(self.id, self.parent_id, self.name)
        self.item.childCount = 0
        # Children are sorted lazily, on the next get_children() call.
        self.sorted = False

    def add_child(self, child):
        """Append *child* and mark the child list as needing a re-sort."""
        if self.children is None:
            self.children = []
        self.children.append(child)
        self.item.childCount += 1
        self.sorted = False

    def get_children(self, start=0, end=0):
        """Return the children ordered by name.

        :param start: index of the first child to return.
        :param end: index one past the last child to return; ``0`` means
            "up to the end of the list".
        :returns: the requested slice of the sorted child list.
        """
        if self.children is None:
            return []
        if not self.sorted:
            # Order by name with a plain key function; all children of one
            # container are expected to carry names of the same type.
            self.children = sorted(
                self.children, key=lambda child: child.name)
            self.sorted = True
        if end != 0:
            return self.children[start:end]
        return self.children[start:]

    def get_child_count(self):
        """Return the number of children (0 when there is no child list)."""
        if self.children is None:
            return 0
        return len(self.children)

    def get_path(self):
        """Return the URL of this container."""
        return self.url

    def get_item(self):
        """Return the DIDL-Lite container object."""
        return self.item

    def get_name(self):
        """Return the display name."""
        return self.name

    def get_id(self):
        """Return the identifier."""
        return self.id
class ITVItem(BackendItem):
    """Backend item for a single video stream.

    The DIDL-Lite object is built on the first :meth:`get_item` call and
    cached afterwards.
    """

    logCategory = 'itv'

    def __init__(self, store, id, obj, parent):
        """Create the item.

        :param store: owning store, must provide ``urlbase``.
        :param id: identifier of this item.
        :param obj: mapping read for the ``name``, ``mimetype`` and ``url``
            keys.
        :param parent: parent item; its ``id`` is used for the DIDL-Lite
            object.
        """
        BackendItem.__init__(self)

        # Identity and position in the tree.
        self.parent = parent
        self.id = id
        self.store = store

        # Values taken from the station description.
        self.name = obj.get('name')
        self.mimetype = obj.get('mimetype')
        self.stream_url = obj.get('url')

        # Metadata that is not known for these streams.
        self.description = None
        self.date = None
        self.duration = None

        # DIDL-Lite object, created lazily by get_item().
        self.item = None

        self.url = self.store.urlbase + str(self.id)
        self.location = ProxyStream(self.stream_url)

    def get_item(self):
        """Return the DIDL-Lite video item, building it on first use."""
        if self.item is not None:
            return self.item

        self.item = video = DIDLLite.VideoItem(
            self.id, self.parent.id, self.name)
        video.description = self.description
        video.date = self.date

        protocol_info = 'http-get:*:%s:*' % self.mimetype
        resource = DIDLLite.Resource(self.url, protocol_info)
        resource.duration = self.duration
        video.res.append(resource)
        return video

    def get_path(self):
        """Return the URL of this item."""
        return self.url
class ITVStore(BackendStore):
    """MediaServer backend listing the Shoutcast TV video streams.

    The station listing is downloaded from ``shoutcast_ws_url``; one
    container is created per genre below the root container and every
    station becomes an :class:`ITVItem` inside the container of its genre.
    """

    logCategory = 'itv'
    implements = ['MediaServer']

    description = (
        'Shoutcast TV',
        'exposes the list of video streams from Shoutcast TV.', None)

    options = [
        {'option': 'name', 'text': 'Server Name:', 'type': 'string',
         'default': 'my media',
         'help': 'the name under this MediaServer shall '
                 'show up with on other UPnP clients'},
        {'option': 'version', 'text': 'UPnP Version:', 'type': 'int',
         'default': 2, 'enum': (2, 1),
         'help': 'the highest UPnP version this MediaServer shall support',
         'level': 'advance'},
        {'option': 'uuid', 'text': 'UUID Identifier:', 'type': 'string',
         'help': 'the unique (UPnP) identifier for this MediaServer, '
                 'usually automatically set',
         'level': 'advance'},
        {'option': 'genrelist', 'text': 'Server URL', 'type': 'string',
         'default': SHOUTCAST_WS_URL}
    ]

    def __init__(self, server, **kwargs):
        """Set up the empty store.

        :param server: passed on to ``BackendStore.__init__``.
        :param kwargs: backend configuration; ``name`` and ``genrelist``
            are read here.
        """
        BackendStore.__init__(self, server, **kwargs)
        self.next_id = 1000
        self.config = kwargs
        self.name = kwargs.get('name', 'iTV')
        self.update_id = 0
        self.store = {}
        self.wmc_mapping = {'4': 1000}
        self.shoutcast_ws_url = self.config.get('genrelist', SHOUTCAST_WS_URL)
        self.init_completed()

    def __repr__(self):
        return self.__class__.__name__

    def storeItem(self, parent, item, id):
        """Register *item* under *id* and attach it to *parent*."""
        self.store[id] = item
        parent.add_child(item)

    def appendGenre(self, genre, parent):
        """Create a container named *genre* below *parent* and return it."""
        id = self.getnextID()
        # NOTE(review): the container is created with parent_id -1 instead
        # of the id of *parent* - confirm whether that is intended.
        item = Container(id, self, -1, genre)
        self.storeItem(parent, item, id)
        return item

    def appendFeed(self, obj, parent):
        """Create an :class:`ITVItem` from *obj* below *parent*."""
        id = self.getnextID()
        item = ITVItem(self, id, obj, parent)
        self.storeItem(parent, item, id)
        return item

    def len(self):
        """Return the number of stored items."""
        return len(self.store)

    def get_by_id(self, id):
        """Return the item stored under *id*, or None.

        For ``str`` and ``bytes`` ids only the part before the first ``@``
        is used.  Ids that cannot be converted to an integer, or that are
        unknown, yield None.
        """
        try:
            if isinstance(id, bytes):
                id = id.decode('utf-8')
            if isinstance(id, str):
                id = id.split('@', 1)[0]
            return self.store[int(id)]
        except (ValueError, KeyError, TypeError):
            # ValueError also covers undecodable bytes; TypeError covers
            # ids such as None that int() rejects.
            return None

    def getnextID(self):
        """Return a fresh item identifier."""
        ret = self.next_id
        self.next_id += 1
        return ret

    def upnp_init(self):
        """Announce the supported protocol and start loading the listing."""
        self.current_connection_id = None

        if self.server:
            self.server.connection_manager_server.set_variable(
                0, 'SourceProtocolInfo',
                [f'http-get:*:{VIDEO_MIMETYPE}:*', ],
                default=True)

        rootItem = Container(ROOT_CONTAINER_ID, self, -1, self.name)
        self.store[ROOT_CONTAINER_ID] = rootItem
        self.retrieveList_attemptCount = 0
        self.retrieveList(rootItem)

    @staticmethod
    def _bitrate_as_int(value):
        """Convert a ``br`` attribute to an int; 0 when missing or invalid."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def retrieveList(self, parent):
        """Download the station listing and populate *parent*.

        When the download fails, the method reschedules itself after five
        seconds and counts the attempt.

        :param parent: container receiving the genre containers.
        """
        self.info('Retrieving Shoutcast TV listing...')

        def got_page(result):
            if self.retrieveList_attemptCount == 0:
                self.info('Connection to ShoutCast service '
                          'successful for TV listing')
            else:
                self.warning(f'Connection to ShoutCast service '
                             f'successful for TV listing after '
                             f'{self.retrieveList_attemptCount:d} attempts.')
            result = result[0]
            result = utils.parse_xml(result, encoding='utf-8')

            genres = []
            stations = {}
            for stationResult in result.findall('station'):
                raw_name = stationResult.get('name')
                if raw_name is None:
                    # A station without a name cannot be listed; skip it
                    # instead of aborting the whole listing.
                    self.debug('Skipping Shoutcast TV station without a name')
                    continue

                mimetype = VIDEO_MIMETYPE
                station_id = stationResult.get('id')
                # Compare bitrates numerically: as strings '96' would rank
                # above '128'.
                bitrate = self._bitrate_as_int(stationResult.get('br'))
                rating = stationResult.get('rt')
                # NOTE(review): the name is kept utf-8 encoded as in the
                # original code - confirm that consumers expect bytes.
                name = raw_name.encode('utf-8')
                genre = stationResult.get('genre')
                url = SHOUTCAST_TUNEIN_URL % (station_id)

                if genre not in genres:
                    genres.append(genre)

                # Of several entries with the same name keep the one with
                # the highest bitrate.
                sameStation = stations.get(name)
                if sameStation is None or bitrate > sameStation['bitrate']:
                    stations[name] = {'name': name,
                                      'station_id': station_id,
                                      'mimetype': mimetype,
                                      'id': station_id,
                                      'url': url,
                                      'bitrate': bitrate,
                                      'rating': rating,
                                      'genre': genre}

            genreItems = {}
            for genre in genres:
                genreItems[genre] = self.appendGenre(genre, parent)

            for station in stations.values():
                parentItem = genreItems[station.get('genre')]
                self.appendFeed({'name': station.get('name'),
                                 'mimetype': station['mimetype'],
                                 'id': station.get('station_id'),
                                 'url': station.get('url')},
                                parentItem)

        def got_error(error):
            self.warning(
                'Connection to ShoutCast service failed. Will retry in 5s!')
            self.debug(f'{error.getTraceback()}')
            # will retry later
            self.retrieveList_attemptCount += 1
            reactor.callLater(5, self.retrieveList, parent=parent)

        d = utils.getPage(self.shoutcast_ws_url)
        d.addCallbacks(got_page, got_error)