# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php
# Copyright 2008, Frank Scholz <coherence@beebits.net>
import mimetypes
import time
from lxml import etree
mimetypes.init()
import hashlib


def _to_bytes(s):
    '''Return *s* as bytes, UTF-8 encoding it first when it is a str.

    The hashlib digests only accept bytes-like input, while callers in this
    module build their input with f-strings.
    '''
    if isinstance(s, str):
        return s.encode('utf-8')
    return s


def md5(s):
    '''Return the hexadecimal MD5 digest of *s* (str or bytes).'''
    return hashlib.md5(_to_bytes(s)).hexdigest()


def sha256(s):
    '''Return the hexadecimal SHA-256 digest of *s* (str or bytes).'''
    return hashlib.sha256(_to_bytes(s)).hexdigest()
from twisted.internet import reactor, defer
from twisted.python import failure
from coherence.upnp.core import DIDLLite
from coherence.upnp.core.soap_service import errorCode
from coherence.upnp.core import utils
from coherence.backend import BackendItem, BackendStore
# Numeric IDs of the fixed containers of the store. They are used as keys of
# AmpacheStore.containers and as default parent IDs of the item classes below.
ROOT_CONTAINER_ID = 0
# NOTE(review): AUDIO_CONTAINER is not referenced anywhere in this module.
AUDIO_CONTAINER = 100
AUDIO_ALL_CONTAINER_ID = 101
AUDIO_ARTIST_CONTAINER_ID = 102
AUDIO_ALBUM_CONTAINER_ID = 103
AUDIO_PLAYLIST_CONTAINER_ID = 104
AUDIO_GENRE_CONTAINER_ID = 105
AUDIO_TAG_CONTAINER_ID = 106
VIDEO_CONTAINER_ID = 200
from urllib.parse import urlsplit
class ProxySong(utils.ReverseProxyResource):
    '''Reverse-proxy resource pointing at the media URL of a single item.

    The URL is split into host, port and request path (including the query
    string, when there is one) and handed to ``utils.ReverseProxyResource``.
    '''

    def __init__(self, uri):
        '''
        Args:
            uri: absolute URL of the remote media file.

        Raises:
            ValueError: if the URL carries a port that is not a valid number.
        '''
        self.uri = uri
        parts = urlsplit(uri)
        # ``hostname``/``port`` cope with user-info and IPv6 literals, which
        # a plain ``netloc.split(':')`` does not.
        host = parts.hostname or parts.netloc
        port = parts.port or 80
        path = parts.path
        if parts.query:
            # Only append the separator when there is a query, so the proxied
            # path never ends in a dangling '?'.
            path = '?'.join((path, parts.query))
        utils.ReverseProxyResource.__init__(self, host, port, path)
class Container(BackendItem):
    '''A folder of the store.

    The children are either kept in a plain list (filled with
    :meth:`add_child`) or produced on demand by ``children_callback``, which
    is called with ``(start, count)``.
    '''
    logCategory = 'ampache_store'
    get_path = None

    def __init__(self, id, parent_id, name, store=None, children_callback=None,
                 container_class=DIDLLite.Container, play_container=False):
        BackendItem.__init__(self)
        self.id, self.parent_id, self.name = id, parent_id, name
        self.mimetype = 'directory'
        self.container_class = container_class
        self.update_id = 0
        self.children = [] if children_callback is None else children_callback
        # None means "not counted yet"; see get_child_count().
        self.childCount = None
        self.store = store
        self.play_container = play_container
        if store is not None:
            self.get_url = lambda: self.store.urlbase + str(self.id)

    def add_child(self, child):
        '''Append *child* to the children list and bump the child count.'''
        self.children.append(child)
        self.childCount = (self.childCount or 0) + 1

    def get_children(self, start=0, end=0):
        '''Return the children in the window ``[start, end)``.

        An empty window, or one larger than 250 entries, is replaced by a
        window of 250 entries beginning at *start*.
        '''
        self.info(f'container.get_children {start} {end}')
        span = end - start
        if span == 0 or span > 250:
            end = start + 250
        kids = self.children
        if callable(kids):
            return kids(start, end - start)
        return kids[start:] if end == 0 else kids[start:end]

    def get_child_count(self):
        '''Return the number of children, counting them on first use.'''
        if self.childCount is None:
            source = self.children
            self.childCount = len(source() if callable(source) else source)
        return self.childCount

    def get_item(self):
        '''Build the DIDL-Lite container object describing this folder.'''
        didl_item = self.container_class(self.id, self.parent_id, self.name)
        didl_item.childCount = self.get_child_count()
        return didl_item

    def get_name(self):
        '''Return the display name of the container.'''
        return self.name

    def get_id(self):
        '''Return the container id.'''
        return self.id
