Source code for coherence.upnp.core.variable

# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php

# Copyright (C) 2006 Fluendo, S.A. (www.fluendo.com).
# Copyright 2006, Frank Scholz <coherence@beebits.net>
# Copyright 2018, Pol Canelles <canellestudi@gmail.com>

'''
:class:`StateVariable`
----------------------

The :class:`StateVariable` implements an UPnP variable with the ability to
emmit changes via EventDispatcher whenever some class variable changes.
'''

import time

from eventdispatcher import EventDispatcher

from coherence.upnp.core import utils

try:
    # FIXME:  there is some circular import,
    # service imports variable, variable imports service
    # how is this done properly?
    from coherence.upnp.core import service
except ImportError:
    from . import service

from coherence import log


[docs]class StateVariable(EventDispatcher, log.LogAble): ''' Represents a UPnP variable. It has the ability to dispatch changes whenever some class variable changes. .. versionchanged:: 0.9.0 * Migrated from louie/dispatcher to EventDispatcher * Added two more class variables: - :attr:`StateVariable.dispatch_targets` - :attr:`StateVariable.dispatch_events` * The emitted events changed: - Coherence.UPnP.StateVariable.changed => state_variable_changed - Coherence.UPnP.StateVariable.{var name}.changed => state_variable_{var name}_changed .. warning: This class is special regarding EventDispatcher, because we initialize the event into the service and in the class, and we only do that if the target service has inherited from EventDispatcher, otherwise we only implements EventDispatcher into this class. ''' logCategory = 'variable' dispatch_targets = [] dispatch_events = [] def __init__(self, upnp_service, name, implementation, instance, send_events, data_type, allowed_values): log.LogAble.__init__(self) EventDispatcher.__init__(self) self.service = upnp_service self.dispatch_events = [ f'state_variable_{name}_changed', 'state_variable_changed'] self.dispatch_targets = [self] if isinstance(self.service, EventDispatcher): self.dispatch_targets.append(self.service) for target in self.dispatch_targets: for evt in self.dispatch_events: if evt not in target.event_dispatcher_event_callbacks: target.register_event(evt) self.name = name self.implementation = implementation self.instance = instance self.send_events = utils.means_true(send_events) self.never_evented = False self.data_type = data_type self.allowed_values = allowed_values if self.allowed_values is None: self.allowed_values = [] self.has_vendor_values = False self.allowed_value_range = None self.dependant_variable = None self.default_value = '' self.old_value = '' self.value = '' self.last_time_touched = None self._callbacks = [] if isinstance(self.service, service.ServiceServer): self.moderated = self.service.is_variable_moderated(name) self.updated = False
[docs] def as_tuples(self): r = [] r.append(('Name', self.name)) if self.send_events: r.append(('Evented', 'yes')) else: r.append(('Evented', 'no')) r.append(('Data Type', self.data_type)) r.append(('Default Value', self.default_value)) r.append(('Current Value', str(self.value))) if self.allowed_values is not None and \ len(self.allowed_values) > 0: r.append(('Allowed Values', ','.join(self.allowed_values))) return r
[docs] def set_default_value(self, value): self.update(value) self.default_value = self.value
[docs] def set_allowed_values(self, values): if not isinstance(values, (list, tuple)): values = [values] self.allowed_values = values
[docs] def set_allowed_value_range(self, **kwargs): self.allowed_value_range = kwargs
[docs] def get_allowed_values(self): return self.allowed_values
[docs] def set_never_evented(self, value): self.never_evented = utils.means_true(value)
[docs] def update(self, value): if not isinstance(self.service, service.Service): if self.name == 'ContainerUpdateIDs': old_value = self.value if self.updated: if isinstance(value, tuple): v = old_value.split(',') i = 0 while i < len(v): if v[i] == str(value[0]): del v[i:i + 2] old_value = ','.join(v) break i += 2 if len(old_value): new_value = old_value + ',' + str( value[0]) + ',' + str(value[1]) else: new_value = str(value[0]) + ',' + str(value[1]) else: if len(old_value): new_value = str(old_value) + ',' + str(value) else: new_value = str(value) else: if isinstance(value, tuple): new_value = str(value[0]) + ',' + str(value[1]) else: new_value = value else: if self.data_type == 'string': if isinstance(value, str): value = value.split(',') if (isinstance(value, tuple) or isinstance(value, set)): value = list(value) if not isinstance(value, list): value = [value] new_value = [] for v in value: if type(v) == str: v = v # .encode('utf-8') else: v = str(v) if len(self.allowed_values): if self.has_vendor_values: new_value.append(v) elif v.upper() in [x.upper() for x in self.allowed_values]: new_value.append(v) else: self.warning( f'Variable {self.name} update, ' f'{self.has_vendor_values} value {v} ' f'doesn\'t fit in {self.allowed_values}') new_value = 'Coherence_Value_Error' else: new_value.append(v) new_value = ','.join(new_value) elif self.data_type == 'boolean': new_value = utils.generalise_boolean(value) elif self.data_type == 'bin.base64': new_value = value else: new_value = int(value) else: if self.data_type == 'string': if type(value) != str: value = str(value) if len(self.allowed_values): if self.has_vendor_values: new_value = value elif value.upper() in [v.upper() for v in self.allowed_values]: new_value = value else: self.warning(f'Variable {self.name} NOT updated, ' f'value {value} doesn\'t fit') new_value = 'Coherence_Value_Error' else: new_value = value elif self.data_type == 'boolean': new_value = utils.generalise_boolean(value) elif self.data_type == 'bin.base64': new_value = value else: try: new_value = int(value) except ValueError: new_value = 'Coherence_Value_Error' if new_value == 'Coherence_Value_Error': return if new_value == self.value: self.info(f'variable NOT updated, no value change' f' {self.name} {self.value}') return self.old_value = self.value self.value = new_value self.last_time_touched = time.time() # print(f'UPDATED {self.name} {self.service} ' # f'{isinstance(self.service, service.Service)} ' # f'{self.instance} {self.value} {self._callbacks}') self.notify() if isinstance(self.service, service.Service): # self.notify() pass else: self.updated = True if self.service.last_change is not None: self.service.last_change.updated = True
[docs] def subscribe(self, callback): self._callbacks.append(callback) callback(self)
[docs] def notify(self): if self.name.startswith('A_ARG_TYPE_'): return self.info( f'Variable {self.name} sends notify about new value >{self.value}<' ) # if self.old_value == '': # return for target in self.dispatch_targets: for evt in self.dispatch_events: target.dispatch_event(evt, variable=self) # print(f'CALLBACKS {self.name} {self.instance} {self._callbacks}') for callback in self._callbacks: callback(self)
def __repr__(self): return \ f'Variable: {self.name}, {str(self.service)}, {self.instance:d},' \ f' {self.implementation}, {self.data_type},' \ f' {str(self.allowed_values)}, {str(self.default_value)},' \ f' {str(self.old_value)}, {str(self.value)},' \ f' {str(self.send_events)}'