Kreuz-DAG-Abhängigkeiten in Apache Airflow: Ein umfassender Leitfaden
04.07.2023
•
Frederic Vanderveken
Vier Methoden zur effektiven Verwaltung und Skalierung Ihrer Datenworkflow-Abhängigkeiten mit Apache Airflow erkunden.
Wenn Organisationen ihre Datenoperationen skalieren, wird die Notwendigkeit für Struktur und optimierte Prozesse von größter Bedeutung. Dies wird typischerweise von Tools wie Apache Airflow übernommen, das einen codegesteuerten Ansatz bietet, um Datenlasten über gerichtete azyklische Graphen (DAGs) zu orchestrieren und zu überwachen. Innerhalb dieses Rahmens verkörpert jeder DAG eine einzigartige Datenpipeline oder einen Workflow. Daher wird es beim Skalieren der Anzahl von Datenprozessen und gleichzeitigem Gewährleisten der Datenfrische entscheidend, diese DAGs und ihre Abhängigkeiten ordnungsgemäß zu verwalten.
In diesem Blogbeitrag untersuchen wir vier prominente Methoden zur Verwaltung und Implementierung von inter-DAG-Abhängigkeiten in Airflow — Trigger, Sensoren, Datensätze und die Airflow-API. Wir erklären ihr Verhalten, bieten Beispielcode-Schnipsel an und vergleichen die Vor- und Nachteile jeder Methode, sodass Sie informierte Entscheidungen treffen können, die auf Ihren spezifischen Anwendungsfall zugeschnitten sind.
Warum sind inter-DAG-Abhängigkeiten nützlich?
Jeder DAG besteht aus mehreren miteinander verbundenen Aufgaben, die es einer Organisation ermöglichen, alle ihre Daten-Workflows einfach in einer Menge von DAGs zusammenzufassen. Ein Beispiel-DAG mit vier Aufgaben ist in Abb. 1 dargestellt. Um den besten Praktiken von Airflow zu folgen, muss jeder DAG einen gut definierten Umfang haben [1,2]. Das bedeutet, dass er ein einzelnes Projekt oder einen Prozess umfassen, von einem einzelnen Team verwaltet werden und einen spezifischen Ausführungszeitplan haben sollte. Wenn Organisationen jedoch wachsen, müssen diese unabhängigen Workflows oft miteinander interagieren oder synchronisiert werden. Hier kommt das Problem der inter-DAG-Abhängigkeiten ins Spiel. Ein anschauliches Beispiel für diese Komplexität ist in Abb. 2 gezeigt, wo alle Daten-Workflows in DAGs erfasst sind und mehrere Abhängigkeiten zwischen ihnen bestehen.


