# -*- coding: utf-8 -*-
# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php
# Copyright 2006, Frank Scholz <coherence@beebits.net>
'''
FSStore - filesystem media server
---------------------------------
FSStore exposes media files found in the directory trees defined by
the 'content' configuration entry.
The first ".jpg" or ".png" file found inside a media directory is
served as a cover image.
The plugin is configured with::
<plugin active="yes">
<!-- The plugin identifier, mandatory -->
<backend>FSStore</backend>
<!-- A comma-separated list of path containing the medias to serve -->
<content>/media/path1,/media/path2</content>
<!-- The avertized media server name, default: "my media" -->
<name>my media</name>
<!-- The highest UPnP version this
media server should support, default: 2 -->
<version>2</version>
<!-- A unique identifier used to reference the media server,
autogenerated if not set explicitly. In this case, some control points
might memorize it between runs and display the same media server more
than once. -->
<uuid>2f7f4096-cba3-4390-be7d-d1d07106a6f4</uuid>
</plugin>
'''
import glob
import mimetypes
import os
import re
import shutil
import stat
import tempfile
import traceback
from datetime import datetime
from functools import partial
from urllib.parse import quote
mimetypes.init()
mimetypes.add_type('audio/x-m4a', '.m4a')
mimetypes.add_type('audio/x-musepack', '.mpc')
mimetypes.add_type('audio/x-wavpack', '.wv')
mimetypes.add_type('video/mp4', '.mp4')
mimetypes.add_type('video/mpegts', '.ts')
mimetypes.add_type('video/divx', '.divx')
mimetypes.add_type('video/divx', '.avi')
mimetypes.add_type('video/x-matroska', '.mkv')
from urllib.parse import urlsplit
from twisted.python.filepath import FilePath
from twisted.python import failure
from coherence.upnp.core.DIDLLite import classChooser, Container, Resource
from coherence.upnp.core.DIDLLite import DIDLElement
from coherence.upnp.core.DIDLLite import simple_dlna_tags
from coherence.upnp.core.soap_service import errorCode
from coherence.upnp.core import utils
try:
from twisted.internet.inotify import (
INotify, IN_CREATE, IN_DELETE, IN_MOVED_FROM, IN_MOVED_TO,
IN_ISDIR, IN_CHANGED, _FLAG_TO_HUMAN)
except Exception as msg:
INotify = None
no_inotify_reason = msg
from coherence.extern.xdg import xdg_content
from coherence.backend import BackendItem, BackendStore
# Sorting helpers
NUMS = re.compile('([0-9]+)')
[docs]def _natural_key(s):
# strip the spaces
s = s.get_name().strip()
return [part.isdigit() and int(part) or part.lower() for part in
NUMS.split(s)]
[docs]class NoThumbnailFound(Exception):
'''no thumbnail found'''
[docs]def _find_thumbnail(filename, thumbnail_folder='.thumbs'):
''' looks for a thumbnail file of the same basename
in a folder named '.thumbs' relative to the file
returns the filename of the thumb, its mimetype and
the correspondig DLNA PN string or throws an Exception otherwise
'''
name, ext = os.path.splitext(os.path.basename(filename))
pattern = os.path.join(os.path.dirname(filename), thumbnail_folder,
name + '.*')
for f in glob.glob(pattern):
mimetype, _ = mimetypes.guess_type(f, strict=False)
if mimetype in ('image/jpeg', 'image/png'):
if mimetype == 'image/jpeg':
dlna_pn = 'DLNA.ORG_PN=JPEG_TN'
else:
dlna_pn = 'DLNA.ORG_PN=PNG_TN'
return os.path.abspath(f), mimetype, dlna_pn
else:
raise NoThumbnailFound()
[docs]class FSItem(BackendItem):
logCategory = 'fs_item'
def __init__(self, object_id, parent, path, mimetype, urlbase, UPnPClass,
update=False, store=None):
BackendItem.__init__(self)
self.id = object_id
self.parent = parent
if parent:
parent.add_child(self, update=update)
if mimetype == 'root':
self.location = str(path)
else:
if mimetype == 'item' and path is None:
path = os.path.join(parent.get_realpath(), str(self.id))
# self.location = FilePath(unicode(path))
self.location = FilePath(path)
self.mimetype = mimetype
if urlbase[-1] != '/':
urlbase += '/'
self.url = urlbase + str(self.id)
self.store = store
if parent is None:
parent_id = -1
else:
parent_id = parent.get_id()
self.item = UPnPClass(object_id, parent_id, self.get_name())
if isinstance(self.item, Container):
self.item.childCount = 0
self.child_count = 0
self.children = []
self.sorted = False
self.caption = None
if mimetype in ['directory', 'root']:
self.update_id = 0
self.get_url = lambda: self.url
# self.item.searchable = True
# self.item.searchClass = 'object'
if (isinstance(self.location, FilePath) and
self.location.isdir() is True):
self.check_for_cover_art()
if getattr(self, 'cover', None):
_, ext = os.path.splitext(self.cover)
''' add the cover image extension to help clients
not reacting on the mimetype '''
self.item.albumArtURI = \
''.join((urlbase, str(self.id), '?cover', str(ext)))
else:
self.get_url = lambda: self.url
if self.mimetype.startswith('audio/'):
if getattr(parent, 'cover', None):
_, ext = os.path.splitext(parent.cover)
''' add the cover image extension to help clients
not reacting on the mimetype '''
self.item.albumArtURI = \
''.join((urlbase, str(self.id), '?cover', ext))
_, host_port, _, _, _ = urlsplit(urlbase)
if host_port.find(':') != -1:
host, port = tuple(host_port.split(':'))
else:
host = host_port
try:
size = self.location.getsize()
except Exception:
size = 0
if (self.store.server and
self.store.server.coherence.config.get('transcoding',
'no') == 'yes'):
if self.mimetype in ('application/ogg', 'audio/ogg',
'audio/x-wav',
'audio/x-m4a',
'application/x-flac'):
new_res = Resource(self.url + '/transcoded.mp3',
f'http-get:*:{"audio/mpeg"}:*')
new_res.size = None
# self.item.res.append(new_res)
if mimetype != 'item':
res = Resource(
'file://' + quote(
self.get_path(),
encoding='utf-8'),
f'internal:{host}:{self.mimetype}:*')
res.size = size
self.item.res.append(res)
if mimetype != 'item':
res = Resource(self.url, f'http-get:*:{self.mimetype}:*')
else:
res = Resource(self.url, 'http-get:*:*:*')
res.size = size
self.item.res.append(res)
''' if this item is of type audio and we want to add a transcoding
rule for it, this is the way to do it:
create a new Resource object, at least a 'http-get'
and maybe an 'internal' one too
for transcoding to wav this looks like that
res = Resource(
url_for_transcoded audio,
'http-get:*:audio/x-wav:%s'% ';'.join(
['DLNA.ORG_PN=JPEG_TN']+simple_dlna_tags))
res.size = None
self.item.res.append(res)
'''
if (self.store.server and
self.store.server.coherence.config.get(
'transcoding', 'no') == 'yes'):
if self.mimetype in ('audio/mpeg',
'application/ogg', 'audio/ogg',
'audio/x-wav',
'audio/x-m4a',
'audio/flac',
'application/x-flac'):
dlna_pn = 'DLNA.ORG_PN=LPCM'
dlna_tags = simple_dlna_tags[:]
# dlna_tags[1] = 'DLNA.ORG_OP=00'
dlna_tags[2] = 'DLNA.ORG_CI=1'
new_res = Resource(
self.url + '?transcoded=lpcm',
f'http-get:*:{"audio/L16;rate=44100;channels=2"}:'
f'{";".join([dlna_pn] + dlna_tags)}')
new_res.size = None
# self.item.res.append(new_res)
if self.mimetype != 'audio/mpeg':
new_res = Resource(self.url + '?transcoded=mp3',
f'http-get:*:{"audio/mpeg"}:*')
new_res.size = None
# self.item.res.append(new_res)
''' if this item is an image and we want to add a thumbnail for it
we have to follow these rules:
create a new Resource object, at least a 'http-get'
and maybe an 'internal' one too
for an JPG this looks like that
res = Resource(url_for_thumbnail,
'http-get:*:image/jpg:%s'% ';'.join(
['DLNA.ORG_PN=JPEG_TN']+simple_dlna_tags))
res.size = size_of_thumbnail
self.item.res.append(res)
and for a PNG the Resource creation is like that
res = Resource(url_for_thumbnail,
'http-get:*:image/png:%s'% ';'.join(
simple_dlna_tags+['DLNA.ORG_PN=PNG_TN']))
if not hasattr(self.item, 'attachments'):
self.item.attachments = {}
self.item.attachments[key] = utils.StaticFile(
filename_of_thumbnail)
'''
if (self.mimetype in ('image/jpeg', 'image/png') or
self.mimetype.startswith('video/')):
try:
filename, mimetype, dlna_pn = _find_thumbnail(
self.get_path())
except NoThumbnailFound:
pass
except Exception:
self.warning(traceback.format_exc())
else:
dlna_tags = simple_dlna_tags[:]
dlna_tags[
3] = 'DLNA.ORG_FLAGS=00f00000000000000000000000000000'
hash_from_path = str(id(filename))
new_res = Resource(
self.url + '?attachment=' + hash_from_path,
f'http-get:*:{mimetype}:'
f'{";".join([dlna_pn] + dlna_tags)}')
new_res.size = os.path.getsize(filename)
self.item.res.append(new_res)
if not hasattr(self.item, 'attachments'):
self.item.attachments = {}
self.item.attachments[hash_from_path] = utils.StaticFile(
filename)
if self.mimetype.startswith('video/'):
# check for a subtitles file
caption, _ = os.path.splitext(self.get_path())
caption = caption + '.srt'
if os.path.exists(caption):
hash_from_path = str(id(caption))
mimetype = 'smi/caption'
new_res = Resource(
self.url + '?attachment=' + hash_from_path,
f'http-get:*:{mimetype}:{"*"}')
new_res.size = os.path.getsize(caption)
self.caption = new_res.data
self.item.res.append(new_res)
if not hasattr(self.item, 'attachments'):
self.item.attachments = {}
self.item.attachments[hash_from_path] = utils.StaticFile(
caption)
try:
# FIXME: getmtime is deprecated in Twisted 2.6
self.item.date = datetime.fromtimestamp(
self.location.getmtime())
except Exception:
self.item.date = None
[docs] def rebuild(self, urlbase):
# print('rebuild', self.mimetype)
if self.mimetype != 'item':
return
# print('rebuild for', self.get_path())
mimetype, _ = mimetypes.guess_type(self.get_path(), strict=False)
if mimetype is None:
return
self.mimetype = mimetype
# print('rebuild', self.mimetype)
UPnPClass = classChooser(self.mimetype)
self.item = UPnPClass(self.id, self.parent.id, self.get_name())
if getattr(self.parent, 'cover', None):
_, ext = os.path.splitext(self.parent.cover)
''' add the cover image extension to help clients not reacting on
the mimetype '''
self.item.albumArtURI = ''.join(
(urlbase, str(self.id), '?cover', ext))
_, host_port, _, _, _ = urlsplit(urlbase)
if host_port.find(':') != -1:
host, port = tuple(host_port.split(':'))
else:
host = host_port
res = Resource(
'file://' + quote(
self.get_path()),
f'internal:{host}:{self.mimetype}:*')
try:
res.size = self.location.getsize()
except Exception:
res.size = 0
self.item.res.append(res)
res = Resource(self.url, f'http-get:*:{self.mimetype}:*')
try:
res.size = self.location.getsize()
except Exception:
res.size = 0
self.item.res.append(res)
try:
# FIXME: getmtime is deprecated in Twisted 2.6
self.item.date = datetime.fromtimestamp(self.location.getmtime())
except Exception:
self.item.date = None
self.parent.update_id += 1
[docs] def check_for_cover_art(self):
''' let's try to find in the current directory some jpg file,
or png if the jpg search fails, and take the first one
that comes around
'''
try:
jpgs = [i.path for i in self.location.children() if
i.splitext()[1] in ('.jpg', '.JPG')]
try:
self.cover = jpgs[0]
except IndexError:
pngs = [i.path for i in self.location.children() if
i.splitext()[1] in ('.png', '.PNG')]
try:
self.cover = pngs[0]
except IndexError:
return
except UnicodeDecodeError:
self.warning(
f'UnicodeDecodeError - there is something wrong with a '
f'file located in {self.location.path}')
[docs] def remove(self):
# print('FSItem remove', self.id, self.get_name(), self.parent)
if self.parent:
self.parent.remove_child(self)
del self.item
[docs] def add_child(self, child, update=False):
self.children.append(child)
self.child_count += 1
if isinstance(self.item, Container):
self.item.childCount += 1
if update:
self.update_id += 1
self.sorted = False
[docs] def remove_child(self, child):
# print(f'remove_from {self.id:d} ({self.get_name()}) '
# f'child {child.id:d} ({child.get_name()})')
if child in self.children:
self.child_count -= 1
if isinstance(self.item, Container):
self.item.childCount -= 1
self.children.remove(child)
self.update_id += 1
self.sorted = False
[docs] def get_children(self, start=0, request_count=0):
if not self.sorted:
self.children.sort(key=_natural_key)
self.sorted = True
if request_count == 0:
return self.children[start:]
else:
return self.children[start:request_count]
[docs] def get_child_count(self):
return self.child_count
[docs] def get_id(self):
return self.id
[docs] def get_update_id(self):
if hasattr(self, 'update_id'):
return self.update_id
else:
return None
[docs] def get_path(self):
if self.mimetype in ['directory', 'root']:
return None
if isinstance(self.location, FilePath):
return self.location.path
else:
return self.location
[docs] def get_realpath(self):
if isinstance(self.location, FilePath):
return self.location.path
else:
return self.location
[docs] def set_path(self, path=None, extension=None):
if path is None:
path = self.get_path()
if extension is not None:
path, old_ext = os.path.splitext(path)
path = ''.join((path, extension))
if isinstance(self.location, FilePath):
self.location = FilePath(path)
else:
self.location = path
[docs] def get_name(self):
if isinstance(self.location, FilePath):
name = self.location.basename()
else:
name = self.location
return name
[docs] def get_cover(self):
if self.cover:
return self.cover
try:
return self.parent.cover
except AttributeError:
return None
[docs] def get_parent(self):
return self.parent
[docs] def get_item(self):
return self.item
[docs] def get_xml(self):
return self.item.toString()
def __repr__(self):
return 'id: ' + str(self.id) + ' @ ' + \
str(self.get_name().encode('ascii', 'xmlcharrefreplace'))
[docs]class FSStore(BackendStore):
'''
.. versionchanged:: 0.9.0
Migrated from louie/dispatcher to EventDispatcher
'''
logCategory = 'fs_store'
implements = ['MediaServer']
description = '''MediaServer exporting files from the file-system'''
options = [
{'option': 'name', 'type': 'string', 'default': 'my media',
'help': 'the name under this MediaServer '
'shall show up with on other UPnP clients'},
{'option': 'version', 'type': 'int', 'default': 2, 'enum': (2, 1),
'help': 'the highest UPnP version this MediaServer shall support',
'level': 'advance'},
{'option': 'uuid', 'type': 'string',
'help': 'the unique (UPnP) identifier for this MediaServer,'
' usually automatically set',
'level': 'advance'},
{'option': 'content', 'type': 'string', 'default': xdg_content(),
'help': 'the path(s) this MediaServer shall export'},
{'option': 'ignore_patterns', 'type': 'string',
'help': 'list of regex patterns, matching filenames will be ignored'},
{'option': 'enable_inotify', 'type': 'string', 'default': 'yes',
'help': 'enable real-time monitoring of the content folders'},
{'option': 'enable_destroy', 'type': 'string', 'default': 'no',
'help': 'enable deleting a file via an UPnP method'},
{'option': 'import_folder', 'type': 'string',
'help': 'The path to store files imported via an UPnP method, '
'if empty the Import method is disabled'}
]
def __init__(self, server, **kwargs):
BackendStore.__init__(self, server, **kwargs)
self.next_id = 1000
self.name = kwargs.get('name', 'my media')
self.content = kwargs.get('content', None)
if self.content is not None:
if isinstance(self.content, str):
self.content = [self.content]
cl = []
for a in self.content:
cl += a.split(',')
self.content = cl
else:
self.content = xdg_content()
self.content = [x[0] for x in self.content]
if self.content is None:
self.content = 'tests/content'
if not isinstance(self.content, list):
self.content = [self.content]
self.content = set([os.path.abspath(x) for x in self.content])
ignore_patterns = kwargs.get('ignore_patterns', [])
self.store = {}
self.inotify = None
if kwargs.get('enable_inotify', 'yes') == 'yes':
if INotify:
try:
self.inotify = INotify()
self.inotify.startReading()
except Exception as msg:
self.error(f'inotify disabled: {msg}')
self.inotify = None
else:
self.info(f'{no_inotify_reason}')
else:
self.info('FSStore content auto-update disabled upon user request')
if kwargs.get('enable_destroy', 'no') == 'yes':
self.upnp_DestroyObject = self.hidden_upnp_DestroyObject
self.import_folder = kwargs.get('import_folder', None)
if self.import_folder is not None:
self.import_folder = os.path.abspath(self.import_folder)
if not os.path.isdir(self.import_folder):
self.import_folder = None
self.ignore_file_pattern = re.compile(
r'|'.join([r'^\..*'] + list(ignore_patterns)))
parent = None
self.update_id = 0
if (len(self.content) > 1 or
utils.means_true(kwargs.get('create_root', False)) or
self.import_folder is not None):
UPnPClass = classChooser('root')
id = str(self.getnextID())
try:
parent = self.store[id] = FSItem(
id, parent, 'media', 'root',
self.urlbase, UPnPClass, update=True, store=self)
except Exception as e:
self.error(
f'Error on setting self.store[id], Error on FSItem: {e}')
exit(1)
if self.import_folder is not None:
id = str(self.getnextID())
self.store[id] = FSItem(
id, parent, self.import_folder, 'directory',
self.urlbase, UPnPClass, update=True, store=self)
self.import_folder_id = id
for bytesPath in self.content:
if isinstance(bytesPath, (list, tuple)):
path = str(path[0])
else:
path = str(bytesPath)
if self.ignore_file_pattern.match(path):
continue
try:
self.walk(path, parent, self.ignore_file_pattern)
except Exception as msg:
self.warning(f'on walk of {path!r}: {msg!r}')
import traceback
self.debug(traceback.format_exc())
self.wmc_mapping.update({'14': '0',
'15': '0',
'16': '0',
'17': '0'
})
self.init_completed = True
def __repr__(self):
return self.__class__.__name__
[docs] def release(self):
if self.inotify is not None:
self.inotify.stopReading()
[docs] def len(self):
return len(self.store)
[docs] def get_by_id(self, id):
# print('get_by_id', id, type(id))
# we have referenced ids here when we are in WMC mapping mode
if isinstance(id, str):
id = id.split('@', 1)[0]
elif isinstance(id, bytes):
id = id.decode('utf-8').split('@', 1)[0]
elif isinstance(id, int):
id = str(id)
# try:
# id = int(id)
# except ValueError:
# id = 1000
if id == '0':
id = '1000'
# print('get_by_id 2', id)
try:
r = self.store[id]
except KeyError:
r = None
# print('get_by_id 3', r)
return r
[docs] def get_id_by_name(self, parent='0', name=''):
self.info(f'get_id_by_name {parent} ({type(parent)}) {name}')
try:
parent = self.store[parent]
self.debug(f'{parent} {len(parent.children):d}')
for child in parent.children:
# if not isinstance(name, unicode):
# name = name.decode('utf8')
self.debug(f'{child.get_name()} {child.get_realpath()} '
f'{name == child.get_realpath()}')
if name == child.get_realpath():
return child.id
except Exception as e:
self.error(f'get_id_by_name: {e!r}')
import traceback
self.info(traceback.format_exc())
self.debug('get_id_by_name not found')
return None
[docs] def get_url_by_name(self, parent='0', name=''):
self.info(f'get_url_by_name {parent!r} {name!r}')
id = self.get_id_by_name(parent, name)
# print 'get_url_by_name', id
if id is None:
return ''
return self.store[id].url
[docs] def update_config(self, **kwargs):
self.info(f'update_config: {kwargs}')
if 'content' in kwargs:
new_content = kwargs['content']
new_content = set(
[os.path.abspath(x) for x in new_content.split(',')])
new_folders = new_content.difference(self.content)
obsolete_folders = self.content.difference(new_content)
self.debug(f'new folders: {new_folders}\n'
f'obsolete folders: {obsolete_folders}')
for folder in obsolete_folders:
self.remove_content_folder(folder)
for folder in new_folders:
self.add_content_folder(folder)
self.content = new_content
[docs] def add_content_folder(self, path):
path = os.path.abspath(path)
if path not in self.content:
self.content.add(path)
self.walk(path, self.store['1000'], self.ignore_file_pattern)
[docs] def remove_content_folder(self, path):
path = os.path.abspath(path)
if path in self.content:
id = self.get_id_by_name('1000', path)
self.remove(id)
self.content.remove(path)
[docs] def walk(self, path, parent=None, ignore_file_pattern=''):
self.debug(f'walk {path}')
containers = []
parent = self.append(path, parent)
if parent is not None:
containers.append(parent)
while len(containers) > 0:
container = containers.pop()
try:
self.debug(f'adding {container.location!r}')
self.info(f'walk.adding: {container.location}')
for child in container.location.children():
if ignore_file_pattern.match(child.basename()) is not None:
continue
new_container = self.append(child.path, container)
if new_container is not None:
containers.append(new_container)
except UnicodeDecodeError:
self.warning(
f'UnicodeDecodeError - there is something wrong with a '
f'file located in {container.get_path()!r}')
[docs] def create(self, mimetype, path, parent):
self.debug(f'create {mimetype} {path} {type(path)} {parent}')
UPnPClass = classChooser(mimetype)
if UPnPClass is None:
return None
id = self.getnextID()
if mimetype in ('root', 'directory'):
id = str(id)
else:
_, ext = os.path.splitext(path)
id = str(id) + ext.lower()
update = False
if hasattr(self, 'update_id'):
update = True
self.store[id] = FSItem(id, parent, path, mimetype, self.urlbase,
UPnPClass, update=True, store=self)
if hasattr(self, 'update_id'):
self.update_id += 1
# print(self.update_id)
if self.server:
if hasattr(self.server, 'content_directory_server'):
self.server.content_directory_server.set_variable(
0, 'SystemUpdateID', self.update_id)
if parent is not None:
value = (parent.get_id(), parent.get_update_id())
if self.server:
if hasattr(self.server, 'content_directory_server'):
self.server.content_directory_server.set_variable(
0, 'ContainerUpdateIDs', value)
return id
[docs] def append(self, bytes_path, parent):
path = str(bytes_path)
self.debug(f'append {path} {type(path)} {parent}')
if not os.path.exists(path):
self.warning(f'path {path!r} not available - ignored')
return None
if stat.S_ISFIFO(os.stat(path).st_mode):
self.warning(f'path {path!r} is a FIFO - ignored')
return None
try:
mimetype, _ = mimetypes.guess_type(path, strict=False)
if mimetype is None:
if os.path.isdir(path):
mimetype = 'directory'
if mimetype is None:
return None
id = self.create(mimetype, path, parent)
if mimetype == 'directory':
if self.inotify is not None:
mask = \
IN_CREATE | IN_DELETE | IN_MOVED_FROM | \
IN_MOVED_TO | IN_CHANGED
self.inotify.watch(
FilePath(os.path.abspath(path)),
mask=mask, autoAdd=False,
callbacks=[partial(self.notify, parameter=id)])
return self.store[id]
except OSError as os_msg:
'''
seems we have some permissions issues along the content path
'''
self.warning(f'path {path} isn\'t accessible, error {os_msg}')
return None
[docs] def remove(self, id):
self.debug(f'FSSTore remove id: {id}')
try:
item = self.store[id]
parent = item.get_parent()
item.remove()
del self.store[id]
if hasattr(self, 'update_id'):
self.update_id += 1
if self.server:
self.server.content_directory_server.set_variable(
0, 'SystemUpdateID', self.update_id)
# value = f'{parent.get_id():d},{parent_get_update_id():d}'
value = (parent.get_id(), parent.get_update_id())
if self.server:
self.server.content_directory_server.set_variable(
0, 'ContainerUpdateIDs', value)
except KeyError:
pass
[docs] def notify(self, ignore, path, mask, parameter=None):
self.info('Event %s on %s - parameter %r',
', '.join([fl for fl in _FLAG_TO_HUMAN if fl[0] == mask][0]),
path.path, parameter)
if mask & IN_CHANGED:
# FIXME react maybe on access right changes, loss of read rights?
# print(f'{path} was changed, parent {parameter:d} ({iwp.path})')
pass
if mask & IN_DELETE or mask & IN_MOVED_FROM:
self.info(f'{path.path} was deleted, '
f'parent {parameter!r} ({path.parent.path})')
id = self.get_id_by_name(parameter, path.path)
if id is not None:
self.remove(id)
if mask & IN_CREATE or mask & IN_MOVED_TO:
if mask & IN_ISDIR:
self.info(f'directory {path.path} was created, '
f'parent {parameter!r} ({path.parent.path})')
else:
self.info(f'file {path.path} was created, '
f'parent {parameter!r} ({path.parent.path})')
if self.get_id_by_name(parameter, path.path) is None:
if path.isdir():
self.walk(path.path, self.get_by_id(parameter),
self.ignore_file_pattern)
else:
if self.ignore_file_pattern.match(parameter) is None:
self.append(
str(path.path),
str(self.get_by_id(parameter)))
[docs] def getnextID(self):
ret = self.next_id
self.next_id += 1
return ret
[docs] def backend_import(self, item, data):
try:
f = open(item.get_path(), 'w+b')
if hasattr(data, 'read'):
data = data.read()
f.write(data)
f.close()
item.rebuild(self.urlbase)
return 200
except IOError:
self.warning(f'import of file {item.get_path()} failed')
except Exception as msg:
import traceback
self.warning(traceback.format_exc())
return 500
[docs] def upnp_init(self):
self.current_connection_id = None
if self.server:
self.server.connection_manager_server.set_variable(
0,
'SourceProtocolInfo',
[f'internal:{self.server.coherence.hostname}:audio/mpeg:*',
'http-get:*:audio/mpeg:*',
f'internal:{self.server.coherence.hostname}:video/mp4:*',
'http-get:*:video/mp4:*',
f'internal:{self.server.coherence.hostname}:application/ogg:*', # noqa
'http-get:*:application/ogg:*',
f'internal:{self.server.coherence.hostname}:video/x-msvideo:*', # noqa
'http-get:*:video/x-msvideo:*',
f'internal:{self.server.coherence.hostname}:video/mpeg:*',
'http-get:*:video/mpeg:*',
f'internal:{self.server.coherence.hostname}:video/avi:*',
'http-get:*:video/avi:*',
f'internal:{self.server.coherence.hostname}:video/divx:*',
'http-get:*:video/divx:*',
f'internal:{self.server.coherence.hostname}:video/quicktime:*', # noqa
'http-get:*:video/quicktime:*',
f'internal:{self.server.coherence.hostname}:image/gif:*',
'http-get:*:image/gif:*',
f'internal:{self.server.coherence.hostname}:image/jpeg:*',
'http-get:*:image/jpeg:*'
# 'http-get:*:audio/mpeg:DLNA.ORG_PN=MP3;DLNA.ORG_OP=11;'
# 'DLNA.ORG_FLAGS=01700000000000000000000000000000',
# 'http-get:*:audio/x-ms-wma:DLNA.ORG_PN=WMABASE;'
# 'DLNA.ORG_OP=11;DLNA.ORG_FLAGS'
# '=01700000000000000000000000000000',
# 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_TN;'
# 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS'
# '=00f00000000000000000000000000000',
# 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_SM;'
# 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS'
# '=00f00000000000000000000000000000',
# 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_MED;'
# 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS'
# '=00f00000000000000000000000000000',
# 'http-get:*:image/jpeg:DLNA.ORG_PN=JPEG_LRG;'
# 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS'
# '=00f00000000000000000000000000000',
# 'http-get:*:video/mpeg:DLNA.ORG_PN=MPEG_PS_PAL;'
# 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS'
# '=01700000000000000000000000000000',
# 'http-get:*:video/x-ms-wmv:DLNA.ORG_PN=WMVMED_BASE;'
# 'DLNA.ORG_OP=01;DLNA.ORG_FLAGS'
# '=01700000000000000000000000000000',
],
default=True)
self.server.content_directory_server.set_variable(
0, 'SystemUpdateID', self.update_id)
# self.server.content_directory_server.set_variable(
# 0, 'SortCapabilities', '*')
[docs] def upnp_ImportResource(self, *args, **kwargs):
SourceURI = kwargs['SourceURI']
DestinationURI = kwargs['DestinationURI']
if DestinationURI.endswith('?import'):
id = DestinationURI.split('/')[-1]
id = id[:-7] # remove the ?import
else:
return failure.Failure(errorCode(718))
item = self.get_by_id(id)
if item is None:
return failure.Failure(errorCode(718))
def gotPage(headers):
# print('gotPage', headers)
content_type = headers.get('content-type', [])
if not isinstance(content_type, list):
content_type = list(content_type)
if len(content_type) > 0:
extension = mimetypes.guess_extension(content_type[0],
strict=False)
item.set_path(None, extension)
shutil.move(tmp_path, item.get_path())
item.rebuild(self.urlbase)
if hasattr(self, 'update_id'):
self.update_id += 1
if self.server:
if hasattr(self.server, 'content_directory_server'):
self.server.content_directory_server.set_variable(
0, 'SystemUpdateID', self.update_id)
if item.parent is not None:
value = (item.parent.get_id(), item.parent.get_update_id())
if self.server:
if hasattr(self.server, 'content_directory_server'):
self.server.content_directory_server.set_variable(
0, 'ContainerUpdateIDs', value)
def gotError(error, url):
self.warning(f'error requesting {url}')
self.info(error)
os.unlink(tmp_path)
return failure.Failure(errorCode(718))
tmp_fp, tmp_path = tempfile.mkstemp()
os.close(tmp_fp)
utils.downloadPage(
SourceURI, tmp_path).addCallbacks(
gotPage, gotError, None, None, [SourceURI], None)
transfer_id = 0 # FIXME
return {'TransferID': transfer_id}
[docs] def upnp_CreateObject(self, *args, **kwargs):
# print(f'CreateObject {kwargs}')
if kwargs['ContainerID'] == 'DLNA.ORG_AnyContainer':
if self.import_folder is not None:
ContainerID = self.import_folder_id
else:
return failure.Failure(errorCode(712))
else:
ContainerID = kwargs['ContainerID']
Elements = kwargs['Elements']
parent_item = self.get_by_id(ContainerID)
if parent_item is None:
return failure.Failure(errorCode(710))
if parent_item.item.restricted:
return failure.Failure(errorCode(713))
if len(Elements) == 0:
return failure.Failure(errorCode(712))
elt = DIDLElement.fromString(Elements)
if elt.numItems() != 1:
return failure.Failure(errorCode(712))
item = elt.getItems()[0]
if item.parentID == 'DLNA.ORG_AnyContainer':
item.parentID = ContainerID
if (item.id != '' or
item.parentID != ContainerID or
item.restricted is True or
item.title == ''):
return failure.Failure(errorCode(712))
if ('..' in item.title or
'~' in item.title or
os.sep in item.title):
return failure.Failure(errorCode(712))
if item.upnp_class == 'object.container.storageFolder':
if len(item.res) != 0:
return failure.Failure(errorCode(712))
path = os.path.join(parent_item.get_path(), item.title)
id = self.create('directory', path, parent_item)
try:
os.mkdir(path)
except Exception:
self.remove(id)
return failure.Failure(errorCode(712))
if self.inotify is not None:
mask = \
IN_CREATE | IN_DELETE | IN_MOVED_FROM | \
IN_MOVED_TO | IN_CHANGED
self.inotify.watch(
path, mask=mask, autoAdd=False,
callbacks=[partial(self.notify, parameter=id)])
new_item = self.get_by_id(id)
didl = DIDLElement()
didl.addItem(new_item.item)
return {'ObjectID': id, 'Result': didl.toString()}
if item.upnp_class.startswith('object.item'):
_, _, content_format, _ = item.res[0].protocolInfo.split(':')
extension = mimetypes.guess_extension(content_format, strict=False)
path = os.path.join(parent_item.get_realpath(),
item.title + extension)
id = self.create('item', path, parent_item)
new_item = self.get_by_id(id)
for res in new_item.item.res:
res.importUri = new_item.url + '?import'
res.data = None
didl = DIDLElement()
didl.addItem(new_item.item)
return {'ObjectID': id, 'Result': didl.toString()}
return failure.Failure(errorCode(712))
[docs] def hidden_upnp_DestroyObject(self, *args, **kwargs):
ObjectID = kwargs['ObjectID']
item = self.get_by_id(ObjectID)
if item is None:
return failure.Failure(errorCode(701))
self.info(f'upnp_DestroyObject: {item.location}')
try:
item.location.remove()
except Exception as msg:
self.error(f'upnp_DestroyObject [{Exception}]: {msg}')
return failure.Failure(errorCode(715))
return {}
if __name__ == '__main__':
from twisted.internet import reactor
p = 'tests/content'
f = FSStore(None, name='my media', content=p,
urlbase='http://localhost/xyz')
print(f.len())
print(f.get_by_id(1000).child_count, f.get_by_id(1000).get_xml())
print(f.get_by_id(1001).child_count, f.get_by_id(1001).get_xml())
print(f.get_by_id(1002).child_count, f.get_by_id(1002).get_xml())
print(f.get_by_id(1003).child_count, f.get_by_id(1003).get_xml())
print(f.get_by_id(1004).child_count, f.get_by_id(1004).get_xml())
print(f.get_by_id(1005).child_count, f.get_by_id(1005).get_xml())
print(f.store[1000].get_children(0, 0))
# print(f.upnp_Search(
# ContainerID='4',
# Filter='dc:title,upnp:artist',
# RequestedCount='1000',
# StartingIndex='0',
# SearchCriteria='(upnp:class = 'object.container.album.musicAlbum')',
# SortCriteria='+dc:title'))
f.upnp_ImportResource(SourceURI='http://spiegel.de', DestinationURI='ttt')
reactor.run()