# -*- coding: utf-8 -*-
# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php
# Copyright 2008 Frank Scholz <coherence@beebits.net>
'''
A MediaServer backend to test Items.
Item information can be passed on the commandline or in the config as an XML
fragment::
coherence --plugin=backend:TestStore,name:Test,\\
item:<item><location>audio.mp3</location>\\
<mimetype>audio/mpeg</mimetype></item>,\\
item:<item><location>audio.ogg</location>\\
<mimetype>audio/ogg</mimetype></item>
coherence --plugin='backend:TestStore,name:Test,\\
item:<item><type>gstreamer</type>\\
<pipeline>v4l2src num-buffers=1 ! video/x-raw-yuv,width=640,\\
height=480 ! ffmpegcolorspace ! jpegenc name=enc</pipeline>\\
<mimetype>image/jpeg</mimetype></item>'
'video/x-raw-yuv,width=640,height=480' won't work here as it is a delimiter
for the plugin string, so if you need things like that in the pipeline,
you need to use a config file::
coherence --plugin='backend:TestStore,name:Test,\\
item:<item><type>process</type>\\
<command>man date</command>\\
<mimetype>text/html</mimetype></item>'
The XML fragment has these elements:
- type:
* file: the item is some file-system object (default)
* url: an item pointing to an object off-site
* gstreamer: the item is actually a GStreamer pipeline
* process: the item's content is created by an external process
- location: the filesystem path or a URL (mandatory)
- mimetype: the mimetype of the item (mandatory)
- extension: an optional extension to append to the url created for the
DIDLLite resource data
- title: the 'title' this item should have (optional)
- upnp_class: the DIDLLite class the item shall have, object.item will
be taken as default
- fourth_field: value for the 4th field of the protocolInfo phalanx,
default is '*'
- pipeline: a GStreamer pipeline that has to end with a bin named 'enc',
some pipelines only work properly when we have a glib mainloop
running, so coherence needs to be started with *-o glib:yes*
- command: the commandline for an external script to run, its output will
be returned as the item's content
In the config file the definition of this backend could look like this:
.. code-block:: xml
<plugin active='yes'>
<backend>TestStore</backend>
<name>Test</name>
<item>
<location>/tmp/audio.mp3</location>
<mimetype>audio/mpeg</mimetype>
</item>
<item>
<location>/tmp/audio.ogg</location>
<mimetype>audio/ogg</mimetype>
</item>
</plugin>
'''
import os
from lxml import etree
from twisted.internet import protocol, reactor
from twisted.python.filepath import FilePath
from twisted.web import resource, server
from coherence.backend import BackendItem
from coherence.backend import BackendStore
try:
from coherence.transcoder import GStreamerPipeline
except ImportError:
pass
from coherence.upnp.core import DIDLLite
from coherence import log
# Id of the root container; TestStore keeps the root Container under this
# key in its `store` dict and get_by_id() maps an id of 0 to it.
ROOT_CONTAINER_ID = 0
class ExternalProcessProtocol(protocol.ProcessProtocol):
    '''
    Process protocol relaying the stdout of a spawned child to a *caller*.

    The *caller* has to provide a `write_data(data)` method and accept an
    `ended` attribute: every chunk received on stdout is passed to
    `write_data`, and once the process has ended `ended` is set to True
    and `write_data('')` is called one last time.
    '''

    def __init__(self, caller):
        self.caller = caller

    def connectionMade(self):
        '''Report that the connection to the child process is set up.'''
        print('pp connection made')

    def outReceived(self, data):
        '''Forward a chunk of the child's stdout to the caller.'''
        print(f'outReceived with {len(data):d} bytes!')
        self.caller.write_data(data)

    def errReceived(self, data):
        '''Print whatever the child wrote to stderr; it is not forwarded.'''
        print('pp (err):', data.strip())

    def inConnectionLost(self):
        '''Nothing to do when the child's stdin is closed.'''
        pass

    def outConnectionLost(self):
        '''Nothing to do when the child closed its stdout.'''
        pass

    def errConnectionLost(self):
        '''Nothing to do when the child closed its stderr.'''
        pass

    def processEnded(self, status_object):
        '''
        Tell the caller that no more data will arrive.

        *status_object* is the failure handed in by the reactor; its
        `value.exitCode` is only printed for diagnostics.
        '''
        # exitCode can be None (e.g. the child was terminated by a signal),
        # so it must not be formatted with ':d' - that would raise a
        # TypeError and the caller would never be told the process ended.
        print(f'processEnded, status {status_object.value.exitCode}')
        print('processEnded quitting')
        self.caller.ended = True
        # an empty write lets the caller notice `ended` and close the request
        self.caller.write_data('')