Inter-DAG-Abhängigkeiten beziehen sich auf Szenarien, in denen ein DAG auf die erfolgreiche Ausführung einer oder mehrerer Aufgaben in einem anderen DAG angewiesen ist. Unverwaltete inter-DAG-Abhängigkeiten können mehrere Probleme verursachen:
Dateninkonsistenzen: Wenn ein DAG mit der Verarbeitung beginnt, bevor seine Daten bereit sind, kann dies zu falschen oder ungenauen Ergebnissen führen.
Unwirtschaftliche Ressourcennutzung: Wenn ein DAG mit der Verarbeitung beginnt, bevor seine Daten bereit sind, muss er später erneut ausgeführt werden.
Fehler: Wenn ungenaue Ergebnisse aufgrund eines Missverhältnisses zwischen verschiedenen DAGs auftreten, kann es sehr zeitaufwändig sein, die Ursache zu identifizieren. Die Lösung hierfür umfasst oft einfach eine koordinierte erneute Ausführung der beiden DAGs.
Unverwaltete inter-DAG-Abhängigkeiten können mehrere Probleme verursachen, wie Dateninkonsistenzen oder unwirtschaftliche Ressourcennutzung.
Eine Möglichkeit, dieses Problem zu mildern, könnte darin bestehen, die abhängigen DAGs nacheinander zu planen und auf das Beste zu hoffen. Beachten Sie, dass dies kein skalierbarer oder robuster Ansatz ist, da er nur für eine begrenzte Anzahl von gekoppelten DAGs funktioniert und voraussetzt, dass die DAGs nicht fehlschlagen. Eine weitere naive Lösung zur Bewältigung der erhöhten Komplexität der inter-DAG-Abhängigkeiten wäre, die DAGs, die voneinander abhängen, in einen einzigen DAG zu fusionieren. Dieser Ansatz verstößt jedoch gegen die besten Praktiken, da jeder DAG einen einzelnen Prozess widerspiegeln und von einem einzigen Team gewartet werden sollte. Außerdem führt die Integration von DAGs ineinander zu großen monolithischen DAGs, die schwer zu warten sind.
Insofern ist es entscheidend, inter-DAG-Abhängigkeiten zu verstehen und effektiv zu verwalten, wenn Sie mit Apache Airflow arbeiten. Ein gut verwaltetes System ermöglicht es Ihnen, komplexe Workflows effizient zu orchestrieren, die Datenkonsistenz aufrechtzuerhalten und Ressourcen optimal zu nutzen, während die Einfachheit und Lesbarkeit der einzelnen DAGs gewahrt bleibt. Glücklicherweise bietet Airflow mehrere Ansätze zur Verwaltung inter-DAG-Abhängigkeiten, ohne die besten Praktiken zu verletzen.
1. Trigger
Die Trigger-Methode ist ein einfacher Weg, um nach erfolgreichem Abschluss einer Aufgabe in einem vorhergehenden DAG nachgelagerte DAGs zu aktivieren. Dieser push-basierte Mechanismus ist einfach, aber effektiv, und macht ihn zu einer idealen Lösung für die Aktivierung von einem oder mehreren nachgelagerten DAGs. Abbildung 3 zeigt eine schematische Darstellung, wie das Koppeln von DAGs mit der Trigger-Methode funktionieren kann.

Um diese Methode in die Praxis umzusetzen, muss der TriggerDagRunOperator
als Hilfsaufgabe innerhalb des vorhergehenden DAGs integriert werden, wie im folgenden Code-Schnipsel zu sehen ist. Der Operator nimmt das Argument trigger_dag_id
entgegen, das den nachgelagerten DAG angibt, der ausgelöst werden soll. Optional kann das Argument conf
verwendet werden, das es ermöglicht, Variablen vom vorhergehenden DAG an den nachgelagerten DAG zu übergeben, was zusätzliche Funktionalität bei der inter-DAG-Kopplung bietet.
2. Sensoren
Die Sensoren-Methode ist eine weitere Möglichkeit, inter-DAG-Abhängigkeiten in Apache Airflow herzustellen. Sie ist besonders nützlich, wenn die Ausführung eines nachgelagerten DAGs von dem Abschluss von Aufgaben in einem oder mehreren vorhergehenden DAGs abhängt. In diesem Szenario fungieren Sensoren als einzigartige Aufgabentypen in Airflow, die flexibel auf den Status von vorhergehenden Aufgaben, vorhergehenden Aufgabengruppen oder gesamten vorhergehenden DAGs reagieren. Eine schematische Darstellung der DAG-Kopplung mit Sensoren ist in Abb. 4 zu sehen.

Um inter-DAG-Abhängigkeiten mit einem Sensor herzustellen, muss der nachgelagerte DAG den ExternalTaskSensor
enthalten, wie im folgenden Code-Schnipsel zu erkennen ist. Darüber hinaus müssen die abhängigen vorhergehenden DAGs und Aufgaben importiert und dem ExternalTaskSensor
mittels der Argumente external_dag_id
und external_task_id
zugewiesen werden. Sobald diese festgelegt sind, überwacht der ExternalTaskSensor
kontinuierlich die vorhergehenden Aufgaben und DAGs auf deren Abschluss.
Im nachgelagerten DAG wird die Sensoraufgabe nur ausgeführt, wenn alle vorhergehenden Aufgaben das gleiche Ausführungsdatum teilen und als erfolgreich markiert sind. Daher wird die nachgelagerte Sensoraufgabe nicht fortgesetzt, wenn eine der vorhergehenden Aufgaben fehlschlägt. Dieses Verhalten wird in Abb. 5 für ein etwas komplizierteres Szenario einer einzelnen nachgelagerten Sensoraufgabe, die von zwei vorhergehenden Aufgaben mit unterschiedlichen Zeitplänen abhängt, veranschaulicht.

