STOMP是一个令人惊叹的简单而强大的协议,用于发送消息,它是由流行的服务器如RabbitMQ、ActiveMQ和Apollo实现的。使用WebSocket上的STOMP是一种简单的协议,使其成为从网页浏览器发送消息的一个受欢迎的选择,因为像AMQP这样的协议受到限制,浏览器不支持TCP连接。

要使用WebSocket上的STOMP,您可以使用@stomp/stompjs,但它具有复杂的回调和复杂的API,适用于更专业的用例。幸运的是,还有一个不太知名的@stompjs/rx-stomp,它通过RxJS可观察对象提供了一个很好的接口。可观察对象不仅限于Angular,它们与React的工作方式也非常契合。当组合具有许多不同消息来源的复杂工作流和管道时,这是一个很棒的接口。

教程遵循与Angular初始版本类似的路径,但组件结构和代码风格更倾向于React的功能风格。

注意:本教程使用strict TypeScript编写,但由于我们只有5个类型声明,JavaScript代码几乎相同。对于JS版本,您可以跳过类型导入和定义。

目录

目标

在这里,我们将建立一个简化的聊天室应用程序,展示RxStomp在不同组件中的各个方面。总体上,我们希望实现:

  • 一个使用RxStomp连接到STOMP服务器的React前端。

  • 一个基于与STOMP服务器的连接显示实时连接状态的界面。

  • 任何可配置主题的发布/订阅逻辑。

  • 将RxStomp逻辑分散到多个组件中,以展示如何分离逻辑和责任。

  • 将RxStomp连接/订阅生命周期与React组件生命周期对齐,以确保没有泄漏或未关闭的观察者。

前提条件

  • 您应该运行一个STOMP服务器,以便React应用程序可以连接到它。在这里,我们将使用带有rabbitmq_web_stomp扩展的RabbitMQ。

  • 最新版本的React。本教程将使用v18,尽管旧版本也可能正常工作。

  • 熟悉 observables 也将有助于解决问题。

带有RabbitMQ的起始STOMP服务器

如果您也想使用RabbitMQ(不是严格必需的),以下是不同操作系统的安装指南。为了添加扩展,您需要运行:

$ rabbitmq-plugins enable rabbitmq_web_stomp

如果您能够使用Docker,一个类似于这个的Docker文件将为您设置教程所需的一切:

FROM rabbitmq:3.8.8-alpine

run rabbitmq-plugins enable --offline rabbitmq_web_stomp

EXPOSE 15674

启动 React 模板

在本教程中,我们将使用 Vitereact-ts 模板。我们应用程序的核心将位于 App 组件中,并且我们将为其他特定的 STOMP 功能创建子组件。

如何安装 RxStomp

我们将使用 @stomp/rx-stomp npm 包:

$ npm i @stomp/rx-stomp rxjs

这将安装版本 2.0.0

注意:本教程不显式指定 rxjs 仍然可以工作,因为它是一个姐妹依赖项,但是明确地这样做是一个好的实践。

如何管理与 STOMP 服务器的连接和断开

现在,让我们打开 App.tsx 并初始化我们的 RxStomp 客户端。由于客户端不是一个将会影响渲染的状态,我们将把它包裹在 useRef 钩子中。

// src/App.tsx
import { useRef } from 'react'
import { RxStomp } from '@stomp/rx-stomp'

import './App.css'

function App() {
  const rxStompRef = useRef(new RxStomp())
  const rxStomp = rxStompRef.current

  return (
    <>
      <h1>Hello RxStomp!</h1>
    </>
  )
}

export default App

假设默认端口和认证详细信息,我们接下来定义一些连接配置。

// src/App.tsx

import { RxStomp } from '@stomp/rx-stomp'
import type { RxStompConfig } from '@stomp/rx-stomp'
...
const rxStompConfig: RxStompConfig = {
  brokerURL: 'ws://localhost:15674/ws',
  connectHeaders: {
    login: 'guest',
    passcode: 'guest',
  },
  debug: (msg) => {
    console.log(new Date(), msg)
  },
  heartbeatIncoming: 0,
  heartbeatOutgoing: 20000,
  reconnectDelay: 200,
}

