Event-Driven KI: Aufbau eines Forschungsassistenten mit Kafka und Flink

Der Aufstieg der agentischen KI hat die Begeisterung für Agenten angeheizt, die autonom Aufgaben ausführen, Empfehlungen abgeben und komplexe Workflows ausführen, die KI mit herkömmlicher Rechenleistung kombinieren. Aber die Schaffung solcher Agenten in realen, produktgetriebenen Umgebungen birgt Herausforderungen, die über die KI selbst hinausgehen.

Ohne sorgfältige Architektur können Abhängigkeiten zwischen Komponenten Engpässe schaffen, die Skalierbarkeit einschränken und die Wartung komplizieren, wenn Systeme sich weiterentwickeln. Die Lösung liegt in der Entkopplung von Workflows, bei der Agenten, Infrastruktur und andere Komponenten fließend interagieren, ohne starre Abhängigkeiten.

Diese Art von flexibler, skalierbarer Integration erfordert eine gemeinsame „Sprache“ für den Datenaustausch – eine robuste ereignisgesteuerte Architektur (EDA), die von Ereignisströmen unterstützt wird. Durch die Organisation von Anwendungen um Ereignisse herum können Agenten in einem reaktionsfähigen, entkoppelten System arbeiten, in dem jeder Teil unabhängig agiert. Teams können Technologieentscheidungen frei treffen, Skalierungsanforderungen separat verwalten und klare Grenzen zwischen Komponenten aufrechterhalten, was echte Agilität ermöglicht.

Um diese Prinzipien zu testen, habe ich PodPrep AI entwickelt, einen KI-gestützten Forschungsassistenten, der mir hilft, mich auf Podcast-Interviews bei Software Engineering Daily und Software Huddle vorzubereiten. In diesem Beitrag werde ich auf das Design und die Architektur von PodPrep AI eingehen und zeigen, wie EDA und Echtzeit-Datenströme ein effektives agentisches System antreiben.

Hinweis: Wenn Sie sich nur den Code ansehen möchten, springen Sie zu meinem GitHub-Repository hier.

Warum eine ereignisgesteuerte Architektur für KI?

In realen AI-Anwendungen hält ein eng gekoppeltes, monolithisches Design nicht stand. Während Machbarkeitsstudien oder Demos oft ein einziges, einheitliches System aus Gründen der Einfachheit verwenden, wird dieser Ansatz schnell unpraktisch in der Produktion, insbesondere in verteilten Umgebungen. Eng gekoppelte Systeme schaffen Engpässe, begrenzen die Skalierbarkeit und verlangsamen die Iteration – alles kritische Herausforderungen, die es zu vermeiden gilt, während AI-Lösungen wachsen.

Betrachten wir einen typischen AI-Agenten.

Er könnte Daten aus mehreren Quellen abrufen, Prompt-Engineering- und RAG-Workflows verwalten und direkt mit verschiedenen Tools interagieren, um deterministische Workflows auszuführen. Die erforderliche Orchestrierung ist komplex, mit Abhängigkeiten von mehreren Systemen. Und wenn der Agent mit anderen Agenten kommunizieren muss, erhöht sich die Komplexität nur noch. Ohne eine flexible Architektur machen diese Abhängigkeiten die Skalierung und Modifikation nahezu unmöglich.

Example AI agent dependency graph

In der Produktion kümmern sich unterschiedliche Teams normalerweise um verschiedene Teile des Stacks: MLOps und Data Engineering verwalten die RAG-Pipeline, Data Science wählt Modelle aus, und Anwendungsentwickler erstellen die Schnittstelle und das Backend. Ein eng gekoppeltes Setup zwingt diese Teams in Abhängigkeiten, die die Lieferung verlangsamen und die Skalierung erschweren. Idealerweise sollten die Anwendungsschichten die Interna der AI nicht verstehen müssen; sie sollten einfach die Ergebnisse bei Bedarf konsumieren.