class Playlist(BackendItem):
    '''A playlist, built from a ``playlist`` element of an Ampache answer.'''
    logCategory = 'ampache_store'
    get_path = None

    def __init__(self, store, element):
        BackendItem.__init__(self)
        self.store = store
        raw_id = element.get('id')
        self.ampache_id = raw_id
        self.id = f'playlist.{int(raw_id):d}'
        self.title = element.find('name').text
        self.creator = element.find('owner').text
        self.tracks = int(element.find('items').text)
        self.cover = self._text_or_none(element, 'art')

    @staticmethod
    def _text_or_none(element, tag):
        '''Return the text of the child *tag*, or None if it can't be read.'''
        try:
            return element.find(tag).text
        except Exception:
            return None

    def get_children(self, start=0, end=0):
        '''Query the store for the songs of this playlist.'''
        count = end - start
        return self.store.ampache_query('playlist_songs', start, count,
                                        filter=self.ampache_id)

    def get_child_count(self):
        '''Return the number of tracks reported for the playlist.'''
        return self.tracks

    def get_item(self, parent_id=AUDIO_PLAYLIST_CONTAINER_ID):
        '''Build the DIDL-Lite playlist item for this playlist.'''
        didl_item = DIDLLite.PlaylistItem(self.id, parent_id, self.title)
        didl_item.childCount = self.get_child_count()
        didl_item.albumArtURI = self.cover
        return didl_item

    def get_id(self):
        '''Return the id of the playlist (``playlist.<n>``).'''
        return self.id

    def get_name(self):
        '''Return the title of the playlist.'''
        return self.title

    def get_cover(self):
        '''Return the cover art URL, or None when there is none.'''
        return self.cover
class Album(BackendItem):
    '''An album, built from an ``album`` element of an Ampache answer.'''
    logCategory = 'ampache_store'
    get_path = None

    def __init__(self, store, element):
        BackendItem.__init__(self)
        self.store = store
        raw_id = element.get('id')
        self.ampache_id = raw_id
        self.id = f'album.{int(raw_id):d}'
        self.title = element.find('name').text
        self.artist = element.find('artist').text
        self.tracks = int(element.find('tracks').text)
        self.cover = self._text_or_none(element, 'art')

    @staticmethod
    def _text_or_none(element, tag):
        '''Return the text of the child *tag*, or None if it can't be read.'''
        try:
            return element.find(tag).text
        except Exception:
            return None

    def get_children(self, start=0, end=0):
        '''Query the store for the songs of this album.'''
        count = end - start
        return self.store.ampache_query('album_songs', start, count,
                                        filter=self.ampache_id)

    def get_child_count(self):
        '''Return the number of tracks reported for the album.'''
        return self.tracks

    def get_item(self, parent_id=AUDIO_ALBUM_CONTAINER_ID):
        '''Build the DIDL-Lite music album object for this album.'''
        didl_item = DIDLLite.MusicAlbum(self.id, parent_id, self.title)
        didl_item.childCount = self.get_child_count()
        didl_item.artist = self.artist
        didl_item.albumArtURI = self.cover
        return didl_item

    def get_id(self):
        '''Return the id of the album (``album.<n>``).'''
        return self.id

    def get_name(self):
        '''Return the title of the album.'''
        return self.title

    def get_cover(self):
        '''Return the cover art URL, or None when there is none.'''
        return self.cover
class Artist(BackendItem):
    '''An artist, built from an ``artist`` element of an Ampache answer.'''
    logCategory = 'ampache_store'
    get_path = None

    def __init__(self, store, element):
        BackendItem.__init__(self)
        self.store = store
        raw_id = element.get('id')
        self.ampache_id = raw_id
        self.id = f'artist.{int(raw_id):d}'
        self.count_albums = self._int_or_none(element, 'albums')
        self.count_songs = self._int_or_none(element, 'songs')
        self.name = element.find('name').text

    @staticmethod
    def _int_or_none(element, tag):
        '''Return the child *tag* as an int, or None if it can't be read.'''
        try:
            return int(element.find(tag).text)
        except Exception:
            return None

    def get_children(self, start=0, end=0):
        '''Query the store for the albums of this artist.'''
        count = end - start
        return self.store.ampache_query('artist_albums', start, count,
                                        filter=self.ampache_id)

    def get_child_count(self):
        '''Return the album count.

        When the count is not known yet, the children are queried and the
        result of that query is returned instead; its callback stores and
        yields the number of albums.
        '''
        if self.count_albums is not None:
            return self.count_albums

        def remember_count(albums):
            self.count_albums = len(albums)
            return self.count_albums

        pending = self.get_children()
        pending.addCallback(remember_count)
        return pending

    def get_item(self, parent_id=AUDIO_ARTIST_CONTAINER_ID):
        '''Build the DIDL-Lite music artist object for this artist.'''
        return DIDLLite.MusicArtist(self.id, parent_id, self.name)

    def get_id(self):
        '''Return the id of the artist (``artist.<n>``).'''
        return self.id

    def get_name(self):
        '''Return the name of the artist.'''
        return self.name
