import * as zod from 'zod';

import { CartesianPose } from '@sb/geometry';
import * as log from '@sb/log';
import {
  ArmJointPositions,
  ArmJointVelocities,
  FrameOfReference,
  JerkLimit,
  JointNumber,
  MotionKind,
  TCPOffsetOption,
} from '@sb/motion-planning';
import { EventEmitter } from '@sb/utilities';

import type {
  RoutineRunner,
  RunVisionMethodResult,
  StepValidationMessage,
} from '..';
import {
  ActuateDeviceArgs,
  IOLevel,
  PayloadState,
  PlayRoutineArgs,
  RobotToExternalPort,
  Routine,
  RunVisionMethodArgs,
  SpeedProfile,
} from '..';

/**
 * Event map for the receiver's `packets` emitter. Every payload is a raw
 * string packet.
 *
 * - `send` packets are produced by serializing a message to JSON and splitting
 *   it into chunks, so a single `send` packet may be only part of a message.
 * - `receive` packets are passed to `JSON.parse` as-is, so each one must be a
 *   complete JSON document.
 */
interface PacketEvents {
  // packets that should/will get sent to the sender
  send: string;

  // packets that come from the sender
  receive: string;
}

/**
 * Event map for the receiver's `messages` emitter, which carries decoded
 * (object-level) messages rather than raw packets.
 *
 * Within this file only `send` is emitted and listened to; `request` is
 * declared here but not used by the code visible in this file.
 */
interface MessageEvents {
  // messages that should/will get sent to the sender
  send(id: number, message: Record<string, any>): void;

  // requests that come from the sender
  request(id: number, data: Record<string, any>): void;
}

/**
 * Event map for the receiver's `lifecycle` emitter.
 *
 * NOTE(review): `destroyed` is declared but never emitted by the code visible
 * in this file — confirm whether a subclass or another module emits it.
 */
interface LifecycleEvents {
  destroyed: void;

  // emitted when an incoming packet cannot be parsed, is malformed, or its
  // handling fails unexpectedly
  error: Error;

  // emitted when packets start being handled
  startPacketHandling: void;
  // emitted when packets stop being handled
  stopPacketHandling: void;
}

// Log namespace used for the log calls tagged with ns`...` in this file.
const ns = log.namespace('RoutineRunnerPacketReceiver');

// Default maximum size of one outgoing packet chunk: 65536. The value is used
// as a byte count because chunking operates on a Buffer.
const DEFAULT_PACKET_CHUNK_SIZE_MAX = 2 ** 16;

// Interval, in milliseconds, between robot state updates sent to the other side
// (passed straight to `setInterval`).
const STATE_UPDATE_INTERVAL = 50;

// Number of state update packets to gather before reporting packet size stats.
// 300 seconds' worth of updates: 300 * (1000 / 50) = 6000 packets.
const STATE_UPDATE_REPORT_INTERVAL_PACKET_LENGTH =
  300 * (1000 / STATE_UPDATE_INTERVAL);

// Allowlist of request kinds that RoutineRunnerPacketReceiver handles itself.
// A request whose `kind` is listed here is dispatched to the receiver method of
// the same name (RecognizedRequestHandler forces one method per entry).
// Any other kind is looked up among the handlers registered through
// RoutineRunnerPacketReceiver.addExtendedMethod; recognized kinds take
// precedence over an extension registered under the same name.
const RECOGNIZED_REQUEST_KINDS = [
  'actuateDevice',
  'changeRoutineSpeedProfile',
  'confirmStep',
  'emergencyStop',
  'getGlobalSpeedLimits',
  'getJointAnglesForCartesianSpacePose',
  'loadRoutine',
  'moveJointRelative',
  'moveToCartesianSpacePose',
  'moveToJointSpacePoint',
  'moveToolRelative',
  'pauseRoutine',
  'playRoutine',
  'recover',
  'resumeRoutine',
  'runVisionMethod',
  'setAntigravitySpeeds',
  'setJerkLimit',
  'setTCPOffsetOption',
  'setOutputIO',
  'setRobotPayload',
  'skipPreflightTestRun',
  'stop',
  'unbrakeJoint',
  'setROSControlEnabled',
  'clearMotionPlanCache',
  'validateGuidedMode',
  'vizbotOnlySetJointPositions',
  'vizbotOnlySetGripperState',
  'vizbotOnlySetDynamicBaseState',
  'vizbotOnlySetInputIO',
  'vizbotOnlyResetEquipmentList',
] as const;

