Daten mit Spark und Iceberg einfügen bzw. aktualisieren

25.05.2023

Jonathan Merlevede

Verwenden Sie die MERGE INTO-Syntax von Spark und Iceberg, um täglich inkrementelle Schnappschüsse einer veränderlichen Quelltabelle effizient zu speichern.

Iceberg ermöglicht die Verfolgung der Historie einer Tabelle, indem inkrementelle Diffs gespeichert werden. Leider gibt es einige Fallstricke, und es erfordert nicht offensichtliche Abfragen, um dies so zum Laufen zu bringen, wie Sie es wahrscheinlich möchten. In diesem Beitrag betrachten wir das Warum und das Wie.

Wir verwenden Spark als unsere analytische Engine, aber dieser Beitrag sollte auch für andere Engines gelten, die mit Iceberg arbeiten.

Problemstellung

Ein typisches Muster in der analytischen Datenverarbeitung ist es, Daten täglich aus einem operativen System zu importieren. Oft speichern wir zusätzlich zum aktuellen Stand auch zuvor importierte Tabellenversionen, um beispielsweise die Reproduktion von Ergebnissen aus maschinellem Lernen zu unterstützen. Bei der Arbeit mit „standardmäßigen“ Spark und Parquet tun wir dies, indem wir tägliche Snapshots speichern und nach dem Einfügedatum partitionieren. Wir beabsichtigen, Apache Iceberg zu verwenden, um dasselbe Resultat zu erreichen – eine abfragbare Historie von Tabellensnapshots – effizienter. Iceberg ist ein Projekt, das ein Metadatenformat und eine Reihe von Ausführungsengine-Plugins anbietet. Dies erweitert beliebte analytische Verarbeitungsengines wie Spark, Flink und Trino um Funktionen wie inkrementelle Updates, Zeitreisen und ACID-Transaktionen.

Upserting

Die Verwendung von Spark mit Iceberg schaltet das SQL MERGE INTO-Statement frei, das ein „Upsert“ einer Tabelle implementiert, ein Portmanteau, das sich aus einer „Tabelleninsert“ und „Tabellenupdate“ zusammensetzt: Der obige Code verwendet das Ergebnis des SELECT-Statements, um Zeilen aus der Tabelle prod.db.target zu löschen, zu aktualisieren und einzufügen, je nachdem, ob eine ID in der Quelltabelle existiert oder nicht und ob <predicate_x> oder <predicate_y> wahr sind. Für einen Überblick über das MERGE INTO-Statement, lesen Sie die Iceberg-Dokumentation hier. Auf der Speicherseite löst das Ausführen eines Upsert-Statements wie das oben beschriebene aus, dass Iceberg neue Datendateien erstellt, die entsprechenden modifizierten Partitionen zugeordnet sind (~ copy-on-write) oder kleine Dateien erstellt, die z.B. Löschvorgänge ausdrücken (~ merge-on-read, verfügbar seit Iceberg v2). Iceberg erstellt auch neue Metadatendateien, die auf diese neuen Datendateien verweisen. Unveränderte Datendateien (Partitionen) werden wiederverwendet. Dies ermöglicht eine effiziente Speicherung von Snapshot-Reihen, indem nur die Snapshot „Deltas“ aufbewahrt werden. Frühere Versionen der Tabelle prod.db.target können durch Zeitreisen „wieder aufgerufen“ werden, indem die Klauseln TIMESTAMP AS OF oder VERSION AS OF verwendet werden.


Iceberg Catalog

Iceberg-Tabellenformat-Spezifikation (Quelle)

TL;DR Upserting ermöglicht es uns, unsere Zielkopie auf dem neuesten Stand zu halten, während wir eine vollständige Historie der vorherigen Zustände einer Quelltabelle führen, ohne vollständige Snapshots der Daten zu speichern.

MERGE INTO-Einschränkungen

Wie können wir Upserts verwenden, um Unterschiede zwischen einer bestehenden Iceberg-Tabelle und einem neu extrahierten Snapshot auszugleichen?

Angenommen, wir extrahieren oder erstellen täglich eine veränderbare Quelltabelle snapshot und möchten diese verwenden, um in eine Iceberg-Tabelle mit dem Namen iceberg zu upserten. Idealerweise könnten wir Folgendes schreiben:

Leider funktioniert diese einfache Abfrage aus zwei Gründen nicht.

