Upserting Data using Spark and Iceberg

May 25, 2023

Jonathan Merlevede

Use Spark and Iceberg’s MERGE INTO syntax to efficiently store daily, incremental snapshots of a mutable source table.

Iceberg allows tracking a table’s history by storing incremental diffs. Unfortunately, there are some caveats, and getting this to work as you likely want it to requires non-obvious querying. In this post, we look at the why and the how.

We use Spark as our analytical engine, but this post should also apply to other engines working with Iceberg.

Problem setting

A typical pattern in analytical data processing is to ingest data from an operational system on a daily basis. Often, we store previously ingested table versions in addition to the current state of things, for example, to support reproducing machine learning results. When working with “standard” Spark and Parquet, we do this by storing daily snapshots and by partitioning on the ingestion date.

We aim to use Apache Iceberg to achieve the same result, a queryable history of table snapshots, more efficiently. Iceberg is a project offering a metadata format and a set of execution engine plugins. These extend popular analytical processing engines like Spark, Flink, and Trino with features such as incremental updates, time travel, and ACID transactions.
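For comparison, the “standard” Spark and Parquet baseline described above could be sketched as follows. This is only an illustration: the table raw.source_snapshots, the view source_extract, and the columns col1 and col2 are hypothetical names, not taken from any real system.

```sql
-- Hypothetical baseline: one full copy of the source table per ingestion date.
CREATE TABLE IF NOT EXISTS raw.source_snapshots (
  id BIGINT,
  col1 STRING,
  col2 STRING,
  ingestion_date DATE
)
USING parquet
PARTITIONED BY (ingestion_date);

-- Daily load: (re)write the partition for today's extract.
-- source_extract is assumed to be a view over the freshly extracted data.
INSERT OVERWRITE TABLE raw.source_snapshots
PARTITION (ingestion_date = '2023-05-25')
SELECT id, col1, col2
FROM source_extract;

-- Reading an older version is a simple partition filter.
SELECT *
FROM raw.source_snapshots
WHERE ingestion_date = '2023-05-01';
```

Every daily partition holds a complete copy of the source table, including rows that did not change, which is the storage cost the rest of this post tries to avoid.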

Upserting

Using Spark with Iceberg unlocks the SQL MERGE INTO statement, which implements a table “upsert”, a portmanteau of “update” and “insert”. A MERGE INTO statement uses the result of a SELECT statement (the source) to delete, update, and insert rows from and into a target table such as prod.db.target, depending on whether an id exists in the source or not and on whether guard predicates on the individual clauses (referred to here as <predicate_x> and <predicate_y>) are true. For an overview of the MERGE INTO statement, check out the Iceberg documentation.

On the storage side, executing an upsert statement triggers Iceberg to create new data files corresponding to any modified partitions (roughly, copy-on-write) or to create small files expressing, for example, deletes (roughly, merge-on-read, available since Iceberg v2). Iceberg also creates new metadata files pointing to these new data files. Unchanged data files (partitions) are re-used. This allows a series of snapshots to be stored efficiently by keeping only the snapshot “deltas”. Earlier versions of the prod.db.target table can be “recalled” by time traveling, using the TIMESTAMP AS OF or VERSION AS OF clauses.
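A statement of the kind described above might look like the sketch below. The source view updates, the is_deleted column, and the concrete predicates are assumptions made for illustration (they stand in for <predicate_x> and <predicate_y>), and the snapshot id in the last query is made up.

```sql
-- Sketch only. Assumes target and source share the columns
-- (id, col1, col2, is_deleted); `updates` is a hypothetical source view.
MERGE INTO prod.db.target t
USING (SELECT * FROM updates) s       -- the source rows
ON t.id = s.id                        -- how source rows are matched to target rows
WHEN MATCHED AND s.is_deleted THEN DELETE              -- stands in for <predicate_x>
WHEN MATCHED AND NOT s.is_deleted THEN UPDATE SET *    -- stands in for <predicate_y>
WHEN NOT MATCHED AND NOT s.is_deleted THEN INSERT *;

-- Time travel: read the table as it was at a point in time ...
SELECT * FROM prod.db.target TIMESTAMP AS OF '2023-05-24 00:00:00';

-- ... or as of a specific snapshot (hypothetical snapshot id).
SELECT * FROM prod.db.target VERSION AS OF 1234567890123456789;
```

The time travel clauses in this form require a reasonably recent Spark version (3.3 or later, to the best of our knowledge).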


Figure: Iceberg table format spec.

TL;DR Upserting allows us to keep our target copy up-to-date while maintaining a complete history of previous states of a source table, without storing full snapshots of the data.

MERGE INTO limitations

How can we use upserts to reconcile differences between an existing Iceberg table and a newly extracted snapshot?

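Assume that we extract or create a snapshot of a mutable source table on a daily basis, and want to use it to upsert an Iceberg table called iceberg. Ideally, we would be able to write a single MERGE INTO statement that joins iceberg and snapshot on id, updates all matched rows (WHEN MATCHED THEN UPDATE *), inserts rows that exist only in snapshot, and deletes rows that exist only in iceberg.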
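Such an idealized statement might look like the sketch below. It is shown only to make the intent concrete: the last clause uses Delta-style syntax, and it is exactly the part that Iceberg is described as not supporting in this post.

```sql
-- Idealized upsert of the daily snapshot; not expected to run on Iceberg
-- as described in this post.
MERGE INTO iceberg
USING snapshot
ON iceberg.id = snapshot.id
WHEN MATCHED THEN UPDATE SET *               -- row exists in both: overwrite it
WHEN NOT MATCHED THEN INSERT *               -- row only in snapshot: new row
WHEN NOT MATCHED BY SOURCE THEN DELETE;      -- row only in iceberg: deleted at the source
```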

Unfortunately, this straightforward query does not work for two reasons.

Deletes. Unlike Delta, Iceberg does not (at the time of writing) support the NOT MATCHED BY SOURCE clause. Iceberg’s NOT MATCHED clause corresponds to NOT MATCHED BY TARGET, that is, it fires when a row exists in snapshot but not in iceberg. This is problematic if rows can be deleted from the source system: such rows exist in iceberg but not in snapshot, and no clause fires for them.

Superfluous copies. For rows that are the same in iceberg and snapshot, the WHEN MATCHED THEN UPDATE * results in an identical duplicate of the data being stored on your filesystem. This means that Iceberg will not bring storage benefits over storing multiple snapshots of the source table.

Under the hood, Iceberg decides which partitions it will re-write based on the ON condition (see this issue for a discussion on the impact on performance). Rows matched by the ON condition but not by any guard on the WHEN MATCHED clauses will, therefore, still be copied every time you run the upsert statement. Practically, this means that re-writing the MERGE INTO statement above to read roughly as follows still results in duplicate partitions being stored:

MERGE INTO iceberg
USING snapshot
ON iceberg.id = snapshot.id
-- <condition_expressing_change> is true if one or more columns differ
-- between iceberg and snapshot, i.e.
-- (iceberg.col1 != snapshot.col1) OR (iceberg.col2 != snapshot.col2) OR ...
WHEN MATCHED AND <condition_expressing_change> THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

(More detailed analysis with references to source code below)

In fact, your MATCHED conditions are always re-written to include a catchall condition that emits the target row.

Iceberg constructs modified partitions differently depending on whether your MERGE INTO statement contains only MATCHED clauses, only NOT MATCHED clauses, or both. If only MATCHED clauses exist, a one-sided outer join that keeps every target row suffices (a right outer join with the target on the right-hand side), because source rows without a match can be discarded. If only NOT MATCHED clauses exist, Iceberg uses a left anti join and performs a simple append operation instead of re-writing partitions. If both MATCHED and NOT MATCHED clauses exist, a full outer join between target and source is required.
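To make the NOT MATCHED-only case concrete, the two statements below are a conceptual sketch of why an anti join plus an append is enough. This is not Iceberg's actual plan, and it assumes that snapshot has the same columns in the same order as iceberg.

```sql
-- Option 1: a MERGE that only has a NOT MATCHED clause ...
MERGE INTO iceberg
USING snapshot
ON iceberg.id = snapshot.id
WHEN NOT MATCHED THEN INSERT *;

-- Option 2: ... conceptually appends the source rows without a partner in the target.
-- Run one option or the other, not both.
INSERT INTO iceberg
SELECT snapshot.*
FROM snapshot
LEFT ANTI JOIN iceberg ON iceberg.id = snapshot.id;
```

No existing target row is touched in either form, which is why no partitions need to be re-written.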

To determine which rows/partitions of the target to re-write, Iceberg performs a quick inner join between the source and target tables. To support NOT MATCHED BY SOURCE, a right outer join would be required instead, as implemented by Delta.
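The effect of that inner join can be approximated by hand with the query below. It is a rough sketch rather than what Iceberg runs internally, and it assumes that your Spark and Iceberg versions expose the _file metadata column on the target table.

```sql
-- Approximation: which data files of the target contain at least one row
-- that the ON condition matches? These are the candidates for re-writing.
SELECT DISTINCT t._file
FROM iceberg t
JOIN snapshot s ON t.id = s.id;
```

If snapshot contains every id in the source table, this list tends to cover almost all files in iceberg, which illustrates why merging the full snapshot re-writes nearly everything.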

Overcoming MERGE INTO limitations

Luckily, we can work around these limitations of Iceberg’s upserting functionality. We do this by first preparing a table containing only the changes to the iceberg table. When dealing with sources where rows can be updated and deleted, this requires a full outer join or a sequence of anti-joins. Then, we use this changes table as the source of updates for our MERGE INTO statement.

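One way to do this is to define the changes table as a CTE (common table expression) in front of the MERGE INTO statement, so that both steps run as a single query.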
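A sketch of such a query is shown below. It assumes a table with the columns id, col1, and col2; the cdc column name and its values 'D', 'I', and 'U' are our own choice for this illustration. Column lists are spelled out so that the helper column cdc is never written to the target.

```sql
WITH changes AS (
  SELECT
    COALESCE(b.id, a.id) AS id,       -- key from snapshot, or from iceberg for deletes
    b.col1 AS col1,
    b.col2 AS col2,
    CASE
      WHEN b.id IS NULL THEN 'D'      -- only in iceberg: deleted at the source
      WHEN a.id IS NULL THEN 'I'      -- only in snapshot: inserted at the source
      ELSE 'U'                        -- in both and different: updated
    END AS cdc
  FROM iceberg a
  FULL OUTER JOIN snapshot b ON a.id = b.id
  -- Keep only rows that differ; <=> is Spark's null-safe equality operator.
  WHERE NOT (a.id <=> b.id AND a.col1 <=> b.col1 AND a.col2 <=> b.col2)
)
MERGE INTO iceberg
USING changes
ON iceberg.id = changes.id
WHEN MATCHED AND changes.cdc = 'D' THEN DELETE
WHEN MATCHED AND changes.cdc = 'U' THEN UPDATE SET
  col1 = changes.col1,
  col2 = changes.col2
WHEN NOT MATCHED AND changes.cdc = 'I' THEN INSERT (id, col1, col2)
  VALUES (changes.id, changes.col1, changes.col2);
```

Including the id in the comparison keeps newly inserted rows whose other columns are all NULL from being mistaken for unchanged rows.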

This results in only changes being stored in the target table iceberg, and supports insertions, updates, and deletes in the source table snapshot.

The table changes includes a row for every insert, update, or delete in the source table. To construct changes, we perform a full outer join between the existing Iceberg table (iceberg, aliased as a) and the most recent snapshot of the source table (snapshot, aliased as b):

  • If a row existed in iceberg but no longer exists in snapshot (b.id is null), this corresponds to a delete operation in the source table.

  • If an id exists in snapshot but does not yet exist in iceberg (a.id is null), this corresponds to an insert operation in the source table.

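  • If a row with a specific id exists in both iceberg and snapshot (both a.id and b.id are non-null), the row with this id was either unchanged or updated. We filter out unchanged rows by specifying WHERE NOT (a.col1 <=> b.col1 AND a.col2 <=> b.col2 AND ...). The null-safe equality operator <=> matters here: with a plain =, the comparison evaluates to NULL for inserted and deleted rows (where one side of the join is entirely NULL), and the WHERE clause would drop them.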

The changes table only has entries for rows that actually require changes in the iceberg table, which works around the problem of superfluous updates. Merging changes into iceberg using MERGE INTO is straightforward and works the way you would expect it to.
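One way to check that a run only stored a delta is to look at Iceberg's snapshots metadata table, as sketched below. The catalog and namespace (spark_catalog.default) are assumptions; substitute wherever your iceberg table lives.

```sql
-- One row per commit, with the counters Iceberg records in the snapshot summary.
SELECT
  committed_at,
  snapshot_id,
  operation,
  summary['added-data-files'] AS added_data_files,
  summary['added-records']    AS added_records,
  summary['deleted-records']  AS deleted_records
FROM spark_catalog.default.iceberg.snapshots
ORDER BY committed_at;
```

With copy-on-write, the record counts include unchanged rows that happen to live in a re-written file, so they are an upper bound on the number of changed rows rather than an exact count.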


This post looked at how we can leverage Iceberg to maintain a history of full table snapshots efficiently. Iceberg requires some tinkering for it to work the way we want it to, but enables patterns that were previously impossible or inefficient to use with Spark. Iceberg extends Spark's capabilities with functionality that was previously only available within data warehouses like Snowflake. We hope that with time, Iceberg will become even easier to use.

It's definitely a technology worth exploring!

Edit 21/11: Added some sentences to the introduction and re-wrote some sentences for clarity.



Belgium

Vismarkt 17, 3000 Leuven - HQ
Borsbeeksebrug 34, 2600 Antwerpen


Vat. BE.0667.976.246

Germany

Spaces Tower One,
Brüsseler Strasse 1-3, Frankfurt 60327, Germany

© 2025 Dataminded. All rights reserved.

