Question

Looping through Observable and mapping result cuncurrently

I have a NgRx selector data$: Observable<Data[]> which contains array of objects. Inside my component I need to go through each data item and make two HTTP calls. One is conditional (based on data from another observable isConditionTrue$: Observable<boolean>).

The data is being subscribed inside the table via async pipe. Additional HTTP calls will map some of the data attributes.

HTTP calls should be performed concurrently.

For instance:

  1. The Component initialization, data$ is being subscribed inside the table, value is empty array []
  2. data$ emits the first value, an array has length 5, 5 items will be displayed in the table immediately
  3. Loop through 5 items
  4. Each item triggers httpCallA
  5. Some item triggers httpCallB if isConditionTrue$ returns true
  6. httpCallB from item no.3 is completed
  7. Table item no.3 will be rerendered
  8. httpCallA from item no.2 is completed
  9. Table item no.2 is rerendered and so on.

If some of the request fails, other requests should continue.

I have tried pushing result of the HTTP call into results array, however it is not a good solution (as it creates more results than we need).

I had also an idea to find object from data by ID after each HTTP call is completed, but maybe there is a better solution.

Whole code feels very complicated and I can imagine that somebody else could have hard time during reading it.

vm$ = combineLatest([data$, isConditionTrue$]).pipe(
  switchMap(([data, isConditionTrue]) => {
    // no data
    if (!data?.length) {
      return of({ data: [], isConditionTrue });
    }
    const result = [];
    // loop through data
    return from(data).pipe(
      // create concurent HTTP calls
      mergeMap((dataItem) => {
        // combine multiple http request
        return merge(
          // first http call (should be performed everytime)
          httpCallA(dataItem.id).pipe(
            map((response) => {
              if (!response) {
                return dataItem;
              }
              return {
                ...response,
                ...dataItem,
              };
            }),
            catchError((error) => {
              console.error(error);
              return of(dataItem);
            }),
          ),
          // second http request (conditional)
          of(dataItem).pipe(
            filter(() => isConditionTrue), // only perform when condition is true
            mergeMap(() => {
              // if dataItem is of specific type call HTTP call
              if (dataItem.conditionA) {
                return httpCallB(dataItem.id).pipe(
                  map((res) => ({ ...res, dataItem })),
                  catchError((error) => {
                    console.error(error);
                    return of(dataItem);
                  }),
                );
              }

              // otherwise
              return httpCallC(dataItem.id).pipe(
                map((res) => ({ ...res, dataItem })),
                catchError((error) => {
                  console.error(error);
                  return of(dataItem);
                }),
              );
            }),
          ),
        );
      }),
      tap((value) => result.push(value)),
      map(() => ({ data: result, isConditionTrue })),
    );
  }),
  startWith({ data: [], isConditionTrue: false }),
);

Any help is much appreciated. Thank you!

 3  145  3
1 Jan 1970

Solution

 1

Arrays are stored by reference, so you can use this rule and directly update the array properties by their index data[index]={...}.

Inside the subscribe you can fire cdr.detectChanges() whenever the subscribe is called, this will cause the grid to refresh and hence give you the desired result!

The reason I swapped, merge with combineLatest is because for each emit, there will be call to subscribe, so that you get the dynamically updating table.

The reason I used toArray is to reduce the number of emits to only the full list.

The reason I used startWith and endWith is to discard the previous observables data, since we are updating the data in-place inside the A, B and C request, we can use these operators, to always provide the required { data: data, isConditionTrue } consistently.

import './style.css';

import {
  rx,
  of,
  map,
  combineLatest,
  switchMap,
  mergeMap,
  from,
  startWith,
  tap,
  merge,
  catchError,
  filter,
  toArray,
  delay,
  iif,
  endWith,
  ignoreElements,
  forkJoin,
} from 'rxjs';

const httpCallA = (data: any) => of({ testA: 'A' }).pipe(delay(2000));
const httpCallB = (data: any) => of({ testB: 'B' }).pipe(delay(2000));
const httpCallC = (data: any) => of({ testC: 'C' }).pipe(delay(2000));