class ExternalProcessPipeline(resource.Resource, log.LogAble):
    '''
    Web resource whose body is generated by an external command.

    *pipeline* is the command line that gets handed to an
    ExternalProcessProducer on every render, *mimetype* is sent as the
    content-type header when it is not empty.
    '''

    addSlash = True
    logCategory = 'externalprocess'

    def __init__(self, pipeline, mimetype):
        log.LogAble.__init__(self)
        resource.Resource.__init__(self)
        self.mimetype = mimetype
        self.uri = pipeline

    def render(self, request):
        '''
        Start producing the response for *request*.

        Returns server.NOT_DONE_YET, the response body is written later
        by the producer created here.
        '''
        print('ExternalProcessPipeline render')
        content_type = self.mimetype
        if content_type:
            request.setHeader('content-type', content_type)
        # the producer registers itself with the request in its constructor,
        # so no reference to it has to be kept here
        ExternalProcessProducer(self.uri, request)
        return server.NOT_DONE_YET
class ExternalProcessProducer(log.LogAble):
    '''
    Producer feeding the output of an external command into a request.

    On construction the instance registers itself as a producer on
    *request* (second argument 0). The command given as *pipeline* is
    spawned on the first call of resumeProducing; its output reaches the
    request through write_data, which is called by
    ExternalProcessProtocol.
    '''

    logCategory = 'externalprocess'
    addSlash = True

    def __init__(self, pipeline, request):
        log.LogAble.__init__(self)
        self.pipeline = pipeline
        self.request = request
        self.process = None
        self.written = 0
        self.data = ''
        self.ended = False
        request.registerProducer(self, 0)

    def write_data(self, data):
        '''
        Write *data* to the request and finish the request once the
        process has ended.

        Data arriving after the request has been dropped (self.request
        is None) is discarded instead of raising an AttributeError.
        '''
        if data and self.request:
            print(f'write {len(data):d} bytes of data')
            self.written += len(data)
            # this .write will spin the reactor, calling .doWrite and then
            # .resumeProducing again, so be prepared for a re-entrant call
            self.request.write(data)
        if self.request and self.ended:
            print('closing')
            self.request.unregisterProducer()
            self.request.finish()
            self.request = None

    def resumeProducing(self):
        '''Spawn the external process if that has not been done yet.'''
        print('resumeProducing', self.request)
        if not self.request:
            return
        if self.process is None:
            argv = (self.pipeline or '').split()
            if not argv:
                # nothing to run: close the request right away instead of
                # raising and leaving the client waiting
                print('no command to execute')
                self.ended = True
                self.write_data('')
                return
            executable = argv[0]
            # argv[0] as seen by the child is only the program name
            argv[0] = os.path.basename(argv[0])
            self.process = reactor.spawnProcess(
                ExternalProcessProtocol(self),
                executable, argv, {})

    def pauseProducing(self):
        '''Pausing is not supported, the process just keeps running.'''
        pass

    def stopProducing(self):
        '''
        Drop the connection to the process and finish the request.

        Safe to call before a process has been spawned and after the
        request has already been finished.
        '''
        print('stopProducing', self.request)
        request = self.request
        if request is not None:
            request.unregisterProducer()
        if self.process is not None:
            self.process.loseConnection()
        if request is not None:
            request.finish()
            self.request = None
