Source code for coherence.backends.banshee_storage

# -*- coding: utf-8 -*-

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2009, Philippe Normand <phil@base-art.net>

# TODO: podcasts

import mimetypes
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from sqlite3 import dbapi2
from urllib.parse import urlsplit
from datetime import datetime

from twisted.internet import defer, task

from coherence.backend import BackendItem, BackendStore
from coherence.extern import db_row
from coherence.log import LogAble
from coherence.upnp.core import DIDLLite

# fallback on pysqlite2.dbapi2

mimetypes.init()
mimetypes.add_type('audio/x-m4a', '.m4a')
mimetypes.add_type('video/mp4', '.mp4')
mimetypes.add_type('video/mpegts', '.ts')
mimetypes.add_type('video/divx', '.divx')
mimetypes.add_type('video/divx', '.avi')
mimetypes.add_type('video/x-matroska', '.mkv')
mimetypes.add_type('audio/x-musepack', '.mpc')
mimetypes.add_type('audio/x-wavpack', '.flac')
mimetypes.add_type('audio/x-wavpack', '.wv')
mimetypes.add_type('audio/mp4', '.m4a')

ROOT_CONTAINER_ID = 0
AUDIO_CONTAINER = 200
VIDEO_CONTAINER = 300
AUDIO_ALL_CONTAINER_ID = 201
AUDIO_ARTIST_CONTAINER_ID = 202
AUDIO_ALBUM_CONTAINER_ID = 203
AUDIO_PLAYLIST_CONTAINER_ID = 204
VIDEO_ALL_CONTAINER_ID = 301
VIDEO_PLAYLIST_CONTAINER_ID = 302


