STOMP 是一個極其簡單且強大的協議,用於傳送消息,它被像 RabbitMQ、ActiveMQ 和 Apollo 這樣popular 的服務器實現。使用 WebSocket 上的 STOMP 是一個直觀的協議,使其成為從網絡瀏覽器傳送消息的popular 選擇,因為像 AMQP 這樣的協議受到限制,因為瀏覽器不允許 TCP 連接。

要使用 WebSocket 上的 STOMP,你可以使用 @stomp/stompjs,但它具有複雜的回調和複雜的 API,適合更專業的用途。幸運的是,還有一個較少被人知的 @stompjs/rx-stomp,它通過 RxJS 觀察員提供一個很好的接口。觀察員不僅限於 Angular,並且與 React 的運作方式非常契合。當組合複雜的工作流程和管道時,它是一個很好的接口,這些管道來自很多不同的消息來源。

教程沿著與 Angular 初始版本 Angular 類似的路徑進行,但部件結構和代碼風格則針對 React 的功能性風格進行調整。

注意: 這個教程是用 strict TypeScript 撰寫的,但 JavaScript 代碼几乎是相同的,因為我們只有 5 種類型声明。對於 JS 版本,你可以跳過類型导入和声明。

目錄

目标

在此,我们将构建一个简化的聊天室应用程序,展示RxStomp在不同组件中的各种方面。总体而言,我们希望实现:

  • 一个通过RxStomp连接到STOMP服务器的React前端。

  • 根据与STOMP服务器的连接显示实时连线状态。

  • 任何可配置主题的发布/订阅逻辑。

  • 将RxStomp逻辑分散到多个组件中,以展示如何分离逻辑和责任。

  • 將RxStomp的連接/訂閱生命週期與React元件生命週期對齐,以確保沒有泄露或未關閉的觀察者。

前提

  • 您應該運行一個STOMP服務器,以便React應用程序可以與之連接。在這裡,我們將使用帶有rabbitmq_web_stomp擴展的RabbitMQ。

  • 最新版的React。本教程將使用v18,雖然較舊的版本可能也會正常工作。

  • 對觀察者有一定的熟悉度也会有帮助。

使用RabbitMQ的起始STOMP服務器

如果您也想使用RabbitMQ(非必需),這裡有不同操作系統的安裝指南。要添加擴展,您需要運行:

$ rabbitmq-plugins enable rabbitmq_web_stomp

如果您能夠使用Docker,一個類似於這個的Docker文件將為教程設定所有需要的內容:

FROM rabbitmq:3.8.8-alpine

run rabbitmq-plugins enable --offline rabbitmq_web_stomp

EXPOSE 15674

React 開始模板

在這個教程中,我們將使用Vitereact-ts模板。我們應用程序的中心组件將是App组件,並為其他特定的STOMP功能創建子组件。

如何安裝RxStomp

我們將使用@stomp/rx-stomp npm包:

$ npm i @stomp/rx-stomp rxjs

這將安裝版本2.0.0

注意:本教程不指定rxjs也可以正常工作进行,因為它是姐妹依賴,但是明确指它是好的做法。

如何管理與STOMP服務器的連接和斷開

現在,讓我們打開App.tsx並初始化我們的RxStomp客戶端。由於客戶端不是 rendering 會變化的狀態,我們將把它包在useRef Hook裡。

// src/App.tsx
import { useRef } from 'react'
import { RxStomp } from '@stomp/rx-stomp'

import './App.css'

function App() {
  const rxStompRef = useRef(new RxStomp())
  const rxStomp = rxStompRef.current

  return (
    <>
      <h1>Hello RxStomp!</h1>
    </>
  )
}

export default App

假設默認端口和登錄詳細信息,我們接下來為我們的連接定義一些配置。

// src/App.tsx

import { RxStomp } from '@stomp/rx-stomp'
import type { RxStompConfig } from '@stomp/rx-stomp'
...
const rxStompConfig: RxStompConfig = {
  brokerURL: 'ws://localhost:15674/ws',
  connectHeaders: {
    login: 'guest',
    passcode: 'guest',
  },
  debug: (msg) => {
    console.log(new Date(), msg)
  },
  heartbeatIncoming: 0,
  heartbeatOutgoing: 20000,
  reconnectDelay: 200,
}