class Item(BackendItem):
    '''
    A single media item of the TestStore.

    *parent* is the container the item belongs to, *id* its key in the
    store, *title* the name to show (may be None), *location* either a
    FilePath or some other object describing where the content lives and
    *url* the address put into the DIDLLite resource.
    '''

    def __init__(self, parent, id, title, location, url):
        BackendItem.__init__(self)
        self.parent = parent
        self.id = id
        self.location = location
        self.url = url
        self.name = title
        self.duration = None
        self.size = None
        self.mimetype = None
        self.fourth_field = '*'
        self.description = None
        self.date = None
        self.upnp_class = DIDLLite.Item
        self.item = None

    def get_item(self):
        '''
        Return the DIDLLite object for this item.

        The object is built on the first call from the current attribute
        values and cached in self.item afterwards.
        '''
        print(f'get_item {self.item!r}')
        if self.item is None:
            self.item = self.upnp_class(self.id, self.parent.id,
                                        self.get_name())
            self.item.description = self.description
            self.item.date = self.date
            res = DIDLLite.Resource(
                self.url, f'http-get:*:{self.mimetype}:{self.fourth_field}')
            res.duration = self.duration
            res.size = self.get_size()
            self.item.res.append(res)
        return self.item

    def get_name(self):
        '''
        Return the name of the item.

        Without a title the basename of a FilePath location is used,
        for any other kind of location the name defaults to 'item'.
        '''
        if self.name is None:
            if isinstance(self.location, FilePath):
                basename = self.location.basename()
                # a FilePath created from a str hands back a str basename,
                # only a bytes basename has to be decoded
                if isinstance(basename, bytes):
                    basename = basename.decode('utf-8', 'replace')
                self.name = basename
            else:
                self.name = 'item'
        return self.name

    def get_path(self):
        '''Return the filesystem path of a FilePath location, otherwise
        the location itself.'''
        if isinstance(self.location, FilePath):
            return self.location.path
        else:
            return self.location

    def get_size(self):
        '''
        Return the size of the item.

        For a FilePath location the size is read from the filesystem,
        falling back to self.size when that fails with an OSError.
        '''
        if isinstance(self.location, FilePath):
            try:
                return self.location.getsize()
            except OSError:
                return self.size
        else:
            return self.size
class ResourceItem(Item):
    '''
    An Item whose location is used as is.

    Name, path and size are taken straight from the attributes, without
    the FilePath handling done by Item.
    '''

    def get_name(self):
        '''Return the name, defaulting to (and storing) 'item'.'''
        current = self.name
        if current is None:
            current = self.name = 'item'
        return current

    def get_path(self):
        '''Return the location unchanged.'''
        return self.location

    def get_size(self):
        '''Return the stored size.'''
        return self.size
class Container(BackendItem):
    '''
    A container holding the items of the TestStore.

    *id* is the id of the container, *store* the backend providing
    `urlbase`, *parent_id* the id of the parent container and *title*
    the name of the container.
    '''

    def __init__(self, id, store, parent_id, title):
        BackendItem.__init__(self)
        self.url = store.urlbase + str(id)
        self.parent_id = parent_id
        self.id = id
        self.name = title
        self.mimetype = 'directory'
        self.update_id = 0
        self.children = []
        self.item = DIDLLite.Container(self.id, self.parent_id, self.name)
        self.item.childCount = 0
        self.sorted = False

    def add_child(self, child):
        '''Append *child* and mark the children as unsorted.'''
        print('ADD CHILD %r' % child)
        self.children.append(child)
        self.item.childCount += 1
        self.sorted = False

    def get_children(self, start=0, end=0):
        '''
        Return the children sorted by name.

        *start* and *end* select a slice of the children, an *end* of 0
        means "up to the last child".
        '''
        print('GET CHILDREN')
        if not self.sorted:
            # sort in place - sorted() would only return a new list and
            # leave self.children untouched. A name may still be None,
            # treat that like an empty string so the comparison works.
            self.children.sort(key=lambda child: child.name or '')
            self.sorted = True
        if end != 0:
            return self.children[start:end]
        return self.children[start:]

    def get_child_count(self):
        '''Return the number of children.'''
        return len(self.children)

    def get_path(self):
        '''Return the url of the container.'''
        return self.url

    def get_item(self):
        '''Return the DIDLLite container object.'''
        return self.item

    def get_name(self):
        '''Return the name of the container.'''
        return self.name

    def get_id(self):
        '''Return the id of the container.'''
        return self.id