// Union of the string literals listed in RECOGNIZED_REQUEST_KINDS.
type RecognizedRequestKind = (typeof RECOGNIZED_REQUEST_KINDS)[number];

/**
 * Type guard: reports whether `kind` is one of the entries of
 * RECOGNIZED_REQUEST_KINDS, narrowing it to RecognizedRequestKind when it is.
 */
function isRecognizedRequestKind(kind: string): kind is RecognizedRequestKind {
  // View the tuple of literals as plain strings so `includes` accepts any string.
  const knownKinds: ReadonlyArray<string> = RECOGNIZED_REQUEST_KINDS;

  return knownKinds.includes(kind);
}

// Shape a class must have to serve every recognized request kind: one member
// per kind, each callable with the raw request message. Implementing this type
// makes the compiler flag a kind that has no handler.
type RecognizedRequestHandler = Record<
  RecognizedRequestKind,
  (message: Record<string, unknown>) => any
>;

/**
 * Split `buffer` into consecutive pieces of at most `chunkSize` bytes.
 *
 * The buffer is assumed to hold UTF-8 text (callers decode every piece on its
 * own with `toString()`), so a split never lands in the middle of a multi-byte
 * character: the cut is moved back to the previous character boundary. Only
 * when `chunkSize` is smaller than a single character is the piece extended
 * forward instead, in which case it may exceed `chunkSize` by up to 3 bytes.
 *
 * The returned buffers are views onto `buffer` (no bytes are copied), and
 * concatenating them reproduces `buffer` exactly.
 *
 * @param buffer The bytes to split. An empty buffer yields an empty array.
 * @param chunkSize Maximum piece size in bytes. Fractions are rounded down;
 *                  `Infinity` means "do not split".
 * @returns The pieces, in order.
 * @throws RangeError if `chunkSize` is NaN or rounds down to less than 1
 *                    (the loop below could otherwise never make progress).
 */
function chunks(buffer: Buffer, chunkSize: number): Array<Buffer> {
  const size = Math.floor(chunkSize);

  // written as a negated comparison so NaN is rejected too
  if (!(size >= 1)) {
    throw new RangeError(
      `chunkSize must be a number >= 1 (received ${chunkSize})`,
    );
  }

  const result: Array<Buffer> = [];
  const len = buffer.length;

  // UTF-8 continuation bytes have the bit pattern 10xxxxxx
  const isContinuationByte = (index: number): boolean =>
    ((buffer[index] ?? 0) & 0xc0) === 0x80;

  let start = 0;

  while (start < len) {
    let end = Math.min(start + size, len);

    // If the cut would land inside a character, move it back to the byte that
    // starts that character.
    let boundary = end;

    while (boundary > start && boundary < len && isContinuationByte(boundary)) {
      boundary -= 1;
    }

    if (boundary > start) {
      end = boundary;
    } else {
      // The character at `start` is longer than `size`; emit it whole so the
      // loop always advances.
      while (end < len && isContinuationByte(end)) {
        end += 1;
      }
    }

    result.push(buffer.subarray(start, end));
    start = end;
  }

  return result;
}

