import { BehaviorSubject, Observable, Subscribable, Unsubscribable } from 'rxjs';
import { WebSocketConnectionStatus } from '../../models';



/**
 * Structural description of a WebSocket-like object: something that can be
 * closed, can send string/binary payloads, exposes a `binaryType`, and may
 * carry the four lifecycle callbacks.
 *
 * NOTE(review): presumably meant to mirror the DOM `WebSocket` surface so that
 * alternative implementations can be swapped in — confirm against callers.
 */
export interface IWebSocket {
  /** Closes the connection. The return value is not used. */
  close(): void;
  /** Sends one payload over the connection. The return value is not used. */
  send(data: string | ArrayBuffer | Blob): void;
  /** Representation used for incoming binary payloads (e.g. `'arraybuffer'`). */
  binaryType: string;
  // Handler parameters stay loosely typed so that implementations with their
  // own event classes remain assignable to this interface.
  /** Optional callback for the "open" event. */
  onopen?: (event: any) => any;
  /** Optional callback for the "close" event. */
  onclose?: (event: any) => any;
  /** Optional callback for the "message" event. */
  onmessage?: (event: any) => any;
  /** Optional callback for the "error" event. */
  onerror?: (event: any) => any;
}

/**
 * RxJS wrapper around a `WebSocket`.
 *
 * `messages` is a cold observable: each subscription opens its own socket to
 * `url`, forwards everything emitted by the `upStream` passed to the
 * constructor once that socket is open, and closes the socket again when the
 * subscription is torn down.
 */
export class WebSocketEx {

  /**
   * Starts as `Closed`; emits `Open` when a socket's `open` event fires and
   * `Closed` when an opened socket errors, closes, or is torn down. The
   * subject is shared by all subscriptions to `messages`.
   */
  connectionStatus: BehaviorSubject<WebSocketConnectionStatus>;

  /**
   * Incoming frames as `Uint8Array`s. The stream never completes on its own:
   * a socket `error` or `close` event is delivered as an observable error.
   */
  messages: Observable<Uint8Array>;

  /** Most recently created socket that has not been closed yet; used by `close()`. */
  private socket?: WebSocket;

  /**
   * @param url Endpoint every subscription to `messages` connects to.
   * @param upStream Source of outgoing payloads; subscribed once a socket is open.
   * @param protocols Optional sub-protocol(s) passed to the `WebSocket` constructor.
   */
  constructor(
    public url: string,
    upStream: Subscribable<string | ArrayBuffer | Blob>,
    protocols?: string | string[],
  ) {
    this.connectionStatus = new BehaviorSubject<WebSocketConnectionStatus>(
      WebSocketConnectionStatus.Closed
    );

    this.messages = new Observable<Uint8Array>((observer) => {
      // Keep a per-subscription reference. Handlers and teardown must act on
      // THIS socket even if another subscription later replaces `this.socket`.
      const socket = new WebSocket(url, protocols || []);
      socket.binaryType = 'arraybuffer';
      this.socket = socket;

      let inputSubscription: Unsubscribable | undefined;
      let open = false;

      // Emits `Closed` at most once per successful open. The flag is cleared
      // before emitting so a re-entrant call cannot emit a second time.
      const markClosed = () => {
        if (!open) return;

        open = false;
        this.connectionStatus.next(WebSocketConnectionStatus.Closed);
      };

      socket.onopen = () => {
        open = true;
        this.connectionStatus.next(WebSocketConnectionStatus.Open);
        inputSubscription = upStream.subscribe({
          next: (data) => {
            // Drop payloads once the socket is closing/closed (e.g. after close()).
            if (socket.readyState !== socket.OPEN) return;

            socket.send(data);
          },
        });
      };

      socket.onmessage = (message: MessageEvent) => {
        // binaryType is 'arraybuffer', so binary frames arrive as ArrayBuffer.
        // Text frames (string `data`) are not converted to their bytes here —
        // NOTE(review): confirm the server only sends binary frames.
        observer.next(new Uint8Array(message.data, 0));
      };

      socket.onerror = (error: Event) => {
        markClosed();
        observer.error(error);
      };

      socket.onclose = (event: CloseEvent) => {
        markClosed();
        // Every close, clean or not, is surfaced as an error carrying the
        // close reason; the stream is never completed from here.
        observer.error(new Error(event ? event.reason : ''));
      };

      // Teardown: runs on unsubscribe and after the observer has errored.
      return () => {
        if (inputSubscription) {
          inputSubscription.unsubscribe();
        }

        markClosed();
        socket.close();

        // Only forget the shared reference if it still points at this socket.
        if (this.socket === socket) {
          this.socket = undefined;
        }
      };
    });
  }

  /**
   * Closes the most recently created socket, if any. That socket's `close`
   * event then errors the corresponding `messages` subscription.
   */
  close(): void {
    const socket = this.socket;
    if (!socket) return;

    this.socket = undefined;
    socket.close();
  }

}
