事件驅動的人工智慧:使用 Kafka 和 Flink 構建研究助手

代理型人工智慧的興起激發了人們對於自主執行任務、提出建議以及執行將人工智慧與傳統計算結合的複雜工作流程的代理的興奮感。然而,在現實的產品驅動環境中創建這樣的代理面臨的挑戰超出了人工智慧本身的範疇。

如果沒有仔細的架構設計,組件之間的依賴關係可能會造成瓶頸,限制可擴展性,並使得隨著系統演變而維護變得複雜。解決方案在於解耦工作流程,讓代理、基礎設施和其他組件能夠流暢互動,而不受僵硬的依賴關係所限制。

這種靈活、可擴展的整合需要一種共享的“語言”來進行數據交換——由事件流驅動的穩健事件驅動架構(EDA)。通過圍繞事件組織應用程序,代理可以在響應式、解耦的系統中運作,每個部分獨立完成其工作。團隊可以自由選擇技術,分開管理擴展需求,並在組件之間保持清晰的界限,從而實現真正的敏捷性。

為了驗證這些原則,我開發了PodPrep AI,一個由人工智慧驅動的研究助手,幫助我準備在Software Engineering Daily和Software Huddle上的播客訪談。在這篇文章中,我將深入探討PodPrep AI的設計和架構,展示EDA和實時數據流如何驅動一個有效的代理系統。

注意:如果您只想查看代碼,可以跳轉到我的GitHub倉庫 這裡

為什麼選擇事件驅動架構來進行人工智慧?

在現實世界的 AI 應用中,緊密耦合的單體設計並不奏效。雖然概念驗證或演示通常使用單一的統一系統以簡化流程,但這種方法在生產環境中迅速變得不切實際,尤其是在分散式環境中。緊密耦合的系統會造成瓶頸,限制可擴展性,並減緩迭代速度——這些都是隨著 AI 解決方案增長而必須避免的關鍵挑戰。

考慮一個典型的 AI 代理。

它可能需要從多個來源提取數據,處理提示工程和 RAG 工作流程,並直接與各種工具互動以執行確定性工作流程。所需的協調非常複雜,依賴於多個系統。如果代理需要與其他代理進行通信,則複雜性只會增加。沒有靈活的架構,這些依賴會使擴展和修改幾乎變得不可能。

Example AI agent dependency graph

在生產環境中,不同的團隊通常負責堆疊的不同部分:MLOps 和數據工程管理 RAG 管道,數據科學選擇模型,應用開發人員構建界面和後端。一個緊密耦合的設置迫使這些團隊形成依賴,從而延緩交付並使擴展變得困難。理想情況下,應用層不應該需要了解 AI 的內部運作;它們應該在需要時簡單地消耗結果。

此外,人工智能應用無法孤立運作。要實現真正的價值,人工智能洞察需要在客戶數據平台(CDP)、客戶關係管理(CRM)、分析等之間無縫流動。客戶互動應實時觸發更新,直接供應到其他工具以進行操作和分析。如果沒有統一的方式,跨平台整合洞察將變成難以管理且無法擴展的拼湊。

基於事件驅動架構(EDA)的人工智能解決了這些挑戰,通過創建數據的“中央神經系統”。使用EDA,應用程序廣播事件,而不是依賴鏈式命令。這使得組件解耦,允許數據在需要的地方異步流動,使每個團隊能夠獨立工作。EDA促進無縫數據整合、可擴展增長和韌性——使其成為現代人工智能驅動系統的強大基礎。

設計一個可擴展的人工智能驅動研究代理

在過去兩年中,我在《Software Engineering Daily》、《Software Huddle》和《Partially Redacted》中主持了數百個播客。

為了準備每個播客,我進行深入的研究過程,準備一份播客簡報,其中包含我的想法、嘉賓和主題的背景,以及一系列潛在問題。為了建立這份簡報,我通常會研究嘉賓及其所工作的公司,收聽他們可能出現過的其他播客,閱讀他們撰寫的博客文章,以及了解我們將討論的主要主題。

我試圖編織到其他播客中我主持過的或與主題相關的經驗。整個過程需要相當多的時間和努力。大型播客運營擁有專門的研究人員和助手來為主持人做這項工作。我這裡沒有運作那種規模的運營。我必須自己完成所有這些。

為了應對這個問題,我想建立一個可以為我完成這項工作的代理。在高層次上,這個代理看起來像下面的圖片。

