PostgreSQL to Iceberg Snapshot Sync (Using DuckDB)

Why sync from Postgres to Iceberg? You want an analytics-optimized snapshot of your data to run OLAP-style queries against. This can be accomplished using DuckDB plus a bit of glue code. Specifically, this post relies on the following key features of DuckDB:

  • the postgres extension, which lets DuckDB attach to a running Postgres database and read its tables directly
  • the postgres_query function, which pushes a query down to Postgres and returns the result to DuckDB
  • export of query results as Apache Arrow tables, which PyIceberg can ingest directly

I’ll also use Python and PyIceberg. For the Iceberg catalog, I’ll use SQLite. Let’s get right to it.

First, let’s set up the catalog:

import os

from pyiceberg.catalog.sql import SqlCatalog

# create a local directory to act as the warehouse
warehouse_path = os.path.abspath("./warehouse")
os.makedirs(warehouse_path, exist_ok=True)

# the catalog itself is a SQLite database inside the warehouse directory
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)
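
As an optional sanity check, you can list the catalog’s namespaces; a fresh catalog should have none. A minimal sketch, assuming the catalog object from above:

# a newly created catalog has no namespaces yet
print(catalog.list_namespaces())  # expected: []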

Next, let’s set up DuckDB and connect it to Postgres:

import duckdb

conn = duckdb.connect(":memory:")
db_name = os.getenv("PG_DATABASE")
pg = {
    "database": db_name,
    "user": os.getenv("PG_USER"),
    "password": os.getenv("PG_PASSWORD"),
    "host": os.getenv("PG_HOST"),
    "port": os.getenv("PG_PORT"),
}

conn.execute("install postgres")
conn.execute("load postgres")
conn.execute(
    """attach 'host={} port={} dbname={} user={} password={}'
    as {} (type postgres, read_only)
    """.format(
        pg["host"],
        pg["port"],
        pg["database"],
        pg["user"],
        pg["password"],
        db_name,
    )
)
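
To check that the attachment worked, you can ask DuckDB which databases it now sees; the attached Postgres database should appear under its alias. A quick sketch using the connection from above:

# "show databases" lists the attached postgres db alongside duckdb's own
print(conn.sql("show databases").fetchall())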

Let’s retrieve the schemas in Postgres plus the tables within those schemas:

  • we’re using postgres_query, a DuckDB function that runs the query it’s supplied with directly in Postgres. Single quotes inside the query string have to be escaped by doubling them
  • in Postgres, we can get the list of schemas by querying the information_schema.schemata view, e.g. select * from information_schema.schemata
  • pg_catalog, pg_toast, and information_schema are schemas internal to Postgres that we need to filter out
  • once we have a schema name, we can get the tables within that schema by querying the pg_tables system view
# get schemas, filtering out the postgres-internal ones; single quotes
# are doubled since the query is embedded in a sql string literal
pg_query = """
select schema_name
from information_schema.schemata
where schema_name not in (''pg_catalog'', ''pg_toast'', ''information_schema'')
"""
res = conn.sql(f"select * from postgres_query('{db_name}', '{pg_query}')")
schema_to_tables = {r[0]: [] for r in res.fetchall()}
for s in schema_to_tables:
    res = conn.sql(
        f"""
    select * from postgres_query('{db_name}',
        'select tablename from pg_tables where schemaname=''{s}''')"""
    )
    schema_to_tables[s] = [r[0] for r in res.fetchall()]
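
Before syncing, it can help to eyeball the mapping we just built. The schema and table names below are purely hypothetical; yours will depend on your database:

print(schema_to_tables)
# hypothetical output: {'public': ['users', 'orders']}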

Finally, let’s sync from Postgres to Iceberg. Note that the sync is carried out within a single transaction:

with conn.begin() as tx:
    for schema, tables in schema_to_tables.items():
        catalog.create_namespace(schema)
        for tbl_name in tables:
            full_tbl_name = f"{schema}.{tbl_name}"
            # "table x" is duckdb shorthand for "select * from x";
            # .arrow() materializes the result as a pyarrow table
            df = tx.sql(f"table {db_name}.{full_tbl_name}").arrow()
            # create the iceberg table from the arrow schema, then load the rows
            table = catalog.create_table(full_tbl_name, schema=df.schema)
            table.append(df)
    tx.commit()
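
To confirm the sync worked, you can load a table back out of the catalog and scan it into Arrow. A minimal sketch; "public.users" is a hypothetical identifier, so substitute a table that actually exists in your database:

# load a synced table from the iceberg catalog and read it back
tbl = catalog.load_table("public.users")  # hypothetical identifier
print(tbl.scan().to_arrow())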