Ein weiteres Szenario, das zu Problemen führen kann, ist, wenn die vorhergehenden Aufgaben zeitlich verschoben sind, was zu Ausführungszeiten führt, die niemals zwischen den vorhergehenden und den nachgelagerten Aufgaben übereinstimmen, sodass die nachgelagerte Aufgabe niemals gestartet wird. Dieses Problem kann jedoch durch die Verwendung des Arguments timedelta
im ExternalTaskSensor
gelöst werden, das die Ausführungszeit der nachgelagerten ExternalTaskSensor
Aufgabe virtuell verschiebt, um sie mit der der vorhergehenden Aufgabe abzugleichen.
Im Vergleich zu diesem Sensoransatz mit dem Triggeransatz wird ein operationeller Unterschied festgestellt. Der Sensorbetrieb spiegelt ein pull-basiertes Modell wider, bei dem der nachgelagerte DAG kontinuierlich überprüft, ob die vorhergehenden Aufgaben erfolgreich waren, während die Trigger-Methode einen push-basierten Ansatz verfolgt, da die vorhergehenden Aufgaben direkt die nachgelagerten Aufgaben nach einer erfolgreichen Ausführung aktivieren.
Es ist auch möglich, einen ExternalTaskMarker
in den vorhergehenden DAGs einzuschließen, der mit dem ExternalTaskSensor
im nachgelagerten DAG verbunden ist. Obwohl diese Marker-Aufgaben optional sind, werden sie dringend empfohlen, da sie eine klare Abhängigkeit zwischen den beiden DAGs schaffen. Darüber hinaus zeigt die Airflow-Benutzeroberfläche bei der Einbeziehung des Markers im vorhergehenden DAG eine zusätzliche Schaltfläche an, die den Benutzer direkt vom vorhergehenden DAG zum nachgelagerten DAG leitet, siehe Abb. 6. Dies verbessert die Sichtbarkeit und Navigation zwischen den gekoppelten DAGs.

leitet auf den gekoppelten nachgelagerten DAG weiter.
3. Datensätze
Der Datensatzansatz in Apache Airflow bietet eine leistungsstarke Methode zur Realisierung von inter-DAG-Abhängigkeiten, indem er Links zwischen Datensätzen und DAGs erstellt. Es ermöglicht dem Benutzer, einen bestimmten Datensatz anzugeben, dem DAGs folgen können, wodurch die abonnierten DAGs jedes Mal ausgeführt werden, wenn es eine Änderung im Datensatz gibt.
Die Planung von DAG-Ausführungen basierend auf Dataset-Updates ermöglicht es Ihnen, dynamische und ereignisgesteuerte Workflows zu erstellen. Dieser Ansatz ist besonders nützlich, wenn nachgelagerte DAGs nach Datensatz-Updates von vorhergehenden DAGs ausgeführt werden müssen, insbesondere in Fällen, in denen die Updates unregelmäßig sind. Daher sorgt die Festlegung von Abhängigkeiten basierend auf Datensatzänderungen dafür, dass nachgelagerte Workflows effektiv auf Datenaktualisierungen reagieren. Eine schematische Übersicht dieses Ansatzes ist in Abb. 7 illustriert.

