WebSockets With RxJS

tl;dr;

WebSockets are a great way to communicate with a server in real time. RxJS is a great way to handle streams of data. Let's combine the two! In this article, we'll look at how to use RxJS to create a WebSocket connection and send and receive messages.

Connect to a WebSocket

Vannilla JS

You absolutely can use WebSockets with vanilla JavaScript. Here's a quick example:

var wsURL = 'ws://localhost:8080'
this.ws = new WebSocket(wsURL)

It's very easy to get set up, but it's not very easy to manage. You have to manually handle both the incoming and out going messages, the connection, and the disconnection of the socket.

RxJS

WebSockets are a first class citizen in RxJS. The WebSocket documentation is quite detailed, so I won't go into too much detail beyond the syntax.

Here is an example of how to connect to a WebSocket with RxJS using TypeScript:

interface WebSocketMessage {
  // describe the shape of the messages
  greeting: string;
}

const wsConfig = {
  url: "ws://localhost:8080",
  openObserver: {
    next: () => {
      console.log("WS connection opened, initializing state"),
    }
  },
  closeObserver: {
    next: (_event) => {
      console.log("WS connection closing, checking for errors"),
    }
  },
};

const ws = webSocket<WebSocketMessage>(wsConfig);
ws.subscribe();

In this case, there is more setup. We have to define the shape of the messages we are sending and receiving. We also define the openObserver and closingObserver to explicitly handle the opening and closing of the WebSocket. Here we can do things like initialize state or explicitly handle errors.

There is more boilerplate. However, we get things for free like automatic reconnection, type safe message interfaces, simplified message sending & receiving among others we'll get into.

Sending a message to the server

Vanilla JS

this.ws.send(JSON.stringify({ greeting: 'Hello World' }))

Again, this is a simple and fine way to send a message. Especially if we create a helper function that does the JSON.stringify for us.

RxJS

ws.next({ greeting: 'Hello World' })

This is even simpler. We can send in a plain object that adheres to our interface and RxJS will take care of the rest.

Receiving a message from the server

Vanilla JS

this.ws.onmessage = (event) => {
  var data = JSON.parse(event.data)
  // ...
}

This is where things get a little more complicated in the vanilla JS side. We have to assign the onmessage member of the WebSocket we initialized. We have to manually parse the message. This is starting to look like why we left JQuery.

RxJS

ws.pipe(
  tap((data) => {
    // ...
  }),
).subscribe()

RxJS makes it very easy to handle incoming messages. We can use the pipe function to chain together multiple operators before we subscribe to the WebSocket In this case, we use the tap operator to set state or run side effects. We can also use the map operator to transform the data. In fact, in this pipe we can use any of the operators RxJS provides.


Aside - WebSockets vs Socket.io

Socket.io is a fantastic alternative to using WebSockets directly. It provides a lot of the same benefits as raw WebSockets, but will use other protocols if WebSockets are not available. Here is a good Stack Overflow question and discussion on the differences between the two.

We can use RxJS alongside Socket.io. However, I'm not going to go into that in this article. I'm using a serverless WebSocket API using the Serverless Framework with AWS Lambda, DynamoDB and API Gateway (Blog article coming soon on the backend). This approach doesn't support Socket.io.


WebSockets Using RxJS With React

If you'd like to see a full example of this in action, check out this blogpost on Reactive React.