# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php
# Copyright 2007, Frank Scholz <coherence@beebits.net>
# Copyright 2007, Moritz Struebe <morty@gmx.net>
# TODO: remove the "terminal output" sample below once this backend is fixed
# INFO lastFM_user Dez 14 17:35:27 Got new sessionid:
# '1488f34a1cbed7c9f4232f8fd563c3bd'
# (coherence/backends/lastfm_storage.py:60)
# DEBUG lastFM_stream Dez 14 17:35:53 render
# <GET /da525474-5357-4d1b-a894-76b1293224c9/1005 HTTP/1.1>
# (coherence/backends/lastfm_storage.py:148)
# command GET
# rest /user/e0362c757ef49169e9a0f0970cc2d367.mp3
# headers {'icy-metadata': '1', 'host': 'kingpin5.last.fm',
# 'te': 'trailers', 'connection': 'TE',
# 'user-agent': 'gnome-vfs/2.12.0.19 neon/0.24.7'}
# ProxyClient handleStatus HTTP/1.1 200 OK
# ProxyClient handleHeader Content-Type audio/mpeg
# ProxyClient handleHeader Content-Length 4050441
# ProxyClient handleHeader Cache-Control no-cache, must-revalidate
# DEBUG lastFM_stream Dez 14 17:35:53 render
# <GET /da525474-5357-4d1b-a894-76b1293224c9/1005 HTTP/1.1>
# (coherence/backends/lastfm_storage.py:148)
# command GET
# rest /user/e0362c757ef49169e9a0f0970cc2d367.mp3
# headers {'icy-metadata': '1', 'host': 'kingpin5.last.fm',
# 'te': 'trailers', 'connection': 'TE',
# 'user-agent': 'gnome-vfs/2.12.0.19 neon/0.24.7'}
# ProxyClient handleStatus HTTP/1.1 403 Invalid ticket
from hashlib import md5
from urllib.parse import urlsplit
from coherence import log
from coherence.upnp.core import utils
from coherence.backend import Backend
from coherence.upnp.core.DIDLLite import classChooser, Container, Resource
class LastFMUser(log.LogAble):
    '''
    Client-side state for one Last.fm radio account.

    The instance performs the handshake request against :attr:`host` and
    :attr:`basepath`, stores the session id found in the answer and then
    requests XSPF playlists, appending every track it finds to the store
    reachable through :attr:`parent`.
    '''
    logCategory = 'lastFM_user'

    user = None
    passwd = None
    host = 'ws.audioscrobbler.com'
    basepath = '/radio'
    sessionid = None
    # Item the fetched tracks are appended to; it must provide a ``store``
    # attribute with an ``append(data, parent)`` method.
    parent = None
    # True while a playlist request is pending, to avoid parallel requests.
    getting_tracks = False
    # Replaced by a per-instance list in __init__; a list defined here
    # would be shared (and mutated) by every instance of the class.
    tracks = None

    def __init__(self, user, passwd):
        '''
        Args:
            user (str): the Last.fm user name, a warning is logged if None.
            passwd (str): the plain text password, a warning is logged
                if None.
        '''
        log.LogAble.__init__(self)
        if user is None:
            self.warning('No User')
        if passwd is None:
            self.warning('No Passwd')
        self.user = user
        self.passwd = passwd
        self.tracks = []

    def login(self):
        '''
        Request a session from the handshake url and, once the answer has
        been parsed, trigger :meth:`get_tracks`.

        Does nothing (besides logging a warning) when a session id is
        already stored or when user or password are missing.
        '''
        if self.sessionid is not None:
            self.warning('Session seems to be valid')
            return
        if self.user is None or self.passwd is None:
            # Without credentials the request url cannot be built.
            self.warning('Login to LastFM Failed! Missing user or password')
            return

        def got_page(result):
            page = result[0]
            if isinstance(page, bytes):
                # The answer is split with str separators below.
                page = page.decode('utf-8', errors='replace')
            for line in page.split('\n'):
                key, separator, value = line.rstrip().partition('=')
                if not separator:
                    # Not a ``key=value`` line.
                    continue
                if key == 'session':
                    self.sessionid = value
                    self.info(f'Got new sessionid: {self.sessionid}')
                elif key == 'base_url':
                    if self.host != value:
                        self.host = value
                        self.info(f'Got new host: {self.host}')
                elif key == 'base_path':
                    if self.basepath != value:
                        self.basepath = value
                        self.info(f'Got new path: {self.basepath}')
            self.get_tracks()

        def got_error(error):
            self.warning(f'Login to LastFM Failed! {error}')
            self.debug(f'{error.getTraceback()}')

        # md5 only accepts bytes, and hexdigest() already returns the
        # lowercase hexadecimal text the request expects.
        secret = self.passwd
        if isinstance(secret, str):
            secret = secret.encode('utf-8')
        password = md5(secret).hexdigest()

        req = (
            self.basepath
            + '/handshake.php/?version=1&platform=win&username='
            + self.user + '&passwordmd5=' + password
            + '&language=en&player=coherence')
        utils.getPage('http://' + self.host + req).addCallbacks(
            got_page, got_error)

    def get_tracks(self):
        '''
        Request a playlist for the current session and append every track
        of it to the store of :attr:`parent`.

        Returns immediately when a request is already pending or when no
        session id is available yet.
        '''
        if self.getting_tracks:
            return
        if self.sessionid is None:
            # The session id is part of the request url.
            self.warning('Problem getting Tracks! No session id available')
            return

        def text_of(track, tag):
            # Text of the child element ``tag`` or '' when it is
            # missing or empty.
            node = track.find(tag)
            if node is None or node.text is None:
                return ''
            return node.text

        def got_page(result):
            # Reset the flag first, so a parsing problem cannot block
            # every later request.
            self.getting_tracks = False
            # NOTE(review): the whole getPage result is handed over here,
            # while login() reads result[0] - confirm that parse_xml
            # accepts this.
            playlist = utils.parse_xml(result, encoding='utf-8')
            self.debug('got Tracks')
            for track in playlist.findall('trackList/track'):
                location = text_of(track, 'location')
                if not location:
                    self.warning('Skipping track without location')
                    continue
                creator = text_of(track, 'creator')
                title = text_of(track, 'title')
                self.debug('adding Track')
                data = {
                    'mimetype': 'audio/mpeg',
                    'name': creator + ' - ' + title,
                    'title': title,
                    'artist': creator,
                    'creator': creator,
                    'album': text_of(track, 'album'),
                    'duration': text_of(track, 'duration'),
                    # FIXME: Image is the wrong tag.
                    'image': text_of(track, 'image'),
                    'url': location,
                }
                item = self.parent.store.append(data, self.parent)
                self.tracks.append(item)

        def got_error(error):
            self.warning(f'Problem getting Tracks! {error}')
            self.debug(f'{error.getTraceback()}')
            self.getting_tracks = False

        self.getting_tracks = True
        req = (
            self.basepath + '/xspf.php?sk=' + self.sessionid
            + '&discovery=0&desktop=1.3.1.1')
        utils.getPage('http://' + self.host + req).addCallbacks(
            got_page, got_error)

    def update(self, item):
        '''
        Forget all tracks queued in front of `item` and request new tracks.

        Args:
            item: the item that is being played; when it is not part of
                :attr:`tracks` the list is left untouched.
        '''
        if item in self.tracks:
            # Only the local bookkeeping is trimmed. The items stay in the
            # store so that browse requests are still answered correctly.
            del self.tracks[:self.tracks.index(item)]

        # if len(self.tracks) < 5:
        self.get_tracks()
