import WebSocket from "isomorphic-ws";
import { decoding, encoding, math, time } from "lib0";
import * as BroadcastChannel from "lib0/broadcastchannel";
import { Observable } from "lib0/observable";
import * as url from "lib0/url";
import { pino } from "pino";
import * as AuthProtocol from "y-protocols/auth";
import * as AwarenessProtocol from "y-protocols/awareness";
import * as SyncProtocol from "y-protocols/sync";
import * as Y from "yjs";

import * as MessageTypes from "./MessageTypes.js";

// Maximum time (ms) an open socket may stay silent before the liveness check closes it.
// The liveness check itself runs every MESSAGE_RECONNECT_TIMEOUT / 10 milliseconds.
// @todo - this should depend on awareness.outdatedTime
const MESSAGE_RECONNECT_TIMEOUT = 30 * 1000;
// Default upper bound (ms) for the exponential reconnect backoff delay.
const DEFAULT_MAX_BACKOFF_TIME = 10 * 1000;
// Default number of consecutive unsuccessful connection attempts after which no new socket is created.
const DEFAULT_MAX_RETRIES = 500;
// Default time (ms) that waitUntilSynced() waits before rejecting.
const DEFAULT_WAIT_SYNC_TIMEOUT = 15 * 1000;
// Maximum time (ms) that disconnect() waits for the socket "close" event before resolving anyway.
const MAX_CLOSE_TIMEOUT = 30 * 1000;

// *********************************************
// Interfaces
// *********************************************/
/**
 * Callback that processes one incoming message of a given message type.
 *
 * Handlers are stored in an array indexed by message type; the provider reads the
 * leading varuint of every incoming message and invokes the handler at that index.
 *
 * @param encoder Reply buffer. Whatever the handler writes here is sent back to the
 *   source of the message (websocket or broadcast channel) when its length is greater than 1.
 * @param decoder Decoder positioned right after the message type varuint.
 * @param provider The provider that received the message.
 * @param emitSynced `true` for messages received over the websocket, `false` for
 *   messages received over the broadcast channel.
 * @param messageType The message type that was read from the start of the message.
 */
export interface MessageHandler {
  (
    encoder: encoding.Encoder,
    decoder: decoding.Decoder,
    provider: WebSocketProvider,
    emitSynced: boolean,
    messageType: number,
  ): void;
}

/**
 * Options accepted by the WebSocketProvider constructor. Every option is optional.
 */
export interface WebSocketProviderOptions {
  /**
   * The access token to use when connecting to the server.
   * When set, it is sent as an `Authorization: Bearer <token>` header
   * (only supported on Node - not in the browser).
   */
  accessToken?: string;

  /**
   * Optional AwarenessProtocol.Awareness instance to use.
   * When omitted, a new instance is created for the document.
   */
  awareness?: AwarenessProtocol.Awareness;

  /**
   * Whether to connect to the server immediately. Defaults to `true`.
   */
  connect?: boolean;

  /**
   * Disable cross-tab BroadcastChannel communication. Defaults to `false`.
   */
  disableBroadcastChannel?: boolean;

  /**
   * Maximum amount of time (in milliseconds) to wait before trying to reconnect
   * (we try to reconnect using exponential backoff). Defaults to 10 seconds.
   */
  maxBackoffTime?: number;

  /**
   * Maximum number of retries before giving up. Defaults to 500.
   */
  maxRetries?: number;

  /**
   * Optional message handlers to use, indexed by message type.
   * When provided, a copy of this array replaces the default handlers.
   */
  messageHandlers?: Array<MessageHandler>;

  /**
   * The query parameters to use when connecting to the server.
   * They are encoded and appended to the connection URL as a query string.
   */
  params?: Record<string, string>;

  /**
   * Request server state every `resyncInterval` milliseconds.
   * Resync is only enabled when the value is greater than 0.
   */
  resyncInterval?: number;

  /**
   * The logger instance to use. When omitted, no debug logging is produced.
   */
  logger?: pino.BaseLogger;
}