/**
 * The RoutineRunnerPacketReceiver wraps a [[RoutineRunner]], taking network messages,
 * parsing them, dispatching them to the routine runner, and sending back responses
 * as network packets.
 *
 * Incoming packets are JSON documents of the form `{ id, data: { kind, ... } }`.
 * A request is answered with a message carrying the same `id` and either
 * `{ kind: 'response', responseData }` or `{ kind: 'response', error }`.
 * Independently of requests, the current routine runner state is pushed every
 * `STATE_UPDATE_INTERVAL` milliseconds as `{ kind: 'state', state }` while at
 * least one `onPacket` listener is attached.
 */
export class RoutineRunnerPacketReceiver implements RecognizedRequestHandler {
  /** Raw string packets going to (`send`) and coming from (`receive`) the sender. */
  protected packets = new EventEmitter<PacketEvents>();

  /** Object-level messages; `send` serializes, chunks and forwards them as packets. */
  protected messages = new EventEmitter<MessageEvents>();

  /** Error and packet-handling start/stop notifications. */
  protected lifecycle = new EventEmitter<LifecycleEvents>();

  /**
   * Chunk size to use to split up packets so we don't send huge packets
   * all at once.
   *
   * RoutineRunnerPacketSender uses a JSONStream decoder to decode these chunks.
   */
  public chunkSize = DEFAULT_PACKET_CHUNK_SIZE_MAX;

  /**
   * Clean up functions to call when the receiver gets destroyed
   */
  protected destructors: Array<() => void> = [];

  /**
   * Extension methods that have been registered with `addExtendedMethod`
   */
  private extensionMethods = new Map<
    string,
    (message: Record<string, object>) => any
  >();

  /**
   * A buffer of packets so we can start making requests using the sender
   * before the network is actually open.
   *
   * Once `.onPacket(cb)` is called, this buffer is flushed
   */
  protected beforeOpenBuffer: Array<string> = [];

  private routineRunner: RoutineRunner;

  /**
   * The next message ID to use.
   * Increment by 2 whenever sending a new one.
   *
   * Messages initiated by the sender are even, and
   * messages initiated by the receiver are odd.
   *
   * Responses to messages use the same ID the original message
   * used.
   */
  private nextMessageID = 1;

  // Running statistics over state update packet sizes (in bytes), reset every
  // time they are reported. See `reportPacketSize`.
  private stateUpdatePacketsTotalSize: number = 0;

  private stateUpdatePacketsMinSize: number | undefined = undefined;

  private stateUpdatePacketsMaxSize: number | undefined = undefined;

  private stateUpdatePacketsCount: number = 0;

