Monday 4 December 2023

svelte: Issue with API call staying pending when accessing application via IP address or hostname, server response as stream data, chat-UI

Title: Issue with API call staying pending when accessing application via IP address

Description:

I have forked the chat-ui project and made several changes, including Azure AD integration, OpenAI API compatible serving layer support, and making it more container-friendly. The application works fine on localhost, but when I try to access it via an IP address, I encounter an issue.

Problem:

The backend code provides data as a stream to the frontend using the POST method. On localhost, everything works as expected, but when accessing the application via an IP address, the API call from the network stays pending until it reaches component.close() in the frontend. The issue seems to be related to the stream not being processed properly.

Backend Code (excerpt):

export async function POST({ request, locals, params, getClientAddress }) {
    const id = z.string().parse(params.id);
    const convId = new ObjectId(id);
    const promptedAt = new Date();

    const userId = locals.user?._id ?? locals.sessionId;

    // check user
    if (!userId) {
        throw error(401, "Unauthorized");
    }
    console.log("post", {userId, params, ip: getClientAddress()})

    // check if the user has access to the conversation
    const conv = await collections.conversations.findOne({
        _id: convId,
        ...authCondition(locals),
    });

    if (!conv) {
        throw error(404, "Conversation not found");
    }

    // register the event for ratelimiting
    await collections.messageEvents.insertOne({
        userId: userId,
        createdAt: new Date(),
        ip: getClientAddress(),
    });

    // guest mode check
    if (
        !locals.user?._id &&
        requiresUser &&
        (MESSAGES_BEFORE_LOGIN ? parseInt(MESSAGES_BEFORE_LOGIN) : 0) > 0
    ) {
        const totalMessages =
            (
                await collections.conversations
                    .aggregate([
                        { $match: authCondition(locals) },
                        { $project: { messages: 1 } },
                        { $unwind: "$messages" },
                        { $match: { "messages.from": "assistant" } },
                        { $count: "messages" },
                    ])
                    .toArray()
            )[0]?.messages ?? 0;

        if (totalMessages > parseInt(MESSAGES_BEFORE_LOGIN)) {
            throw error(429, "Exceeded number of messages before login");
        }
    }

    // check if the user is rate limited
    const nEvents = Math.max(
        await collections.messageEvents.countDocuments({ userId }),
        await collections.messageEvents.countDocuments({ ip: getClientAddress() })
    );

    if (RATE_LIMIT != "" && nEvents > parseInt(RATE_LIMIT)) {
        throw error(429, ERROR_MESSAGES.rateLimited);
    }

    // fetch the model
    const model = models.find((m) => m.id === conv.model);

    if (!model) {
        throw error(410, "Model not available anymore");
    }

    // finally parse the content of the request
    const json = await request.json();

    const {
        inputs: newPrompt,
        response_id: responseId,
        id: messageId,
        is_retry,
        web_search: webSearch,
    } = z
        .object({
            inputs: z.string().trim().min(1),
            id: z.optional(z.string().uuid()),
            response_id: z.optional(z.string().uuid()),
            is_retry: z.optional(z.boolean()),
            web_search: z.optional(z.boolean()),
        })
        .parse(json);

    // get the list of messages
    // while checking for retries
    let messages = (() => {
        if (is_retry && messageId) {
            // if the message is a retry, replace the message and remove the messages after it
            let retryMessageIdx = conv.messages.findIndex((message) => message.id === messageId);
            if (retryMessageIdx === -1) {
                retryMessageIdx = conv.messages.length;
            }
            return [
                ...conv.messages.slice(0, retryMessageIdx),
                { content: newPrompt, from: "user", id: messageId as Message["id"], updatedAt: new Date() },
            ];
        } // else append the message at the bottom

        return [
            ...conv.messages,
            {
                content: newPrompt,
                from: "user",
                id: (messageId as Message["id"]) || crypto.randomUUID(),
                createdAt: new Date(),
                updatedAt: new Date(),
            },
        ];
    })() satisfies Message[];

    await collections.conversations.updateOne(
        {
            _id: convId,
        },
        {
            $set: {
                messages,
                title: conv.title,
                updatedAt: new Date(),
            },
        }
    );

    // we now build the stream
    const stream = new ReadableStream({
        async start(controller) {
            const updates: MessageUpdate[] = [];

            function update(newUpdate: MessageUpdate) {
                if (newUpdate.type !== "stream") {
                    updates.push(newUpdate);
                }
                controller.enqueue(JSON.stringify(newUpdate) + "\n");
            }

            update({ type: "status", status: "started" });

            if (conv.title === "New Chat" && messages.length === 1) {
                try {
                    conv.title = (await summarize(newPrompt)) ?? conv.title;
                    update({ type: "status", status: "title", message: conv.title });
                } catch (e) {
                    console.error(e);
                }
            }

            await collections.conversations.updateOne(
                {
                    _id: convId,
                },
                {
                    $set: {
                        messages,
                        title: conv.title,
                        updatedAt: new Date(),
                    },
                }
            );

            let webSearchResults: WebSearch | undefined;

            if (webSearch) {
                webSearchResults = await runWebSearch(conv, newPrompt, update);
            }

            messages[messages.length - 1].webSearch = webSearchResults;

            conv.messages = messages;

            const endpoint = await model.getEndpoint();

            for await (const output of await endpoint({ conversation: conv })) {
                // if not generated_text is here it means the generation is not done
                if (!output.generated_text) {
                    // else we get the next token
                    if (!output.token.special) {
                        update({
                            type: "stream",
                            token: output.token.text,
                        });

                        // if the last message is not from assistant, it means this is the first token
                        const lastMessage = messages[messages.length - 1];

                        if (lastMessage?.from !== "assistant") {
                            // so we create a new message
                            messages = [
                                ...messages,
                                // id doesn't match the backend id but it's not important for assistant messages
                                // First token has a space at the beginning, trim it
                                {
                                    from: "assistant",
                                    content: output.token.text.trimStart(),
                                    webSearch: webSearchResults,
                                    updates: updates,
                                    id: (responseId as Message["id"]) || crypto.randomUUID(),
                                    createdAt: new Date(),
                                    updatedAt: new Date(),
                                },
                            ];
                        } else {
                            // abort check
                            const date = abortedGenerations.get(convId.toString());
                            if (date && date > promptedAt) {
                                break;
                            }

                            if (!output) {
                                break;
                            }

                            // otherwise we just concatenate tokens
                            lastMessage.content += output.token.text;
                        }
                    }
                } else {
                    // add output.generated text to the last message
                    messages = [
                        ...messages.slice(0, -1),
                        {
                            ...messages[messages.length - 1],
                            content: output.generated_text,
                            updates: updates,
                            updatedAt: new Date(),
                        },
                    ];
                }
            }

            await collections.conversations.updateOne(
                {
                    _id: convId,
                },
                {
                    $set: {
                        messages,
                        title: conv?.title,
                        updatedAt: new Date(),
                    },
                }
            );

            update({
                type: "finalAnswer",
                text: messages[messages.length - 1].content,
            });
            controller.close();
        },
        async cancel() {
            await collections.conversations.updateOne(
                {
                    _id: convId,
                },
                {
                    $set: {
                        messages,
                        title: conv.title,
                        updatedAt: new Date(),
                    },
                }
            );
        },
    });

    // Todo: maybe we should wait for the message to be saved before ending the response - in case of errors
    return new Response(stream, {
        headers: {
            "Content-Type": "application/x-ndjson",
        },
    });
}