/**
 * Websocket Provider for YJS. Creates a websocket connection to sync the shared document.
 * The document name is attached to the provided url. I.e. the following example
 * creates a websocket connection to http://localhost:1234/my-document-id
 *
 * Events emitted by the provider:
 *  - "status": `{ status: "connecting" | "connected" | "disconnected" }`
 *  - "synced" / "sync": the new synced state, emitted whenever it changes
 *  - "connection-close": the close event and the provider
 *  - "connection-error": the error (event) and the provider
 *  - "unauthorized", "invalid-version", "error": fatal server messages; the provider
 *    destroys itself after emitting them (default message handlers only)
 *
 * @example
 *   import * as Y from "yjs";
 *   import { WebSocketProvider } from "@bigpi/y-websocket";
 *   const doc = new Y.Doc();
 *   const options = { accessToken: "123", connect: false };
 *   const provider = new WebSocketProvider("http://localhost:1234", "my-document-id", doc, options);
 *
 * @extends {Observable<string>}
 */
export class WebSocketProvider extends Observable<string> {
  // *********************************************
  // Private fields
  // *********************************************/
  /** Result of the first destroy() call; reused so that teardown only ever runs once. */
  private _destroyPromise: Promise<void> | undefined;
  /** Set once destroy() has been called; prevents new reconnect attempts from being scheduled. */
  private _isDisposed: boolean = false;
  /** Timer that closes the socket when no message has been received for too long. */
  private _livenessCheckInterval: ReturnType<typeof setInterval>;
  /** Pending reconnect timer scheduled by the close handler, if any. */
  private _reconnectTimeout: number | NodeJS.Timeout | undefined;
  /** Timer that periodically re-sends sync step 1, or 0 when resync is disabled. */
  private _resyncInterval: ReturnType<typeof setInterval> | 0;
  /** Backing field of the `synced` property. */
  private _synced: boolean;

  // *********************************************
  // Constructors
  // *********************************************/
  /**
   * @param serverUrl The server to connect to. Trailing slashes are removed.
   * @param documentId The document ID to use. It is appended to the server URL.
   * @param yDocument The YJS document to sync.
   * @param options Optional provider settings; see WebSocketProviderOptions for defaults.
   */
  constructor(serverUrl: string, documentId: string, yDocument: Y.Doc, options: WebSocketProviderOptions = {}) {
    super();
    const {
      accessToken,
      awareness,
      connect,
      disableBroadcastChannel,
      logger,
      maxBackoffTime,
      maxRetries,
      messageHandlers,
      params,
      resyncInterval,
    } = options;
    this.logger = logger;
    this.logger?.debug("WebSocketProvider.constructor()");

    // Ensure that URL does not end with /
    while (serverUrl[serverUrl.length - 1] === "/") {
      serverUrl = serverUrl.slice(0, serverUrl.length - 1);
    }
    const encodedParams = url.encodeQueryParams(params ?? {});
    this._synced = false;
    this.accessToken = accessToken;
    this.awareness = awareness ?? new AwarenessProtocol.Awareness(yDocument);
    this.broadcastChannel = `${serverUrl}/${documentId}`;
    this.broadcastConnected = false;
    this.disableBroadcastChannel = disableBroadcastChannel ?? false;
    this.documentId = documentId;
    this.yDocument = yDocument;
    this.maxBackoffTime = maxBackoffTime ?? DEFAULT_MAX_BACKOFF_TIME;
    this.maxRetries = maxRetries ?? DEFAULT_MAX_RETRIES;
    // Copy caller-supplied handlers so later changes to the caller's array do not affect us
    this.messageHandlers = messageHandlers ? messageHandlers.slice() : this._createDefaultMessageHandlers();
    this.shouldConnect = connect ?? true;
    this.url = `${serverUrl}/${documentId}${encodedParams.length === 0 ? "" : "?" + encodedParams}`;
    this.ws = null;
    this.wsConnected = false;
    this.wsConnecting = false;
    this.wsLastMessageReceived = 0;
    this.wsUnsuccessfulReconnects = 0;

    // Attach event handlers
    this.yDocument.on("update", this._onYDocumentUpdate);
    this.awareness.on("update", this._onAwarenessUpdate);

    // Set up automatic event handler removal
    if (typeof window !== "undefined") {
      window.addEventListener("unload", this._onWindowUnload);
    } else if (typeof process !== "undefined") {
      process.on("exit", this._onWindowUnload);
    }

    // Set up resync
    this._resyncInterval = 0;
    if (resyncInterval !== undefined && resyncInterval > 0) {
      this._resyncInterval = setInterval(() => {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
          // resend sync step 1
          const encoder = encoding.createEncoder();
          encoding.writeVarUint(encoder, MessageTypes.MESSAGE_SYNC);
          SyncProtocol.writeSyncStep1(encoder, yDocument);
          this.ws.send(encoding.toUint8Array(encoder));
        }
      }, resyncInterval);
    }