[docs]def get_cover_path(artist_name, album_title): def _escape_part(part): escaped = '' if part: if part.find('(') > -1: part = part[:part.find('(')] escaped = re.sub('[^A-Za-z0-9]*', '', part).lower() return escaped base_dir = os.path.expanduser('~/.cache/album-art') return \ os.path.join( base_dir, f'{_escape_part(artist_name)}-{_escape_part(album_title)}.jpg')
[docs]class SQLiteDB(LogAble): ''' Python DB API 2.0 backend support. ''' logCategory = 'sqlite'
[docs] def __init__(self, database): ''' Connect to a db backend hosting the given database. ''' LogAble.__init__(self) self._params = {'database': database, 'check_same_thread': True} self.connect()
[docs] def disconnect(self): self._db.close()
[docs] def connect(self): ''' Connect to the database, set L{_db} instance variable. ''' self._db = dbapi2.connect(**self._params)
[docs] def reconnect(self): ''' Disconnect and reconnect to the database. ''' self.disconnect() self.connect()
[docs] def sql_execute(self, request, *params, **kw): ''' Execute a SQL query in the db backend ''' t0 = time.time() debug_msg = request if params: debug_msg = f'{request} params={params!r}' debug_msg = ''.join(debug_msg.splitlines()) if debug_msg: self.debug(f'QUERY: {debug_msg}') cursor = self._db.cursor() result = [] cursor.execute(request, params) if cursor.description: all_rows = cursor.fetchall() result = db_row.getdict(all_rows, cursor.description) cursor.close() delta = time.time() - t0 self.log(f'SQL request took {delta} seconds') return result
[docs]class Container(BackendItem): get_path = None def __init__(self, id, parent_id, name, children_callback=None, store=None, play_container=False): BackendItem.__init__(self) self.id = id self.parent_id = parent_id self.name = name self.mimetype = 'directory' self.store = store self.play_container = play_container self.update_id = 0 if children_callback is not None: self.children = children_callback else: self.children = []
[docs] def add_child(self, child): self.children.append(child)
[docs] def get_children(self, start=0, request_count=0): def got_children(children): if request_count == 0: return children[start:] else: return children[start:request_count] if callable(self.children): dfr = defer.maybeDeferred(self.children) else: dfr = defer.succeed(self.children) dfr.addCallback(got_children) return dfr
[docs] def get_child_count(self): count = 0 if callable(self.children): count = defer.maybeDeferred(self.children) count.addCallback(lambda children: len(children)) else: count = len(self.children) return count
[docs] def get_item(self): item = DIDLLite.Container(self.id, self.parent_id, self.name) def got_count(count): item.childCount = count if self.store and self.play_container: if item.childCount > 0: dfr = self.get_children(request_count=1) dfr.addCallback(got_child, item) return dfr return item def got_child(children, item): res = DIDLLite.PlayContainerResource( self.store.server.uuid, cid=self.get_id(), fid=children[0].get_id()) item.res.append(res) return item dfr = defer.maybeDeferred(self.get_child_count) dfr.addCallback(got_count) return dfr
[docs] def get_name(self): return self.name
[docs] def get_id(self): return self.id
[docs]class Artist(BackendItem): def __init__(self, *args, **kwargs): BackendItem.__init__(self, *args, **kwargs) self._row = args[0] self._db = args[1] self._local_music_library_id = args[2] self.musicbrainz_id = self._row.MusicBrainzID self.itemID = self._row.ArtistID self.name = self._row.Name or '' if self.name: self.name = self.name.encode('utf-8')
[docs] def get_children(self, start=0, end=0): albums = [] def query_db(): q = 'select * from CoreAlbums where ArtistID=? and AlbumID in ' \ '(select distinct(AlbumID) from CoreTracks where ' \ 'PrimarySourceID=?) order by Title' rows = self._db.sql_execute(q, self.itemID, self._local_music_library_id) for row in rows: album = Album(row, self._db, self) albums.append(album) yield album dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: albums) return dfr
[docs] def get_child_count(self): q = 'select count(AlbumID) as c from CoreAlbums ' \ 'where ArtistID=? and AlbumID in (select distinct(AlbumID) ' \ 'from CoreTracks where PrimarySourceID=?) ' return self._db.sql_execute(q, self.itemID, self._local_music_library_id)[0].c
[docs] def get_item(self): item = DIDLLite.MusicArtist(self.get_id(), AUDIO_ARTIST_CONTAINER_ID, self.name) item.childCount = self.get_child_count() return item
[docs] def get_id(self): return f'artist.{self.itemID:d}'
def __repr__(self): return \ f'<Artist {self.itemID:d} name="{self.name}" ' \ f'musicbrainz="{self.musicbrainz_id}">'
[docs]class Album(BackendItem): ''' definition for an album ''' mimetype = 'directory' get_path = None def __init__(self, *args, **kwargs): BackendItem.__init__(self, *args, **kwargs) self._row = args[0] self._db = args[1] self.artist = args[2] self.itemID = self._row.AlbumID self.title = self._row.Title self.cover = get_cover_path(self.artist.name, self.title) if self.title: self.title = self.title.encode('utf-8') self.musicbrainz_id = self._row.MusicBrainzID self.cd_count = 1
[docs] def get_children(self, start=0, request_count=0): tracks = [] def query_db(): q = 'select * from CoreTracks where AlbumID=? order by TrackNumber' if request_count: q += f' limit {request_count:d}' rows = self._db.sql_execute(q, self.itemID) for row in rows: track = Track(row, self._db, self) tracks.append(track) yield track dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: tracks) return dfr
[docs] def get_child_count(self): q = 'select count(TrackID) as c from CoreTracks where AlbumID=?' count = self._db.sql_execute(q, self.itemID)[0].c return count
[docs] def get_item(self): item = DIDLLite.MusicAlbum(self.get_id(), AUDIO_ALBUM_CONTAINER_ID, self.title) item.artist = self.artist.name item.childCount = self.get_child_count() if self.cover: _, ext = os.path.splitext(self.cover) item.albumArtURI = ''.join((self._db.urlbase, self.get_id(), '?cover', ext)) def got_tracks(tracks): res = DIDLLite.PlayContainerResource(self._db.server.uuid, cid=self.get_id(), fid=tracks[0].get_id()) item.res.append(res) return item if item.childCount > 0: dfr = self.get_children(request_count=1) dfr.addCallback(got_tracks) else: dfr = defer.succeed(item) return dfr
[docs] def get_id(self): return f'album.{self.itemID:d}'
[docs] def get_name(self): return self.title
[docs] def get_cover(self): return self.cover
def __repr__(self): return \ f'<Album {self.itemID:d} title="{self.title}" ' \ f'artist="{self.artist.name}" #cds {self.cd_count:d} ' \ f'cover="{self.cover}" musicbrainz="{self.musicbrainz_id}">'
[docs]class BasePlaylist(BackendItem): ''' definition for a playlist ''' id_type = 'baseplaylist' mimetype = 'directory' get_path = None def __init__(self, *args, **kwargs): BackendItem.__init__(self, *args, **kwargs) self._row = args[0] self._store = args[1] self._db = self._store.db self.title = self._row.Name if self.title: self.title = self.title.encode('utf-8') @property def db_id(self): '''Should be implemented in subclass''' return ''
[docs] def get_tracks(self, request_count): return []
[docs] def db_to_didl(self, row): album = self._store.get_album_with_id(row.AlbumID) track = Track(row, self._db, album) return track
[docs] def get_id(self): return f'{self.id_type}.{self.db_id:d}'
def __repr__(self): return \ f'<{self.__class__.__name__} ' \ f'{self.db_id:d} title="{self.title}">'
[docs] def get_children(self, start=0, request_count=0): tracks = [] def query_db(): rows = self.get_tracks(request_count) for row in rows: track = self.db_to_didl(row) tracks.append(track) yield track dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: tracks) return dfr
[docs] def get_child_count(self): return self._row.CachedCount
[docs] def get_item(self): item = DIDLLite.PlaylistContainer( self.get_id(), AUDIO_PLAYLIST_CONTAINER_ID, self.title) item.childCount = self.get_child_count() def got_tracks(tracks): res = DIDLLite.PlayContainerResource( self._db.server.uuid, cid=self.get_id(), fid=tracks[0].get_id()) item.res.append(res) return item if item.childCount > 0: dfr = self.get_children(request_count=1) dfr.addCallback(got_tracks) else: dfr = defer.succeed(item) return dfr
[docs] def get_name(self): return self.title
[docs]class MusicPlaylist(BasePlaylist): id_type = 'musicplaylist' @property def db_id(self): return self._row.PlaylistID
[docs] def get_tracks(self, request_count): q = 'select * from CoreTracks where TrackID in (select TrackID ' \ 'from CorePlaylistEntries where PlaylistID=?)' if request_count: q += f' limit {request_count:d}' return self._db.sql_execute(q, self.db_id)
[docs]class MusicSmartPlaylist(BasePlaylist): id_type = 'musicsmartplaylist' @property def db_id(self): return self._row.SmartPlaylistID
[docs] def get_tracks(self, request_count): q = 'select * from CoreTracks where TrackID in (select TrackID ' \ 'from CoreSmartPlaylistEntries where SmartPlaylistID=?)' if request_count: q += f' limit {request_count:d}' return self._db.sql_execute(q, self.db_id)
[docs]class VideoPlaylist(MusicPlaylist): id_type = 'videoplaylist'
[docs] def db_to_didl(self, row): return Video(row, self._db)
[docs]class VideoSmartPlaylist(MusicSmartPlaylist): id_type = 'videosmartplaylist'
[docs] def db_to_didl(self, row): return Video(row, self._db)
[docs]class BaseTrack(BackendItem): ''' definition for a track ''' def __init__(self, *args, **kwargs): BackendItem.__init__(self, *args, **kwargs) self._row = args[0] self._db = args[1] self.itemID = self._row.TrackID self.title = self._row.Title self.track_nr = self._row.TrackNumber self.location = self._row.Uri self.playlist = kwargs.get('playlist') self.album = None
[docs] def get_children(self, start=0, request_count=0): return []
[docs] def get_child_count(self): return 0
[docs] def get_resources(self): resources = [] _, host_port, _, _, _ = urlsplit(self._db.urlbase) if host_port.find(':') != -1: host, port = tuple(host_port.split(':')) else: host = host_port _, ext = os.path.splitext(self.location) ext = ext.lower() # FIXME: drop this hack when we switch to tagbin mimetype, dummy = mimetypes.guess_type(f'dummy{ext}') if not mimetype: mimetype = 'audio/mpeg' ext = 'mp3' statinfo = os.stat(self.get_path()) res = DIDLLite.Resource( self.location, f'internal:{host}:{mimetype}:*') try: res.size = statinfo.st_size except Exception: res.size = 0 resources.append(res) url = f'{self._db.urlbase}{self.get_id()}{ext}' res = DIDLLite.Resource(url, f'http-get:*:{mimetype}:*') try: res.size = statinfo.st_size except Exception: res.size = 0 resources.append(res) return statinfo, resources
[docs] def get_path(self): return urllib.parse.unquote(self.location[7:].encode('utf-8'))
[docs] def get_id(self): return 'track.%d' % self.itemID
[docs] def get_name(self): return self.title
[docs] def get_url(self): return self._db.urlbase + str(self.itemID).encode('utf-8')
[docs] def get_cover(self): return self.album.cover
def __repr__(self): return \ f'<Track {self.itemID:d} title="{self.title}" ' \ f'nr="{self.track_nr:d}" album="{self.album.title}" ' \ f'artist="{self.album.artist.name}" path="{self.location}">'
[docs]class Track(BaseTrack): def __init__(self, *args, **kwargs): BaseTrack.__init__(self, *args, **kwargs) self.album = args[2]
[docs] def get_item(self): item = DIDLLite.MusicTrack( self.get_id(), self.album.itemID, self.title) item.artist = self.album.artist.name item.album = self.album.title item.playlist = self.playlist if self.album.cover != '': _, ext = os.path.splitext(self.album.cover) ''' add the cover image extension to help clients not reacting on the mimetype ''' item.albumArtURI = ''.join((self._db.urlbase, self.get_id(), '?cover', ext)) item.originalTrackNumber = self.track_nr item.server_uuid = str(self._db.server.uuid)[5:] statinfo, resources = self.get_resources() item.res.extend(resources) try: # FIXME: getmtime is deprecated in Twisted 2.6 item.date = datetime.fromtimestamp(statinfo.st_mtime) except Exception: item.date = None return item
[docs]class Video(BaseTrack):
[docs] def get_item(self): item = DIDLLite.VideoItem(self.get_id(), VIDEO_ALL_CONTAINER_ID, self.title) item.server_uuid = str(self._db.server.uuid)[5:] statinfo, resources = self.get_resources() item.res.extend(resources) try: # FIXME: getmtime is deprecated in Twisted 2.6 item.date = datetime.fromtimestamp(statinfo.st_mtime) except Exception: item.date = None return item
[docs]class BansheeDB(LogAble): logCategory = 'banshee_db' def __init__(self, path=None): LogAble.__init__(self) self._local_music_library_id = None self._local_video_library_id = None default_db_path = os.path.expanduser('~/.config/banshee-1/banshee.db') self._db_path = path or default_db_path
[docs] def open_db(self): self.db = SQLiteDB(self._db_path)
[docs] def close(self): self.db.disconnect()
[docs] def get_local_music_library_id(self): if self._local_music_library_id is None: q = 'select PrimarySourceID ' \ 'from CorePrimarySources where StringID=?' row = self.db.sql_execute(q, 'MusicLibrarySource-Library')[0] self._local_music_library_id = row.PrimarySourceID return self._local_music_library_id
[docs] def get_local_video_library_id(self): if self._local_video_library_id is None: q = 'select PrimarySourceID ' \ 'from CorePrimarySources where StringID=?' row = self.db.sql_execute(q, 'VideoLibrarySource-VideoLibrary')[0] self._local_video_library_id = row.PrimarySourceID return self._local_video_library_id
[docs] def get_artists(self): artists = [] def query_db(): source_id = self.get_local_music_library_id() q = 'select * from CoreArtists where ArtistID in ' \ '(select distinct(ArtistID) from CoreTracks where ' \ 'PrimarySourceID=?) order by Name' for row in self.db.sql_execute(q, source_id): artist = Artist(row, self.db, source_id) artists.append(artist) yield artist dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: artists) return dfr
[docs] def get_albums(self): albums = [] artists = {} def query_db(): q = 'select * from CoreAlbums where AlbumID in ' \ '(select distinct(AlbumID) from CoreTracks where ' \ 'PrimarySourceID=?) order by Title' for row in self.db.sql_execute(q, self.get_local_music_library_id()): try: artist = artists[row.ArtistID] except KeyError: artist = self.get_artist_with_id(row.ArtistID) artists[row.ArtistID] = artist album = Album(row, self.db, artist) albums.append(album) yield album dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: albums) return dfr
[docs] def get_music_playlists(self): return self.get_playlists(self.get_local_music_library_id(), MusicPlaylist, MusicSmartPlaylist)
[docs] def get_playlists(self, source_id, PlaylistClass, SmartPlaylistClass): playlists = [] def query_db(): q = 'select * from CorePlaylists ' \ 'where PrimarySourceID=? order by Name' for row in self.db.sql_execute(q, source_id): playlist = PlaylistClass(row, self) playlists.append(playlist) yield playlist q = 'select * from CoreSmartPlaylists ' \ 'where PrimarySourceID=? order by Name' for row in self.db.sql_execute(q, source_id): playlist = SmartPlaylistClass(row, self) playlists.append(playlist) yield playlist dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: playlists) return dfr
[docs] def get_artist_with_id(self, artist_id): q = 'select * from CoreArtists where ArtistID=? limit 1' row = self.db.sql_execute(q, artist_id)[0] return Artist(row, self.db, self.get_local_music_library_id())
[docs] def get_album_with_id(self, album_id): q = 'select * from CoreAlbums where AlbumID=? limit 1' row = self.db.sql_execute(q, album_id)[0] artist = self.get_artist_with_id(row.ArtistID) return Album(row, self.db, artist)
[docs] def get_playlist_with_id(self, playlist_id, PlaylistClass): q = 'select * from CorePlaylists where PlaylistID=? limit 1' row = self.db.sql_execute(q, playlist_id)[0] return PlaylistClass(row, self)
[docs] def get_smart_playlist_with_id(self, playlist_id, PlaylistClass): q = 'select * from CoreSmartPlaylists where SmartPlaylistID=? limit 1' row = self.db.sql_execute(q, playlist_id)[0] return PlaylistClass(row, self)
[docs] def get_music_playlist_with_id(self, playlist_id): return self.get_playlist_with_id(playlist_id, MusicPlaylist)
[docs] def get_music_smart_playlist_with_id(self, playlist_id): return self.get_smart_playlist_with_id(playlist_id, MusicSmartPlaylist)
[docs] def get_video_playlist_with_id(self, playlist_id): return self.get_playlist_with_id(playlist_id, VideoPlaylist)
[docs] def get_video_smart_playlist_with_id(self, playlist_id): return self.get_smart_playlist_with_id(playlist_id, VideoSmartPlaylist)
[docs] def get_track_with_id(self, track_id): q = 'select * from CoreTracks where TrackID=? limit 1' row = self.db.sql_execute(q, track_id)[0] album = self.get_album_with_id(row.AlbumID) return Track(row, self.db, album)
[docs] def get_track_for_uri(self, track_uri): q = 'select * from CoreTracks where Uri=? limit 1' try: row = self.db.sql_execute(q, track_uri)[0] except IndexError: # not found track = None else: album = self.get_album_with_id(row.AlbumID) track = Track(row, self, album) return track
[docs] def get_tracks(self): tracks = [] albums = {} def query_db(): q = 'select * from CoreTracks where TrackID in ' \ '(select distinct(TrackID) from CoreTracks where ' \ 'PrimarySourceID=?) order by AlbumID,TrackNumber' for row in self.db.sql_execute(q, self.get_local_music_library_id()): if row.AlbumID not in albums: album = self.get_album_with_id(row.AlbumID) albums[row.AlbumID] = album else: album = albums[row.AlbumID] track = Track(row, self.db, album) tracks.append(track) yield track dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: tracks) return dfr
[docs] def get_video_with_id(self, video_id): q = 'select * from CoreTracks where TrackID=? limit 1' row = self.db.sql_execute(q, video_id)[0] return Video(row, self.db)
[docs] def get_videos(self): videos = [] def query_db(): source_id = self.get_local_video_library_id() q = 'select * from CoreTracks where TrackID in ' \ '(select distinct(TrackID) from CoreTracks where ' \ 'PrimarySourceID=?)' for row in self.db.sql_execute(q, source_id): video = Video(row, self.db, source_id) videos.append(video) yield video dfr = task.coiterate(query_db()) dfr.addCallback(lambda gen: videos) return dfr
[docs] def get_video_playlists(self): return self.get_playlists(self.get_local_video_library_id(), VideoPlaylist, VideoSmartPlaylist)
[docs]class BansheeStore(BackendStore, BansheeDB): ''' .. versionchanged:: 0.9.0 Migrated from louie/dispatcher to EventDispatcher ''' logCategory = 'banshee_store' implements = ['MediaServer'] def __init__(self, server, **kwargs): BackendStore.__init__(self, server, **kwargs) BansheeDB.__init__(self, kwargs.get('db_path')) self.update_id = 0 self.name = kwargs.get('name', 'Banshee') self.containers = {} self.containers[ROOT_CONTAINER_ID] = Container( ROOT_CONTAINER_ID, -1, self.name, store=self) self.init_completed = True
[docs] def upnp_init(self): self.open_db() music = Container(AUDIO_CONTAINER, ROOT_CONTAINER_ID, 'Music', store=self) self.containers[ROOT_CONTAINER_ID].add_child(music) self.containers[AUDIO_CONTAINER] = music artists = Container(AUDIO_ARTIST_CONTAINER_ID, AUDIO_CONTAINER, 'Artists', children_callback=self.get_artists, store=self) self.containers[AUDIO_ARTIST_CONTAINER_ID] = artists self.containers[AUDIO_CONTAINER].add_child(artists) albums = Container(AUDIO_ALBUM_CONTAINER_ID, AUDIO_CONTAINER, 'Albums', children_callback=self.get_albums, store=self) self.containers[AUDIO_ALBUM_CONTAINER_ID] = albums self.containers[AUDIO_CONTAINER].add_child(albums) tracks = Container(AUDIO_ALL_CONTAINER_ID, AUDIO_CONTAINER, 'All tracks', children_callback=self.get_tracks, play_container=True, store=self) self.containers[AUDIO_ALL_CONTAINER_ID] = tracks self.containers[AUDIO_CONTAINER].add_child(tracks) playlists = Container(AUDIO_PLAYLIST_CONTAINER_ID, AUDIO_CONTAINER, 'Playlists', store=self, children_callback=self.get_music_playlists) self.containers[AUDIO_PLAYLIST_CONTAINER_ID] = playlists self.containers[AUDIO_CONTAINER].add_child(playlists) videos = Container(VIDEO_CONTAINER, ROOT_CONTAINER_ID, 'Videos', store=self) self.containers[ROOT_CONTAINER_ID].add_child(videos) self.containers[VIDEO_CONTAINER] = videos all_videos = Container(VIDEO_ALL_CONTAINER_ID, VIDEO_CONTAINER, 'All Videos', children_callback=self.get_videos, store=self) self.containers[VIDEO_ALL_CONTAINER_ID] = all_videos self.containers[VIDEO_CONTAINER].add_child(all_videos) playlists = Container(VIDEO_PLAYLIST_CONTAINER_ID, VIDEO_CONTAINER, 'Playlists', store=self, children_callback=self.get_video_playlists) self.containers[VIDEO_PLAYLIST_CONTAINER_ID] = playlists self.containers[VIDEO_CONTAINER].add_child(playlists) self.db.server = self.server self.db.urlbase = self.urlbase self.db.containers = self.containers self.current_connection_id = None if self.server: hostname = self.server.coherence.hostname source_protocol_info = [f'internal:{hostname}:audio/mpeg:*', 'http-get:*:audio/mpeg:*', f'internal:{hostname}:application/ogg:*', 'http-get:*:application/ogg:*'] self.server.connection_manager_server.set_variable( 0, 'SourceProtocolInfo', source_protocol_info, default=True)
[docs] def release(self): self.db.disconnect()
[docs] def get_by_id(self, item_id): self.info(f'get_by_id {item_id}') if isinstance(item_id, bytes): item_id = item_id.decode('utf-8') if isinstance(item_id, str) and item_id.find('.') > 0: item_id = item_id.split('@', 1) item_type, item_id = item_id[0].split('.')[:2] item_id = int(item_id) dfr = self._lookup(item_type, item_id) else: item_id = int(item_id) item = self.containers[item_id] dfr = defer.succeed(item) return dfr
[docs] def _lookup(self, item_type, item_id): lookup_mapping = dict( artist=self.get_artist_with_id, album=self.get_album_with_id, musicplaylist=self.get_music_playlist_with_id, musicsmartplaylist=self.get_music_smart_playlist_with_id, videoplaylist=self.get_video_playlist_with_id, videosmartplaylist=self.get_video_smart_playlist_with_id, track=self.get_track_with_id, video=self.get_video_with_id) item = None func = lookup_mapping.get(item_type) if func: item = func(item_id) return defer.succeed(item)