'''
Created on Nov 28, 2012

@author: Repnikov Dmitry
'''

import os
import logging
import threading
import re
import copy

from jsonrpc_server import JSONRPCServer
from module_loader import ModuleLoader
from helperBTE import HelperBTE
from helperIPcamera import HelperIPcamera
from message_queue import MessageQueue
from helperSNMPSensors import HelperSNMPSensors
from helperElectricityMeter import HelperElectricityMeter
from helperDGS import HelperDGS
from helperMPSU import HelperMPSU
from helperInverPack import HelperInverPackMeter
from helperCRS import HelperCRS
from helperPulsar import HelperPulsar
from helperUPKB import HelperUPKB
from configman import make_copy
from sensor_scheduler import SensorScheduler


class SensorHolder:
    """
    Holds one sensor object together with its cached value and state.
    """
    # Global lock for sensors. It is a plain (non-reentrant) Lock, so code
    # that already holds it must not call anything that acquires it again.
    sensors_lock = threading.Lock()

    def __init__(self, _sensor):
        self.state = None   # Current state of this sensor
        self.value = None   # Current value of this sensor
        self.sensor = _sensor

    def onChange(self):
        """ Hook called by SensorManager.put_value_pair after a change; no-op here. """
        pass


class SensorManager(JSONRPCServer, HelperPulsar, HelperBTE, HelperIPcamera,
                    HelperSNMPSensors, HelperElectricityMeter, HelperDGS,
                    HelperMPSU, HelperInverPackMeter, HelperCRS, HelperUPKB):
    """
    Sensor manager. API for sensors storing and management.
    """
    JSONRPC_SERVER_ADDRESS = ("127.0.0.1", 4665)
    # Configuration path of the trigger list edited by set_send_inform_flag().
    STATE_CHANGE_TRIGGER_PATH = ["notifications", "notifications",
                                 "3db09436-17a8-4b0e-b70c-93ceaf094cff", "trigger"]
    # One SensorScheduler is created per name listed here.
    SENSOR_SCHEDULERS = ["DEFAULT", "1W", "CONMAN"]
    # Sensors whose value is treated as "changed" on every put_value_pair()
    # call, even when the new value equals the cached one.
    # NOTE(review): this looks like a stand-in for a per-sensor
    # history['any_value'] setting (see put_value_pair) - confirm.
    _ALWAYS_REFRESH_VALUE_IDS = frozenset([
        'DEAD0000-BEAF-DEAD-BEAF-EE0000000005',
        'DEAD0000-BEAF-DEAD-BEAF-EE0000000006',
        'DEAD0000-BEAF-DEAD-BEAF-EE00000000A5',
        'DEAD0000-BEAF-DEAD-BEAF-EE00000000A6',
    ])

    def __init__(self, _configman, _queue_event, _queue_action):
        """
        Create loaders, schedulers and initialise every helper base class.

        _configman    - configuration manager, also handed to every helper.
        _queue_event  - queue that receives event messages.
        _queue_action - queue that receives action messages.
        """
        self.logger = logging.getLogger(__name__)
        self.sensors = {}   # sensor id -> SensorHolder
        self.sensor_modules = ModuleLoader(__file__, "/../sensors/", "sensor_*", _recursive = True)
        self.event_modules = ModuleLoader(__file__, "/../events/", "event_*")
        self.configman = _configman
        self.queue_event = _queue_event
        self.queue_action = _queue_action
        self.deviceman = None
        self.sensor_schedulers = {}
        for name in self.SENSOR_SCHEDULERS:
            self.sensor_schedulers[name] = SensorScheduler(name)
        HelperBTE.__init__(self, _configman)
        HelperIPcamera.__init__(self, _configman)
        HelperSNMPSensors.__init__(self, _configman)
        HelperElectricityMeter.__init__(self, _configman)
        HelperDGS.__init__(self, _configman)
        HelperMPSU.__init__(self, _configman)
        HelperInverPackMeter.__init__(self, _configman)
        HelperCRS.__init__(self, _configman)
        HelperPulsar.__init__(self, _configman)
        HelperUPKB.__init__(self, _configman)
        JSONRPCServer.__init__(self)

    def set_device_man(self, _deviceman):
        """ Remember the device manager and pass it on to the helpers that use it. """
        self.deviceman = _deviceman
        try:
            # NOTE(review): these are invoked on the class, without `self`.
            # That only works if the helpers declare set_device_man as a
            # static/class method - confirm against the helper modules.
            HelperBTE.set_device_man(_deviceman)
            HelperDGS.set_device_man(_deviceman)
            HelperMPSU.set_device_man(_deviceman)
            HelperPulsar.set_device_man(_deviceman)
        except Exception:
            self.logger.exception("Unable to set device manager instance")
        #try: self.MPSU_check({"alias":"serial232.0", "name":"/dev/ttyS1"}, 9600)
        #except: self.logger.exception("Unable to set check MPSU")

    def start_sensor(self, id):
        """ Start sensor thread, if any """
        try:
            self.logger.debug("Starting sensor module '%s'", id)
            # Seed the cache with an empty pair; default=True suppresses events.
            self.put_value_pair(id, (None, None), default=True)
            self.sensors[id].sensor.restart()
        except Exception:
            self.logger.exception("Unable to start sensor '%s'", id)

    def start_scheduler(self):
        """ Start every sensor scheduler; a failure is logged, not raised. """
        try:
            for scheduler in self.sensor_schedulers.values():
                scheduler.start()
        except Exception:
            self.logger.exception("start scheduler error")

    def register_polling_sensor(self, sensor):
        """ Register sensor with the scheduler named by sensor.get_scheduler_name(). """
        self.sensor_schedulers[sensor.get_scheduler_name()].register_sensor(sensor)

    def unregister_polling_sensor(self, sensor):
        """ Unregister sensor from the scheduler named by sensor.get_scheduler_name(). """
        self.sensor_schedulers[sensor.get_scheduler_name()].unregister_sensor(sensor)

    def list_sensor_ids(self):
        """ Return the ids of all loaded sensors as a list. """
        return list(self.sensors.keys())

    def find_sensor_by_class(self, class_name):
        """
        Find sensor id by class name (can be regular expression).
        Returns the list of ids matched so far if an exception interrupts the scan.
        """
        res = []
        with SensorHolder.sensors_lock:
            try:
                for sensor_id, holder in self.sensors.items():
                    if re.match(class_name, holder.sensor.__class__.__name__):
                        res.append(sensor_id)
            except Exception:
                self.logger.exception("Unable to find sensor by class '%s'", class_name)
        return res

    def find_sensor_by_group(self, group_name):
        """
        Find sensor id by group name (can be regular expression).
        Returns the list of ids matched so far if an exception interrupts the scan.
        """
        res = []
        with SensorHolder.sensors_lock:
            try:
                for sensor_id, holder in self.sensors.items():
                    if re.match(group_name, holder.sensor.group):
                        res.append(sensor_id)
            except Exception:
                self.logger.exception("Unable to find sensor by group '%s'", group_name)
        return res

    def get_available_classes(self):
        """
        Return dictionary {"class_name": ["description", [ids]]} for every
        loadable class matching "Sensor\\w+", or None on error.
        """
        res = {}
        self.logger.debug("get_available_classes start")
        with SensorHolder.sensors_lock:
            try:
                for sensor_class in self.sensor_modules.class_by_regexp(r"Sensor[\w]+"):
                    class_name = sensor_class.__name__
                    sensor_ids = [sensor_id
                                  for sensor_id, holder in self.sensors.items()
                                  if holder.sensor.__class__.__name__ == class_name]
                    res[class_name] = [sensor_class.__doc__, sensor_ids]
            except Exception:
                self.logger.exception("Unable to collect available sensor classes")
                res = None
        self.logger.debug("get_available_classes end")
        return res

    def get_view_dict(self, id):
        """
        Returns view dictionary of sensor. This dictionary contains strings,
        that can be used in formatting. Returns {} on any error.
        """
        try:
            return self.sensors[id].sensor.get_view_dict()
        except Exception:
            return {}

    def get_view_dicts(self, ids):
        """ Returns {id: view dictionary} for the given ids, {} if ids cannot be iterated. """
        res = {}
        try:
            for sensor_id in ids:
                res[sensor_id] = self.get_view_dict(sensor_id)
            return res
        except Exception:
            return {}

    def get_view_dict_by_alias(self, alias):
        """ Returns view dictionary of the sensor with this alias, {} on any error. """
        try:
            return self.get_sensor_by_alias(alias).get_view_dict()
        except Exception:
            return {}

    def get_view_dicts_by_aliases(self, aliases):
        """ Returns {alias: view dictionary}; aliases that fail are logged and left out. """
        res = {}
        for alias in aliases:
            try:
                res[alias] = self.get_sensor_by_alias(alias).get_view_dict()
            except Exception:
                self.logger.exception("get_sensor_by_alias exception")
        return res

    def get_all_sensors_views(self):
        """ Returns {id: view dictionary} for every loaded sensor. """
        res = {}
        for sensor_id in self.sensors.keys():
            res[sensor_id] = self.get_view_dict(sensor_id)
        return res

    def get_value_description(self, id, value):
        """ Returns full form of value description, include state; {} on any error. """
        try:
            return self.sensors[id].sensor.get_value_descr(value)
        except Exception:
            return {}

    def apply_view_dict(self, id, nview):
        """ Set dict nview to sensor (through SensorView class); False on any error. """
        try:
            return self.sensors[id].sensor.apply_view_dict(nview)
        except Exception:
            return False

    def apply_view_dict_by_alias(self, alias, nview):
        """ Set dict nview to sensor (through SensorView class); False on any error. """
        try:
            return self.get_sensor_by_alias(alias).apply_view_dict(nview)
        except Exception:
            return False

    def put_value_pair(self, id, pair, default = False, priority = None):
        """
        Store value pair (value, state) in the sensor cache and emit change events.

        pair     - (value, state) tuple.
        default  - if True the pair is stored unconditionally as the initial
                   one and no events are generated.
        priority - priority for the generated events, None means
                   MessageQueue.PRIOR_LOWEST (see create_event_custprior).

        Unknown ids are ignored. The whole operation runs under sensors_lock;
        the `with` block guarantees the lock is released even when event
        creation or onChange() raises.
        """
        value, state = pair
        with SensorHolder.sensors_lock:
            if id not in self.sensors:
                return
            try:
                if default:
                    old_value, old_state = pair
                else:
                    # Lock is already held, so read the cache without locking.
                    old_value, old_state = self.get_value_pair(id, False)
                holder = self.sensors[id]
                refresh_value = (old_value != value) or default
                if refresh_value:
                    holder.value = value
                refresh_state = (old_state != state) or default
                if refresh_state:
                    holder.state = state
            except Exception:
                self.logger.exception("Exception while put state/value for sensor '%s'", id)
                return

            #refresh_value = refresh_value or holder.sensor.history['any_value']
            refresh_value = refresh_value or (id in self._ALWAYS_REFRESH_VALUE_IDS)

            if refresh_value:
                self.logger.debug("Value change of '%s' detected: %s -> %s", id, old_value, value)
                if not default:
                    self.create_event_custprior(priority, "EventValueChange",
                                                holder.sensor, old_value, value, state)
            if refresh_state:
                self.logger.debug("State change of '%s' detected: %s -> %s", id, old_state, state)
                if not default:
                    self.create_event_custprior(priority, "EventStateChange",
                                                holder.sensor, old_state, state, value)
            if refresh_value or refresh_state:
                holder.onChange()

    def get_value_pair(self, id, fLock = True):
        """
        Read cached pair (value, state) of a sensor; (None, None) for unknown ids.

        fLock - pass False when the caller already holds sensors_lock.
        """
        value, state = None, None
        if fLock:
            SensorHolder.sensors_lock.acquire()
        try:
            if id in self.sensors:
                holder = self.sensors[id]
                value, state = holder.value, holder.state
        finally:
            if fLock:
                SensorHolder.sensors_lock.release()
        return (value, state)

    def get_value_pairs(self, ids):
        """ Returns {id: (value, state)} for list of ids, {} on error. """
        with SensorHolder.sensors_lock:
            try:
                return dict((sensor_id, self.get_value_pair(sensor_id, False))
                            for sensor_id in ids)
            except Exception:
                self.logger.exception("Exception while get_value_pairs.")
                return {}

    def get_value_descrs(self, values):
        """ Returns {id: description} for a {id: value} dictionary, {} on error. """
        with SensorHolder.sensors_lock:
            try:
                return dict((sensor_id, self.get_value_description(sensor_id, values[sensor_id]))
                            for sensor_id in values.keys())
            except Exception:
                self.logger.exception("Exception while get_value_descrs.")
                return {}

    def get_value_pair_and_descr(self, ids):
        """ Returns {id: (value, state, description)} for list of ids, {} on error. """
        with SensorHolder.sensors_lock:
            try:
                res = {}
                for sensor_id in ids:
                    value, state = self.get_value_pair(sensor_id, False)
                    res[sensor_id] = (value, state,
                                      self.get_value_description(sensor_id, value))
                return res
            except Exception:
                self.logger.exception("Exception while get_value_pair_and_descr.")
                return {}

    def get_sensor_by_id(self, id):
        """ Return the sensor object for id; raises KeyError for unknown ids. """
        return self.sensors[id].sensor

    def get_sensor_by_alias(self, alias):
        """ Return the first sensor whose alias equals `alias`; raises Exception if none. """
        for sensor_holder in self.sensors.values():
            if sensor_holder.sensor.alias == alias:
                return sensor_holder.sensor
        raise Exception("sensor {0} not found".format(alias))

    def get_sensor_id_by_alias(self, alias):
        """ Return the id of the sensor with this alias; raises Exception if none. """
        return self.get_sensor_by_alias(alias).id

    def update_snmp_sensor(self, id):
        """ Immediate update of snmp sensor. Blocking. Errors are logged, not raised. """
        try:
            self.get_sensor_by_id(id).immidiate_update()
        except Exception:
            self.logger.exception("Exception while update_snmp_sensor")

    # ToDo: rename this to event_taker
    def subscribe(self, _class, _event, _handler):
        """ Register new handler on the event queue """
        self.queue_event.subscribe(_class, _event, _handler)

    def action_taker(self, _class, _event, _handler):
        """ Register new handler on the action queue """
        self.queue_action.subscribe(_class, _event, _handler)

    def put_message(self, _queue, _class, _priority, *_args):
        """
        Create action or event message for some subsystem.

        _queue    - queue object that receives the message.
        _class    - string with event class name (regular expression; it is
                    anchored with "\\Z" when not anchored already).
        _priority - priority the message is queued with.
        _args     - constructor arguments of the message class. If [0]
                    (sender) is None, it is replaced with self.

        Nothing is queued, and an error is logged, when no arguments are
        given or when _class does not resolve to exactly one class.
        """
        if not _args:
            self.logger.error("Not enough arguments to create event '%s' - FAIL", _class)
            return
        if (len(_class) > 2) and not _class.endswith(r"\Z"):
            _class += r"\Z"
        self.logger.info("Try to generate event '%s' for '%s'...", _class, _args[0])
        classes = self.event_modules.class_by_regexp(_class)
        if len(classes) != 1:
            self.logger.error("Loading event %s - FAIL, no or too many classes (%s)",
                              _class, classes)
            return
        if _args[0] is None:
            _args = (self,) + _args[1:]
        try:
            # Forward the priority, the same way put_action() does; without
            # it create_event_hiprior/custprior behave like create_event.
            _queue.put(classes[0](*_args), _priority)
        except Exception:
            self.logger.exception("Exception while creating event class '%s'", _class)

    def create_event(self, _class, *_args):
        """ Queue an event of class _class with the lowest priority. """
        self.put_message(self.queue_event, _class, MessageQueue.PRIOR_LOWEST, *_args)

    def create_event_hiprior(self, _class, *_args):
        """ Queue an event of class _class with the highest priority. """
        self.put_message(self.queue_event, _class, MessageQueue.PRIOR_HIGHEST, *_args)

    def create_event_custprior(self, _prior, _class, *_args):
        """ Queue an event with priority _prior (None means lowest). """
        if _prior is None:
            _prior = MessageQueue.PRIOR_LOWEST
        self.put_message(self.queue_event, _class, _prior, *_args)

    def create_action(self, _class, *_args):
        """ Queue an action of class _class with the lowest priority. """
        self.put_message(self.queue_action, _class, MessageQueue.PRIOR_LOWEST, *_args)

    # ToDo: maybe move this to some power-helper
    def is_battery_present(self):
        """ Ask the sensor with alias "BAT" whether a battery is present; False on any error. """
        try:
            return self.get_sensor_by_alias("BAT").is_battery_present()
        except Exception:
            return False

    # --- This two routines for compatibility, and for raw Event/Action class put ---
    def put_action(self, action, priority = None):
        """ Put action element in queue (priority None means lowest) """
        if priority is None:
            priority = MessageQueue.PRIOR_LOWEST
        self.queue_action.put(action, priority)

    def put_event(self, event, priority = None):
        """ Put event element in queue (priority None means lowest) """
        if priority is None:
            priority = MessageQueue.PRIOR_LOWEST
        # The priority used to be computed and then dropped here.
        self.queue_event.put(event, priority)

    def load_sensor(self, id, descr):
        """
        Create sensor from configuration description and put it in self.sensors.

        WARNING! By default this function works only with NEW configuration.
        For newly added sensor please use load_new_sensor, to save/deserialize
        configuration.

        descr - dictionary with at least the key "sensor_class".
        Returns True when the sensor was created, False otherwise.
        """
        try:
            self.logger.info("Try to load sensor %s...", id)
            sensor_classes = self.sensor_modules.class_by_regexp(descr["sensor_class"] + r"\Z")
            if len(sensor_classes) == 1:
                sensor_class = sensor_classes[0]
                try:
                    self.logger.debug("Found sensor class '%s' for sensor %s, creating...",
                                      sensor_class.__name__, id)
                    self.sensors[id] = SensorHolder(sensor_class(self.configman, self, id))
                    return True
                except Exception:
                    self.logger.exception("Exception while creating sensor class '%s', id %s",
                                          sensor_class, id)
            else:
                self.logger.error("Loading sensor %s - FAIL, no such class: %s or too many classes (%s)",
                                  id, descr["sensor_class"], sensor_classes)
        except Exception:
            self.logger.exception("Exception while searching for sensor with description '%s'", descr)
        return False

    def load_new_sensor(self, id, class_name):
        """ Load and start a sensor of class class_name, then confirm its config path. """
        fake_descr = {"sensor_class": class_name}
        if not self.load_sensor(id, fake_descr):
            return False
        self.start_sensor(id)
        self.configman.confirm_path(["sensors", id])
        return True

    def confirm_sensors(self):
        """ Confirm the whole ["sensors"] configuration path. """
        self.configman.confirm_path(["sensors"])

    def delete_sensor(self, id):
        """
        Stop and delete a sensor and remove it from the configuration.

        Raises KeyError when the id is unknown. Returns True when the
        configuration entry was removed, False otherwise.
        """
        result = False
        self.sensors[id].sensor.stop()
        with SensorHolder.sensors_lock:
            try:
                self.sensors[id].sensor.delete()
                del self.sensors[id]
            except Exception:
                self.logger.exception("Exception while delete sensor '%s'", id)
        try:
            # This must not be under lock
            sensors = make_copy(self.configman.read_config(["sensors"]))
            del sensors[id]
            self.configman.write_config(["sensors"], sensors)
            result = True
        except Exception:
            self.logger.exception("Exception while delete sensor '%s'", id)
        return result

    def delete_sensor_device(self, sensor_id):
        """
        Remove the device that sensor_id belongs to, with all of its sensors.

        Every sensor reported by get_device_sensors() is dropped from
        self.sensors and from the configuration, and its state-change
        trigger is removed. Returns True on success, False otherwise.
        """
        try:
            device_sensors = list(self.sensors[sensor_id].sensor.get_device_sensors())
            self.sensors[sensor_id].sensor.remove_device()
        except Exception:
            self.logger.exception("Exception while delete device '%s'", sensor_id)
            # Without the device's sensor list there is nothing left to clean up.
            return False

        try:
            # This must not be under lock
            sensors = make_copy(self.configman.read_config(["sensors"]))
            aliases = []
            for sensor in device_sensors:
                try:
                    self.logger.debug("delete sensor '%s'", sensor.id)
                    aliases.append(sensor.alias)
                    del sensors[sensor.id]
                    del self.sensors[sensor.id]
                except Exception:
                    self.logger.exception("Exception while delete sensor '%s'", sensor.id)
            self.configman.write_config(["sensors"], sensors)
            try:
                self.set_send_inform_flag(aliases, False)
            except Exception:
                self.logger.exception("can't set inform flag")
            return True
        except Exception:
            self.logger.exception("Exception while delete device '%s'", sensor_id)
            return False

    def get_controller_state(self):
        """ Caption of the current value of the sensor with alias "CNT" ("n/a" if none). """
        return self.get_sensor_description_by_alias("CNT")

    def get_Sig(self, teploAPI=None):
        """
        Build an integer bit mask of active signals.

        Bit 0     - sensor "VIBlis" is in state 'almaj'.
        Bit 1     - value of "teploUB.pwr" is "DISCHARGING" or "SHUTDOWN".
        Bits 2..5 - sensors "DI1".."DI4" are in state 'almaj'.
        Bit 6     - sensor "teploUB.extDS1" is in state 'almaj'.
        Bit 7     - teploAPI is given and its check_serial_numbers() is false.

        Raises Exception when one of the aliases does not exist.
        """
        def in_almaj_state(alias):
            # NOTE(review): 'almaj' presumably means "major alarm" - confirm.
            return self.get_sensor_state_by_alias(alias) == 'almaj'

        # The position in this tuple is the bit number.
        checks = (
            lambda: in_almaj_state("VIBlis"),
            lambda: self.get_sensor_value_by_alias("teploUB.pwr") in ("DISCHARGING", "SHUTDOWN"),
            lambda: in_almaj_state("DI1"),
            lambda: in_almaj_state("DI2"),
            lambda: in_almaj_state("DI3"),
            lambda: in_almaj_state("DI4"),
            lambda: in_almaj_state("teploUB.extDS1"),
            lambda: teploAPI is not None and not teploAPI.check_serial_numbers(),
        )
        sig = 0
        for bit, check in enumerate(checks):
            if check():
                sig |= 1 << bit
        return sig

    def get_sensor_description_by_alias(self, alias):
        """ Caption of the current value of the sensor with this alias ("n/a" if none). """
        sensor = self.get_sensor_by_alias(alias)
        value = self.get_value_pair(sensor.id)[0]
        return self.get_value_description(sensor.id, value).get("caption", "n/a")

    def get_sensor_state_by_alias(self, alias):
        """ Cached state of the sensor with this alias. """
        sensor = self.get_sensor_by_alias(alias)
        return self.get_value_pair(sensor.id)[1]

    def get_sensor_value_by_alias(self, alias):
        """ Cached value of the sensor with this alias. """
        sensor = self.get_sensor_by_alias(alias)
        return self.get_value_pair(sensor.id)[0]

    def find_free_alias(self, pattern):
        """
        Return the set of numbers from 1..998 not used by any configured alias.

        pattern - regular expression matched against every alias; it must
                  contain a named group "num" holding the decimal number.

        ToDo: merge with same function of event monitor (load_sensors) and
        move to configman
        """
        res = set(range(1, 999))
        try:
            self.logger.debug("Search for free aliases...")
            all_sensors = self.configman.read_config(["sensors"])
            for sensor in all_sensors.keys():
                try:
                    alias = all_sensors[sensor]["alias"]
                    self.logger.debug("matching sensor ID '%s' alias '%s'", sensor, alias)
                    matched = re.match(pattern, alias)
                    if matched:
                        self.logger.debug("\tAllready have sensor ID '%s'", sensor)
                        num = int(matched.group("num"), 10)
                        self.logger.debug("removing %s", num)
                        res.discard(num)
                except Exception:
                    self.logger.exception("Exception while scan aliases '%s'", sensor)
        except Exception:
            self.logger.exception("Exception while read sensors configuration")
        return res  # This is bad - on exception will return full range

    def set_send_inform_flag(self, aliases, flag):
        """
        Add (flag true) or remove (flag false) a ["sensor_state", alias]
        trigger for every alias in the STATE_CHANGE_TRIGGER_PATH list and
        write the list back. Errors are logged, not raised.
        """
        try:
            change_state_trigger = self.configman.read_config(self.STATE_CHANGE_TRIGGER_PATH, copy_=True)
            for alias in aliases:
                if len(change_state_trigger) == 0:
                    # An empty trigger list gets the "or" operator as first element.
                    change_state_trigger.append("or")
                trigger = ["sensor_state", alias]
                if flag:
                    if trigger not in change_state_trigger:
                        change_state_trigger.append(trigger)
                else:
                    try:
                        change_state_trigger.remove(trigger)
                    except Exception:
                        self.logger.warning("No sensor for trigger %s", trigger)
            self.configman.write_config(self.STATE_CHANGE_TRIGGER_PATH, change_state_trigger)
        except Exception:
            self.logger.exception("set_send_inform_flag exception")