Darüber hinaus können KI-Anwendungen nicht isoliert arbeiten. Für einen echten Mehrwert müssen KI-Erkenntnisse nahtlos über Kunden-Datenplattformen (CDPs), CRMs, Analysen und mehr fließen. Kundeninteraktionen sollten Echtzeit-Updates auslösen, die direkt in andere Tools zur Handlung und Analyse einfließen. Ohne einen vereinheitlichten Ansatz wird die Integration von Erkenntnissen über Plattformen hinweg zu einem Flickenteppich, der schwer zu verwalten und unmöglich zu skalieren ist.

KI-gesteuerte EDA begegnet diesen Herausforderungen, indem sie ein „zentrales Nervensystem“ für Daten schafft. Mit EDA senden Anwendungen Ereignisse aus, anstatt auf verkettete Befehle angewiesen zu sein. Dies entkoppelt Komponenten, ermöglicht einen asynchronen Datenfluss überall dort, wo er benötigt wird, und ermöglicht es jedem Team, unabhängig zu arbeiten. EDA fördert eine nahtlose Datenintegration, skalierbares Wachstum und Widerstandsfähigkeit – was es zu einer leistungsstarken Grundlage für moderne KI-gesteuerte Systeme macht.

Entwurf eines skalierbaren KI-gesteuerten Forschungsagenten

In den letzten zwei Jahren habe ich Hunderte von Podcasts auf Software Engineering Daily, Software Huddle und Partially Redacted moderiert.

Um mich auf jeden Podcast vorzubereiten, führe ich einen umfassenden Rechercheprozess durch, um einen Podcast-Brief vorzubereiten, der meine Gedanken, Hintergrundinformationen über den Gast und das Thema sowie eine Reihe potenzieller Fragen enthält. Für diesen Brief recherchiere ich typischerweise den Gast und das Unternehmen, für das er arbeitet, höre mir andere Podcasts an, an denen er teilgenommen haben könnte, lese Blogbeiträge, die er verfasst hat, und informiere mich über das Hauptthema, über das wir sprechen werden.

Ich versuche, Verbindungen zu anderen Podcasts, die ich moderiert habe, oder zu meinen eigenen Erfahrungen im Zusammenhang mit dem Thema oder ähnlichen Themen herzustellen. Dieser gesamte Prozess erfordert erhebliche Zeit und Mühe. Große Podcast-Unternehmen haben eigene Forscher und Assistenten, die diese Arbeit für den Moderator erledigen. Ich betreibe hier kein solches Unternehmen. Ich muss das alles selbst machen.

Um dies anzugehen, wollte ich einen Agenten entwickeln, der diese Arbeit für mich erledigen könnte. Auf einer hohen Ebene würde der Agent etwa wie das unten stehende Bild aussehen.

High-level flow diagram for the research agent I wanted to build

Ich stelle grundlegende Quellenmaterialien wie den Namen des Gasts, das Unternehmen, die Themen, auf die ich mich konzentrieren möchte, einige Referenz-URLs wie Blog-Beiträge und bestehende Podcasts zur Verfügung, und dann geschieht etwas AI-Magie, und meine Recherche ist abgeschlossen.

Diese einfache Idee führte mich zur Entwicklung von PodPrep AI, meinem KI-gestützten Forschungsassistenten, der mich nur Token kostet.

Der Rest dieses Artikels behandelt das Design von PodPrep AI, beginnend mit der Benutzeroberfläche.

Erstellung der Agenten-Benutzeroberfläche

Ich habe die Benutzeroberfläche des Agenten als Webanwendung entworfen, in der ich problemlos Quellenmaterial für den Forschungsprozess eingeben kann. Dazu gehören der Name des Gasts, sein Unternehmen, das Interviewthema, jeglicher zusätzlicher Kontext und Links zu relevanten Blogs, Websites und früheren Podcast-Interviews.

Example of creating a podcast research bundle

Ich hätte dem Agenten weniger Anweisungen geben können und ihn im Rahmen des agilen Workflows die Quellenmaterialien suchen lassen können, aber für Version 1.0 habe ich beschlossen, die Quell-URLs bereitzustellen.

