Source code for coherence.backends.fs_storage

# -*- coding: utf-8 -*-

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2006, Frank Scholz <coherence@beebits.net>
'''
FSStore - filesystem media server
---------------------------------

FSStore exposes media files found in the directory trees defined by
the 'content' configuration entry.

The first ".jpg" or ".png" file found inside a media directory is
served as a cover image.

The plugin is configured with::

    <plugin active="yes">
      <!-- The plugin identifier, mandatory -->
      <backend>FSStore</backend>
      <!-- A comma-separated list of path containing the medias to serve -->
      <content>/media/path1,/media/path2</content>
      <!-- The avertized media server name, default: "my media" -->
      <name>my media</name>
      <!-- The highest UPnP version this
      media server should support, default: 2 -->
      <version>2</version>
      <!-- A unique identifier used to reference the media server,
      autogenerated if not set explicitly. In this case, some control points
      might memorize it between runs and display the same media server more
      than once. -->
      <uuid>2f7f4096-cba3-4390-be7d-d1d07106a6f4</uuid>
    </plugin>
'''
import glob
import mimetypes
import os
import re
import shutil
import stat
import tempfile
import traceback
from datetime import datetime
from functools import partial
from urllib.parse import quote

mimetypes.init()
mimetypes.add_type('audio/x-m4a', '.m4a')
mimetypes.add_type('audio/x-musepack', '.mpc')
mimetypes.add_type('audio/x-wavpack', '.wv')
mimetypes.add_type('video/mp4', '.mp4')
mimetypes.add_type('video/mpegts', '.ts')
mimetypes.add_type('video/divx', '.divx')
mimetypes.add_type('video/divx', '.avi')
mimetypes.add_type('video/x-matroska', '.mkv')

from urllib.parse import urlsplit

from twisted.python.filepath import FilePath
from twisted.python import failure

from coherence.upnp.core.DIDLLite import classChooser, Container, Resource
from coherence.upnp.core.DIDLLite import DIDLElement
from coherence.upnp.core.DIDLLite import simple_dlna_tags
from coherence.upnp.core.soap_service import errorCode

from coherence.upnp.core import utils

try:
    from twisted.internet.inotify import (
        INotify, IN_CREATE, IN_DELETE, IN_MOVED_FROM, IN_MOVED_TO,
        IN_ISDIR, IN_CHANGED, _FLAG_TO_HUMAN)
except Exception as msg:
    INotify = None
    no_inotify_reason = msg

from coherence.extern.xdg import xdg_content

from coherence.backend import BackendItem, BackendStore

# Sorting helpers
NUMS = re.compile('([0-9]+)')


