Skip to main content

Implement a custom Observable

You are required to implement a custom Observable from a RxJS library.

Observable is a concept that represents a stream of data or events delivered over time. It's like a fancy way of saying you can subscribe to a source that continuously pushes out new information rather than waiting for a single response.

An Observable is a lazy push collection of multiple values. It’s like a stream of data that can emit values over time.

Think of it like watching a live stream instead of a downloaded video. With a download, you get everything at once, but with a live stream, new content appears as it's produced. Observables provide a similar mechanism for data in JavaScript.

Observable vs Observer

  • Observable: This is the source of the data stream, similar to the live stream itself. It can produce any data, from numbers and strings to complex objects.
  • Observer: This entity subscribes to the Observable to receive the data. Imagine yourself as the viewer watching the livestream. The observer gets notified whenever new data is available.

Example of usage

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});

observable.subscribe({
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});

/*
Output:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

*/

Solution

class MyObservable {
constructor(subscribeFn) {
this._subscribeFn = subscribeFn;
}

subscribe(observer) {
this._subscribeFn(observer);
}
}

// Example usage:
const myObservable = new MyObservable((observer) => {
observer.next('Hello world!'); // Emit initial data
setTimeout(() => {
observer.next('Data after 1 second');
}, 1000);
setTimeout(() => {
observer.complete(); // Signal completion (no more data)
}, 2000);
});

myObservable.subscribe({
next(data) {
console.log('Received data:', data);
},
error(err) {
console.error('Error:', err);
},
complete() {
console.log('Observable completed');
},
});