Postgres SQL CLI
We will start this off by showing how to connect to the Postgres server from the command line. This is accomplished using the psql command. The IP address is internal to the GCP network, so it will not be accessible from the outside without first remoting into a server on the network.
psql -h 10.26.48.5 -U roger -d analytics_poc
Type in your password when requested and you should be connected. My primary use of the CLI was to get a list of the tables existing in the database as I was testing my Python code to add and drop tables. You can get a list of tables using the \dt command. The full documentation is here.
Connecting to Postgres from Python
This is very straightforward using SQL Alchemy. I don’t use the ORM, or really any SQL Alchemy features, but for some reason pandas complains if you try to connect by any means other than SQL Alchemy.
# Postgres Connection
import os
import pandas as pd  # used by the pg_ helper functions below
from sqlalchemy import create_engine, text

pw = os.getenv('PGCON')
alchemyEngine = create_engine('postgresql+psycopg2://roger:' + pw + '@10.26.48.5/analytics_poc', pool_recycle=3600)
pgcon = alchemyEngine.connect()
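With pgcon open, a quick throwaway query confirms the connection works (just a sanity check; any trivial query will do):

pd.read_sql(text("SELECT version();"), pgcon)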
Reading from Postgres
Notice above that we imported the text command from SQL Alchemy. Here is the first quirk of Postgres. Normally you could just use pd.read_sql() to read from a database, but for some reason you have to wrap each SQL statement with text(). No way I am typing that with each query. Also, I need to access both BigQuery and Postgres in the same codebase, so I created a series of helper functions prefixed with pg_ that already store the database connection so I don’t have to pass it in. Here is the helper that wraps text() around a SQL statement and returns a DataFrame of the results.
Now is also a good time to point out a second quirk of Postgres SQL. If a prior database command resulted in an error, no further database actions (including reads) will work until the last transaction is rolled back. There is no harm in rolling back, so I just issue that statement with each query.
def pg_sql(s):
    pgcon.rollback()
    return pd.read_sql(text(s), pgcon)
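With that in place, a query becomes a one-liner. The table name below is just a placeholder for illustration:

df = pg_sql("SELECT * FROM sales_summary LIMIT 10")  # sales_summary is a hypothetical table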
Getting a list of tables
This helper function returns a list of all the tables in the database.
def pg_sql_table_names():
    return pg_sql("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
Table Schema
When appending to tables it can be tricky to get all the data types correct, especially since I started off writing to the tables in R and then switched to Python and didn’t want to re-create the tables because Metabase dashboards were linked to them. Here is a helper function to show the table schema.
def pg_sql_table_schema(tblName):
    pgcon.rollback()
    return pg_sql(f"SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_name = '{tblName}';")
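Calling it with a table name returns one row per column with its data type; the table name here is only an example:

pg_sql_table_schema('sales_summary')  # hypothetical table; returns table_name, column_name, data_type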
The execute statement
You can’t use pandas commands for tasks like delete or alter table, so I created a helper function for that as well. Also, Postgres does not automatically commit. There is a setting to turn that on, but since I am writing all these helper functions, I just add in the commit() command. As before, I start off with rollback() and wrap the actual SQL command with text().
def pg_execute(txt):
    pgcon.rollback()
    pgcon.execute(text(txt))
    pgcon.commit()
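Typical usage looks like this; the table and column names are hypothetical:

pg_execute("delete from sales_summary where month < '2023-01-01'")
pg_execute("alter table sales_summary alter column month type date using month::date")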
Writing to a table
This helper function uses the DataFrame’s to_sql() method to save a DataFrame to a table and then commits it. It prints saved if successful and error if unsuccessful. If the table already exists, it will append. Even if the table doesn’t exist, it will still get created, so there is no harm in using append unless you actually did want to replace the table. If a table is being used by Metabase you don’t want to replace it; it is better to delete the contents and then write the new data.
def pg_save(dat, tblName):
    try:
        dat.to_sql(tblName, pgcon, if_exists='append')
        pgcon.commit()
        print("saved")
    except:
        pgcon.rollback()
        print("error")
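So for a Metabase-linked table, the delete-then-append pattern mentioned above looks roughly like this (sales_summary and new_df are placeholders):

pg_execute("delete from sales_summary")   # keep the table and its Metabase links intact
pg_save(new_df, "sales_summary")          # append the refreshed data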
Most embarrassing function
I was struggling with appends to certain tables used by Metabase. For some reason I could not get the types to match, especially the date type. I was also having trouble with the field names because they contained capital letters, which is another quirk of Postgres: it doesn’t like capital letters in column names. In the end I created a very convoluted helper function that writes the data to a new table, copies the data into the table I actually want to append to, and then drops the first table. It also changes all the field names to lowercase and casts the dates so the append works. It is not very pretty, but it gets the job done.
def pg_insert(df, tblName):
    # Postgres prefers lowercase column names
    df.columns = df.columns.str.lower()
    # drop the staging table if it was left over from a previous run
    try:
        pg_execute(f"drop table {tblName}2")
    except:
        pass
    # write the DataFrame to a staging table
    pg_save(df, f"{tblName}2")
    # build the column list, skipping the index column written by to_sql
    test = pg_sql(f"select * from {tblName}2 limit 1")
    names = test.columns.values.tolist()[1:]
    txt = ", ".join(names)
    # cast date-like text columns so the insert matches the target table's types
    if "month" in txt:
        txt = txt.replace("month", "TO_DATE(month, 'YYYY-MM-DD') as month")
    if "date" in txt:
        txt = txt.replace("date", "TO_DATE(date, 'YYYY-MM-DD') as date")
        txt = txt.replace("extract", "TO_DATE(extract, 'YYYY-MM-DD') as extract")
    # copy from the staging table into the real table, then drop the staging table
    pg_execute(f"insert into {tblName} select {txt} from {tblName}2")
    pg_execute(f"drop table {tblName}2")
Addendum on BigQuery
I also had a problem appending data to BigQuery, so I thought I would mention it here in case it helps anyone. I had to tell pandas_gbq more details about the date column (whose name I store in datetag) like so: table_schema = [{'name': datetag, 'type': 'DATE'}].
="append", table_schema = [{'name':datetag,'type': 'DATE'}]) pandas_gbq.to_gbq(comb, outName, if_exists