  /**
   * @param args.routineRunner The routine runner that requests are forwarded to.
   * @param args.connectionID Accepted but not read by this class.
   * @param args.reportPacketSize When truthy, size statistics of state update
   *                              messages are logged periodically.
   */
  public constructor(args: {
    routineRunner: RoutineRunner;
    connectionID?: string;
    reportPacketSize?: boolean;
  }) {
    this.routineRunner = args.routineRunner;

    // when a new packet comes in, parse it as JSON, validate its envelope and
    // dispatch the request it carries
    this.destructors.push(
      this.packets.on('receive', async (packet) => {
        let data: any;

        try {
          data = JSON.parse(packet);
        } catch (e) {
          this.lifecycle.emit(
            'error',
            new Error(`Packet could not be parsed: ${e.message}`),
          );

          return;
        }

        try {
          if (typeof data !== 'object' || !data) {
            this.lifecycle.emit(
              'error',
              new TypeError(`Received non-object: ${typeof data}`),
            );

            return;
          }

          if (!('id' in data) || !('data' in data)) {
            this.lifecycle.emit(
              'error',
              new TypeError(
                `Received bad data from sender: ${JSON.stringify(data)}`,
              ),
            );

            return;
          }

          // the request payload
          const message = data.data;

          // `in` throws on primitives and null, so make sure the payload is an
          // object before probing it for `kind`
          if (
            typeof message !== 'object' ||
            !message ||
            !('kind' in message)
          ) {
            this.lifecycle.emit(
              'error',
              new TypeError(
                `Received bad data from sender: ${JSON.stringify(data)}`,
              ),
            );

            return;
          }

          const { kind } = message;

          // the sender went away: tear down without sending a response
          if (kind === 'destroyed') {
            this.destroyHandle();

            return;
          }

          try {
            const extensionMethod = this.extensionMethods.get(kind);
            log.debug(ns`packets.dispatch`, 'dispatching', { kind });

            // Built-in handlers win over extensions registered under the same
            // kind.
            // NOTE(review): a kind that is neither recognized nor registered
            // as an extension is dropped without any response — confirm the
            // sender does not wait on such requests.
            if (isRecognizedRequestKind(kind)) {
              const responseData = await this[kind](message);

              this.messages.emit('send', data.id, {
                kind: 'response',
                responseData,
              });
            } else if (extensionMethod) {
              const responseData = await extensionMethod(message);

              this.messages.emit('send', data.id, {
                kind: 'response',
                responseData,
              });
            }
          } catch (e) {
            log.error(
              ns`packets.error`,
              'caught error while processing packets',
              {
                kind,
                id: data.id,
                error: e,
              },
            );

            // String(e) rather than e.toString(): a handler may throw
            // null/undefined, and the sender must still get an error response
            this.messages.emit('send', data.id, {
              kind: 'response',
              error: e?.message ?? String(e),
            });
          }
        } catch (e) {
          this.lifecycle.emit('error', e);
        }
      }),
    );

    // when sending messages, either buffer it or emit it to
    // `onPacket` listeners.
    this.destructors.push(
      this.messages.on('send', (id, data) => {
        const message = {
          id,
          data,
        };

        const buffer = Buffer.from(JSON.stringify(message));
        const chunked = chunks(buffer, this.chunkSize);

        if (!this.arePacketsHandled()) {
          this.beforeOpenBuffer = this.beforeOpenBuffer.concat(
            chunked.map((b) => b.toString()),
          );
        } else {
          chunked.forEach((chunk) => {
            this.packets.emit('send', chunk.toString());
          });
        }

        // report state update packet size
        if (args.reportPacketSize && data.kind === 'state') {
          this.reportPacketSize(buffer.length);
        }
      }),
    );

    const stateInterval = setInterval(() => {
      // While nobody is listening, every message ends up in `beforeOpenBuffer`.
      // Queueing a state snapshot every tick would grow that buffer without
      // bound and flush a backlog of stale states on connect, so skip the
      // update; the first tick after `onPacket` sends a fresh state.
      if (!this.arePacketsHandled()) {
        return;
      }

      const state = this.routineRunner.getState();

      this.messages.emit('send', this.nextMessageID, {
        kind: 'state',
        state,
      });

      this.nextMessageID += 2;
    }, STATE_UPDATE_INTERVAL);

    this.destructors.push(() => {
      clearInterval(stateInterval);
    });
  }

  /**
   * Record the size of one state update message and, once
   * `STATE_UPDATE_REPORT_INTERVAL_PACKET_LENGTH` messages have been recorded,
   * log their average/min/max size and start over.
   *
   * @param size Size of the serialized message in bytes (before chunking).
   */
  private reportPacketSize(size: number) {
    this.stateUpdatePacketsTotalSize += size;
    this.stateUpdatePacketsCount += 1;

    if (
      this.stateUpdatePacketsMinSize === undefined ||
      size < this.stateUpdatePacketsMinSize
    ) {
      this.stateUpdatePacketsMinSize = size;
    }

    if (
      this.stateUpdatePacketsMaxSize === undefined ||
      size > this.stateUpdatePacketsMaxSize
    ) {
      this.stateUpdatePacketsMaxSize = size;
    }

    if (
      this.stateUpdatePacketsCount >= STATE_UPDATE_REPORT_INTERVAL_PACKET_LENGTH
    ) {
      const stateUpdateAverageSize =
        this.stateUpdatePacketsTotalSize / this.stateUpdatePacketsCount;

      log.info(ns`packetSize.stats`, 'Packset size stats', {
        stateUpdateAverageSize,
        stateUpdateMaxSize: this.stateUpdatePacketsMaxSize,
        stateUpdateMinSize: this.stateUpdatePacketsMinSize,
      });

      // start a fresh reporting window
      this.stateUpdatePacketsTotalSize = 0;
      this.stateUpdatePacketsCount = 0;
      this.stateUpdatePacketsMinSize = undefined;
      this.stateUpdatePacketsMaxSize = undefined;
    }
  }