Löschen. Im Gegensatz zu Delta unterstützt Iceberg nicht die Syntax MATCHED BY SOURCE. Die Anweisung NOT MATCHED von Iceberg entspricht NOT MATCHED BY TARGET, das heißt, sie wird aktiviert, wenn eine Zeile in snapshot existiert, aber nicht in iceberg. Dies ist problematisch, wenn Zeilen aus dem Quellsystem gelöscht werden können.

Überflüssige Kopien. Für Zeilen, die sowohl in iceberg als auch in snapshot gleich sind, führt die Anweisung WHEN MATCHED THEN UPDATE * dazu, dass eine identische Duplikatkopie der Daten auf Ihrem Dateisystem gespeichert wird. Das bedeutet, dass Iceberg keine Speicherersparnisse im Vergleich zur Speicherung mehrerer Snapshots der Quelltabelle bietet.

Im Hintergrund entscheidet Iceberg, welche Partitionen es basierend auf der ON-Bedingung neu schreibt (siehe dieses Problem für eine Diskussion über die Auswirkungen auf die Leistung). Zeilen, die durch die ON-Anweisung übereinstimmen, aber nicht durch irgendeine Bedingung zum Abgleich, werden daher jedes Mal kopiert, wenn Sie das Upsert-Statement ausführen. Praktisch bedeutet dies, dass das Neuschreiben des MERGE INTO-Statements oben, um +- wie folgt zu lesen, immer noch dazu führt, dass doppelte Partitionen gespeichert werden:

MERGE INTO icebergUSING snapshotON iceberg.id = snapshot.id-- condition expressing change is true if one or more columns is different-- in iceberg and target, i.e.-- (iceberg.col1 != snapshot.col1) OR (iceberg.col2 != snapshot.col2) OR ...WHEN MATCHED AND <condition_expressing_change>

(Ausführlichere Analyse mit Verweisen auf den Quellcode weiter unten)

Tatsächlich werden Ihre MATCHED-Bedingungen immer umgeschrieben um eine Catchall-Bedingung zu inkludieren, die die Zielzeile ausgibt.

Iceberg erstellt modifizierte Partitionen unterschiedlich, je nachdem, ob Ihr MERGE INTO-Statement nur MATCHED-Bedingungen, NOT MATCHED-Bedingungen oder beides enthält. Wenn nur MATCHED-Bedingungen vorhanden sind, reicht ein rechter Außenausschnitt zwischen Ziel und Quelle (wobei die Quelle rechts ist). Wenn nur NOT MATCHED-Bedingungen vorhanden sind, verwendet Iceberg einen linken Anti-Join und führt eine einfache Anhängeoperation aus, anstatt Partitionen neu zu schreiben. Wenn sowohl MATCHED- als auch NOT MATCHED-Bedingungen vorhanden sind ist ein vollständiger äußerer Join zwischen Ziel und Quelle erforderlich.

Um zu bestimmen, welche Zeilen/Partitionen des Ziels neu geschrieben werden sollen, führt Iceberg einen schnellen inneren Join zwischen den Quell- und Zieltabellen durch. Um NOT MATCHED BY SOURCE zu unterstützen, wäre ein rechter äußerer Join erforderlich, wie ihn Delta hier implementiert.

Überwindung der MERGE INTO-Einschränkungen

Glücklicherweise können wir diese Eigenschaften der Upserting-Funktionalität von Iceberg leicht überwinden. Wir tun dies, indem wir zuerst eine Tabelle vorbereiten, die nur die Änderungen an Ihrer iceberg-Tabelle enthält. Wenn wir es mit Quellen zu tun haben, bei denen Zeilen aktualisiert und gelöscht werden können, erfordert dies einen vollständigen äußeren Join oder eine Sequenz von Anti-Joins. Dann können wir diese Änderungstabelle als Quelle für Updates für unser MERGE INTO-Statement nutzen.

Eine Möglichkeit, dies zu tun, ist die Verwendung eines CTE wie folgt:

Dies führt dazu, dass nur Änderungen in der Zieltabelle iceberg gespeichert werden und Einfügungen, Aktualisierungen und Löschungen in der Quelltabelle snapshot unterstützt werden.

Die Tabelle changes enthält eine Zeile für jede neue Einfügung, Aktualisierung oder Löschung in der Quelltabelle. Um changes zu erstellen, führen wir einen vollständigen äußeren Join zwischen dem aktuellen Snapshot der Quelltabelle (snapshot) und der bestehenden Iceberg-Tabelle (iceberg) durch:

  • Wenn eine Zeile in iceberg existierte, aber nicht mehr in snapshot existiert (b.id ist null), entspricht dies einer Löschoperation in der Quelltabelle.

  • Wenn eine ID in snapshot existiert, aber noch nicht in iceberg existiert (a.id ist null), entspricht dies einer Einfügeoperation in der Quelltabelle.

  • Wenn eine Zeile mit einer bestimmten ID sowohl in iceberg als auch in snapshot existiert (sowohl a.id als auch b.id sind nicht null), bedeutet dies, dass die Zeile mit dieser ID unverändert oder aktualisiert wurde. Wir filtern unveränderte Zeilen, indem wir WHERE NOT (a.col1 = b.col1 AND a.col2 = b.col2 AND ...) angeben.

