All files / rpc-codec-binary/src RxCompactBinaryBatchCodec.ts

93.85% Statements 107/114
88.67% Branches 47/53
100% Functions 5/5
94.44% Lines 102/108

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219      1x                 1x 509x 509x 509x 509x 509x 509x 509x 509x     1x               520x 520x 34x                 34x 34x 34x           486x 486x 486x                   1x 108x 108x       108x     108x       108x 108x 108x 108x 108x 108x 738x 738x   180x 180x   210x             210x   141x             141x   74x             74x   31x 31x   27x 27x   57x           57x   11x 11x   7x 7x         108x       108x 108x 108x 108x 108x 108x 738x 738x 738x   180x 180x 180x 180x 180x 180x 180x 180x   180x         425x 425x 425x 425x 425x 425x   210x 93x 210x   141x 75x 141x   74x 74x   425x         95x 95x 95x 95x   27x 27x   57x 23x 57x   11x 11x   95x     38x 38x 31x 38x           108x      
import {RpcMessageFormat} from '@jsonjoy.com/rpc-codec-base/lib/constants';
import {CompactMessageType} from '@jsonjoy.com/rpc-messages/lib/constants';
import {BinaryMessageType} from './constants';
import {BinaryMessageWriter} from './BinaryMessageWriter';
import type {CompactMessage} from '@jsonjoy.com/rpc-messages';
import type {getEncoder} from '@jsonjoy.com/json-type/lib/codegen/binary/shared';
import type {JsonValueCodec} from '@jsonjoy.com/json-pack/lib/codecs/types';
import type {BinBatchCodec} from '@jsonjoy.com/rpc-codec-base/lib/types';
import type {IReader} from '@jsonjoy.com/buffers/lib/types';
 
type Message = CompactMessage;
 
/**
 * Decodes a single payload that lives inside `uint8` without disturbing the
 * decoder's shared reader.
 *
 * `decoder.read()` is handed a sub-view of the payload bytes; because the
 * caller keeps using `decoder.reader` to walk the surrounding batch, the
 * reader's buffer and cursor are captured before the call and put back
 * afterwards.
 *
 * @param decoder Value decoder whose `reader` is shared with the caller.
 * @param uint8 Buffer that contains the payload bytes.
 * @param start Offset of the first payload byte within `uint8`.
 * @param size Payload length in bytes.
 * @returns The decoded value, or `undefined` when `size` is zero.
 */
const readPayload = (decoder: JsonValueCodec['decoder'], uint8: Uint8Array, start: number, size: number): unknown => {
  // An empty payload carries no value; skip decoding entirely.
  if (!size) return undefined;
  const reader = decoder.reader;
  // Snapshot the reader state so it can be restored once decoding is done.
  const source = reader.uint8;
  const x = reader.x;
  const value = decoder.read(uint8.subarray(start, start + size));
  // Restore the buffer first, then the cursor (the cursor is re-assigned
  // explicitly right after the reset).
  reader.reset(source);
  reader.x = x;
  return value;
};
 
/**
 * Extracts the payload location and the message ID from a request/response
 * header word, advancing `reader` past the payload (and past any trailing ID
 * bytes).
 *
 * The header word is interpreted in one of three layouts, selected by bit 28
 * and bit 15:
 *
 * - bit 28 clear: payload size is the 12 bits at 16..27, ID is the low
 *   16 bits of the word. Nothing is read after the payload.
 * - bit 28 set, bit 15 clear: payload size is 19 bits (bits 16..27 as the high
 *   part, bits 8..14 as the low 7 bits). The low byte of the word is the high
 *   byte of the ID; the low byte of the ID follows the payload.
 * - bit 28 set, bit 15 set: payload size is 27 bits (bits 16..27 as the high
 *   part, bits 0..14 as the low 15 bits). The whole 16-bit ID follows the
 *   payload.
 *
 * @param reader Reader positioned at the first payload byte.
 * @param word The 32-bit header word already consumed from `reader`.
 * @returns `payloadStart` (reader offset where the payload begins),
 *     `payloadSize` in bytes, and the decoded message `id`.
 */