class Genre(BackendItem):
    '''A genre, built from a ``genre`` element of an Ampache answer.'''
    logCategory = 'ampache_store'
    get_path = None

    def __init__(self, store, element):
        BackendItem.__init__(self)
        self.store = store
        raw_id = element.get('id')
        self.ampache_id = raw_id
        self.id = f'genre.{int(raw_id):d}'
        self.count_albums = self._int_or_none(element, 'albums')
        self.count_artists = self._int_or_none(element, 'artists')
        self.count_songs = self._int_or_none(element, 'songs')
        self.name = element.find('name').text

    @staticmethod
    def _int_or_none(element, tag):
        '''Return the child *tag* as an int, or None if it can't be read.'''
        try:
            return int(element.find(tag).text)
        except Exception:
            return None

    def get_children(self, start=0, end=0):
        '''Query the store for the songs of this genre.'''
        count = end - start
        return self.store.ampache_query('genre_songs', start, count,
                                        filter=self.ampache_id)

    def get_child_count(self):
        '''Return the song count.

        When the count is not known yet, the children are queried and the
        result of that query is returned instead; its callback stores and
        yields the number of songs.
        '''
        if self.count_songs is not None:
            return self.count_songs

        def remember_count(songs):
            self.count_songs = len(songs)
            return self.count_songs

        pending = self.get_children()
        pending.addCallback(remember_count)
        return pending

    def get_item(self, parent_id=AUDIO_GENRE_CONTAINER_ID):
        '''Build the DIDL-Lite genre object for this genre.'''
        return DIDLLite.Genre(self.id, parent_id, self.name)

    def get_id(self):
        '''Return the id of the genre (``genre.<n>``).'''
        return self.id

    def get_name(self):
        '''Return the name of the genre.'''
        return self.name
class Tag(BackendItem):
    '''A tag, built from a ``tag`` element of an Ampache answer.'''
    logCategory = 'ampache_store'
    get_path = None

    def __init__(self, store, element):
        BackendItem.__init__(self)
        self.store = store
        raw_id = element.get('id')
        self.ampache_id = raw_id
        self.id = f'tag.{int(raw_id):d}'
        self.count_albums = self._int_or_none(element, 'albums')
        self.count_artists = self._int_or_none(element, 'artists')
        self.count_songs = self._int_or_none(element, 'songs')
        self.name = element.find('name').text

    @staticmethod
    def _int_or_none(element, tag):
        '''Return the child *tag* as an int, or None if it can't be read.'''
        try:
            return int(element.find(tag).text)
        except Exception:
            return None

    def get_children(self, start=0, end=0):
        '''Query the store for the songs carrying this tag.'''
        count = end - start
        return self.store.ampache_query('tag_songs', start, count,
                                        filter=self.ampache_id)

    def get_child_count(self):
        '''Return the song count.

        When the count is not known yet, the children are queried and the
        result of that query is returned instead; its callback stores and
        yields the number of songs.
        '''
        if self.count_songs is not None:
            return self.count_songs

        def remember_count(songs):
            self.count_songs = len(songs)
            return self.count_songs

        pending = self.get_children()
        pending.addCallback(remember_count)
        return pending

    def get_item(self, parent_id=AUDIO_TAG_CONTAINER_ID):
        '''Build the DIDL-Lite object for this tag (a ``DIDLLite.Genre``).'''
        return DIDLLite.Genre(self.id, parent_id, self.name)

    def get_id(self):
        '''Return the id of the tag (``tag.<n>``).'''
        return self.id

    def get_name(self):
        '''Return the name of the tag.'''
        return self.name
