The Postgres CLI (psql)

We will start off by showing how to connect to the Postgres server from the command line using the psql client. The IP address is internal to the GCP network, so it will not be accessible from the outside without first remoting into a server on that network.

psql -h 10.26.48.5 -U roger -d analytics_poc

Type in your password when prompted and you should be connected. My primary use of the CLI was to list the tables in the database while I was testing my Python code to add and drop tables. You can get a list of tables with the \dt command. The full documentation is here.
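A few psql meta-commands come in handy at this point (\dt is the one mentioned above; table_name is a placeholder for one of your own tables):

\dt            list all tables in the current database
\d table_name  show the columns and types of one table
\q             quit psql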

Connecting to Postgres from Python

This is very straightforward using SQLAlchemy. I don't use the ORM, or really any other SQLAlchemy features, but pandas complains if you try to connect by any means other than SQLAlchemy.

# Postgres Connection
import os
import pandas as pd  # needed by the helper functions below
from sqlalchemy import create_engine, text
pw = os.getenv('PGCON')  # database password stored in an environment variable
alchemyEngine = create_engine(f'postgresql+psycopg2://roger:{pw}@10.26.48.5/analytics_poc', pool_recycle=3600)
pgcon = alchemyEngine.connect()
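One quick way to confirm the connection is working is to run a trivial query, for example:

# Quick sanity check that the connection is live (illustrative only)
print(pd.read_sql(text("SELECT version()"), pgcon))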

Reading from Postgres

Notice above that we imported the text function from SQLAlchemy. Here is the first quirk of Postgres: normally you could just use pd.read_sql() with a query string, but with this setup you have to wrap each SQL statement in text(). There is no way I am typing that with every query. I also need to access both BigQuery and Postgres in the same codebase, so I created a series of helper functions prefixed with pg_ that already hold the database connection so I don't have to pass it in. Here is the helper that wraps text() around a SQL statement and returns the results as a DataFrame.

Now is also a good time to point out a second quirk of Postgres. If a prior database command resulted in an error, no further database actions (including reads) will work until the failed transaction is rolled back. There is no harm in rolling back, so I just issue a rollback with every query.

def pg_sql(s):
  # Roll back any failed transaction, then run the query and return a DataFrame
  pgcon.rollback()
  return pd.read_sql(text(s), pgcon)
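With that in place, a query is just a plain string (sales here is only a placeholder table name):

# Example query through the helper (sales is a hypothetical table)
df = pg_sql("SELECT * FROM sales LIMIT 10")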

Getting a list of tables

This helper function returns a DataFrame listing all the user tables in the database (it filters out the pg_catalog and information_schema system schemas).

def pg_sql_table_names():
  return pg_sql("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")

Table Schema

When appending to tables it can be tricky to get all the data types correct, especially since I started off writing the tables from R and then switched to Python, and I didn't want to re-create the tables because Metabase dashboards were linked to them. Here is a helper function that shows a table's schema.

def pg_sql_table_schema(tblName):
  pgcon.rollback()
  return pg_sql(f"SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_name = '{tblName}';")

The execute statement

You can't use pandas for statements like DELETE or ALTER TABLE, so I created a helper function for those as well. Also, Postgres does not automatically commit. There is a setting to turn autocommit on, but since I am writing all these helper functions anyway, I just add the commit() call. As before, I start with rollback() and wrap the actual SQL command in text().

def pg_execute(txt):
  # Clear any failed transaction, run the statement, then commit
  pgcon.rollback()
  pgcon.execute(text(txt))
  pgcon.commit()
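Typical uses are deletes and DDL that pandas can't run for you, for example (my_table is a placeholder):

# Clear out a table's rows without dropping the table itself
pg_execute("DELETE FROM my_table")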
  

Writing to a table

This helper function uses DataFrame.to_sql() to save a DataFrame to a table and then commits it. It prints "saved" if successful and "error" if not. If the table already exists, the new rows are appended. If the table doesn't exist, it will still get created, so there is no harm in using append unless you actually wanted to replace the table. If a table is being used by Metabase you don't want to replace it; it is better to delete the contents and then write the new data.

def pg_save(dat, tblName):
  try:
    dat.to_sql(tblName, pgcon, if_exists='append')  # creates the table if it does not already exist
    pgcon.commit()
    print("saved")
  except Exception:
    pgcon.rollback()
    print("error")

Most embarrassing function

I was struggling to append to certain tables used by Metabase. For some reason I could not get the types to match, especially the date type. I was also having trouble with the field names because they contained capital letters, which is another quirk of Postgres: it doesn't like capital letters in column names, and unquoted identifiers get folded to lowercase. In the end I created a very convoluted helper function that writes the data to a new staging table, copies the data into the table I actually want to append to, and then drops the staging table. Along the way it changes all the field names to lowercase and casts the date columns so the append works. It is not very pretty, but it gets the job done.

def pg_insert(df, tblName):
  # Postgres folds unquoted identifiers to lowercase, so lowercase the column names up front
  df.columns = df.columns.str.lower()
  # Drop any staging table left over from a previous run
  try:
    pg_execute(f"drop table {tblName}2")
  except Exception:
    pass
  # Write the DataFrame to a staging table
  pg_save(df, f"{tblName}2")

  # Get the column names from the staging table, skipping the index column to_sql added
  test = pg_sql(f"select * from {tblName}2 limit 1")
  names = test.columns.values.tolist()[1:]
  txt = ", ".join(names)
  # Cast the date-like text columns so they match the target table's types
  if "month" in txt:
    txt = txt.replace("month", "TO_DATE(month, 'YYYY-MM-DD') as month")
  if "date" in txt:
    txt = txt.replace("date", "TO_DATE(date, 'YYYY-MM-DD') as date")
  txt = txt.replace("extract", "TO_DATE(extract, 'YYYY-MM-DD') as extract")
  # Copy the staging rows into the real table, then drop the staging table
  pg_execute(f"insert into {tblName} select {txt} from {tblName}2")
  pg_execute(f"drop table {tblName}2")

Addendum on BigQuery

I also had a problem appending data to BigQuery, so I thought I would mention it here in case it helps anyone. I had to tell pandas_gbq more about the date column (whose name I store in the variable datetag), like so: table_schema=[{'name': datetag, 'type': 'DATE'}].

pandas_gbq.to_gbq(comb, outName, if_exists="append", table_schema = [{'name':datetag,'type': 'DATE'}])