  /**
   * Is there an active `onPacket` handler?
   */
  public arePacketsHandled(): boolean {
    // This is an implementation detail, but `onPacket` should be called once
    // a connection has been established while the cancelation function should be
    // called once the connection closes, so the listener count is indicative
    // of whether the connection is open.
    return this.packets.listenerCount('send') !== 0;
  }

  /**
   * Subclasses can add extended methods which will get called when unrecognized kinds
   * come through the packets.
   *
   * This allows subclasses to add methods that can handle methods this library is not
   * equipped to handle.
   *
   * @param kind The string that identifies the method kind
   * @param method The method that will get called with the messages as they come in. This will
   *               not be bound to `this` when called, so be sure to either bind it ahead of time
   *               or use arrow functions to lexically bind `this` (see tests for examples). The
   *               return value will be sent through packets as a normal response, and errors will
   *               be caught and returned as errors.
   */
  protected addExtendedMethod(
    kind: string,
    method: (message: Record<string, object>) => any,
  ) {
    this.extensionMethods.set(kind, method);
  }

  /**
   * Listen for packets that should get sent to the other side (the sender).
   *
   * Only call this once packets are ready to be sent to the other side.
   * Once this is called, packets that would previously have been sent get flushed
   * to the callback and no longer get buffered.
   *
   * @param cb Called with every outgoing packet.
   * @return a cancelation function. Calling it more than once has no further
   *         effect.
   */
  public onPacket(cb: (packet: string) => void): () => void {
    const cancel = this.packets.on('send', cb);

    // only the first listener marks the start of packet handling
    if (this.packets.listenerCount('send') === 1) {
      this.lifecycle.emit('startPacketHandling');
    }

    this.beforeOpenBuffer.forEach((packet) => {
      cb(packet);
    });

    this.beforeOpenBuffer = [];

    let cancelled = false;

    return () => {
      // guard against a second call emitting `stopPacketHandling` again
      if (cancelled) {
        return;
      }

      cancelled = true;
      cancel();

      if (!this.arePacketsHandled()) {
        this.lifecycle.emit('stopPacketHandling');
      }
    };
  }

  /**
   * Take in a packet from the sender
   */
  public receivePacket(packet: string) {
    this.packets.emit('receive', packet);
  }

  /**
   * Tear down this receiver: run the registered destructors (which detach the
   * internal listeners and stop the state update timer) and remove every
   * listener from the three emitters. The wrapped routine runner is left
   * untouched. Safe to call more than once.
   */
  public destroyHandle() {
    // detach the list first so a repeated call does not run destructors again
    const destructors = this.destructors;
    this.destructors = [];

    destructors.forEach((destructor) => {
      destructor();
    });

    this.packets.removeAllListeners();
    this.lifecycle.removeAllListeners();
    this.messages.removeAllListeners();
  }

  /**
   * Destroy the wrapped routine runner. This does not call `destroyHandle`.
   *
   * NOTE(review): the result of `routineRunner.destroy()` is not awaited —
   * confirm whether it is asynchronous.
   */
  public async destroy() {
    this.routineRunner.destroy();
  }

