diff --git a/Dockerfile b/Dockerfile
index 6c3c25b9d796e3b98d81bc3922b5bb1f6e4dfb48..4e8f257bb2d3ac359a405fbd4d74119656d63994 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -38,7 +38,7 @@ RUN apt-get install -y zip
 
 # Install node
 RUN apt-get install -y curl build-essential libssl-dev
-ENV NODE_VERSION 5.10.0
+ENV NODE_VERSION 8.9.1
 ENV NVM_DIR /usr/local/nvm
 RUN \
   curl https://raw.githubusercontent.com/creationix/nvm/v0.16.1/install.sh | bash && \
@@ -50,6 +50,11 @@ RUN \
 ENV NODE_PATH $NVM_DIR/v$NODE_VERSION/lib/node_modules
 ENV PATH $NVM_DIR/v$NODE_VERSION/bin:$PATH
 
+# Make a directory to place senseware things.
+RUN mkdir /var/senseware
+RUN chmod 755 /var/senseware
+COPY ./scripts/ftp_refresh.py /var/senseware/ftp_refresh.py
+
 # Clean up apt.
 RUN rm -rf /var/lib/apt/lists/*
 
diff --git a/scripts/ftp_refresh.py b/scripts/ftp_refresh.py
new file mode 100644
index 0000000000000000000000000000000000000000..e57c29024e6e9f5e06810c793c132468a964473c
--- /dev/null
+++ b/scripts/ftp_refresh.py
@@ -0,0 +1,163 @@
+from urllib.parse import urlparse
+import threading
+from queue import Queue
+import time
+import json
+import os
+import psycopg2
+import psycopg2.extras
+import glob
+
+
+class RedshiftWrapper(object):
+    """A wrapper for Redshift."""
+    db = None
+    uri = ''
+
+    def __init__(self):
+        """Reads the Redshift connection URI from the environment and connects."""
+        self.uri = os.environ['REDSHIFT_DATABASE_URI']
+
+        self.db = self.make_connection()
+
+    def make_connection(self):
+        uri_data = urlparse(self.uri)
+        return psycopg2.connect(
+            user=uri_data.username,
+            password=uri_data.password,
+            host=uri_data.hostname,
+            port=5439,
+            dbname=uri_data.path[1:]
+        )
+
+    def get_cursor(self):
+        try:
+            cursor = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        except (psycopg2.InterfaceError, psycopg2.OperationalError):
+            self.db.close()
+            self.db = self.make_connection()
+            cursor = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        return cursor
+
+    def execute(self, cursor, sql, where_tuple=()):
+        """ Execute a database call and handle the InterfaceError
+        Returns the cursor that should be used to make future calls
+        """
+        retry_counter = 0
+        while retry_counter < 5:
+            retry_counter += 1
+            try:
+                cursor.execute(sql, where_tuple)
+                break
+            except psycopg2.ProgrammingError as e:
+                print(
+                    'ProgrammingError while executing redshift db call: {}'.format(e)
+                )
+                self.db.rollback()
+                break
+            except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
+                print(
+                    'Interface or Operational Error while executing redshift db call: {}'.format(e)
+                )
+                self.db = self.make_connection()
+                cursor = self.get_cursor()
+        return cursor
+
+
+# An object to keep track of the most recent date
+class RecentDate(object):
+    newest_date = None
+
+# lock to serialize console output
+lock = threading.Lock()
+
+def do_redshift_work(item, redshift, cursor):
+    insert_sql = """
+    INSERT INTO data_temporary (ts, value, sn, unit, channel_id, mod, unit_id)
+    VALUES
+    """
+    insert_tuple = ()
+    newest_date = 0
+    for point in item:
+        insert_sql += "(TIMESTAMP 'epoch' + %s * INTERVAL '1 Second ', %s, %s, %s, %s, %s, %s),"
+        unit_id = None
+        unit_mapping = {
+            u'\xb0F': 1,
+            '%': 2,
+            'V': 3,
+        }
+        if point['channel_unit'] in unit_mapping:
+            unit_id = unit_mapping[point['channel_unit']]
+        insert_tuple = insert_tuple + (
+            point['channel_received_timestamp'],  # ts
+            point['channel_converted_value'],  # value
+            point['sn'],  # sn
+            point['channel_unit'],  # unit
+            point['channel_id'],  # channel_id
+            point['mod'],  # mod
+            unit_id,  # unit_id
+        )
+        if point['channel_received_timestamp'] > newest_date:
+            newest_date = point['channel_received_timestamp']
+    insert_sql = insert_sql[:-1]
+    cursor = redshift.execute(cursor, insert_sql, insert_tuple)
+
+    print('REDSHIFT', threading.current_thread().name, 'Ran sql', len(item))
+    return cursor, newest_date
+
+# The worker thread pulls an item from the queue and processes it
+def redshift_worker(redshift, timestamp):
+    cursor = redshift.get_cursor()
+    with lock:
+        print('CONFIGURED REDSHIFT CONNECTION', threading.current_thread())
+    while True:
+        print('REDSHIFT', threading.current_thread().name, 'Number of items', redshift_q.qsize())
+        item = redshift_q.get()
+        cursor, newest_date = do_redshift_work(item, redshift, cursor)
+        # Track the newest timestamp seen so far; lock guards the shared object.
+        with lock:
+            if not timestamp.newest_date or newest_date > timestamp.newest_date:
+                timestamp.newest_date = newest_date
+        redshift_q.task_done()
+
+
+start = time.perf_counter()
+# Create the queue and thread pool.
+redshift = RedshiftWrapper()
+timestamp = RecentDate()
+cursor = redshift.get_cursor()
+cursor = redshift.execute(cursor, 'CREATE TABLE data_temporary AS SELECT * FROM data WHERE 1=2;')
+redshift.db.commit()
+cursor.close()
+redshift_q = Queue()
+for i in range(20):
+    t = threading.Thread(target=redshift_worker, args=(redshift, timestamp))
+    t.daemon = True  # thread dies when main thread (only non-daemon thread) exits.
+    t.start()
+
+DIRECTORY = os.environ['DIRECTORY']
+files = glob.glob(DIRECTORY + '/**/*.json', recursive=True)
+
+for file_name in files:
+    with open(file_name) as f: data = json.load(f)
+    redshift_q.put(data)
+
+
+# Wait for the workers to drain the queue, then commit everything at once.
+redshift_q.join()  # block until all tasks are done
+redshift.db.commit()
+
+print('Committed')
+print('Newest date', timestamp.newest_date)
+print('time:', time.perf_counter() - start)
+cursor = redshift.get_cursor()
+print('Now inserting all of the new data')
+cursor = redshift.execute(cursor, """
+    INSERT INTO data_temporary
+    (SELECT * FROM data WHERE ts > TIMESTAMP 'epoch' + %s * INTERVAL '1 Second ');
+""", (timestamp.newest_date,))
+print('Finished inserting all of the new data, committing')
+redshift.db.commit()
+print('Successfully committed. Exiting')
+exit()
+