218 lines
5.6 KiB
Python
Raw Normal View History

2023-12-28 13:38:21 +01:00
#!/usr/bin/python
import psycopg2
2024-01-13 23:13:11 +01:00
import csv
import os
2023-12-28 13:38:21 +01:00
2024-01-31 12:01:20 +01:00
def get_conf():
    '''Build the db connection settings from the POSTGRES_* environment variables.

    Returns a dict with the keys "host", "port", "database", "user" and
    "password". A key maps to None when its environment variable is not set.
    '''
    env_names = {
        "host": 'POSTGRES_HOST',
        "port": 'POSTGRES_PORT',
        "database": 'POSTGRES_DB',
        "user": 'POSTGRES_USER',
        "password": 'POSTGRES_PASSWORD',
    }
    return {key: os.environ.get(name) for key, name in env_names.items()}
def connect_db():
    '''Open a connection to the database described by get_conf().

    Returns the psycopg2 connection on success. When connecting fails the
    error is printed (not raised) and None is returned.
    '''
    params = get_conf()
    handle = None
    print("Connecting to the PostgreSQL database...")
    try:
        handle = psycopg2.connect(**params)
        print("Connection success")
    except Exception as err:  # psycopg2.DatabaseError is an Exception subclass
        print(err)
    return handle
2024-01-31 12:01:20 +01:00
def add_item(itemid, skuid, choice, attributes, image):
    '''Insert a new item in the database.

    The row's uuid is taken from the uuid_sequence sequence; the other
    columns are filled from the arguments.

    Parameters:
        itemid: value for the item.itemid column.
        skuid: value for the item.skuid column.
        choice: value for the item.choice column.
        attributes: value for the item.attributes column.
        image: value for the item.image column.
    '''
    connection = connect_db()
    try:
        # The cursor context manager closes the cursor even if execute() fails.
        with connection.cursor() as cursor:
            cursor.execute("""
                INSERT INTO item (uuid, itemid, skuid, choice, attributes, image)
                VALUES (nextval('uuid_sequence'), %s, %s, %s, %s, %s)
            """, (itemid, skuid, choice, attributes, image))
        connection.commit()
    finally:
        # Always release the connection, including when the insert or
        # the commit raises.
        connection.close()