class Track(BackendItem):
    '''A song, built from a ``song`` element of an Ampache answer.'''
    logCategory = 'ampache_store'

    def __init__(self, store, element):
        '''
        Args:
            store: the owning store; its ``proxy`` flag and ``urlbase`` are
                read.
            element: XML element with an ``id`` attribute and the children
                ``album`` (with its own ``id``), ``url``, ``time``,
                ``title``, ``artist`` and ``track``; ``genre``, ``art``,
                ``mime`` and ``size`` are optional.
        '''
        BackendItem.__init__(self)
        self.store = store
        self.id = f'song.{int(element.get("id")):d}'
        self.parent_id = f'album.{int(element.find("album").get("id")):d}'
        self.url = element.find('url').text
        seconds = int(element.find('time').text)
        # Kept as an 'HH:MM:SS' string; note that it is never a number.
        self.duration = time.strftime('%H:%M:%S', time.gmtime(seconds))
        self.bitrate = 0
        self.title = element.find('title').text
        self.artist = element.find('artist').text
        self.album = element.find('album').text
        try:
            self.genre = element.find('genre').text
        except Exception:
            self.genre = None
        self.track_nr = element.find('track').text
        try:
            self.cover = element.find('art').text
        except Exception:
            self.cover = None
        self.mimetype = None
        try:
            self.mimetype = element.find('mime').text
        except Exception:
            # No usable <mime> child: fall back to guessing from the URL.
            self.mimetype, _ = mimetypes.guess_type(self.url, strict=False)
        if self.mimetype is None:
            self.mimetype = "audio/mpeg"
        try:
            self.size = int(element.find('size').text)
        except Exception:
            self.size = 0
        if self.store.proxy:
            self.location = ProxySong(self.url)

    def get_children(self, start=0, request_count=0):
        '''A track has no children; always returns an empty list.'''
        return []

    def get_child_count(self):
        '''A track has no children; always returns 0.'''
        return 0

    def get_item(self, parent_id=None):
        '''Build the DIDL-Lite music track with one http-get resource.

        The *parent_id* argument is accepted for signature compatibility
        with the other items but ignored: the album of the track is always
        used as parent.
        '''
        self.debug(f'Track get_item {self.id} @ {self.parent_id}')
        # create item
        item = DIDLLite.MusicTrack(self.id, self.parent_id)
        item.album = self.album
        item.artist = self.artist
        item.genre = self.genre
        item.originalTrackNumber = self.track_nr
        item.title = self.title
        item.albumArtURI = self.cover
        # add http resource
        res = DIDLLite.Resource(self.get_url(),
                                f'http-get:*:{self.mimetype}:*')
        if self.size > 0:
            res.size = self.size
        # duration is a string, so test it for truthiness; comparing it
        # with ``> 0`` raises TypeError on Python 3.
        if self.duration:
            res.duration = str(self.duration)
        if self.bitrate > 0:
            res.bitrate = str(self.bitrate)
        item.res.append(res)
        return item

    def get_id(self):
        '''Return the id of the track (``song.<n>``).'''
        return self.id

    def get_name(self):
        '''Return the title of the track.'''
        return self.title

    def get_url(self):
        '''Return the proxied URL if the store proxies, else the Ampache URL.'''
        if self.store.proxy:
            return self.store.urlbase + str(self.id)
        return self.url

    def get_path(self):
        '''Tracks have no local path; always returns None.'''
        return None
class Video(BackendItem):
    '''A video, built from a ``video`` element of an Ampache answer.'''
    logCategory = 'ampache_store'

    def __init__(self, store, element):
        '''
        Args:
            store: the owning store; its ``proxy`` flag and ``urlbase`` are
                read.
            element: XML element with an ``id`` attribute and the children
                ``url`` and ``title``; ``time``, ``mime`` and ``size`` are
                optional.
        '''
        BackendItem.__init__(self)
        self.store = store
        self.id = f'video.{int(element.get("id")):d}'
        self.url = element.find('url').text
        try:
            total = int(element.find('time').text)
            # divmod keeps everything an int; true division would produce
            # floats, which the ``:d`` format spec rejects.
            minutes, seconds = divmod(total, 60)
            hours, minutes = divmod(minutes, 60)
            self.duration = f'{hours:d}:{minutes:02d}:{seconds:02d}'
        except (AttributeError, TypeError, ValueError):
            # <time> missing, empty or not a number: duration is unknown.
            self.duration = 0
        self.cover = None
        self.title = element.find('title').text
        self.mimetype = None
        try:
            self.mimetype = element.find('mime').text
        except Exception:
            # No usable <mime> child: fall back to guessing from the URL.
            self.mimetype, _ = mimetypes.guess_type(self.url, strict=False)
        if self.mimetype is None:
            self.mimetype = 'video/avi'
        try:
            self.size = int(element.find('size').text)
        except Exception:
            self.size = 0
        if self.store.proxy:
            self.location = ProxySong(self.url)

    def get_children(self, start=0, request_count=0):
        '''A video has no children; always returns an empty list.'''
        return []

    def get_child_count(self):
        '''A video has no children; always returns 0.'''
        return 0

    def get_item(self, parent_id=VIDEO_CONTAINER_ID):
        '''Build the DIDL-Lite video item with one http-get resource.'''
        self.debug(f'video get_item {self.id} @ {parent_id}')
        # create item
        item = DIDLLite.VideoItem(self.id, parent_id)
        item.title = self.title
        item.albumArtURI = self.cover
        # add http resource
        res = DIDLLite.Resource(self.get_url(),
                                f'http-get:*:{self.mimetype}:*')
        if self.size > 0:
            res.size = self.size
        # duration is either an 'H:MM:SS' string or 0 (unknown); a plain
        # truth test works for both, ``> 0`` does not work for the string.
        if self.duration:
            res.duration = str(self.duration)
        item.res.append(res)
        return item

    def get_id(self):
        '''Return the id of the video (``video.<n>``).'''
        return self.id

    def get_name(self):
        '''Return the title of the video.'''
        return self.title

    def get_url(self):
        '''Return the proxied URL if the store proxies, else the Ampache URL.'''
        if self.store.proxy:
            return self.store.urlbase + str(self.id)
        return self.url

    def get_path(self):
        '''Videos have no local path; always returns None.'''
        return None