function App() {
  ...

为了更好的开发体验,我们在本地控制台中记录了所有带有时间戳的消息,并设置了低频率的定时器。您的配置应该与您的生产应用程序大不相同,因此查看 RxStompConfig 文档 中所有可用的选项。

接下来,我们将在一个useEffect钩子内部将配置传递给rxStomp。这使得连接的激活与组件的生命周期一同管理。

// src/App.tsx
...
function App() {
  const rxStompRef = useRef(new RxStomp())
  const rxStomp = rxStompRef.current

  useEffect(() => {
    rxStomp.configure(rxStompConfig)
    rxStomp.activate()

    return () => { 
      rxStomp.deactivate() 
    }
  })
  ...

虽然我们应用中没有视觉上的变化,但检查日志应该会显示连接和心跳日志。以下是应该看起来怎样的示例:

Date ... >>> CONNECT
login:guest
passcode:guest
accept-version:1.2,1.1,1.0
heart-beat:20000,0

Date ... Received data 
Date ... <<< CONNECTED
version:1.2
heart-beat:0,20000
session:session-EJqaGQijDXqlfc0eZomOqQ
server:RabbitMQ/4.0.2
content-length:0

Date ... connected to server RabbitMQ/4.0.2 
Date ... send PING every 20000ms 
Date ... <<< PONG 
Date ... >>> PING

注意:通常,如果您看到重复的日志,这可能是未正确实现停用或取消订阅功能的迹象。在开发环境中,React会渲染每个组件两次以帮助人们通过React.StrictMode捕捉这些错误

如何监控连接状态

RxStomp有一个RxStompState枚举,它代表了与我们的代理可能存在的连接状态。我们下一个目标是将连接状态显示在我们的UI中。

让我们为此创建一个新组件,称为Status.tsx

// src/Status.tsx
import { useState } from 'react'

export default function Status() {
  const [connectionStatus, setConnectionStatus] = useState('')

  return (
    <>
      <h2>Connection Status: {connectionStatus}</h2>
    </>
  )
}

我们可以使用rxStomp.connectionState$可观察对象来绑定我们的connectionStatus字符串。与使用useEffect类似,我们将使用卸载操作来unsubscribe()

// src/Status.tsx
import { RxStompState } from '@stomp/rx-stomp'
import { useEffect, useState } from 'react'
import type { RxStomp } from '@stomp/rx-stomp'


export default function Status(props: { rxStomp: RxStomp }) {
  const [connectionStatus, setConnectionStatus] = useState('')

  useEffect(() => {
    const statusSubscription = props.rxStomp.connectionState$.subscribe((state) => {
      setConnectionStatus(RxStompState[state])
    })

    return () => {
      statusSubscription.unsubscribe()
    }
  }, [])

  return (
    <>
      <h2>Connection Status: {connectionStatus}</h2>
    </>
  )
}

要查看它,我们将其包含在我们的应用中:

// src/App.tsx
import Status from './Status'
...
  return (
    <>
      <h1>Hello RxStomp!</h1>

      <Status rxStomp={rxStomp}/>
    </>
  )

此时,您应该在屏幕上看到一个工作的视觉指示器。尝试通过关闭STOMP服务器并查看日志是否按预期工作。

如何发送消息

让我们创建一个简单的聊天室,展示经纪人的简化端到端消息流程。

我们可以将功能放在一个新的Chatroom组件中。首先,我们可以创建带有自定义usernamemessage字段的组件,这些字段绑定到输入框中。

// src/Chatroom.tsx
import { useState } from 'react'
import type { RxStomp } from '@stomp/rx-stomp'

export default function Chatroom(props: {rxStomp: RxStomp}) {
  const [message, setMessage] = useState('')
  const [userName, setUserName] = useState(`user${Math.floor(Math.random() * 1000)}`)

  return (
    <>
      <h2>Chatroom</h2>

      <label htmlFor='username'>Username: </label>
      <input
        type='text'
        name='username'
        value={userName}
        onChange={(e) => setUserName(e.target.value)}
      />

      <label htmlFor='message'>Message: </label>

      <input
        type='text'
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        name='message'
      />
    </>
  )    
}

让我们在App中包含这个组件,并添加一个切换按钮来加入聊天室:

// src/App.tsx
import { useEffect, useState, useRef } from 'react'
import Chatroom from './Chatroom'
...
function App() {
  const [joinedChatroom, setJoinedChatroom] = useState(false)
  ...
  return (
    <>
      <h1>Hello RxStomp!</h1>

      <Status rxStomp={rxStomp}/>

      {!joinedChatroom && (
        <button onClick={() => setJoinedChatroom(true)}>
          Join chatroom!
        </button>
      )}

      {joinedChatroom && (
        <>
          <button onClick={() => setJoinedChatroom(false)}>
            Leave chatroom!
          </button>

          <Chatroom rxStomp={rxStomp}/>
        </>
      )}

    </>
  )

现在是时候真正发送消息了。STOMP最适合发送基于文本的消息(也可以发送二进制数据)。我们将在一个新的types文件中定义要发送的数据结构:

// types.ts
interface ChatMessage {
  userName: string,
  message: string
}

注意:如果您不使用TypeScript,可以跳过添加此类型定义。

接下来,我们将使用JSON来序列化消息,并使用.publish将消息发送到我们的STOMP服务器,包括目标主题和JSONbody

// src/Chatroom.tsx
import type { ChatMessage } from './types'
...
const CHATROOM_NAME = '/topic/test'

export default function Chatroom(props: {rxStomp: RxStomp}) {
  ...
  function sendMessage(chatMessage: ChatMessage) {
    const body = JSON.stringify({ ...chatMessage })
    props.rxStomp.publish({ destination: CHATROOM_NAME, body })
    console.log(`Sent ${body}`)
    setMessage('')
  }

  return (
    <>
      <h2>Chatroom</h2>

      <label htmlFor="username">Username: </label>
      <input
        type="text"
        name="username"
        value={userName}
        onChange={(e) => setUserName(e.target.value)}
      />

      <label htmlFor="message">Message: </label>

      <input
        type="text"
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        name="message"
      />

      <button onClick={() => sendMessage({userName, message})}>Send Message</button>
    </>
  )
}

为了测试,尝试点击几次发送消息按钮,看看序列化是否正常工作。虽然您目前看不到任何视觉变化,但控制台日志应该会显示出来:

Date ... >>> SEND
destination:/topic/test
content-length:45

Sent {"userName":"user722","message":"1234567890"}

如何接收消息

我们将创建一个新的组件来显示所有用户的消息列表。现在,我们将使用相同的类型,将主题名称作为属性传递,并将所有内容显示为列表。所有这些都放在一个名为MessageList的新组件中。

// src/MessageDisplay.tsx
import { useEffect, useState } from 'react'
import type { RxStomp } from '@stomp/rx-stomp'
import type { ChatMessage } from './types'

export default function MessageDisplay(props: {rxStomp: RxStomp, topic: string}) {
  const [chatMessages, setChatMessages] = useState<ChatMessage[]>([
    {userName: 'admin', message: `Welcome to ${props.topic} room!`}
  ])

  return(
  <>
  <h2>Chat Messages</h2>
  <ul>
    {chatMessages.map((chatMessage, index) => 
      <li key={index}>
        <strong>{chatMessage.userName}</strong>: {chatMessage.message}
      </li>
    )}
  </ul>
  </>
  )
}

是时候将所有内容整合起来了!

Status组件管理订阅类似,我们在挂载时设置订阅,并在卸载时取消订阅。

使用RxJS的pipemap,我们可以将JSON反序列化为我们的ChatMessage。模块化设计可以让您根据需要使用RxJS操作符设置更复杂的管道。

// src/MessageDisplay.tsx
...
import { map } from 'rxjs'

export default function MessageDisplay(props: {rxStomp: RxStomp, topic: string}) {
  const [chatMessages, setChatMessages] = useState<ChatMessage[]>([
    {userName: 'admin', message: `Welcome to ${props.topic} room!`}
  ])

  useEffect(() => {
    const subscription = props.rxStomp
      .watch(props.topic)
      .pipe(map((message) => JSON.parse(message.body)))
      .subscribe((message) => setChatMessages((chatMessages) => [...chatMessages, message]))

    return () => {
      subscription.unsubscribe()
    }
  }, [])

  ...

此时,聊天GUI应该能够正确显示消息,并且您可以尝试在不同的用户下打开多个选项卡。

在这里尝试的另一件事是关闭STOMP服务器,发送一些消息,然后再打开它。消息应该在服务器准备好后本地排队并发送。很棒!

总结

在本教程中,我们:

  • 安装了@stomp/rx-stomp,以获得良好的开发体验。

  • 设置了RxStompConfig,用于配置客户端的连接详细信息、调试日志和计时器设置。

  • 使用rxStomp.activaterxStomp.deactivate来管理客户端的主要生命周期。

  • 使用rxStomp.connectionState$可观察对象监控订阅状态。

  • 使用rxStomp.publish发布消息,具有可配置的目的地和消息体。

  • 使用rxStomp.watch为给定主题创建可观察对象。

  • 既使用控制台日志,也使用React组件来查看库的实际运行情况,并验证功能和故障容错。

您可以在Gitlab上找到最终代码:https://gitlab.com/harsh183/rxstomp-react-tutorial。请随意将其作为起点模板使用,并报告可能出现的问题。