Source code for coherence.upnp.core.ssdp

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright 2005, Tim Potter <tpot@samba.org>
# Copyright 2006 John-Mark Gurney <gurney_j@resnet.uroegon.edu>
# Copyright (C) 2006 Fluendo, S.A. (www.fluendo.com).
# Copyright 2006,2007,2008,2009 Frank Scholz <coherence@beebits.net>
# Copyright 2018, Pol Canelles <canellestudi@gmail.com>

'''
:class:`SSDPServer`
-------------------

Implementation of a SSDP server under Twisted and EventDispatcher.
'''

import random
import socket
import time

from twisted.internet import reactor
from twisted.internet import task
from twisted.internet.protocol import DatagramProtocol
from twisted.web.http import datetimeToString
from twisted.test import proto_helpers

from eventdispatcher import EventDispatcher, ListProperty

from coherence.upnp.core.utils import to_bytes, to_string
from coherence import log, SERVER_ID

SSDP_PORT = 1900
SSDP_ADDR = '239.255.255.250'


[docs]class SSDPServer(EventDispatcher, DatagramProtocol, log.LogAble): ''' A class implementing a SSDP server. .. versionchanged:: 0.9.0 * Migrated from louie/dispatcher to EventDispatcher * The emitted events changed: - datagram_received => datagram_received - Coherence.UPnP.SSDP.new_device => new_device - Coherence.UPnP.SSDP.removed_device => removed_device - Coherence.UPnP.Log => log * Added new class variable `root_devices` which uses EventDispatcher's properties .. note:: The methods :meth:`notifyReceived` and :meth:`searchReceived` are called when the appropriate type of datagram is received by the server. ''' logCategory = 'ssdp' root_devices = ListProperty([]) '''A list of the detected root devices''' def __init__(self, test=False, interface=''): # Create SSDP server log.LogAble.__init__(self) EventDispatcher.__init__(self) self.register_event( 'datagram_received', 'new_device', 'removed_device', 'log', ) self.known = {} self._callbacks = {} self.test = test if not self.test: self.port = reactor.listenMulticast( SSDP_PORT, self, listenMultiple=True, interface=interface) self.port.joinGroup(SSDP_ADDR, interface=interface) self.resend_notify_loop = task.LoopingCall(self.resendNotify) self.resend_notify_loop.start(777.0, now=False) self.check_valid_loop = task.LoopingCall(self.check_valid) self.check_valid_loop.start(333.0, now=False) self.active_calls = []
[docs] def shutdown(self): '''Shutdowns the server :class:`SSDPServer` and sends out the bye bye notifications via method :meth:`doByebye`.''' for call in reactor.getDelayedCalls(): if call.func == self.send_it: call.cancel() if not self.test: if self.resend_notify_loop.running: self.resend_notify_loop.stop() if self.check_valid_loop.running: self.check_valid_loop.stop() for st in self.known: if self.known[st]['MANIFESTATION'] == 'local': self.doByebye(st)
[docs] def datagramReceived(self, data, xxx_todo_changeme): '''Handle a received multicast datagram.''' self.debug(f'datagramReceived: {data}') (host, port) = xxx_todo_changeme data = to_string(data) try: header, payload = data.split('\r\n\r\n')[:2] except ValueError as err: print(err) print('Arggg,', data) import pdb pdb.set_trace() lines = header.split('\r\n') cmd = lines[0].split(' ') lines = [x.replace(': ', ':', 1) for x in lines[1:]] lines = [x for x in lines if len(x) > 0] # TODO: Find and fix where some of the header's keys are quoted. # This hack, allows to fix the quoted keys for the headers, introduced # at some point of the source code. I notice that the issue appears # when using FSStore plugin. But where? def fix_string(s, to_lower=True): for q in ['\'', '"']: while s.startswith(q): s = s[1:] for q in ['\'', '"']: while s.endswith(q): s = s[:-1] if to_lower: s = s.lower() return s headers = [x.split(':', 1) for x in lines] headers = \ dict([(fix_string(x[0]), fix_string(x[1], to_lower=False)) for x in headers]) self.msg(f'SSDP command {cmd[0]} {cmd[1]} - from {host}:{port}') self.debug(f'with headers: {headers}') if cmd[0] == 'M-SEARCH' and cmd[1] == '*': # SSDP discovery self.discoveryRequest(headers, (host, port)) elif cmd[0] == 'NOTIFY' and cmd[1] == '*': # SSDP presence self.notifyReceived(headers, (host, port)) else: self.warning(f'Unknown SSDP command {cmd[0]} {cmd[1]}') # make raw data available # send out the signal after we had a chance to register the device self.dispatch_event('datagram_received', data, host, port)
[docs] def register(self, manifestation, usn, st, location, server=SERVER_ID, cache_control='max-age=1800', silent=False, host=None): '''Register a service or device that this SSDP server will respond to.''' self.info(f'Registering {st} ({location}) -> {manifestation}') self.debug(f'\t-searching usn: {usn}') try: self.known[usn] = {} self.known[usn]['USN'] = usn self.known[usn]['LOCATION'] = location self.known[usn]['ST'] = st self.known[usn]['EXT'] = '' self.known[usn]['SERVER'] = server self.known[usn]['CACHE-CONTROL'] = cache_control self.known[usn]['MANIFESTATION'] = manifestation self.known[usn]['SILENT'] = silent self.known[usn]['HOST'] = host self.known[usn]['last-seen'] = time.time() self.msg(self.known[usn]) self.debug(f'\t-self.known: {self.known}') if manifestation == 'local': self.doNotify(usn) if st == 'upnp:rootdevice': self.dispatch_event( 'new_device', device_type=st, infos=self.known[usn]) self.root_devices.append(usn) # self.callback('new_device', st, self.known[usn]) # print('\t - ok all') except Exception as err: self.error(f'\t -> Error on registering service: {manifestation} ' f'[error: "{err}"]')
[docs] def unRegister(self, usn): self.msg(f'Un-registering {usn}') st = self.known[usn]['ST'] if st == 'upnp:rootdevice': self.dispatch_event( 'removed_device', device_type=st, infos=self.known[usn]) # self.callback('removed_device', st, self.known[usn]) self.root_devices.remove(usn) del self.known[usn]
[docs] def isKnown(self, usn): return usn in self.known
[docs] def notifyReceived(self, headers, xxx_todo_changeme1): '''Process a presence announcement. We just remember the details of the SSDP service announced.''' (host, port) = xxx_todo_changeme1 self.info(f'Notification from ({host},{port}) for {headers["nt"]}') self.debug(f'Notification headers: {headers}') if headers['nts'] == 'ssdp:alive': try: self.known[headers['usn']]['last-seen'] = time.time() self.debug(f'updating last-seen for {headers["usn"]}') except KeyError: self.register('remote', headers['usn'], headers['nt'], headers['location'], headers['server'], headers['cache-control'], host=host) elif headers['nts'] == 'ssdp:byebye': if self.isKnown(headers['usn']): self.unRegister(headers['usn']) else: self.warning(f'Unknown subtype {headers["nts"]} ' f'for notification type {headers["nt"]}') self.dispatch_event('log', 'SSDP', host, f'Notify {headers["nts"]} for {headers["usn"]}')
[docs] def send_it(self, response, destination, delay, usn): self.info(f'send discovery response delayed by ' f'{delay} for {usn} to {destination}') try: self.transport.write(to_bytes(response), destination) except (AttributeError, socket.error) as msg: self.exception(f'failure sending out datagram: {msg}')
[docs] def discoveryRequest(self, headers, xxx_todo_changeme2): '''Process a discovery request. The response must be sent to the address specified by (host, port).''' (host, port) = xxx_todo_changeme2 self.info( f'Discovery request from ({host},{port}) for {headers["st"]}') self.info(f'Discovery request for {headers["st"]}') self.dispatch_event('log', 'SSDP', host, f'M-Search for {headers["st"]}') # Do we know about this service? for i in list(self.known.values()): if i['MANIFESTATION'] == 'remote': continue if (headers['st'] == 'ssdp:all' and i['SILENT'] is True): continue if (i['ST'] == headers['st'] or headers['st'] == 'ssdp:all'): response = [] response.append(b'HTTP/1.1 200 OK') for k, v in list(i.items()): if k == 'USN': usn = v if k not in ('MANIFESTATION', 'SILENT', 'HOST'): response.append(f'{k}: {v}'.encode('ascii')) response.append(f'DATE: {datetimeToString()}'.encode('ascii')) response.extend((b'', b'')) delay = random.randint(0, int(headers['mx'])) reactor.callLater( delay, self.send_it, b'\r\n'.join(response), (host, port), delay, usn)
[docs] def doNotify(self, usn): '''Do notification''' if self.known[usn]['SILENT'] is True: return self.info(f'Sending alive notification for {usn}') # self.info(f'\t - self.known[usn]: {self.known[usn]}') resp = ['NOTIFY * HTTP/1.1', f'HOST: {SSDP_ADDR}:{SSDP_PORT}', 'NTS: ssdp:alive', ] stcpy = dict(iter(self.known[usn].items())) stcpy['NT'] = stcpy['ST'] del stcpy['ST'] del stcpy['MANIFESTATION'] del stcpy['SILENT'] del stcpy['HOST'] del stcpy['last-seen'] resp.extend([ f'{k}: {v}' for k, v in stcpy.items()]) resp.extend(('', '')) r = '\r\n'.join(resp).encode('ascii') self.debug(f'doNotify content {r} [transport is: {self.transport}]') if not self.transport: try: self.warning('transport not initialized...' 'trying to initialize a FakeDatagramTransport') self.transport = proto_helpers.FakeDatagramTransport() except Exception as er: self.error(f'Cannot initialize transport: {er}') try: self.transport.write(r, (SSDP_ADDR, SSDP_PORT)) except (AttributeError, socket.error) as msg: self.info(f'failure sending out alive notification: {msg}')
[docs] def doByebye(self, usn): '''Do byebye''' self.info(f'Sending byebye notification for {usn}') resp = ['NOTIFY * HTTP/1.1', f'HOST: {SSDP_ADDR}:{SSDP_PORT}', 'NTS: ssdp:byebye', ] try: stcpy = dict(iter(self.known[usn].items())) stcpy['NT'] = stcpy['ST'] del stcpy['ST'] del stcpy['MANIFESTATION'] del stcpy['SILENT'] del stcpy['HOST'] del stcpy['last-seen'] resp.extend([ f'{k}: {v}' for k, v in stcpy.items()]) resp.extend(('', '')) r = '\r\n'.join(resp).encode('ascii') self.debug(f'doByebye content {resp}') if not self.transport: self.warning('transport not initialized...' 'trying to initialize a FakeDatagramTransport') self.transport = proto_helpers.FakeDatagramTransport() self.makeConnection(self.transport) try: self.transport.write(r, (SSDP_ADDR, SSDP_PORT)) except (AttributeError, socket.error) as msg: self.info( f'failure sending out byebye notification: {msg}') except KeyError as msg: self.debug(f'error building byebye notification: {msg}')
[docs] def resendNotify(self): for usn in self.known: if self.known[usn]['MANIFESTATION'] == 'local': self.doNotify(usn)
[docs] def check_valid(self): ''' Check if the discovered devices are still ok, or if we haven't received a new discovery response ''' self.debug('Checking devices/services are still valid') removable = [] for usn in self.known: if self.known[usn]['MANIFESTATION'] != 'local': _, expiry = self.known[usn]['CACHE-CONTROL'].split('=') expiry = int(expiry) now = time.time() last_seen = self.known[usn]['last-seen'] self.debug( f'Checking if {self.known[usn]["USN"]} is still valid - ' f'last seen {last_seen} (+{expiry}), now {now}') if last_seen + expiry + 30 < now: self.debug(f'Expiring: {self.known[usn]}') if self.known[usn]['ST'] == 'upnp:rootdevice': self.dispatch_event( 'removed_device', device_type=self.known[usn]['ST'], infos=self.known[usn]) removable.append(usn) while len(removable) > 0: usn = removable.pop(0) del self.known[usn]
[docs] def subscribe(self, name, callback): self._callbacks.setdefault(name, []).append(callback)
[docs] def unsubscribe(self, name, callback): callbacks = self._callbacks.get(name, []) if callback in callbacks: callbacks.remove(callback) self._callbacks[name] = callbacks
[docs] def callback(self, name, *args): for callback in self._callbacks.get(name, []): callback(*args)