import {
  BehaviorSubject,
  merge,
  Observable,
  of,
  Subject,
  throwError,
  timer,
} from 'rxjs';
import {
  catchError,
  concatMap,
  filter,
  finalize,
  map,
  share,
  shareReplay,
  switchMap,
  take,
  takeUntil,
  tap,
} from 'rxjs/operators';
import { HttpErrorResponse } from '@angular/common/http';

/** Sequence number used to match a queued request with the result later received for it. */
type ReqId = number;

/** A request observable paired with the identifier it was given when it was queued. */
interface ReqWithId<TValue> {
  id: ReqId;
  req: Observable<TValue>;
}

/** A result payload paired with the identifier of the request it belongs to. */
interface ResultWithId<TValue> {
  id: ReqId;
  res: TValue;
}

/**
 * Shape of a request queue; the object returned by `createQueue` in this file has this form.
 */
export interface RequestQueue<T> {
  /**
   * Schedules a request observable inside the queue.
   * @param o - request observable to schedule
   * @returns observable through which the caller receives the outcome of `o`
   */
  queue: (o: Observable<T>) => Observable<T>;
}

/**
 * Creates a request queue that runs at most `size` queued observables at the same time.
 *
 * Requests are distributed round-robin (`id % size`) over `size` internal lines. Every line runs
 * its requests strictly one after another, so a request starts only after the previous request
 * in the same line has terminated.
 *
 * Contract for queued observables:
 * - only the first emitted value is used (`take(1)`), so pass observables that emit a *single* value;
 * - the observable must eventually emit or complete, otherwise later requests in the same line
 *   never start;
 * - an observable that completes without emitting produces no result, so its consumer stays
 *   pending until it unsubscribes.
 *
 * Any error raised by a queued observable (HTTP or not) reaches its consumer as an error
 * notification. Unsubscribing from the observable returned by `queue` cancels the request,
 * whether it is still waiting or already running. The returned observable is meant to be
 * subscribed once.
 *
 * @param size - positive integer; number of lines, i.e. the maximum number of parallel requests
 * @returns object with a `queue` method that wraps an observable so that it runs inside the queue
 * @throws RangeError if `size` is not a positive integer
 */
export const createQueue = <T>(size: number) => {
  // A non-integer or non-positive size would make `id % size` point at a line that does not exist.
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(
      `createQueue: size must be a positive integer, got ${size}`
    );
  }

  // Outcome of one request. Success and failure are tagged explicitly so that *every* error is
  // re-thrown to the consumer, instead of relying on an `instanceof` check of the payload.
  type Settled = { ok: true; value: T } | { ok: false; error: unknown };

  // Incremented counter of requests; used to match a request with its received result.
  let lastId: ReqId = 0;

  // Requests stay in lines like clients waiting for a teller in a bank.
  const lines: Subject<ReqWithId<T>>[] = Array.from(
    { length: size },
    () => new Subject<ReqWithId<T>>()
  );

  // Every settled request is broadcast here, tagged with its id.
  const feed: Observable<ResultWithId<Settled>> = merge(
    ...lines.map((line) =>
      line.pipe(
        // concatMap: the next request of a line starts only when the previous one has terminated
        concatMap((r: ReqWithId<T>) =>
          r.req.pipe(
            map(
              (value: T): ResultWithId<Settled> => ({
                id: r.id,
                res: { ok: true, value },
              })
            ),
            // Turn the failure into a value so that one failed request terminates neither its
            // line nor the shared feed; the consumer turns it back into an error.
            catchError((error: unknown) => {
              const failed: ResultWithId<Settled> = {
                id: r.id,
                res: { ok: false, error },
              };
              return of(failed);
            })
          )
        )
      )
    )
  ).pipe(share());

  const queue = (o: Observable<T>): Observable<T> => {
    const id = ++lastId;
    const lineIndex = id % size; // fit generated id into range 0..size-1
    const canceled = new BehaviorSubject(false);
    const canceled$ = canceled.pipe(filter((c) => c)); // emits once the consumer is gone

    // Nothing is queued until the caller subscribes to the returned observable.
    const enqueue$ = of(null).pipe(
      takeUntil(canceled$),
      tap(() => {
        // Push the request on a later event-loop turn. `tap` runs before the `switchMap` below
        // has subscribed to `feed`; pushing synchronously could hand the request to a line
        // nobody listens to yet, or let a synchronous request publish its result before this
        // consumer is listening for it.
        timer(0)
          .pipe(takeUntil(canceled$)) // consumer may unsubscribe during the delay
          .subscribe(() => {
            lines[lineIndex].next({
              req: o.pipe(
                takeUntil(canceled$), // cancel a waiting or in-flight request on unsubscribe
                take(1)
              ),
              id,
            });
          });
      })
    );

    return enqueue$.pipe(
      switchMap(() =>
        feed.pipe(
          filter((r: ResultWithId<Settled>) => r.id === id), // pick this request's result
          take(1),
          map((r: ResultWithId<Settled>) => r.res),
          switchMap((res: Settled) =>
            res.ok ? of(res.value) : throwError(res.error)
          ),
          // Runs on completion, error and unsubscribe: tears down the pending timer and the
          // queued/in-flight request.
          finalize(() => canceled.next(true))
        )
      )
    );
  };

  return { queue };
};
