Source: RunNumberStreamify.js

const RunNumber = require("./RunNumber");
const Cloudy = require("./Cloudy");
const { Observable, BehaviorSubject } = require("rxjs");
const { multicast, refCount, map } = require("rxjs/operators");
const orderBy = require("lodash/orderBy");

// const Rx = require('rxjs/Rx');

/**
 * RunNumber flavour that additionally exposes its query result as an RxJS
 * stream, which is convenient for reactive consumers such as Angular.
 * @extends RunNumber
 */
class RunNumberStreamify extends RunNumber {
	/**
	 * Forwards both arguments to the RunNumber constructor and then sets up
	 * the `activities$` stream.
	 * @param {Object} cloudy - passed through to RunNumber unchanged
	 * @param {Object} db - passed through to RunNumber unchanged
	 */
	constructor(cloudy, db) {
		super(cloudy, db);

		// Re-evaluates this.query() on subscription and on every
		// "replicated" / "write" event emitted by this instance.
		const raw$ = autoStream(this.query, this);

		// Order the emitted array by its "time" field, descending.
		const sortByTimeDesc = map((activities) => orderBy(activities, "time", "desc"));

		// All subscribers share one upstream subscription through a
		// BehaviorSubject seeded with an empty array. The original author
		// noted the multicast is required for the listener-cancellation
		// logic to work (rationale not verifiable from this file alone).
		const shareThroughSubject = multicast(new BehaviorSubject([]));

		/** @type {Observable} */
		this.activities$ = raw$.pipe(sortByTimeDesc, shareThroughSubject, refCount());
	}

	/**
	 * Async factory: creates the Cloudy instance, opens and loads its
	 * "runNumber" store, then wraps both in a RunNumberStreamify.
	 *
	 * Per the original author's note this duplicates the factory on the
	 * RunNumber super class (acknowledged as not ideal).
	 * @param {Object} cloudyOptions - forwarded to Cloudy.create
	 * @param {Object} storeOptions - forwarded to cloudy.store
	 * @returns {Promise<RunNumberStreamify>} instance whose store has been loaded
	 */
	static async create(cloudyOptions, storeOptions) {
		const cloudyInstance = await Cloudy.create(cloudyOptions);
		const runNumberStore = await cloudyInstance.store("runNumber", storeOptions);
		await runNumberStore.load();
		return new RunNumberStreamify(cloudyInstance, runNumberStore);
	}
}

module.exports = RunNumberStreamify; // exported in a separate statement after the class; presumably so JSDoc documents the class declaration itself (TODO confirm)

/**
 * Turns a synchronous getter into an Observable that re-evaluates the getter
 * whenever `context` emits "replicated" or "write".
 *
 * For each subscription, `fn` is invoked once right away (with `context` as
 * `this`, no arguments) and its result is pushed to the subscriber. It is
 * then invoked again on every "replicated" / "write" event. Unsubscribing
 * removes both listeners.
 *
 * If `fn` throws, the error is delivered through `observer.error` instead of
 * being thrown back into the code that emitted the event.
 *
 * @param {Function} fn - getter to evaluate; called as `fn.apply(context)`
 * @param {Object} context - must provide `on(event, listener)` and
 *   `removeListener(event, listener)`
 * @returns {Observable} stream of the values returned by `fn`
 */
function autoStream(fn, context) {
	// `new Observable(...)` replaces the deprecated `Observable.create(...)`.
	return new Observable((observer) => {
		function updateNext(e) {
			console.debug("updateNext", fn.name, e);
			let value;
			try {
				value = fn.apply(context);
			} catch (err) {
				// Report the failure on the stream; do not let it propagate
				// into the emitter that triggered this refresh.
				observer.error(err);
				return;
			}
			observer.next(value);
		}

		updateNext("first");
		if (observer.closed) {
			// The subscriber already finished during the first emission
			// (error or synchronous unsubscribe): nothing left to listen for.
			return undefined;
		}
		context.on("replicated", updateNext);
		context.on("write", updateNext);

		// Teardown: runs when the subscription ends.
		return function onComplete() {
			console.log("Completed", fn.name);
			context.removeListener("replicated", updateNext);
			context.removeListener("write", updateNext);
		};
	});
}