  /**
   * Handle a `loadRoutine` request: parse `message.routine` and, when
   * `message.startConditions` is truthy, check its basic shape before handing
   * both to the routine runner.
   *
   * @throws TypeError if `startConditions` is truthy but not an object, if its
   *         `currentStepID` is not a string, or if its `variables` is missing
   *         or not an object.
   */
  public async loadRoutine(
    message: Record<string, unknown>,
  ): Promise<{ errors: Array<StepValidationMessage> }> {
    const routine = Routine.parse(message.routine);

    // TypeScript appears to not care about any type narrowing when it's
    // unknown, so just use any and try to validate correctly.
    const startConditions = message.startConditions as any;

    if (startConditions) {
      if (typeof startConditions !== 'object') {
        throw new TypeError('startConditions must be an object');
      }

      const { currentStepID, variables } = startConditions;

      if (typeof currentStepID !== 'string') {
        throw new TypeError('currentStepID must be a string');
      }

      if (!variables) {
        throw new TypeError(
          'variables must be provided when a currentStepID is provided',
        );
      }

      if (typeof variables !== 'object') {
        throw new TypeError(
          'variables must be a valid Routine Runner variable bag',
        );
      }
    }

    return this.routineRunner.loadRoutine(routine, startConditions);
  }

  /**
   * Handle a `moveToJointSpacePoint` request: parse `message.goal` as
   * ArmJointPositions and `message.options` as a SpeedProfile, then forward
   * them to the routine runner.
   */
  async moveToJointSpacePoint(message: Record<string, unknown>) {
    const goal = ArmJointPositions.parse(message.goal);
    const options: SpeedProfile = SpeedProfile.parse(message.options);

    return this.routineRunner.moveToJointSpacePoint(goal, options);
  }

  /**
   * Handle a `getJointAnglesForCartesianSpacePose` request: parse
   * `message.goal` as a CartesianPose and `message.motionKind` as a MotionKind,
   * then forward them to the routine runner.
   */
  async getJointAnglesForCartesianSpacePose(message: Record<string, unknown>) {
    const goal = CartesianPose.parse(message.goal);
    const motionKind = MotionKind.parse(message.motionKind);

    return this.routineRunner.getJointAnglesForCartesianSpacePose(
      goal,
      motionKind,
    );
  }

  /**
   * Handle a `moveToCartesianSpacePose` request: parse `message.goal`,
   * `message.motionKind` and `message.options`, then forward them to the
   * routine runner.
   */
  async moveToCartesianSpacePose(message: Record<string, unknown>) {
    const goal = CartesianPose.parse(message.goal);
    const motionKind = MotionKind.parse(message.motionKind);
    const options: SpeedProfile = SpeedProfile.parse(message.options);

    return this.routineRunner.moveToCartesianSpacePose(
      goal,
      motionKind,
      options,
    );
  }

  /**
   * Handle a `moveJointRelative` request: parse `message.jointNumber` and
   * `message.options`, then forward them to the routine runner.
   */
  async moveJointRelative(message: Record<string, unknown>) {
    const jointNumber = JointNumber.parse(message.jointNumber);

    const options: SpeedProfile = SpeedProfile.parse(message.options);

    return this.routineRunner.moveJointRelative(jointNumber, options);
  }

  /**
   * Handle a `moveToolRelative` request: parse `message.offset`,
   * `message.frame` and `message.options`, then forward them to the routine
   * runner (which takes the frame before the offset).
   */
  async moveToolRelative(message: Record<string, unknown>) {
    const offset = CartesianPose.parse(message.offset);
    const frame = FrameOfReference.parse(message.frame);

    const options: SpeedProfile = SpeedProfile.parse(message.options);

    return this.routineRunner.moveToolRelative(frame, offset, options);
  }

  /**
   * Handle an `actuateDevice` request: parse the whole message as
   * ActuateDeviceArgs and forward it to the routine runner.
   */
  async actuateDevice(message: Record<string, unknown>) {
    const args = ActuateDeviceArgs.parse(message);

    return this.routineRunner.actuateDevice(args);
  }

