Sunday, 20 November 2022

How to stream x-ndjson content using Express and parse the streamed data?

I have a TS library using Node v19.1.0. The library has a function that observes streamed server events.

The server provides a /events route streaming 'application/x-ndjson' content which might be an event/ping/... ( sending a ping every x seconds is important to keep the connection alive )

My observe function parses the streamed data and inspects it. If it is a valid event it will pass it to a callback function. The caller also receives an abort function to abort the streaming on demand.

Whenever I run tests locally or via CI I get the following error

Warning: Test "observes events." generated asynchronous activity after the test ended. This activity created the error "AbortError: The operation was aborted." and would have caused the test to fail, but instead triggered an unhandledRejection event.

I tried to minimize the example code using plain JavaScript

const assert = require('assert/strict');
const express = require('express');
const { it } = require('node:test');

it('observes events.', async () => {
    const expectedEvent = { type: 'event', payload: { metadata: { type: 'entity-created', commandId: 'commandId' } } };

    const api = express();
    const server = api
        .use(express.json())
        .post('/events', (request, response) => {
            response.writeHead(200, {
                'content-type': 'application/x-ndjson',
            });

            const line = JSON.stringify(expectedEvent) + '\n';
            response.write(line);
        })
        .listen(3000);

    let stopObserving = () => {
        throw new Error('should never happen');
    };

    const actualEventPayload = await new Promise(async resolve => {
        stopObserving = await observeEvents(async newEvent => {
            resolve(newEvent);
        });
    });

    stopObserving();
    server.closeAllConnections();
    server.close();

    assert.deepEqual(actualEventPayload, expectedEvent.payload);
});

const observeEvents = async function (onReceivedFn) {
    const abortController = new AbortController();

    const response = await fetch('http://localhost:3000/events', {
        method: 'POST',
        headers: { 'content-type': 'application/json' },
        signal: abortController.signal,
    });

    if (!response.ok) {
        throw new Error('error handling goes here - request failed');
    }

    Promise.resolve().then(async () => {
        if (!response.body) {
            throw new Error('error handling goes here - missing response body');
        }

        for await (const item of parseStream(response.body, abortController)) {
            switch (item.type) {
                case 'event': {
                    await onReceivedFn(item.payload);

                    break;
                }
                case 'ping':
                    // Intentionally left blank
                    break;
                case 'error':
                    throw new Error('error handling goes here - stream failed');
                default:
                    throw new Error('error handling goes here - should never happen');
            }
        }
    });

    return () => { abortController.abort(); };
};

const parseLine = function () {
    return new TransformStream({
        transform(chunk, controller) {
            try {
                const data = JSON.parse(chunk);

                // ... check if this is a valid line...

                controller.enqueue(data);
            } catch (error) {
                controller.error(error);
            }
        },
    });
};

const splitLines = function () {
    let buffer = '';

    return new TransformStream({
        transform(chunk, controller) {
            buffer += chunk;

            const lines = buffer.split('\n');

            for (let i = 0; i < lines.length - 1; i++) {
                controller.enqueue(lines[i]);
            }

            buffer = lines.at(-1) ?? '';
        },
        flush(controller) {
            if (buffer.length > 0) {
                controller.enqueue(buffer);
            }
        },
    });
};

const parseStream = async function* (stream, abortController) {
    let streamReader;

    try {
        const pipedStream = stream
            .pipeThrough(new TextDecoderStream())
            .pipeThrough(splitLines())
            .pipeThrough(parseLine());

        streamReader = pipedStream.getReader();

        while (true) {
            const item = await streamReader.read();

            if (item.done) {
                break;
            }

            yield item.value;
        }
    } finally {
        await streamReader?.cancel();
        abortController.abort();
    }
};

Unfortunately, when running node --test, the test does not finish. I have to cancel it manually.

The test breaks with these lines

const actualEventPayload = await new Promise(async resolve => {
    stopObserving = await observeEvents(async newEvent => {
        resolve(newEvent);
    });
});

and I think that's because the Promise never resolves. I thought the stream parsing might have a bug but if you remove all the stream parsing stuff and replace

Promise.resolve().then(async () => {
    /* ... */
});

with

Promise.resolve().then(async () => {
    await onReceivedFn({ metadata: { type: 'entity-created', commandId: 'commandId' }});
});

it doesn't work neither. Does someone know what's wrong or missing?



from How to stream x-ndjson content using Express and parse the streamed data?

No comments:

Post a Comment