Source code for coherence.backends.axiscam_storage

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2007, Frank Scholz <coherence@beebits.net>

# check http://www.iana.org/assignments/rtp-parameters
# for the RTP payload type identifier
#


from coherence.backend import BackendStore, BackendItem
from coherence.upnp.core.DIDLLite import classChooser, Container, Resource


[docs]class AxisCamItem(BackendItem): logCategory = 'axis_cam_item' def __init__(self, id, obj, parent, mimetype, urlbase, UPnPClass, update=False): BackendItem.__init__(self) self.id = id if mimetype == 'directory': self.name = obj self.mimetype = mimetype else: self.name = obj.get('name') self.mimetype = mimetype self.parent = parent if parent: parent.add_child(self, update=update) if parent is None: parent_id = -1 else: parent_id = parent.get_id() self.item = UPnPClass(id, parent_id, self.name) if isinstance(self.item, Container): self.item.childCount = 0 self.children = [] if (len(urlbase) and urlbase[-1] != '/'): urlbase += '/' if self.mimetype == 'directory': self.url = urlbase + str(self.id) else: self.url = obj.get('url') if self.mimetype == 'directory': self.update_id = 0 else: res = Resource(self.url, obj.get('protocol')) res.size = None self.item.res.append(res) def __del__(self): # print('AxisCamItem __del__', self.id, self.name) pass
[docs] def remove(self): # print('AxisCamItem remove', self.id, self.name, self.parent) if self.parent: self.parent.remove_child(self) del self.item
[docs] def add_child(self, child, update=False): self.children.append(child) if isinstance(self.item, Container): self.item.childCount += 1 if update: self.update_id += 1
[docs] def remove_child(self, child): self.info( f'remove_from {self.id:d} {self.get_name():s} ' f'child {child.id:d} ({child.get_name():s})') if child in self.children: if isinstance(self.item, Container): self.item.childCount -= 1 self.children.remove(child) self.update_id += 1
[docs] def get_children(self, start=0, request_count=0): if request_count == 0: return self.children[start:] else: return self.children[start:request_count]
[docs] def get_child_count(self): return len(self.children)
[docs] def get_id(self): return self.id
[docs] def get_update_id(self): if hasattr(self, 'update_id'): return self.update_id else: return None
[docs] def get_path(self): return self.url
[docs] def get_name(self): return self.name
[docs] def get_parent(self): return self.parent
[docs] def get_item(self): return self.item
[docs] def get_xml(self): return self.item.toString()
def __repr__(self): return 'id: ' + str(self.id) + ' @ ' + self.url
[docs]class AxisCamStore(BackendStore): logCategory = 'axis_cam_store' implements = ['MediaServer'] def __init__(self, server, **kwargs): BackendStore.__init__(self, server, **kwargs) self.next_id = 1000 self.config = kwargs self.name = kwargs.get('name', 'AxisCamStore') self.update_id = 0 self.store = {} self.wmc_mapping = {'8': 1000} self.init_completed() def __repr__(self): return str(self.__class__).split('.')[-1]
[docs] def append(self, obj, parent): if isinstance(obj, str): mimetype = 'directory' else: protocol, network, content_type, info = obj['protocol'].split(':') mimetype = content_type UPnPClass = classChooser(mimetype) id = self.getnextID() update = False if hasattr(self, 'update_id'): update = True self.store[id] = AxisCamItem(id, obj, parent, mimetype, self.urlbase, UPnPClass, update=update) if hasattr(self, 'update_id'): self.update_id += 1 if self.server: self.server.content_directory_server.set_variable( 0, 'SystemUpdateID', self.update_id) if parent: # value = '%d,%d' % (parent.get_id(),parent_get_update_id()) value = (parent.get_id(), parent.get_update_id()) if self.server: self.server.content_directory_server.set_variable( 0, 'ContainerUpdateIDs', value) if mimetype == 'directory': return self.store[id] return None
[docs] def len(self): return len(self.store)
[docs] def get_by_id(self, id): if isinstance(id, str): id = id.split('@', 1)[0] elif isinstance(id, bytes): id = id.decode('utf-8').split('@', 1)[0] id = int(id) if id == 0: id = 1000 try: return self.store[id] except KeyError: return None
[docs] def getnextID(self): ret = self.next_id self.next_id += 1 return ret
[docs] def upnp_init(self): self.current_connection_id = None parent = self.append('AxisCam', None) source_protocols = set() for k, v in list(self.config.items()): if isinstance(v, dict): v['name'] = k source_protocols.add(v['protocol']) self.append(v, parent) if self.server: self.server.connection_manager_server.set_variable( 0, 'SourceProtocolInfo', source_protocols, default=True)
[docs]def main(): f = AxisCamStore(None) def got_upnp_result(result): print('upnp', result)
# f.upnp_init() # print f.store # r = f.upnp_Browse(BrowseFlag='BrowseDirectChildren', # RequestedCount=0, # StartingIndex=0, # ObjectID=0, # SortCriteria='*', # Filter='') # got_upnp_result(r) if __name__ == '__main__': from twisted.internet import reactor reactor.callWhenRunning(main) reactor.run()