  /**
   * Handle a `setOutputIO` request: validate `message.changes` as an array of
   * `{ label, level }` objects and forward the parsed changes to the routine
   * runner.
   *
   * @throws TypeError if `changes` is not an array or one of its entries is
   *         not an object.
   */
  setOutputIO(message: Record<string, unknown>) {
    const { changes: unvalidatedChanges } = message;

    if (!Array.isArray(unvalidatedChanges)) {
      throw new TypeError('changes must be an array');
    }

    const changes = unvalidatedChanges.map((change, index) => {
      if (!change || typeof change !== 'object') {
        throw new TypeError(
          `All changes must be objects (change ${index} is not)`,
        );
      }

      return {
        label: RobotToExternalPort.parse(change.label),
        level: IOLevel.parse(change.level),
      };
    });

    return this.routineRunner.setOutputIO(changes);
  }

  /** Handle a `recover` request by forwarding it to the routine runner. */
  async recover() {
    return this.routineRunner.recover();
  }

  /**
   * Handle a `playRoutine` request: parse the whole message as PlayRoutineArgs
   * and forward it to the routine runner.
   */
  async playRoutine(message: Record<string, unknown>) {
    const args = PlayRoutineArgs.parse(message);

    return this.routineRunner.playRoutine(args);
  }

  /** Handle a `skipPreflightTestRun` request by forwarding it to the routine runner. */
  async skipPreflightTestRun() {
    return this.routineRunner.skipPreflightTestRun();
  }

  /**
   * Handle a `changeRoutineSpeedProfile` request: parse `message.speedProfile`
   * and forward it to the routine runner.
   */
  async changeRoutineSpeedProfile(
    message: Record<string, unknown>,
  ): Promise<void> {
    const speedProfile = SpeedProfile.parse(message.speedProfile);

    return this.routineRunner.changeRoutineSpeedProfile(speedProfile);
  }

  /**
   * Handle a `setAntigravitySpeeds` request: parse `message.speeds` as
   * ArmJointVelocities and forward them to the routine runner.
   */
  async setAntigravitySpeeds(message: Record<string, unknown>): Promise<void> {
    const speeds = ArmJointVelocities.parse(message.speeds);

    return this.routineRunner.setAntigravitySpeeds(speeds);
  }

  /**
   * Handle a `stop` request.
   *
   * @throws TypeError if `message.reasonForStopping` is not a string.
   */
  async stop(message: Record<string, unknown>) {
    if (typeof message.reasonForStopping !== 'string') {
      throw new TypeError('reasonForStopping must be a string');
    }

    return this.routineRunner.stop(message.reasonForStopping);
  }

  /**
   * Handle a `pauseRoutine` request. The pause is always reported to the
   * routine runner with `{ kind: 'user' }`.
   */
  async pauseRoutine() {
    return this.routineRunner.pauseRoutine({ kind: 'user' });
  }

  /**
   * Handle an `emergencyStop` request.
   *
   * @throws TypeError if `message.source` is not a string.
   */
  async emergencyStop(message: Record<string, unknown>) {
    if (typeof message.source !== 'string') {
      throw new TypeError('source must be a string');
    }

    return this.routineRunner.emergencyStop(message.source);
  }

  /** Handle a `resumeRoutine` request by forwarding it to the routine runner. */
  async resumeRoutine() {
    return this.routineRunner.resumeRoutine();
  }

  /**
   * Handle a `confirmStep` request.
   *
   * @throws TypeError if `message.stepID` is not a string.
   */
  async confirmStep(message: Record<string, unknown>) {
    const { stepID } = message;

    if (typeof stepID !== 'string') {
      throw new TypeError('stepID must be a string');
    }

    return this.routineRunner.confirmStep(stepID);
  }

  /**
   * Handle a `setRobotPayload` request: parse `message.payload` as a
   * PayloadState and forward it to the routine runner.
   */
  async setRobotPayload(message: Record<string, unknown>) {
    const payload = PayloadState.parse(message.payload);

    return this.routineRunner.setRobotPayload(payload);
  }

