All files / rpc-server/src/__tests__/json-crdt-server pubsub.ts

98.85% Statements 86/87
66.66% Branches 2/3
86.36% Functions 19/22
98.73% Lines 78/79

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 1291x     6x   1x 1x   1x 1x 1x 1x 1x           1x   1x     1x 1x 1x 1x 1x 1x 1x   1x       2x 1x 1x 1x     1x 1x 1x 1x 1x 2x   1x       1x       2x 1x 1x 1x       1x 1x 1x 1x     1x 1x 1x 1x 1x 1x 1x 1x 1x 1x   1x 1x   1x 1x   1x 2x   1x       1x 1x 1x 1x 1x       1x       2x 1x 1x 1x 1x 1x 1x 1x 1x       2x 1x 1x 1x 1x 1x 1x          
import {tick, until} from 'thingies';
import type {ApiTestSetup, JsonCrdtTestSetup} from '../../__demos__/json-crdt-server/__tests__/setup';
 
/**
 * Produces a channel name for a single test: the `channel-` prefix followed
 * by a random base-36 fragment and the current timestamp in base 36, so that
 * separate tests do not share a channel.
 */
const channelId = () => {
  const randomPart = Math.random().toString(36).slice(2);
  const timePart = Date.now().toString(36);
  return `channel-${randomPart}-${timePart}`;
};
 
/**
 * Registers the `pubsub` test suite against the supplied test harness.
 *
 * Every test obtains a fresh harness by invoking the setup function and
 * releases it (together with any subscriptions it opened) in a `finally`
 * block, so a failing assertion does not leave the harness running.
 *
 * @param _setup Harness factory; it is cast to `JsonCrdtTestSetup` and called
 *     once per test to obtain `call`, `call$` and `stop`.
 * @param params Suite options. When `staticOnly` is set, the tests that use
 *     streaming `pubsub.listen` subscriptions are not registered.
 */
export const runPubsubTests = (_setup: ApiTestSetup, params: {staticOnly?: true} = {}) => {
  const setup = _setup as JsonCrdtTestSetup;

  describe('pubsub', () => {
    test('throws error on invalid input', async () => {
      const {call, stop} = await setup();
      try {
        // Capture the rejection separately from the assertions, so that an
        // unexpectedly successful call fails with a clear message instead of
        // a TypeError on `err.meta`.
        let error: any;
        try {
          await call('pubsub.publish', {
            channel2: channelId(),
            message: 'hello world',
          } as any);
        } catch (err) {
          error = err;
        }
        expect(error).toBeDefined();
        expect(error.meta.path).toEqual(['channel2']);
      } finally {
        stop();
      }
    });

    if (!params.staticOnly) {
      test('can subscribe and receive published messages', async () => {
        const {call, call$, stop} = await setup();
        const emits: any[] = [];
        const channel = channelId();
        const subscription = call$('pubsub.listen', {channel}).subscribe((res) => {
          emits.push(res.message);
        });
        try {
          await call('pubsub.publish', {
            channel,
            message: 'hello world',
          });
          await until(() => emits.length === 1);
          expect(emits).toEqual(['hello world']);
        } finally {
          subscription.unsubscribe();
          stop();
        }
      });

      test('does not receive messages after un-subscription', async () => {
        const {call, call$, stop} = await setup();
        const emits: any[] = [];
        const channel = channelId();
        const sub = call$('pubsub.listen', {channel}).subscribe((res) => {
          emits.push(res.message);
        });
        try {
          await call('pubsub.publish', {
            channel,
            message: 'msg1',
          });
          await call('pubsub.publish', {
            channel,
            message: 'msg2',
          });
          await until(() => emits.length === 2);
          sub.unsubscribe();
          // Give the un-subscription time to take effect before publishing again.
          await tick(25);
          await call('pubsub.publish', {
            channel,
            message: 'msg3',
          });
          // Leave a window in which a wrongly delivered message would arrive.
          await tick(50);
          expect(emits.indexOf('msg1') > -1).toBe(true);
          expect(emits.indexOf('msg2') > -1).toBe(true);
          // The message published after un-subscription must not be delivered.
          expect(emits.indexOf('msg3')).toBe(-1);
          expect(emits.length).toBe(2);
        } finally {
          // NOTE(review): relies on `unsubscribe()` being safe to call twice — confirm.
          sub.unsubscribe();
          stop();
        }
      });

      test('multiple multiple subscribers can subscribe to multiple channels', async () => {
        const {call, call$, stop} = await setup();
        const channel1 = channelId();
        const channel2 = channelId();
        const channel3 = channelId();
        const user1: any[] = [];
        const user2: any[] = [];
        const user3: any[] = [];
        // user1 listens on channel1, user2 on channel2, user3 on both.
        const sub1 = call$('pubsub.listen', {channel: channel1}).subscribe((res) => {
          user1.push(res.message);
        });
        const sub2 = call$('pubsub.listen', {channel: channel2}).subscribe((res) => {
          user2.push(res.message);
        });
        const sub3a = call$('pubsub.listen', {channel: channel1}).subscribe((res) => {
          user3.push(res.message);
        });
        const sub3b = call$('pubsub.listen', {channel: channel2}).subscribe((res) => {
          user3.push(res.message);
        });
        try {
          // Nobody listens on channel3, so nothing may be delivered.
          await call('pubsub.publish', {
            channel: channel3,
            message: 'hello world',
          });
          await tick(50);
          expect(user1).toEqual([]);
          expect(user2).toEqual([]);
          expect(user3).toEqual([]);
          // Fire-and-forget publishes: delivery is observed through `until` below.
          call('pubsub.publish', {
            channel: channel1,
            message: 'msg1',
          }).catch(() => {});
          call('pubsub.publish', {
            channel: channel2,
            message: 'msg2',
          }).catch(() => {});
          await until(() => user1.length === 1);
          await until(() => user2.length === 1);
          await until(() => user3.length === 2);
          expect(user1).toEqual(['msg1']);
          expect(user2).toEqual(['msg2']);
          expect(user3.indexOf('msg1') > -1).toBe(true);
          expect(user3.indexOf('msg2') > -1).toBe(true);
          // After user2 leaves channel2, only user3 should still receive from it.
          sub2.unsubscribe();
          call('pubsub.publish', {
            channel: channel2,
            message: 'msg3',
          }).catch(() => {});
          await until(() => user3.length === 3);
          expect(user1).toEqual(['msg1']);
          expect(user2).toEqual(['msg2']);
          expect(user3.indexOf('msg1') > -1).toBe(true);
          expect(user3.indexOf('msg2') > -1).toBe(true);
          expect(user3.indexOf('msg3') > -1).toBe(true);
        } finally {
          // NOTE(review): `sub2` may already be unsubscribed here; relies on
          // `unsubscribe()` being safe to call twice — confirm.
          sub1.unsubscribe();
          sub2.unsubscribe();
          sub3a.unsubscribe();
          sub3b.unsubscribe();
          stop();
        }
      });
    }
  });
};