Frontend Code (excerpt):

async function writeMessage(message: string, messageId = randomUUID()) {
        if (!message.trim()) return;

        try {
            isAborted = false;
            loading = true;
            pending = true;

            // first we check if the messageId already exists, indicating a retry

            let retryMessageIndex = messages.findIndex((msg) => msg.id === messageId);
            const isRetry = retryMessageIndex !== -1;
            // if it's not a retry we just use the whole array
            if (!isRetry) {
                retryMessageIndex = messages.length;
            }

            // slice up to the point of the retry
            messages = [
                ...messages.slice(0, retryMessageIndex),
                { from: "user", content: message, id: messageId },
            ];

            const responseId = randomUUID();

            const response = await fetch(`${base}/conversation/${$page.params.id}`, {
                method: "POST",
                headers: { "Content-Type": "application/json" },
                body: JSON.stringify({
                    inputs: message,
                    id: messageId,
                    response_id: responseId,
                    is_retry: isRetry,
                    web_search: $webSearchParameters.useSearch,
                }),
            });

            if (!response.body) {
                throw new Error("Body not defined");
            }

            if (!response.ok) {
                error.set((await response.json())?.message);
                return;
            }
            // eslint-disable-next-line no-undef
            const encoder = new TextDecoderStream();
            const reader = response?.body?.pipeThrough(encoder).getReader();
            let finalAnswer = "";

            // this is a bit ugly
            // we read the stream until we get the final answer
            while (finalAnswer === "") {
                await new Promise((r) => setTimeout(r, 25));

                // check for abort
                if (isAborted) {
                    reader?.cancel();
                    break;
                }

                // if there is something to read
                await reader?.read().then(async ({ done, value }) => {
                    // we read, if it's done we cancel
                    if (done) {
                        reader.cancel();
                        return;
                    }

                    if (!value) {
                        return;
                    }

                    // if it's not done we parse the value, which contains all messages
                    const inputs = value.split("\n");
                    inputs.forEach(async (el: string) => {
                        try {
                            const update = JSON.parse(el) as MessageUpdate;
                            if (update.type === "finalAnswer") {
                                finalAnswer = update.text;
                                reader.cancel();
                                invalidate(UrlDependency.Conversation);
                            } else if (update.type === "stream") {
                                pending = false;

                                let lastMessage = messages[messages.length - 1];

                                if (lastMessage.from !== "assistant") {
                                    messages = [
                                        ...messages,
                                        { from: "assistant", id: randomUUID(), content: update.token },
                                    ];
                                } else {
                                    lastMessage.content += update.token;
                                    messages = [...messages];
                                }
                            } else if (update.type === "webSearch") {
                                webSearchMessages = [...webSearchMessages, update];
                            } else if (update.type === "status") {
                                if (update.status === "title" && update.message) {
                                    const conv = data.conversations.find(({ id }) => id === $page.params.id);
                                    if (conv) {
                                        conv.title = update.message;

                                        $titleUpdate = {
                                            title: update.message,
                                            convId: $page. params.id,
                                        };
                                    }
                                }
                            }
                        } catch (parseError) {
                            // in case of parsing error we wait for the next message
                            return;
                        }
                    });
                });
            }

            // reset the websearchmessages
            webSearchMessages = [];

            await invalidate(UrlDependency.ConversationList);
        } catch (err) {
            if (err instanceof Error && err.message.includes("overloaded")) {
                $error = "Too much traffic, please try again.";
            } else if (err instanceof Error && err.message.includes("429")) {
                $error = ERROR_MESSAGES.rateLimited;
            } else if (err instanceof Error) {
                $error = err.message;
            } else {
                $error = ERROR_MESSAGES.default;
            }
            console.error(err);
        } finally {
            loading = false;
            pending = false;
        }
    }