  /** Handle a `getGlobalSpeedLimits` request by forwarding it to the routine runner. */
  async getGlobalSpeedLimits() {
    return this.routineRunner.getGlobalSpeedLimits();
  }

  /** Handle a `validateGuidedMode` request by forwarding it to the routine runner. */
  async validateGuidedMode() {
    return this.routineRunner.validateGuidedMode();
  }

  /**
   * Handle a `setJerkLimit` request: parse `message.jerkLimit` and forward it
   * to the routine runner.
   */
  async setJerkLimit(message: Record<string, unknown>) {
    const jerkLimit: JerkLimit = JerkLimit.parse(message.jerkLimit);

    return this.routineRunner.setJerkLimit(jerkLimit);
  }

  /**
   * Handle a `runVisionMethod` request: parse the whole message as
   * RunVisionMethodArgs and run it on the routine runner's
   * `visionMethodRunner`.
   */
  async runVisionMethod(
    message: Record<string, unknown>,
  ): Promise<RunVisionMethodResult> {
    const args = RunVisionMethodArgs.parse(message);

    return this.routineRunner.visionMethodRunner.run(args);
  }

  /**
   * Handle a `setTCPOffsetOption` request: parse `message.tcpOffsetOption` and
   * forward it to the routine runner.
   */
  async setTCPOffsetOption(message: Record<string, unknown>): Promise<void> {
    const tcpOffsetOption = TCPOffsetOption.parse(message.tcpOffsetOption);

    return this.routineRunner.setTCPOffsetOption(tcpOffsetOption);
  }

  /**
   * Handle an `unbrakeJoint` request: parse `message.jointNumber` and forward
   * it to the routine runner.
   */
  async unbrakeJoint(message: Record<string, unknown>): Promise<void> {
    const jointNumber = JointNumber.parse(message.jointNumber);

    return this.routineRunner.unbrakeJoint(jointNumber);
  }

  /**
   * Handle a `setROSControlEnabled` request: require `message.enabled` to be a
   * boolean and forward it to the routine runner.
   */
  async setROSControlEnabled(message: Record<string, unknown>): Promise<void> {
    const enabled = zod.boolean().parse(message.enabled);

    return this.routineRunner.setROSControlEnabled(enabled);
  }

  /**
   * Handle a `clearMotionPlanCache` request: log the request at info level and
   * forward it to the routine runner. The message content is ignored.
   */
  async clearMotionPlanCache(_: Record<string, unknown>): Promise<void> {
    log.info('clearMotionPlanCache', 'clearing motion plan cache');

    return this.routineRunner.clearMotionPlanCache();
  }

  /* methods only supported when running in Vizbot (Simulated Robot/Blue Mode) */

  // Each handler below always throws in this class; the thrown error reaches
  // the sender as an error response. They are declared as arrow-function
  // properties rather than prototype methods.
  public vizbotOnlySetJointPositions = (
    _message: Record<string, unknown>,
  ): void => {
    throw new Error(
      'Only supported while using Simulated Robot (robot_kind=simulated)',
    );
  };

  public vizbotOnlySetDynamicBaseState = (
    _message: Record<string, unknown>,
  ): void => {
    throw new Error(
      'Only supported while using Simulated Robot (robot_kind=simulated)',
    );
  };

  public vizbotOnlySetGripperState = (
    _message: Record<string, unknown>,
  ): void => {
    throw new Error(
      'Only supported while using Simulated Robot (robot_kind=simulated)',
    );
  };

  public vizbotOnlyResetEquipmentList = (
    _message: Record<string, unknown>,
  ): void => {
    throw new Error(
      'Only supported while using Simulated Robot (robot_kind=simulated)',
    );
  };

  public vizbotOnlySetInputIO = (_message: Record<string, unknown>): void => {
    throw new Error(
      'Only supported while using Simulated Robot (robot_kind=simulated)',
    );
  };
}