class AmpacheStore(BackendStore):
    '''This is a backend to the Ampache Media DB

    .. versionchanged:: 0.9.0
        Migrated from louie/dispatcher to EventDispatcher
    '''
    implements = ['MediaServer']
    logCategory = 'ampache_store'

    def __init__(self, server, **kwargs):
        '''Read the configuration and start the handshake with Ampache.

        Recognised keyword arguments: ``name``, ``password`` (or ``key``),
        ``user``, ``url``, ``proxy`` and ``api_version``.
        '''
        BackendStore.__init__(self, server, **kwargs)
        self.config = kwargs
        self.name = kwargs.get('name', 'Ampache')
        self.key = kwargs.get('password', kwargs.get('key', ''))
        self.user = kwargs.get('user', None)
        self.url = kwargs.get(
            'url', 'http://localhost/ampache/server/xml.server.php')
        self.proxy = kwargs.get('proxy', 'no') in [
            1, 'Yes', 'yes', 'True', 'true']
        self.update_id = 0
        self.token = None
        self.songs = 0
        self.albums = 0
        self.artists = 0
        self.api_version = int(kwargs.get('api_version', 350001))
        self.get_token()

    def __repr__(self):
        return 'Ampache storage'

    def get_by_id(self, id):
        '''Look up an item by its id.

        Args:
            id: a container number, or a string such as ``song.12``; an
                ``@...`` suffix is stripped first. Bytes are decoded as
                UTF-8.

        Returns:
            The container for a numeric id, the result of
            :meth:`ampache_query` for a ``<type>.<n>`` id, or None when
            nothing matches.
        '''
        self.info(f'looking for id {id}')
        if isinstance(id, bytes):
            id = id.decode('utf-8')
        if isinstance(id, str):
            id = id.split('@', 1)[0]
            if id.startswith('artist_all_tracks_'):
                try:
                    return self.containers[id]
                except (AttributeError, KeyError):
                    return None
        try:
            container_id = int(id)
        except (TypeError, ValueError):
            container_id = None
        if container_id is not None:
            # .get(): an unknown container is "not found", not a KeyError.
            return self.containers.get(container_id)
        if not isinstance(id, str):
            return None
        try:
            kind, ampache_id = id.split('.')
        except ValueError:
            return None
        if kind in ('song', 'artist', 'album', 'playlist', 'genre',
                    'tag', 'video'):
            return self.ampache_query(kind, filter=str(ampache_id))
        return None

    @staticmethod
    def _optional_count(response, tag):
        '''Return the child *tag* of *response* as an int, or 0 if absent.'''
        try:
            return int(response.find(tag).text)
        except (AttributeError, TypeError, ValueError):
            return 0

    def got_auth_response(self, response, renegotiate=False):
        '''Handle the answer to the handshake request.

        Stores the session token and the catalogue counts. Unless this is a
        renegotiation, it also creates the root container, registers the
        ``wmc_mapping`` entries and sets ``init_completed``.

        Raises:
            SyntaxError: the answer is not well-formed XML.
            ValueError: Ampache reported an error, or the answer lacks the
                token or one of the songs/albums/artists counts.
        '''
        self.info(f'got_auth_response {response}')
        try:
            response = etree.fromstring(response)
        except SyntaxError as msg:
            self.warning(f'error parsing ampache answer {msg}')
            raise SyntaxError(f'error parsing ampache answer {msg}') from msg

        error = response.find('error')
        if error is not None:
            self.warning(f'error on token request {error.text}')
            raise ValueError(error.text)

        # Only the lookups of the mandatory fields are guarded, so that an
        # unrelated AttributeError further down is not misreported as a
        # missing token.
        try:
            self.token = response.find('auth').text
            self.songs = int(response.find('songs').text)
            self.albums = int(response.find('albums').text)
            self.artists = int(response.find('artists').text)
        except AttributeError:
            raise ValueError('no authorization token returned') from None

        self.playlists = self._optional_count(response, 'playlists')
        self.genres = self._optional_count(response, 'genres')
        self.tags = self._optional_count(response, 'tags')
        self.videos = self._optional_count(response, 'videos')
        self.info(f'ampache returned auth token {self.token!r}')
        self.info(
            f'Songs: {self.songs:d}, Artists: {self.artists:d}, '
            f'Albums: {self.albums:d}, Playlists {self.playlists:d}, '
            f'Genres {self.genres:d}, Tags {self.tags:d}, '
            f'Videos {self.videos:d}')

        if not renegotiate:
            self.containers = {}
            self.containers[ROOT_CONTAINER_ID] = \
                Container(ROOT_CONTAINER_ID, -1, self.name, store=self)
            self.wmc_mapping.update({
                # all tracks
                '4': lambda: self.get_by_id(AUDIO_ALL_CONTAINER_ID),
                # all genres
                '5': lambda: self.get_by_id(AUDIO_GENRE_CONTAINER_ID),
                # all artists
                '6': lambda: self.get_by_id(AUDIO_ARTIST_CONTAINER_ID),
                # all albums
                '7': lambda: self.get_by_id(AUDIO_ALBUM_CONTAINER_ID),
                # all playlists
                '13': lambda: self.get_by_id(AUDIO_PLAYLIST_CONTAINER_ID),
                # all videos
                '8': lambda: self.get_by_id(VIDEO_CONTAINER_ID),
            })
            self.init_completed = True

    def got_auth_error(self, e, renegotiate=False):
        '''Log a failed handshake; report init failure on the first one.'''
        self.warning(f'error calling ampache {e}')
        if not renegotiate:
            self.on_init_failed(msg=e)

    def get_token(self, renegotiate=False):
        '''Ask Ampache for the authorization token'''
        timestamp = int(time.time())
        # The hash functions need bytes, hence the explicit encoding.
        if self.api_version < 350001:
            passphrase = md5(f'{timestamp:d}{self.key}'.encode('utf-8'))
        else:
            key_hash = sha256(str(self.key).encode('utf-8'))
            passphrase = sha256(f'{timestamp:d}{key_hash}'.encode('utf-8'))
        request = ''.join(
            (self.url,
             f'?action=handshake&auth={passphrase}&timestamp={timestamp:d}'))
        if self.user is not None:
            request = ''.join((request, f'&user={self.user}'))
        if self.api_version is not None:
            request = ''.join((request, f'&version={str(self.api_version)}'))
        self.info(f'auth_request {request}')
        d = utils.getPage(request)
        d.addCallback(self.got_auth_response, renegotiate)
        d.addErrback(self.got_auth_error, renegotiate)
        return d

    def got_error(self, e):
        '''Log a failed request and pass the failure on.'''
        self.warning(f'error calling ampache {e}')
        return e

    def _item_class(self, tag):
        '''Return the item class for the XML tag *tag*, or None.'''
        return {
            'song': Track,
            'artist': Artist,
            'album': Album,
            'playlist': Playlist,
            'genre': Genre,
            'tag': Tag,
            'video': Video,
        }.get(tag)

    def got_response(self, response, query_item, request, renegotiated=False):
        '''Turn the answer to a query into item objects.

        Args:
            response: the XML answer.
            query_item: the ``action`` that was queried.
            request: the URL that was requested; needed to repeat the
                request after a session renegotiation.
            renegotiated: True when *response* already is the answer to a
                repeated request, so that a second 401 is not retried again.

        Returns:
            One item (or None) for a singular *query_item* such as
            ``song``, a list of items for the plural and ``*_songs`` /
            ``artist_albums`` queries, or the pending retry when the
            session had to be renegotiated.

        Raises:
            ValueError: Ampache answered with an error.
        '''
        self.info(f'got a response for {query_item}')
        self.debug(response)
        response = etree.fromstring(response)

        error = response.find('error')
        if error is not None:
            code = error.attrib.get('code')
            self.warning(f'error on token request {code} {error.text}')
            if code == '401' and not renegotiated:
                # session error, we need to renegotiate our session
                d = self.get_token(renegotiate=True)

                def resend_request(result, old_request):
                    # exchange the auth token in the resending request
                    parts = old_request.split('&')
                    for index, part in enumerate(parts):
                        if part.startswith('auth='):
                            parts[index] = f'auth={self.token}'
                            break
                    new_request = '&'.join(parts)
                    self.info(f'ampache_query {new_request}')
                    return utils.getPage(new_request)

                d.addCallback(resend_request, request)
                # Parse the repeated answer too, so the caller receives
                # items rather than the raw page.
                d.addCallback(self.got_response, query_item, request, True)
                d.addErrback(self.got_error)
                return d
            raise ValueError(error.text)

        if query_item in ('song', 'artist', 'album',
                          'playlist', 'genre', 'tag', 'video'):
            q = response.find(query_item)
            if q is None:
                return None
            return self._item_class(q.tag)(self, q)

        # Map the query name onto the tag of the elements it returns.
        if query_item in ('songs', 'artists', 'albums', 'playlists',
                          'genres', 'tags', 'videos'):
            query_item = query_item[:-1]
        elif query_item in ('playlist_songs', 'album_songs',
                            'genre_songs', 'tag_songs'):
            query_item = 'song'
        elif query_item in ('artist_albums',):
            query_item = 'album'
        item_class = self._item_class(query_item)
        if item_class is None:
            return []
        return [item_class(self, q) for q in response.findall(query_item)]

    def ampache_query(self, item, start=0, request_count=0, filter=None):
        '''Send the query *item* to Ampache.

        Args:
            item: the ``action`` to request.
            start: offset of the first result.
            request_count: maximum number of results; no limit is sent
                unless it is positive.
            filter: optional value for the ``filter`` parameter.

        Returns:
            The pending request, with :meth:`got_response` attached.
        '''
        request = ''.join(
            (self.url, f'?action={item}&auth={self.token}&offset={start:d}'))
        if request_count > 0:
            request = ''.join((request, f'&limit={request_count:d}'))
        if filter is not None:
            request = ''.join((request, f'&filter={filter}'))
        self.info(f'ampache_query {request}')
        d = utils.getPage(request)
        d.addCallback(self.got_response, item, request)
        d.addErrback(self.got_error)
        return d

    def ampache_query_songs(self, start=0, request_count=0):
        '''Query the songs.'''
        return self.ampache_query('songs', start, request_count)

    def ampache_query_albums(self, start=0, request_count=0):
        '''Query the albums.'''
        return self.ampache_query('albums', start, request_count)

    def ampache_query_artists(self, start=0, request_count=0):
        '''Query the artists.'''
        return self.ampache_query('artists', start, request_count)

    def ampache_query_playlists(self, start=0, request_count=0):
        '''Query the playlists.'''
        return self.ampache_query('playlists', start, request_count)

    def ampache_query_genres(self, start=0, request_count=0):
        '''Query the genres.'''
        return self.ampache_query('genres', start, request_count)

    def ampache_query_tags(self, start=0, request_count=0):
        '''Query the tags (needed by the 'Tags' container of upnp_init).'''
        return self.ampache_query('tags', start, request_count)

    def ampache_query_videos(self, start=0, request_count=0):
        '''Query the videos.'''
        return self.ampache_query('videos', start, request_count)

    def upnp_init(self):
        '''Announce the protocol info and create the fixed containers.'''
        if self.server:
            self.server.connection_manager_server.set_variable(
                0, 'SourceProtocolInfo',
                ['http-get:*:audio/mpeg:*',
                 'http-get:*:application/ogg:*',
                 'http-get:*:video/mp4:*',
                 'http-get:*:video/x-msvideo:*',
                 'http-get:*:video/avi:*',
                 'http-get:*:video/quicktime:*', ])

        root = self.containers[ROOT_CONTAINER_ID]
        # (id, title, children callback, child count, extra Container args)
        layout = (
            (AUDIO_ALL_CONTAINER_ID, 'All tracks',
             self.ampache_query_songs, self.songs,
             {'play_container': True}),
            (AUDIO_ALBUM_CONTAINER_ID, 'Albums',
             self.ampache_query_albums, self.albums, {}),
            (AUDIO_ARTIST_CONTAINER_ID, 'Artists',
             self.ampache_query_artists, self.artists, {}),
            (AUDIO_PLAYLIST_CONTAINER_ID, 'Playlists',
             self.ampache_query_playlists, self.playlists,
             {'container_class': DIDLLite.PlaylistContainer}),
            (AUDIO_GENRE_CONTAINER_ID, 'Genres',
             self.ampache_query_genres, self.genres, {}),
            (AUDIO_TAG_CONTAINER_ID, 'Tags',
             self.ampache_query_tags, self.tags, {}),
            (VIDEO_CONTAINER_ID, 'Videos',
             self.ampache_query_videos, self.videos, {}),
        )
        for container_id, title, query, count, extra in layout:
            container = Container(container_id, ROOT_CONTAINER_ID, title,
                                  store=self, children_callback=query,
                                  **extra)
            container.childCount = count
            self.containers[container_id] = container
            root.add_child(container)

    def upnp_XBrowse(self, *args, **kwargs):
        '''Answer a Browse action.

        Reads ``ObjectID`` (falling back to ``ContainerID``, then 0),
        ``BrowseFlag``, ``Filter``, ``StartingIndex``, ``RequestedCount``
        and ``SortCriteria`` from *kwargs*.

        Returns:
            A dict with ``Result``, ``TotalMatches``, ``NumberReturned``
            and ``UpdateID``, a pending result that produces it, or a
            failure with error code 701 when the object does not exist.
        '''
        try:
            ObjectID = kwargs['ObjectID']
        except Exception as e:
            self.debug(
                f'hmm, a Browse action and no ObjectID argument? '
                f'An XBox maybe? [ERROR: {e}]')
            try:
                ObjectID = kwargs['ContainerID']
            except Exception:
                ObjectID = 0
        BrowseFlag = kwargs['BrowseFlag']
        # Filter and SortCriteria are not evaluated, but the lookups are
        # kept so that a request lacking them still fails with KeyError.
        Filter = kwargs['Filter']  # noqa: F841
        StartingIndex = int(kwargs['StartingIndex'])
        RequestedCount = int(kwargs['RequestedCount'])
        SortCriteria = kwargs['SortCriteria']  # noqa: F841
        parent_container = None
        requested_id = None
        # Bound up front because build_response() reads it from this scope.
        item = None
        if BrowseFlag == 'BrowseDirectChildren':
            parent_container = str(ObjectID)
        else:
            requested_id = str(ObjectID)
        self.info(f'upnp_Browse request {ObjectID} {BrowseFlag} '
                  f'{StartingIndex} {RequestedCount}')
        didl = DIDLLite.DIDLElement(
            upnp_client=kwargs.get('X_UPnPClient', ''),
            requested_id=requested_id,
            parent_container=parent_container)

        def build_response(tm):
            num_ret = didl.numItems()
            r = {'Result': didl.toString(), 'TotalMatches': tm,
                 'NumberReturned': num_ret}
            self.info(f'upnp_Browse response {num_ret} {tm}')
            if hasattr(item, 'update_id'):
                r['UpdateID'] = item.update_id
            elif hasattr(self, 'update_id'):
                r['UpdateID'] = self.update_id  # FIXME
            else:
                r['UpdateID'] = 0
            return r

        def got_error(r):
            return r

        def process_result(result, found_item):
            if result is None:
                result = []
            if BrowseFlag == 'BrowseDirectChildren':
                lc = []

                def process_items(result, tm):
                    if result is None:
                        result = []
                    # result holds (success, value) pairs of a DeferredList
                    for i in result:
                        if i[0]:
                            didl.addItem(i[1])
                    return build_response(tm)

                for it in result:
                    md = defer.maybeDeferred(it.get_item)
                    lc.append(md)

                def got_child_count(count):
                    dl = defer.DeferredList(lc)
                    dl.addCallback(process_items, count)
                    return dl

                md = defer.maybeDeferred(found_item.get_child_count)
                md.addCallback(got_child_count)
                return md
            else:
                didl.addItem(result)
                total = 1
            return build_response(total)

        total = 0
        items = []
        wmc_mapping = getattr(self, 'wmc_mapping', None)
        if (kwargs.get('X_UPnPClient', '') == 'XBox' and
                wmc_mapping is not None and
                ObjectID in wmc_mapping):
            # fake a Windows Media Connect Server
            root_id = wmc_mapping[ObjectID]
            if callable(root_id):
                item = root_id()
                if item is not None:
                    if isinstance(item, list):
                        total = len(item)
                        if int(RequestedCount) == 0:
                            items = item[StartingIndex:]
                        else:
                            items = item[StartingIndex:
                                         StartingIndex + RequestedCount]
                    else:
                        d = defer.maybeDeferred(item.get_children,
                                                StartingIndex,
                                                StartingIndex + RequestedCount)
                        # process_result needs the browsed item as well
                        d.addCallback(process_result, item)
                        d.addErrback(got_error)
                        return d
            for i in items:
                didl.addItem(i.get_item())
            return build_response(total)

        root_id = ObjectID
        item = self.get_by_id(root_id)
        if item is None:
            return failure.Failure(errorCode(701))

        def proceed(result):
            if result is None:
                # the query for the object came back empty
                return failure.Failure(errorCode(701))
            if BrowseFlag == 'BrowseDirectChildren':
                md = defer.maybeDeferred(result.get_children, StartingIndex,
                                         StartingIndex + RequestedCount)
            else:
                md = defer.maybeDeferred(result.get_item)
            md.addCallback(process_result, result)
            md.addErrback(got_error)
            return md

        if isinstance(item, defer.Deferred):
            item.addCallback(proceed)
            return item
        else:
            return proceed(item)
if __name__ == '__main__':
    from coherence.base import Coherence

    def main():
        '''Start Coherence with an AmpacheStore and browse its root once.'''

        def got_result(result):
            print('got_result')

        def call_browse(ObjectID=0, StartingIndex=0, RequestedCount=0):
            # Browse the direct children of ObjectID through the backend.
            pending = store_plugin.backend.upnp_Browse(
                BrowseFlag='BrowseDirectChildren',
                RequestedCount=RequestedCount,
                StartingIndex=StartingIndex,
                ObjectID=ObjectID,
                SortCriteria='*',
                Filter='')
            pending.addCallback(got_result)
            pending.addErrback(got_result)

        def call_test(start, count):
            # Not scheduled below; kept for manual experiments.
            pending = store_plugin.backend.ampache_query_artists(start, count)
            pending.addCallback(got_result)
            pending.addErrback(got_result)

        config = {'logmode': 'warning'}
        coherence = Coherence(config)
        store_plugin = coherence.add_plugin(
            'AmpacheStore',
            url='http://localhost/ampache/server/xml.server.php',
            key='password',
            user=None)
        reactor.callLater(3, call_browse, 0, 0, 0)

    reactor.callWhenRunning(main)
    reactor.run()