function App() {
  ...

為了更好的開發體驗,我們將所有消息與時間戳記錄到本地控制台,並將計時器頻率設置得較低。您的配置應該非常不同於您的生產應用程序,所以請查看RxStompConfig 文档,了解所有可用的選項。

接下來的步驟,我們會在一個 useEffect 挂钩內將配置傳遞給 rxStomp。這會管理与元件生命週期一同進行的连线激活。

// src/App.tsx
...
function App() {
  const rxStompRef = useRef(new RxStomp())
  const rxStomp = rxStompRef.current

  useEffect(() => {
    rxStomp.configure(rxStompConfig)
    rxStomp.activate()

    return () => { 
      rxStomp.deactivate() 
    }
  })
  ...

雖然我們的應用程序中沒有視覺變化,但查看日誌應該會顯示连线和心跳日誌。以下是一個應該要看起來像什麼的例子:

Date ... >>> CONNECT
login:guest
passcode:guest
accept-version:1.2,1.1,1.0
heart-beat:20000,0

Date ... Received data 
Date ... <<< CONNECTED
version:1.2
heart-beat:0,20000
session:session-EJqaGQijDXqlfc0eZomOqQ
server:RabbitMQ/4.0.2
content-length:0

Date ... connected to server RabbitMQ/4.0.2 
Date ... send PING every 20000ms 
Date ... <<< PONG 
Date ... >>> PING

注意:通常,如果您看到重複的日誌,這可能是未正確實現脱機或取消訂閱功能的迹象。在開發環境中,React會為每個元件渲染两次,以幫助人們捕捉這些錯誤React.StrictMode

如何監視连线狀態

RxStomp有一個RxStompState枚舉,代表與我們的中介器可能的连线狀態。我們下一个目標是在我們的UI中顯示连线狀態。

我們來為這 create一個新元件,稱為 Status.tsx

// src/Status.tsx
import { useState } from 'react'

export default function Status() {
  const [connectionStatus, setConnectionStatus] = useState('')

  return (
    <>
      <h2>Connection Status: {connectionStatus}</h2>
    </>
  )
}

我們可以使用rxStomp.connectionState$ 可觀察物件來绑定到我們的connectionStatus字符串。與我們使用useEffect的方式類似,我們將使用卸載動作來unsubscribe()

// src/App.tsx
import { RxStompState } from '@stomp/rx-stomp'
import { useEffect, useState } from 'react'
import type { RxStomp } from '@stomp/rx-stomp'


export default function Status(props: { rxStomp: RxStomp }) {
  const [connectionStatus, setConnectionStatus] = useState('')

  useEffect(() => {
    const statusSubscription = props.rxStomp.connectionState$.subscribe((state) => {
      setConnectionStatus(RxStompState[state])
    })

    return () => {
      statusSubscription.unsubscribe()
    }
  }, [])

  return (
    <>
      <h2>Connection Status: {connectionStatus}</h2>
    </>
  )
}

为了查看它,我们需要在我们的应用中包含它:

// src/App.tsx
import Status from './Status'
...
  return (
    <>
      <h1>Hello RxStomp!</h1>

      <Status rxStomp={rxStomp}/>
    </>
  )

至此,您應該在萤幕上有一個運作中的視覺指標。試著將STOMP服務關閉,並查看記錄是否如預期運作。

如何傳送消息

我們來建立一個簡單的聊天室,以展示带有经纪人的简化端到端消息流程。

我們可以把功能放在一個新的聊天室元件裡。首先,我們可以創建一個具有自訂用戶名消息字段,並將其绑定到輸入。

// src/Chatroom.tsx
import { useState } from 'react'
import type { RxStomp } from '@stomp/rx-stomp'

export default function Chatroom(props: {rxStomp: RxStomp}) {
  const [message, setMessage] = useState('')
  const [userName, setUserName] = useState(`user${Math.floor(Math.random() * 1000)}`)

  return (
    <>
      <h2>Chatroom</h2>

      <label htmlFor='username'>Username: </label>
      <input
        type='text'
        name='username'
        value={userName}
        onChange={(e) => setUserName(e.target.value)}
      />

      <label htmlFor='message'>Message: </label>

      <input
        type='text'
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        name='message'
      />
    </>
  )    
}

讓我們把它放在我們的App中,並有一個切換加入聊天室:

// src/App.tsx
import { useEffect, useState, useRef } from 'react'
import Chatroom from './Chatroom'
...
function App() {
  const [joinedChatroom, setJoinedChatroom] = useState(false)
  ...
  return (
    <>
      <h1>Hello RxStomp!</h1>

      <Status rxStomp={rxStomp}/>

      {!joinedChatroom && (
        <button onClick={() => setJoinedChatroom(true)}>
          Join chatroom!
        </button>
      )}

      {joinedChatroom && (
        <>
          <button onClick={() => setJoinedChatroom(false)}>
            Leave chatroom!
          </button>

          <Chatroom rxStomp={rxStomp}/>
        </>
      )}

    </>
  )

是時候實際傳送消息了。STOMP最適合傳送基於文字的消息(二進制數據也是可能的)。我們将在一个新的types文件中定义我們正在傳送數據的結構:

// types.ts
interface ChatMessage {
  userName: string,
  message: string
}

注意:如果您不使用TypeScript,您可以跳過添加此类型定義。

接下來,讓我們使用JSON將消息序列化,並使用.publish與目的地主題和我們的JSON body將消息傳送到我們的STOMP服務器。

// src/Chatroom.tsx
import type { ChatMessage } from './types'
...
const CHATROOM_NAME = '/topic/test'

export default function Chatroom(props: {rxStomp: RxStomp}) {
  ...
  function sendMessage(chatMessage: ChatMessage) {
    const body = JSON.stringify({ ...chatMessage })
    props.rxStomp.publish({ destination: CHATROOM_NAME, body })
    console.log(`Sent ${body}`)
    setMessage('')
  }

  return (
    <>
      <h2>Chatroom</h2>

      <label htmlFor="username">Username: </label>
      <input
        type="text"
        name="username"
        value={userName}
        onChange={(e) => setUserName(e.target.value)}
      />

      <label htmlFor="message">Message: </label>

      <input
        type="text"
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        name="message"
      />

      <button onClick={() => sendMessage({userName, message})}>Send Message</button>
    </>
  )
}

來试试看,嘗試點擊几次傳送消息按鈕,看看序列化是否正常工作。雖然您還無法看到任何視覺上的變化,但控制台日志應該會显示它:

Date ... >>> SEND
destination:/topic/test
content-length:45

Sent {"userName":"user722","message":"1234567890"}

如何接收消息

我們將創建一個新的組件,以顯示來自所有用戶的消息列表。目前,我們將使用相同的類型,將主題名稱作為屬性傳遞,並將所有內容顯示為列表。所有這些都放在一個名為MessageList的新組件中。

// src/MessageDisplay.tsx
import { useEffect, useState } from 'react'
import type { RxStomp } from '@stomp/rx-stomp'
import type { ChatMessage } from './types'

export default function MessageDisplay(props: {rxStomp: RxStomp, topic: string}) {
  const [chatMessages, setChatMessages] = useState<ChatMessage[]>([
    {userName: 'admin', message: `Welcome to ${props.topic} room!`}
  ])

  return(
  <>
  <h2>Chat Messages</h2>
  <ul>
    {chatMessages.map((chatMessage, index) => 
      <li key={index}>
        <strong>{chatMessage.userName}</strong>: {chatMessage.message}
      </li>
    )}
  </ul>
  </>
  )
}

是時候將所有東西整合在一起了!

與使用Status組件管理訂閱的方式類似,我們在組件掛載時設置訂閱,並在組件卸載時取消訂閱。

使用RxJS的pipemap,我們可以將JSON反序列化為我們的ChatMessage。模塊化的設計可以讓您根據需要使用RxJS操作符設置更復雜的管道。

// src/MessageDisplay.tsx
...
import { map } from 'rxjs'

export default function MessageDisplay(props: {rxStomp: RxStomp, topic: string}) {
  const [chatMessages, setChatMessages] = useState<ChatMessage[]>([
    {userName: 'admin', message: `Welcome to ${props.topic} room!`}
  ])

  useEffect(() => {
    const subscription = props.rxStomp
      .watch(props.topic)
      .pipe(map((message) => JSON.parse(message.body)))
      .subscribe((message) => setChatMessages((chatMessages) => [...chatMessages, message]))

    return () => {
      subscription.unsubscribe()
    }
  }, [])

  ...

此時,聊天GUI應該正確顯示消息,您可以嘗試打開多個選項卡作為不同的用戶。

在這裡可以嘗試的另一件事是關閉STOMP服務器,發送幾個消息,然後再打開它。消息應該在服務器準備就緒時被本地排隊並分發。很棒!

總結

在本教程中,我們:

  • 安裝了@stomp/rx-stomp以獲得良好的開發體驗。

  • 設置了RxStompConfig以使用連接詳情、調試器日誌記錄和計時器設置配置客戶端。

  • 使用 rxStomp.activaterxStomp.deactivate 來管理客戶端的主要生命週期。

  • 使用 rxStomp.connectionState$ 可觀察值監控訂閱狀態。

  • 使用 rxStomp.publish 發布可在配置目的地和消息體上調用的信息。

  • 使用 rxStomp.watch為給定主題創建一個可觀察值。

  • 使用控制台日誌和 React 元件來查看庫的實際作用,並驗證功能和故障容忍度。

您可以者在 Gitlab 上找到最終的代碼:https://gitlab.com/harsh183/rxstomp-react-tutorial。您可以自由地將其作為開始模板使用,並報告可能出現的任何問題。