Source code for coherence.backends.lastfm_storage

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2007, Frank Scholz <coherence@beebits.net>
# Copyright 2007, Moritz Struebe <morty@gmx.net>

# TODO: remove the below "terminal output" sample when this backend get fixed
# INFO  lastFM_user                 Dez 14 17:35:27  Got new sessionid:
#     '1488f34a1cbed7c9f4232f8fd563c3bd'
#     (coherence/backends/lastfm_storage.py:60)
# DEBUG lastFM_stream               Dez 14 17:35:53  render
#     <GET /da525474-5357-4d1b-a894-76b1293224c9/1005 HTTP/1.1>
#     (coherence/backends/lastfm_storage.py:148)
# command GET
# rest /user/e0362c757ef49169e9a0f0970cc2d367.mp3
# headers {'icy-metadata': '1', 'host': 'kingpin5.last.fm',
#          'te': 'trailers', 'connection': 'TE',
#          'user-agent': 'gnome-vfs/2.12.0.19 neon/0.24.7'}
# ProxyClient handleStatus HTTP/1.1 200 OK
# ProxyClient handleHeader Content-Type audio/mpeg
# ProxyClient handleHeader Content-Length 4050441
# ProxyClient handleHeader Cache-Control no-cache, must-revalidate
# DEBUG lastFM_stream               Dez 14 17:35:53  render
#     <GET /da525474-5357-4d1b-a894-76b1293224c9/1005 HTTP/1.1>
#     (coherence/backends/lastfm_storage.py:148)
# command GET
# rest /user/e0362c757ef49169e9a0f0970cc2d367.mp3
# headers {'icy-metadata': '1', 'host': 'kingpin5.last.fm',
#          'te': 'trailers', 'connection': 'TE',
#          'user-agent': 'gnome-vfs/2.12.0.19 neon/0.24.7'}
# ProxyClient handleStatus HTTP/1.1 403 Invalid ticket

from hashlib import md5
from urllib.parse import urlsplit

from coherence import log
from coherence.upnp.core import utils
from coherence.backend import Backend
from coherence.upnp.core.DIDLLite import classChooser, Container, Resource


