diff --git a/Agents/AirQualityAgent/airquality/agent.py b/Agents/AirQualityAgent/airquality/agent.py index 5817d8028c1e23212ad04d572b44e346b7227d04..6350699b9697bd883cc5c0da8f7838399e18f7fe 100755 --- a/Agents/AirQualityAgent/airquality/agent.py +++ b/Agents/AirQualityAgent/airquality/agent.py @@ -23,8 +23,8 @@ import psycopg2 import psycopg2.extras import socket import settings +from bemoss_lib.databases.influxAPI import InfluxDB -from bemoss_lib.databases.cassandraAPI import cassandraDB utils.setup_logging() _log = logging.getLogger(__name__) @@ -149,10 +149,10 @@ def AirQualityAgent(config_path, **kwargs): def backupSaveData(self): try: AirQuality.getDeviceStatus() - cassandraDB.insert(agent_id,AirQuality.variables,log_variables) - print('Every Data Pushed to cassandra') + InfluxDB.insert(device_type,agent_id,AirQuality.variables,log_variables) + print('Every Data Pushed to influx') except Exception as er: - print("ERROR: {} fails to update cassandra database".format(agent_id)) + print("ERROR: {} fails to update influx database".format(agent_id)) print er #3. 
deviceMonitorBehavior (TickerBehavior) @@ -241,13 +241,13 @@ def AirQualityAgent(config_path, **kwargs): except: print("ERROR: {} fails to update the database name {}".format(agent_id,db_database)) - #step5: update Cassandra (time-series) database + #step5: update Influx (time-series) database try: - cassandraDB.insert(agent_id,self.variables,log_variables) - print('Data Pushed to cassandra') + InfluxDB.insert(device_type,agent_id,self.variables,log_variables) + print('Data Pushed to influx') print "{} success update".format(agent_id) except Exception as er: - print("ERROR: {} fails to update cassandra database".format(agent_id)) + print("ERROR: {} fails to update influx database".format(agent_id)) print er #step6: debug agent knowledge if debug_agent: diff --git a/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py b/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py index 5e9553127d6a2a553b17758e4e2c53a8315a3a85..786c1c78f8b4ce19206ee3809da2e6413afb99c6 100755 --- a/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py +++ b/Agents/DeviceDiscoveryAgent/devicediscovery/agent.py @@ -136,6 +136,7 @@ def DeviceDiscoveryAgent(config_path, **kwargs): self.discovery_list.append('CT30 V1.94') self.discovery_list.append('CT50 V1.94') self.discovery_list.append('Awair+') + self.discovery_list.append('M9-D15') if self.findWiFiWeMo: self.discovery_list.append('Socket') self.discovery_list.append('LightSwitch') @@ -255,7 +256,7 @@ def DeviceDiscoveryAgent(config_path, **kwargs): print "{} >> is finding available {} {} devices ...".format(agent_id,com_type,discovery_type) discovery_module = importlib.import_module("DeviceAPI.discoverAPI."+com_type) - if (com_type == 'BACnet') or (com_type == 'Serial') or (discovery_type=='Awair'): + if (com_type == 'BACnet') or (com_type == 'Serial') or (discovery_type=='Awair') or (discovery_type=='Magnum'): discovery_returns_ip = False else: discovery_returns_ip = True @@ -265,7 +266,6 @@ def DeviceDiscoveryAgent(config_path, **kwargs): if 
discovered_address == None: discovered_address = list() - print discovered_address for address in discovered_address: diff --git a/Agents/InfluxAgent/influx/__init__.py b/Agents/InfluxAgent/influx/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/Agents/InfluxAgent/influx/agent.py b/Agents/InfluxAgent/influx/agent.py new file mode 100755 index 0000000000000000000000000000000000000000..abe3131bc7a44dd4fecc86823707d602f7a9faf3 --- /dev/null +++ b/Agents/InfluxAgent/influx/agent.py @@ -0,0 +1,167 @@ +import psycopg2 +import sys +import json +import datetime +import time +import logging +import os +import re +from volttron.platform.agent import BaseAgent, PublishMixin, periodic +from volttron.platform.agent import utils, matching +from volttron.platform.messaging import headers as headers_mod +from bemoss_lib.databases.influxAPI import InfluxDB +import settings + + +utils.setup_logging() # setup logger for debugging +_log = logging.getLogger(__name__) + +# Step1: Agent Initialization +def InfluxAgent(config_path, **kwargs): + config = utils.load_config(config_path) # load the config_path from influxagent.launch.json + def get_config(name): + try: + kwargs.pop(name) # from the **kwargs when call this function + except KeyError as er: + return config.get(name, '') + + # 1. 
@params agent + agent_id = get_config('agent_id') + db_scan_time = get_config('db_scan_time') + db_backup_time = get_config('db_backup_time') + site = settings.PLATFORM['node']['building_name'] + topic_delim = '/' # topic delimiter + + # @paths + PROJECT_DIR = settings.PROJECT_DIR + Applications_Launch_DIR = settings.Applications_Launch_DIR + Agents_Launch_DIR = settings.Agents_Launch_DIR + + # postgres + db_database = settings.DATABASES['default']['NAME'] + db_host = settings.DATABASES['default']['HOST'] + db_port = settings.DATABASES['default']['PORT'] + db_user = settings.DATABASES['default']['USER'] + db_password = settings.DATABASES['default']['PASSWORD'] + db_table_influx = settings.DATABASES['default']['TABLE_influx_info'] + + class Agent(PublishMixin, BaseAgent): + + def __init__(self, **kwargs): + super(Agent, self).__init__(**kwargs) + #1. initialize all agent variables + self.variables = kwargs + + # Setup connection with db -> Connect to bemossdb database + try: + self.con = psycopg2.connect(host=db_host, port=db_port, database=db_database, user=db_user, + password=db_password) + self.cur = self.con.cursor() # open a cursor to perform database operations + print("{} connects to the database name {} successfully".format(agent_id, db_database)) + except: + print("ERROR: {} fails to connect to the database name {}".format(agent_id, db_database)) + exit(1) + + self.site = site + self.lastSensewareFetch = None + self.lastRemoteUpdate = None + self.dbUpdateCounter = 0 + + def setup(self): + super(Agent, self).setup() + InfluxDB.makeConnection() + try: + self.cur.execute("SELECT * from "+db_table_influx+" where building_name=%s",(self.site,)) + if self.cur.rowcount != 0: + influx_info = self.cur.fetchone() + self.lastSensewareFetch = influx_info[1] + self.lastRemoteUpdate = influx_info[2] + else: + print("Building Name: {} not found in database table {}, Adding.".format(site, db_table_influx)) + self.cur.execute("INSERT INTO " + db_table_influx + " 
VALUES(%s,%s,%s)", + (self.site, None, None)) + self.con.commit() + except Exception as Er: + print Er + print("ERROR: {} failed to get data from the database name {}".format(agent_id, db_database)) + + if ((self.lastSensewareFetch is None) and (self.lastRemoteUpdate is None)): + fetch_startTime=str(datetime.datetime.utcnow()-datetime.timedelta(days = 30)) + try: + InfluxDB.getSenseware(self.site, measurement = 'Temperature', startTime=fetch_startTime) + InfluxDB.remote_to_local(self.site, startTime=fetch_startTime) + self.lastSensewareFetch = str(datetime.datetime.utcnow()) + self.lastRemoteUpdate = str(datetime.datetime.utcnow()) + print "Senseware data retrieved on startup" + except Exception as er: + print er + print "Influx agent failed to retrieve senseware data" + else: + try: + InfluxDB.getSenseware(self.site, measurement = 'Temperature', startTime=self.lastSensewareFetch) + self.lastSensewareFetch = str(datetime.datetime.utcnow()) + print "Senseware data retrieved on startup" + except Exception as er: + print er + print "Influx agent failed to retrieve senseware data" + try: + InfluxDB.local_to_remote(startTime=self.lastRemoteUpdate) + self.lastRemoteUpdate = str(datetime.datetime.utcnow()) + print "Influx agent successfully updated remote" + except Exception as er: + print er + print "Influx agent failed to update remote, last update was at {0}".format(self.lastRemoteUpdate) + try: + self.cur.execute("UPDATE " + db_table_influx + " SET(last_retrieved_time_from_remote," + "last_update_remote_time)=(%s,%s) WHERE building_name=%s", + (self.lastSensewareFetch, self.lastRemoteUpdate, self.site)) + self.con.commit() + except: + print("ERROR: {} failed to push data into the database name {}".format(agent_id, db_database)) + + @periodic(db_scan_time) + def updateDatabases(self): + self.dbUpdateCounter += 1 + + try: + InfluxDB.getSenseware(self.site, measurement = 'Temperature', startTime=self.lastSensewareFetch) + self.lastSensewareFetch = 
str(datetime.datetime.utcnow()) + try: + self.cur.execute("UPDATE " + db_table_influx + " SET last_retrieved_time_from_remote=%s WHERE " + "building_name=%s", (self.lastSensewareFetch, self.site)) + self.con.commit() + except: + print("ERROR: {} failed to update data into the database name {}".format(agent_id, db_database)) + except Exception as er: + print er + print "Influx agent failed to retrieve senseware data, last update was {0}".format(self.lastSensewareFetch) + + if self.dbUpdateCounter >= (db_backup_time/db_scan_time): + try: + InfluxDB.local_to_remote(startTime = self.lastRemoteUpdate) + self.lastRemoteUpdate = str(datetime.datetime.utcnow()) + self.dbUpdateCounter = 0 + try: + self.cur.execute("UPDATE " + db_table_influx + " SET last_update_remote_time=%s WHERE " + "building_name=%s", (self.lastRemoteUpdate, self.site)) + self.con.commit() + except: + print("ERROR: {} failed to update data into the database name {}".format(agent_id, db_database)) + except Exception as er: + print er + print "Influx agent failed to update remote, last update was {0}".format(self.lastRemoteUpdate) + + Agent.__name__ = 'InfluxAgent' + return Agent(**kwargs) + +def main(argv=sys.argv): + '''Main method called by the eggsecutable.''' + utils.default_main(InfluxAgent, description='Influx agent', argv=argv) + +if __name__ == '__main__': + # Entry point for script + try: + sys.exit(main()) + except KeyboardInterrupt as er: + print "KeyboardInterrupt", er + pass \ No newline at end of file diff --git a/Agents/InfluxAgent/influxagent.launch.json b/Agents/InfluxAgent/influxagent.launch.json new file mode 100755 index 0000000000000000000000000000000000000000..946b2a9f96134d61745239afa5bb63e161b79999 --- /dev/null +++ b/Agents/InfluxAgent/influxagent.launch.json @@ -0,0 +1,5 @@ +{ + "agent_id": "InfluxAgent", + "db_scan_time": 1800, + "db_backup_time": 86400 +} diff --git a/Agents/InfluxAgent/setup.py b/Agents/InfluxAgent/setup.py new file mode 100755 index 
0000000000000000000000000000000000000000..90215b13b024ce12446829234c46c7ed0f3d2dc5 --- /dev/null +++ b/Agents/InfluxAgent/setup.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- {{{ +# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et: + +# Copyright (c) 2013, Battelle Memorial Institute +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and documentation +# are those of the authors and should not be interpreted as representing +# official policies, either expressed or implied, of the FreeBSD +# Project. +# +# This material was prepared as an account of work sponsored by an +# agency of the United States Government. 
Neither the United States +# Government nor the United States Department of Energy, nor Battelle, +# nor any of their employees, nor any jurisdiction or organization that +# has cooperated in the development of these materials, makes any +# warranty, express or implied, or assumes any legal liability or +# responsibility for the accuracy, completeness, or usefulness or any +# information, apparatus, product, software, or process disclosed, or +# represents that its use would not infringe privately owned rights. +# +# Reference herein to any specific commercial product, process, or +# service by trade name, trademark, manufacturer, or otherwise does not +# necessarily constitute or imply its endorsement, recommendation, or +# favoring by the United States Government or any agency thereof, or +# Battelle Memorial Institute. The views and opinions of authors +# expressed herein do not necessarily state or reflect those of the +# United States Government or any agency thereof. +# +# PACIFIC NORTHWEST NATIONAL LABORATORY +# operated by BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY +# under Contract DE-AC05-76RL01830 + +#}}} + +from setuptools import setup, find_packages + +#get environ for agent name/identifier +packages = find_packages('.') +package = packages[0] + +setup( + name = package + 'agent', + version = "0.1", + install_requires = ['volttron'], + packages = packages, + entry_points = { + 'setuptools.installation': [ + 'eggsecutable = ' + package + '.agent:main', + ] + } +) + diff --git a/Agents/LightingAgent/lighting/agent.py b/Agents/LightingAgent/lighting/agent.py index 8970aa3ec60cc4afd623678b9dd8db0cb8ce8803..d3030f6f63abaf82f161d0c75e970306fe4490b6 100755 --- a/Agents/LightingAgent/lighting/agent.py +++ b/Agents/LightingAgent/lighting/agent.py @@ -211,6 +211,7 @@ def LightingAgent(config_path, **kwargs): print("device connection is not successful") self.changed_variables = dict() + for v in log_variables: if v in Light.variables: if not v in 
self.variables or self.variables[v] != Light.variables[v]: @@ -253,6 +254,7 @@ def LightingAgent(config_path, **kwargs): (self.get_variable('hexcolor'), agent_id)) self.con.commit() try: + if self.get_variable('status') == "ON": multiple_on_off_status = "" for dummyvar in range(self.get_variable('number_lights')): diff --git a/Agents/LightingAgent/lightingagent.launch.json b/Agents/LightingAgent/lightingagent.launch.json index 34e661d724f9a0a9bfea8e73aae668e1708d7e8a..80a4f51aba77f88add54d517ab90b0985b1ada79 100755 --- a/Agents/LightingAgent/lightingagent.launch.json +++ b/Agents/LightingAgent/lightingagent.launch.json @@ -10,7 +10,7 @@ "model": "Philips Hue", "type": "lighting", "api": "classAPI_DummyPhilipsHue", - "address": "http://38.68.232.161/api/newdeveloper/groups/0/", + "address": "http://10.0.2.56", "topic": "datalogger/log/building1/zone1/lightingload/PhilipsHue", "db_host": "localhost", "db_port": "5432", diff --git a/Agents/TRVAgent/setup.py b/Agents/TRVAgent/setup.py new file mode 100755 index 0000000000000000000000000000000000000000..90215b13b024ce12446829234c46c7ed0f3d2dc5 --- /dev/null +++ b/Agents/TRVAgent/setup.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- {{{ +# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et: + +# Copyright (c) 2013, Battelle Memorial Institute +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and documentation +# are those of the authors and should not be interpreted as representing +# official policies, either expressed or implied, of the FreeBSD +# Project. +# +# This material was prepared as an account of work sponsored by an +# agency of the United States Government. Neither the United States +# Government nor the United States Department of Energy, nor Battelle, +# nor any of their employees, nor any jurisdiction or organization that +# has cooperated in the development of these materials, makes any +# warranty, express or implied, or assumes any legal liability or +# responsibility for the accuracy, completeness, or usefulness or any +# information, apparatus, product, software, or process disclosed, or +# represents that its use would not infringe privately owned rights. +# +# Reference herein to any specific commercial product, process, or +# service by trade name, trademark, manufacturer, or otherwise does not +# necessarily constitute or imply its endorsement, recommendation, or +# favoring by the United States Government or any agency thereof, or +# Battelle Memorial Institute. 
The views and opinions of authors +# expressed herein do not necessarily state or reflect those of the +# United States Government or any agency thereof. +# +# PACIFIC NORTHWEST NATIONAL LABORATORY +# operated by BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY +# under Contract DE-AC05-76RL01830 + +#}}} + +from setuptools import setup, find_packages + +#get environ for agent name/identifier +packages = find_packages('.') +package = packages[0] + +setup( + name = package + 'agent', + version = "0.1", + install_requires = ['volttron'], + packages = packages, + entry_points = { + 'setuptools.installation': [ + 'eggsecutable = ' + package + '.agent:main', + ] + } +) + diff --git a/Agents/TRVAgent/trv/__init__.py b/Agents/TRVAgent/trv/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/Agents/TRVAgent/trv/agent.py b/Agents/TRVAgent/trv/agent.py new file mode 100755 index 0000000000000000000000000000000000000000..2da37e2b4df9cc2e2d0f2025141ad424714538f7 --- /dev/null +++ b/Agents/TRVAgent/trv/agent.py @@ -0,0 +1,529 @@ +# -*- coding: utf-8 -*- +''' +#__author__ = "Mike" +#__credits__ = "" +#__version__ = "2.0" +#__maintainer__ = "Mike" +#__email__ = "michael@blocpower.io" +#__website__ = "www.blocopwer.io" +#__created__ = "2017-04-13" +#__lastUpdated__ = "" +''' + +import sys +import json +import importlib +import logging +import os +from volttron.platform.agent import BaseAgent, PublishMixin, periodic +from volttron.platform.agent import utils, matching +from volttron.platform.messaging import headers as headers_mod +import datetime +from bemoss_lib.communication.Email import EmailService +from bemoss_lib.communication.sms import SMSService +import psycopg2 +import psycopg2.extras +import settings +import socket +from bemoss_lib.databases.influxAPI import InfluxDB +from bemoss_lib.utils.catcherror import catcherror + +utils.setup_logging() +_log = logging.getLogger(__name__) + +# 
Step1: Agent Initialization +def TRVAgent(config_path, **kwargs): + config = utils.load_config(config_path) + + def get_config(name): + try: + kwargs.pop(name) + except KeyError: + return config.get(name, '') + + # 1. @params agent + agent_id = get_config('agent_id') + device_monitor_time = 240 + max_monitor_time = int(settings.DEVICES['max_monitor_time']) + debug_agent = False + log_variables = dict(setpoint='int', temperature='float', window_open='boolean', alert='string', + operation_mode='string', offline_count='int') + + # 2. @params device_info + building_name = get_config('building_name') + zone_id = get_config('zone_id') + model = get_config('model') + device_type = get_config('type') + #TODO Make address come from launch file + address = 'http://192.168.0.100' + macaddress = get_config('macaddress') + _address = address + _address = _address.replace('http://', '') + _address = _address.replace('https://', '') + try: # validate whether or not address is an ip address + socket.inet_pton(socket.AF_INET, _address) + ip_address = _address + except socket.error: + ip_address = None + identifiable = get_config('identifiable') + + # 3. 
@params agent & DB interfaces + db_host = get_config('db_host') + db_port = get_config('db_port') + db_database = get_config('db_database') + db_user = get_config('db_user') + db_password = get_config('db_password') + db_table_trv = settings.DATABASES['default']['TABLE_trv'] + db_id_column_name = "trv_id" + db_table_bemoss_notify = settings.DATABASES['default']['TABLE_bemoss_notify'] + db_table_active_alert = settings.DATABASES['default']['TABLE_active_alert'] + db_table_device_type = settings.DATABASES['default']['TABLE_device_type'] + db_table_alerts_notificationchanneladdress = settings.DATABASES['default']['TABLE_alerts_notificationchanneladdress'] + db_table_priority = settings.DATABASES['default']['TABLE_priority'] + + #construct _topic_Agent_UI based on data obtained from DB + _topic_Agent_UI_tail = building_name + '/' + str(zone_id) + '/' + agent_id + + # 4. @params device_api + api = get_config('api') + apiLib = importlib.import_module("DeviceAPI.classAPI." + api) + + # 4.1 initialize trv device object + TRV = apiLib.API(model=model, device_type=device_type, api=api, address=address,macaddress=macaddress, + agent_id=agent_id, db_host=db_host, db_port=db_port, db_user=db_user, db_password=db_password, + db_database=db_database, config_path=config_path) + connection_renew_interval = TRV.variables['connection_renew_interval'] + + print("{0}agent is initialized for {1} using API={2} at {3}".format(agent_id, TRV.get_variable('model'), + TRV.get_variable('api'), + TRV.get_variable('address'))) + + # 5. 
@params notification_info + send_notification = False + email_fromaddr = settings.NOTIFICATION['email']['fromaddr'] + email_recipients = settings.NOTIFICATION['email']['recipients'] + email_username = settings.NOTIFICATION['email']['username'] + email_password = settings.NOTIFICATION['email']['password'] + email_mailServer = settings.NOTIFICATION['email']['mailServer'] + notify_heartbeat = settings.NOTIFICATION['heartbeat'] + + class Agent(PublishMixin, BaseAgent): + + # 1. agent initialization + def __init__(self, **kwargs): + super(Agent, self).__init__(**kwargs) + # 1. initialize all agent variables + self.variables = kwargs + self.set_variable('setpoint', 0) + self.valid_data = False + self._keep_alive = True + self.ip_address = ip_address if ip_address != None else None + self.event_ids = list() + self.time_sent_notifications = {} + self.notify_heartbeat = notify_heartbeat + self.changed_variables = None + self.lastUpdateTime = datetime.datetime.now() + self.runningSeconds = 0 + self._override = True + self.already_offline = False + self.lastDateSet = None + + # 2. setup connection with db -> Connect to bemossdb database + try: + self.con = psycopg2.connect(host=db_host, port=db_port, database=db_database, user=db_user, + password=db_password) + self.cur = self.con.cursor() # open a cursor to perform database operations + print("{} connects to the database name {} successfully".format(agent_id, db_database)) + except: + print("ERROR: {} fails to connect to the database name {}".format(agent_id, db_database)) + # 3. 
send notification to notify building admin + self.send_notification = send_notification + self.subject = 'Message from ' + agent_id + + try: + setDeviceStatusResult = TRV.setDeviceStatus(100) + + except Exception as er: + print er + print "set device status failed at initialization for {}".format(agent_id) + return False + + # These set and get methods allow scalability + def set_variable(self, k, v): # k=key, v=value + self.variables[k] = v + + def get_variable(self, k): + return self.variables.get(k, None) # default of get_variable is none + + # 2. agent setup method + def setup(self): + super(Agent, self).setup() + + @periodic(connection_renew_interval) + def renewConnection(self): + TRV.renewConnection() + + # 3. deviceMonitorBehavior (TickerBehavior) + @periodic(device_monitor_time) + def deviceMonitorBehavior(self): + + if self.setDevice(): + print("Periodic control command sent succesfully") + else: + print("Periodic control command sending was unsuccesful") + + try: + TRV.getDeviceStatus() + except: + print("device connection is not successful") + + self.changed_variables = dict() + for v in log_variables: + if v in TRV.variables: + if not v in self.variables or self.variables[v] != TRV.variables[v]: + self.variables[v] = TRV.variables[v] + self.changed_variables[v] = log_variables[v] + else: + if v not in self.variables: #it won't be in self.variables either (in the first time) + self.changed_variables[v] = log_variables[v] + self.variables[v] = None + + #step4: update PostgresQL (meta-data) database + try: + #make the device offline if necessary + if self.get_variable('offline_count')>=2: + + self.cur.execute("UPDATE "+db_table_trv+" SET network_status=%s WHERE trv_id=%s", + ('OFFLINE', agent_id)) + self.con.commit() + if self.already_offline is False: + self.already_offline = True + _time_stamp_last_offline = str(datetime.datetime.now()) + self.cur.execute("UPDATE "+db_table_trv+" SET last_offline_time=%s " + "WHERE trv_id=%s", + 
(_time_stamp_last_offline, agent_id)) + self.con.commit() + else: + self.already_offline = False + self.cur.execute("UPDATE "+db_table_trv+" SET network_status=%s WHERE trv_id=%s", + ('ONLINE', agent_id)) + self.con.commit() + + # put the last scan time on database + _time_stamp_last_scanned = str(datetime.datetime.now()) + self.cur.execute("UPDATE "+db_table_trv+" SET last_scanned_time=%s " + "WHERE trv_id=%s", + (_time_stamp_last_scanned, agent_id)) + self.con.commit() + except Exception as er: + print er + print("ERROR: {} failed to update database name {}".format(agent_id, db_database)) + + if len(self.changed_variables) == 0: + print 'nothing changed' + return + else: + print 'These things changed:' + print self.changed_variables + self.updateUI() + + # step3: update PostgresQL (meta-data) database + try: + # TODO: Update the execute string using the loop and commit it to postgres all at once + for k, v in log_variables.items(): + # check if column exists, then updateDB to corresponding column + self.cur.execute("select column_name from information_schema.columns where table_name=%s and column_name=%s", + (db_table_trv, k,)) + if bool(self.cur.rowcount): + self.updateDB(db_table_trv, k, db_id_column_name, self.get_variable(k), agent_id) + else: + pass + if self.ip_address != None: + psycopg2.extras.register_inet() + _ip_address = psycopg2.extras.Inet(self.ip_address) + self.cur.execute("UPDATE " + db_table_trv + " SET ip_address=%s WHERE trv_id=%s", + (_ip_address, agent_id)) + self.con.commit() + print("{} updates database name {} during deviceMonitorBehavior successfully".format(agent_id, db_database)) + print( + "{} updates database name {} during deviceMonitorBehavior successfully".format(agent_id, db_database)) + except Exception as er: + print er + print("ERROR: {} failed to update database name {}".format(agent_id, db_database)) + + # step4: update influx database + try: + InfluxDB.insert(device_type, agent_id, self.variables, log_variables) + 
print('Data Pushed to InfluxDB') + except Exception as er: + print("ERROR: {} fails to update influx database".format(agent_id)) + print er + + # step5: debug agent knowledge + if debug_agent: + print("printing agent's knowledge") + for k, v in self.variables.items(): + print (k, v) + print('') + + def updateUI(self): + topic = '/agent/ui/'+device_type+'/device_status_response/'+ _topic_Agent_UI_tail + # now = datetime.utcnow().isoformat(' ') + 'Z' + headers = { + 'AgentID': agent_id, + headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.JSON, + # headers_mod.DATE: now, + headers_mod.FROM: agent_id, + headers_mod.TO: 'ui' + } + _data = {'setpoint': self.get_variable('setpoint'),'temperature': self.get_variable('temperature'), + 'window_open': self.get_variable('window_open'), 'alert': self.get_variable('alert'), + 'operation_mode': self.get_variable('operation_mode') + } + + message = json.dumps(_data) + message = message.encode(encoding='utf_8') + self.publish(topic, headers, message) + + # 4. updateUIBehavior (generic behavior) + + @matching.match_exact('/ui/agent/'+device_type+'/device_status/' + _topic_Agent_UI_tail) + def updateUIBehavior(self, topic, headers, message, match): + print agent_id + " got\nTopic: {topic}".format(topic=topic) + print "Headers: {headers}".format(headers=headers) + print "Message: {message}\n".format(message=message) + # reply message + self.updateUI() + + # 5. 
Control intelligence and set device status + def setDevice(self): + setDeviceStatusResult = False + current_time = datetime.datetime.now() + needtochangesetpoint = False + if current_time.hour >= 17: + if self.lastDateSet is not None: + if self.lastDateSet == current_time.date(): + needtochangesetpoint = False + else: + needtochangesetpoint = True + else: + needtochangesetpoint = True + if needtochangesetpoint: + self.variables['setpoint'] -= 20 + if self.variables['setpoint'] < 0: + self.variables['setpoint'] = 0 + + try: + setDeviceStatusResult = TRV.setDeviceStatus(self.variables['setpoint']) + + except Exception as er: + print er + print "set device status failed for {}".format(agent_id) + return False + + return setDeviceStatusResult + + #6. update Postgres database + # TODO: Update the execute string using the loop and commit it to postgres all at once + def updateDB(self, table, column, column_ref, column_data, column_ref_data): + self.cur.execute("UPDATE "+table+" SET "+column+"=%s " + "WHERE "+column_ref+"=%s", + (column_data, column_ref_data)) + self.con.commit() + + ''' + def device_offline_detection(self): + _db_notification_subject = 'BEMOSS Device {} went OFFLINE!!!'.format(agent_id) + _email_subject = '#Attention: BEMOSS Device {} went OFFLINE!!!'.format(agent_id) + _email_text = '#Attention: BEMOSS Device {} went OFFLINE!!!'.format(agent_id) + self.cur.execute("SELECT network_status FROM " + db_table_trv + " WHERE trv_id=%s", + (agent_id,)) + self.network_status = self.cur.fetchone()[0] + print self.network_status + if self.network_status == "OFFLINE": + print "Found Device OFFLINE" + self.cur.execute("SELECT id FROM " + db_table_active_alert + " WHERE event_trigger_id=%s", ('5',)) + self._active_alert_id = self.cur.fetchone()[0] + self.cur.execute( + "SELECT id FROM " + db_table_temp_time_counter + " WHERE alert_id=%s AND device_id=%s", + (str(self._active_alert_id), agent_id,)) + # If this is the first detected violation + if self.cur.rowcount == 
0: + print "first device offline detected" + self.cur.execute( + "INSERT INTO " + db_table_temp_time_counter + " VALUES(DEFAULT,%s,%s,%s,%s,%s)", + (self._active_alert_id, agent_id, '0', '0', '0')) + self.con.commit() + self.send_device_notification_db_device(_db_notification_subject, self._active_alert_id) + self.cur.execute("SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s",(self._active_alert_id,'1')) + # Send email if exist + if self.cur.rowcount != 0: + self._offline_alert_email = self.cur.fetchall() + for single_email_1 in self._offline_alert_email: + print single_email_1[0] + self.send_device_notification_email_all(single_email_1[0], _email_text, _email_subject) + + # Send SMS if provided by user + self.cur.execute("SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s",(self._active_alert_id,'2')) + if self.cur.rowcount != 0: + self._offline_alert_sms = self.cur.fetchall() + for single_number_ in self._offline_alert_email: + print single_number_[0] + self.send_device_notification_sms(single_number_[0]) + else: + self.priority_counter(self._active_alert_id, _db_notification_subject) + else: + print "The Device is ONLINE" + + # TODO refactor this one + def send_device_notification_db(self, device_msg, _active_alert_id): + print " INSIDE send_device_notification_db" + # Find the priority id + self.cur.execute( + "SELECT priority_id FROM " + db_table_active_alert + " WHERE id=%s", + (str(_active_alert_id),)) + self.priority_id = self.cur.fetchone()[0] + + # Find the priority level + self.cur.execute( + "SELECT priority_level FROM " + db_table_priority + " WHERE id=%s", + str(self.priority_id)) + self.priority_level = self.cur.fetchone()[0] + + # Insert the notification into DB + self.cur.execute("INSERT INTO " + db_table_bemoss_notify + " VALUES(DEFAULT,%s,%s,%s,%s)", + (device_msg, + 
str(datetime.datetime.now()), 'Alert', str(self.priority_level))) + self.con.commit() + + # TODO refactor this one + def send_device_notification_db_device(self, device_msg, _active_alert_id): + print " INSIDE send_device_notification_db_device" + # Find the priority id + self.cur.execute( + "SELECT priority_id FROM " + db_table_active_alert + " WHERE id=%s", + (str(_active_alert_id),)) + self.priority_id = self.cur.fetchone()[0] + + # Find the priority level + self.cur.execute( + "SELECT priority_level FROM " + db_table_priority + " WHERE id=%s", + str(self.priority_id)) + self.priority_level = self.cur.fetchone()[0] + + # Insert the notification into DB + self.cur.execute("INSERT INTO " + db_table_bemoss_notify + " VALUES(DEFAULT,%s,%s,%s,%s)", + (device_msg, + str(datetime.datetime.now()), 'Alert', str(self.priority_level))) + self.con.commit() + + # Find the number of total number notifications sent for the same alert and device + self.cur.execute("SELECT id FROM " + db_table_active_alert + " WHERE event_trigger_id=%s", ('5',)) + if self.cur.rowcount != 0: + self.cur.execute( + "SELECT no_notifications_sent FROM " + db_table_temp_time_counter + " WHERE alert_id=%s AND device_id=%s", + (str(_active_alert_id), agent_id,)) + if self.cur.rowcount != 0: + self._no_notifications_sent = self.cur.fetchone()[0] + self.con.commit() + print self._no_notifications_sent + self._no_notifications_sent = int(self._no_notifications_sent) + 1 + self.cur.execute( + "UPDATE " + db_table_temp_time_counter + " SET no_notifications_sent=%s WHERE alert_id=%s AND device_id=%s", + (str(self._no_notifications_sent), str(_active_alert_id), agent_id,)) + self.con.commit() + + def send_device_notification_sms(self, _active_alert_phone_number): + print "INSIDE send_device_notification_sms" + print _active_alert_phone_number + _sms_subject = 'Please Check BEMOSS Notifications' + smsService = SMSService() + smsService.sendSMS(email_fromaddr, _active_alert_phone_number, email_username, 
email_password, _sms_subject, email_mailServer) + + def send_device_notification_email_all(self, _active_alert_email, _email_subject, _email_text): + emailService = EmailService() + + # Send Email + emailService.sendEmail(email_fromaddr, _active_alert_email, email_username, + email_password, _email_subject, _email_text, email_mailServer) + + # TODO refactor this one + + def priority_counter(self, _active_alert_id, device_msg_1): + # find the priority counter then compare it with priority_counter in priority table + # if greater than the counter then send notification and reset the value + # else just increase the counter + print "INSIDE the priority_counter" + self.cur.execute( + "SELECT priority_counter FROM " + db_table_temp_time_counter + " WHERE alert_id=%s AND device_id=%s", + (str(_active_alert_id), agent_id,)) + self.priority_count = self.cur.fetchone()[0] + self.con.commit() + + # Find the priority id from active alert table + self.cur.execute( + "SELECT priority_id FROM " + db_table_active_alert + " WHERE id=%s", + (str(_active_alert_id),)) + self.priority_id = self.cur.fetchone()[0] + self.con.commit() + + # Find the priority limit from the priority counter + self.cur.execute( + "SELECT priority_counter FROM " + db_table_priority + " WHERE id=%s", + (str(self.priority_id),)) + self.priority_limit = self.cur.fetchone()[0] + self.con.commit() + + if int(self.priority_count) > int(self.priority_limit): + self.cur.execute( + "SELECT no_notifications_sent FROM " + db_table_temp_time_counter + " WHERE alert_id=%s AND device_id=%s", + (str(_active_alert_id), agent_id,)) + self._no_notifications_sent = self.cur.fetchone()[0] + self._no_notifications_sent = int(self._no_notifications_sent) + 1 + self.send_device_notification_db(device_msg_1, _active_alert_id) + self.cur.execute( + "UPDATE " + db_table_temp_time_counter + " SET priority_counter=%s WHERE alert_id=%s AND device_id=%s", + ('0', str(_active_alert_id), agent_id,)) + self.con.commit() + + print "INSIDE 
the priority counter exceeded the defined range" + # send email if checked + self.cur.execute("SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s",(self._active_alert_id,'1')) + # Send email if exist + if self.cur.rowcount != 0: + self._tampering_alert_email_misoperation = self.cur.fetchall() + for single_email_1 in self._tampering_alert_email_misoperation: + print single_email_1[0] + self.send_device_notification_email_all(single_email_1[0], device_msg_1, device_msg_1) + + # Send SMS if provided by user + self.cur.execute( + "SELECT notify_address FROM " + db_table_alerts_notificationchanneladdress + " WHERE active_alert_id=%s AND notification_channel_id=%s", + (self._active_alert_id,'2')) + if self.cur.rowcount != 0: + self._tampering_alert_sms_misoperation = self.cur.fetchall() + for single_number_misoperation in self._tampering_alert_sms_misoperation: + print single_number_misoperation[0] + self.send_device_notification_sms(single_number_misoperation[0]) + else: + self.priority_count = int(self.priority_count) + 1 + self.cur.execute( + "UPDATE " + db_table_temp_time_counter + " SET priority_counter=%s WHERE alert_id=%s AND device_id=%s", + (str(self.priority_count), str(_active_alert_id), agent_id,)) + ''' + + Agent.__name__ = 'TRV Agent' + return Agent(**kwargs) + +def main(argv=sys.argv): + '''Main method called by the eggsecutable.''' + utils.default_main(TRVAgent, + description='TRV agent', + argv=argv) + +if __name__ == '__main__': + # Entry point for script + try: + sys.exit(main()) + except KeyboardInterrupt: + pass diff --git a/DeviceAPI/classAPI/classAPI_EnOceanPi.py b/DeviceAPI/classAPI/classAPI_EnOceanPi.py new file mode 100755 index 0000000000000000000000000000000000000000..a944f7fae143b8460f6adcbbd549d225f6715ce9 --- /dev/null +++ b/DeviceAPI/classAPI/classAPI_EnOceanPi.py @@ -0,0 +1,103 @@ +''' +#__author__ = "Mike" +#__credits__ = "" +#__version__ = "2.0" 
+#__maintainer__ = "Mike" +#__email__ = "michael@blocpower.io" +#__website__ = "www.blocpower.io" +#__created__ = "2017-04-17" +#__lastUpdated__ = "" +''' +import requests +import json +import time +import datetime +from DeviceAPI.discoverAPI import WiFi +from urlparse import urlparse + +class API: + # 1. constructor : gets call every time when create a new class + # requirements for instantiation1. model, 2.type, 3.api, 4. address + def __init__(self,**kwargs): # default color is white + # Initialized common attributes + self.variables = kwargs + self.debug = False + self.set_variable('connection_renew_interval',6000) + self.set_variable('offline_count', 0) + + def set_variable(self,k,v): # k=key, v=value + self.variables[k] = v + + def get_variable(self,k): + return self.variables.get(k, None) # default of get_variable is none + + # 2. Attributes from Attributes table + ''' + Attributes: + POST: setpoint + ------------------------------------------------------------------------------------------ + setpoint GET/POST setpoint of the TRV (int from 0-100%) + window_open GET Window open detection of TRV (boolean) + alert GET concatenated string of TRV error messages (string) + temperature GET temperature (floating point in deg F) + operation_mode GET Operation mode of the TRV (string) + ------------------------------------------------------------------------------------------ + ''' + + # 3. Capabilites (methods) from Capabilities table + ''' + API1 available methods: + 1. renewConnection() + 2. setDeviceStatus(setpoint) POST + 3. 
getDeviceStatus() GET + ''' + #TODO Take authentication key from launch file instead of hard coding + def renewConnection(self): + pass + + def setDeviceStatus(self,setpoint): + setDeviceStatusResult = True + enocean_address = self.variables.get('address')+':8080/api/v1' + set_trv = {"trv_id":self.variables.get('macaddress'), "setpoint":setpoint} + headers = {"Authorization_key": 'ABC', "Content-type": 'application/json'} + + try: + verify = requests.post(enocean_address, headers = headers, data = json.dumps(set_trv)) + if not verify.status_code == 200: + print verify.status_code + raise Exception('Response code != 200') + + except Exception as er: + print "HTTP Request to EnOcean gateway for {} failed".format(self.variables.get('agent_id')) + print er + setDeviceStatusResult = False + + if setDeviceStatusResult: print "Enocean TRV device set successfully" + return setDeviceStatusResult + + def getDeviceStatus(self): + getDeviceStatusResult = True + device_url = self.get_variable('address')+':8080/api/v1' + header = {"Authorization_key":'ABC', "Content-type": 'application/json'} + trv_id = {"trv_id":self.get_variable('macaddress')} + try: + data_req = requests.get(device_url,headers = header, params = trv_id, timeout=10) + data = data_req.json() + + temperature_in_f = data['temperature']*1.8 + 32 + + self.set_variable('setpoint', data['setpoint']) + self.set_variable('temperature', temperature_in_f) + self.set_variable('window_open', data['window_open']) + self.set_variable('alert', data['alert']) + self.set_variable('operation_mode', data['operation_mode']) + + except Exception as er: + print er + getDeviceStatusResult = False + + if getDeviceStatusResult==True: + self.set_variable('offline_count', 0) + + else: + self.set_variable('offline_count', self.get_variable('offline_count')+1) \ No newline at end of file diff --git a/DeviceAPI/discoverAPI/WiFi.py b/DeviceAPI/discoverAPI/WiFi.py index cb9ab97b855d17f280e9a41f4bac0f6ba6f5213a..7986a78751cf174981176cbde00d051994eafc33 100755 --- 
a/DeviceAPI/discoverAPI/WiFi.py +++ b/DeviceAPI/discoverAPI/WiFi.py @@ -85,13 +85,14 @@ def parseJSONresponse(data,key): def discover(type, timeout=2, retries=1): + #TODO Grab token from bemoss passwords instead of hard coding if type=='Awair': token = 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoiNTUxOSJ9.3YE0APseF-aLMcjTMSbN4bHgE--ZgcuWBEXVs-5TTO4' head_auth = {'Authorization' : token} devices_url = 'https://beta-api.awair.is/v1/users/self/devices' devices_req = None try: - devices_req = requests.get(devices_url, headers = head_auth) + devices_req = requests.get(devices_url, headers = head_auth, timeout=10) except Exception as er: print er pass @@ -103,6 +104,26 @@ def discover(type, timeout=2, retries=1): for device in devices_json['data']: responses.append(str(device['device_id'])) + #TODO have discovery agent find raspberry pi first instead of hard coding ip + elif type=='Magnum': + device_url = 'http://192.168.0.100:8080/api/v1' + header = {"Authorization_key":'ABC', "Content-type": 'application/json'} + trv_discover = {"discover":'True'} + data_req=None + try: + data_req = requests.get(device_url,headers = header, params = trv_discover, timeout=15) + + except Exception as er: + print er + + responses = list() + + if data_req is not None: + devices = data_req.json() + responses.extend(devices['trv_list']) + + return responses + else: group = ("239.255.255.250", 1900) if type=='thermostat': @@ -183,6 +204,9 @@ def getMACaddress(type,ipaddress): elif type=="Awair": awairid = ipaddress return awairid + elif type=="Magnum": + enocean_address = ipaddress + return enocean_address else: print "This device: {} is not supported by the WiFi discovery module".format(type) @@ -222,11 +246,13 @@ def getmodelvendor(type,ipaddress): return {'model':deviceModel,'vendor':deviceVendor,'nickname':nickname} elif type=="Awair": return {'model':'Awair+','vendor':'Awair'} + elif type=="Magnum": + return {'model':'M9-D15', 'vendor':'Magnum'} # This main method will 
not be executed when this class is used as a module def main(): - print discover('Philips') + print discover('Magnum') # print discover('thermostat') # print getMACaddress('Philips','http://192.168.1.102:80/description.xml') # print type(getMACaddress('Philips','http://192.168.102.:80')) diff --git a/bemoss_install_v2.sh b/bemoss_install_v2.sh index 6d9fa0680a631a9703a2465f3eb66b4d2abc06a5..0af8fdc08f5bd25d0765425dd9cdb8a700aad33c 100755 --- a/bemoss_install_v2.sh +++ b/bemoss_install_v2.sh @@ -37,8 +37,7 @@ cd ~/workspace #Remove the existing bemoss_web_ui folder sudo rm -rf bemoss_web_ui #Clone the bemoss_web_ui repository -# TODO: switch to GitHub repo -sudo git clone -b master https://github.com/bemoss/bemoss_web_ui.git +sudo git clone -b master https://github.com/blocp/bemoss_web_ui.git sudo chmod 777 -R ~/workspace #Create the bemossdb database sudo -u postgres psql -c "CREATE USER admin WITH PASSWORD 'admin';" @@ -49,20 +48,16 @@ sudo -u postgres psql -d bemossdb -c "create extension hstore;" sudo apt-get update sudo apt-get install openjdk-7-jre --assume-yes sudo apt-get install libjna-java --assume-yes -# Install Cassandra -cd ~/workspace -wget http://downloads.datastax.com/community/dsc-cassandra-2.1.7-bin.tar.gz -tar -xvzf dsc-cassandra-2.1.7-bin.tar.gz -sudo rm dsc-cassandra-2.1.7-bin.tar.gz -sudo mv dsc-cassandra-2.1.7 cassandra +# Install Influx +curl -sL https://repos.influxdata.com/influxdb.key | sudo apt-key add - +echo "deb https://repos.influxdata.com/ubuntu trusty stable" | sudo tee /etc/apt/sources.list.d/influxdb.list +sudo apt-get update +sudo apt-get -y install influxdb +sudo service influxdb start # Install Dependencies in virtual env. cd ~/workspace/bemoss_os/env . bin/activate pip install -r ~/workspace/bemoss_os/requirements.txt -# Install Cassandra Driver -# (For better performance of Cassandra, the install-option can be removed but might cause installation failure in some boards.) 
-sudo CASS_DRIVER_NO_CYTHON=1 pip install cassandra-driver -CASS_DRIVER_NO_CYTHON=1 pip install cassandra-driver deactivate #Go to the bemoss_web_ui and run the syncdb command for the database tables (ref: model.py) cd ~/workspace/bemoss_os @@ -70,8 +65,8 @@ sudo python ~/workspace/bemoss_web_ui/manage.py syncdb sudo python ~/workspace/bemoss_web_ui/run/defaultDB.py #Initialize the tables sudo python ~/workspace/bemoss_os/bemoss_lib/utils/platform_initiator.py -# Prompt user for Cassandra Authorization Info -sudo python ~/workspace/bemoss_os/bemoss_lib/databases/cassandraAPI/initialize.py +#set up influxdb +sudo python ~/workspace/bemoss_os/bemoss_lib/databases/InfluxAPI/initialize.py # Fix miscellaneaus issues sudo ~/workspace/bemoss_os/bemoss_lib/utils/increase_open_file_limit.sh rm ~/workspace/bemoss_os/bemoss_lib/utils/increase_open_file_limit.sh diff --git a/bemoss_lib/databases/cassandraAPI/__init__.py b/bemoss_lib/databases/cassandraAPI/__init__.py deleted file mode 100755 index 2ac3bbac5ca9645a89eb3a59afd762022f56464d..0000000000000000000000000000000000000000 --- a/bemoss_lib/databases/cassandraAPI/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'rajee' diff --git a/bemoss_lib/databases/cassandraAPI/cassandraDB.py b/bemoss_lib/databases/cassandraAPI/cassandraDB.py deleted file mode 100755 index 2da1c295337b14e7497f233dad706b8fb6873155..0000000000000000000000000000000000000000 --- a/bemoss_lib/databases/cassandraAPI/cassandraDB.py +++ /dev/null @@ -1,404 +0,0 @@ -# -*- coding: utf-8 -*- -''' -Copyright (c) 2016, Virginia Tech -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the - following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -disclaimer. -2. 
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following -disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and should not be -interpreted as representing official policies, either expressed or implied, of the FreeBSD Project. - -This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the -United States Government nor the United States Department of Energy, nor Virginia Tech, nor any of their employees, -nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, -express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or -any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe -privately owned rights. 
- -Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or -otherwise does not necessarily constitute or imply its endorsement, recommendation, favoring by the United States -Government or any agency thereof, or Virginia Tech - Advanced Research Institute. The views and opinions of authors -expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof. - -VIRGINIA TECH – ADVANCED RESEARCH INSTITUTE -under Contract DE-EE0006352 - -#__author__ = "BEMOSS Team" -#__credits__ = "" -#__version__ = "2.0" -#__maintainer__ = "BEMOSS Team" -#__email__ = "aribemoss@gmail.com" -#__website__ = "www.bemoss.org" -#__created__ = "2014-09-12 12:04:50" -#__lastUpdated__ = "2016-03-14 11:23:33" -''' -import pandas -from cassandra import * -import numpy -import datetime -import re -from bemoss_lib.databases.cassandraAPI import cassandraHelper -from bemoss_lib.utils.catcherror import catcherror -import json -connection_established = False - -#Global variables -bCluster, bSpace, keyspace_name, replication_factor = None, None, None, None - - - -@catcherror('Could not get replication') -def get_replication(keyspace): - - global connection_established - if not connection_established: - makeConnection() - x = bSpace.execute("SELECT strategy_options from system.schema_keyspaces where keyspace_name=%s",(keyspace,)) - y = int(json.loads(x[0][0])['replication_factor']) - return y - - -@catcherror('Could not set replication') -def set_replication(keyspace,replication): - - global connection_established - if not connection_established: - makeConnection() - bSpace.execute("ALTER KEYSPACE %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : %d };" % (keyspace,replication)) - - - -def makeConnection(): - try: - global bCluster, bSpace, keyspace_name, replication_factor, connection_established - bCluster, bSpace = cassandraHelper.makeConnection() - 
keyspace_name,replication_factor = cassandraHelper.findKeyAndRep() - bSpace.set_keyspace(keyspace_name) - if bSpace is not None: - connection_established = True #if some error occurs this would be skipped - return True - except InvalidRequest as er: - try: - bSpace.execute("create keyspace %s with replication={'class':'SimpleStrategy','replication_factor':%s}" % (keyspace_name, replication_factor)) - bSpace.set_keyspace(keyspace_name) - return True - except Exception as er: - print 'bemossspace couldnt be created/switched to' - print er - raise er - - except Exception as er: - print 'Cannot establish connection' - raise er - -try: - makeConnection() -except Exception as er: - print 'Connection cannot be established' - print er - - -def createTable(agentID, variables): - """ - Function to create table for a AgentID - - :param agentID: string. Table with b will be created - :param variables: dictionary (usually APIobject.log_variables). It contains variables to be logged and their datatypes - :return: 0, if successful - - """ - global connection_established - if not connection_established: - makeConnection() - - - varStr = "" - tableName = "B"+agentID - for vars,types in variables.items(): - varStr += vars+" "+types+", " - - - varStr = varStr[:-2] - create_query = "create table {0} (agent_id text, date_id text , time TIMESTAMP, {1}, PRIMARY KEY ((agent_id, date_id), time))".format(tableName,varStr) - try: - bSpace.execute(create_query) - except AlreadyExists as e: - print 'table %s already present' % agentID - raise - except Exception: - connection_established = False #Try to establish connection again next time - raise - - return 0 - - -def insert(agentID, all_vars, log_vars, cur_timeLocal=None): - """ - :param agentID: string. Data will be inserted to table named B - :param all_vars: dictionary (usually APIobject.variables). It contains all the variables and their values. - :param log_vars: dictionary (usually APIobject.log_variables). 
It contains variables to be logged and their datatypes - :return: 0, if successful - - timestamp is generated based on current utc time. UTC time is put in cassandra database. If the table by the agent - name doesn't exist, table is created. **If error occurs because the name of variables/data_type has changed, the old - table will be deleted, and a new one with currect variable names/datatype will be created**. - - **Need to avoid doing this in final version.Feature made to help during development - """ - global connection_established - - if not connection_established: - makeConnection() - - - x = datetime.datetime.utcnow()-datetime.datetime.now() - sec_offset = int(round(x.seconds+x.microseconds/1000000.0)) - timeDel = datetime.timedelta(seconds=sec_offset) - - if cur_timeLocal == None: - cur_timeLocal = datetime.datetime.now() - - cur_timeUTC = cur_timeLocal+timeDel - date_local = str(cur_timeLocal.date()) - tableName = "B"+agentID - varStr = "agent_id, date_id, time" - placeHolderStr = "%s, %s, %s" - - values = [agentID, date_local, cur_timeUTC] - noVarStr ="" - for var in log_vars: - reading = all_vars.get(var) - if not reading == None: - varStr += ", "+var - placeHolderStr += ", %s" - values.append(reading) - else: - noVarStr+=", "+var - - - insert_query = "insert into {0} ({1}) VALUES ({2})".format(tableName,varStr,placeHolderStr) - retry = True - while retry: - try: - bSpace.execute(insert_query,values) - retry = False - except InvalidRequest as e: - if e.message.find('unconfigured columnfamily')!=-1: - print 'Table not exits. Creating one:' - createTable(agentID,log_vars) - print "Table Created. 
Now Trying to insert Again:" - bSpace.execute(insert_query,values) - retry = False - elif e.message.lower().find('unknown identifier')!=-1: - k = str(e.message) - newColumn = re.search('[Uu]nknown identifier ([a-zA-Z_]*)',k).groups()[0] - alter_query = "ALTER TABLE {0} ADD {1} {2}".format(tableName,newColumn,log_vars[newColumn]) - bSpace.execute(alter_query) - retry = True - else: - #TO DO: Don't do this. if the table already exists and can't insert data, simply raise exception - #drop_query = "drop table {0}".format(tableName) - #bSpace.execute(drop_query) - #print "Table Dropped. Now Trying to create again" - #createTable(agentID,log_vars) - #print "Created. Now inserting" - #bSpace.execute(insert_query,values) - retry = False - print e - raise - except: - connection_established = False #Try to connect again next-time - raise - - return 0 - - -def delete(agentID,startDate=None, endDate=None): - """ - Performs deletion of data. if statDate and endDate is omitted, whole table is delted. - :param agentID: The B table in which the operation is to be performed - :param startDate: dateTime.date object (local timezone). The begining date from which to delete. Must be supplied - unless trying to delete the whole table - :param endDate: datetime.date object (local timezone). The endDate upto which to delete. The default is to upto today. - must be supplied unless trying to delte the whole table - :return: 0, if successfull - - Delete can be performed with a resolution of a day so, data cannot be partially deleted for a day. If new data needs - to be written to existing place, simply inserting it again with same primary key will override it. 
- - """ - global connection_established - if not connection_established: - makeConnection() - - tableName = "B"+agentID - - if endDate==None and startDate==None: - try: - delete_query='drop table {0}'.format(tableName) - bSpace.execute(delete_query) - except InvalidRequest as e: - if e.message.find('unconfigured columnfamily')!=-1: - print "Already delted. Don't worry about it" - return 0 - else: - raise - except Exception: - connection_established = False #Try to establish connection again next time - raise - - elif startDate==None: - print "startTime compulsory when endTime is given" - return -1 - elif endDate==None: - endDate=datetime.datetime.now() - - daterange = pandas.date_range(startDate,endDate) - date_local="" - try: - for day in daterange: - date_local = str(day.date()) - delete_query = 'DELETE from {0} WHERE agent_id=%s AND date_id=%s'.format(tableName) - bSpace.execute(delete_query,(agentID,date_local)) - - except Exception as e: - print "sorry, not deleted all. deleted upto:%s" % date_local - connection_established = False - return -1 - - return 0 - - - - - -def retrieve(agentID, vars=None, startTime=None, endTime=None,export=False): - """Function to retrieve Data from the active cassandra cluster. \n - :param agentID: must supply, since each agentID is associated with a table in database. - :param vars: supplied as a list of strings. It represents the variables to be retrieved from the table. - eg. ['time','temperature','heat_setpoint']. If any of the variables don't match the column name, the result - will contain -1. If not supplied, the default is to return the complete row \n\n - :param startTime: the time in localtime zone (the timezone of the node), in datetime.datetime format. It marks the - beginning for the range. If not supplied, will be taken 24-hours before endTime - :param endTime: the time in localtime zone (the timezone of the node), in datetime.datetime format.It marks the end of the - range. 
If not supplied, will be taken as the currentTime. - :return: A numpy 2-dimensional array. Columns corresponds to variables querried, and rows corresponds to - various table entries.The time is reported in local time (cassandra returns UTC time, conversion is done in - this function). If the query fails, -1 is returned (and no exception raised) - - """ - - global connection_established - if not connection_established: - makeConnection() - - x = datetime.datetime.utcnow()-datetime.datetime.now() - sec_offset = int(round(x.seconds+x.microseconds/1000000.0)) - timeDel = datetime.timedelta(seconds=sec_offset) - - if startTime==None: - startTime = datetime.datetime.now()-datetime.timedelta(hours=24) - startTimeUTC=datetime.datetime.utcnow()-datetime.timedelta(hours=24) - else: - startTimeUTC = startTime + timeDel #convert to UTC - - if endTime==None: - endTime = datetime.datetime.now() - endTimeUTC = datetime.datetime.utcnow() - else: - endTimeUTC = endTime+timeDel #convert to UTC - - tableName = str("B"+agentID).lower() - - if vars==None: - varStr='' - vars=[] - try: - result=bSpace.execute("select column_name from system.schema_columns WHERE keyspace_name=%s and columnfamily_name=%s",[keyspace_name,tableName]) - except: - connection_established = False #Try to establish connection again next time - raise - - for var in result: - varStr += var[0] + ', ' - vars += var - varStr = varStr[:-2] #to get rid of the last ', ' - else: - varStr = '' - for var in vars: - varStr += var + ', ' - - varStr = varStr[:-2] #to get rid of the last ', ' - - daterange = pandas.date_range(startTime,endTime) - total_result = [] - try: - for day in daterange: - - date_local = str(day.date()) - result = bSpace.execute('select {0} from {1} WHERE agent_id=%s AND date_id=%s AND time >= %s AND time <= %s'.format(varStr, tableName),[agentID,date_local,startTimeUTC,endTimeUTC]) - total_result += result - - total_result = numpy.array(total_result) - - # If there is no data, return an empty list [] - 
if len(total_result) == 0: - return vars, total_result - - if vars is not None: - if export: - #convert the UTC time to local time if time is present - #more robust method would be look at data types of the result and convert them if it has datetime data type - if 'time' in vars: - total_result[:, vars.index('time')]-=timeDel - time_map = total_result[:,vars.index('time')] - total_result[:, vars.index('time')] = map(lambda x: "{}".format(x.strftime('%y-%m-%d %H:%M:%S')), time_map) - if 'cooling_mode' in vars: - total_result[:, vars.index('cooling_mode')] = map(lambda x: x.encode('utf8'), total_result[:, vars.index('cooling_mode')]) - return vars, total_result - else: - if 'time' in vars: - time_map = total_result[:,vars.index('time')] - # unix timestamp (from seconds) to javascript epoch (timestamp in milliseconds) - total_result[:, vars.index('time')] = map(lambda x: int((x-datetime.datetime(1970,1,1)).total_seconds()*1000), time_map) # '"{}"'.format(str([])) - if 'status' in vars: - total_result[:, vars.index('status')] = map(lambda x: 1 if str(x).lower() == 'on' else 0 if str(x).lower() == 'off' else x, total_result[:, vars.index('status')]) - if 'motion' in vars: - total_result[:, vars.index('motion')] = map(lambda x: 1 if x == True else 0 if x == False else x, total_result[:, vars.index('motion')]) - if 'cooling_mode' in vars: - total_result[:, vars.index('cooling_mode')] = map(lambda x: x.encode('utf8') if x is not None else None, total_result[:, vars.index('cooling_mode')]) - return vars, total_result - - except InvalidRequest as e: - if e.message.find('unconfigured columnfamily')!=-1: - total_result = -1 - print ('table not exist') - else: - total_result = -1 - print e - except: - connection_established = False #Try to establish connection again next time - raise - - -def retrieve_for_export(agentID, vars=None, startTime=None, endTime=None): - a,b = retrieve(agentID,vars,startTime,endTime,export=True) - return a,b - -if __name__ == '__main__': - x = 
get_replication('system_auth') - set_replication('system_auth',2) - print x diff --git a/bemoss_lib/databases/cassandraAPI/cassandraHelper.py b/bemoss_lib/databases/cassandraAPI/cassandraHelper.py deleted file mode 100755 index abdc508ef5e47d802ecc109b6121244efd538da7..0000000000000000000000000000000000000000 --- a/bemoss_lib/databases/cassandraAPI/cassandraHelper.py +++ /dev/null @@ -1,172 +0,0 @@ -# -*- coding: utf-8 -*- -''' -Copyright (c) 2016, Virginia Tech -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the - following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following -disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and should not be -interpreted as representing official policies, either expressed or implied, of the FreeBSD Project. 
- -This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the -United States Government nor the United States Department of Energy, nor Virginia Tech, nor any of their employees, -nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, -express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or -any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe -privately owned rights. - -Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or -otherwise does not necessarily constitute or imply its endorsement, recommendation, favoring by the United States -Government or any agency thereof, or Virginia Tech - Advanced Research Institute. The views and opinions of authors -expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof. 
- -VIRGINIA TECH – ADVANCED RESEARCH INSTITUTE -under Contract DE-EE0006352 - -#__author__ = "BEMOSS Team" -#__credits__ = "" -#__version__ = "2.0" -#__maintainer__ = "BEMOSS Team" -#__email__ = "aribemoss@gmail.com" -#__website__ = "www.bemoss.org" -#__created__ = "2014-09-12 12:04:50" -#__lastUpdated__ = "2016-03-14 11:23:33" -''' -import os -import re -from cassandra.cluster import Cluster -from cassandra.io.asyncorereactor import AsyncoreConnection -from cassandra.auth import PlainTextAuthProvider -from bemoss_lib.utils.find_own_ip import getIPs - -def addSeed(newseed): - ''' - Add a seed IP to the cassandra settings file, and update cassandra yaml file accordingly - Will be called by udpclient in a node after a core is discovered - :param newseed: Ip address string to add to seed list - :return: - ''' - try: - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'r+') - except IOError as er: - print 'cassandra settings file missing. Settings file must be present at this point' - settingsContent = casSettingsFile.read() - seeds = re.search('seeds:(.*)\n',settingsContent).group(1) - if newseed not in seeds: - oldseeds = seeds.replace('"','').strip() - seeds = '"%s, %s"'% (newseed,oldseeds) - settingsContent = re.sub('seeds:(.*)\n','seeds: %s\n'%seeds,settingsContent) - casSettingsFile.seek(0) - casSettingsFile.write(settingsContent) - casSettingsFile.truncate() - casYamlFile = open(os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra.yaml",'r+') - yamlContent = casYamlFile.read() - yamlContent = re.sub('seeds:(.*)\n','seeds: %s\n' % seeds,yamlContent) - casYamlFile.seek(0) - casYamlFile.write(yamlContent) - casYamlFile.truncate() - casYamlFile.close() - - casSettingsFile.close() - -def findKeyAndRep(): - try: - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'r') - settingsContent = casSettingsFile.read() - casSettingsFile.close() - keyspace = re.search('keyspace_name: 
*(.*)\n',settingsContent).group(1) - replication_factor = re.search('replication_factor: *(.*)\n',settingsContent).group(1) - except IOError as er: - print "No cassandra_settings file or bad settings file. Using default bemossspace and factor 1" - keyspace = 'bemossspace' - replication_factor = '1' - - return keyspace, replication_factor - - - -def findIP(): - """ - Reads the listen address from cassandra settings file - :return: - """ - try: - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'r') - settingsContent = casSettingsFile.read() - casSettingsFile.close() - ip_address = re.search('rpc_address: *([0-9\.]*)\n',settingsContent).group(1) - except IOError as er: - print "No cassandra_settings file. Using current IP" - ip_address = getIPs()[-1] - - return ip_address - -def findUserPass(): - try: - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'r') - settingsContent = casSettingsFile.read() - casSettingsFile.close() - username = re.search('db_username: *(.*)\n',settingsContent).group(1) - password = re.search('db_password: *(.*)\n',settingsContent).group(1) - except IOError as er: - print "No cassandra_settings file. Using the default cassandra cassandra userpass" - username='cassandra' - password='cassandra' - - userpass = [username,password] - return userpass - - -def makeConnection(): - ip_address = findIP() - notResolved = True - while notResolved: - notResolved=False - try: - userpass = findUserPass() - ap = PlainTextAuthProvider(username=userpass[0], password=userpass[1]) - bCluster=Cluster([ip_address],connection_class=AsyncoreConnection,auth_provider=ap) - bSpace = bCluster.connect() - except Exception as er: - redFlag = ['AuthenticationFailed','username','password','incorrect'] - test = filter(lambda x: x.lower() in str(er).lower(), redFlag) - if len(test)==len(redFlag): #all redFlags words exists on message - print 'provided username doesnt work. 
trying default:' - ap = PlainTextAuthProvider(username='cassandra', password='cassandra') - try: - bCluster=Cluster([ip_address],connection_class=AsyncoreConnection,auth_provider=ap) - bSpace=bCluster.connect() - bSpace.execute("ALTER USER cassandra with password 'merogharanuwakotmaparchhatimrokahaparchha'") - except Exception as er: - print er - ap = PlainTextAuthProvider(username='cassandra', password='merogharanuwakotmaparchhatimrokahaparchha') - bCluster=Cluster([ip_address],connection_class=AsyncoreConnection,auth_provider=ap) - bSpace=bCluster.connect() - - bSpace.execute("CREATE USER %s with password '%s' SUPERUSER" % (userpass[0],userpass[1])) - print ('The username and password created. Now trying login again') - bCluster.shutdown() - notResolved=True - else: - raise - - return bCluster, bSpace - - - - -if __name__ == '__main__': - findIP() diff --git a/bemoss_lib/databases/cassandraAPI/casstart.sh b/bemoss_lib/databases/cassandraAPI/casstart.sh deleted file mode 100755 index aaeffd40cad7ed4036ebbb897ad70d2c23fa0ec8..0000000000000000000000000000000000000000 --- a/bemoss_lib/databases/cassandraAPI/casstart.sh +++ /dev/null @@ -1,2 +0,0 @@ -ls -~/workspace/cassandra/bin/cassandra -f 2>&1 | tee ~/workspace/bemoss_os/log/cassandra.log diff --git a/bemoss_lib/databases/cassandraAPI/initialize.py b/bemoss_lib/databases/cassandraAPI/initialize.py deleted file mode 100755 index 856edf9413c56d3a7a4a1251912714c7d5bc5b3a..0000000000000000000000000000000000000000 --- a/bemoss_lib/databases/cassandraAPI/initialize.py +++ /dev/null @@ -1,185 +0,0 @@ -# -*- coding: utf-8 -*- -''' -Copyright (c) 2016, Virginia Tech -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the - following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -disclaimer. -2. 
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following -disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and should not be -interpreted as representing official policies, either expressed or implied, of the FreeBSD Project. - -This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the -United States Government nor the United States Department of Energy, nor Virginia Tech, nor any of their employees, -nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, -express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or -any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe -privately owned rights. 
- -Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or -otherwise does not necessarily constitute or imply its endorsement, recommendation, favoring by the United States -Government or any agency thereof, or Virginia Tech - Advanced Research Institute. The views and opinions of authors -expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof. - -VIRGINIA TECH – ADVANCED RESEARCH INSTITUTE -under Contract DE-EE0006352 - -#__author__ = "BEMOSS Team" -#__credits__ = "" -#__version__ = "2.0" -#__maintainer__ = "BEMOSS Team" -#__email__ = "aribemoss@gmail.com" -#__website__ = "www.bemoss.org" -#__created__ = "2014-09-12 12:04:50" -#__lastUpdated__ = "2016-03-14 11:23:33" -''' -import os -import re -import sys -sys.path.insert(0,os.path.expanduser("~")+"/workspace/bemoss_os/") -from bemoss_lib.utils.find_own_ip import getIPs - - -def init(): - try: - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'r') - except IOError as er: - print "creating a new cassandra_settings file" - cluster_name = raw_input("Enter a unique name for cassandra cluster-name: ") - if cluster_name=='': - cluster_name = 'bemosscluster' - print 'Using default clustername: '+cluster_name - db_username = raw_input("Enter database username: ") - if db_username=='': - db_username='bemoss' - print 'Using default username: '+db_username - db_password = raw_input("Enter database password: ") - if db_password=='': - db_password='bemoss' - print 'Using default password: '+db_password - else: - password2 = raw_input("Enter password again: ") - while password2 != db_password: - password2 = raw_input("Password doesn't match. 
Input again: ") - #TODO have some exit condition - ip_address = getIPs()[-1] - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'w') - contents="""cluster_name: '%s' -keyspace_name: bemossspace -replication_factor: 2 -listen_address: %s -rpc_address: %s -seeds: "%s" -authenticator: PasswordAuthenticator -db_username: %s -db_password: %s -MAX_HEAP_SIZE="400M" -HEAP_NEWSIZE="100M" - """ % (cluster_name,ip_address,ip_address,ip_address,db_username,db_password) - casSettingsFile.write(contents) - casSettingsFile.close() - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'r') - - settingsContent = casSettingsFile.read() - casSettingsFile.close() - try: - casYamlFile = open(os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra.yaml",'r') - yamlContent = casYamlFile.read() - casYamlFile.close() - except IOError as er: - print "Not found:" + os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra.yaml" - raise - - try: - casEnvFile = open(os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra-env.sh",'r') - envContent = casEnvFile.read() - casEnvFile.close() - except IOError as er: - print "Not found:" + os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra.yaml" - raise - - cluster_name = re.search('cluster_name:(.*)\n',settingsContent).group(1) - listen_address = re.search('listen_address:(.*)\n',settingsContent).group(1) - rpc_address = re.search('rpc_address: *([0-9\.]*)\n',settingsContent).group(1) - db_username = re.search('db_username: *(.*)\n',settingsContent).group(1) - db_password = re.search('db_password: *(.*)\n',settingsContent).group(1) - keyspace = re.search('keyspace_name: *(.*)\n',settingsContent).group(1) - replication_factor = re.search('replication_factor: *(.*)\n',settingsContent).group(1) - myips = getIPs() - - seeds = re.search('seeds:(.*)\n',settingsContent).group(1) - authenticator = 
re.search('authenticator:(.*)\n',settingsContent).group(1) - MAX_HEAP_SIZE = re.search('MAX_HEAP_SIZE=("[a-zA-Z0-9]*")\n',settingsContent).group(1) - HEAP_NEWSIZE = re.search('HEAP_NEWSIZE=("[a-zA-Z0-9]*")\n',settingsContent).group(1) - - #check if any of the seeds IP is current machine IP. At least one seed needs to be self IP - bad_seed = True - for ip in myips: - if ip in seeds: - bad_seed = False - - if bad_seed: - oldseeds = seeds.replace('"','').strip() - seeds = '"%s, %s"'% (myips[-1],oldseeds) - - if listen_address.strip() not in myips or rpc_address.strip() not in myips: - - listen_address = myips[-1] - rpc_address = myips[-1] - - - - casSettingsFile = open(os.path.expanduser("~")+"/workspace/bemoss_os/cassandra_settings.txt",'w') - contents="""cluster_name: %s -keyspace_name: %s -replication_factor: %s -listen_address: %s -rpc_address: %s -seeds: %s -authenticator: %s -db_username: %s -db_password: %s -MAX_HEAP_SIZE=%s -HEAP_NEWSIZE=%s -""" % (cluster_name.strip(),keyspace.strip(),replication_factor.strip(),myips[-1],myips[-1],seeds.strip(),authenticator.strip(),db_username.strip(),db_password.strip(),MAX_HEAP_SIZE.strip(),HEAP_NEWSIZE.strip()) - casSettingsFile.write(contents) - casSettingsFile.close() - - yamlContent = re.sub('cluster_name:(.*)\n','cluster_name: %s\n' % cluster_name,yamlContent) - yamlContent = re.sub('listen_address:(.*)\n','listen_address: %s\n' % listen_address,yamlContent) - yamlContent = re.sub('rpc_address:(.*)\n','rpc_address: %s\n' % rpc_address,yamlContent) - yamlContent = re.sub('authenticator:(.*)\n','authenticator: %s\n' % authenticator,yamlContent) - yamlContent = re.sub('seeds:(.*)\n','seeds: %s\n' % seeds,yamlContent) - envContent = re.sub('#?MAX_HEAP_SIZE=("[a-zA-Z0-9]*")\n','MAX_HEAP_SIZE=%s\n' % MAX_HEAP_SIZE,envContent) - envContent = re.sub('#?HEAP_NEWSIZE=("[a-zA-Z0-9]*")\n','HEAP_NEWSIZE=%s\n'%HEAP_NEWSIZE,envContent) - - try: - casYamlFile = 
open(os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra.yaml",'w') - casYamlFile.write(yamlContent) - casYamlFile.close() - except IOError as er: - print "Error writing:" + os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra.yaml" - raise - - try: - casEnvFile = open(os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra-env.sh",'w') - casEnvFile.write(envContent) - casEnvFile.close() - except IOError as er: - print "Error writing:" + os.path.expanduser("~")+"/workspace/cassandra/conf/cassandra.yaml" - raise - -if __name__ == '__main__': - init() diff --git a/bemoss_lib/databases/cassandraAPI/startCassandra.py b/bemoss_lib/databases/cassandraAPI/startCassandra.py deleted file mode 100755 index 69b89d9cc306f7d79731c20fd9b92688b3c9cc6d..0000000000000000000000000000000000000000 --- a/bemoss_lib/databases/cassandraAPI/startCassandra.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -''' -Copyright (c) 2016, Virginia Tech -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the - following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following -disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and should not be -interpreted as representing official policies, either expressed or implied, of the FreeBSD Project. - -This material was prepared as an account of work sponsored by an agency of the United States Government. Neither the -United States Government nor the United States Department of Energy, nor Virginia Tech, nor any of their employees, -nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, -express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness or -any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe -privately owned rights. - -Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or -otherwise does not necessarily constitute or imply its endorsement, recommendation, favoring by the United States -Government or any agency thereof, or Virginia Tech - Advanced Research Institute. The views and opinions of authors -expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof. 
- -VIRGINIA TECH – ADVANCED RESEARCH INSTITUTE -under Contract DE-EE0006352 - -#__author__ = "BEMOSS Team" -#__credits__ = "" -#__version__ = "2.0" -#__maintainer__ = "BEMOSS Team" -#__email__ = "aribemoss@gmail.com" -#__website__ = "www.bemoss.org" -#__created__ = "2014-09-12 12:04:50" -#__lastUpdated__ = "2016-03-14 11:23:33" -''' -import os -import time -import cassandraHelper -import initialize - -def start(): - started = False - initialize.init() - x=1 - while x>0: - try: - bCluster, bSpace = cassandraHelper.makeConnection() - x=0 - except Exception as er: - print er - if started == False: - print "Starting cassandra",x - os.system("nohup x-terminal-emulator -e ~/workspace/bemoss_os/bemoss_lib/databases/cassandraAPI/casstart.sh &") - time.sleep(15) - started = True - print 'Waiting for cassandra ...' - - x=1 - time.sleep(5) - print "Cassandra connected" - -if __name__ == '__main__': - start() diff --git a/bemoss_lib/databases/create_influx_table_in_bemossdb.sql b/bemoss_lib/databases/create_influx_table_in_bemossdb.sql new file mode 100644 index 0000000000000000000000000000000000000000..7b9300b7f29dc667f2edb900140297f5a2730a9f --- /dev/null +++ b/bemoss_lib/databases/create_influx_table_in_bemossdb.sql @@ -0,0 +1,12 @@ +CREATE TABLE public.influx_info +( + building_name character varying(100) NOT NULL, + last_retrieved_time_from_remote timestamp with time zone, + last_update_remote_time timestamp with time zone, + CONSTRAINT influx_info_pkey PRIMARY KEY (building_name) +) +WITH ( + OIDS=FALSE +); +ALTER TABLE public.influx_info + OWNER TO admin; \ No newline at end of file diff --git a/bemoss_lib/databases/influxAPI/InfluxDB.py b/bemoss_lib/databases/influxAPI/InfluxDB.py new file mode 100755 index 0000000000000000000000000000000000000000..9a6b99c7f1071e3f6125f60d494bd2834e9ba606 --- /dev/null +++ b/bemoss_lib/databases/influxAPI/InfluxDB.py @@ -0,0 +1,416 @@ +''' +Copyright (c) 2017, BlocPower +All rights reserved. 
+ + +#__author__ = "Mike" +#__credits__ = "" +#__version__ = "2.0" +#__maintainer__ = "Mike" +#__email__ = "michael@blocpower.io" +#__website__ = "www.blocpower.io" +#__created__ = "2017-03-21" +#__lastUpdated__ = "2017-03-24" +''' + +import numpy +import datetime +import settings +from influxdb import InfluxDBClient +connection_established = False + +#Global variables +host = 'localhost' +building_name = settings.PLATFORM['node']['building_name'] +remote = settings.INFLUX['remote_address'] +port = settings.INFLUX['port'] +user = settings.INFLUX['db_username'] +password = settings.INFLUX['db_password'] +influx_types={ + "float":float, + "integer":int, + "string":str, + "boolean":bool +} + +#TODO make this function test the connection in a more sensible way +def makeConnection(database = building_name): + try: + global host, port, user, password, building_name, connection_established + client = InfluxDBClient(host, port, user, password, building_name) + database_list = client.get_list_database() + for db in database_list: + if building_name in db['name']: + connection_established = True + return True + + + except Exception as er: + print 'Cannot establish connection, trying to create database' + client.create_database(building_name) + raise er + +try: + makeConnection() +except Exception as er: + print 'Connection cannot be established' + print er + + +def insert(device_model, agentID, all_vars, log_vars, cur_time=None): + """ + :param agentID: string. Data will be inserted to table named B + :param all_vars: dictionary (usually APIobject.variables). It contains all the variables and their values. + :param log_vars: dictionary (usually APIobject.log_variables). It contains variables to be logged and their datatypes + :return: 0, if successful + + timestamp is generated based on current utc time. UTC time is put in influxdb. 
+ """ + global connection_established + + if not connection_established: + makeConnection() + + client = InfluxDBClient(host, port, user, password, building_name) + if cur_time == None: + cur_time = datetime.datetime.utcnow() + + fields = {} + for var in log_vars: + reading = all_vars.get(var) + fields.update({var:reading}) + + influx_json = [ + { + "measurement": device_model, + "tags": { + "agent": agentID + }, + "time": cur_time, + "fields": fields + } + ] + + retry = True + while retry: + try: + client.write_points(influx_json) + retry = False + except Exception as er: + retry = False + print er + raise + + return 0 + + +def delete(device_model, agentID, startTime=None, endTime=None): + """ + Performs deletion of data. if statDate and endDate is omitted, all device data is deleted. + :param agentID: The on which the operation is to be performed + :param startDate: dateTime.date object (local timezone). The begining date from which to delete. Must be supplied + unless trying to delete the whole table + :param endDate: datetime.date object (local timezone). The endDate upto which to delete. The default is to upto today. 
+ must be supplied unless trying to delete the whole table + :return: 0, if successful + """ + global connection_established + if not connection_established: + makeConnection() + + client = InfluxDBClient(host, port, user, password, building_name) + + if endTime==None and startTime==None: + try: + delete_query = "DROP SERIES WHERE agent = '{0}'".format(agentID) + client.query(delete_query) + return 0 + except Exception: + connection_established = False #Try to establish connection again next time + raise + + elif startTime==None: + print "startTime compulsory when endTime is given" + return -1 + elif endTime==None: + endTime = str(datetime.datetime.utcnow()) + + try: + result = client.query("SHOW MEASUREMENTS ON {0} WHERE agent = '{1}';".format(building_name, agentID)) + except: + connection_established = False #Try to establish connection again next time + raise + + result = result.get_points('measurements') + + try: + for var in result: + measurement = str(var['name']) + client.query("DELETE FROM {0} WHERE agent = '{1}' AND time >= '{2}' AND time <= '{3}';".format(measurement, agentID, startTime, endTime)) + + except: + connection_established = False + raise + + + return 0 + + + + + +def retrieve(device_model, agentID, vars=None, startTime=None, endTime=None,export=False): + """Function to retrieve Data from Influx. \n + :param agentID: must supply, since each agentID is associated with a table in database. + :param vars: supplied as a list of strings. It represents the variables to be retrieved from the table. + eg. ['temperature','humidity']. If any of the variables don't match the column name, the result + will contain -1. If not supplied, the default is to return the complete row \n\n + :param startTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format. It marks the + beginning for the range. 
If not supplied, will be taken 24-hours before endTime + :param endTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format.It marks the end of the + range. If not supplied, will be taken as the currentTime. + :return: A numpy 2-dimensional array. The first column contains the time stamps for the entries, and the rest of the + columns correspond to variables queried. Each row corresponds to + various table entries. Also returns vars, which serves as a column index. It is identical to passed vars + except 'time' is an extra entry at the front. If the query fails, -1 is returned (and no exception raised) + """ + + global connection_established + if not connection_established: + makeConnection() + + client = InfluxDBClient(host, port, user, password, building_name) + + if startTime==None: + startTime=str(datetime.datetime.utcnow()-datetime.timedelta(hours=1)) + + if endTime==None: + endTime = str(datetime.datetime.utcnow()) + + if vars==None: + vars=[] + try: + vars_query = client.query("show field keys on {0} from {1};".format(building_name, device_model)) + + for data in vars_query: + vars = [varlist['fieldKey'] for varlist in data] + except: + connection_established = False #Try to establish connection again next time + raise + + varStr = '' + for var in vars: + varStr += var + ', ' + + varStr = varStr[:-2] #to get rid of the last ', ' + + result = client.query("select {0} from {1} WHERE agent = '{2}' AND time >= '{3}' AND time <= '{4}';".format(varStr, device_model, agentID, startTime,endTime), chunked=True, chunk_size=100) + values = [] + data = result.get_points() + + for entry in data: + values.append(entry.values()) + vars = entry.keys() + total_result = numpy.array(values) + + return vars, total_result + +def local_to_remote(buiding_name=building_name, local='localhost', remote=remote, startTime=None, endTime=None): + """Function to retrieve Data from local Influx and push it to remote server. 
+ :param local: Database you want to copy data from + :param remote: Database you want data copied to + :param startTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format. It marks the + beginning for the range. If not supplied, will be taken 24-hours before endTime + :param endTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format.It marks the end of the + range. If not supplied, will be taken as the currentTime. + :return: 0, if successful + """ + global connection_established + if not connection_established: + makeConnection() + + localclient = InfluxDBClient(local, port, user, password, building_name) + remoteclient = InfluxDBClient(remote, port, user, password, building_name, ssl=True) + try: + copy_database(localclient, remoteclient, startTime, endTime) + except Exception as er: + print er + raise + + return 0 + +def remote_to_local(building_name=building_name, remote=remote, local='localhost', startTime=None, endTime=None): + """Function to retrieve Data from Influx server and push it to local bemoss. + :param remote: Address of the remote server + :param local: Address of local bemoss, usually just 'localhost' + :param startTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format. It marks the + beginning for the range. If not supplied, will be taken 25-hours before endTime + :param endTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format.It marks the end of the + range. If not supplied, will be taken as the currentTime. 
+ :return: 0, if successful + """ + global connection_established + if not connection_established: + makeConnection() + + localclient = InfluxDBClient(local, port, user, password, building_name) + remoteclient = InfluxDBClient(remote, port, user, password, building_name, ssl=True) + try: + copy_database(remoteclient, localclient, startTime, endTime) + except Exception as er: + print er + raise + + return 0 + +def getSenseware(site = building_name, measurement = 'Temperature', local='localhost', remote=remote, sensewaredb = 'Senseware', startTime=None, endTime=None): + """Function to retrieve Senseware data from server. + :param site: Building for BEMOSS instance corresponding to senseawre site tag + :param measurement: The variable you want the data for (temperature, humidity, etc.) + :param local: Database you want to copy data from + :param remote: Database you want data copied to + :param startTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format. It marks the + beginning for the range. If not supplied, will be taken 24-hours before endTime + :param endTime: the time in localtime zone (the timezone of the node), in datetime.datetime.utcnow format.It marks the end of the + range. If not supplied, will be taken as the currentTime. 
+ :return: 0, if successful + """ + global connection_established + if not connection_established: + makeConnection() + localclient = InfluxDBClient(host, port, user, password, building_name) + remoteclient = InfluxDBClient(remote, port, user, password, sensewaredb, ssl = True) + + if startTime==None: + startTime=str(datetime.datetime.utcnow()-datetime.timedelta(days = 30)) + + if endTime==None: + endTime = str(datetime.datetime.utcnow()) + + measure_query = remoteclient.query("SELECT * FROM {0} WHERE site='{1}' AND time>='{2}' AND time<='{3}';".format(measurement, site, startTime, endTime), chunked=True, chunk_size=100) + data = measure_query.get_points('{0}'.format(measurement)) + + field_query = remoteclient.query("show field keys from {0}".format(measurement)) + + for field in field_query: + fieldlist = [varlist['fieldKey'] for varlist in field] + typelist = [varlist['fieldType'] for varlist in field] + fields = {fieldKey:fieldType for fieldKey, fieldType in zip(fieldlist, typelist)} + tag_query = remoteclient.query("show tag keys from {0}".format(measurement)) + for tag in tag_query: + tags = [varlist['tagKey'] for varlist in tag] + + influx_json = [] + + for entry in data: + json_field = {} + for field in fields.keys(): + data_type = influx_types[fields[field]] + if isinstance(entry[field], data_type): + reading = entry[field] + json_field.update({field:reading}) + elif data_type==float and isinstance(entry[field], int): + float(entry[field]) + entry[field]+=0.0000000001 + reading = entry[field] + json_field.update({field:reading}) + else: + continue + + + json_tag = {} + for tag in tags: + if entry[tag] is not None: + tag_info = entry[tag] + json_tag.update({tag:tag_info}) + + json_time = entry['time'] + + json_body =[ + { + "measurement": measurement, + "tags": json_tag, + "time": json_time, + "fields": json_field + } + ] + + influx_json.extend(json_body) + + try: + response = localclient.write_points(influx_json, batch_size=1000) + print "Senseware Data 
for {0} retrieved from server".format(measurement) + return 0 + except Exception as er: + print er + print "Senseware Data retrieve for {0} failed".format(measurement) + raise + +def copy_database(source, destination, startTime=None, endTime=None): + if startTime==None: + startTime=str(datetime.datetime.utcnow()-datetime.timedelta(days=30)) + + if endTime==None: + endTime = str(datetime.datetime.utcnow()) + + measurements = source.query("show measurements;") + + for result in measurements: + for measurement in result: + print measurement + measure_query = source.query("SELECT * FROM {0} WHERE time >= '{1}' AND time <= '{2}';".format(measurement['name'], startTime, endTime), chunked=True, chunk_size=100) + data = measure_query.get_points('{0}'.format(measurement['name'])) + + field_query = source.query("show field keys from {0}".format(measurement['name'])) + for field in field_query: + fieldlist = [varlist['fieldKey'] for varlist in field] + typelist = [varlist['fieldType'] for varlist in field] + fields = {fieldKey:fieldType for fieldKey, fieldType in zip(fieldlist, typelist)} + tag_query = source.query("show tag keys from {0}".format(measurement['name'])) + for tag in tag_query: + tags = [varlist['tagKey'] for varlist in tag] + + influx_json = [] + + for entry in data: + json_field = {} + for field in fields.keys(): + data_type = influx_types[fields[field]] + if isinstance(entry[field], data_type): + reading = entry[field] + json_field.update({field:reading}) + elif data_type==float and isinstance(entry[field], int): + float(entry[field]) + cast_float = 0.0000000001 + reading = entry[field]+cast_float + json_field.update({field:reading}) + else: + continue + + json_tag = {} + for tag in tags: + if entry[tag] is not None: + tag_info = entry[tag] + json_tag.update({tag:tag_info}) + + json_time = entry['time'] + + json_body =[ + { + "measurement": measurement['name'], + "tags": json_tag, + "time": json_time, + "fields": json_field + } + ] + + 
influx_json.extend(json_body) + + try: + response = destination.write_points(influx_json, batch_size=1000) + print "Data for {0} pushed to server".format(measurement['name']) + except Exception as er: + print er + print "Data push for {0} failed".format(measurement['name']) + raise + return 0 diff --git a/bemoss_lib/databases/influxAPI/__init__.py b/bemoss_lib/databases/influxAPI/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..b9675ceac31652dab4c45e69345ad1b883ce5dd9 --- /dev/null +++ b/bemoss_lib/databases/influxAPI/__init__.py @@ -0,0 +1 @@ +__author__ = 'mike' diff --git a/bemoss_lib/databases/influxAPI/initialize.py b/bemoss_lib/databases/influxAPI/initialize.py new file mode 100755 index 0000000000000000000000000000000000000000..1f560d71cc218671db529907c1f606770f2ac895 --- /dev/null +++ b/bemoss_lib/databases/influxAPI/initialize.py @@ -0,0 +1,84 @@ +import os +import re +import sys +sys.path.insert(0,os.path.expanduser("~")+"/workspace/bemoss_os/") +from bemoss_lib.utils.find_own_ip import getIPs +from influxdb import InfluxDBClient +from bemoss_lib.databases.influxAPI import InfluxDB +import datetime +import settings +import psycopg2 + + +def init(): + host = 'localhost' + building_name = settings.PLATFORM['node']['building_name'] + #influx + remote_address = settings.INFLUX['remote_address'] + port = settings.INFLUX['port'] + influx_username = settings.INFLUX['db_username'] + influx_password = settings.INFLUX['db_password'] + #postgres + db_database = settings.DATABASES['default']['NAME'] + db_host = settings.DATABASES['default']['HOST'] + db_port = settings.DATABASES['default']['PORT'] + db_user = settings.DATABASES['default']['USER'] + db_password = settings.DATABASES['default']['PASSWORD'] + db_table_influx = settings.DATABASES['default']['TABLE_influx_info'] + + local = InfluxDBClient(host, port, influx_username, influx_password, building_name) + remote = InfluxDBClient(remote_address, port ,influx_username, 
influx_password, building_name, ssl=True) + building_exists = False + database_list = remote.get_list_database() + for db in database_list: + if building_name in db['name']: + building_exists = True + if not building_exists: + print("Building {0} not registered in remote, creating building database".format(building_name)) + try: + remote.create_database(building_name) + except Exception as er: + print er + print("Creation of database for {0} failed".format(building_name)) + raise + existing_data = False + database_list = local.get_list_database() + for db in database_list: + if building_name in db['name']: + existing_data = True + + if existing_data: + lastUpdateTime = None + try: + con = psycopg2.connect(host=db_host, port=db_port, database=db_database, user=db_user, + password=db_password) + cur = con.cursor() # open a cursor to perform database operations + + cur.execute("SELECT * FROM "+db_table_influx+" WHERE building_name=%s",(building_name,)) + influx_info = cur.fetchone() + lastRemoteUpdate = influx_info[2] + except Exception as er: + print er + print("could not connect to database {}".format(db_database)) + + try: + print("Pushing existing data to remote from {0}".format(lastUpdateTime)) + InfluxDB.local_to_remote(building_name, 'localhost', remote_address, startTime=lastUpdateTime) + except Exception as er: + print er + print("Existing data on InfluxDB could not be copied to remote") + raise + + print("Dropping local database") + local.drop_database(building_name) + print("Creating local database") + local.create_database(building_name) + + print("Creating retention policy") + local.create_retention_policy(building_name, "30d", 3, building_name, True) + + print("Local database has been setup") + + +if __name__ == '__main__': + init() \ No newline at end of file diff --git a/bemoss_lib/multi_node/runBEMOSS_node.sh b/bemoss_lib/multi_node/runBEMOSS_node.sh index 18728a6ee265d18b8429a85eb2c33f981c646e6f..1f2db1d52cd1734f7c40228427a74176d03b97f8 100755 --- 
a/bemoss_lib/multi_node/runBEMOSS_node.sh +++ b/bemoss_lib/multi_node/runBEMOSS_node.sh @@ -48,12 +48,11 @@ sudo python ~/workspace/bemoss_os/bemoss_lib/utils/find_own_ip.py #Step1: Run Platform Initiator sudo python ~/workspace/bemoss_os/bemoss_lib/multi_node/platform_initiator_node.py -#Step2: Do initial configuration for cassandra -sudo python ~/workspace/bemoss_os/bemoss_lib/databases/cassandraAPI/initialize.py +#Step2: Do initial configuration for influx +sudo python ~/workspace/bemoss_os/bemoss_lib/databases/influxAPI/initialize.py #Step3: Look for bemoss core sudo python ~/workspace/bemoss_os/bemoss_lib/multi_node/udpclient.py -#step4: Start/connect to cassandra -sudo PYTHONPATH='.' python ~/workspace/bemoss_os/bemoss_lib/databases/cassandraAPI/startCassandra.py +# TODO: Verify influx connection before continuing with BEMOSS #Step4: Build agents cd ~/workspace/bemoss_os/bemoss_lib/multi_node/ source buildAgents_node.sh diff --git a/bemoss_lib/multi_node/udpclient.py b/bemoss_lib/multi_node/udpclient.py index 10eb1a512c147879a97ed9a20ac7e448adc96704..4d94f020b455afad4ed1aaba4e18ac9cffb6a68a 100755 --- a/bemoss_lib/multi_node/udpclient.py +++ b/bemoss_lib/multi_node/udpclient.py @@ -58,7 +58,6 @@ import datetime import netifaces as ni import time import select -from bemoss_lib.databases.cassandraAPI import cassandraHelper #find broadcast address automatically from script interfaces=ni.interfaces() @@ -145,9 +144,6 @@ while True: print "found_bemoss_core {}".format(found_bemoss_core) print str(addr)+":"+str(recv_data) -#add the ip address of the core as a seed node for the cassandra cluster -cassandraHelper.addSeed(addr[0]) - #add new node to node_info table node_info = json.loads(recv_data) core_name = node_info['node_name'] diff --git a/bemoss_lib/multi_node/udpserver.py b/bemoss_lib/multi_node/udpserver.py index dfa5babe93ac16eb46b609475dc0a6f3126cea8f..459965de8200f1507ad90d51554d970c70f6286f 100755 --- a/bemoss_lib/multi_node/udpserver.py +++ 
b/bemoss_lib/multi_node/udpserver.py @@ -55,7 +55,6 @@ import sys os.chdir(os.path.expanduser("~/workspace/bemoss_os/")) # = ~/workspace/bemoss_os current_working_directory = os.getcwd() sys.path.append(current_working_directory) -from bemoss_lib.databases.cassandraAPI import cassandraDB import settings import psycopg2 import datetime @@ -199,7 +198,8 @@ with open(_launch_file, 'w') as outfile: json.dump(data, outfile, indent=4, sort_keys=True) node_count = 1 -current_system_auth_replication = cassandraDB.get_replication('system_auth') +#TODO check how this is affected by cassandra removal +current_system_auth_replication = 1 while True: print("BEMOSS core >> Listening to connection from BEMOSS node: ") recv_data, addr = server_socket.recvfrom(2048) @@ -237,8 +237,6 @@ while True: isNew = insertNodeInfo(remote_node_name,remote_node_type,remote_node_model,remote_node_status,remote_building_name, remote_ip_address,remote_mac_address, remote_associated_zone, remote_date_added,remote_communication,remote_last_scanned_time) node_count += isNew #Add node count only if it is a new node. 
- if node_count > current_system_auth_replication: - cassandraDB.set_replication('system_auth',node_count) else: #something wrong with database, ignore this node continue diff --git a/bemoss_lib/utils/buildAgents.sh b/bemoss_lib/utils/buildAgents.sh index 96b345dc5bedb220c467dff5040b4801e8295b58..2d683299bec233e7c59278c4991be2672f0d8d74 100755 --- a/bemoss_lib/utils/buildAgents.sh +++ b/bemoss_lib/utils/buildAgents.sh @@ -54,6 +54,7 @@ cd ~/workspace/bemoss_os/ volttron-pkg package ~/workspace/bemoss_os/Agents/ThermostatAgent volttron-pkg package ~/workspace/bemoss_os/Agents/AirQualityAgent +volttron-pkg package ~/workspace/bemoss_os/Agents/TRVAgent volttron-pkg package ~/workspace/bemoss_os/Agents/PlugloadAgent volttron-pkg package ~/workspace/bemoss_os/Agents/LightingAgent volttron-pkg package ~/workspace/bemoss_os/Agents/NetworkAgent @@ -65,6 +66,9 @@ volttron-ctl install devicediscoveryagent=/tmp/volttron_wheels/devicediscoveryag volttron-pkg package ~/workspace/bemoss_os/Agents/AppLauncherAgent volttron-pkg configure /tmp/volttron_wheels/applauncheragent-0.1-py2-none-any.whl ~/workspace/bemoss_os/Agents/AppLauncherAgent/applauncheragent.launch.json volttron-ctl install applauncheragent=/tmp/volttron_wheels/applauncheragent-0.1-py2-none-any.whl +volttron-pkg package ~/workspace/bemoss_os/Agents/InfluxAgent +volttron-pkg configure /tmp/volttron_wheels/influxagent-0.1-py2-none-any.whl ~/workspace/bemoss_os/Agents/InfluxAgent/influxagent.launch.json +volttron-ctl install influxagent=/tmp/volttron_wheels/influxagent-0.1-py2-none-any.whl volttron-pkg package ~/workspace/bemoss_os/Applications/code/Lighting_Scheduler volttron-pkg package ~/workspace/bemoss_os/Applications/code/Plugload_Scheduler diff --git a/bemoss_lib/utils/platform_initiator.py b/bemoss_lib/utils/platform_initiator.py index 114cceed44e4b4053cee58d3b955ff03ff9800e4..1ae1a6201999fb5a3c95a39d01532e1f5d425dfa 100755 --- a/bemoss_lib/utils/platform_initiator.py +++ 
b/bemoss_lib/utils/platform_initiator.py @@ -75,6 +75,7 @@ db_table_application_running = settings.DATABASES['default']['TABLE_application_ db_table_application_registered = settings.DATABASES['default']['TABLE_application_registered'] db_table_plugload = settings.DATABASES['default']['TABLE_plugload'] db_table_thermostat = settings.DATABASES['default']['TABLE_thermostat'] +db_table_trv = settings.DATABASES['default']['TABLE_trv'] db_table_lighting = settings.DATABASES['default']['TABLE_lighting'] db_table_airquality = settings.DATABASES['default']['TABLE_airquality'] db_table_device_metadata = settings.DATABASES['default']['TABLE_device_metadata'] @@ -106,6 +107,7 @@ print "{} >> Done 1: connect to database name {}".format(agent_id, db_database) cur.execute("DELETE FROM "+db_table_thermostat) cur.execute("DELETE FROM "+db_table_lighting) cur.execute("DELETE FROM "+db_table_airquality) +cur.execute("DELETE FROM "+db_table_trv) cur.execute("DELETE FROM "+db_table_plugload) cur.execute("DELETE FROM "+db_table_vav) cur.execute("DELETE FROM "+db_table_rtu) @@ -360,6 +362,8 @@ cur.execute("INSERT INTO supported_devices VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, ("Tekmar Bypass Controller","BlocPower","Serial","plugload","USB-RLY02","3WSP","classAPI_Tekmar_Bypass",True,False,4,4,True)) cur.execute("INSERT INTO supported_devices VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", ("Awair+","Awair","WiFi","airquality","Awair","4AIR","classAPI_Awair",False,True,4,4,True)) +cur.execute("INSERT INTO supported_devices VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + ("M9-D15","Magnum","WiFi","trv","Magnum","5MAG","classAPI_EnOceanPi",False,False,4,4,True)) conn.commit() print "Table supported_devices populated successfully!" 
@@ -377,7 +381,6 @@ os.system("sudo chmod 777 -R ~/workspace/bemoss_os") #TODO make a backup of log files os.system("sudo rm ~/workspace/bemoss_os/log/volttron.log") -os.system("sudo rm ~/workspace/bemoss_os/log/cassandra.log") os.system("sudo killall volttron") os.system("sudo kill $(cat ~/workspace/bemoss_os/.temp/BEMOSS.pid)") diff --git a/bemoss_lib/utils/runPlatform.sh b/bemoss_lib/utils/runPlatform.sh index 7714eb65b45c4e7f5cb645007114f9b4ca5506b9..94df30910f1a5d52403ff033a1978c7917cefc1a 100755 --- a/bemoss_lib/utils/runPlatform.sh +++ b/bemoss_lib/utils/runPlatform.sh @@ -58,6 +58,8 @@ volttron-ctl start --tag approvalhelperagent #Run network agent sleep 2 volttron-ctl start --tag networkagent +sleep 2 +volttron-ctl start --tag influxagent #sleep 2 #volttron-ctl start --tag launcheragent volttron-ctl status diff --git a/runBEMOSS.sh b/runBEMOSS.sh index 601c598a066712996bc06220cac3a61489e48d41..e2c3d61f5f5f52f5878e839ee1fbc5ae743a17d1 100755 --- a/runBEMOSS.sh +++ b/runBEMOSS.sh @@ -55,10 +55,11 @@ then echo "Retaining previous state of BEMOSS..." else echo "Performing fresh restart of BEMOSS..." + sudo python ~/workspace/bemoss_os/bemoss_lib/databases/influxAPI/initialize.py sudo python ~/workspace/bemoss_os/bemoss_lib/utils/platform_initiator.py sleep 2 fi -sudo PYTHONPATH='.' 
python ~/workspace/bemoss_os/bemoss_lib/databases/cassandraAPI/startCassandra.py +# TODO: Verify influx connection before continuing with BEMOSS sleep 2 #Step3: Build agents source ~/workspace/bemoss_os/bemoss_lib/utils/buildAgents.sh diff --git a/settings.py b/settings.py index 5319e1f8e7ec6f1c010586487de24cbbe7196e06..5ca337bfa98fa000fe5f49842eea6b198581eaaf 100755 --- a/settings.py +++ b/settings.py @@ -69,7 +69,7 @@ PLATFORM = { 'name': 'BEMOSS core', 'type': 'core', 'model': 'Odroid3', - 'building_name': 'bemoss', + 'building_name': '106_E_30th_St__Typhin_', 'node_monitor_time': 60, 'node_offline_timeout': 0, 'main_core': 'BEMOSS core' @@ -101,6 +101,7 @@ DATABASES = { 'TABLE_plugload': 'plugload', 'TABLE_thermostat': 'thermostat', 'TABLE_airquality': 'airquality', + 'TABLE_trv': 'trv', 'TABLE_lighting': 'lighting', 'TABLE_device_metadata': 'device_metadata', 'TABLE_vav': 'vav', @@ -116,10 +117,18 @@ DATABASES = { 'TABLE_temp_time_counter': 'temp_time_counter', 'TABLE_temp_failure_time': 'temp_failure_time', 'TABLE_priority': 'priority', - 'TABLE_seen_notifications_counter': 'seen_notifications_counter' + 'TABLE_seen_notifications_counter': 'seen_notifications_counter', + 'TABLE_influx_info': 'influx_info' } } +INFLUX = { + 'remote_address': '52.206.6.10', + 'port': 8086, + 'db_username': 'engineering', + 'db_password': 'nPEc9Pz0iV' +} + NOTIFICATION = { 'heartbeat': 24*60, # heartbeat period to resend a message 'heartbeat_device_tampering': 130, # heartbeat period to resend a message