Observables Using RxJS
Introduction
Observables in RxJS (Reactive Extensions for JavaScript) provide a powerful mechanism for handling asynchronous data streams, such as real-time data from a server, user inputs, or events.
This article is a step-by-step guide to creating an RxJS Observable that receives incoming data, transforms it, and then updates the user interface (UI) with the results.
Step 1: Install RxJS
If you’re using a build tool like Webpack or a framework like React, you can install RxJS through npm or yarn:
npm install rxjs
# or
yarn add rxjs
Step 2: Create an Observable to Listen for Data
Let’s assume you’re listening to server-side updates over a WebSocket. You can create an observable that wraps the WebSocket stream:
import { Observable } from 'rxjs';

// Create an observable that wraps a WebSocket stream
const myObservable$ = new Observable(subscriber => {
  const ws = new WebSocket('wss://test.com/data'); // Connect to a WebSocket server

  // When data is received from the WebSocket, pass it to the subscriber
  ws.onmessage = event => {
    const data = JSON.parse(event.data); // Parse the incoming data
    subscriber.next(data); // Emit the data to the subscriber
  };

  // Forward WebSocket errors to the subscriber
  ws.onerror = error => subscriber.error(error);

  // Complete the stream when the connection closes, so `complete` handlers run
  ws.onclose = () => subscriber.complete();

  // Teardown: close the WebSocket when the subscriber unsubscribes or the stream ends
  return () => ws.close();
});
Step 3: Modify the Incoming Data
You can use RxJS operators to transform the data stream before the UI consumes it. Here, the map operator adds a timestamp to each incoming message.
import { map } from 'rxjs/operators';

// Modify the incoming data
const modifiedMyObservable$ = myObservable$.pipe(
  map((data) => ({
    ...data,
    timestamp: new Date().toISOString()
  }))
);
Step 4: Subscribe to the Observable and Update the UI
Once the observable is set up and the data has been transformed, subscribe to it and update the user interface whenever new data arrives.
modifiedMyObservable$.subscribe({
  next: (data) => {
    // Update the UI with the incoming data
    updateUI(data);
  },
  error: (err) => {
    console.error('Error in stream:', err);
  },
  complete: () => {
    console.log('Incoming stream completed');
  }
});

// Function to update the UI
function updateUI(data) {
  const dataDisplay = document.getElementById('info-para');
  // textContent keeps server-supplied data from being interpreted as HTML
  dataDisplay.textContent = `Details: ${JSON.stringify(data)}`;
}
Step 5: Handling Disconnection or Completion
If the WebSocket connection is terminated, or you need to manage disconnection and reconnection behaviour, you can use RxJS operators such as retry, catchError, or finalize.
For example, to automatically retry on an error:
import { retry } from 'rxjs/operators';

// On error, retry resubscribes to myObservable$, which opens a new WebSocket connection
const retryingObservable = myObservable$.pipe(
  retry(4) // Retry up to 4 times before failing
);

retryingObservable.subscribe({
  next: data => updateUI(data),
  error: err => console.error('Failed after 4 retries:', err)
});
Explanation:
- Observable: Listens to a real-time data stream (for example, through a WebSocket).
- Operators: RxJS operators (map, retry, etc.) transform the data stream and recover from failures.
- Subscription: Subscribes to the observable to consume the real-time data and refresh the UI whenever data is received or modified.
- Error Handling: Uses RxJS error handling and retry operators to handle connection failures.
Benefits:
- Real-Time Data Handling: React to server updates as they arrive.
- Error Handling: Robust error management with automatic retries.
- Efficient UI Updates: The user interface is updated only when new data is received, which improves the user experience.
By combining RxJS observables with WebSockets (or other real-time APIs), you can manage real-time data streams in a web application efficiently while keeping user interaction smooth.