Source code for coherence.upnp.services.servers.connection_manager_server

# -*- coding: utf-8 -*-

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2006, Frank Scholz <coherence@beebits.net>

'''
Connection Manager service
==========================
'''
import html
import time

from twisted.internet import task
from twisted.python import failure
from twisted.web import resource

from coherence.upnp.core import service
from coherence.upnp.core.DIDLLite import build_dlna_additional_info
from coherence.upnp.core.soap_service import UPnPPublisher
from coherence.upnp.core.soap_service import errorCode
from coherence.upnp.core.utils import to_string


class ConnectionManagerControl(service.ServiceControl, UPnPPublisher):
    '''
    Control resource of the ConnectionManager service.

    It keeps a reference to the owning service server together with the
    variables and actions that server exposes.
    '''

    def __init__(self, server):
        '''
        Args:
            server: the service server this control object belongs to; it
                has to provide `get_variables()` and `get_actions()`.
        '''
        # Initialise both bases explicitly, in the order they are declared.
        for base in (service.ServiceControl, UPnPPublisher):
            base.__init__(self)

        self.service = server
        self.variables = server.get_variables()
        self.actions = server.get_actions()
class ConnectionManagerServer(service.ServiceServer, resource.Resource):
    '''
    Server side of the UPnP ConnectionManager service.

    The server keeps a table of the currently known connections
    (:attr:`connections`, keyed by connection id), publishes their ids via
    the `CurrentConnectionIDs` state variable and implements the
    `PrepareForConnection`, `ConnectionComplete` and
    `GetCurrentConnectionInfo` actions.

    For a device of type `MediaRenderer` every connection additionally gets
    its own AVTransport and RenderingControl instance; connections of such
    a device whose instances saw no state variable change for
    :attr:`LINGERING_TIMEOUT` seconds are removed by a periodic task.
    '''

    logCategory = 'connection_manager_server'

    # Seconds between two runs of remove_lingering_connections().
    LINGERING_CHECK_INTERVAL = 180.0
    # Seconds without any state variable change after which an
    # AVTransport/RenderingControl instance is considered idle.
    LINGERING_TIMEOUT = 300

    def __init__(self, device, backend=None):
        '''
        Args:
            device: the device this service belongs to; its `version`,
                `device_type` and (when `backend` is None) `backend`
                attributes are used.
            backend: the backend serving the content; defaults to
                `device.backend`.
        '''
        self.device = device
        if backend is None:
            backend = self.device.backend
        resource.Resource.__init__(self)
        service.ServiceServer.__init__(
            self, 'ConnectionManager', self.device.version, backend)

        self.control = ConnectionManagerControl(self)
        self.putChild(self.scpd_url, service.scpdXML(self, self.control))
        self.putChild(self.control_url, self.control)

        self.next_connection_id = 1
        self.next_avt_id = 1
        self.next_rcs_id = 1

        self.connections = {}

        # Set before the first set_variable() call, because set_variable()
        # reads it when it expands SinkProtocolInfo entries.
        # A backend without a `dlna_caps` attribute simply has no
        # playcontainer support.
        self.does_playcontainer = \
            'playcontainer-0-1' in getattr(backend, 'dlna_caps', ())

        self.set_variable(0, 'SourceProtocolInfo', '')
        self.set_variable(0, 'SinkProtocolInfo', '')
        self.set_variable(0, 'CurrentConnectionIDs', '')

        self.remove_lingering_connections_loop = task.LoopingCall(
            self.remove_lingering_connections)
        self.remove_lingering_connections_loop.start(
            self.LINGERING_CHECK_INTERVAL, now=False)

    def release(self):
        '''Stop the periodic lingering-connection check.

        Safe to call more than once: the loop is only stopped while it is
        still running, as stopping a LoopingCall that is not running is
        an error.
        '''
        loop = self.remove_lingering_connections_loop
        if loop.running:
            loop.stop()

    def _publish_connection_ids(self):
        '''Mirror the keys of :attr:`connections` into the
        `CurrentConnectionIDs` state variable as a comma separated list.'''
        csv_ids = ','.join(str(x) for x in self.connections)
        self.set_variable(0, 'CurrentConnectionIDs', csv_ids)

    def add_connection(self, RemoteProtocolInfo, Direction,
                       PeerConnectionID, PeerConnectionManager):
        '''Register a new connection and return its ids.

        For a `MediaRenderer` a new AVTransport and a new RenderingControl
        instance are created for the connection and the backend's
        `current_connection_id` is set to the new connection id. For a
        `MediaServer` both instance ids are 0. For any other device type
        ids are handed out, but no connection entry is stored.

        Returns:
            A tuple `(connection_id, avt_id, rcs_id)`.
        '''
        connection_id = self.next_connection_id
        self.next_connection_id += 1

        avt_id = 0
        rcs_id = 0
        device_type = self.device.device_type

        if device_type == 'MediaRenderer':
            # this is the place to instantiate AVTransport and
            # RenderingControl for this connection
            avt_id = self.next_avt_id
            self.next_avt_id += 1
            self.device.av_transport_server.create_new_instance(avt_id)
            rcs_id = self.next_rcs_id
            self.next_rcs_id += 1
            self.device.rendering_control_server.create_new_instance(rcs_id)

        if device_type in ('MediaServer', 'MediaRenderer'):
            self.connections[connection_id] = {
                'ProtocolInfo': RemoteProtocolInfo,
                'Direction': Direction,
                'PeerConnectionID': PeerConnectionID,
                'PeerConnectionManager': PeerConnectionManager,
                'AVTransportID': avt_id,
                'RcsID': rcs_id,
                'Status': 'OK',
            }

        if device_type == 'MediaRenderer':
            self.backend.current_connection_id = connection_id

        self._publish_connection_ids()
        return connection_id, avt_id, rcs_id

    def remove_connection(self, id):
        '''Remove the connection `id` and, for a `MediaRenderer`, the
        AVTransport and RenderingControl instances bound to it.

        An unknown `id` is logged and otherwise ignored. This avoids both
        a `KeyError` and a removal request for the instance id 0, which is
        what the instance lookups return for unknown connections.

        If tearing down the instances of a `MediaRenderer` fails, the error
        is logged and the connection entry is kept.
        '''
        if id not in self.connections:
            self.warning(
                f'ConnectionManagerServer.remove_connection: '
                f'unknown connection id {id!r}')
            return

        device_type = self.device.device_type
        if device_type == 'MediaRenderer':
            try:
                self.device.av_transport_server.remove_instance(
                    self.lookup_avt_id(id))
                self.device.rendering_control_server.remove_instance(
                    self.lookup_rcs_id(id))
                del self.connections[id]
            except Exception as e:
                # deliberate best effort: a failing teardown must not
                # break the caller (e.g. the periodic cleanup loop)
                self.warning(f'ConnectionManagerServer.remove_connection: {e}')
            self.backend.current_connection_id = None
        elif device_type == 'MediaServer':
            del self.connections[id]

        self._publish_connection_ids()

    def _instance_is_active(self, server, instance_id, now):
        '''Tell whether the instance `instance_id` of `server` is in use.

        An instance counts as active when it has no id (`instance_id` is
        not positive), when `server` has no variables for it, or when at
        least one of its variables was touched within the last
        :attr:`LINGERING_TIMEOUT` seconds before `now`.
        '''
        if instance_id <= 0:
            return True
        variables = server.get_variables().get(instance_id)
        if not variables:
            return True
        return any(
            variable.last_time_touched + self.LINGERING_TIMEOUT >= now
            for variable in variables.values())

    def remove_lingering_connections(self):
        '''Check if we have a connection that hasn't a StateVariable change
        within the last :attr:`LINGERING_TIMEOUT` seconds, if so remove it.

        Only done for a `MediaRenderer`; a connection is removed when both
        its AVTransport and its RenderingControl instance are idle.
        '''
        if self.device.device_type != 'MediaRenderer':
            return

        now = time.time()

        # iterate over a snapshot, remove_connection() changes the dict
        for connection_id, connection in list(self.connections.items()):
            avt_active = self._instance_is_active(
                self.device.av_transport_server,
                connection['AVTransportID'], now)
            rcs_active = self._instance_is_active(
                self.device.rendering_control_server,
                connection['RcsID'], now)
            if not avt_active and not rcs_active:
                self.remove_connection(connection_id)

    def lookup_connection(self, id):
        '''Return the connection entry for `id`, or None if unknown.'''
        return self.connections.get(id)

    def _lookup_instance_id(self, id, key):
        '''Return the value stored under `key` for connection `id`,
        or 0 when the connection (or the key) is not known.'''
        try:
            return self.connections[id][key]
        except (ValueError, KeyError):
            return 0

    def lookup_avt_id(self, id):
        '''Return the AVTransport instance id of connection `id`,
        or 0 for an unknown connection.'''
        return self._lookup_instance_id(id, 'AVTransportID')

    def lookup_rcs_id(self, id):
        '''Return the RenderingControl instance id of connection `id`,
        or 0 for an unknown connection.'''
        return self._lookup_instance_id(id, 'RcsID')

    def listchilds(self, uri):
        '''Return the child resources as HTML `<li>` elements linking to
        `uri`/child.

        `uri` comes from the request and is therefore HTML-escaped, like
        the child names, and the `href` value is quoted.
        '''
        base = html.escape(str(to_string(uri)), quote=True)
        items = []
        for child in self.children:
            name = html.escape(str(to_string(child)), quote=True)
            items.append(f'<li><a href="{base}/{name}">{name}</a></li>')
        return ''.join(items)

    def render(self, request):
        '''Render an HTML index page listing the child resources.

        Returns:
            The page as ASCII bytes; characters outside of ASCII are
            emitted as numeric character references instead of raising
            `UnicodeEncodeError`.
        '''
        page = f'''\
        <html>
        <head>
            <title>Cohen3 (ConnectionManagerServer)</title>
            <link rel="stylesheet" type="text/css" href="/styles/main.css" />
        </head>
        <h5>
            <img class="logo-icon" src="/server-images/coherence-icon.svg">
            </img>Root of the ConnectionManager</h5>
        <div class="list"><ul>{self.listchilds(request.uri)}</ul></div>
        </html>'''
        return page.encode('ascii', 'xmlcharrefreplace')

    def _with_dlna_variants(self, entries, is_sink):
        '''Return a copy of `entries` extended by DLNA tagged variants.

        For every `protocol:network:content_format:*` entry that has no
        sibling with the same first three fields and a non-wildcard fourth
        field, an entry with the additional info built by
        `build_dlna_additional_info` is appended.

        Args:
            entries: list of protocol info strings with exactly four
                colon separated fields each.
            is_sink: True for `SinkProtocolInfo`; the playcontainer
                capability is then passed on to the builder.

        Raises:
            ValueError: if an entry does not have exactly four fields.
        '''
        tagged = set()   # (protocol, network, format) having a real tag
        untagged = []    # same triples, for entries with a '*' tag
        for entry in entries:
            protocol, network, content_format, additional_info = \
                entry.split(':')
            triple = (protocol, network, content_format)
            if additional_info == '*':
                untagged.append(triple)
            else:
                tagged.add(triple)

        result = list(entries)
        for triple in untagged:
            if triple in tagged:
                continue
            content_format = triple[2]
            if is_sink:
                additional_info = build_dlna_additional_info(
                    content_format,
                    does_playcontainer=self.does_playcontainer)
            else:
                additional_info = build_dlna_additional_info(content_format)
            if additional_info == '*':
                # nothing DLNA specific to add (presumably an unknown
                # content format - TODO confirm); appending would only
                # duplicate the wildcard entry
                continue
            result.append(':'.join(triple + (additional_info,)))
            tagged.add(triple)
        return result

    def set_variable(self, instance, variable_name, value, default=False):
        '''Set a state variable, see `service.ServiceServer.set_variable`.

        For `SourceProtocolInfo` and `SinkProtocolInfo` a non-empty string
        is split at the commas and the resulting list (or a list/tuple
        passed in directly) is extended by DLNA tagged variants of its
        wildcard entries. The caller's sequence is never modified. Any
        other value is handed to the base implementation unchanged.
        '''
        if variable_name in ('SourceProtocolInfo', 'SinkProtocolInfo'):
            if isinstance(value, str):
                if value:
                    value = [v.strip() for v in value.split(',')]
            elif isinstance(value, (list, tuple)):
                value = list(value)

            if isinstance(value, list):
                value = self._with_dlna_variants(
                    value, is_sink=variable_name == 'SinkProtocolInfo')

        service.ServiceServer.set_variable(
            self, instance, variable_name, value, default=default)

    @staticmethod
    def _field_matches(remote, local):
        '''Two protocol info fields match when they are equal or when one
        of them is the wildcard `*`.'''
        return remote == local or remote == '*' or local == '*'

    def upnp_PrepareForConnection(self, *args, **kwargs):
        '''Handle the `PrepareForConnection` action.

        Expects the keyword arguments `RemoteProtocolInfo`, `Direction`,
        `PeerConnectionID` and `PeerConnectionManager`.

        Returns:
            A dict with `ConnectionID`, `AVTransportID` and `RcsID` of the
            new connection, or a `failure.Failure` wrapping error 702 for
            an unsuitable direction or error 701 when the remote protocol
            info is malformed or matches none of ours.
        '''
        self.info('upnp_PrepareForConnection')
        # check if we really support that mimetype
        RemoteProtocolInfo = kwargs['RemoteProtocolInfo']
        # if we are a MR and this in not 'Input'
        # then there is something strange going on
        Direction = kwargs['Direction']
        device_type = self.device.device_type
        if device_type == 'MediaRenderer' and Direction == 'Output':
            return failure.Failure(errorCode(702))
        # NOTE(review): a MediaServer is accepted only with Direction
        # 'Input' here, which looks inverted for a source device - confirm
        # against the ConnectionManager specification before changing.
        if device_type == 'MediaServer' and Direction != 'Input':
            return failure.Failure(errorCode(702))
        # the InstanceID of the MS ?
        PeerConnectionID = kwargs['PeerConnectionID']
        # ???
        PeerConnectionManager = kwargs['PeerConnectionManager']

        local_protocol_infos = None
        if device_type == 'MediaRenderer':
            local_protocol_infos = self.get_variable('SinkProtocolInfo').value
        if device_type == 'MediaServer':
            local_protocol_infos = \
                self.get_variable('SourceProtocolInfo').value
        self.debug(
            f'ProtocalInfos: {RemoteProtocolInfo} -- {local_protocol_infos}')

        try:
            remote_protocol, remote_network, remote_content_format, _ = \
                RemoteProtocolInfo.split(':')
        except (AttributeError, TypeError, ValueError) as e:
            self.warning(f'unable to process RemoteProtocolInfo '
                         f'{RemoteProtocolInfo} [error: {e}]')
            return failure.Failure(errorCode(701))

        # The variable presumably holds a comma separated string; a
        # sequence of entries is accepted as well and a missing value
        # means there is nothing we could match against.
        if isinstance(local_protocol_infos, str):
            local_entries = local_protocol_infos.split(',')
        else:
            local_entries = local_protocol_infos or ()

        for protocol_info in local_entries:
            try:
                local_protocol, local_network, local_content_format, _ = \
                    protocol_info.split(':')
            except (AttributeError, TypeError, ValueError):
                # e.g. the empty entry of a not yet populated variable
                continue
            if (self._field_matches(remote_protocol, local_protocol)
                    and self._field_matches(remote_network, local_network)
                    and self._field_matches(remote_content_format,
                                            local_content_format)):
                connection_id, avt_id, rcs_id = self.add_connection(
                    RemoteProtocolInfo,
                    Direction,
                    PeerConnectionID,
                    PeerConnectionManager)
                return {'ConnectionID': connection_id,
                        'AVTransportID': avt_id,
                        'RcsID': rcs_id}

        return failure.Failure(errorCode(701))

    def upnp_ConnectionComplete(self, *args, **kwargs):
        '''Handle the `ConnectionComplete` action.

        Removes the connection given by the keyword argument
        `ConnectionID` and the associated instances @ AVTransportID and
        RcsID.

        Returns:
            An empty dict, or a `failure.Failure` wrapping error 706 if
            there isn't such a ConnectionID.
        '''
        ConnectionID = int(kwargs['ConnectionID'])
        if self.lookup_connection(ConnectionID) is None:
            return failure.Failure(errorCode(706))
        self.remove_connection(ConnectionID)
        return {}

    def upnp_GetCurrentConnectionInfo(self, *args, **kwargs):
        '''Handle the `GetCurrentConnectionInfo` action.

        Returns:
            For the connection given by the keyword argument
            `ConnectionID` a dict with the associated InstanceIDs
            (`AVTransportID`, `RcsID`) and its `ProtocolInfo`,
            `PeerConnectionManager`, `PeerConnectionID`, `Direction` and
            `Status`, or a `failure.Failure` wrapping error 706 if there
            isn't such a ConnectionID.
        '''
        ConnectionID = int(kwargs['ConnectionID'])
        connection = self.lookup_connection(ConnectionID)
        if connection is None:
            return failure.Failure(errorCode(706))
        return {
            'AVTransportID': connection['AVTransportID'],
            'RcsID': connection['RcsID'],
            'ProtocolInfo': connection['ProtocolInfo'],
            'PeerConnectionManager': connection['PeerConnectionManager'],
            'PeerConnectionID': connection['PeerConnectionID'],
            'Direction': connection['Direction'],
            'Status': connection['Status'],
        }