High-level flow diagram for the research agent I wanted to build

我提供基本的來源材料,如嘉賓姓名、公司、我想專注的主題、一些參考網址,如博客文章和現有的播客,然後一些人工智能的魔法發生,我的研究就完成了。

這個簡單的想法讓我創建了PodPrep AI,這是我的AI動力研究助手,只花費我代幣。

本文的其餘部分討論了PodPrep AI的設計,從用戶界面開始。

構建代理用戶界面

我設計了代理的界面作為一個Web應用程序,我可以輕鬆輸入研究過程的來源材料。這包括嘉賓的姓名、他們的公司、訪談主題、任何額外的上下文,以及相關博客、網站和之前的播客訪談的鏈接。

Example of creating a podcast research bundle

我本來可以給代理更少的指示,作為代理工作流程的一部分,讓它去找來源材料,但對於1.0版本,我決定提供來源URL。

這個網絡應用程式是一個標準的三層應用程式,是用Next.jsMongoDB構建的應用程式數據庫。它不知道任何關於人工智能的事情。它只允許用戶輸入新的研究包,這些研究包會出現在處理狀態,直到代理過程完成工作流並在應用程式數據庫中填充研究簡報。

List of processing and processed research requests

一旦人工智能魔法完成,我就可以訪問顯示下面輸入的簡報文件。

Example of a complete research bundle

創建代理工作流程

對於版本1.0,我希望能夠執行三個主要操作來構建研究簡報:

  1. 對於任何網站URL、博客文章或播客,檢索文本或摘要,將文本分成合理的大小,生成嵌入,並存儲向量表示。
  2. 從研究來源URL提取的所有文本中,提取最有趣的問題,並存儲這些問題。
  3. 生成一個播客研究簡報,結合基於嵌入的最相關內容,以前提出的最好問題,以及任何其他作為捆綁輸入的信息。

下面的圖顯示了從網絡應用程式到代理工作流程的架構。

Agentic workflow for PodPrep AI

上述的動作#1受處理URL和創建嵌入代理 HTTP接收端點的支持。

動作#2是使用Flink和Confluent Cloud中內置的人工智能模型支持來完成的。

最後,Action#3由生成研究簡報代理執行,也是一個HTTP接收端點,在前兩個動作完成後調用。

在接下來的部分中,我會詳細討論每個動作。

處理URL並創建嵌入代理

這個代理負責從研究來源URL和向量嵌入流水線中提取文本。下面是在幕後處理研究資料時正在發生的高層次流程。

Flow diagram for the Process URLs and Create Embeddings agent

一旦用戶創建並保存了一個研究捆綁到MongoDB,一個MongoDB源連接器會向名為research-requests的Kafka主題發送消息。這是啟動代理工作流的開始。

每個對HTTP端點的POST請求都包含了研究請求中的URL和MongoDB研究捆綁集合中的主鍵。

代理循環遍歷每個URL,如果不是蘋果播客,則檢索完整頁面HTML。由於我不知道該頁面的結構,無法依賴HTML解析庫來查找相關文本。取而代之的是,我使用下面的提示將頁面文本發送給gpt-4o-mini模型,溫度為零,以獲取所需內容。

JavaScript

 

對於播客,我需要做更多工作。

反向工程蘋果播客URL

為了從播客集數中提取數據,我們首先需要使用Whisper模型將音頻轉換為文本。但在此之前,我們必須找到每個播客集數的實際MP3文件,下載它並將其分割成25MB或更小的塊(Whisper的最大尺寸)。

挑戰在於蘋果不提供其播客集數的直接 MP3 連結。但是,MP3 檔案可在播客的原始 RSS 資料中找到,我們可以通過蘋果播客 ID 程式化地找到這個資料源。

例如,在下面的 URL 中,/id 後的數字部分是該播客的獨特蘋果 ID:

https://podcasts.apple.com/us/podcast/deep-dive-into-inference-optimization-for-llms-with/id1699385780?i=1000675820505

使用蘋果的 API,我們可以查詢播客 ID 並檢索包含 RSS 資料源 URL 的 JSON 回應:

https://itunes.apple.com/lookup?id=1699385780&entity=podcast

一旦我們有了 RSS 資料源 XML,我們可以在其中搜索特定集數。因為我們只有蘋果的集數 URL(而非實際標題),我們使用 URL 中的標題 slug 來定位資料源中的集數並檢索其 MP3 連結。

