import { BehaviorSubject, Observable, ReplaySubject, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { ClientInfo } from './signaling-server-messages';

import { visiblePageTimer } from '@/utils/page-visibility';
import { Finalizable } from '@/utils/finalizable';
import { RtcSendDataChannels } from './rtc-send-data-channels';
import { completeAll } from '@/utils/complete-all';

// Milliseconds value passed as the second argument to `visiblePageTimer` in the
// RtcStreamManager constructor (presumably the tick period — confirm against
// `visiblePageTimer`). On each tick the manager compares every channel's
// recorded enabled state with whether its subject currently has observers.
const WEB_RTC_OBSERVERS_CHECK_INTERVAL_MILLIS = 1000;

/**
 * Routes messages arriving on `RtcSendDataChannels.dataStream$` to per-label
 * subjects and keeps the sender informed about which labels are wanted.
 *
 * Incoming messages are either JSON strings (pings, `controlledBy` updates and
 * `{ label, payload }` stream messages) or binary buffers whose first 20 bytes
 * carry a NUL-terminated UTF-8 label followed by the payload.
 *
 * For every label the manager records the last `streamEnabled` value it sent
 * through `sendReliable`. A periodic timer compares that value with whether
 * the label's subject currently has observers and sends an update when they
 * differ, so streams nobody listens to get switched off and are switched back
 * on when a subscriber returns.
 */
export class RtcStreamManager extends Finalizable {
  /** Size in bytes of the fixed label header that prefixes every binary message. */
  private static readonly BINARY_LABEL_HEADER_BYTES = 20;

  private readonly _latency$ = new Subject<number>();
  /** Emits `ping.latencyMs` of every well-formed ping message. */
  readonly latency$ = this._latency$.asObservable();

  private readonly _controlledBy$ = new BehaviorSubject<ClientInfo | undefined>(
    undefined,
  );
  /**
   * Emits the client carried by the latest `controlledBy` message, or
   * `undefined` when that message has no `uid`. Starts with `undefined`.
   */
  readonly controlledBy$ = this._controlledBy$.asObservable();

  /** Subjects for `{ label, payload }` JSON messages, keyed by label. */
  private jsonChannelMap = new Map<string, ReplaySubject<any>>();
  /** Subjects for binary messages, keyed by the label found in the header. */
  private binaryChannelMap = new Map<string, ReplaySubject<DataView>>();

  /** Last `streamEnabled` value sent for each label. */
  private channelState = new Map<string, boolean>();

  /** Reused for every binary header; non-streaming `decode` keeps no state between calls. */
  private readonly labelDecoder = new TextDecoder('utf-8');

  /**
   * @param rtcSendDataChannels Source of incoming messages (`dataStream$`) and
   *   sink for outgoing ones (`sendReliable` / `sendUnreliable`).
   */
  constructor(private readonly rtcSendDataChannels: RtcSendDataChannels) {
    super();

    // Both subscriptions end on finalization so the manager does not keep
    // processing messages (or stay referenced by the source) afterwards.
    this.rtcSendDataChannels.dataStream$
      .pipe(takeUntil(this.finalized$))
      .subscribe((data) => {
        this.onDataChannel(data);
      });

    visiblePageTimer(0, WEB_RTC_OBSERVERS_CHECK_INTERVAL_MILLIS)
      .pipe(takeUntil(this.finalized$))
      .subscribe(() => {
        this.reconcileStreamStates();
      });
  }

  /**
   * Sends `{ label, streamEnabled }` over the reliable channel and records the
   * value as the label's current state.
   *
   * @param label Stream label.
   * @param enabled Whether the stream should be delivered.
   */
  public sendStreamEnabled(label: string, enabled: boolean): void {
    this.rtcSendDataChannels.sendReliable({ label, streamEnabled: enabled });
    // Recorded only after the send, so a throwing send leaves the state untouched.
    this.channelState.set(label, enabled);
  }

  // TODO: validation
  /**
   * Returns the observable for JSON payloads published under `label`. The
   * first call for a label requests the stream and creates a subject that
   * replays the most recent payload; later calls share that subject.
   *
   * @param label Stream label.
   * @returns Observable of payloads; `T` is not checked at runtime.
   */
  getJsonStream<T>(label: string): Observable<T> {
    const existing: ReplaySubject<T> | undefined =
      this.jsonChannelMap.get(label);
    if (existing) {
      return existing.asObservable();
    }

    // Request first, register second: if the send throws, nothing is
    // registered and the next call retries instead of returning a subject
    // whose stream was never requested.
    this.sendStreamEnabled(label, true);
    const created = new ReplaySubject<T>(1);
    this.jsonChannelMap.set(label, created);
    return created.asObservable();
  }

  /**
   * Returns the observable for binary payloads published under `label`. The
   * first call for a label requests the stream and creates a subject that
   * replays the most recent payload; later calls share that subject.
   *
   * @param label Stream label.
   * @returns Observable of payload views (the message without its label header).
   */
  getBinaryStream(label: string): Observable<DataView> {
    const existing = this.binaryChannelMap.get(label);
    if (existing) {
      return existing.asObservable();
    }

    this.sendStreamEnabled(label, true);
    const created = new ReplaySubject<DataView>(1);
    this.binaryChannelMap.set(label, created);
    return created.asObservable();
  }

  /** Sends `streamEnabled: false` for every registered JSON and binary label. */
  sendDisableStreams(): void {
    this.sendStreamEnabledForAll(false);
  }

  /** Sends `streamEnabled: true` for every registered JSON and binary label. */
  sendEnabledStreams(): void {
    this.sendStreamEnabledForAll(true);
  }

  /** Sends the same enabled flag for all JSON labels, then all binary labels. */
  private sendStreamEnabledForAll(enabled: boolean): void {
    for (const label of this.jsonChannelMap.keys()) {
      this.sendStreamEnabled(label, enabled);
    }
    for (const label of this.binaryChannelMap.keys()) {
      this.sendStreamEnabled(label, enabled);
    }
  }

  /**
   * Sends an update for every label whose recorded state disagrees with
   * whether anyone is currently subscribed to it.
   */
  private reconcileStreamStates(): void {
    for (const [label, isEnabled] of this.channelState.entries()) {
      const isActive = this.hasObservers(label);
      if (isActive !== isEnabled) {
        this.sendStreamEnabled(label, isActive);
      }
    }
  }

  /** True when the label's JSON or binary subject has at least one observer. */
  private hasObservers(label: string): boolean {
    const jsonSubject = this.jsonChannelMap.get(label);
    if (jsonSubject !== undefined && jsonSubject.observers.length > 0) {
      return true;
    }
    const binarySubject = this.binaryChannelMap.get(label);
    return binarySubject !== undefined && binarySubject.observers.length > 0;
  }

  /** Type guard for non-null objects (arrays included). */
  private static isRecord(value: unknown): value is Record<string, any> {
    return typeof value === 'object' && value !== null;
  }

  /** Dispatches one incoming message according to its runtime type. */
  private onDataChannel(data: unknown) {
    if (typeof data === 'string') {
      this.onJsonMessage(data);
      return;
    }
    if (data instanceof ArrayBuffer) {
      // for binary message in Chrome
      this.onBinaryMessage(data);
      return;
    }
    if (data instanceof Blob) {
      // for binary message in Firefox
      new Response(data)
        .arrayBuffer()
        .then((buffer) => {
          this.onBinaryMessage(buffer);
        })
        .catch((error) => {
          // Also reached when handling the decoded buffer throws.
          console.warn('Failed to read binary message.', error);
        });
      return;
    }
    console.warn(`Got message of unknown type: ${data}`);
  }

  /**
   * Handles a binary message: reads the label from the fixed-size header and
   * forwards the remaining bytes to the label's subject. When nobody listens
   * to the label, asks for the stream to be disabled instead.
   */
  private onBinaryMessage(data: ArrayBuffer) {
    const headerBytes = RtcStreamManager.BINARY_LABEL_HEADER_BYTES;
    if (data.byteLength < headerBytes) {
      // The typed-array and DataView constructors below would throw a
      // RangeError on such a buffer.
      console.warn(
        `Got binary message shorter than its ${headerBytes}-byte label header.`,
      );
      return;
    }

    // Convert from UTF8 to unicode string; the label ends at the first NUL.
    const header = this.labelDecoder.decode(
      new Uint8Array(data, 0, headerBytes),
    );
    const terminatorIndex = header.indexOf('\0');
    const label =
      terminatorIndex === -1 ? header : header.slice(0, terminatorIndex);
    const dataPayload = new DataView(data, headerBytes);

    const channelSubject = this.binaryChannelMap.get(label);
    if (channelSubject && channelSubject.observers.length > 0) {
      channelSubject.next(dataPayload);
    } else {
      // The subject (if any) stays registered: callers may still hold its
      // observable, and the periodic check re-enables the stream once they
      // subscribe again.
      this.sendStreamEnabled(label, false);
    }
  }

  /**
   * Parses a text message as JSON and handles it. Neither a parse failure nor
   * a failure while handling propagates to the caller; both are logged.
   */
  private onJsonMessage(dataStr: string) {
    let data: unknown;
    try {
      data = JSON.parse(dataStr);
    } catch {
      console.error(`Not JSON message: `, dataStr);
      return;
    }

    try {
      this.handleJsonMessage(data, dataStr);
    } catch (e) {
      // Kept best-effort so a failing send or subscriber never escapes into
      // the data-stream subscription.
      console.error(`Failed to handle message: `, dataStr, e);
    }
  }

  /**
   * Handles an already-parsed JSON message.
   *
   * @param data Parsed message.
   * @param dataStr Original text, used for log output only.
   */
  private handleJsonMessage(data: unknown, dataStr: string): void {
    if (!RtcStreamManager.isRecord(data)) {
      // Valid JSON that is not an object (number, string, null, ...).
      console.error(`Got unexpected message: ${dataStr}`);
      return;
    }

    if ('ping' in data) {
      const ping: unknown = data.ping;
      if (
        !RtcStreamManager.isRecord(ping) ||
        !('timestampMs' in ping) ||
        !('latencyMs' in ping)
      ) {
        console.warn(`Got malformed ping: ${dataStr}`);
        return;
      }
      // Echo the timestamp back as a pong and publish the reported latency.
      this.rtcSendDataChannels.sendUnreliable({
        pong: ping.timestampMs,
      });
      this._latency$.next(ping.latencyMs);
      return;
    }

    if ('controlledBy' in data) {
      // A null/absent value or a missing uid both mean "no controller".
      const controllingClientInfo = data.controlledBy as
        | ClientInfo
        | null
        | undefined;
      if (controllingClientInfo?.uid) {
        // TODO: Remove once legacy signaling is updated on all robots.
        if (controllingClientInfo.id === undefined) {
          controllingClientInfo.id = controllingClientInfo.uid;
        }
        this._controlledBy$.next(controllingClientInfo);
      } else {
        this._controlledBy$.next(undefined);
      }
      return;
    }

    if ('sequenceNumber' in data && 'metaData' in data) {
      return; // can be removed when all robots are updated
    }

    if ('label' in data && 'payload' in data) {
      const label = data.label;
      const channelSubject = this.jsonChannelMap.get(label);
      if (channelSubject && channelSubject.observers.length > 0) {
        channelSubject.next(data.payload);
      } else {
        // Nobody listens to this label: ask for the stream to be disabled.
        this.sendStreamEnabled(label, false);
      }
      return;
    }

    console.error(`Got unexpected message: ${dataStr}`);
  }

  /** Completes the latency and controller subjects when the manager is finalized. */
  protected async onFinalize(): Promise<void> {
    completeAll(this._latency$, this._controlledBy$);
  }
}