2024-01-31 12:01:20 +01:00
def add_history_entry(itemid, skuid, choice, attributes, image, price, currency, quantity, discount_percentage):
    '''Add a new history entry for an item in the database.

    If the item (identified by itemid and skuid) isn't in the database yet,
    it is added first with the given choice, attributes and image. The
    history row is stamped with the database's LOCALTIMESTAMP.

    Parameters:
        itemid, skuid: identify the item the entry belongs to.
        choice, attributes, image: only used when the item has to be created.
        price, currency, quantity, discount_percentage: values stored in the
            new history row.

    Raises:
        IndexError: if the item row cannot be found after the existence
            check / insert.
    '''
    if not check_exist(itemid, skuid):
        add_item(itemid, skuid, choice, attributes, image)

    connection = connect_db()
    try:
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT uuid
                FROM item
                WHERE itemid = %s
                AND skuid = %s
            """, (itemid, skuid))
            row = cursor.fetchone()
            if row is None:
                # Same exception type the previous fetchall()[0] produced.
                raise IndexError(
                    "no item row found for itemid=%r, skuid=%r" % (itemid, skuid)
                )
            # Pass the scalar uuid, not the whole row tuple, as the parameter.
            item_uuid = row[0]
            cursor.execute("""
                INSERT INTO history (uuid, price, currency, quantity, discount_percentage, h_timestamp)
                VALUES (%s, %s, %s, %s, %s, (SELECT LOCALTIMESTAMP))
            """, (item_uuid, price, currency, quantity, discount_percentage))
        connection.commit()
    finally:
        # Release the connection even when a query or the commit fails.
        connection.close()
2024-01-31 12:01:20 +01:00
def get_history():
    '''Fetch every row of the history table.

    Returns the list produced by fetchall(); each row holds, in order:
    uuid, quantity, discount_percentage, price, currency, h_timestamp.
    '''
    query = """
        SELECT uuid, quantity, discount_percentage, price, currency, h_timestamp
        FROM history
    """
    db = connect_db()
    cur = db.cursor()
    cur.execute(query)
    rows = cur.fetchall()
    cur.close()
    db.close()
    return rows
2024-01-31 12:01:20 +01:00
def get_item():
    '''Fetch every row of the item table.

    Returns the list produced by fetchall(); each row holds, in order:
    uuid, itemid, skuid, choice, attributes, image.
    '''
    query = """
        SELECT uuid, itemid, skuid, choice, attributes, image
        FROM item
    """
    db = connect_db()
    cur = db.cursor()
    cur.execute(query)
    rows = cur.fetchall()
    cur.close()
    db.close()
    return rows
2024-01-31 12:01:20 +01:00
def get_item_keys():
    '''Fetch the itemid and attributes of every row of the item table.

    Returns the list produced by fetchall(); each row is (itemid, attributes).
    '''
    query = """
        SELECT itemid, attributes
        FROM item
    """
    db = connect_db()
    cur = db.cursor()
    cur.execute(query)
    rows = cur.fetchall()
    cur.close()
    db.close()
    return rows
def check_exist(itemid, skuid):
    '''Tell whether the item table already holds a row for (itemid, skuid).

    Returns False when the lookup reports zero rows, True otherwise.
    '''
    db = connect_db()
    cur = db.cursor()
    cur.execute("""
        SELECT uuid
        FROM item
        WHERE itemid = %s
        AND skuid = %s
    """, (itemid, skuid))
    matches = cur.rowcount
    cur.close()
    db.close()
    return matches != 0
2024-01-31 12:01:20 +01:00
def export_csv():
    '''Join item and history data from database and export it in ./output.csv.

    The file is written next to this module (resolved through __file__) and
    is overwritten if it already exists. The first row holds the column
    names reported by the cursor, followed by one row per history entry.
    '''
    output_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "output.csv"
    )

    connection = connect_db()
    try:
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT i.uuid, i.itemid, i.skuid, i.choice, i.attributes, i.image, h.quantity, h.discount_percentage, h.price, h.currency, h.h_timestamp
                FROM item i, history h
                WHERE i.uuid = h.uuid
            """)
            results = cursor.fetchall()
            # Read the column names while the cursor is still open.
            header = [col[0] for col in cursor.description]
    finally:
        # Release the connection even when the query fails.
        connection.close()

    # newline='' is required by the csv module so it controls line endings
    # itself (otherwise blank rows appear on Windows). An explicit encoding
    # keeps the output independent of the platform's locale.
    with open(output_path, 'w', newline='', encoding='utf-8') as csv_file:
        writer = csv.writer(csv_file)
        # write the column names
        writer.writerow(header)
        # write the query results
        writer.writerows(results)
2024-01-31 12:01:20 +01:00
def initialize():
    '''Create tables and sequence in database. Drop them first if they already exist.

    Runs, in order: drop of history, item and uuid_sequence, then creation
    of uuid_sequence, item and history. Everything is committed at the end
    and a confirmation message is printed.
    '''
    # history is dropped before item and created after it because it
    # references item(uuid).
    statements = (
        """
        DROP TABLE IF EXISTS history
        """,
        """
        DROP TABLE IF EXISTS item
        """,
        """
        DROP SEQUENCE IF EXISTS uuid_sequence
        """,
        """
        CREATE SEQUENCE uuid_sequence
        INCREMENT BY 1
        START WITH 1
        """,
        """
        CREATE TABLE item
        (
        uuid int,
        itemid bigint,
        skuid bigint,
        choice boolean,
        attributes text[],
        image text,
        primary key (uuid)
        )
        """,
        """
        CREATE TABLE history
        (
        uuid int,
        quantity integer,
        discount_percentage numeric(2),
        price money,
        currency varchar(4),
        h_timestamp timestamp,
        foreign key (uuid) references item(uuid),
        primary key (uuid, h_timestamp)
        )
        """,
    )
    db = connect_db()
    cur = db.cursor()
    for statement in statements:
        cur.execute(statement)
    db.commit()
    db.close()
    print("Database initialized")
def check_schema():
    '''Check whether both the item and history tables are listed in pg_tables.

    Returns the boolean computed by the database: true only when a table
    named 'item' and a table named 'history' both exist.
    '''
    query = """
        select
        exists(
        select *
        from pg_tables
        where tablename = 'item'
        )
        and exists(
        select *
        from pg_tables
        where tablename = 'history'
        );
    """
    db = connect_db()
    cur = db.cursor()
    cur.execute(query)
    rows = cur.fetchall()
    cur.close()
    db.close()
    first_row = rows[0]
    return first_row[0]