import { Injectable } from '@angular/core';
import { Store } from '@ngrx/store';
import {
  BehaviorSubject,
  filter,
  interval,
  retry,
  switchMap,
  take,
  tap,
} from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

import { createPingStreamEvent } from '../helpers/stream-events';
import { incomingStreamEvent } from '../redux/app.actions';
import { StreamEvent } from '../types/stream-event.type';
import { LoggerService } from './logger.service';

/**
 * Maintains the application's WebSocket stream to `/api/stream`.
 *
 * Incoming events are dispatched to the NgRx store as `incomingStreamEvent`
 * actions. Outgoing events go through {@link StreamService.send}; events sent
 * while no socket exists are held in an in-memory queue and delivered by the
 * next {@link StreamService.listen} call.
 */
@Injectable({ providedIn: 'root' })
export class StreamService {
  /** `true` once a message has been received, `false` after the stream ends. */
  private connected$ = new BehaviorSubject<boolean>(false);
  /** The live socket, or `undefined` before `listen()` and after the stream ends. */
  private socket$: WebSocketSubject<StreamEvent> | undefined;
  /** Events passed to `send()` while no socket was available, oldest first. */
  private queue: StreamEvent[] = [];
  /** Tears down the heartbeat and socket subscriptions of the current `listen()`. */
  private stopListening: () => void = () => undefined;

  constructor(
    private store: Store,
    private loggerService: LoggerService,
  ) {}

  /**
   * Opens the WebSocket stream and starts a 30 second heartbeat ping.
   *
   * Calling this again replaces the previous heartbeat and socket instead of
   * stacking a second timer and a second connection on top of them.
   *
   * A failing connection is retried up to 5 times, 5000 ms apart, with the
   * retry counter reset after a successful emission.
   *
   * @returns A promise that resolves as soon as the subscriptions are set up.
   *   Nothing is awaited; the method stays `async` so existing callers that
   *   await or chain on the result keep working.
   */
  async listen(): Promise<void> {
    // Release whatever an earlier call created so timers and sockets do not
    // accumulate (and events are not dispatched twice).
    this.stopListening();

    const heartbeat = interval(30000)
      .pipe(
        // Sample the current connection state on every tick; skip the ping
        // while disconnected.
        switchMap(() => this.connected$.pipe(take(1))),
        filter(connected => connected),
      )
      .subscribe(() => this.send(createPingStreamEvent()));

    const url = `${location.protocol.startsWith('https') ? 'wss' : 'ws'}://${
      location.host
    }/api/stream`;

    this.loggerService.log(`[StreamService] Connecting to ${url}`);

    const socket$ = webSocket<StreamEvent>({
      url,
    });

    // Publish the socket before sending anything. Per the RxJS documentation,
    // WebSocketSubject buffers values passed to `next()` until the connection
    // is established, so the initial ping and the backlog go out as soon as
    // the socket opens rather than waiting for a first inbound message.
    this.socket$ = socket$;

    // Ping first, then everything queued before `listen()` in the order it
    // was originally sent.
    this.send(createPingStreamEvent());
    this.queue.splice(0).forEach(event => this.send(event));

    const stream = socket$
      .pipe(
        retry({
          count: 5,
          delay: 5000,
          resetOnSuccess: true,
        }),
        tap(() => this.connected$.next(true)),
      )
      .subscribe({
        next: event => this.store.dispatch(incomingStreamEvent({ event })),
        error: error => {
          this.loggerService.error(`[StreamService] Error`, error);

          this.connected$.next(false);
          // NOTE(review): retries are exhausted here, so this appears to only
          // buffer a ping inside the closed subject rather than reconnect —
          // confirm whether a reconnect was intended.
          socket$.next(createPingStreamEvent());
          this.releaseSocket(socket$);
        },
        complete: () => {
          this.loggerService.info(`[StreamService] Closed`);

          this.connected$.next(false);
          this.releaseSocket(socket$);
        },
      });

    this.stopListening = () => {
      heartbeat.unsubscribe();
      stream.unsubscribe();
      this.connected$.next(false);
    };
  }

  /**
   * Sends an event over the stream.
   *
   * When no socket is available the event is appended to the queue and
   * delivered by the next `listen()` call.
   *
   * @param event The event to send.
   */
  send(event: StreamEvent) {
    if (!this.socket$) {
      this.loggerService.log(`[StreamService] Queueing event`, event);
      this.queue.push(event);

      return;
    }

    this.loggerService.log(`[StreamService] Sending event`, event);

    this.socket$.next(event);
  }

  /**
   * Forgets `socket$` once its stream has ended, so later `send()` calls are
   * queued for the next `listen()` instead of being handed to a finished
   * subject.
   */
  private releaseSocket(socket$: WebSocketSubject<StreamEvent>): void {
    if (this.socket$ === socket$) {
      this.socket$ = undefined;
    }
  }
}