JavaScript

 

現在有了來自博客文章、網站和 MP3 的文字,代理使用 LangChain 的遞迴字元文字拆分器將文字拆分為塊並從這些塊生成嵌入。這些塊發佈到 text-embeddings 主題並下沉到 MongoDB。

  • 注意我選擇將 MongoDB 同時用作應用程式資料庫和向量資料庫。但是,由於我採取的 EDA 方法,這兩者可以很容易地成為獨立系統,只需將 Text Embeddings 主題的下沉連接器進行交換。

除了创建和发布嵌入之外,代理还会将来自来源的文本发布到名为full-text-from-sources的主题。发布到此主题将启动“动作#2”。

使用Flink和OpenAI提取问题

Apache Flink是一个开源的流处理框架,专为实时处理大量数据而构建,非常适合高吞吐量、低延迟的应用程序。通过将Flink与Confluent配对,我们可以将OpenAI的GPT等LLM直接引入流式工作流程中。这种集成实现了实时的RAG工作流程,确保提取问题的过程与最新的可用数据一起运行。

在流中拥有原始源文本还可以让我们随后引入使用相同数据的新工作流程,增强研究简报生成过程或将其发送到下游服务,如数据仓库。这种灵活的设置使我们可以随着时间的推移逐步添加额外的人工智能和非人工智能功能,而无需彻底改造核心管道。

在PodPrep AI中,我使用Flink从从源URL中提取问题。

设置Flink调用LLM涉及通过Confluent的CLI配置连接。以下是设置OpenAI连接的示例命令,但也有多个选项可用。

Shell

 

一旦連接建立,我可以在雲控制台或 Flink SQL shell 中創建模型。對於問題提取,我會相應地設置模型。

SQL

 

有了準備好的模型,我使用 Flink 內建的 ml_predict 函數從來源材料中生成問題,將輸出寫入名為 mined-questions 的流,該流與 MongoDB 同步以供以後使用。

SQL

 

Flink 還幫助跟踪所有研究材料何時被處理完畢,從而觸發研究簡報的生成。一旦在 mined-questions 中的 URL 與完整文本來源流中的 URL 匹配,便會將信息寫入 completed-requests 流。

SQL

 

隨著消息寫入 completed-requests,研究捆綁的唯一 ID 被發送到生成研究簡報代理

生成研究簡報代理

此代理使用所有最相關的研究材料,並使用 LLM 創建研究簡報。以下是創建研究簡報所涉及的事件高層次流程。

生成研究簡報代理的流程圖

讓我們來分解其中的一些步驟。為了構建 LLM 的提示,我將挖掘出的問題、主題、客人姓名、公司名稱、系統提示以及存儲在向量數據庫中與播客主題最相似的語境結合在一起。

由於研究包的上下文信息有限,直接從向量存儲中提取最相關的上下文是具有挑戰性的。為了解決這個問題,我讓LLM生成搜索查詢來定位最佳匹配的內容,如圖表中的“創建搜索查詢”節點所示。

JavaScript

 

利用LLM生成的查詢,我創建一個嵌入並通過向量索引搜索MongoDB,通過bundleId過濾以限制搜索到與特定播客相關的材料。

在識別出最佳上下文信息後,我構建一個提示並生成研究簡報,將結果保存到MongoDB以供網絡應用程序顯示。

實施時需注意的事項

我用Javascript編寫了PodPrep AI的前端應用程序和代理,但在實際情況下,代理可能會使用不同的語言,如Python。此外,為了簡化,處理網址和創建嵌入代理生成研究簡報代理都在同一個項目中運行於同一個網絡服務器。在實際的生產系統中,這些可以是無伺服器函數,獨立運行。

最後的想法

建立 PodPrep AI 強調了事件驅動架構如何使現實世界的 AI 應用程序能夠平滑擴展和適應。利用 Flink 和 Confluent,我創建了一個能夠實時處理數據的系統,推動了無需剛性依賴的 AI 驅動工作流程。這種解耦的方法允許組件獨立運作,同時通過事件流保持連接——這對於不同團隊管理堆疊的各個部分的複雜分佈式應用程序至關重要。

在當今的 AI 驅動環境中,跨系統訪問新鮮的實時數據至關重要。事件驅動架構作為數據的“中央神經系統”,在系統擴展時實現無縫集成和靈活性。

Source:
https://dzone.com/articles/build-a-research-assistant-with-kafka-flink