import { Injectable } from '@angular/core';
import { BehaviorSubject, catchError, delayWhen, EMPTY, map, Observable, retryWhen, Subject, switchAll, tap, timer } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

import { RECONNECT_INTERVAL, WSStatus } from '../../const/web-socket.const';


@Injectable({
  providedIn: 'root'
})

export class WebSocketService {

  public socketStatus$: BehaviorSubject<WSStatus>;
  public messages$: Observable<any>;

  private messagesSubject$: Subject<any>;
  private socket$: WebSocketSubject<any> | undefined;

  constructor() {
    this.messagesSubject$ = new Subject<any>();
    this.socketStatus$ = new BehaviorSubject<WSStatus>(WSStatus.closed);
    this.messages$ = this.messagesSubject$
      .pipe(
        switchAll(),
        map((data: any) => new Uint8Array(data)),
        catchError(e => {
          throw e;
        })
      );
  }

  /**
   * Creates a new WebSocket subject and connects it to the messages subject
   *
   * @param url
   * @param cfg if cfg.re-connect = true the observable will be retried.
   */
   public connect(url: string, cfg: { reconnect: boolean } = { reconnect: false }): void {
    if ( !this.socket$ || this.socket$.closed ) {
      this.socket$ = this.getNewWebSocket(url);
      const BLOB_MESSAGES = this.socket$
        .pipe(
          cfg.reconnect ? this.reconnect : o => o,
          tap({
            error: error => console.error(error),
          }), catchError(() => EMPTY)
        );
      this.messagesSubject$.next(BLOB_MESSAGES);
    }
  }

  /**
   * Sends a message to server
   *
   * @param msg
   */
  public sendMessage(msg: any): void {
    if ( this.socket$ ) {
      this.socket$.next(msg);
    }
  }

  /**
   * Completes current WebSocketSubject and
   * makes it undefined, so it can be opened
   */
   public close(): void {
    if ( this.socket$ ) {
      this.socket$.complete();
      this.socket$ = undefined;
      this.socketStatus$.next(WSStatus.closedByUser);
    }
    console.log('[WebSocketService] status after close: ', this.socketStatus$.getValue());
  }

  /**
   * Returns a new WebSocketSubject
   *
   * @param url: Address of websocket server
   * @return WebSocketSubject
   */
  private getNewWebSocket(url: string): WebSocketSubject<Blob> {
    this.socketStatus$.next(WSStatus.connecting);
    console.log('[WebSocketService] Before opening: ' + url);

    return webSocket({
      url,
      serializer: (msg: Blob) => msg,
      deserializer: (e: any) => e.data,
      binaryType: 'arraybuffer',
      openObserver: {
        next: () => {
          this.socketStatus$.next(WSStatus.open);
          console.log('[WebSocketService] Connected');
        }
      },
      closingObserver: {
        next: () => {
          this.socketStatus$.next(WSStatus.closing);
          console.log('[WebSocketService] Closing');
        }
      },
      closeObserver: {
        next: () => {
          this.socket$ = undefined;
          const curSocketStatus: string = this.socketStatus$.getValue();
          if ( curSocketStatus !== WSStatus.closedByUser ) {
            console.log('[WebSocketService] ClosedByServer');
            this.socketStatus$.next(WSStatus.closedByServer);
            this.connect(url, { reconnect: true });
          } else {
            console.log('[WebSocketService] ClosedByUser');
            this.socketStatus$.next(WSStatus.closed);
          }
        }
      }

    });

  }

  /**
   * Retries Ws connection with an interval
   *
   * @param observable the observable to be retried
   */
  private reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(
      retryWhen(errors => errors.pipe(
          delayWhen(() => timer(RECONNECT_INTERVAL))
        )
      )
    );
  }

}
