From c566793a606ba1f4b1f7ce271b59a7522c8e54bc Mon Sep 17 00:00:00 2001 From: mchlburton Date: Mon, 13 Mar 2017 20:02:06 +0000 Subject: [PATCH 1/4] Dummy API for Philips Hue light, pushes to Cassandra, working on pushing to influx instead --- .../devicediscovery/agent.py | 13 +- Agents/LightingAgent/lighting/agent.py | 2 + .../LightingAgent/lightingagent.launch.json | 2 +- .../classAPI/classAPI_DummyPhilipsHue.py | 317 ++++++++++++++++++ 4 files changed, 329 insertions(+), 5 deletions(-) create mode 100755 DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py diff --git a/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py b/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py index 1b1fd1e..f561c02 100755 --- a/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py +++ b/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py @@ -258,8 +258,8 @@ def DeviceDiscoveryAgent(config_path, **kwargs): discovered_address = discovery_module.discover(discovery_type) if discovered_address == None: discovered_address = list() - - + if discovery_type == 'Philips': + discovered_address.append("http://10.0.2.56") print discovered_address for address in discovered_address: @@ -269,14 +269,19 @@ def DeviceDiscoveryAgent(config_path, **kwargs): ip_address = address try: macaddress = discovery_module.getMACaddress(discovery_type, ip_address) + print "macaddress: !!!!!!!!!!!! {}".format(macaddress) if macaddress is not None: _valid_macaddress = True else: _valid_macaddress = False except Exception as er: - print "exception: ",er - _valid_macaddress = False + if ip_address == "http://10.0.2.56": + macaddress = "00:17:88:1a:2b:3c" + _valid_macaddress = True + else: + print "exception: ",er + _valid_macaddress = False else: # this device does not return IP, wait until it get the right mac try: diff --git a/Agents/LightingAgent/lighting/agent.py b/Agents/LightingAgent/lighting/agent.py index 8970aa3..d3030f6 100755 --- a/Agents/LightingAgent/lighting/agent.py +++ b/Agents/LightingAgent/lighting/agent.py @@ -211,6 +211,7 @@ def LightingAgent(config_path, **kwargs): print("device connection is not successful") self.changed_variables = dict() + for v in log_variables: if v in Light.variables: if not v in self.variables or self.variables[v] != Light.variables[v]: @@ -253,6 +254,7 @@ def LightingAgent(config_path, **kwargs): (self.get_variable('hexcolor'), agent_id)) self.con.commit() try: + if self.get_variable('status') == "ON": multiple_on_off_status = "" for dummyvar in range(self.get_variable('number_lights')): diff --git a/Agents/LightingAgent/lightingagent.launch.json b/Agents/LightingAgent/lightingagent.launch.json index 34e661d..80a4f51 100755 --- a/Agents/LightingAgent/lightingagent.launch.json +++ b/Agents/LightingAgent/lightingagent.launch.json @@ -10,7 +10,7 @@ "model": "Philips Hue", "type": "lighting", "api": "classAPI_DummyPhilipsHue", - "address": "http://38.68.232.161/api/newdeveloper/groups/0/", + "address": "http://10.0.2.56", "topic": "datalogger/log/building1/zone1/lightingload/PhilipsHue", "db_host": "localhost", "db_port": "5432", diff --git a/DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py b/DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py new file mode 100755 index 0000000..6276e10 --- /dev/null +++ b/DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py @@ -0,0 +1,317 @@ +# -*- coding: utf-8 -*- +''' +Copyright (c) 2016, Virginia Tech +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + following conditions are met: +1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +disclaimer. +2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following +disclaimer in the documentation and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +The views and conclusions contained in the software and documentation are those of the authors and should not be +interpreted as representing official policies, either expressed or implied, of the FreeBSD Project. + +This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the +United States Government nor the United States Department of Energy, nor Virginia Tech, nor any of their employees, +nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, +express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or +any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe +privately owned rights. + +Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or +otherwise does not necessarily constitute or imply its endorsement, recommendation, favoring by the United States +Government or any agency thereof, or Virginia Tech - Advanced Research Institute. The views and opinions of authors +expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof. + +VIRGINIA TECH – ADVANCED RESEARCH INSTITUTE +under Contract DE-EE0006352 + +#__author__ = "BEMOSS Team" +#__credits__ = "" +#__version__ = "2.0" +#__maintainer__ = "BEMOSS Team" +#__email__ = "aribemoss@gmail.com" +#__website__ = "www.bemoss.org" +#__created__ = "2014-09-12 12:04:50" +#__lastUpdated__ = "2016-03-14 11:23:33" +''' + +import time +import urllib2 +import json +from random import randint +from bemoss_lib.utils import rgb_cie +class API: + # 1. constructor : gets call every time when create a new class + # requirements for instantiation1. model, 2.type, 3.api, 4. address + def __init__(self,**kwargs): # default color is white + # Initialized common attributes + self.variables = kwargs + self.debug = True + self.set_variable('offline_count',0) + self.set_variable('connection_renew_interval',6000) #nothing to renew, right now + self.only_white_bulb = None + # to initialize the only white bulb value + self.getDeviceStatus() + def renewConnection(self): + pass + + def set_variable(self,k,v): # k=key, v=value + self.variables[k] = v + + def get_variable(self,k): + return self.variables.get(k, None) # default of get_variable is none + + # 2. Attributes from Attributes table + ''' + Attributes: + ------------------------------------------------------------------------------------------ + status GET POST Philips Hue ON/OFF status + brightness GET POST brightness percentage + effect GET POST Hue light effect 'none' or 'colorloop' + color GET POST temporary target heat setpoint (floating point in deg F) + ------------------------------------------------------------------------------------------ + + ''' + + # 3. Capabilites (methods) from Capabilities table + ''' + API3 available methods: + 1. getDeviceStatus() GET + 2. setDeviceStatus(postmsg) PUT + 3. identifyDevice() + ''' + + # ---------------------------------------------------------------------- + # getDeviceStatus(), getDeviceStatusJson(data), printDeviceStatus() + def getDeviceStatus(self): + getDeviceStatusResult = True + _hue_username = self.get_variable("username") + _url_append = '/api/'+_hue_username+'/groups/0/' + _urlData = self.get_variable("address").replace(':80', _url_append) + try: + '''_deviceUrl = urllib2.urlopen(_urlData, timeout=20) + print(" {0}Agent is querying its current status (status:{1}) please wait ...". + format(self.variables.get('agent_id', None), _deviceUrl.getcode())) + if (_deviceUrl.getcode() == 200): + self.getDeviceStatusJson(_deviceUrl.read().decode("utf-8")) #convert string data to JSON object then interpret it + if self.debug is True: + self.printDeviceStatus() + else: + print (" Received an error from server, cannot retrieve results " + str(_deviceUrl.getcode())) + getDeviceStatusResult = False''' + onoff = randint(0, 1) + if onoff == 1: + self.set_variable('status', "ON") + else: + self.set_variable('status', "OFF") + bri = randint(0, 255) + hue = randint(0, 255) + xy = [randint(0, 255), randint(0,255)] + ct = randint(0, 255) + sat = randint(0, 255) + effect = "effect" + mode = "warm" + name = "DummyHue" + # 2. brightness convert to % + self.set_variable('brightness',int(round(float(bri)*100/255,0))) + # update only white variable every round is necessary in case user add a/take away all color bulb(s). + self.only_white_bulb = False if hue else True + if self.only_white_bulb is False: + # 3. color convert to RGB 0-255 + self.set_variable('hue', hue) + self.set_variable('xy', xy) + self.set_variable('ct', ct) + x=xy[0] + y=xy[1] + self.set_variable('color', rgb_cie.ColorHelper.getRGBFromXYAndBrightness(x,y,bri)) + self.set_variable('hexcolor', '#%02x%02x%02x' % self.get_variable('color')) + # 4. saturation convert to % + self.set_variable('saturation',int(round(float(sat)*100/255,0))) + self.set_variable('effect',effect) + self.set_variable('colormode',mode) + self.set_variable('number_lights', [2]) + self.set_variable('name',name) + + + # Check the connectivity + if getDeviceStatusResult==True: + self.set_variable('offline_count', 0) + else: + self.set_variable('offline_count', self.get_variable('offline_count')+1) + except Exception as er: + print er + print('ERROR: classAPI_PhilipsHue failed to getDeviceStatus') + self.set_variable('offline_count',self.get_variable('offline_count')+1) + + ''' + def getDeviceStatusJson(self,data): + # Use the json module to load the string data into a dictionary + _theJSON = json.loads(data) + # 1. status + if _theJSON["action"]["on"] == True: + self.set_variable('status',"ON") + else: + self.set_variable('status',"OFF") + # 2. brightness convert to % + self.set_variable('brightness',int(round(float(_theJSON["action"]["bri"])*100/255,0))) + # update only white variable every round is necessary in case user add a/take away all color bulb(s). + self.only_white_bulb = False if 'hue' in _theJSON["action"].keys() else True + if self.only_white_bulb is False: + # 3. color convert to RGB 0-255 + self.set_variable('hue', _theJSON["action"]["hue"]) + self.set_variable('xy', _theJSON["action"]["xy"]) + self.set_variable('ct', _theJSON["action"]["ct"]) + x=_theJSON["action"]["xy"][0] + y=_theJSON["action"]["xy"][1] + self.set_variable('color', rgb_cie.ColorHelper.getRGBFromXYAndBrightness(x,y,_theJSON["action"]["bri"])) + self.set_variable('hexcolor', '#%02x%02x%02x' % self.get_variable('color')) + # 4. saturation convert to % + self.set_variable('saturation',int(round(float(_theJSON["action"]["sat"])*100/255,0))) + self.set_variable('effect',_theJSON["action"]["effect"]) + self.set_variable('colormode',_theJSON["action"]["colormode"]) + for k in _theJSON["lights"]: + self.set_variable("lights{}".format(k), k) + self.set_variable('number_lights', len(_theJSON["lights"])) + self.set_variable('name',_theJSON["name"]) + ''' + def printDeviceStatus(self): + # now we can access the contents of the JSON like any other Python object + print(" the current status is as follows:") + print(" name = {}".format(self.get_variable('name'))) + #print(" number_lights = {}".format(self.get_variable('number_lights'))) + print(" status = {}".format(self.get_variable('status'))) + print(" brightness = {}".format(self.get_variable('brightness'))) + if self.only_white_bulb is False: + print(" hue = {}".format(self.get_variable('hue'))) + print(" color = {}".format(self.get_variable('color'))) + print(" saturation = {}".format(self.get_variable('saturation'))) + print(" xy= {}".format(self.get_variable('xy'))) + print(" ct = {}".format(self.get_variable('ct'))) + print(" effect = {}".format(self.get_variable('effect'))) + print(" colormode = {}\n".format(self.get_variable('colormode'))) + # ---------------------------------------------------------------------- + # setDeviceStatus(postmsg), isPostmsgValid(postmsg), convertPostMsg(postmsg) + def setDeviceStatus(self, postmsg): + setDeviceStatusResult = True + ''' + #Ex. postmsg = {"on":True,"bri":100,"hue":50260,"sat":200} + _hue_username = self.get_variable("username") + _url_append = '/api/'+_hue_username+'/groups/0/' + _urlData = self.get_variable("address").replace(':80', _url_append) + if self.isPostMsgValid(postmsg) == True: #check if the data is valid + _data = json.dumps(self.convertPostMsg(postmsg)) + _data = _data.encode(encoding='utf_8') + _request = urllib2.Request(_urlData+'action') + _request.add_header('Content-Type','application/json') + _request.get_method = lambda: 'PUT' + try: + _f = urllib2.urlopen(_request, _data, timeout=20) #when include data this become a POST command + print(" {0}Agent for {1} is changing its status with {2} please wait ..." + .format(self.variables.get('agent_id', None), self.variables.get('model', None), postmsg)) + print(" after send a POST request: {}".format(_f.read().decode('utf-8'))) + except: + print("ERROR: classAPI_PhilipsHue connection failure! @ setDeviceStatus") + setDeviceStatusResult = False + else: + print("The POST message is invalid, try again\n") + ''' + return setDeviceStatusResult + + def isPostMsgValid(self,postmsg): #check validity of postmsg + dataValidity = True + #TODO algo to check whether postmsg is valid + return dataValidity + + + def convertPostMsg(self,postmsg): + msgToDevice = {} + datacontainsRGB=False + if 'color' in postmsg.keys(): + datacontainsRGB=True + + for k,v in postmsg.items(): + if k == 'status': + if postmsg.get('status') == "ON": + msgToDevice['on'] = True + elif postmsg.get('status') == "OFF": + msgToDevice['on'] = False + elif k == 'brightness': + msgToDevice['bri'] = int(round(float(postmsg.get('brightness'))*255.0/100.0,0)) + elif k == 'color': + if self.only_white_bulb is False: + print(type(postmsg['color'])) + _red = postmsg['color'][0] + _green = postmsg['color'][1] + _blue = postmsg['color'][2] + _xyY = rgb_cie.ColorHelper.getXYPointFromRGB(_red, _green, _blue) + msgToDevice['xy'] = [_xyY.x, _xyY.y] + #msgToDevice['bri']= int(round(_xyY.y*255,0)) + elif k == 'hue': + if datacontainsRGB==False and self.only_white_bulb is False: + msgToDevice['hue'] = postmsg.get('hue') + elif k == 'saturation': + if datacontainsRGB==False and self.only_white_bulb is False: + msgToDevice['sat'] = int(round(float(postmsg.get('saturation'))*255.0/100.0,0)) + else: + msgToDevice[k] = v + return msgToDevice + # ---------------------------------------------------------------------- + # method3: Identify this lights (Physically) + def identifyDevice(self): + identifyDeviceResult = False + print(" {0}Agent for {1} is identifying itself by doing colorloop. Please observe your lights" + .format(self.variables.get('agent_id',None), self.variables.get('model',None))) + try: + devicewasoff=0 + if self.get_variable('status')=="OFF": + devicewasoff=1 + self.setDeviceStatus({"status":"ON"}) + elif self.only_white_bulb: + self.setDeviceStatus({"status":"OFF"}) + if self.only_white_bulb is False: + self.setDeviceStatus({"effect": "colorloop"}) + if self.only_white_bulb: + time_iden = 3 + else: + time_iden = 10 #time to do identification + t0 = time.time() + self.seconds = time_iden + while time.time() - t0 <= time_iden: + self.seconds = self.seconds - 1 + print("wait: {} sec".format(self.seconds)) + time.sleep(1) + self.setDeviceStatus({"effect": "none"}) + if devicewasoff==1: + self.setDeviceStatus({"status":"OFF"}) + else: + self.setDeviceStatus({"status":"ON"}) + identifyDeviceResult = True + except: + print("ERROR: classAPI_PhilipsHue connection failure! @ identifyDevice") + return identifyDeviceResult + # ---------------------------------------------------------------------- + +# This main method will not be executed when this class is used as a module +def main(): + # create an object with initialized data from DeviceDiscovery Agent + # requirements for instantiation1. model, 2.type, 3.api, 4. address + PhilipsHue = API(model='Philips Hue',type='wifiLight',api='API3',address='http://192.168.10.14:80',username='acquired username',agent_id='LightingAgent') + print("{0}agent is initialzed for {1} using API={2} at {3}".format(PhilipsHue.get_variable('type'),PhilipsHue.get_variable('model'),PhilipsHue.get_variable('api'),PhilipsHue.get_variable('address'))) + + PhilipsHue.getDeviceStatus() + PhilipsHue.setDeviceStatus({"status":"ON","color":(155,113,255)}) + PhilipsHue.identifyDevice() + + +if __name__ == "__main__": main() \ No newline at end of file -- GitLab From 4ff917ac844af199fe7f7f3217dcf9f7dac2d63d Mon Sep 17 00:00:00 2001 From: mchlburton Date: Wed, 22 Mar 2017 04:00:05 +0000 Subject: [PATCH 2/4] Added influx agent for lighting and basic API for influx, exception handling and config files still need to be done --- Agents/LightingAgent/lighting/influxagent.py | 556 +++++++++++++++++++ bemoss_lib/databases/influxAPI/InfluxDB.py | 217 ++++++++ bemoss_lib/databases/influxAPI/__init__.py | 1 + 3 files changed, 774 insertions(+) create mode 100755 Agents/LightingAgent/lighting/influxagent.py create mode 100755 bemoss_lib/databases/influxAPI/InfluxDB.py create mode 100644 bemoss_lib/databases/influxAPI/__init__.py diff --git a/Agents/LightingAgent/lighting/influxagent.py b/Agents/LightingAgent/lighting/influxagent.py new file mode 100755 index 0000000..b1e2dc1 --- /dev/null +++ b/Agents/LightingAgent/lighting/influxagent.py @@ -0,0 +1,556 @@ +import sys +import json +import logging +import importlib +import datetime +import socket + +from volttron.platform.agent import BaseAgent, PublishMixin, periodic +from volttron.platform.agent import utils, matching +from volttron.platform.messaging import headers as headers_mod +from bemoss_lib.communication.Email import EmailService +from bemoss_lib.communication.sms import SMSService +import psycopg2 +import psycopg2.extras +import settings +from influxdb import InfluxDBClient +from bemoss_lib.databases.cassandraAPI import cassandraDB +from bemoss_lib.databases.influxAPI import InfluxDB + +utils.setup_logging() +_log = logging.getLogger(__name__) + +# Step1: Agent Initialization +def LightingAgent(config_path, **kwargs): + config = utils.load_config(config_path) + + def get_config(name): + try: + kwargs.pop(name) + except KeyError: + return config.get(name, '') + + #1. @params agent + agent_id = get_config('agent_id') + device_monitor_time = get_config('device_monitor_time') + max_monitor_time = int(settings.DEVICES['max_monitor_time']) + + debug_agent = False + #List of all keywords for a lighting agent + agentAPImapping = dict(status=[], brightness=[], color=[], saturation=[],power=[]) + log_variables = dict(status='text',brightness='double',hexcolor='text',power='double',offline_count='int') + #2. @params device_info + #TODO correct the launchfile in Device Discovery Agent + building_name = get_config('building_name') + zone_id = get_config('zone_id') + model = get_config('model') + if model == "Philips hue bridge": + hue_username = get_config('username') + else: + hue_username = '' + device_type = get_config('type') + address = get_config('address') + _address = address + _address = _address.replace('http://', '') + _address = _address.replace('https://', '') + try: # validate whether or not address is an ip address + socket.inet_aton(_address) + ip_address = _address + except socket.error: + ip_address = None + identifiable = get_config('identifiable') + + #3. @params agent & DB interfaces + #TODO delete variable topic + topic = get_config('topic') + + #TODO get database parameters from settings.py, add db_table for specific table + db_host = get_config('db_host') + db_port = get_config('db_port') + db_database = get_config('db_database') + db_user = get_config('db_user') + db_password = get_config('db_password') + db_table_lighting = settings.DATABASES['default']['TABLE_lighting'] + db_table_active_alert = settings.DATABASES['default']['TABLE_active_alert'] + db_table_bemoss_notify = settings.DATABASES['default']['TABLE_bemoss_notify'] + db_table_alerts_notificationchanneladdress = settings.DATABASES['default'][ + 'TABLE_alerts_notificationchanneladdress'] + db_table_temp_time_counter = settings.DATABASES['default']['TABLE_temp_time_counter'] + db_table_priority = settings.DATABASES['default']['TABLE_priority'] + + #construct _topic_Agent_UI based on data obtained from DB + _topic_Agent_UI_tail = building_name + '/' + str(zone_id) + '/' + agent_id + + #4. @params device_api + api = get_config('api') + apiLib = importlib.import_module("DeviceAPI.classAPI."+api) + + #4.1 initialize device object + Light = apiLib.API(model=model, device_type=device_type, api=api, address=address, username = hue_username, agent_id=agent_id, db_host=db_host, db_port=db_port, db_user=db_user, db_password=db_password, db_database=db_database) + print("{0}agent is initialized for {1} using API={2} at {3}".format(agent_id, Light.get_variable('model'), + Light.get_variable('api'), + Light.get_variable('address'))) + + #5. @params notification_info + send_notification = True + email_fromaddr = settings.NOTIFICATION['email']['fromaddr'] + email_username = settings.NOTIFICATION['email']['username'] + email_password = settings.NOTIFICATION['email']['password'] + email_mailServer = settings.NOTIFICATION['email']['mailServer'] + notify_heartbeat = settings.NOTIFICATION['heartbeat'] + + class Agent(PublishMixin, BaseAgent): + """Agent for querying WeatherUndergrounds API""" + + #1. agent initialization + def __init__(self, **kwargs): + super(Agent, self).__init__(**kwargs) + #1. initialize all agent variables + self.variables = kwargs + self.valid_data = False + self._keep_alive = True + self.flag = 1 + self.topic = topic + self.time_send_notification = 0 + self.event_ids = list() + self.time_sent_notifications = {} + self.notify_heartbeat = notify_heartbeat + self.ip_address = ip_address if ip_address != None else None + self.changed_variables = None + self.lastUpdateTime = None + self.already_offline = False + + #2. setup connection with db -> Connect to bemossdb database + try: + self.con = psycopg2.connect(host=db_host, port=db_port, database=db_database, user=db_user, + password=db_password) + self.cur = self.con.cursor() # open a cursor to perfomm database operations + print("{} connects to the database name {} successfully".format(agent_id, db_database)) + except: + print("ERROR: {} fails to connect to the database name {}".format(agent_id, db_database)) + + host = 'localhost' + port = 8086 + user = 'root' + password = 'root' + dbname = 'example' + dbuser = 'mike' + dbuser_password = 'bemoss' + client = InfluxDBClient(host, port, user, password, dbname) + + #3. send notification to notify building admin + self.send_notification = send_notification + self.subject = 'Message from ' + agent_id + + #These set and get methods allow scalability + def set_variable(self,k,v): # k=key, v=value + self.variables[k] = v + + def get_variable(self,k): + return self.variables.get(k, None) # default of get_variable is none + + #2. agent setup method + def setup(self): + super(Agent, self).setup() + #Do a one time push when we start up so we don't have to wait for the periodic + self.timer(1, self.deviceMonitorBehavior) + if identifiable == "True": Light.identifyDevice() + + @periodic(max_monitor_time) #save all data every max_monitor_time + def backupSaveData(self): + try: + Light.getDeviceStatus() + cassandraDB.insert(agent_id,Light.variables,log_variables) + print('Every Data Pushed to cassandra') + except Exception as er: + print("ERROR: {} fails to update cassandra database".format(agent_id)) + print er + + #3. deviceMonitorBehavior (TickerBehavior) + @periodic(device_monitor_time) + def deviceMonitorBehavior(self): + #step1: get current status of a thermostat, then map keywords and variables to agent knowledge + try: + Light.getDeviceStatus() + except: + print("device connection is not successful") + + self.changed_variables = dict() + + for v in log_variables: + if v in Light.variables: + if not v in self.variables or self.variables[v] != Light.variables[v]: + self.variables[v] = Light.variables[v] + self.changed_variables[v] = log_variables[v] + else: + if v not in self.variables: #it won't be in self.variables either (in the first time) + self.changed_variables[v] = log_variables[v] + self.variables[v] = None + try: + # Step: Check if any Device is OFFLINE + self.cur.execute("SELECT id FROM " + db_table_active_alert + " WHERE event_trigger_id=%s", ('5',)) + if self.cur.rowcount != 0: + self.device_offline_detection() + + # Put scan time in database + _time_stamp_last_scanned = datetime.datetime.now() + self.cur.execute("UPDATE "+db_table_lighting+" SET last_scanned_time=%s " + "WHERE lighting_id=%s", + (_time_stamp_last_scanned, agent_id)) + self.con.commit() + except Exception as er: + print er + print("ERROR: {} failed to update last scanned time".format(agent_id)) + + if len(self.changed_variables) == 0: + print 'nothing changed' + return + + self.updateUI() + #step4: update PostgresQL (meta-data) database + try: + self.cur.execute("UPDATE "+db_table_lighting+" SET status=%s WHERE lighting_id=%s", + (self.get_variable('status'), agent_id)) + self.con.commit() + self.cur.execute("UPDATE "+db_table_lighting+" SET brightness=%s WHERE lighting_id=%s", + (self.get_variable('brightness'), agent_id)) + self.con.commit() + self.cur.execute("UPDATE "+db_table_lighting+" SET color=%s WHERE lighting_id=%s", + (self.get_variable('hexcolor'), agent_id)) + self.con.commit() + try: + + if self.get_variable('status') == "ON": + multiple_on_off_status = "" + for dummyvar in range(self.get_variable('number_lights')): + multiple_on_off_status += "1" + self.cur.execute("UPDATE "+db_table_lighting+" SET multiple_on_off=%s WHERE lighting_id=%s", + (multiple_on_off_status, agent_id)) + self.con.commit() + else: # status is off + multiple_on_off_status = "" + for dummyvar in range(self.get_variable('number_lights')): + multiple_on_off_status += "0" + self.cur.execute("UPDATE "+db_table_lighting+" SET multiple_on_off=%s WHERE lighting_id=%s", + (multiple_on_off_status, agent_id)) + self.con.commit() + except: + print("{} this agent has no multiple_on_off_status".format(agent_id)) + #TODO check ip_address + if self.ip_address != None: + psycopg2.extras.register_inet() + _ip_address = psycopg2.extras.Inet(self.ip_address) + self.cur.execute("UPDATE "+db_table_lighting+" SET ip_address=%s WHERE lighting_id=%s", + (_ip_address, agent_id)) + self.con.commit() + + + if self.get_variable('offline_count') >= 3: + self.cur.execute("UPDATE "+db_table_lighting+" SET network_status=%s WHERE lighting_id=%s", + ('OFFLINE', agent_id)) + self.con.commit() + if self.already_offline is False: + self.already_offline = True + _time_stamp_last_offline = str(datetime.datetime.now()) + self.cur.execute("UPDATE "+db_table_lighting+" SET last_offline_time=%s WHERE lighting_id=%s", + (_time_stamp_last_offline, agent_id)) + self.con.commit() + else: + self.already_offline = False + self.cur.execute("UPDATE "+db_table_lighting+" SET network_status=%s WHERE lighting_id=%s", + ('ONLINE', agent_id)) + self.con.commit() + print("{} updates database name {} during deviceMonitorBehavior successfully".format(agent_id, db_database)) + except: + print("ERROR: {} fails to update the database name {}".format(agent_id,db_database)) + + #step6: Try to update Influxdb + try: + time = str(datetime.datetime.utcnow()) + InfluxDB.insert(agent_id, self.variables, log_variables) + print "{} success INFLUX update".format(agent_id) + + except: + print("ERROR: {} fails to update INFLUX".format(agent_id)) + + #step6: debug agent knowledge + if debug_agent: + print("printing agent's knowledge") + for k,v in self.variables.items(): + print (k,v) + print('') + + if debug_agent: + print("printing agentAPImapping's fields") + for k, v in agentAPImapping.items(): + if k is None: + agentAPImapping.update({v: v}) + agentAPImapping.pop(k) + for k, v in agentAPImapping.items(): + print (k, v) + + def device_offline_detection(self): + self.cur.execute("SELECT nickname FROM " + db_table_lighting + " WHERE lighting_id=%s", + (agent_id,)) + print agent_id + if self.cur.rowcount != 0: + device_nickname=self.cur.fetchone()[0] + print device_nickname + else: + device_nickname = '' + _db_notification_subject = 'BEMOSS Device {} {} went OFFLINE!!!'.format(device_nickname,agent_id) + _email_subject = '#Attention: BEMOSS Device {} {} went OFFLINE!!!'.format(device_nickname,agent_id) + _email_text = '#Attention: BEMOSS Device {} {} went OFFLINE!!!'.format(device_nickname,agent_id) + self.cur.execute("SELECT network_status FROM " + db_table_lighting + " WHERE lighting_id=%s", + (agent_id,)) + self.network_status = self.cur.fetchone()[0] + print self.network_status + if self.network_status=="OFFLINE": + print "Found Device OFFLINE" + self.cur.execute("SELECT id FROM " + db_table_active_alert + " WHERE event_trigger_id=%s", ('5',)) + self._active_alert_id = self.cur.fetchone()[0] + self.cur.execute( + "SELECT id FROM " + db_table_temp_time_counter + " WHERE alert_id=%s AND device_id=%s", + (str(self._active_alert_id), agent_id,)) + # If this is the first detected violation + if self.cur.rowcount == 0: + print "first device offline detected" + # create counter in DB + self.cur.execute( + "INSERT INTO " + db_table_temp_time_counter + " VALUES(DEFAULT,%s,%s,%s,%s,%s)", + (self._active_alert_id, agent_id, '0', '0', '0')) + self.con.commit() + self.send_device_notification_db(_db_notification_subject, self._active_alert_id) + + # Send email if exist + self.cur.execute("SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s",(self._active_alert_id,'1')) + if self.cur.rowcount != 0: + self._alert_email = self.cur.fetchall() + for single_email_1 in self._alert_email: + print single_email_1[0] + self.send_device_notification_email(single_email_1[0], _email_subject, _email_text) + + # Send SMS if provided by user + self.cur.execute("SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s",(self._active_alert_id,'2')) + if self.cur.rowcount != 0: + self._alert_sms_phone_no = self.cur.fetchall() + for single_number in self._alert_sms_phone_no: + print single_number[0] + self.send_device_notification_sms(single_number[0], _email_subject) + else: + self.priority_counter(self._active_alert_id, _db_notification_subject) + else: + print "The Device is ONLINE" + + def send_device_notification_db(self, _tampering_device_msg, _active_alert_id): + print " INSIDE send_device_notification_db" + + # Find the priority id + self.cur.execute( + "SELECT priority_id FROM " + db_table_active_alert + " WHERE id=%s", + (str(_active_alert_id),)) + self.priority_id = self.cur.fetchone()[0] + + # Find the priority level + self.cur.execute( + "SELECT priority_level FROM " + db_table_priority + " WHERE id=%s", + str(self.priority_id)) + self.priority_level = self.cur.fetchone()[0] + + # Insert into DB the notification + self.cur.execute("INSERT INTO " + db_table_bemoss_notify + " VALUES(DEFAULT,%s,%s,%s,%s)", + (_tampering_device_msg, + str(datetime.datetime.now()), 'Alert', str(self.priority_level))) + self.con.commit() + + # Find the number of notifications sent for the same alert and device + self.cur.execute( + "SELECT no_notifications_sent FROM " + db_table_temp_time_counter + " WHERE alert_id=%s AND device_id=%s", + (str(_active_alert_id), agent_id,)) + self._no_notifications_sent = self.cur.fetchone()[0] + self.con.commit() + print self._no_notifications_sent + self._no_notifications_sent = int(self._no_notifications_sent) + 1 + print self._no_notifications_sent + self.cur.execute( + "UPDATE " + db_table_temp_time_counter + " SET no_notifications_sent=%s WHERE alert_id=%s AND device_id=%s", + (str(self._no_notifications_sent), str(_active_alert_id), agent_id,)) + self.con.commit() + + def send_device_notification_email(self, _active_alert_email, _email_subject, _email_text): + emailService = EmailService() + # Send Email + emailService.sendEmail(email_fromaddr, _active_alert_email, email_username, + email_password, _email_subject, _email_text, email_mailServer) + + def send_device_notification_sms(self, _active_alert_phone_number_misoperation, _sms_subject): + print "INSIDE send_device_notification_sms" + print _active_alert_phone_number_misoperation + smsService = SMSService() + smsService.sendSMS(email_fromaddr, _active_alert_phone_number_misoperation, email_username, email_password, _sms_subject, email_mailServer) + + # TODO: this function is in all other agents, need to get rid of those redundent codes. + def priority_counter(self, _active_alert_id, _tampering_device_msg_1): + # Find the priority counter limit then compare it with priority_counter in priority table + # if greater than the counter limit then send notification and reset the value + # else just increase the counter + print "INSIDE the priority_counter" + _email_subject = '#Attention: BEMOSS Device {} went OFFLINE!!!'.format(agent_id) + _email_text = '#Attention: BEMOSS Device {} went OFFLINE!!!'.format(agent_id) + self.cur.execute( + "SELECT priority_counter FROM " + db_table_temp_time_counter + " WHERE alert_id=%s AND device_id=%s", + (str(_active_alert_id), agent_id,)) + self.priority_count = self.cur.fetchone()[0] + self.con.commit() + + # Find the priority id from active alert table + self.cur.execute( + "SELECT priority_id FROM " + db_table_active_alert + " WHERE id=%s", + (str(_active_alert_id),)) + self.priority_id = self.cur.fetchone()[0] + self.con.commit() + + # Find the priority limit from the priority table + self.cur.execute( + "SELECT priority_counter FROM " + db_table_priority + " WHERE id=%s", + (str(self.priority_id),)) + self.priority_limit = self.cur.fetchone()[0] + self.con.commit() + + # If the counter reaches the limit + if int(self.priority_count) > int(self.priority_limit): + self.send_device_notification_db(_tampering_device_msg_1, _active_alert_id) + self.cur.execute( + "UPDATE " + db_table_temp_time_counter + " SET priority_counter=%s WHERE alert_id=%s AND device_id=%s", + ('0', str(_active_alert_id), agent_id,)) + self.con.commit() + + print "INSIDE the priority counter exceeded the defined range" + # Send email if exist + self.cur.execute("SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s",(self._active_alert_id,'1')) + if self.cur.rowcount != 0: + self._alert_email = self.cur.fetchall() + for single_email_1 in self._alert_email: + print single_email_1[0] + self.send_device_notification_email(single_email_1[0], _email_subject, _email_text) + + # Send SMS if provided by user + self.cur.execute("SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s",(self._active_alert_id,'2')) + if self.cur.rowcount != 0: + self._alert_sms_phone_no = self.cur.fetchall() + for single_number in self._alert_sms_phone_no: + print single_number[0] + self.send_device_notification_sms(single_number[0], _email_subject) + else: + self.priority_count = int(self.priority_count) + 1 + self.cur.execute( + "UPDATE " + db_table_temp_time_counter + " SET priority_counter=%s WHERE alert_id=%s AND device_id=%s", + (str(self.priority_count), str(_active_alert_id), agent_id,)) + + def updateUI(self): + topic = '/agent/ui/'+device_type+'/device_status_response/'+_topic_Agent_UI_tail + headers = { + 'AgentID': agent_id, + headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.JSON, + headers_mod.FROM: agent_id, + headers_mod.TO: 'ui' + } + _data={'status': self.get_variable('status'), + 'brightness': self.get_variable('brightness'), 'color': self.get_variable('hexcolor'), + 'saturation': self.get_variable('saturation')} + message = json.dumps(_data) + message = message.encode(encoding='utf_8') + self.publish(topic, headers, message) + + #4. updateUIBehavior (generic behavior) + @matching.match_exact('/ui/agent/'+device_type+'/device_status/'+_topic_Agent_UI_tail) + def updateUIBehavior(self,topic,headers,message,match): + print "{} agent got\nTopic: {topic}".format(self.get_variable("agent_id"),topic=topic) + print "Headers: {headers}".format(headers=headers) + print "Message: {message}\n".format(message=message) + #reply message + self.updateUI() + + + #5. deviceControlBehavior (generic behavior) + @matching.match_exact('/ui/agent/'+device_type+'/update/'+_topic_Agent_UI_tail) + def deviceControlBehavior(self,topic,headers,message,match): + #print received message from UI + print "{} agent got\nTopic: {topic}".format(self.get_variable("agent_id"),topic=topic) + print "Headers: {headers}".format(headers=headers) + print "Message: {message}\n".format(message=message) + + #prepare message content for response + topic = '/agent/ui/'+device_type+'/update_response/'+_topic_Agent_UI_tail + headers = { + 'AgentID': agent_id, + headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.PLAIN_TEXT, + headers_mod.FROM: agent_id, + headers_mod.TO: 'ui' + } + #step1: change device status according to the receive message + if self.isPostmsgValid(message[0]): + setDeviceStatusResult = Light.setDeviceStatus(json.loads(message[0])) #convert received message from string to JSON + #TODO need to do additional checking whether the device setting is actually success!!!!!!!! + #step3: send reply message back to the UI + if setDeviceStatusResult: + message = 'success' + else: + message = 'failure' + else: + print("The POST message is invalid, check brightness, status or color setting and try again\n") + message = 'failure' + self.publish(topic, headers, message) + self.deviceMonitorBehavior() + + def isPostmsgValid(self, postmsg): # check validity of postmsg + dataValidity = True + try: + _data = json.loads(postmsg) + if ("brightness" in _data.keys()) or ("status" in _data.keys()) or ("color" in _data.keys()): + dataValidity = True + else: + dataValidity = False + except: + dataValidity = False + print("dataValidity failed to validate data coming from UI") + return dataValidity + + #6. deviceIdentifyBehavior(generic behavior) + @matching.match_exact('/ui/agent/'+device_type+'/identify/'+_topic_Agent_UI_tail) + def deviceIdentifyBehavior(self,topic,headers,message,match): + print "{} agent got\nTopic: {topic}".format(self.get_variable("agent_id"),topic=topic) + print "Headers: {headers}".format(headers=headers) + print "Message: {message}\n".format(message=message) + #step1: change device status according to the receive message + identifyDeviceResult = Light.identifyDevice() + #TODO need to do additional checking whether the device setting is actually success!!!!!!!! + #step2: send reply message back to the UI + topic = '/agent/ui/identify_response/'+device_type+'/'+_topic_Agent_UI_tail + headers = { + 'AgentID': agent_id, + headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.PLAIN_TEXT, + } + if identifyDeviceResult: + message = 'success' + else: + message = 'failure' + self.publish(topic, headers, message) + + + + Agent.__name__ = 'LightingAgent' + return Agent(**kwargs) + +def main(argv=sys.argv): + '''Main method called by the eggsecutable.''' + utils.default_main(LightingAgent, + description='Lighting agent', + argv=argv) + +if __name__ == '__main__': + try: + sys.exit(main()) + except KeyboardInterrupt: + pass + diff --git a/bemoss_lib/databases/influxAPI/InfluxDB.py b/bemoss_lib/databases/influxAPI/InfluxDB.py new file mode 100755 index 0000000..4115a1e --- /dev/null +++ b/bemoss_lib/databases/influxAPI/InfluxDB.py @@ -0,0 +1,217 @@ +import pandas +import numpy +import datetime +from influxdb import InfluxDBClient +connection_established = False + +#Global variables +host = 'localhost' +port = 8086 +user = 'root' +password = 'root' +dbname = 'bemoss' +client = InfluxDBClient(host, port, user, password, dbname) + + +def makeConnection(): + try: + global host, port, user, password, dbname, connection_established + client = InfluxDBClient(host, port, user, password, dbname) + database_list = client.get_list_database() + for db in database_list: + if dbname in db['name']: + connection_established = True + return True + + except Exception as er: + print 'Cannot establish connection' + raise er + +try: + makeConnection() +except Exception as er: + print 'Connection cannot be established' + print er + + +def insert(agentID, all_vars, log_vars, cur_time=None): + """ + :param agentID: string. Data will be inserted to table named B + :param all_vars: dictionary (usually APIobject.variables). It contains all the variables and their values. + :param log_vars: dictionary (usually APIobject.log_variables). It contains variables to be logged and their datatypes + :return: 0, if successful + + timestamp is generated based on current utc time. UTC time is put in cassandra database. If the table by the agent + name doesn't exist, table is created. **If error occurs because the name of variables/data_type has changed, the old + table will be deleted, and a new one with currect variable names/datatype will be created**. + + **Need to avoid doing this in final version.Feature made to help during development + """ + global connection_established + + if not connection_established: + makeConnection() + + client = InfluxDBClient(host, port, user, password, dbname) + + if cur_time == None: + cur_time = datetime.datetime.utcnow() + + + influx_json = [] + for var in log_vars: + reading = all_vars.get(var) + if not reading == None: + var_json =[ + { + "measurement": var, + "tags": { + "host": "bemoss", + "agent": agentID + }, + "time": cur_time, + "fields": { + "value": reading + } + } + ] + if influx_json == []: + influx_json.extend(var_json) + else: + influx_json.extend(var_json) + + retry = True + while retry: + client.write_points(influx_json) + retry = False + + return 0 + + +def delete(agentID,startDate=None, endDate=None): + """ + Performs deletion of data. if statDate and endDate is omitted, all device data is deleted. + :param agentID: The on which the operation is to be performed + :param startDate: dateTime.date object (local timezone). The begining date from which to delete. Must be supplied + unless trying to delete the whole table + :param endDate: datetime.date object (local timezone). The endDate upto which to delete. The default is to upto today. + must be supplied unless trying to delete the whole table + :return: 0, if successful + + """ + global connection_established + if not connection_established: + makeConnection() + + if endDate==None and startDate==None: + try: + delete_query = 'DROP SERIES WHERE agentID = {0}'.format(agentID) + client.query(delete_query) + return 0 + except Exception: + connection_established = False #Try to establish connection again next time + raise + + elif startDate==None: + print "startTime compulsory when endTime is given" + return -1 + elif endDate==None: + endDate=datetime.datetime.utcnow() + + try: + result = client.query("SHOW MEASUREMENTS ON {0} WHERE agent = '{1}';".format(dbname, agentID)) + except: + connection_established = False #Try to establish connection again next time + raise + + daterange = pandas.date_range(startDate,endDate) + date_local="" + try: + x = 1 + #client.query("select value, {0} from {1} WHERE time >= '{2}' AND time <= '{3}';".format(agentID, varStr, startTime,endTime)) + + + except Exception as e: + print "sorry, not deleted all. deleted upto:%s" % date_local + connection_established = False + return -1 + + return 0 + + + + + +def retrieve(agentID, vars=None, startTime=None, endTime=None,export=False): + """Function to retrieve Data from Influx. \n + :param agentID: must supply, since each agentID is associated with a table in database. + :param vars: supplied as a list of strings. It represents the variables to be retrieved from the table. + eg. ['temperature','humidity']. If any of the variables don't match the column name, the result + will contain -1. If not supplied, the default is to return the complete row \n\n + :param startTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format. It marks the + beginning for the range. If not supplied, will be taken 24-hours before endTime + :param endTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format.It marks the end of the + range. If not supplied, will be taken as the currentTime. + :return: A numpy 2-dimensional array. The first column contains the time stamps for the entries, and the rest of the + columns correspond to variables queried. Each row corresponds to + various table entries. Also returns vars, which serves as a column index. It is identical to passed vars + except 'time' is an extra entry at the front. If the query fails, -1 is returned (and no exception raised) + + """ + + global connection_established + if not connection_established: + makeConnection() + + client = InfluxDBClient(host, port, user, password, dbname) + + if startTime==None: + startTime=str(datetime.datetime.utcnow()-datetime.timedelta(hours=24)) + + if endTime==None: + endTime = str(datetime.datetime.utcnow()) + + if vars==None: + varStr='' + vars=[] + try: + result = client.query("SHOW MEASUREMENTS ON {0} WHERE agent = '{1}';".format(dbname, agentID)) + except: + connection_established = False #Try to establish connection again next time + raise + result = result.get_points('measurements') + + for var in result: + measurement = str(var['name']) + print measurement + varStr += measurement + ', ' + vars.append(measurement) + varStr = varStr[:-2] #to get rid of the last ', ' + else: + varStr = '' + for var in vars: + varStr += var + ', ' + + varStr = varStr[:-2] #to get rid of the last ', ' + + total_result = [] + for var in vars: + result = client.query("select value, {0} from {1} WHERE time >= '{2}' AND time <= '{3}';".format(agentID, var, startTime,endTime)) + values = [] + entries = result.get_points(var) + if total_result == []: + time = [] + for entry in entries: + time.append(entry['time']) + values.append(entry['value']) + total_result.append(time) + else: + for entry in entries: + values.append(entry['value']) + + total_result += [values] + + vars.insert(0, 'time') + total_result = numpy.array(total_result) + total_result = numpy.transpose(total_result) + return vars, total_result \ No newline at end of file diff --git a/bemoss_lib/databases/influxAPI/__init__.py b/bemoss_lib/databases/influxAPI/__init__.py new file mode 100644 index 0000000..b9675ce --- /dev/null +++ b/bemoss_lib/databases/influxAPI/__init__.py @@ -0,0 +1 @@ +__author__ = 'mike' -- GitLab From f5a0617ca3d1f9dde977a2f85a1399f7e15fac73 Mon Sep 17 00:00:00 2001 From: mchlburton Date: Thu, 23 Mar 2017 11:08:45 -0400 Subject: [PATCH 3/4] Fixed problem with query statement --- bemoss_lib/databases/influxAPI/InfluxDB.py | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/bemoss_lib/databases/influxAPI/InfluxDB.py b/bemoss_lib/databases/influxAPI/InfluxDB.py index 4115a1e..b6dc753 100755 --- a/bemoss_lib/databases/influxAPI/InfluxDB.py +++ b/bemoss_lib/databases/influxAPI/InfluxDB.py @@ -88,7 +88,7 @@ def insert(agentID, all_vars, log_vars, cur_time=None): return 0 -def delete(agentID,startDate=None, endDate=None): +def delete(agentID,startTime=None, endTime=None): """ Performs deletion of data. if statDate and endDate is omitted, all device data is deleted. :param agentID: The on which the operation is to be performed @@ -103,20 +103,20 @@ def delete(agentID,startDate=None, endDate=None): if not connection_established: makeConnection() - if endDate==None and startDate==None: + if endTime==None and startTime==None: try: - delete_query = 'DROP SERIES WHERE agentID = {0}'.format(agentID) + delete_query = "DROP SERIES WHERE agent = '{0}'".format(agentID) client.query(delete_query) return 0 except Exception: connection_established = False #Try to establish connection again next time raise - elif startDate==None: + elif startTime==None: print "startTime compulsory when endTime is given" return -1 - elif endDate==None: - endDate=datetime.datetime.utcnow() + elif endTime==None: + endTime = str(datetime.datetime.utcnow()) try: result = client.query("SHOW MEASUREMENTS ON {0} WHERE agent = '{1}';".format(dbname, agentID)) @@ -124,17 +124,17 @@ def delete(agentID,startDate=None, endDate=None): connection_established = False #Try to establish connection again next time raise - daterange = pandas.date_range(startDate,endDate) - date_local="" - try: - x = 1 - #client.query("select value, {0} from {1} WHERE time >= '{2}' AND time <= '{3}';".format(agentID, varStr, startTime,endTime)) + result = result.get_points('measurements') + try: + for var in result: + measurement = str(var['name']) + client.query("DELETE FROM {0} WHERE agent = '{1}' AND time >= '{2}' AND time <= '{3}';".format(measurement, agentID, startTime, endTime)) - except Exception as e: - print "sorry, not deleted all. deleted upto:%s" % date_local + except: connection_established = False - return -1 + raise + return 0 @@ -196,7 +196,7 @@ def retrieve(agentID, vars=None, startTime=None, endTime=None,export=False): total_result = [] for var in vars: - result = client.query("select value, {0} from {1} WHERE time >= '{2}' AND time <= '{3}';".format(agentID, var, startTime,endTime)) + result = client.query("select value from {0} WHERE agent = '{1}' AND time >= '{2}' AND time <= '{3}';".format(var, agentID, startTime,endTime)) values = [] entries = result.get_points(var) if total_result == []: -- GitLab From db192e7f8c67719130611270b51d6806ea417331 Mon Sep 17 00:00:00 2001 From: mchlburton Date: Sun, 26 Mar 2017 20:33:10 -0400 Subject: [PATCH 4/4] influx functions and dummy agent for PhilipsHue light; cassandraDB calls can be replaced with influxDB calls maintaining same parameters --- Agents/LightingAgent/lighting/influxagent.py | 9 +- .../classAPI/classAPI_DummyPhilipsHue.py | 124 ++---------------- bemoss_lib/databases/influxAPI/InfluxDB.py | 41 ++++-- 3 files changed, 38 insertions(+), 136 deletions(-) diff --git a/Agents/LightingAgent/lighting/influxagent.py b/Agents/LightingAgent/lighting/influxagent.py index b1e2dc1..86930b0 100755 --- a/Agents/LightingAgent/lighting/influxagent.py +++ b/Agents/LightingAgent/lighting/influxagent.py @@ -129,14 +129,7 @@ def LightingAgent(config_path, **kwargs): except: print("ERROR: {} fails to connect to the database name {}".format(agent_id, db_database)) - host = 'localhost' - port = 8086 - user = 'root' - password = 'root' - dbname = 'example' - dbuser = 'mike' - dbuser_password = 'bemoss' - client = InfluxDBClient(host, port, user, password, dbname) + #3. send notification to notify building admin self.send_notification = send_notification diff --git a/DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py b/DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py index 6276e10..20b8062 100755 --- a/DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py +++ b/DeviceAPI/classAPI/classAPI_DummyPhilipsHue.py @@ -1,54 +1,14 @@ -# -*- coding: utf-8 -*- ''' -Copyright (c) 2016, Virginia Tech -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the - following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following -disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and should not be -interpreted as representing official policies, either expressed or implied, of the FreeBSD Project. - -This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the -United States Government nor the United States Department of Energy, nor Virginia Tech, nor any of their employees, -nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, -express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or -any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe -privately owned rights. - -Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or -otherwise does not necessarily constitute or imply its endorsement, recommendation, favoring by the United States -Government or any agency thereof, or Virginia Tech - Advanced Research Institute. The views and opinions of authors -expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof. - -VIRGINIA TECH – ADVANCED RESEARCH INSTITUTE -under Contract DE-EE0006352 - -#__author__ = "BEMOSS Team" -#__credits__ = "" +#__author__ = "Mike" +#__credits__ = "BEMOSS Team" #__version__ = "2.0" -#__maintainer__ = "BEMOSS Team" -#__email__ = "aribemoss@gmail.com" -#__website__ = "www.bemoss.org" -#__created__ = "2014-09-12 12:04:50" -#__lastUpdated__ = "2016-03-14 11:23:33" +#__maintainer__ = "Mike" +#__email__ = "michael@blocpower.io" +#__created__ = "2017-03-16" +#__lastUpdated__ = "2017-23-17" ''' import time -import urllib2 -import json from random import randint from bemoss_lib.utils import rgb_cie class API: @@ -96,20 +56,8 @@ class API: # getDeviceStatus(), getDeviceStatusJson(data), printDeviceStatus() def getDeviceStatus(self): getDeviceStatusResult = True - _hue_username = self.get_variable("username") - _url_append = '/api/'+_hue_username+'/groups/0/' - _urlData = self.get_variable("address").replace(':80', _url_append) + try: - '''_deviceUrl = urllib2.urlopen(_urlData, timeout=20) - print(" {0}Agent is querying its current status (status:{1}) please wait ...". - format(self.variables.get('agent_id', None), _deviceUrl.getcode())) - if (_deviceUrl.getcode() == 200): - self.getDeviceStatusJson(_deviceUrl.read().decode("utf-8")) #convert string data to JSON object then interpret it - if self.debug is True: - self.printDeviceStatus() - else: - print (" Received an error from server, cannot retrieve results " + str(_deviceUrl.getcode())) - getDeviceStatusResult = False''' onoff = randint(0, 1) if onoff == 1: self.set_variable('status', "ON") @@ -120,8 +68,6 @@ class API: xy = [randint(0, 255), randint(0,255)] ct = randint(0, 255) sat = randint(0, 255) - effect = "effect" - mode = "warm" name = "DummyHue" # 2. brightness convert to % self.set_variable('brightness',int(round(float(bri)*100/255,0))) @@ -138,8 +84,6 @@ class API: self.set_variable('hexcolor', '#%02x%02x%02x' % self.get_variable('color')) # 4. saturation convert to % self.set_variable('saturation',int(round(float(sat)*100/255,0))) - self.set_variable('effect',effect) - self.set_variable('colormode',mode) self.set_variable('number_lights', [2]) self.set_variable('name',name) @@ -154,37 +98,6 @@ class API: print('ERROR: classAPI_PhilipsHue failed to getDeviceStatus') self.set_variable('offline_count',self.get_variable('offline_count')+1) - ''' - def getDeviceStatusJson(self,data): - # Use the json module to load the string data into a dictionary - _theJSON = json.loads(data) - # 1. status - if _theJSON["action"]["on"] == True: - self.set_variable('status',"ON") - else: - self.set_variable('status',"OFF") - # 2. brightness convert to % - self.set_variable('brightness',int(round(float(_theJSON["action"]["bri"])*100/255,0))) - # update only white variable every round is necessary in case user add a/take away all color bulb(s). - self.only_white_bulb = False if 'hue' in _theJSON["action"].keys() else True - if self.only_white_bulb is False: - # 3. color convert to RGB 0-255 - self.set_variable('hue', _theJSON["action"]["hue"]) - self.set_variable('xy', _theJSON["action"]["xy"]) - self.set_variable('ct', _theJSON["action"]["ct"]) - x=_theJSON["action"]["xy"][0] - y=_theJSON["action"]["xy"][1] - self.set_variable('color', rgb_cie.ColorHelper.getRGBFromXYAndBrightness(x,y,_theJSON["action"]["bri"])) - self.set_variable('hexcolor', '#%02x%02x%02x' % self.get_variable('color')) - # 4. saturation convert to % - self.set_variable('saturation',int(round(float(_theJSON["action"]["sat"])*100/255,0))) - self.set_variable('effect',_theJSON["action"]["effect"]) - self.set_variable('colormode',_theJSON["action"]["colormode"]) - for k in _theJSON["lights"]: - self.set_variable("lights{}".format(k), k) - self.set_variable('number_lights', len(_theJSON["lights"])) - self.set_variable('name',_theJSON["name"]) - ''' def printDeviceStatus(self): # now we can access the contents of the JSON like any other Python object print(" the current status is as follows:") @@ -204,28 +117,7 @@ class API: # setDeviceStatus(postmsg), isPostmsgValid(postmsg), convertPostMsg(postmsg) def setDeviceStatus(self, postmsg): setDeviceStatusResult = True - ''' - #Ex. postmsg = {"on":True,"bri":100,"hue":50260,"sat":200} - _hue_username = self.get_variable("username") - _url_append = '/api/'+_hue_username+'/groups/0/' - _urlData = self.get_variable("address").replace(':80', _url_append) - if self.isPostMsgValid(postmsg) == True: #check if the data is valid - _data = json.dumps(self.convertPostMsg(postmsg)) - _data = _data.encode(encoding='utf_8') - _request = urllib2.Request(_urlData+'action') - _request.add_header('Content-Type','application/json') - _request.get_method = lambda: 'PUT' - try: - _f = urllib2.urlopen(_request, _data, timeout=20) #when include data this become a POST command - print(" {0}Agent for {1} is changing its status with {2} please wait ..." - .format(self.variables.get('agent_id', None), self.variables.get('model', None), postmsg)) - print(" after send a POST request: {}".format(_f.read().decode('utf-8'))) - except: - print("ERROR: classAPI_PhilipsHue connection failure! @ setDeviceStatus") - setDeviceStatusResult = False - else: - print("The POST message is invalid, try again\n") - ''' + return setDeviceStatusResult def isPostMsgValid(self,postmsg): #check validity of postmsg diff --git a/bemoss_lib/databases/influxAPI/InfluxDB.py b/bemoss_lib/databases/influxAPI/InfluxDB.py index b6dc753..39cbe04 100755 --- a/bemoss_lib/databases/influxAPI/InfluxDB.py +++ b/bemoss_lib/databases/influxAPI/InfluxDB.py @@ -1,16 +1,29 @@ -import pandas +''' +Copyright (c) 2017, BlocPower +All rights reserved. + + +#__author__ = "Mike" +#__credits__ = "" +#__version__ = "2.0" +#__maintainer__ = "Mike" +#__email__ = "michael@blocpower.io" +#__website__ = "www.blocpower.io" +#__created__ = "2017-03-21" +#__lastUpdated__ = "2017-03-24" +''' + import numpy import datetime from influxdb import InfluxDBClient connection_established = False #Global variables -host = 'localhost' +host = '52.206.6.10' port = 8086 -user = 'root' -password = 'root' +user = 'user' +password = 'password' dbname = 'bemoss' -client = InfluxDBClient(host, port, user, password, dbname) def makeConnection(): @@ -23,6 +36,7 @@ def makeConnection(): connection_established = True return True + except Exception as er: print 'Cannot establish connection' raise er @@ -41,11 +55,7 @@ def insert(agentID, all_vars, log_vars, cur_time=None): :param log_vars: dictionary (usually APIobject.log_variables). It contains variables to be logged and their datatypes :return: 0, if successful - timestamp is generated based on current utc time. UTC time is put in cassandra database. If the table by the agent - name doesn't exist, table is created. **If error occurs because the name of variables/data_type has changed, the old - table will be deleted, and a new one with currect variable names/datatype will be created**. - - **Need to avoid doing this in final version.Feature made to help during development + timestamp is generated based on current utc time. UTC time is put in influxdb. """ global connection_established @@ -82,8 +92,13 @@ def insert(agentID, all_vars, log_vars, cur_time=None): retry = True while retry: - client.write_points(influx_json) - retry = False + try: + client.write_points(influx_json) + retry = False + except Exception as er: + retry = False + print er + raise return 0 @@ -103,6 +118,8 @@ def delete(agentID,startTime=None, endTime=None): if not connection_established: makeConnection() + client = InfluxDBClient(host, port, user, password, dbname) + if endTime==None and startTime==None: try: delete_query = "DROP SERIES WHERE agent = '{0}'".format(agentID) -- GitLab