const readPayloadId = (
  reader: IReader,
  word: number,
): {
  payloadStart: number;
  payloadSize: number;
  id: number;
} => {
  const payloadStart = reader.x;
  // Bits 16..27 contribute to the payload size in every layout.
  const high12 = (word >>> 16) & 0b1111_11111111;
  if (!(word & 0b1_0000_00000000_00000000_00000000)) {
    // Short layout: size and ID both fit in the header word.
    reader.skip(high12);
    return {
      payloadStart,
      payloadSize: high12,
      id: word & 0xffff,
    };
  }
  if (word & 0b10000000_00000000) {
    // Long layout: 27-bit size, ID stored as two bytes after the payload.
    const payloadSize = (high12 << 15) | (word & 0b1111111_11111111);
    reader.skip(payloadSize);
    return {
      payloadStart,
      payloadSize,
      id: reader.u16(),
    };
  }
  // Medium layout: 19-bit size, ID split between the header's low byte and
  // one byte after the payload.
  const payloadSize = (high12 << 7) | ((word >>> 8) & 0x7f);
  reader.skip(payloadSize);
  return {
    payloadStart,
    payloadSize,
    id: ((word & 0xff) << 8) | reader.u8(),
  };
};
 
/**
 * Encodes from "compact" POJO format to "rx.binary" over the wire, and back.
 *
 * A chunk is a plain concatenation of messages. Every message starts with a
 * 32-bit header word whose top three bits carry the binary message type; the
 * remaining header bits depend on the type (see {@link fromChunk}).
 */
export class RxCompactBinaryBatchCodec implements BinBatchCodec<Message> {
  public readonly id = 'rx.binary';
  public readonly format = RpcMessageFormat.Binary;
  private readonly msgWriter: BinaryMessageWriter;

  /**
   * @param codec Value codec; its encoder writer receives the encoded batch
   *     and its decoder (and the decoder's reader) is used for decoding.
   * @param getTypeEncoder Optional factory forwarded unchanged to
   *     `BinaryMessageWriter`.
   */
  constructor(
    public readonly codec: JsonValueCodec,
    getTypeEncoder?: typeof getEncoder,
  ) {
    this.msgWriter = new BinaryMessageWriter(getTypeEncoder);
  }

  /**
   * Serializes a batch of compact messages into one binary chunk.
   *
   * The codec's encoder writer is reset first, each message is appended in
   * order, and the writer is flushed to produce the result.
   *
   * @param batch Compact messages to encode.
   * @returns The bytes flushed from the codec's encoder writer.
   * @throws Error with message `UNKNOWN_MSG` when a message has an
   *     unrecognized type tag.
   */
  public toChunk(batch: Message[]): Uint8Array {
    const codec = this.codec;
    const writer = codec.encoder.writer;
    writer.reset();
    const length = batch.length;
    const msgWriter = this.msgWriter;
    for (let i = 0; i < length; i++) {
      const message = batch[i];
      switch (message[0]) {
        case CompactMessageType.Notification:
          // The payload slot is optional in the compact tuple.
          msgWriter.writeType2(codec, message[1] as string, message.length > 2 ? message[2] : undefined);
          break;
        case CompactMessageType.RequestData:
          msgWriter.writeType4(
            codec,
            BinaryMessageType.RequestData << 13,
            message[1] as number,
            message[2] as string,
            message.length > 3 ? message[3] : undefined,
          );
          break;
        case CompactMessageType.RequestComplete:
          msgWriter.writeType4(
            codec,
            BinaryMessageType.RequestComplete << 13,
            message[1] as number,
            message[2] as string,
            message.length > 3 ? message[3] : undefined,
          );
          break;
        case CompactMessageType.RequestError:
          msgWriter.writeType4(
            codec,
            BinaryMessageType.RequestError << 13,
            message[1] as number,
            message[2] as string,
            message[3],
          );
          break;
        case CompactMessageType.RequestUnsubscribe:
          // Control frame: top three bits set, bit 16 clear, ID in the low
          // bits. The ID is OR-ed in unmasked and the decoder reads it back
          // as `word & 0xffff`, so it must fit in 16 bits.
          writer.u32(0b11100000_00000000_00000000_00000000 | (message[1] as number));
          break;
        case CompactMessageType.ResponseData:
          msgWriter.writeType3(codec, BinaryMessageType.ResponseData << 13, message[1] as number, message[2]);
          break;
        case CompactMessageType.ResponseComplete:
          msgWriter.writeType3(
            codec,
            BinaryMessageType.ResponseComplete << 13,
            message[1] as number,
            message.length > 2 ? message[2] : undefined,
          );
          break;
        case CompactMessageType.ResponseError:
          msgWriter.writeType3(codec, BinaryMessageType.ResponseError << 13, message[1] as number, message[2]);
          break;
        case CompactMessageType.ResponseUnsubscribe:
          // Control frame with bit 16 set, which the decoder maps back to
          // ResponseUnsubscribe. The same 16-bit ID constraint applies.
          writer.u32(0b11100000_00000001_00000000_00000000 | (message[1] as number));
          break;
        default:
          throw new Error('UNKNOWN_MSG');
      }
    }
    return writer.flush();
  }