Die changes-Tabelle hat nur Einträge für Zeilen, die tatsächlich Änderungen in der iceberg-Tabelle erfordern und umgeht das Problem der überflüssigen Updates. Das Merging von Änderungen in iceberg mithilfe von MERGE INTO ist unkompliziert und funktioniert so, wie Sie es erwarten würden.


In diesem Beitrag haben wir betrachtet, wie wir Iceberg nutzen können, um eine Historie vollständiger Tabellensnapshots effizient zu führen. Iceberg erfordert einige Anpassungen, damit es so funktioniert, wie wir es möchten, ermöglicht jedoch Muster, die zuvor mit Spark unmöglich oder ineffizient waren. Iceberg erweitert die Möglichkeiten von Spark mit Funktionen, die zuvor nur in Data Warehouses wie Snowflake verfügbar waren. Wir hoffen, dass Iceberg mit der Zeit noch einfacher zu bedienen wird.

Es ist definitiv eine Technologie, die es wert ist, erkundet zu werden!

Bearbeitung 21.11.: Einige Sätze in die Einleitung hinzugefügt und einige Sätze zur Klarheit umgeschrieben.


Latest

Portable by design: Rethinking data platforms in the age of digital sovereignty
Portable by design: Rethinking data platforms in the age of digital sovereignty
Portable by design: Rethinking data platforms in the age of digital sovereignty

Portable by design: Rethinking data platforms in the age of digital sovereignty

Build a portable, EU-compliant data platform and avoid vendor lock-in—discover our cloud-neutral stack in this deep-dive blog.

Cloud-Unabhängigkeit: Test eines europäischen Cloud-Anbieters gegen die Giganten
Cloud-Unabhängigkeit: Test eines europäischen Cloud-Anbieters gegen die Giganten
Cloud-Unabhängigkeit: Test eines europäischen Cloud-Anbieters gegen die Giganten

Cloud-Unabhängigkeit: Test eines europäischen Cloud-Anbieters gegen die Giganten

Kann ein europäischer Cloud-Anbieter wie Ionos AWS oder Azure ersetzen? Wir testen es – und finden überraschende Vorteile in Bezug auf Kosten, Kontrolle und Unabhängigkeit.

Hören Sie auf, schlechte Qualitätsdaten zu laden
Hören Sie auf, schlechte Qualitätsdaten zu laden
Hören Sie auf, schlechte Qualitätsdaten zu laden

Vermeide schlechte Daten von Anfang an

Das Erfassen aller Daten ohne Qualitätsprüfungen führt zu wiederkehrenden Problemen. Priorisieren Sie die Datenqualität von Anfang an, um nachgelagerte Probleme zu vermeiden.

Hinterlasse deine E-Mail-Adresse, um den Dataminded-Newsletter zu abonnieren.

Hinterlasse deine E-Mail-Adresse, um den Dataminded-Newsletter zu abonnieren.

Hinterlasse deine E-Mail-Adresse, um den Dataminded-Newsletter zu abonnieren.

Belgien

Vismarkt 17, 3000 Leuven - HQ
Borsbeeksebrug 34, 2600 Antwerpen


USt-IdNr. DE.0667.976.246

Deutschland

Spaces Kennedydamm,
Kaiserswerther Strasse 135, 40474 Düsseldorf, Deutschland


© 2025 Dataminded. Alle Rechte vorbehalten.


Vismarkt 17, 3000 Leuven - HQ
Borsbeeksebrug 34, 2600 Antwerpen

USt-IdNr. DE.0667.976.246

Deutschland

Spaces Kennedydamm, Kaiserswerther Strasse 135, 40474 Düsseldorf, Deutschland

© 2025 Dataminded. Alle Rechte vorbehalten.


Vismarkt 17, 3000 Leuven - HQ
Borsbeeksebrug 34, 2600 Antwerpen

USt-IdNr. DE.0667.976.246

Deutschland

Spaces Kennedydamm, Kaiserswerther Strasse 135, 40474 Düsseldorf, Deutschland

© 2025 Dataminded. Alle Rechte vorbehalten.