class LFMProxyStream(utils.ReverseProxyResource, log.LogAble):
    '''
    Reverse proxy resource for the remote url of one item.

    The url is split into host, port and path, which are handed to
    :class:`~coherence.upnp.core.utils.ReverseProxyResource`. Each render
    call also informs the LastFMUser of the item's store and flags the
    item as played.
    '''
    logCategory = 'lastFM_stream'

    def __init__(self, uri, parent):
        '''
        Args:
            uri (str): the remote url to proxy.
            parent: the item this stream belongs to; :meth:`render` uses
                its ``store`` attribute and sets its ``played`` attribute.
        '''
        log.LogAble.__init__(self)
        self.uri = uri
        self.parent = parent

        url_parts = urlsplit(uri)
        netloc = url_parts[1]
        remote_path = url_parts[2]

        if ':' in netloc:
            # An explicit port is part of the network location.
            host, port_text = netloc.split(':')
            port = int(port_text)
        else:
            host, port = netloc, 80

        if remote_path == '':
            remote_path = '/'

        # print(f'ProxyStream init {host} {port} {remote_path}')
        utils.ReverseProxyResource.__init__(self, host, port, remote_path)

    def render(self, request):
        '''
        Notify the LastFMUser about the requested item, mark the item as
        played and let the reverse proxy render the request.
        '''
        self.debug(f'render {request}')
        item = self.parent
        item.store.LFM.update(item)
        item.played = True
        return utils.ReverseProxyResource.render(self, request)
