Question

RxJS observable that fires if it's the first of a kind OR a certain time went by

Hey RxJS pros out there,

I have a stream that emits multiple times a second. Now I want an observer that fires if a value is the first of its kind OR a certain time has passed since the last time it fired. I want to achieve this with pure RxJS, without additional "helper variables".

Scenario:

import { timer } from "rxjs";
import { map, take } from "rxjs/operators";

const list = ["foo", "foo", "bar", "foo",
              "foo", "foo", "foo", "foo",
              "foo", "foo", "bar", "foo"];

// in the real world, obs$ emits "foo" and "bar" randomly and indefinitely
const obs$ = timer(0, 100).pipe(take(12)); 

obs$
  .pipe(map((v, i) => list[i] + "#" + (i + 1)))
  .subscribe(console.log); 

Below are all the values fired by the observable. I want to catch the green ones (✅), and ignore the red ones (❌).

// ⬇
"foo#1"  // ✅ first of a kind
"foo#2"  // ❌
"bar#3"  // ✅ first of a kind
"foo#4"  // ✅ first of a kind
"foo#5"  // ❌
"foo#6"  // ❌
"foo#7"  // ❌
"foo#8"  // ❌
"foo#9"  // ✅ <-- I want this one too, because a certain time (0.5 seconds) went by
"foo#10" // ❌
"bar#11" // ✅ first of a kind
"foo#12" // ✅ first of a kind

So I want this output:

foo#1
bar#3
foo#4
foo#9
bar#11
foo#12

How?


Solution

 2

You can use the scan operator to compute a "state" based on a prior state and the incoming emission.

In this case, our "state" will be an object with all the info needed to determine whether a given emission should be emitted or not. We can provide an initial state that the scan function will use when handling the first emission. After that, it will use the value we return as the state.

Here's what we can use as the initial state:

{
  startTime: Date.now(),
  previousKind: '',
  shouldEmit: true,
  item: '',
}
  • startTime is used to determine if enough time has passed to force emitting.
  • previousKind is to keep track of the kind of item previously emitted, so we can determine if the current item is different from the previous.
  • shouldEmit is a boolean to indicate if this item should be emitted.
  • item is just the emitted item.

Our scan operator (below) uses this info to generate the new state. The state it emits downstream has this same shape:

obs$.pipe(
  scan(/* generates state - details revealed later */),
  filter(state => state.shouldEmit),
  map(state => state.item)
).subscribe(
  item => console.log(`✅ ${item}`)
); 

You can see we apply our scan operation, then simply filter out items that are not marked as shouldEmit, then, we use map to emit the original item.
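To see the shape of this scan, filter, map pipeline in isolation, here is a rough array analogue in plain JavaScript. It is only a sketch: scanArray is a hypothetical helper (not an RxJS API), the input is a shortened version of the question's data, and the accumulator is deliberately simplified to the "first of a kind" rule with no timing.

```javascript
// Hypothetical helper (not part of RxJS): like Array.prototype.reduce,
// but it returns every intermediate state, the way scan emits each one.
function scanArray(items, reducer, seed) {
  const states = [];
  let state = seed;
  for (const item of items) {
    state = reducer(state, item);
    states.push(state);
  }
  return states;
}

const sampleItems = ['foo#1', 'foo#2', 'bar#3', 'foo#4', 'foo#5'];

// Simplified accumulator: only the "first of a kind" rule, no timing.
const sampleStates = scanArray(
  sampleItems,
  (state, item) => {
    const kind = item.split('#')[0];
    return { previousKind: kind, shouldEmit: kind !== state.previousKind, item };
  },
  { previousKind: '', shouldEmit: true, item: '' }
);

const emittedItems = sampleStates
  .filter((state) => state.shouldEmit) // drop states not marked for emission
  .map((state) => state.item);         // unwrap back to the original item

console.log(emittedItems); // foo#1, bar#3, foo#4
```

The state object only exists between scan and map; subscribers see plain items again.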


Here are the contents of the scan operator.

  scan((state, item) => {
    const kind = item.split('#')[0];
    const isFirstOfKind = kind !== state.previousKind;
    // DURATION is the threshold in ms (0.5 seconds in the question), defined outside this snippet
    const durationExceeded = Date.now() - state.startTime > DURATION;
    const shouldEmit = isFirstOfKind || durationExceeded;

    return {
      startTime: shouldEmit ? Date.now() : state.startTime,
      previousKind: kind,
      shouldEmit,
      item,
    }
  }, {
    startTime: Date.now(),
    previousKind: '',
    shouldEmit: true,
    item: '',
  })

You can see we pass scan a function that receives the prior state and the current emission ('item'). With this info, we return a new state that will be emitted to operators downstream. This state is also available the next time scan receives an emission.
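To check the logic against the question's data without real timers, the same accumulator can be run with the timestamp passed in alongside each item. This is a sketch under assumptions: nextState, events and the time field are hypothetical names, DURATION is assumed to be 500 ms, items are assumed to arrive exactly 100 ms apart, and the comparison uses >= instead of the > above so that an item arriving exactly 500 ms after the last emission qualifies.

```javascript
const DURATION = 500; // ms; assumed from the "0.5 seconds" in the question

// Same logic as the scan callback above, but the timestamp arrives with the
// event instead of being read from Date.now() (an assumption of this sketch).
function nextState(state, event) {
  const kind = event.item.split('#')[0];
  const isFirstOfKind = kind !== state.previousKind;
  // Sketch-only deviation: >= instead of > so an exact 500 ms gap qualifies.
  const durationExceeded = event.time - state.startTime >= DURATION;
  const shouldEmit = isFirstOfKind || durationExceeded;
  return {
    startTime: shouldEmit ? event.time : state.startTime,
    previousKind: kind,
    shouldEmit,
    item: event.item,
  };
}

const list = ['foo', 'foo', 'bar', 'foo', 'foo', 'foo',
              'foo', 'foo', 'foo', 'foo', 'bar', 'foo'];

// One simulated event every 100 ms, labelled foo#1 ... foo#12.
const events = list.map((kind, i) => ({ item: `${kind}#${i + 1}`, time: i * 100 }));

let state = { startTime: 0, previousKind: '', shouldEmit: true, item: '' };
const emitted = [];
for (const event of events) {
  state = nextState(state, event);
  if (state.shouldEmit) emitted.push(state.item);
}

console.log(emitted);
// foo#1, bar#3, foo#4, foo#9, bar#11, foo#12
```

With real timers the gap between foo#4 and foo#9 will land slightly above or below 500 ms, so whether foo#9 or foo#10 is emitted depends on jitter and on the choice between > and >=.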


Here is a StackBlitz demo.

2024-07-06
BizzyBob

Solution

 1

After BizzyBob gave the correct answer, I found something that feels more "native". It doesn't fulfill the given requirement 100%, but it does the job for my real-life scenario, so I want to share it.

obs$
  .pipe(
    groupBy((x) => x.split('#')[0]),
    mergeMap((group) =>
      group.pipe(
        throttleTime(500, undefined, { leading: false, trailing: true })
      )
    )
  )
  .subscribe((item) => console.log(`✅ ${item}`));

With these options, throttleTime() emits one value (the latest) at the end of each 500 ms window, provided at least one value arrived during that window. Because I grouped the values with groupBy, throttleTime() can rely on all of its input values being of the same kind.

Here is the output:

✅ bar#3
✅ foo#5
✅ foo#10
✅ bar#11
✅ foo#12

As mentioned, it doesn't match the requirement exactly, but it works like debounceTime PLUS "force an emission every x seconds". That was what I needed.

Stackblitz

2024-07-08
jaheraho