#!/usr/bin/python import psycopg2 import csv import os def get_conf(): '''return db connection settings''' settings = { "host": os.environ.get('POSTGRES_HOST'), "port": os.environ.get('POSTGRES_PORT'), "database": os.environ.get('POSTGRES_DB'), "user": os.environ.get('POSTGRES_USER'), "password": os.environ.get('POSTGRES_PASSWORD') } return settings def connect_db(): '''open and return a connection to the database''' conn = None db_settings = get_conf() print("Connecting to the PostgreSQL database...") try: conn = psycopg2.connect(**db_settings) print("Connection success") except (Exception, psycopg2.DatabaseError) as error: print(error) return conn def add_item(itemid, skuid, choice, attributes, image): '''insert a new item in the database''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" INSERT INTO item (uuid, itemid, skuid, choice, attributes, image) VALUES (nextval('uuid_sequence'), %s, %s, %s, %s, %s) """, (itemid, skuid, choice, attributes, image)) connection.commit() connection.close() def add_history_entry(itemid, skuid, choice, attributes, image, price, currency, quantity, discount_percentage): '''Add a new history entry for an item in the database. If item isn't in database yet, add it.''' connection = connect_db() cursor = connection.cursor() if not check_exist(itemid, skuid): add_item(itemid, skuid, choice, attributes, image) cursor.execute(""" SELECT uuid FROM item WHERE itemid = %s AND skuid = %s """, (itemid, skuid)) uuid = cursor.fetchall()[0] cursor.execute(""" INSERT INTO history (uuid, price, currency, quantity, discount_percentage, h_timestamp) VALUES (%s, %s, %s, %s, %s, (SELECT LOCALTIMESTAMP)) """, (uuid, price, currency, quantity, discount_percentage)) connection.commit() connection.close() def get_history(): '''return history data from database''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" SELECT uuid, quantity, discount_percentage, price, currency, h_timestamp FROM history """) results = cursor.fetchall() cursor.close() connection.close() return results def get_item(): '''return items data from database''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" SELECT uuid, itemid, skuid, choice, attributes, image FROM item """) results = cursor.fetchall() cursor.close() connection.close() return results def get_item_keys(): '''return items id and attributes from database''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" SELECT itemid, attributes FROM item """) results = cursor.fetchall() cursor.close() connection.close() return results def check_exist(itemid, skuid): '''check if an item is already in the database''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" SELECT uuid FROM item WHERE itemid = %s AND skuid = %s """, (itemid, skuid)) result = cursor.rowcount cursor.close() connection.close() if result == 0: return False else: return True def delete_item(uuid): ''' delete item and its history from database ''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" DELETE FROM history WHERE uuid = %s """, (uuid,)) cursor.execute(""" DELETE FROM item WHERE uuid = %s """, (uuid,)) cursor.close() connection.commit() connection.close() def export_csv(): '''join item and history data from database and export it in ./output.csv''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" SELECT i.uuid, i.itemid, i.skuid, i.choice, i.attributes, i.image, h.quantity, h.discount_percentage, h.price, h.currency, h.h_timestamp FROM item i, history h WHERE i.uuid = h.uuid """) results = cursor.fetchall() with open(os.path.dirname(os.path.realpath(__file__))+"/output.csv", 'w') as csv_file: # Create a CSV writer writer = csv.writer(csv_file) # write the column names writer.writerow([col[0] for col in cursor.description]) # write the query results writer.writerows(results) cursor.close() connection.close() def initialize(): '''Create tables and sequence in database. Drop them first if they already exist.''' connection = connect_db() cursor = connection.cursor() cursor.execute(""" DROP TABLE IF EXISTS history """) cursor.execute(""" DROP TABLE IF EXISTS item """) cursor.execute(""" DROP SEQUENCE IF EXISTS uuid_sequence """) cursor.execute(""" CREATE SEQUENCE uuid_sequence INCREMENT BY 1 START WITH 1 """) cursor.execute(""" CREATE TABLE item ( uuid int, itemid bigint, skuid bigint, choice boolean, attributes text[], image text, primary key (uuid) ) """) cursor.execute(""" CREATE TABLE history ( uuid int, quantity integer, discount_percentage numeric(2), price money, currency varchar(4), h_timestamp timestamp, foreign key (uuid) references item(uuid), primary key (uuid, h_timestamp) ) """) connection.commit() connection.close() print("Database initialized") def check_schema(): connection = connect_db() cursor = connection.cursor() cursor.execute(""" select exists( select * from pg_tables where tablename = 'item' ) and exists( select * from pg_tables where tablename = 'history' ); """) result = cursor.fetchall() cursor.close() connection.close() return result[0][0]