Die Webanwendung ist eine Standard-Dreischichtenanwendung, die mit Next.js und MongoDB für die Anwendungsdatenbank erstellt wurde. Sie weiß nichts über KI. Sie ermöglicht es einfach dem Benutzer, neue Forschungsbündel einzugeben, die dann im Verarbeitungszustand erscheinen, bis der agentische Prozess den Workflow abgeschlossen und einen Forschungsbericht in der Anwendungsdatenbank erstellt hat.

List of processing and processed research requests

Sobald die KI-Magie abgeschlossen ist, kann ich auf ein Informationsdokument für den Eintrag zugreifen, wie unten dargestellt.

Example of a complete research bundle

Erstellung des agentischen Workflows

Für Version 1.0 wollte ich in der Lage sein, drei Hauptaktionen zur Erstellung des Forschungsberichts durchzuführen:

  1. Für jede Website-URL, Blogbeitrag oder Podcast den Text oder die Zusammenfassung abrufen, den Text in angemessene Abschnitte aufteilen, Einbettungen generieren und die Vektorrepräsentation speichern.
  2. Für alle aus den Forschungsquellen-URLs extrahierten Texte die interessantesten Fragen extrahieren und speichern.
  3. Einen Podcast-Forschungsbericht erstellen, der den relevantesten Kontext basierend auf den Einbettungen, den besten zuvor gestellten Fragen und allen anderen Informationen enthält, die Teil des Bündeleingangs waren.

Das nachstehende Bild zeigt die Architektur von der Webanwendung bis zum agentischen Workflow.

Agentic workflow for PodPrep AI

Aktion #1 oben wird durch den HTTP-Senkenendpunkt Process URLs & Create Embeddings Agent unterstützt.

Aktion #2 wird mithilfe von Flink und der integrierten KI-Modellunterstützung in Confluent Cloud durchgeführt.

Schließlich wird Aktion #3 vom Generate Research Brief Agent ausgeführt, der auch ein HTTP-Senken-Endpunkt ist und der aufgerufen wird, sobald die ersten beiden Aktionen abgeschlossen sind.

Im folgenden Abschnitt erläutere ich jede dieser Aktionen im Detail.

Der Prozess-URLs und Einbettungen erstellen Agent

Dieser Agent ist dafür verantwortlich, Texte von den Forschungsquellen-URLs und dem Vektor-Einbettungs-Pipeline abzurufen. Im Folgenden wird der grobe Ablauf dessen, was hinter den Kulissen geschieht, um die Forschungsmaterialien zu verarbeiten, dargestellt.

Flow diagram for the Process URLs and Create Embeddings agent

Nachdem ein Forschungspaket vom Benutzer erstellt und in MongoDB gespeichert wurde, erzeugt ein MongoDB-Quellen-Connector Nachrichten für ein Kafka-Thema namens research-requests. Dies ist es, was den agentischen Workflow startet.

Jeder POST-Request an den HTTP-Endpunkt enthält die URLs aus der Forschungsanfrage und den Primärschlüssel in der MongoDB-Forschungsbündel-Sammlung.

Der Agent durchläuft jede URL und falls es sich nicht um einen Apple-Podcast handelt, ruft er das komplette HTML der Seite ab. Da ich die Struktur der Seite nicht kenne, kann ich mich nicht auf HTML-Analyse-Bibliotheken verlassen, um den relevanten Text zu finden. Stattdessen sende ich den Seitentext an das gpt-4o-mini-Modell mit einer Temperatur von null, um mit dem unten stehenden Prompt zu bekommen, was ich brauche.

JavaScript

 

Bei Podcasts muss ich noch etwas mehr Arbeit leisten.

Rückwärtsentwicklung von Apple Podcast URLs

Um Daten aus Podcast-Episoden abzurufen, müssen wir zunächst den Ton in Text mithilfe des Whisper-Modells umwandeln. Doch bevor wir das tun können, müssen wir die tatsächliche MP3-Datei für jede Podcast-Episode lokalisieren, herunterladen und in Stücke von 25 MB oder weniger aufteilen (die maximale Größe für Whisper).