  /**
   * Parses a binary chunk back into compact messages.
   *
   * The decoder's reader is reset onto `uint8` and header words are consumed
   * until the reader reaches the end of the buffer. Payloads are decoded via
   * `readPayload`, which leaves the reader positioned after the payload.
   *
   * Messages whose payload is empty are emitted without a value slot for
   * Notification, RequestData, RequestComplete and ResponseComplete; the
   * remaining request/response kinds always carry a value slot (which is
   * `undefined` for an empty payload).
   *
   * @param uint8 Encoded chunk.
   * @returns Decoded messages in wire order.
   * @throws Error with message `UNKNOWN_MSG` when a header carries an
   *     unrecognized message type.
   */
  public fromChunk(uint8: Uint8Array): Message[] {
    const codec = this.codec;
    const decoder = codec.decoder;
    const reader = decoder.reader;
    reader.reset(uint8);
    const messages: Message[] = [];
    while (reader.x < reader.uint8.length) {
      const word = reader.u32();
      // The message type lives in the top three bits of the header word.
      const type = word >>> 29;
      switch (type) {
        case BinaryMessageType.Notification: {
          // Low byte: method name length; remaining bits: payload size.
          const nameLength = word & 0xff;
          const payloadSize = word >>> 8;
          const name = reader.ascii(nameLength);
          const payloadStart = reader.x;
          reader.skip(payloadSize);
          if (payloadSize) {
            const value = readPayload(decoder, reader.uint8, payloadStart, payloadSize);
            messages.push([CompactMessageType.Notification, name, value]);
          } else messages.push([CompactMessageType.Notification, name]);
          break;
        }
        case BinaryMessageType.RequestData:
        case BinaryMessageType.RequestComplete:
        case BinaryMessageType.RequestError: {
          // Requests carry a length-prefixed method name right after the
          // header word, before the payload.
          const nameLength = reader.u8();
          const name = reader.ascii(nameLength);
          const {payloadStart, payloadSize, id} = readPayloadId(reader, word);
          const hasPayload = payloadSize > 0;
          const value = hasPayload ? readPayload(decoder, reader.uint8, payloadStart, payloadSize) : undefined;
          switch (type) {
            case BinaryMessageType.RequestData:
              if (hasPayload) messages.push([CompactMessageType.RequestData, id, name, value]);
              else messages.push([CompactMessageType.RequestData, id, name]);
              break;
            case BinaryMessageType.RequestComplete:
              if (hasPayload) messages.push([CompactMessageType.RequestComplete, id, name, value]);
              else messages.push([CompactMessageType.RequestComplete, id, name]);
              break;
            default:
              // RequestError always includes the value slot.
              messages.push([CompactMessageType.RequestError, id, name, value]);
              break;
          }
          break;
        }
        case BinaryMessageType.ResponseData:
        case BinaryMessageType.ResponseComplete:
        case BinaryMessageType.ResponseError: {
          const {payloadStart, payloadSize, id} = readPayloadId(reader, word);
          const hasPayload = payloadSize > 0;
          const value = hasPayload ? readPayload(decoder, reader.uint8, payloadStart, payloadSize) : undefined;
          switch (type) {
            case BinaryMessageType.ResponseData:
              messages.push([CompactMessageType.ResponseData, id, value]);
              break;
            case BinaryMessageType.ResponseComplete:
              if (hasPayload) messages.push([CompactMessageType.ResponseComplete, id, value]);
              else messages.push([CompactMessageType.ResponseComplete, id]);
              break;
            default:
              // ResponseError always includes the value slot.
              messages.push([CompactMessageType.ResponseError, id, value]);
              break;
          }
          break;
        }
        case BinaryMessageType.Control: {
          // Control frames have no payload: low 16 bits are the ID and
          // bit 16 selects response- vs request-unsubscribe.
          const id = word & 0xffff;
          if (word & 0b1_00000000_00000000) messages.push([CompactMessageType.ResponseUnsubscribe, id]);
          else messages.push([CompactMessageType.RequestUnsubscribe, id]);
          break;
        }
        default:
          throw new Error('UNKNOWN_MSG');
      }
    }
    return messages;
  }
}