Steps to Reproduce:

  1. Fork the chat-ui project.
  2. Make the specified changes related to Azure AD integration, OpenAI API, and container support.
  3. Run the application on localhost and access it via an IP address.
  4. Observe the behavior where the API call stays pending until component.close() is reached.

Expected Behavior:

The application should behave consistently whether accessed via localhost or an IP address or hostname. The API call should not stay pending, and the stream should be processed correctly.

Additional Information:

  • before Adding component.close() in the stream code the windows machine was not at all returning the response and at the end it was saying promise uncaught

  • Network requests and responses from the browser's developer tools. Local host starts immediately and waterfall of api start getting response and stream is wring at FE correctly : enter image description here With IP it stays in pending state and when final message arrives at that time it writes everything at once
    enter image description here server LOGs: enter image description here you can see the difference when ip ::1 it starts writing at FE and processing stream data, when IP '::ffff:10.10.100.106' it stays in pending until the stream generation is completed i assume

  • Any specific configurations or dependencies related to hosting the application via an IP address.

Environment:

  • Operating System: it is working fine on mac but facing issues in windows
  • Browser: chrome
  • Node.js version: >18
  • Any other relevant environment details.

Note: Please let me know if additional code snippets or information are needed. Thanks for your help!


Feel free to customize the template based on your specific situation and provide any additional details that might be relevant to the issue.



from svelte: Issue with API call staying pending when accessing application via IP address or hostname, server response as stream data, chat-UI

No comments:

Post a Comment