Die Herausforderung besteht darin, dass Apple keinen direkten MP3-Link für seine Podcast-Episoden bereitstellt. Das MP3-Dateiformat ist jedoch im ursprünglichen RSS-Feed des Podcasts verfügbar, und wir können diesen Feed programmatisch mithilfe der Apple-Podcast-ID finden.

Zum Beispiel ist im folgenden URL der numerische Teil nach /id die eindeutige Apple-ID des Podcasts:

https://podcasts.apple.com/us/podcast/deep-dive-into-inference-optimization-for-llms-with/id1699385780?i=1000675820505

Mit der API von Apple können wir die Podcast-ID nachschlagen und eine JSON-Antwort abrufen, die die URL für den RSS-Feed enthält:

https://itunes.apple.com/lookup?id=1699385780&entity=podcast

Sobald wir den RSS-Feed-XML haben, durchsuchen wir ihn nach der spezifischen Episode. Da wir nur die Episoden-URL von Apple haben (und nicht den tatsächlichen Titel), verwenden wir den Titel-Slug aus der URL, um die Episode im Feed zu finden und ihre MP3-URL abzurufen.

JavaScript

 

Jetzt, da der Text aus Blogbeiträgen, Websites und MP3s verfügbar ist, verwendet der Agent den rekursiven Text-Splitter von LangChain, um den Text in Abschnitte zu unterteilen und die Embeddings aus diesen Abschnitten zu generieren. Die Abschnitte werden im text-embeddings-Thema veröffentlicht und in MongoDB gespeichert.

  • HinweisIch habe mich entschieden, MongoDB sowohl als Anwendungsdatenbank als auch als Vektordatenbank zu verwenden. Aufgrund des EDA-Ansatzes, den ich gewählt habe, können dies jedoch leicht separate Systeme sein, und es ist nur eine Frage des Austauschs des Sink-Connectors vom Thema Text-Embeddings.

Neben der Erstellung und Veröffentlichung der Embeddings veröffentlicht der Agent auch den Text aus den Quellen in ein Thema namens full-text-from-sources. Die Veröffentlichung in diesem Thema initiiert Aktion #2.

Fragen mit Flink und OpenAI extrahieren

Apache Flink ist ein Open-Source-Stream-Verarbeitungs-Framework, das für die Verarbeitung großer Datenmengen in Echtzeit entwickelt wurde und ideal für Anwendungen mit hohem Durchsatz und niedriger Latenz ist. Durch die Kombination von Flink mit Confluent können wir LLMs wie OpenAIs GPT direkt in Streaming-Workflows integrieren. Diese Integration ermöglicht Echtzeit-RAG-Workflows und stellt sicher, dass der Fragenextraktionsprozess mit den aktuellsten verfügbaren Daten arbeitet.

Den Originaltext der Quelle im Stream zu haben, ermöglicht es uns auch, später neue Workflows einzuführen, die dieselben Daten verwenden, was den Prozess der Erstellung von Forschungsbriefen verbessert oder diese an nachgelagerte Dienste wie ein Data Warehouse sendet. Dieses flexible Setup erlaubt es uns, im Laufe der Zeit zusätzliche KI- und Nicht-KI-Funktionen einzufügen, ohne die Kernpipeline überarbeiten zu müssen.

In PodPrep AI verwende ich Flink, um Fragen aus Texten zu extrahieren, die von Quell-URLs abgerufen werden.

Die Einrichtung von Flink, um einen LLM aufzurufen, beinhaltet die Konfiguration einer Verbindung über die CLI von Confluent. Unten finden Sie ein Beispielkommando zur Einrichtung einer OpenAI-Verbindung, obwohl mehrere Optionen verfügbar sind.

Shell

 

Sobald die Verbindung hergestellt ist, kann ich ein Modell erstellen entweder in der Cloud-Konsole oder im Flink SQL-Shell. Für die Fragenextraktion richte ich das Modell entsprechend ein.

SQL

 

Mit dem bereitgestellten Modell verwende ich die integrierte ml_predict Funktion von Flink, um Fragen aus dem Ausgangsmaterial zu generieren und schreibe die Ausgabe in einen Stream namens mined-questions, der mit MongoDB synchronisiert wird, um später verwendet zu werden.