[docs]def _natural_key(s): # strip the spaces s = s.get_name().strip() return [part.isdigit() and int(part) or part.lower() for part in NUMS.split(s)]
[docs]class NoThumbnailFound(Exception): '''no thumbnail found'''
[docs]def _find_thumbnail(filename, thumbnail_folder='.thumbs'): ''' looks for a thumbnail file of the same basename in a folder named '.thumbs' relative to the file returns the filename of the thumb, its mimetype and the correspondig DLNA PN string or throws an Exception otherwise ''' name, ext = os.path.splitext(os.path.basename(filename)) pattern = os.path.join(os.path.dirname(filename), thumbnail_folder, name + '.*') for f in glob.glob(pattern): mimetype, _ = mimetypes.guess_type(f, strict=False) if mimetype in ('image/jpeg', 'image/png'): if mimetype == 'image/jpeg': dlna_pn = 'DLNA.ORG_PN=JPEG_TN' else: dlna_pn = 'DLNA.ORG_PN=PNG_TN' return os.path.abspath(f), mimetype, dlna_pn else: raise NoThumbnailFound()
[docs]class FSItem(BackendItem): logCategory = 'fs_item' def __init__(self, object_id, parent, path, mimetype, urlbase, UPnPClass, update=False, store=None): BackendItem.__init__(self) self.id = object_id self.parent = parent if parent: parent.add_child(self, update=update) if mimetype == 'root': self.location = str(path) else: if mimetype == 'item' and path is None: path = os.path.join(parent.get_realpath(), str(self.id)) # self.location = FilePath(unicode(path)) self.location = FilePath(path) self.mimetype = mimetype if urlbase[-1] != '/': urlbase += '/' self.url = urlbase + str(self.id) self.store = store if parent is None: parent_id = -1 else: parent_id = parent.get_id() self.item = UPnPClass(object_id, parent_id, self.get_name()) if isinstance(self.item, Container): self.item.childCount = 0 self.child_count = 0 self.children = [] self.sorted = False self.caption = None if mimetype in ['directory', 'root']: self.update_id = 0 self.get_url = lambda: self.url # self.item.searchable = True # self.item.searchClass = 'object' if (isinstance(self.location, FilePath) and self.location.isdir() is True): self.check_for_cover_art() if getattr(self, 'cover', None): _, ext = os.path.splitext(self.cover) ''' add the cover image extension to help clients not reacting on the mimetype ''' self.item.albumArtURI = \ ''.join((urlbase, str(self.id), '?cover', str(ext))) else: self.get_url = lambda: self.url if self.mimetype.startswith('audio/'): if getattr(parent, 'cover', None): _, ext = os.path.splitext(parent.cover) ''' add the cover image extension to help clients not reacting on the mimetype ''' self.item.albumArtURI = \ ''.join((urlbase, str(self.id), '?cover', ext)) _, host_port, _, _, _ = urlsplit(urlbase) if host_port.find(':') != -1: host, port = tuple(host_port.split(':')) else: host = host_port try: size = self.location.getsize() except Exception: size = 0 if (self.store.server and self.store.server.coherence.config.get('transcoding', 'no') == 'yes'): if self.mimetype in ('application/ogg', 'audio/ogg', 'audio/x-wav', 'audio/x-m4a', 'application/x-flac'): new_res = Resource(self.url + '/transcoded.mp3', f'http-get:*:{"audio/mpeg"}:*') new_res.size = None # self.item.res.append(new_res) if mimetype != 'item': res = Resource( 'file://' + quote( self.get_path(), encoding='utf-8'), f'internal:{host}:{self.mimetype}:*') res.size = size self.item.res.append(res) if mimetype != 'item': res = Resource(self.url, f'http-get:*:{self.mimetype}:*') else: res = Resource(self.url, 'http-get:*:*:*') res.size = size self.item.res.append(res) ''' if this item is of type audio and we want to add a transcoding rule for it, this is the way to do it: create a new Resource object, at least a 'http-get' and maybe an 'internal' one too for transcoding to wav this looks like that res = Resource( url_for_transcoded audio, 'http-get:*:audio/x-wav:%s'% ';'.join( ['DLNA.ORG_PN=JPEG_TN']+simple_dlna_tags)) res.size = None self.item.res.append(res) ''' if (self.store.server and self.store.server.coherence.config.get( 'transcoding', 'no') == 'yes'): if self.mimetype in ('audio/mpeg', 'application/ogg', 'audio/ogg', 'audio/x-wav', 'audio/x-m4a', 'audio/flac', 'application/x-flac'): dlna_pn = 'DLNA.ORG_PN=LPCM' dlna_tags = simple_dlna_tags[:] # dlna_tags[1] = 'DLNA.ORG_OP=00' dlna_tags[2] = 'DLNA.ORG_CI=1' new_res = Resource( self.url + '?transcoded=lpcm', f'http-get:*:{"audio/L16;rate=44100;channels=2"}:' f'{";".join([dlna_pn] + dlna_tags)}') new_res.size = None # self.item.res.append(new_res) if self.mimetype != 'audio/mpeg': new_res = Resource(self.url + '?transcoded=mp3', f'http-get:*:{"audio/mpeg"}:*') new_res.size = None # self.item.res.append(new_res) ''' if this item is an image and we want to add a thumbnail for it we have to follow these rules: create a new Resource object, at least a 'http-get' and maybe an 'internal' one too for an JPG this looks like that res = Resource(url_for_thumbnail, 'http-get:*:image/jpg:%s'% ';'.join( ['DLNA.ORG_PN=JPEG_TN']+simple_dlna_tags)) res.size = size_of_thumbnail self.item.res.append(res) and for a PNG the Resource creation is like that res = Resource(url_for_thumbnail, 'http-get:*:image/png:%s'% ';'.join( simple_dlna_tags+['DLNA.ORG_PN=PNG_TN'])) if not hasattr(self.item, 'attachments'): self.item.attachments = {} self.item.attachments[key] = utils.StaticFile( filename_of_thumbnail) ''' if (self.mimetype in ('image/jpeg', 'image/png') or self.mimetype.startswith('video/')): try: filename, mimetype, dlna_pn = _find_thumbnail( self.get_path()) except NoThumbnailFound: pass except Exception: self.warning(traceback.format_exc()) else: dlna_tags = simple_dlna_tags[:] dlna_tags[ 3] = 'DLNA.ORG_FLAGS=00f00000000000000000000000000000' hash_from_path = str(id(filename)) new_res = Resource( self.url + '?attachment=' + hash_from_path, f'http-get:*:{mimetype}:' f'{";".join([dlna_pn] + dlna_tags)}') new_res.size = os.path.getsize(filename) self.item.res.append(new_res) if not hasattr(self.item, 'attachments'): self.item.attachments = {} self.item.attachments[hash_from_path] = utils.StaticFile( filename) if self.mimetype.startswith('video/'): # check for a subtitles file caption, _ = os.path.splitext(self.get_path()) caption = caption + '.srt' if os.path.exists(caption): hash_from_path = str(id(caption)) mimetype = 'smi/caption' new_res = Resource( self.url + '?attachment=' + hash_from_path, f'http-get:*:{mimetype}:{"*"}') new_res.size = os.path.getsize(caption) self.caption = new_res.data self.item.res.append(new_res) if not hasattr(self.item, 'attachments'): self.item.attachments = {} self.item.attachments[hash_from_path] = utils.StaticFile( caption) try: # FIXME: getmtime is deprecated in Twisted 2.6 self.item.date = datetime.fromtimestamp( self.location.getmtime()) except Exception: self.item.date = None
[docs] def rebuild(self, urlbase): # print('rebuild', self.mimetype) if self.mimetype != 'item': return # print('rebuild for', self.get_path()) mimetype, _ = mimetypes.guess_type(self.get_path(), strict=False) if mimetype is None: return self.mimetype = mimetype # print('rebuild', self.mimetype) UPnPClass = classChooser(self.mimetype) self.item = UPnPClass(self.id, self.parent.id, self.get_name()) if getattr(self.parent, 'cover', None): _, ext = os.path.splitext(self.parent.cover) ''' add the cover image extension to help clients not reacting on the mimetype ''' self.item.albumArtURI = ''.join( (urlbase, str(self.id), '?cover', ext)) _, host_port, _, _, _ = urlsplit(urlbase) if host_port.find(':') != -1: host, port = tuple(host_port.split(':')) else: host = host_port res = Resource( 'file://' + quote( self.get_path()), f'internal:{host}:{self.mimetype}:*') try: res.size = self.location.getsize() except Exception: res.size = 0 self.item.res.append(res) res = Resource(self.url, f'http-get:*:{self.mimetype}:*') try: res.size = self.location.getsize() except Exception: res.size = 0 self.item.res.append(res) try: # FIXME: getmtime is deprecated in Twisted 2.6 self.item.date = datetime.fromtimestamp(self.location.getmtime()) except Exception: self.item.date = None self.parent.update_id += 1
[docs] def check_for_cover_art(self): ''' let's try to find in the current directory some jpg file, or png if the jpg search fails, and take the first one that comes around ''' try: jpgs = [i.path for i in self.location.children() if i.splitext()[1] in ('.jpg', '.JPG')] try: self.cover = jpgs[0] except IndexError: pngs = [i.path for i in self.location.children() if i.splitext()[1] in ('.png', '.PNG')] try: self.cover = pngs[0] except IndexError: return except UnicodeDecodeError: self.warning( f'UnicodeDecodeError - there is something wrong with a ' f'file located in {self.location.path}')
[docs] def remove(self): # print('FSItem remove', self.id, self.get_name(), self.parent) if self.parent: self.parent.remove_child(self) del self.item
[docs] def add_child(self, child, update=False): self.children.append(child) self.child_count += 1 if isinstance(self.item, Container): self.item.childCount += 1 if update: self.update_id += 1 self.sorted = False
[docs] def remove_child(self, child): # print(f'remove_from {self.id:d} ({self.get_name()}) ' # f'child {child.id:d} ({child.get_name()})') if child in self.children: self.child_count -= 1 if isinstance(self.item, Container): self.item.childCount -= 1 self.children.remove(child) self.update_id += 1 self.sorted = False
[docs] def get_children(self, start=0, request_count=0): if not self.sorted: self.children.sort(key=_natural_key) self.sorted = True if request_count == 0: return self.children[start:] else: return self.children[start:request_count]
[docs] def get_child_count(self): return self.child_count
[docs] def get_id(self): return self.id
[docs] def get_update_id(self): if hasattr(self, 'update_id'): return self.update_id else: return None
[docs] def get_path(self): if self.mimetype in ['directory', 'root']: return None if isinstance(self.location, FilePath): return self.location.path else: return self.location
[docs] def get_realpath(self): if isinstance(self.location, FilePath): return self.location.path else: return self.location
[docs] def set_path(self, path=None, extension=None): if path is None: path = self.get_path() if extension is not None: path, old_ext = os.path.splitext(path) path = ''.join((path, extension)) if isinstance(self.location, FilePath): self.location = FilePath(path) else: self.location = path
[docs] def get_name(self): if isinstance(self.location, FilePath): name = self.location.basename() else: name = self.location return name
[docs] def get_cover(self): if self.cover: return self.cover try: return self.parent.cover except AttributeError: return None
[docs] def get_parent(self): return self.parent
[docs] def get_item(self): return self.item
[docs] def get_xml(self): return self.item.toString()
def __repr__(self): return 'id: ' + str(self.id) + ' @ ' + \ str(self.get_name().encode('ascii', 'xmlcharrefreplace'))
[docs]class FSStore(BackendStore): ''' .. versionchanged:: 0.9.0 Migrated from louie/dispatcher to EventDispatcher ''' logCategory = 'fs_store' implements = ['MediaServer'] description = '''MediaServer exporting files from the file-system''' options = [ {'option': 'name', 'type': 'string', 'default': 'my media', 'help': 'the name under this MediaServer ' 'shall show up with on other UPnP clients'}, {'option': 'version', 'type': 'int', 'default': 2, 'enum': (2, 1), 'help': 'the highest UPnP version this MediaServer shall support', 'level': 'advance'}, {'option': 'uuid', 'type': 'string', 'help': 'the unique (UPnP) identifier for this MediaServer,' ' usually automatically set', 'level': 'advance'}, {'option': 'content', 'type': 'string', 'default': xdg_content(), 'help': 'the path(s) this MediaServer shall export'}, {'option': 'ignore_patterns', 'type': 'string', 'help': 'list of regex patterns, matching filenames will be ignored'}, {'option': 'enable_inotify', 'type': 'string', 'default': 'yes', 'help': 'enable real-time monitoring of the content folders'}, {'option': 'enable_destroy', 'type': 'string', 'default': 'no', 'help': 'enable deleting a file via an UPnP method'}, {'option': 'import_folder', 'type': 'string', 'help': 'The path to store files imported via an UPnP method, ' 'if empty the Import method is disabled'} ] def __init__(self, server, **kwargs): BackendStore.__init__(self, server, **kwargs) self.next_id = 1000 self.name = kwargs.get('name', 'my media') self.content = kwargs.get('content', None) if self.content is not None: if isinstance(self.content, str): self.content = [self.content] cl = [] for a in self.content: cl += a.split(',') self.content = cl else: self.content = xdg_content() self.content = [x[0] for x in self.content] if self.content is None: self.content = 'tests/content' if not isinstance(self.content, list): self.content = [self.content] self.content = set([os.path.abspath(x) for x in self.content]) ignore_patterns = kwargs.get('ignore_patterns', []) self.store = {} self.inotify = None if kwargs.get('enable_inotify', 'yes') == 'yes': if INotify: try: self.inotify = INotify() self.inotify.startReading() except Exception as msg: self.error(f'inotify disabled: {msg}') self.inotify = None else: self.info(f'{no_inotify_reason}') else: self.info('FSStore content auto-update disabled upon user request') if kwargs.get('enable_destroy', 'no') == 'yes': self.upnp_DestroyObject = self.hidden_upnp_DestroyObject self.import_folder = kwargs.get('import_folder', None) if self.import_folder is not None: self.import_folder = os.path.abspath(self.import_folder) if not os.path.isdir(self.import_folder): self.import_folder = None self.ignore_file_pattern = re.compile( r'|'.join([r'^\..*'] + list(ignore_patterns))) parent = None self.update_id = 0 if (len(self.content) > 1 or utils.means_true(kwargs.get('create_root', False)) or self.import_folder is not None): UPnPClass = classChooser('root') id = str(self.getnextID()) try: parent = self.store[id] = FSItem( id, parent, 'media', 'root', self.urlbase, UPnPClass, update=True, store=self) except Exception as e: self.error( f'Error on setting self.store[id], Error on FSItem: {e}') exit(1) if self.import_folder is not None: id = str(self.getnextID()) self.store[id] = FSItem( id, parent, self.import_folder, 'directory', self.urlbase, UPnPClass, update=True, store=self) self.import_folder_id = id for bytesPath in self.content: if isinstance(bytesPath, (list, tuple)): path = str(path[0]) else: path = str(bytesPath) if self.ignore_file_pattern.match(path): continue try: self.walk(path, parent, self.ignore_file_pattern) except Exception as msg: self.warning(f'on walk of {path!r}: {msg!r}') import traceback self.debug(traceback.format_exc()) self.wmc_mapping.update({'14': '0', '15': '0', '16': '0', '17': '0' }) self.init_completed = True def __repr__(self): return self.__class__.__name__
[docs] def release(self): if self.inotify is not None: self.inotify.stopReading()
[docs] def len(self): return len(self.store)
[docs] def get_by_id(self, id): # print('get_by_id', id, type(id)) # we have referenced ids here when we are in WMC mapping mode if isinstance(id, str): id = id.split('@', 1)[0] elif isinstance(id, bytes): id = id.decode('utf-8').split('@', 1)[0] elif isinstance(id, int): id = str(id) # try: # id = int(id) # except ValueError: # id = 1000 if id == '0': id = '1000' # print('get_by_id 2', id) try: r = self.store[id] except KeyError: r = None # print('get_by_id 3', r) return r
[docs] def get_id_by_name(self, parent='0', name=''): self.info(f'get_id_by_name {parent} ({type(parent)}) {name}') try: parent = self.store[parent] self.debug(f'{parent} {len(parent.children):d}') for child in parent.children: # if not isinstance(name, unicode): # name = name.decode('utf8') self.debug(f'{child.get_name()} {child.get_realpath()} ' f'{name == child.get_realpath()}') if name == child.get_realpath(): return child.id except Exception as e: self.error(f'get_id_by_name: {e!r}') import traceback self.info(traceback.format_exc()) self.debug('get_id_by_name not found') return None
[docs] def get_url_by_name(self, parent='0', name=''): self.info(f'get_url_by_name {parent!r} {name!r}') id = self.get_id_by_name(parent, name) # print 'get_url_by_name', id if id is None: return '' return self.store[id].url
[docs] def update_config(self, **kwargs): self.info(f'update_config: {kwargs}') if 'content' in kwargs: new_content = kwargs['content'] new_content = set( [os.path.abspath(x) for x in new_content.split(',')]) new_folders = new_content.difference(self.content) obsolete_folders = self.content.difference(new_content) self.debug(f'new folders: {new_folders}\n' f'obsolete folders: {obsolete_folders}') for folder in obsolete_folders: self.remove_content_folder(folder) for folder in new_folders: self.add_content_folder(folder) self.content = new_content
[docs] def add_content_folder(self, path): path = os.path.abspath(path) if path not in self.content: self.content.add(path) self.walk(path, self.store['1000'], self.ignore_file_pattern)
[docs] def remove_content_folder(self, path): path = os.path.abspath(path) if path in self.content: id = self.get_id_by_name('1000', path) self.remove(id) self.content.remove(path)
[docs] def walk(self, path, parent=None, ignore_file_pattern=''): self.debug(f'walk {path}') containers = [] parent = self.append(path, parent) if parent is not None: containers.append(parent) while len(containers) > 0: container = containers.pop() try: self.debug(f'adding {container.location!r}') self.info(f'walk.adding: {container.location}') for child in container.location.children(): if ignore_file_pattern.match(child.basename()) is not None: continue new_container = self.append(child.path, container) if new_container is not None: containers.append(new_container) except UnicodeDecodeError: self.warning( f'UnicodeDecodeError - there is something wrong with a ' f'file located in {container.get_path()!r}')
[docs] def create(self, mimetype, path, parent): self.debug(f'create {mimetype} {path} {type(path)} {parent}') UPnPClass = classChooser(mimetype) if UPnPClass is None: return None id = self.getnextID() if mimetype in ('root', 'directory'): id = str(id) else: _, ext = os.path.splitext(path) id = str(id) + ext.lower() update = False if hasattr(self, 'update_id'): update = True self.store[id] = FSItem(id, parent, path, mimetype, self.urlbase, UPnPClass, update=True, store=self) if hasattr(self, 'update_id'): self.update_id += 1 # print(self.update_id) if self.server: if hasattr(self.server, 'content_directory_server'): self.server.content_directory_server.set_variable( 0, 'SystemUpdateID', self.update_id) if parent is not None: value = (parent.get_id(), parent.get_update_id()) if self.server: if hasattr(self.server, 'content_directory_server'): self.server.content_directory_server.set_variable( 0, 'ContainerUpdateIDs', value) return id
[docs] def append(self, bytes_path, parent): path = str(bytes_path) self.debug(f'append {path} {type(path)} {parent}') if not os.path.exists(path): self.warning(f'path {path!r} not available - ignored') return None if stat.S_ISFIFO(os.stat(path).st_mode): self.warning(f'path {path!r} is a FIFO - ignored') return None try: mimetype, _ = mimetypes.guess_type(path, strict=False) if mimetype is None: if os.path.isdir(path): mimetype = 'directory' if mimetype is None: return None id = self.create(mimetype, path, parent) if mimetype == 'directory': if self.inotify is not None: mask = \ IN_CREATE | IN_DELETE | IN_MOVED_FROM | \ IN_MOVED_TO | IN_CHANGED self.inotify.watch( FilePath(os.path.abspath(path)), mask=mask, autoAdd=False, callbacks=[partial(self.notify, parameter=id)]) return self.store[id] except OSError as os_msg: ''' seems we have some permissions issues along the content path ''' self.warning(f'path {path} isn\'t accessible, error {os_msg}') return None
[docs] def remove(self, id): self.debug(f'FSSTore remove id: {id}') try: item = self.store[id] parent = item.get_parent() item.remove() del self.store[id] if hasattr(self, 'update_id'): self.update_id += 1 if self.server: self.server.content_directory_server.set_variable( 0, 'SystemUpdateID', self.update_id) # value = f'{parent.get_id():d},{parent_get_update_id():d}' value = (parent.get_id(), parent.get_update_id()) if self.server: self.server.content_directory_server.set_variable( 0, 'ContainerUpdateIDs', value) except KeyError: pass
[docs] def notify(self, ignore, path, mask, parameter=None): self.info('Event %s on %s - parameter %r', ', '.join([fl for fl in _FLAG_TO_HUMAN if fl[0] == mask][0]), path.path, parameter) if mask & IN_CHANGED: # FIXME react maybe on access right changes, loss of read rights? # print(f'{path} was changed, parent {parameter:d} ({iwp.path})') pass if mask & IN_DELETE or mask & IN_MOVED_FROM: self.info(f'{path.path} was deleted, ' f'parent {parameter!r} ({path.parent.path})') id = self.get_id_by_name(parameter, path.path) if id is not None: self.remove(id) if mask & IN_CREATE or mask & IN_MOVED_TO: if mask & IN_ISDIR: self.info(f'directory {path.path} was created, ' f'parent {parameter!r} ({path.parent.path})') else: self.info(f'file {path.path} was created, ' f'parent {parameter!r} ({path.parent.path})') if self.get_id_by_name(parameter, path.path) is None: if path.isdir(): self.walk(path.path, self.get_by_id(parameter), self.ignore_file_pattern) else: if self.ignore_file_pattern.match(parameter) is None: self.append( str(path.path), str(self.get_by_id(parameter)))
[docs] def getnextID(self): ret = self.next_id self.next_id += 1 return ret
[docs] def backend_import(self, item, data): try: f = open(item.get_path(), 'w+b') if hasattr(data, 'read'): data = data.read() f.write(data) f.close() item.rebuild(self.urlbase) return 200 except IOError: self.warning(f'import of file {item.get_path()} failed') except Exception as msg: import traceback self.warning(traceback.format_exc()) return 500
[docs] def upnp_init(self): self.current_connection_id = None if self.server: self.server.connection_manager_server.set_variable( 0, 'SourceProtocolInfo', [f'internal:{self.server.coherence.hostname}:audio/mpeg:*', 'http-get:*:audio/mpeg:*', f'internal:{self.server.coherence.hostname}:video/mp4:*', 'http-get:*:video/mp4:*', f'internal:{self.server.coherence.hostname}:application/ogg:*', # noqa 'http-get:*:application/ogg:*', f'internal:{self.server.coherence.hostname}:video/x-msvideo:*', # noqa 'http-get:*:video/x-msvideo:*', f'internal:{self.server.coherence.hostname}:video/mpeg:*', 'http-get:*:video/mpeg:*', f'internal:{self.server.coherence.hostname}:video/avi:*', 'http-get:*:video/avi:*', f'internal:{self.server.coherence.hostname}:video/divx:*', 'http-get:*:video/divx:*', f'internal:{self.server.coherence.hostname}:video/quicktime:*', # noqa 'http-get:*:video/quicktime:*', f'internal:{self.server.coherence.hostname}:image/gif:*', 'http-get:*:image/gif:*', f'internal:{self.server.coherence.hostname}:image/jpeg:*', 'http-get:*:image/jpeg:*' # 'http-get:*:audio/mpeg:DLNA.ORG_PN=MP3;DLNA.ORG_OP=11;' # 'DLNA.ORG_FLAGS=01700000000000000000000000000000', # 'http-get:*:audio/x-ms-wma:DLNA.ORG_PN=WMABASE;' # 'DLNA.ORG_OP=11;DLNA.ORG_FLAGS' # '=01700000000000000000000000000000', # 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_TN;' # 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS' # '=00f00000000000000000000000000000', # 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_SM;' # 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS' # '=00f00000000000000000000000000000', # 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_MED;' # 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS' # '=00f00000000000000000000000000000', # 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_LRG;' # 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS' # '=00f00000000000000000000000000000', # 'http-get:*:video/mpeg:DLNA.ORG_PN=MPEG_PS_PAL;' # 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS' # '=01700000000000000000000000000000', # 'http-get:*:video/x-ms-wmv:DLNA.ORG_PN=WMVMED_BASE;' # 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS' # '=01700000000000000000000000000000', ], default=True) self.server.content_directory_server.set_variable( 0, 'SystemUpdateID', self.update_id)
# self.server.content_directory_server.set_variable( # 0, 'SortCapabilities', '*')
[docs] def upnp_ImportResource(self, *args, **kwargs): SourceURI = kwargs['SourceURI'] DestinationURI = kwargs['DestinationURI'] if DestinationURI.endswith('?import'): id = DestinationURI.split('/')[-1] id = id[:-7] # remove the ?import else: return failure.Failure(errorCode(718)) item = self.get_by_id(id) if item is None: return failure.Failure(errorCode(718)) def gotPage(headers): # print('gotPage', headers) content_type = headers.get('content-type', []) if not isinstance(content_type, list): content_type = list(content_type) if len(content_type) > 0: extension = mimetypes.guess_extension(content_type[0], strict=False) item.set_path(None, extension) shutil.move(tmp_path, item.get_path()) item.rebuild(self.urlbase) if hasattr(self, 'update_id'): self.update_id += 1 if self.server: if hasattr(self.server, 'content_directory_server'): self.server.content_directory_server.set_variable( 0, 'SystemUpdateID', self.update_id) if item.parent is not None: value = (item.parent.get_id(), item.parent.get_update_id()) if self.server: if hasattr(self.server, 'content_directory_server'): self.server.content_directory_server.set_variable( 0, 'ContainerUpdateIDs', value) def gotError(error, url): self.warning(f'error requesting {url}') self.info(error) os.unlink(tmp_path) return failure.Failure(errorCode(718)) tmp_fp, tmp_path = tempfile.mkstemp() os.close(tmp_fp) utils.downloadPage( SourceURI, tmp_path).addCallbacks( gotPage, gotError, None, None, [SourceURI], None) transfer_id = 0 # FIXME return {'TransferID': transfer_id}
[docs] def upnp_CreateObject(self, *args, **kwargs): # print(f'CreateObject {kwargs}') if kwargs['ContainerID'] == 'DLNA.ORG_AnyContainer': if self.import_folder is not None: ContainerID = self.import_folder_id else: return failure.Failure(errorCode(712)) else: ContainerID = kwargs['ContainerID'] Elements = kwargs['Elements'] parent_item = self.get_by_id(ContainerID) if parent_item is None: return failure.Failure(errorCode(710)) if parent_item.item.restricted: return failure.Failure(errorCode(713)) if len(Elements) == 0: return failure.Failure(errorCode(712)) elt = DIDLElement.fromString(Elements) if elt.numItems() != 1: return failure.Failure(errorCode(712)) item = elt.getItems()[0] if item.parentID == 'DLNA.ORG_AnyContainer': item.parentID = ContainerID if (item.id != '' or item.parentID != ContainerID or item.restricted is True or item.title == ''): return failure.Failure(errorCode(712)) if ('..' in item.title or '~' in item.title or os.sep in item.title): return failure.Failure(errorCode(712)) if item.upnp_class == 'object.container.storageFolder': if len(item.res) != 0: return failure.Failure(errorCode(712)) path = os.path.join(parent_item.get_path(), item.title) id = self.create('directory', path, parent_item) try: os.mkdir(path) except Exception: self.remove(id) return failure.Failure(errorCode(712)) if self.inotify is not None: mask = \ IN_CREATE | IN_DELETE | IN_MOVED_FROM | \ IN_MOVED_TO | IN_CHANGED self.inotify.watch( path, mask=mask, autoAdd=False, callbacks=[partial(self.notify, parameter=id)]) new_item = self.get_by_id(id) didl = DIDLElement() didl.addItem(new_item.item) return {'ObjectID': id, 'Result': didl.toString()} if item.upnp_class.startswith('object.item'): _, _, content_format, _ = item.res[0].protocolInfo.split(':') extension = mimetypes.guess_extension(content_format, strict=False) path = os.path.join(parent_item.get_realpath(), item.title + extension) id = self.create('item', path, parent_item) new_item = self.get_by_id(id) for res in new_item.item.res: res.importUri = new_item.url + '?import' res.data = None didl = DIDLElement() didl.addItem(new_item.item) return {'ObjectID': id, 'Result': didl.toString()} return failure.Failure(errorCode(712))
[docs] def hidden_upnp_DestroyObject(self, *args, **kwargs): ObjectID = kwargs['ObjectID'] item = self.get_by_id(ObjectID) if item is None: return failure.Failure(errorCode(701)) self.info(f'upnp_DestroyObject: {item.location}') try: item.location.remove() except Exception as msg: self.error(f'upnp_DestroyObject [{Exception}]: {msg}') return failure.Failure(errorCode(715)) return {}
if __name__ == '__main__': from twisted.internet import reactor p = 'tests/content' f = FSStore(None, name='my media', content=p, urlbase='http://localhost/xyz') print(f.len()) print(f.get_by_id(1000).child_count, f.get_by_id(1000).get_xml()) print(f.get_by_id(1001).child_count, f.get_by_id(1001).get_xml()) print(f.get_by_id(1002).child_count, f.get_by_id(1002).get_xml()) print(f.get_by_id(1003).child_count, f.get_by_id(1003).get_xml()) print(f.get_by_id(1004).child_count, f.get_by_id(1004).get_xml()) print(f.get_by_id(1005).child_count, f.get_by_id(1005).get_xml()) print(f.store[1000].get_children(0, 0)) # print(f.upnp_Search( # ContainerID='4', # Filter='dc:title,upnp:artist', # RequestedCount='1000', # StartingIndex='0', # SearchCriteria='(upnp:class = 'object.container.album.musicAlbum')', # SortCriteria='+dc:title')) f.upnp_ImportResource(SourceURI='http://spiegel.de', DestinationURI='ttt') reactor.run()