Um inter-DAG-Abhängigkeiten unter Verwendung von Datensätzen zu implementieren, müssen die Datensätze sowohl in den vorhergehenden als auch in den nachgelagerten DAGs definiert werden. Es ist wichtig zu beachten, dass der bereitgestellte Datensatz einfach ein String-Identifikator ist, und Airflow keine direkte Verbindung zum tatsächlichen Datensatz herstellt oder Kenntnis von dessen Inhalt, Standort oder tatsächlich angewendeten Änderungen hat. Daher machen die Verbindungen sehr flexibel, da kein spezifischer Connector oder Plugin erforderlich ist, um mit der tatsächlichen Datenbank zu kommunizieren. Andererseits ist es wichtig zu erinnern, dass alle Datensatz-Updates, die nicht von Airflow-Aufgaben stammen, nicht erfasst werden.
Im Code wird die Methode implementiert, indem ausdrücklich angegeben wird, wann Datensatzänderungen stattfinden. Im vorhergehenden DAG sollte jede Aufgabe, die einen Datensatz aktualisiert, den Parameter outlets
enthalten, der den Datensatz angibt, der von dieser Aufgabe aktualisiert wird. Im nachgelagerten DAG sollten die Datensätze (auf die der DAG angewiesen ist) als Parameter im Zeitplan definiert werden. Beachten Sie, dass Sie mehrere Datensätze einfügen können, indem Sie sie als Liste im Zeitplanattribut angeben. Eine Beispielimplementierung der inter-DAG-Kopplung mit Datensätzen ist im folgenden Codeblock dargestellt.
Zur Laufzeit wird der DAG geplant, wenn alle Datensätze, die er konsumiert, seit der letzten Ausführung mindestens einmal aktualisiert wurden. Dies sorgt dafür, dass der nachgelagerte DAG nur ausgeführt wird, wenn die erforderlichen Datensätze von den entsprechenden vorhergehenden Aufgaben modifiziert oder aktualisiert wurden. Ein Beispiel wird in Abb. 8 gezeigt.

Um den Benutzern einen Überblick über alle inter-DAG-Datensatzverbindungen zu geben, bietet Airflow eine spezielle Datensatzansicht in der Benutzeroberfläche. Dies dient als leistungsstarkes Tool zur Überwachung und Optimierung Ihrer Datenpipelines. Es bietet Einblick in die Aktualisierungszeiten des Datensatzes sowie die Beziehungen zwischen vorhergehenden und nachgelagerten DAGs, sodass der Benutzer den Datenfluss und die inter-DAG-Abhängigkeiten zurückverfolgen kann. Daher kann diese Funktion auch zu Zwecken der Datenherkunft verwendet werden. Ein Beispiel der Datensatzansicht ist in Abb. 9 zu sehen.

Durch die Nutzung des Datensatzansatzes können Sie flexible und reaktive Workflows erstellen, die sich an Änderungen in Ihrer Datenumgebung anpassen. Egal, ob Sie nachgelagerte DAGs nach Datensatz-Updates auslösen oder komplexe Datenpipelines mit komplizierten Abhängigkeiten erstellen müssen, Datensätze in Airflow bieten einen robusten Mechanismus, um inter-DAG-Kopplungen basierend auf Datenbewusstsein zu erreichen.
4. API
Die Airflow-API bietet einen leistungsstarken Ansatz, um einen DAG von einer externen Quelle aus auszulösen. Wenn die externe Quelle eine andere Airflow-Instanz ist, ist es möglich, inter-DAG-Abhängigkeiten mit der API zu erstellen, um die zuvor besprochenen Methoden Trigger, Sensoren und Datensätze zu ergänzen. Daher ist diese Methode besonders vorteilhaft, wenn Ihre abhängigen DAGs in verschiedenen Airflow-Umgebungen bereitgestellt werden. Durch die Nutzung der Airflow-API können Sie Aufgaben nahtlos koppeln und DAG-Ausführungen über verschiedene Umgebungen hinweg auslösen, was eine effiziente Koordination von Workflows über verschiedene Umgebungen ermöglicht.
Die Kopplung von inter-DAGs über die API erfolgt durch eine POST-Anfrage an den DAGRuns-Endpunkt, der es ermöglicht, einen Remote-DAG-Lauf zu initiieren. Um dies zu implementieren, sollte der SimpleHttpOperator
im DAG enthalten sein, und der endpoint
-Parameter sollte auf /api/v1/dags/<dag-id>/dagRuns
gesetzt werden, wobei <dag-id>
die ID des Ziel-DAGs darstellt. Das gewünschte Ausführungsdatum kann im data
-Parameter angegeben werden, das das Ausführungsdatum des vorhergehenden DAGs oder ein anderes Datum sein kann. Stellen Sie sicher, dass die HTTP-Verbindung (http_conn_id
) mit den erforderlichen Authentifizierungsdaten konfiguriert ist. Neue Verbindungen können auf der Seite admin>connections in der Airflow-Benutzeroberfläche erstellt werden. Sobald der API-Aufruf zum Auslösen des nachgelagerten DAGs abgeschlossen ist, wird die entsprechende Aufgabe im vorhergehenden DAG als abgeschlossen markiert. Siehe den folgenden Codeblock für eine Beispielimplementierung.
Vergleich der Methoden zur inter-DAG-Abhängigkeit
In den vorherigen Abschnitten haben wir vier verschiedene Methoden zur inter-DAG-Kopplung erläutert. In diesem Abschnitt heben wir ihre wichtigsten Unterschiede und Ähnlichkeiten hervor. Dieser Vergleich sollte es Ihnen ermöglichen, die geeignete Methode zu identifizieren, wenn Sie auf eine Situation stoßen, in der Sie eine inter-DAG-Abhängigkeit implementieren müssen.