const data$ = of([
  { test: 1 },
  { test: 2 },
  { test: 3 },
  { test: 4 },
  { test: 5 },
]);
const isConditionTrue$ = of(true);
combineLatest([data$, isConditionTrue$])
  .pipe(
    switchMap(([data, isConditionTrue]: any) => {
      // no data
      if (!data?.length) {
        return of({ data: [], isConditionTrue });
      }
      // loop through data
      return from(data)
        .pipe(
          // create concurent HTTP calls
          mergeMap((dataItem: any, index: number) => {
            // combine multiple http request
            return combineLatest([
              // first http call (should be performed everytime)
              httpCallA(dataItem.id).pipe(
                tap((response: any) => {
                  if (!response) {
                    return dataItem;
                  }
                  data[index] = {
                    ...data[index],
                    ...response,
                  };
                }),
                catchError((error: any) => {
                  console.error(error);
                  return of(dataItem);
                })
              ),
              // second http request (conditional)
              of(dataItem).pipe(
                filter(() => isConditionTrue), // only perform when condition is true
                mergeMap(() =>
                  iif(
                    () => dataItem.conditionA,
                    httpCallB(dataItem.id).pipe(
                      tap((res: any) => {
                        data[index] = {
                          ...data[index],
                          ...res,
                        };
                      }),
                      catchError((error: any) => {
                        console.error(error);
                        return of(dataItem);
                      })
                    ),
                    httpCallC(dataItem.id).pipe(
                      tap((res: any) => {
                        data[index] = {
                          ...data[index],
                          ...res,
                        };
                      }),
                      catchError((error: any) => {
                        console.error(error);
                        return of(dataItem);
                      })
                    )
                  )
                )
              ),
            ]);
          }),
          toArray(),
          map(() => ({ data, isConditionTrue: false }))
        )
        .pipe(
          startWith({ data, isConditionTrue: false }),
          endWith({ data, isConditionTrue: false })
        );
    }),
    startWith({ data: [], isConditionTrue: false })
  )
  .subscribe(console.log);

Stackblitz Demo

2024-07-18
Naren Murali

Solution

 0

You could try building the full request from the array and using forkJoin of a forkJoin to run all the requests in parallel;


const httpCallA = (id: number) => of({ resA: 'A', id }).pipe(delay(1000));
const httpCallB = (id: number) => of({ resB: 'B', id }).pipe(delay(1500));
const httpCallC = (id: number) => of({ resC: 'C', id }).pipe(delay(2000));

const data$ = new BehaviorSubject([
  { id: 1, conditionA: false },
  { id: 2, conditionA: false },
  { id: 3, conditionA: true },
  { id: 4, conditionA: false },
  { id: 5, conditionA: true },
  { id: 6, conditionA: true },
]);

const isConditionTrue$ = of(true);

const vm$ = combineLatest([data$, isConditionTrue$]).pipe(
  filter(([data, _]) => data?.length > 0),
  switchMap(([data, isConditionTrue]) => {
    const requests = [];
    data.forEach((item) => {
      // conditionally create a request object
      const request = forkJoin({
        item: of(item),
        a: httpCallA(item.id),
        b: isConditionTrue && item.conditionA ? httpCallB(item.id) : of(null),
        c: isConditionTrue && !item.conditionA ? httpCallC(item.id) : of(null),
      })
      .pipe(
        map((res) => {
          // tranform item by combining http calls with item
          return { ...res.item, ...res.a, ...res.b, ...res.c }
        })
      );

      // push observable into request array
      requests.push(request);
    });

    // use fork join again to convert array of observables to a single observable 
    // that returns response as an array
    return forkJoin(requests);
  })
);

vm$.subscribe(console.log);

Output

[
    {
        "id": 1,
        "conditionA": false,
        "resA": "A",
        "resC": "C"
    },
    {
        "id": 2,
        "conditionA": false,
        "resA": "A",
        "resC": "C"
    },
    {
        "id": 3,
        "conditionA": true,
        "resA": "A",
        "resB": "B"
    },
    {
        "id": 4,
        "conditionA": false,
        "resA": "A",
        "resC": "C"
    },
    {
        "id": 5,
        "conditionA": true,
        "resA": "A",
        "resB": "B"
    },
    {
        "id": 6,
        "conditionA": true,
        "resA": "A",
        "resB": "B"
    }
]
2024-07-18
Balaji