    // Set up timer to check for live connection
    this._livenessCheckInterval = setInterval(() => {
      if (this.wsConnected && this.ws && MESSAGE_RECONNECT_TIMEOUT < time.getUnixTime() - this.wsLastMessageReceived) {
        // No message received in a long time - not even your own awareness updates (which are updated every 15 seconds)
        this.ws.close();
      }
    }, MESSAGE_RECONNECT_TIMEOUT / 10);

    // We connect by default unless disabled in options
    if (this.shouldConnect) {
      this.connect();
    }
  }

  // *********************************************
  // Public properties
  // *********************************************/
  /** Access token sent as a Bearer Authorization header when the socket is created (Node only). */
  public accessToken: string | undefined;
  /** Awareness instance whose updates are sent to and applied from remote peers. */
  public readonly awareness: AwarenessProtocol.Awareness;
  /** Name of the broadcast channel: the server URL followed by the document ID. */
  public readonly broadcastChannel: string;
  /** Whether this provider is currently subscribed to the broadcast channel. */
  public broadcastConnected: boolean;
  /** Whether cross-tab BroadcastChannel communication is disabled. */
  public readonly disableBroadcastChannel: boolean;
  /** The ID of the document being synced. */
  public readonly documentId: string;
  /** Optional logger used for diagnostic output. */
  public readonly logger: pino.BaseLogger | undefined;
  /** Upper bound (ms) of the exponential reconnect backoff delay. */
  public readonly maxBackoffTime: number;
  /** Number of consecutive unsuccessful connection attempts after which no new socket is created. */
  public readonly maxRetries: number = DEFAULT_MAX_RETRIES;
  /** Message handlers, indexed by message type. */
  public readonly messageHandlers: Array<MessageHandler>;
  /** Whether the provider should (re)connect; set by connect() and cleared by disconnect(). */
  public shouldConnect: boolean;
  /** Full connection URL, including the document ID and encoded query parameters. */
  public readonly url: string;
  /** The current websocket, or null when there is none. */
  public ws: WebSocket | null;
  /** True between the socket "open" event and its "close" event. */
  public wsConnected: boolean;
  /** True from socket creation until it either opens or closes. */
  public wsConnecting: boolean;
  /** Timestamp (from lib0 time.getUnixTime()) of the last socket open or received message. */
  public wsLastMessageReceived: number;
  /** Number of consecutive socket closes that happened without the socket having opened. */
  public wsUnsuccessfulReconnects: number;
  /** The YJS document being synced. */
  public readonly yDocument: Y.Doc;

  /**
   * Whether the document has completed a sync with the server on the current connection.
   */
  public get synced() {
    return this._synced;
  }

  /**
   * Updates the synced state and emits "synced" and "sync" when the state changes.
   * Assigning the current value again is a no-op, so listeners are not flooded with
   * identical events on every reconnect attempt.
   */
  public set synced(state) {
    if (this._synced === state) {
      return;
    }

    this._synced = state;
    this.emit("synced", [state]);
    this.emit("sync", [state]);
  }

  // *********************************************
  // Public methods
  // *********************************************/
  /**
   * Clean up resources, such as event handlers and timers.
   *
   * Disconnects from the server and then destroys the awareness instance and the
   * YJS document. Calling this method more than once is safe: every call returns
   * the promise of the first teardown.
   *
   * @returns A promise that resolves when the teardown has completed.
   */
  public async destroy() {
    this.logger?.debug("WebSocketProvider.destroy()");

    // Fatal message handlers, the unload hook and the owner may all call destroy();
    // run the teardown only once and share its result.
    if (this._destroyPromise === undefined) {
      this._destroyPromise = this._destroyOnce();
    }

    return this._destroyPromise;
  }

  /**
   * Subscribes to the broadcast channel (unless disabled) and announces the local
   * document and awareness state to other providers listening on the same channel.
   */
  public connectBroadcastChannel() {
    if (this.disableBroadcastChannel) {
      return;
    }
    if (!this.broadcastConnected) {
      BroadcastChannel.subscribe(this.broadcastChannel, this._onBroadcastMessageReceived);
      this.broadcastConnected = true;
    }

    // Send sync step1 to broadcast channel
    const encoderSync = encoding.createEncoder();
    encoding.writeVarUint(encoderSync, MessageTypes.MESSAGE_SYNC);
    SyncProtocol.writeSyncStep1(encoderSync, this.yDocument);
    BroadcastChannel.publish(this.broadcastChannel, encoding.toUint8Array(encoderSync), this);

    // Broadcast local state
    const encoderState = encoding.createEncoder();
    encoding.writeVarUint(encoderState, MessageTypes.MESSAGE_SYNC);
    SyncProtocol.writeSyncStep2(encoderState, this.yDocument);
    BroadcastChannel.publish(this.broadcastChannel, encoding.toUint8Array(encoderState), this);

    // Write queryAwareness
    const encoderAwarenessQuery = encoding.createEncoder();
    encoding.writeVarUint(encoderAwarenessQuery, MessageTypes.MESSAGE_QUERY_AWARENESS);
    BroadcastChannel.publish(this.broadcastChannel, encoding.toUint8Array(encoderAwarenessQuery), this);

    // Broadcast local awareness state
    const encoderAwarenessState = encoding.createEncoder();
    encoding.writeVarUint(encoderAwarenessState, MessageTypes.MESSAGE_AWARENESS);
    encoding.writeVarUint8Array(
      encoderAwarenessState,
      AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.yDocument.clientID]),
    );
    BroadcastChannel.publish(this.broadcastChannel, encoding.toUint8Array(encoderAwarenessState), this);
  }

  /**
   * Announces the local client as gone (to the server and the broadcast channel,
   * whichever are connected) and unsubscribes from the broadcast channel.
   */
  public disconnectBroadcastChannel() {
    // Broadcast message with local awareness state set to null (indicating disconnect)
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, MessageTypes.MESSAGE_AWARENESS);
    encoding.writeVarUint8Array(
      encoder,
      AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.yDocument.clientID], new Map()),
    );

    this._sendAndBroadcastMessage(encoding.toUint8Array(encoder));

    if (this.broadcastConnected) {
      BroadcastChannel.unsubscribe(this.broadcastChannel, this._onBroadcastMessageReceived);
      this.broadcastConnected = false;
    }
  }

  /**
   * Connects to the server.
   *
   * Does nothing besides setting `shouldConnect` when a socket already exists.
   */
  public connect() {
    this.logger?.debug({ wsConnected: this.wsConnected, ws: this.ws ? "[set]" : "[not set]" }, "WebSocketProvider.connect()");

    this.shouldConnect = true;
    if (!this.wsConnected && this.ws === null) {
      this._setupWebSocket();
      this.connectBroadcastChannel();
    }
  }

  /**
   * Disconnects from the server.
   *
   * @returns A promise that resolves once the socket has closed, or after
   *   MAX_CLOSE_TIMEOUT milliseconds when no "close" event arrives. Resolves
   *   immediately when there is no socket or it is already closed.
   */
  public async disconnect() {
    this.logger?.debug("WebSocketProvider.disconnect()");

    this.shouldConnect = false;
    this.disconnectBroadcastChannel();

    // Capture the socket: the provider's own close handler is registered first and
    // resets `this.ws` to null before our temporary close handler below runs.
    const socket = this.ws;
    if (socket === null) {
      this.logger?.trace("WebSocketProvider.disconnect() ws is null");
      return;
    }

    this.logger?.debug({ bufferedAmount: socket.bufferedAmount }, "WebSocketProvider.disconnect()");
    this.logger?.debug({ readyState: socket.readyState }, "WebSocketProvider.disconnect()");
    if (socket.readyState === WebSocket.CLOSED) {
      return;
    }

    await new Promise<void>((resolve) => {
      let timeout: number | NodeJS.Timeout | undefined;
      const onClosedHandler = () => {
        this.logger?.debug(
          { bufferedAmount: socket.bufferedAmount },
          "WebSocketProvider.disconnect() onClosedHandler called",
        );
        clearTimeout(timeout);
        socket.removeEventListener("close", onClosedHandler);
        resolve();
      };

      socket.addEventListener("close", onClosedHandler);

      // Arm the safety timer before closing so that it can always be cleared by the handler
      timeout = setTimeout(() => {
        this.logger?.warn("WebSocketProvider.disconnect() onClosedHandler called after timeout");
        onClosedHandler();
      }, MAX_CLOSE_TIMEOUT);

      socket.close();
    });
  }

  /**
   * Waits for the websocket connection to be ready and the document to have synced.
   * Calls connect() when the document is not yet synced.
   *
   * @param timeoutMilliseconds Maximum time to wait for the document to be synced.
   *
   * @returns A promise that resolves when the document is synced and rejects with an
   *   Error when the timeout elapses first.
   */
  public waitUntilSynced(timeoutMilliseconds: number = DEFAULT_WAIT_SYNC_TIMEOUT) {
    return new Promise<void>((resolve, reject) => {
      if (this.wsConnected && this.synced) {
        resolve();
      } else {
        let timeout: number | NodeJS.Timeout | undefined;

        // Create listener to resolve until synced
        const listener = () => {
          if (this.synced) {
            this.off("synced", listener);
            clearTimeout(timeout);
            resolve();
          }
        };

        // Hook up event listener to resolve promise
        this.on("synced", listener);

        this.connect();

        // Set timeout to reject promise and remove event listener
        timeout = setTimeout(() => {
          this.off("synced", listener);
          reject(new Error("Timed out waiting for document to sync"));
        }, timeoutMilliseconds);
      }
    });
  }

  // *********************************************
  // Private methods, event handlers
  // *********************************************/
  /**
   * Broadcast (local storage) listener. Processes messages published by other
   * providers on the broadcast channel and publishes any reply back to the channel.
   *
   * @param data The received message.
   * @param origin The publisher of the message.
   */
  private _onBroadcastMessageReceived = (data: ArrayBuffer, origin: any) => {
    // Only process message if it didn't originate from us
    if (origin !== this) {
      const encoder = this._readWebSocketMessage(new Uint8Array(data), false);

      // Only reply when a handler wrote more than a single byte to the reply buffer
      if (encoding.length(encoder) > 1) {
        // Use `this` reference here instead of a string like we do for other origin sources
        BroadcastChannel.publish(this.broadcastChannel, encoding.toUint8Array(encoder), this);
      }
    }
  };

  /**
   * Listens to YJS updates and sends them to remote peers (ws and broadcast channel, when enabled).
   *
   * @param update The updated data.
   * @param origin The origin.
   */
  private _onYDocumentUpdate = (update: Uint8Array, origin: any) => {
    this.logger?.debug({ origin, synced: this.synced, updateLength: update.length }, "WebSocketProvider._onYDocumentUpdate()");

    // Send the update if we're synced and it didn't originate from us reading/applying update we received
    if (this.synced && origin !== "remote") {
      const encoder = encoding.createEncoder();
      encoding.writeVarUint(encoder, MessageTypes.MESSAGE_SYNC);
      SyncProtocol.writeUpdate(encoder, update);

      this._sendAndBroadcastMessage(encoding.toUint8Array(encoder));
    }
  };

  /**
   * Listens to awareness updates and sends the state of all changed clients to remote peers.
   *
   * @param update The IDs of the clients that were added, updated and removed.
   * @param origin The origin.
   */
  private _onAwarenessUpdate = (
    { added, updated, removed }: { added: Array<number>; updated: Array<number>; removed: Array<number> },
    origin: any,
  ) => {
    const changedClients = added.concat(updated).concat(removed);
    const encoder = encoding.createEncoder();

    encoding.writeVarUint(encoder, MessageTypes.MESSAGE_AWARENESS);
    encoding.writeVarUint8Array(encoder, AwarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients));

    this._sendAndBroadcastMessage(encoding.toUint8Array(encoder));
  };

  /**
   * Removes handlers when the window is closed or the process exits.
   */
  private _onWindowUnload = async () => {
    AwarenessProtocol.removeAwarenessStates(this.awareness, [this.yDocument.clientID], "window unload");

    await this.destroy();
  };

  /**
   * Handles the web socket connection being closed: detaches the socket, updates the
   * connection state and schedules a reconnect attempt unless the provider is destroyed.
   *
   * @param event The close event.
   */
  private _onWsClose = (event: WebSocket.CloseEvent) => {
    this.logger?.debug("WebSocketProvider._onWsClose()");

    // Raise event to notify listeners
    this.emit("connection-close", [event, this]);

    // Remove event handlers
    this.ws?.removeEventListener("close", this._onWsClose);
    this.ws?.removeEventListener("error", this._onWsError);
    this.ws?.removeEventListener("message", this._onWsMessage);
    this.ws?.removeEventListener("open", this._onWsOpen);

    this.ws = null;
    this.wsConnecting = false;

    // Check if we were connected before
    if (this.wsConnected) {
      this.wsConnected = false;
      this.synced = false;

      // Update awareness (all users except local)
      AwarenessProtocol.removeAwarenessStates(
        this.awareness,
        Array.from(this.awareness.getStates().keys()).filter((client) => client !== this.yDocument.clientID),
        this,
      );

      this.emit("status", [
        {
          status: "disconnected",
        },
      ]);
    } else {
      // We were not connected before, so we need to retry
      this.wsUnsuccessfulReconnects++;
    }

    if (!this._isDisposed) {
      // Exponential backoff: 100ms doubled for every unsuccessful attempt, capped at maxBackoffTime.
      // _setupWebSocket() itself decides whether a new socket may be created.
      const delay = math.min(math.pow(2, this.wsUnsuccessfulReconnects) * 100, this.maxBackoffTime);
      this._reconnectTimeout = setTimeout(() => {
        this.logger?.debug("WebSocketProvider._onWsClose() Reconnecting...");
        this._setupWebSocket();
      }, delay);
    }
  };

  /**
   * Handles errors on the web socket connection.
   *
   * @param event The error event.
   */
  private _onWsError = (event: WebSocket.ErrorEvent) => {
    this.logger?.debug({ event }, "WebSocketProvider._onWsError()");

    this.emit("connection-error", [event, this]);
  };

  /**
   * Handles messages received on the web socket connection and sends back any reply
   * produced by the message handler.
   *
   * @param event The message event.
   */
  private _onWsMessage = (event: WebSocket.MessageEvent) => {
    // Record the time of the message for the liveness check
    this.wsLastMessageReceived = time.getUnixTime();
    const encoder = this._readWebSocketMessage(new Uint8Array(event.data as Buffer), true);
    if (encoding.length(encoder) > 1) {
      this.ws?.send(encoding.toUint8Array(encoder));
    }
  };

  /**
   * Handles the web socket connection being opened: updates the connection state and
   * sends sync step 1 and the local awareness state (if any) to the server.
   */
  private _onWsOpen = () => {
    this.logger?.debug("WebSocketProvider._onWsOpen()");

    this.wsLastMessageReceived = time.getUnixTime();
    this.wsConnecting = false;
    this.wsConnected = true;
    this.wsUnsuccessfulReconnects = 0;
    this.emit("status", [
      {
        status: "connected",
      },
    ]);

    // Always send sync step 1 when connected
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, MessageTypes.MESSAGE_SYNC);
    SyncProtocol.writeSyncStep1(encoder, this.yDocument);
    this.ws?.send(encoding.toUint8Array(encoder));

    // Broadcast local awareness state
    if (this.awareness.getLocalState() !== null) {
      const encoderAwarenessState = encoding.createEncoder();
      encoding.writeVarUint(encoderAwarenessState, MessageTypes.MESSAGE_AWARENESS);
      encoding.writeVarUint8Array(
        encoderAwarenessState,
        AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.yDocument.clientID]),
      );
      this.ws?.send(encoding.toUint8Array(encoderAwarenessState));
    }
  };

  // *********************************************
  // Private methods
  // *********************************************/
  /**
   * Performs the actual teardown for destroy(). Must only be invoked once.
   */
  private async _destroyOnce() {
    // Set before disconnecting so that the close handler does not schedule a reconnect
    this._isDisposed = true;

    // Clear timers
    if (this._resyncInterval !== 0) {
      clearInterval(this._resyncInterval);
    }

    clearInterval(this._livenessCheckInterval);
    clearTimeout(this._reconnectTimeout);

    await this.disconnect();

    if (typeof window !== "undefined") {
      window.removeEventListener("unload", this._onWindowUnload);
    } else if (typeof process !== "undefined") {
      process.off("exit", this._onWindowUnload);
    }

    this.awareness.off("update", this._onAwarenessUpdate);
    this.awareness.destroy();

    this.yDocument.off("update", this._onYDocumentUpdate);
    this.yDocument.destroy();

    super.destroy();
  }

  /**
   * Decodes the message type from a received message and dispatches the message to
   * the handler registered for that type.
   *
   * @param buffer The received message.
   * @param emitSynced Passed through to the message handler; true for websocket
   *   messages, false for broadcast channel messages.
   *
   * @return The encoder containing the reply written by the handler (if any).
   */
  private _readWebSocketMessage(buffer: Uint8Array, emitSynced: boolean) {
    const decoder = decoding.createDecoder(buffer);
    const encoder = encoding.createEncoder();
    const messageType = decoding.readVarUint(decoder);
    const messageHandler = this.messageHandlers[messageType];

    if (messageHandler) {
      messageHandler(encoder, decoder, this, emitSynced, messageType);
    } else {
      console.error("Unable to compute message");
    }

    return encoder;
  }

  /**
   * Configures the web socket connection.
   *
   * A new socket is only created when the provider should connect, no socket exists
   * and the number of unsuccessful reconnects is below `maxRetries`.
   */
  private _setupWebSocket() {
    this.logger?.debug(
      {
        shouldConnect: this.shouldConnect,
        ws: this.ws ? "[set]" : "[not set]",
        wsUnsuccessfulReconnects: this.wsUnsuccessfulReconnects,
        maxRetries: this.maxRetries,
      },
      "WebSocketProvider._setupWebSocket()",
    );

    if (!this.shouldConnect || this.ws !== null || this.wsUnsuccessfulReconnects >= this.maxRetries) {
      return;
    }

    try {
      let websocket: WebSocket;

      // NOTE: Passing accessToken is only supported on Node - not in the browser
      if (this.accessToken) {
        const socketOptions: WebSocket.ClientOptions = {
          headers: {
            Authorization: `Bearer ${this.accessToken}`,
          },
        };

        // Create a new WebSocket
        websocket = new WebSocket(this.url, [], socketOptions);
      } else {
        // Create a new WebSocket
        websocket = new WebSocket(this.url);
      }

      websocket.binaryType = "arraybuffer";
      this.ws = websocket;
      this.wsConnecting = true;
      this.wsConnected = false;
      this.synced = false;

      // Attach event handlers
      websocket.addEventListener("close", this._onWsClose);
      websocket.addEventListener("error", this._onWsError);
      websocket.addEventListener("message", this._onWsMessage);
      websocket.addEventListener("open", this._onWsOpen);

      this.emit("status", [
        {
          status: "connecting",
        },
      ]);
    } catch (error) {
      console.error("Unable to connect to websocket", error);
      this.emit("connection-error", [error, this]);
    }
  }

  /**
   * Send a message to the server and broadcast channel, if connected.
   *
   * @param buffer The encoded message to send.
   */
  private _sendAndBroadcastMessage(buffer: ArrayBuffer) {
    if (this.wsConnected && this.ws !== null && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(buffer);
    }
    if (this.broadcastConnected) {
      BroadcastChannel.publish(this.broadcastChannel, buffer, this);
    }
  }

  /**
   * Create the default message handlers for all the message types we support.
   *
   * @returns An array of default message handlers, indexed by message type.
   */
  private _createDefaultMessageHandlers() {
    const result: Array<MessageHandler> = [];

    /**
     * Logs a warning that access to the provider's URL was denied.
     *
     * @param provider The provider that was denied access.
     * @param reason The reason given for the denial.
     */
    const permissionDeniedHandler = (provider: WebSocketProvider, reason: string) => {
      console.warn(`Permission denied to access ${provider.url}.\n${reason}`);
    };

    /**
     * Destroys the provider after a fatal server message. destroy() is asynchronous
     * and message handlers are synchronous, so a failure is reported here instead of
     * being left as an unhandled promise rejection.
     *
     * @param provider The provider to destroy.
     */
    const destroyProvider = (provider: WebSocketProvider) => {
      provider.destroy().catch((error: unknown) => {
        const message = "WebSocketProvider failed to destroy after fatal server message";
        if (provider.logger) {
          provider.logger.error({ error }, message);
        } else {
          console.error(message, error);
        }
      });
    };

    // Sync protocol message: apply it to the document; any response is written to the encoder
    result[MessageTypes.MESSAGE_SYNC] = (encoder, decoder, provider, emitSynced) => {
      encoding.writeVarUint(encoder, MessageTypes.MESSAGE_SYNC);
      const syncMessageType = SyncProtocol.readSyncMessage(decoder, encoder, provider.yDocument, "remote");
      if (emitSynced && syncMessageType === SyncProtocol.messageYjsSyncStep2 && !provider.synced) {
        provider.synced = true;
      }
    };

    // Awareness query: reply with the awareness state of all known clients
    result[MessageTypes.MESSAGE_QUERY_AWARENESS] = (encoder, decoder, provider) => {
      encoding.writeVarUint(encoder, MessageTypes.MESSAGE_AWARENESS);
      encoding.writeVarUint8Array(
        encoder,
        AwarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys())),
      );
    };

    // Awareness update: apply it to the local awareness instance
    result[MessageTypes.MESSAGE_AWARENESS] = (encoder, decoder, provider) => {
      AwarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), "remote");
    };

    // TODO: Should we attempt to use this one or remove? - it's from the original source but no examples of its use existed and some code was commented out
    result[MessageTypes.MESSAGE_AUTH] = (encoder, decoder, provider) => {
      AuthProtocol.readAuthMessage(decoder, provider.yDocument, (yDoc, reason) => {
        permissionDeniedHandler(provider, reason);
      });
    };

    // Fatal server messages: notify listeners, then tear the provider down
    result[MessageTypes.MESSAGE_UNAUTHORIZED] = (encoder, decoder, provider) => {
      provider.emit("unauthorized", []);
      destroyProvider(provider);
    };

    result[MessageTypes.MESSAGE_INVALID_VERSION] = (encoder, decoder, provider) => {
      provider.emit("invalid-version", []);
      destroyProvider(provider);
    };

    result[MessageTypes.MESSAGE_ERROR] = (encoder, decoder, provider) => {
      provider.emit("error", []);
      destroyProvider(provider);
    };

    return result;
  }
}