Aus meiner Erfahrung ist der Datensatzansatz typischerweise die am besten geeignete Methode zur Erstellung inter-DAG-Abhängigkeiten. Ihrem größten Vorteil sehe ich in der Nutzung eines ereignisgesteuerten Kopplungsprinzips. Dies abstrahiert effektiv den Aspekt der Zeitplanung und vereinfacht die Implementierung und Wartbarkeit für den Benutzer. Darüber hinaus erleichtert es die klare Datenherkunft und ermöglicht leicht sowohl ein Eins-zu-viele- als auch ein Viele-zu-eins-Link. Wenn Sie also eine Airflow-Version von 2.4 oder höher nutzen, empfehle ich, den Datensatzansatz bei der Implementierung inter-DAG-Abhängigkeiten anzuwenden.
Fazit
Die Implementierung von inter-DAG-Abhängigkeiten in Airflow ermöglicht es Ihnen, komplexe und voneinander abhängige Workflows zu erstellen, ohne die Wartbarkeit und die organisatorische Struktur zu opfern. In diesem Blogbeitrag haben wir vier Methoden zur Implementierung inter-DAG-Abhängigkeiten in Apache Airflow untersucht:
Trigger: aktivieren nachgelagerte DAGs, sobald eine vorhergehende Aufgabe abgeschlossen ist, und bieten einen push-basierten Ansatz.
Sensoren: verwenden ein pull-basiertes Modell und warten auf spezifische vorhergehende Ereignisse, um abzuschließen.
Datensätze: ideal für die Behandlung nicht-regelmäßiger Updates und für die Erstellung ereignisgesteuerter und dynamischer Workflows. Darüber hinaus ermöglicht es, über Datenabhängigkeiten nachzudenken, anstatt über code- oder technische Verbindungen in den anderen Methoden.
Airflow-API: ermöglicht eine effiziente Koordination, indem sie Aufgaben verbindet und DAG-Ausführungen für Abhängigkeiten zwischen verschiedenen Airflow-Umgebungen initiiert.
Durch die Wahl der geeigneten Methode basierend auf Ihren Workflow-Bedürfnissen können Sie eine effiziente Koordination zwischen Aufgaben sicherstellen und die Anzahl der Pipelines problemlos skalieren, während Sie diese wartbar halten. Durch die Nutzung der von Airflow bereitgestellten inter-DAG-Funktionen ist es möglich, ein robustes und miteinander verbundenes DAG-Ökosystem aufzubauen, das Ihre datengetriebenen Prozesse vorantreibt.
Vielen Dank an Jonny Daenen für wertvolle Beiträge und Rückmeldungen zu diesem Beitrag.
Quellen
Latest
Portable by design: Rethinking data platforms in the age of digital sovereignty
Build a portable, EU-compliant data platform and avoid vendor lock-in—discover our cloud-neutral stack in this deep-dive blog.
Cloud-Unabhängigkeit: Test eines europäischen Cloud-Anbieters gegen die Giganten
Kann ein europäischer Cloud-Anbieter wie Ionos AWS oder Azure ersetzen? Wir testen es – und finden überraschende Vorteile in Bezug auf Kosten, Kontrolle und Unabhängigkeit.
Vermeide schlechte Daten von Anfang an
Das Erfassen aller Daten ohne Qualitätsprüfungen führt zu wiederkehrenden Problemen. Priorisieren Sie die Datenqualität von Anfang an, um nachgelagerte Probleme zu vermeiden.