[docs]class LastFMUser(log.LogAble): logCategory = 'lastFM_user' user = None passwd = None host = 'ws.audioscrobbler.com' basepath = '/radio' sessionid = None parent = None getting_tracks = False tracks = [] def __init__(self, user, passwd): log.LogAble.__init__(self) if user is None: self.warn('No User', ) if passwd is None: self.warn('No Passwd', ) self.user = user self.passwd = passwd
[docs] def login(self): if self.sessionid is not None: self.warning('Session seems to be valid', ) return def got_page(result): lines = result[0].split('\n') for line in lines: tuple = line.rstrip().split('=', 1) if len(tuple) == 2: if tuple[0] == 'session': self.sessionid = tuple[1] self.info(f'Got new sessionid: {self.sessionid}') if tuple[0] == 'base_url': if self.host != tuple[1]: self.host = tuple[1] self.info(f'Got new host: {self.host}') if tuple[0] == 'base_path': if self.basepath != tuple[1]: self.basepath = tuple[1] self.info(f'Got new path: {self.basepath}') self.get_tracks() def got_error(error): self.warning(f'Login to LastFM Failed! {error}') self.debug(f'{error.getTraceback()}') # This function might be GPL! # Found this code in some other Projects, too. def hexify(s): result = '' for c in s: result = result + ('%02x' % ord(c)) return result password = hexify(md5(self.passwd).digest()) req = \ self.basepath + '/handshake.php/?version=1&platform=win&username='\ + self.user + '&passwordmd5=' + password\ + '&language=en&player=coherence' utils.getPage('http://' + self.host + req).addCallbacks( got_page, got_error, None, None, None, None)
[docs] def get_tracks(self): if self.getting_tracks: return def got_page(result): result = utils.parse_xml(result, encoding='utf-8') self.getting_tracks = False print(self.getting_tracks) print('got Tracks') for track in result.findall('trackList/track'): data = {} def get_data(name): # print track.find(name).text.encode('utf-8') return track.find(name).text.encode('utf-8') # Fixme: This section needs some work print('adding Track') data['mimetype'] = 'audio/mpeg' data['name'] = get_data('creator') + ' - ' + get_data('title') data['title'] = get_data('title') data['artist'] = get_data('creator') data['creator'] = get_data('creator') data['album'] = get_data('album') data['duration'] = get_data('duration') # FIXME: Image is the wrong tag. data['image'] = get_data('image') data['url'] = track.find('location').text.encode('utf-8') item = self.parent.store.append(data, self.parent) self.tracks.append(item) def got_error(error): self.warning(f'Problem getting Tracks! {error}') self.debug(f'{error.getTraceback()}') self.getting_tracks = False self.getting_tracks = True req = \ self.basepath + '/xspf.php?sk=' + self.sessionid \ + '&discovery=0&desktop=1.3.1.1' utils.getPage('http://' + self.host + req).addCallbacks( got_page, got_error, None, None, None, None)
[docs] def update(self, item): if 0 < self.tracks.count(item): while True: track = self.tracks[0] if track == item: break self.tracks.remove(track) # Do not remove so the tracks to answer the browse # request correctly. # track.store.remove(track) # del track # if len(self.tracks) < 5: self.get_tracks()
[docs]class LFMProxyStream(utils.ReverseProxyResource, log.LogAble): logCategory = 'lastFM_stream' def __init__(self, uri, parent): log.LogAble.__init__(self) self.uri = uri self.parent = parent _, host_port, path, _, _ = urlsplit(uri) if host_port.find(':') != -1: host, port = tuple(host_port.split(':')) port = int(port) else: host = host_port port = 80 if path == '': path = '/' # print(f'ProxyStream init {host} {port} {path}') utils.ReverseProxyResource.__init__(self, host, port, path)
[docs] def render(self, request): self.debug(f'render {request}') self.parent.store.LFM.update(self.parent) self.parent.played = True return utils.ReverseProxyResource.render(self, request)
[docs]class LastFMItem(log.LogAble): logCategory = 'LastFM_item' def __init__(self, id, obj, parent, mimetype, urlbase, UPnPClass, update=False): log.LogAble.__init__(self) self.id = id self.name = obj.get('name') self.title = obj.get('title') self.artist = obj.get('artist') self.creator = obj.get('creator') self.album = obj.get('album') self.duration = obj.get('duration') self.mimetype = mimetype self.parent = parent if parent: parent.add_child(self, update=update) if parent is None: parent_id = -1 else: parent_id = parent.get_id() self.item = UPnPClass(id, parent_id, self.title, False, self.creator) if isinstance(self.item, Container): self.item.childCount = 0 self.child_count = 0 self.children = [] if len(urlbase) and urlbase[-1] != '/': urlbase += '/' if self.mimetype == 'directory': self.url = urlbase + str(self.id) else: self.url = urlbase + str(self.id) self.location = LFMProxyStream(obj.get('url'), self) # self.url = obj.get('url') if self.mimetype == 'directory': self.update_id = 0 else: protocols = ('DLNA.ORG_PN=MP3', 'DLNA.ORG_CI=0', 'DLNA.ORG_OP=01', 'DLNA.ORG_FLAGS=01700000000000000000000000000000') res = Resource( self.url, f'http-get:*:{obj.get("mimetype")}:{";".join(protocols)}') res.size = -1 # None self.item.res.append(res)
[docs] def remove(self): if self.parent: self.parent.remove_child(self) del self.item
[docs] def add_child(self, child, update=False): if self.children is None: self.children = [] self.children.append(child) self.child_count += 1 if isinstance(self.item, Container): self.item.childCount += 1 if update: self.update_id += 1
[docs] def remove_child(self, child): self.info(f'remove_from {self.id:d} ({self.get_name()}) ' f'child {child.id:d} ({child.get_name()})') if child in self.children: self.child_count -= 1 if isinstance(self.item, Container): self.item.childCount -= 1 self.children.remove(child) self.update_id += 1
[docs] def get_children(self, start=0, request_count=0): if request_count == 0: return self.children[start:] else: return self.children[start:request_count]
[docs] def get_child_count(self): if self.mimetype == 'directory': return 100 # Some Testing, with strange Numbers: 0/lots return self.child_count
[docs] def get_id(self): return self.id
[docs] def get_update_id(self): if hasattr(self, 'update_id'): return self.update_id else: return None
[docs] def get_path(self): return self.url
[docs] def get_name(self): return self.name
[docs] def get_parent(self): return self.parent
[docs] def get_item(self): return self.item
[docs] def get_xml(self): return self.item.toString()
def __repr__(self): return 'id: ' + str(self.id) + ' @ ' + self.url + ' ' + self.name
[docs]class LastFMStore(Backend): ''' This is a backend to the LastFM. .. versionchanged:: 0.9.0 * Migrated from louie/dispatcher to EventDispatcher * Introduced :class:`~coherence.backend.Backend`'s inheritance ''' logCategory = 'lastFM_store' implements = ['MediaServer'] def __init__(self, server, **kwargs): Backend.__init__(self, server, **kwargs) self.next_id = 1000 self.config = kwargs self.name = kwargs.get('name', 'LastFMStore') self.update_id = 0 self.store = {} self.server = server self.urlbase = 'http://ws.audioscrobbler.com/2.0/' self.wmc_mapping = {'4': 1000} self.init_completed = True def __repr__(self): return str(self.__class__).split('.')[-1]
[docs] def append(self, obj, parent): if isinstance(obj, str): mimetype = 'directory' else: mimetype = obj['mimetype'] UPnPClass = classChooser(mimetype) id = self.getnextID() update = False if hasattr(self, 'update_id'): update = True self.store[id] = LastFMItem(id, obj, parent, mimetype, self.urlbase, UPnPClass, update=update) self.store[id].store = self if hasattr(self, 'update_id'): self.update_id += 1 if self.server: self.server.content_directory_server.set_variable( 0, 'SystemUpdateID', self.update_id) if parent: # value = '%d,%d' % (parent.get_id(),parent_get_update_id()) value = (parent.get_id(), parent.get_update_id()) if self.server: self.server.content_directory_server.set_variable( 0, 'ContainerUpdateIDs', value) return self.store[id]
[docs] def remove(self, item): try: parent = item.get_parent() item.remove() del self.store[int(id)] if hasattr(self, 'update_id'): self.update_id += 1 if self.server: self.server.content_directory_server.set_variable( 0, 'SystemUpdateID', self.update_id) # value = '%d,%d' % (parent.get_id(),parent_get_update_id()) value = (parent.get_id(), parent.get_update_id()) if self.server: self.server.content_directory_server.set_variable( 0, 'ContainerUpdateIDs', value) except (ValueError, KeyError): pass
[docs] def len(self): return len(self.store)
[docs] def get_by_id(self, id): if isinstance(id, str): id = id.split('@', 1)[0] elif isinstance(id, bytes): id = id.decode('utf-8').split('@', 1)[0] id = int(id) if id == 0: id = 1000 try: return self.store[id] except KeyError: return None
[docs] def getnextID(self): ret = self.next_id self.next_id += 1 return ret
[docs] def upnp_init(self): self.current_connection_id = None parent = self.append({'name': 'LastFM', 'mimetype': 'directory'}, None) self.LFM = LastFMUser(self.config.get('login'), self.config.get('password')) self.LFM.parent = parent self.LFM.login() if self.server: self.server.connection_manager_server.set_variable( 0, 'SourceProtocolInfo', ['http-get:*:audio/mpeg:*'], default=True)
[docs]def main(): f = LastFMStore(None) def got_upnp_result(result): print(f'upnp {result}') f.upnp_init()
if __name__ == '__main__': from twisted.internet import reactor reactor.callWhenRunning(main) reactor.run()