class LastFMItem(log.LogAble):
    '''
    One entry of the :class:`LastFMStore`, either a directory or a track.

    The instance wraps the DIDLLite object created from `UPnPClass` and
    keeps track of its children. For every non directory item a
    :class:`LFMProxyStream` is created and a resource is attached to the
    DIDLLite object.
    '''
    logCategory = 'LastFM_item'

    def __init__(self, id, obj, parent, mimetype, urlbase, UPnPClass,
                 update=False):
        '''
        Args:
            id (int): the id of the item inside the store.
            obj (dict): the metadata, the keys *name*, *title*, *artist*,
                *creator*, *album*, *duration*, *url* and *mimetype*
                are read.
            parent (LastFMItem): the container of the item or None.
            mimetype (str): 'directory' for containers, otherwise the
                mimetype of the track.
            urlbase (str): the prefix of the item's url, a trailing slash
                is added when missing.
            UPnPClass: the DIDLLite class used to create :attr:`item`.
            update (bool): forwarded to the parent's :meth:`add_child`.
        '''
        log.LogAble.__init__(self)
        self.id = id
        self.name = obj.get('name')
        self.title = obj.get('title')
        self.artist = obj.get('artist')
        self.creator = obj.get('creator')
        self.album = obj.get('album')
        self.duration = obj.get('duration')
        self.mimetype = mimetype

        self.parent = parent
        if parent:
            parent.add_child(self, update=update)
        parent_id = -1 if parent is None else parent.get_id()

        self.item = UPnPClass(id, parent_id, self.title, False, self.creator)
        if isinstance(self.item, Container):
            self.item.childCount = 0
        self.child_count = 0
        self.children = []

        if urlbase and not urlbase.endswith('/'):
            urlbase += '/'
        # Directories and tracks share the same url scheme.
        self.url = urlbase + str(self.id)

        if self.mimetype == 'directory':
            # Only directories carry an update id.
            self.update_id = 0
        else:
            self.location = LFMProxyStream(obj.get('url'), self)
            # self.url = obj.get('url')
            protocols = ('DLNA.ORG_PN=MP3',
                         'DLNA.ORG_CI=0',
                         'DLNA.ORG_OP=01',
                         'DLNA.ORG_FLAGS=01700000000000000000000000000000')
            res = Resource(
                self.url,
                f'http-get:*:{obj.get("mimetype")}:{";".join(protocols)}')
            res.size = -1  # None
            self.item.res.append(res)

    def remove(self):
        '''Detach the item from its parent and drop the DIDLLite object.'''
        if self.parent:
            self.parent.remove_child(self)
        del self.item

    def add_child(self, child, update=False):
        '''
        Append `child` to the children of this item.

        Args:
            child (LastFMItem): the item to add.
            update (bool): when True the update id is increased.
        '''
        if self.children is None:
            self.children = []
        self.children.append(child)
        self.child_count += 1
        if isinstance(self.item, Container):
            self.item.childCount += 1
        if update:
            self.update_id += 1

    def remove_child(self, child):
        '''Remove `child`, if present, and increase the update id.'''
        self.info(f'remove_from {self.id:d} ({self.get_name()}) '
                  f'child {child.id:d} ({child.get_name()})')
        if child in self.children:
            self.child_count -= 1
            if isinstance(self.item, Container):
                self.item.childCount -= 1
            self.children.remove(child)
            self.update_id += 1

    def get_children(self, start=0, request_count=0):
        '''
        Return the children beginning at index `start`.

        Args:
            start (int): the index of the first child to return.
            request_count (int): the maximum number of children to
                return, 0 means all remaining children.
        '''
        if request_count == 0:
            return self.children[start:]
        # request_count is a number of items, not an end index.
        return self.children[start:start + request_count]

    def get_child_count(self):
        '''
        Return the number of children; directories always report 100.
        '''
        if self.mimetype == 'directory':
            return 100  # Some Testing, with strange Numbers: 0/lots
        return self.child_count

    def get_id(self):
        return self.id

    def get_update_id(self):
        '''Return the update id or None for items which do not have one.'''
        return getattr(self, 'update_id', None)

    def get_path(self):
        return self.url

    def get_name(self):
        return self.name

    def get_parent(self):
        return self.parent

    def get_item(self):
        return self.item

    def get_xml(self):
        return self.item.toString()

    def __repr__(self):
        # An f-string also copes with a name which is None.
        return f'id: {self.id} @ {self.url} {self.name}'
