From 993604237888f104e0cddea5aff4b98b3a408cd0 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 10 Apr 2018 13:35:16 -0400 Subject: [PATCH 1/7] Update config files to add new redshift db URI --- app/config/development.default.py | 1 + app/config/local.default.py | 1 + app/config/production.default.py | 1 + app/config/staging.default.py | 1 + 4 files changed, 4 insertions(+) diff --git a/app/config/development.default.py b/app/config/development.default.py index 963406e..cd90659 100644 --- a/app/config/development.default.py +++ b/app/config/development.default.py @@ -4,6 +4,7 @@ import os SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = os.environ['DBURI'] REDSHIFT_DATABASE_URI = os.environ['REDSHIFT_DATABASE_URI'] +NEW_REDSHIFT_DATABASE_URI = os.environ['NEW_REDSHIFT_DATABASE_URI'] REDIS_URI = os.environ['REDISURI'] diff --git a/app/config/local.default.py b/app/config/local.default.py index 0273fd7..d3de22e 100644 --- a/app/config/local.default.py +++ b/app/config/local.default.py @@ -1,6 +1,7 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = 'sqlite://' REDSHIFT_DATABASE_URI = 'redshift://' +NEW_REDSHIFT_DATABASE_URI = 'redshift://' REDIS_URI = 'redis://127.0.0.1:6379/' diff --git a/app/config/production.default.py b/app/config/production.default.py index 4e30ea1..9b05d92 100644 --- a/app/config/production.default.py +++ b/app/config/production.default.py @@ -4,6 +4,7 @@ import os SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = os.environ['DBURI'] REDSHIFT_DATABASE_URI = os.environ['REDSHIFT_DATABASE_URI'] +NEW_REDSHIFT_DATABASE_URI = os.environ['NEW_REDSHIFT_DATABASE_URI'] REDIS_URI = os.environ['REDISURI'] diff --git a/app/config/staging.default.py b/app/config/staging.default.py index 3fa2a76..fe8a9f4 100644 --- a/app/config/staging.default.py +++ b/app/config/staging.default.py @@ -4,6 +4,7 @@ import os SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = os.environ['DBURI'] REDSHIFT_DATABASE_URI = os.environ['REDSHIFT_DATABASE_URI'] +NEW_REDSHIFT_DATABASE_URI = os.environ['NEW_REDSHIFT_DATABASE_URI'] REDIS_URI = os.environ['REDISURI'] -- GitLab From a8d3f01801f5d05f1084ab3a2b20d311dc88ec66 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 10 Apr 2018 15:08:26 -0400 Subject: [PATCH 2/7] Handle the awair data request --- app/controllers/constants.py | 5 ++ app/controllers/data.py | 108 +++++++++++++++++++++---------- app/controllers/gateway.py | 26 ++++++-- app/lib/__init__.py | 1 + app/lib/new_redshift_database.py | 28 ++++++++ 5 files changed, 129 insertions(+), 39 deletions(-) create mode 100644 app/controllers/constants.py create mode 100644 app/lib/new_redshift_database.py diff --git a/app/controllers/constants.py b/app/controllers/constants.py new file mode 100644 index 0000000..dc2a0a1 --- /dev/null +++ b/app/controllers/constants.py @@ -0,0 +1,5 @@ +GATEWAY_TYPES = { + 'senseware': 1, + 'awair': 2, + 'other': 3, +} diff --git a/app/controllers/data.py b/app/controllers/data.py index 92eb723..585f79e 100644 --- a/app/controllers/data.py +++ b/app/controllers/data.py @@ -5,43 +5,83 @@ import psycopg2 from werkzeug.exceptions import BadRequest from app.lib import get_redshift_db +from app.lib import get_new_redshift_db from .base import RestController +from .constants import ( + GATEWAY_TYPES, +) + class DataController(RestController): """A data controller.""" def index(self, filter_data): - if 'from' not in filter_data: - raise BadRequest("'from' is a required field in the query params") - date = filter_data.get('from') - gateway_serials = filter_data.getlist('gateway_serial[]') - node_module_numbers = filter_data.getlist('node_id[]') - - # Set up the WHERE statement with date info - where_statement = 'WHERE ts>%s AND (' - where_tuple = (date,) - - # Add the gateway serials to hte WHERE statement - for gateway_serial in gateway_serials: - where_statement += "sn=%s OR " - where_tuple = where_tuple + (gateway_serial,) - - # Add the node_module_ids to hte WHERE statement - for node_module_number in node_module_numbers: - where_statement += "mod=%s OR " - where_tuple = where_tuple + (node_module_number,) - - where_statement = where_statement[:-4] + ')' - - if 'unit_id' in filter_data: - where_statement += 'AND unit_id=%s' - where_tuple = where_tuple + (filter_data['unit_id'],) - - sql = "SELECT ts, value, unit, unit_id, name, sn, mod, channel_id FROM data {} ORDER BY ts".format(where_statement) - db = get_redshift_db(current_app) - cur = db.cursor(cursor_factory=psycopg2.extras.DictCursor) - cur.execute(sql, where_tuple) - results = [dict(record) for record in cur] - cur.close() - - return results + if 'new' in filter_data: + if 'from' not in filter_data: + raise BadRequest("'from' is a required field in the query params") + if 'type' not in filter_data: + raise BadRequest("'type' is a required field in the query params") + date = filter_data.get('from') + gateway_type = filter_data.get('type') + if gateway_type == GATEWAY_TYPES['awair']: + if 'device_id' not in filter_data: + raise BadRequest("'device_id' is a required field in the query params if the device is awair") + device_id = filter_data.get('device_id') + sql = ''' + SELECT + da.ts, da.value, + awair.device_id, + unit.description as unit + FROM + iot.data as da + INNER JOIN + iot.metadata as meta on meta.id = da.metadata_id + INNER JOIN + iot.metadata_awair as awair on awair.metadata_id = meta.id + INNER JOIN + iot.unit on unit.id = meta.unit_id + WHERE awair.device_id=%s + ''' + db = get_new_redshift_db(current_app) + cur = db.cursor(cursor_factory=psycopg2.extras.DictCursor) + cur.execute(sql, (device_id,)) + results = [dict(record) for record in cur] + cur.close() + return results + return [] + + else: + if 'from' not in filter_data: + raise BadRequest("'from' is a required field in the query params") + date = filter_data.get('from') + gateway_serials = filter_data.getlist('gateway_serial[]') + node_module_numbers = filter_data.getlist('node_id[]') + + # Set up the WHERE statement with date info + where_statement = 'WHERE ts>%s AND (' + where_tuple = (date,) + + # Add the gateway serials to hte WHERE statement + for gateway_serial in gateway_serials: + where_statement += "sn=%s OR " + where_tuple = where_tuple + (gateway_serial,) + + # Add the node_module_ids to hte WHERE statement + for node_module_number in node_module_numbers: + where_statement += "mod=%s OR " + where_tuple = where_tuple + (node_module_number,) + + where_statement = where_statement[:-4] + ')' + + if 'unit_id' in filter_data: + where_statement += 'AND unit_id=%s' + where_tuple = where_tuple + (filter_data['unit_id'],) + + sql = "SELECT ts, value, unit, unit_id, name, sn, mod, channel_id FROM data {} ORDER BY ts".format(where_statement) + db = get_redshift_db(current_app) + cur = db.cursor(cursor_factory=psycopg2.extras.DictCursor) + cur.execute(sql, where_tuple) + results = [dict(record) for record in cur] + cur.close() + + return results diff --git a/app/controllers/gateway.py b/app/controllers/gateway.py index 105cda2..594d42f 100644 --- a/app/controllers/gateway.py +++ b/app/controllers/gateway.py @@ -1,12 +1,14 @@ +from flask import current_app +from werkzeug.datastructures import MultiDict + from .base import RestController from ..forms.gateway import GatewayForm from ..models.gateway import Gateway from ..controllers.data import DataController from ..controllers.senseware_node import SensewareNodeController -import datetime -import json -import psycopg2 -from werkzeug.datastructures import MultiDict +from .constants import ( + GATEWAY_TYPES, +) class GatewayController(RestController): """A sensor controller.""" @@ -25,7 +27,21 @@ class GatewayController(RestController): Retrieve a list of gateways """ result = super().index(filter_data) - # If the user is also requesting data, get data + """ New way of accessing data """ + if 'data' in filter_data and 'from' in filter_data and result: + # Access data for gateways that don't have seperate nodes + for gateway in result: + if gateway['sensor_type'] == GATEWAY_TYPES['awair']: + args = MultiDict([ + ('new', ''), + ('from', filter_data['from']), + ('type', GATEWAY_TYPES['awair']), + ('device_id', gateway['gateway_id']), + ]) + data_result = DataController().index(args) + gateway['data'] = data_result + + # Gateways that have seperate nodes (senseware) are handled differently if 'nodes' in filter_data and result: node_ids = [] for gateway in result: diff --git a/app/lib/__init__.py b/app/lib/__init__.py index 9e859d0..0537405 100644 --- a/app/lib/__init__.py +++ b/app/lib/__init__.py @@ -2,3 +2,4 @@ from .database import db from .red import redis from .auth0 import auth0_ from .redshift_database import get_redshift_db +from .new_redshift_database import get_new_redshift_db diff --git a/app/lib/new_redshift_database.py b/app/lib/new_redshift_database.py new file mode 100644 index 0000000..7340c0e --- /dev/null +++ b/app/lib/new_redshift_database.py @@ -0,0 +1,28 @@ +from flask import Flask, g +from urllib.parse import urlparse +import psycopg2 + +app = Flask(__name__) + +def get_new_redshift_db(app): + db = getattr(g, 'new_redshift_database', None) + if db is None: + db = g._database = connect_to_database(app) + return db + +@app.teardown_appcontext +def teardown_db(exception): + db = getattr(g, 'new_redshift_database', None) + if db is not None: + db.close() + +def connect_to_database(app): + uri = app.config.get('NEW_REDSHIFT_DATABASE_URI') + uri_data = urlparse(uri) + return psycopg2.connect( + user=uri_data.username, + password=uri_data.password, + host=uri_data.hostname, + port=5439, + dbname=uri_data.path[1:] + ) -- GitLab From 18143d4d02b3cf193c0a1b0e2eff16f91b68dec2 Mon Sep 17 00:00:00 2001 From: Conrad Date: Wed, 11 Apr 2018 10:04:44 -0400 Subject: [PATCH 3/7] Update gateway model to include space --- app/forms/gateway.py | 3 +++ app/models/gateway.py | 1 + 2 files changed, 4 insertions(+) diff --git a/app/forms/gateway.py b/app/forms/gateway.py index b1279c2..83f0a99 100644 --- a/app/forms/gateway.py +++ b/app/forms/gateway.py @@ -31,6 +31,9 @@ class GatewayForm(Form, UserForm): placement_type = wtf.IntegerField( validators=[wtf.validators.Optional()] ) + space_id = wtf.IntegerField( + validators=[wtf.validators.Optional()] + ) notes = wtf.StringField( validators=[wtf.validators.Optional()] ) diff --git a/app/models/gateway.py b/app/models/gateway.py index 1d3de69..8bc2956 100644 --- a/app/models/gateway.py +++ b/app/models/gateway.py @@ -19,4 +19,5 @@ class Gateway(Model, User, Tracked, db.Model): gateway_id = db.Column(db.Integer) installer_name = db.Column(db.Unicode(50)) placement_type = db.Column(db.Integer) + space_id = db.Column(db.Integer) notes = db.Column(db.Unicode(500)) -- GitLab From 819134c8e7680b9e7f9246ab36449b1ffc27d6dd Mon Sep 17 00:00:00 2001 From: Conrad Date: Wed, 11 Apr 2018 10:21:37 -0400 Subject: [PATCH 4/7] Make space id nullable --- app/forms/gateway.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/app/forms/gateway.py b/app/forms/gateway.py index 83f0a99..794bd35 100644 --- a/app/forms/gateway.py +++ b/app/forms/gateway.py @@ -1,6 +1,23 @@ import wtforms as wtf from .base import Form, UserForm +class NullableIntegerField(wtf.IntegerField): + """ + An IntegerField where the field can be null if the input data is an empty + string. + """ + + def process_formdata(self, valuelist): + if valuelist: + if valuelist[0] == '' or valuelist[0] is None: + self.data = None + else: + try: + self.data = int(valuelist[0]) + except ValueError: + self.data = None + raise ValueError(self.gettext('Not a valid integer value')) + class GatewayForm(Form, UserForm): """ A form for validating building sensor requests.""" @@ -31,7 +48,7 @@ class GatewayForm(Form, UserForm): placement_type = wtf.IntegerField( validators=[wtf.validators.Optional()] ) - space_id = wtf.IntegerField( + space_id = NullableIntegerField( validators=[wtf.validators.Optional()] ) notes = wtf.StringField( -- GitLab From e79d8bfc9bef0aec915ed2e1094c6b77e0004bd4 Mon Sep 17 00:00:00 2001 From: Conrad Date: Wed, 11 Apr 2018 17:25:43 -0400 Subject: [PATCH 5/7] Add order by clause --- app/controllers/data.py | 1 + 1 file changed, 1 insertion(+) diff --git a/app/controllers/data.py b/app/controllers/data.py index 585f79e..a03c62e 100644 --- a/app/controllers/data.py +++ b/app/controllers/data.py @@ -41,6 +41,7 @@ class DataController(RestController): INNER JOIN iot.unit on unit.id = meta.unit_id WHERE awair.device_id=%s + ORDER BY ts ASC ''' db = get_new_redshift_db(current_app) cur = db.cursor(cursor_factory=psycopg2.extras.DictCursor) -- GitLab From 488f6a629390d3ec90073eaea67dbccef2d8aba7 Mon Sep 17 00:00:00 2001 From: Conrad Date: Mon, 16 Apr 2018 16:19:04 -0400 Subject: [PATCH 6/7] Handle metadata coming from postgres instead of redshift --- app/controllers/data.py | 54 ++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/app/controllers/data.py b/app/controllers/data.py index a03c62e..2f3adbb 100644 --- a/app/controllers/data.py +++ b/app/controllers/data.py @@ -6,6 +6,8 @@ from werkzeug.exceptions import BadRequest from app.lib import get_redshift_db from app.lib import get_new_redshift_db +from app.lib.database import db + from .base import RestController from .constants import ( GATEWAY_TYPES, @@ -27,26 +29,44 @@ class DataController(RestController): if 'device_id' not in filter_data: raise BadRequest("'device_id' is a required field in the query params if the device is awair") device_id = filter_data.get('device_id') + res = db.session.execute(''' + SELECT meta.id as metadata_id, unit.description as unit FROM + iot.metadata as meta + INNER JOIN iot.unit ON unit.id = meta.unit_id + INNER JOIN iot.metadata_awair as awair on awair.metadata_id = meta.id + WHERE awair.device_id=:param + ''', {'param': device_id}) + + metadata_rows = res.fetchall() + # No metadata found for the specified sensor + if len(metadata_rows) == 0: + return [] sql = ''' SELECT da.ts, da.value, - awair.device_id, - unit.description as unit + da.metadata_id FROM iot.data as da - INNER JOIN - iot.metadata as meta on meta.id = da.metadata_id - INNER JOIN - iot.metadata_awair as awair on awair.metadata_id = meta.id - INNER JOIN - iot.unit on unit.id = meta.unit_id - WHERE awair.device_id=%s - ORDER BY ts ASC - ''' - db = get_new_redshift_db(current_app) - cur = db.cursor(cursor_factory=psycopg2.extras.DictCursor) - cur.execute(sql, (device_id,)) - results = [dict(record) for record in cur] + WHERE + da.metadata_id in ({}) + '''.format( + ('%s,' * len(metadata_rows))[:-1] + ) + insert_tuple = tuple() + id_to_unit = {} + for row in metadata_rows: + insert_tuple += (row[0],) + id_to_unit[row[0]] = row[1] + + redshift_db = get_new_redshift_db(current_app) + cur = redshift_db.cursor(cursor_factory=psycopg2.extras.DictCursor) + cur.execute(sql, insert_tuple) + results = [] + for record in cur: + row = dict(record) + row['unit'] = id_to_unit[row['metadata_id']] + results.append(row) + cur.close() return results return [] @@ -79,8 +99,8 @@ class DataController(RestController): where_tuple = where_tuple + (filter_data['unit_id'],) sql = "SELECT ts, value, unit, unit_id, name, sn, mod, channel_id FROM data {} ORDER BY ts".format(where_statement) - db = get_redshift_db(current_app) - cur = db.cursor(cursor_factory=psycopg2.extras.DictCursor) + redshift_db = get_redshift_db(current_app) + cur = redshift_db.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(sql, where_tuple) results = [dict(record) for record in cur] cur.close() -- GitLab From 1c62631452fc09ab62e30be464233c00a6da384f Mon Sep 17 00:00:00 2001 From: Conrad Date: Thu, 26 Apr 2018 12:14:44 -0400 Subject: [PATCH 7/7] Update endpoint to use new database --- app/config/development.default.py | 1 - app/config/local.default.py | 1 - app/config/production.default.py | 1 - app/config/staging.default.py | 1 - app/controllers/data.py | 197 +++++++++++++++++------------- app/controllers/gateway.py | 88 ++++++------- app/lib/__init__.py | 1 - app/lib/new_redshift_database.py | 28 ----- 8 files changed, 154 insertions(+), 164 deletions(-) delete mode 100644 app/lib/new_redshift_database.py diff --git a/app/config/development.default.py b/app/config/development.default.py index cd90659..963406e 100644 --- a/app/config/development.default.py +++ b/app/config/development.default.py @@ -4,7 +4,6 @@ import os SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = os.environ['DBURI'] REDSHIFT_DATABASE_URI = os.environ['REDSHIFT_DATABASE_URI'] -NEW_REDSHIFT_DATABASE_URI = os.environ['NEW_REDSHIFT_DATABASE_URI'] REDIS_URI = os.environ['REDISURI'] diff --git a/app/config/local.default.py b/app/config/local.default.py index d3de22e..0273fd7 100644 --- a/app/config/local.default.py +++ b/app/config/local.default.py @@ -1,7 +1,6 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = 'sqlite://' REDSHIFT_DATABASE_URI = 'redshift://' -NEW_REDSHIFT_DATABASE_URI = 'redshift://' REDIS_URI = 'redis://127.0.0.1:6379/' diff --git a/app/config/production.default.py b/app/config/production.default.py index 9b05d92..4e30ea1 100644 --- a/app/config/production.default.py +++ b/app/config/production.default.py @@ -4,7 +4,6 @@ import os SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = os.environ['DBURI'] REDSHIFT_DATABASE_URI = os.environ['REDSHIFT_DATABASE_URI'] -NEW_REDSHIFT_DATABASE_URI = os.environ['NEW_REDSHIFT_DATABASE_URI'] REDIS_URI = os.environ['REDISURI'] diff --git a/app/config/staging.default.py b/app/config/staging.default.py index fe8a9f4..3fa2a76 100644 --- a/app/config/staging.default.py +++ b/app/config/staging.default.py @@ -4,7 +4,6 @@ import os SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_DATABASE_URI = os.environ['DBURI'] REDSHIFT_DATABASE_URI = os.environ['REDSHIFT_DATABASE_URI'] -NEW_REDSHIFT_DATABASE_URI = os.environ['NEW_REDSHIFT_DATABASE_URI'] REDIS_URI = os.environ['REDISURI'] diff --git a/app/controllers/data.py b/app/controllers/data.py index 2f3adbb..ded3ccb 100644 --- a/app/controllers/data.py +++ b/app/controllers/data.py @@ -5,7 +5,6 @@ import psycopg2 from werkzeug.exceptions import BadRequest from app.lib import get_redshift_db -from app.lib import get_new_redshift_db from app.lib.database import db from .base import RestController @@ -18,91 +17,123 @@ class DataController(RestController): """A data controller.""" def index(self, filter_data): - if 'new' in filter_data: - if 'from' not in filter_data: - raise BadRequest("'from' is a required field in the query params") - if 'type' not in filter_data: - raise BadRequest("'type' is a required field in the query params") - date = filter_data.get('from') - gateway_type = filter_data.get('type') - if gateway_type == GATEWAY_TYPES['awair']: - if 'device_id' not in filter_data: - raise BadRequest("'device_id' is a required field in the query params if the device is awair") - device_id = filter_data.get('device_id') - res = db.session.execute(''' + ''' + Takes in device data like + device[]=$DEVICE_TYPE::$DEVICE_SPECIFIC_ID_1::$DEVICE_SPECIFIC_ID_2_OPTIONAL + ''' + import time + start = time.clock() + if 'from' not in filter_data: + raise BadRequest("'from' is a required field in the query params") + if 'device[]' not in filter_data: + raise BadRequest("'device[]' is a required field in the query params") + unit_id = filter_data.get('unit_id') + date = filter_data.get('from') + devices = filter_data.getlist('device[]') + metadata_id_dict = {} + for device in devices: + device_info = device.split('::') + device_type = device_info[0] + + if device_type == 'awair': + if len(device_info) < 2: + raise BadRequest("device_id is a required field in the device[] query param if the device is awair") + device_id = device_info[1] + query = ''' SELECT meta.id as metadata_id, unit.description as unit FROM iot.metadata as meta INNER JOIN iot.unit ON unit.id = meta.unit_id INNER JOIN iot.metadata_awair as awair on awair.metadata_id = meta.id WHERE awair.device_id=:param - ''', {'param': device_id}) - - metadata_rows = res.fetchall() - # No metadata found for the specified sensor - if len(metadata_rows) == 0: - return [] - sql = ''' - SELECT - da.ts, da.value, - da.metadata_id - FROM - iot.data as da - WHERE - da.metadata_id in ({}) - '''.format( - ('%s,' * len(metadata_rows))[:-1] - ) - insert_tuple = tuple() - id_to_unit = {} - for row in metadata_rows: - insert_tuple += (row[0],) - id_to_unit[row[0]] = row[1] - - redshift_db = get_new_redshift_db(current_app) - cur = redshift_db.cursor(cursor_factory=psycopg2.extras.DictCursor) - cur.execute(sql, insert_tuple) - results = [] - for record in cur: - row = dict(record) - row['unit'] = id_to_unit[row['metadata_id']] - results.append(row) - - cur.close() - return results + ''' + params = {'param': device_id} + if unit_id: + query += ''' + AND unit.id=:unit_param + ''' + params['unit_param'] = unit_id + + res = db.session.execute(query, params) + + metadata_id_rows = res.fetchall() + for metadata_id_row in metadata_id_rows: + metadata_id = metadata_id_row[0] + metadata_id_dict[metadata_id] = { + 'type': device_type, + 'device_id': device_id, + 'unit': metadata_id_row[1], + 'metadata_id': metadata_id, + 'data': [], + } + + elif device_type == 'senseware': + if len(device_info) < 3: + raise BadRequest("'mod' and 'sn' are required fields in the device[] query param if the device is senseware") + mod = device_info[1] + sn = device_info[2] + query = ''' + SELECT meta.id as metadata_id, unit.description as unit FROM + iot.metadata as meta + INNER JOIN iot.unit ON unit.id = meta.unit_id + INNER JOIN iot.metadata_senseware as sense on sense.metadata_id = meta.id + WHERE sense.mod=:mod_param + AND sense.sn=:sn_param + ''' + params = { + 'mod_param': mod, + 'sn_param': sn, + } + if unit_id: + query += ''' + AND unit.id=:unit_param + ''' + params['unit_param'] = unit_id + + res = db.session.execute(query, params) + + metadata_id_rows = res.fetchall() + for metadata_id_row in metadata_id_rows: + metadata_id = metadata_id_row[0] + metadata_id_dict[metadata_id] = { + 'type': device_type, + 'sn': sn, + 'mod': mod, + 'unit': metadata_id_row[1], + 'metadata_id': metadata_id, + 'data': [], + } + + # No metadata found for the specified sensor + if len(metadata_id_dict.keys()) == 0: return [] - - else: - if 'from' not in filter_data: - raise BadRequest("'from' is a required field in the query params") - date = filter_data.get('from') - gateway_serials = filter_data.getlist('gateway_serial[]') - node_module_numbers = filter_data.getlist('node_id[]') - - # Set up the WHERE statement with date info - where_statement = 'WHERE ts>%s AND (' - where_tuple = (date,) - - # Add the gateway serials to hte WHERE statement - for gateway_serial in gateway_serials: - where_statement += "sn=%s OR " - where_tuple = where_tuple + (gateway_serial,) - - # Add the node_module_ids to hte WHERE statement - for node_module_number in node_module_numbers: - where_statement += "mod=%s OR " - where_tuple = where_tuple + (node_module_number,) - - where_statement = where_statement[:-4] + ')' - - if 'unit_id' in filter_data: - where_statement += 'AND unit_id=%s' - where_tuple = where_tuple + (filter_data['unit_id'],) - - sql = "SELECT ts, value, unit, unit_id, name, sn, mod, channel_id FROM data {} ORDER BY ts".format(where_statement) - redshift_db = get_redshift_db(current_app) - cur = redshift_db.cursor(cursor_factory=psycopg2.extras.DictCursor) - cur.execute(sql, where_tuple) - results = [dict(record) for record in cur] - cur.close() - - return results + sql = ''' + SELECT + da.ts, da.value, + da.metadata_id + FROM + iot.data as da + WHERE + da.metadata_id in ({}) + AND + ts > %s + ORDER BY ts ASC + '''.format( + ('%s,' * len(metadata_id_dict.keys()))[:-1] + ) + insert_tuple = tuple() + for metadata_id in metadata_id_dict.keys(): + insert_tuple += (metadata_id,) + insert_tuple += (date,) + + redshift_db = get_redshift_db(current_app) + cur = redshift_db.cursor(cursor_factory=psycopg2.extras.DictCursor) + cur.execute(sql, insert_tuple) + for record in cur: + row = dict(record) + return_device = metadata_id_dict[row['metadata_id']] + row['unit'] = return_device['unit'] + row.pop('metadata_id') + return_device['data'].append(row) + + cur.close() + return metadata_id_dict.values() diff --git a/app/controllers/gateway.py b/app/controllers/gateway.py index 594d42f..17faf62 100644 --- a/app/controllers/gateway.py +++ b/app/controllers/gateway.py @@ -26,56 +26,48 @@ class GatewayController(RestController): """ Retrieve a list of gateways """ - result = super().index(filter_data) + gateway_result = super().index(filter_data) """ New way of accessing data """ - if 'data' in filter_data and 'from' in filter_data and result: + if 'data' in filter_data and 'from' in filter_data and gateway_result: # Access data for gateways that don't have seperate nodes - for gateway in result: + args = MultiDict([('from', filter_data['from'])]) + if 'unit_id' in filter_data: + args.add('unit_id', filter_data.get('unit_id')) + for gateway in gateway_result: if gateway['sensor_type'] == GATEWAY_TYPES['awair']: - args = MultiDict([ - ('new', ''), - ('from', filter_data['from']), - ('type', GATEWAY_TYPES['awair']), - ('device_id', gateway['gateway_id']), - ]) - data_result = DataController().index(args) - gateway['data'] = data_result + args.add('device[]', 'awair::{}'.format(gateway['gateway_id'])) + gateway['data'] = [] + if gateway['sensor_type'] == GATEWAY_TYPES['senseware']: + node_args = MultiDict([('gateway_id', gateway['id'])]) + node_result = SensewareNodeController().index(node_args) + for node in node_result: + args.add('device[]', 'senseware::{}::{}'.format( + node['node_id'], + gateway['gateway_serial'], + )) + node['data'] = [] + gateway['nodes'] = node_result + data_result_list = DataController().index(args) + # Parse through the data response and add it to the correct gateways and nodes + for data_result in data_result_list: - # Gateways that have seperate nodes (senseware) are handled differently - if 'nodes' in filter_data and result: - node_ids = [] - for gateway in result: - multi_dict_args = MultiDict([('gateway_id', gateway['id'])]) - node_result = SensewareNodeController().index(multi_dict_args) - for node in node_result: - if (node['node_id']): - node_ids.append(node['node_id']) - gateway['nodes'] = node_result + for gateway in gateway_result: + if GATEWAY_TYPES[data_result['type']] == gateway['sensor_type']: + if data_result['type'] == 'awair': + if data_result['device_id'] == gateway['gateway_id']: + gateway['data'].extend(data_result['data']) + elif data_result['type'] == 'senseware': + for node in gateway['nodes']: + if ( + data_result['mod'] == node['node_id'] and + data_result['sn'] == gateway['gateway_serial'] + ): + node['data'].extend(data_result['data']) - if 'data' in filter_data and 'from' in filter_data and result: - args = [('node_id[]', node_id) for node_id in node_ids] - data_mapping = {} - # Continue if any of the gateways have serials - if args: - args.append(('from', filter_data.get('from'))) - - if 'unit_id' in filter_data: - args.append(('unit_id', filter_data.get('unit_id'))) - - multi_dict_args = MultiDict(args) - data_result = DataController().index(multi_dict_args) - # Map data to gateway serial numbers - for record in data_result: - key = str(record['mod']) - if key not in data_mapping: - data_mapping[key] = [] - data_mapping[key].append(record) - # Add the data to the nodes - for gateway in result: - for node in gateway['nodes']: - if node['node_id'] in data_mapping: - node['data'] = data_mapping[node['node_id']] - else: - node['data'] = [] - - return result + elif 'nodes' in filter_data and gateway_result: + for gateway in gateway_result: + if gateway['sensor_type'] == GATEWAY_TYPES['senseware']: + node_args = MultiDict([('gateway_id', gateway['id'])]) + node_result = SensewareNodeController().index(node_args) + gateway['nodes'] = node_result + return gateway_result diff --git a/app/lib/__init__.py b/app/lib/__init__.py index 0537405..9e859d0 100644 --- a/app/lib/__init__.py +++ b/app/lib/__init__.py @@ -2,4 +2,3 @@ from .database import db from .red import redis from .auth0 import auth0_ from .redshift_database import get_redshift_db -from .new_redshift_database import get_new_redshift_db diff --git a/app/lib/new_redshift_database.py b/app/lib/new_redshift_database.py deleted file mode 100644 index 7340c0e..0000000 --- a/app/lib/new_redshift_database.py +++ /dev/null @@ -1,28 +0,0 @@ -from flask import Flask, g -from urllib.parse import urlparse -import psycopg2 - -app = Flask(__name__) - -def get_new_redshift_db(app): - db = getattr(g, 'new_redshift_database', None) - if db is None: - db = g._database = connect_to_database(app) - return db - -@app.teardown_appcontext -def teardown_db(exception): - db = getattr(g, 'new_redshift_database', None) - if db is not None: - db.close() - -def connect_to_database(app): - uri = app.config.get('NEW_REDSHIFT_DATABASE_URI') - uri_data = urlparse(uri) - return psycopg2.connect( - user=uri_data.username, - password=uri_data.password, - host=uri_data.hostname, - port=5439, - dbname=uri_data.path[1:] - ) -- GitLab