SQL

 

Flink hilft auch dabei, nachzuvollziehen, wann alle Forschungsunterlagen verarbeitet wurden, was die Erstellung des Forschungsberichts auslöst. Dies geschieht, indem in einen completed-requests Stream geschrieben wird, sobald die URLs in mined-questions mit denen im Volltextquellen-Stream übereinstimmen.

SQL

 

Wenn Nachrichten in completed-requests geschrieben werden, wird die eindeutige ID für das Forschungsbundle an den Generate Research Brief Agent gesendet.

Der Generate Research Brief Agent

Dieser Agent nimmt alle relevantesten Forschungsunterlagen und verwendet ein LLM, um einen Forschungsbericht zu erstellen. Unten ist der übergeordnete Ablauf der Ereignisse, die stattfinden, um einen Forschungsbericht zu erstellen.

Ablaufdiagramm für den Generate Research Brief Agent

Lassen Sie uns einige dieser Schritte aufschlüsseln. Um den Prompt für das LLM zu erstellen, kombiniere ich die extrahierten Fragen, das Thema, den Namen des Gastes, den Firmennamen, einen Systemprompt zur Anleitung und den Kontext, der in der Vektordatenbank gespeichert ist und am semantisch ähnlichsten zum Podcast-Thema ist.

Aufgrund des begrenzten Kontextinformationen im Forschungsbundle ist es herausfordernd, den relevantesten Kontext direkt aus dem Vektorspeicher zu extrahieren. Um dies zu lösen, lasse ich das LLM eine Suchanfrage generieren, um den bestmöglich übereinstimmenden Inhalt zu finden, wie im Knoten „Suchanfrage erstellen“ im Diagramm dargestellt.

JavaScript

 

Mit der von LLM generierten Anfrage erstelle ich eine Einbettung und durchsuche MongoDB über einen Vektorindex, wobei ich nach der bundleId filtere, um die Suche auf Materialien zu beschränken, die für den spezifischen Podcast relevant sind.

Nachdem die beste Kontextinformation identifiziert wurde, erstelle ich eine Aufforderung und generiere den Forschungsbericht, speichere das Ergebnis in MongoDB, damit die Webanwendung es anzeigen kann.

Zu beachten bei der Implementierung

Ich habe sowohl die Front-End-Anwendung für PodPrep AI als auch die Agents in JavaScript geschrieben, aber in einem realen Szenario wäre der Agent wahrscheinlich in einer anderen Sprache wie Python. Zusätzlich sind sowohl der Prozess URLs & Einbettungen Agent als auch der Generiere Forschungsbericht Agent aus Gründen der Einfachheit im selben Projekt auf demselben Webserver. In einem echten Produktionssystem könnten diese serverlose Funktionen sein, die unabhängig voneinander laufen.

Abschließende Gedanken

Der Aufbau von PodPrep AI hebt hervor, wie eine ereignisgesteuerte Architektur es ermöglicht, KI-Anwendungen in der realen Welt skalierbar und reibungslos anpassbar zu machen. Mit Flink und Confluent habe ich ein System erstellt, das Daten in Echtzeit verarbeitet, um einen KI-gesteuerten Workflow ohne starre Abhängigkeiten zu unterstützen. Dieser entkoppelte Ansatz ermöglicht es den Komponenten, unabhängig voneinander zu arbeiten, aber über Ereignisströme verbunden zu bleiben – dies ist unerlässlich für komplexe, verteilte Anwendungen, bei denen unterschiedliche Teams verschiedene Teile des Systems verwalten.

In der heutigen von KI geprägten Umgebung ist der Zugriff auf frische, Echtzeitdaten über Systeme hinweg unerlässlich. EDA fungiert als „zentrales Nervensystem“ für Daten, das eine nahtlose Integration und Flexibilität bei der Skalierung des Systems ermöglicht.

Source:
https://dzone.com/articles/build-a-research-assistant-with-kafka-flink