class TestStore(BackendStore):
    '''
    MediaServer backend serving a fixed list of test items.

    The items are taken from the `item` keyword argument, which is either
    a single item or a list of items. Each item is an XML fragment given
    as a string or an object offering a dict-like `get`.
    '''

    implements = ['MediaServer']

    def __init__(self, server, *args, **kwargs):
        print('TestStore kwargs', kwargs)
        BackendStore.__init__(self, server, **kwargs)
        self.name = kwargs.get('name', 'TestServer')
        self.next_id = 1000
        self.update_id = 0
        self.store = {}
        self.store[ROOT_CONTAINER_ID] = \
            Container(ROOT_CONTAINER_ID, self, -1, self.name)

        items = kwargs.get('item', [])
        if not isinstance(items, list):
            items = [items]

        for item in items:
            # a broken item (malformed XML included) is logged and
            # skipped, the remaining items are still added
            try:
                if isinstance(item, str):
                    xml = etree.fromstring(item)
                    item = {child.tag: child.text for child in xml}
                self._add_item(item)
            except Exception:
                import traceback
                self.warning(traceback.format_exc())

        self.init_completed()

    def _add_item(self, item):
        '''Create the backend item described by *item* and add it to the
        root container and to the store.'''
        item_type = item.get('type', 'file')
        name = item.get('title', None)

        location = None
        if item_type == 'file':
            location = FilePath(item.get('location'))
        elif item_type == 'url':
            location = item.get('location')

        mimetype = item.get('mimetype')

        extension = item.get('extension') or ''
        if extension and extension[0] != '.':
            extension = '.' + extension
        # the id is always stored as a string: the number plus extension
        item_id = str(self.get_next_id()) + extension

        root = self.store[ROOT_CONTAINER_ID]
        url = self.urlbase + item_id

        if item_type in ('file', 'url'):
            new_item = Item(root, item_id, name, location, url)
        elif item_type == 'gstreamer':
            try:
                # GStreamerPipeline is undefined when its import failed
                pipeline = GStreamerPipeline(item.get('pipeline'), mimetype)
            except NameError:
                self.warning(
                    'Can\'t enable GStreamerPipeline, '
                    'probably pygst not installed')
                return
            new_item = ResourceItem(root, item_id, name, pipeline, url)
        elif item_type == 'process':
            pipeline = ExternalProcessPipeline(item.get('command'), mimetype)
            new_item = ResourceItem(root, item_id, name, pipeline, url)
        else:
            # without this branch the item created for the previous
            # entry would be modified and added a second time
            self.warning(f'unknown item type {item_type!r}, item skipped')
            return

        try:
            upnp_class = self.get_upnp_class(
                item.get('upnp_class') or 'object.item')
        except Exception:
            import traceback
            self.warning(traceback.format_exc())
            upnp_class = None
        # keep the default class of the item when no class was found,
        # a None class would make get_item fail later on
        if upnp_class is not None:
            new_item.upnp_class = upnp_class

        new_item.mimetype = mimetype
        new_item.fourth_field = item.get('fourth_field', '*')

        root.add_child(new_item)
        self.store[item_id] = new_item

    def get_upnp_class(self, name):
        '''
        Return the DIDLLite class registered for *name*.

        When *name* is unknown its last dotted part is stripped until a
        registered class is found, as long as more than one part is
        left. Returns None when nothing matches.
        '''
        try:
            return DIDLLite.upnp_classes[name]
        except KeyError:
            self.warning(f'upnp_class {name} not found, trying fallback')

        parts = name.split('.')
        parts.pop()
        while len(parts) > 1:
            try:
                return DIDLLite.upnp_classes['.'.join(parts)]
            except KeyError:
                parts.pop()

        self.warning(f'WTF - no fallback for upnp_class {name} found ?!?')
        return None

    def get_next_id(self):
        '''Return the next unused numeric id.'''
        self.next_id += 1
        return self.next_id

    def get_by_id(self, id):
        '''
        Return the object stored for *id* or None.

        *id* is looked up as given first; a numeric id is then tried as
        an int and as its string form, an id of 0 yields the root
        container. An id that is unknown and not numeric yields None.
        '''
        print(f'GET_BY_ID {id}')
        item = self.store.get(id, None)
        if item is None:
            try:
                numeric_id = int(id)
            except (TypeError, ValueError):
                return None
            if numeric_id == 0:
                item = self.store[ROOT_CONTAINER_ID]
            else:
                item = self.store.get(numeric_id, None)
                if item is None:
                    # items are stored under string keys
                    item = self.store.get(str(numeric_id), None)
        return item