class LastFMStore(Backend):
    '''
    This is a backend to the LastFM.

    The store maps integer ids to :class:`LastFMItem` instances. The
    first appended item gets the id :attr:`ROOT_ID`, which is also what
    the id 0 is resolved to by :meth:`get_by_id`.

    .. versionchanged:: 0.9.0

        * Migrated from louie/dispatcher to EventDispatcher
        * Introduced :class:`~coherence.backend.Backend`'s inheritance
    '''

    logCategory = 'lastFM_store'

    implements = ['MediaServer']

    # The id handed out first, used for the top level container.
    ROOT_ID = 1000

    def __init__(self, server, **kwargs):
        '''
        Args:
            server: the device the backend belongs to, may be None.
            **kwargs: the backend configuration, the keys *name*, *login*
                and *password* are read by this class.
        '''
        Backend.__init__(self, server, **kwargs)

        self.next_id = self.ROOT_ID
        self.config = kwargs
        self.name = kwargs.get('name', 'LastFMStore')

        self.update_id = 0
        self.store = {}

        self.server = server
        self.urlbase = 'http://ws.audioscrobbler.com/2.0/'

        self.wmc_mapping = {'4': self.ROOT_ID}

        self.init_completed = True

    def __repr__(self):
        return self.__class__.__name__

    def append(self, obj, parent):
        '''
        Create a :class:`LastFMItem` for `obj`, store it under a new id
        and publish the changed update ids when a server is present.

        Args:
            obj (dict): the metadata of the item, its *mimetype* key
                selects the DIDLLite class.
            parent (LastFMItem): the container of the new item or None.

        Returns:
            The new :class:`LastFMItem`.
        '''
        # NOTE(review): a str is classified as a directory here, but
        # LastFMItem reads the metadata with obj.get() - confirm whether
        # plain strings are still expected to work.
        if isinstance(obj, str):
            mimetype = 'directory'
        else:
            mimetype = obj['mimetype']

        UPnPClass = classChooser(mimetype)
        item_id = self.getnextID()
        has_update_id = hasattr(self, 'update_id')

        item = LastFMItem(item_id, obj, parent, mimetype, self.urlbase,
                          UPnPClass, update=has_update_id)
        item.store = self
        self.store[item_id] = item

        if has_update_id:
            self.update_id += 1
            if self.server:
                self.server.content_directory_server.set_variable(
                    0, 'SystemUpdateID', self.update_id)
            if parent:
                # value = '%d,%d' % (parent.get_id(),parent_get_update_id())
                value = (parent.get_id(), parent.get_update_id())
                if self.server:
                    self.server.content_directory_server.set_variable(
                        0, 'ContainerUpdateIDs', value)

        return item

    def remove(self, item):
        '''
        Remove `item` from its parent and from the store and publish the
        changed update ids when a server is present.

        Removing is best effort: a ValueError or KeyError, for example
        for an item which is not part of the store, is ignored.
        '''
        try:
            parent = item.get_parent()
            item.remove()
            # The key is the id of the item (not the builtin ``id``).
            del self.store[int(item.get_id())]
            if hasattr(self, 'update_id'):
                self.update_id += 1
                if self.server:
                    self.server.content_directory_server.set_variable(
                        0, 'SystemUpdateID', self.update_id)
                if parent is not None:
                    # value = '%d,%d' % (parent.get_id(),
                    #                    parent_get_update_id())
                    value = (parent.get_id(), parent.get_update_id())
                    if self.server:
                        self.server.content_directory_server.set_variable(
                            0, 'ContainerUpdateIDs', value)
        except (ValueError, KeyError):
            pass

    def len(self):
        '''Return the number of stored items.'''
        return len(self.store)

    def get_by_id(self, id):
        '''
        Return the item stored under `id` or None.

        Args:
            id (int, str or bytes): the id of the item, for str and bytes
                everything from the first ``@`` on is ignored. The id 0
                is mapped to :attr:`ROOT_ID`.
        '''
        if isinstance(id, bytes):
            id = id.decode('utf-8')
        if isinstance(id, str):
            id = id.split('@', 1)[0]
        try:
            id = int(id)
        except (ValueError, TypeError):
            # Not a numeric id, so there cannot be an item for it.
            return None
        if id == 0:
            id = self.ROOT_ID
        return self.store.get(id)

    def getnextID(self):
        '''Return an unused id and advance the id counter.'''
        ret = self.next_id
        self.next_id += 1
        return ret

    def upnp_init(self):
        '''
        Create the top level container, log in the configured user and
        announce the supported source protocol when a server is present.
        '''
        self.current_connection_id = None
        parent = self.append({'name': 'LastFM', 'mimetype': 'directory'}, None)

        self.LFM = LastFMUser(self.config.get('login'),
                              self.config.get('password'))
        self.LFM.parent = parent
        self.LFM.login()

        if self.server:
            self.server.connection_manager_server.set_variable(
                0, 'SourceProtocolInfo', ['http-get:*:audio/mpeg:*'],
                default=True)
def main():
    '''
    Create a :class:`LastFMStore` without a server and run its
    :meth:`~LastFMStore.upnp_init`, for manual testing of the backend.
    '''
    store = LastFMStore(None)
    store.upnp_init()
if __name__ == '__main__':
    # Script entry point: the reactor is imported only here, main() is
    # scheduled to run once the reactor is up, then the loop is started.
    from twisted.internet import reactor

    reactor.callWhenRunning(main)
    reactor.run()