Title: Issue with API call staying pending when accessing application via IP address
Description:
I have forked the chat-ui project and made several changes, including Azure AD integration, OpenAI API compatible serving layer support, and making it more container-friendly. The application works fine on localhost, but when I try to access it via an IP address, I encounter an issue.
Problem:
The backend streams data to the frontend in response to a POST request. On localhost, everything works as expected, but when I access the application via an IP address, the API call shown in the network tab stays pending until it reaches component.close() in the frontend. The issue seems to be that the stream is not processed as it arrives.
Backend Code (excerpt):
export async function POST({ request, locals, params, getClientAddress }) {
const id = z.string().parse(params.id);
const convId = new ObjectId(id);
const promptedAt = new Date();
const userId = locals.user?._id ?? locals.sessionId;
// check user
if (!userId) {
throw error(401, "Unauthorized");
}
console.log("post", {userId, params, ip: getClientAddress()})
// check if the user has access to the conversation
const conv = await collections.conversations.findOne({
_id: convId,
...authCondition(locals),
});
if (!conv) {
throw error(404, "Conversation not found");
}
// register the event for ratelimiting
await collections.messageEvents.insertOne({
userId: userId,
createdAt: new Date(),
ip: getClientAddress(),
});
// guest mode check
if (
!locals.user?._id &&
requiresUser &&
(MESSAGES_BEFORE_LOGIN ? parseInt(MESSAGES_BEFORE_LOGIN) : 0) > 0
) {
const totalMessages =
(
await collections.conversations
.aggregate([
{ $match: authCondition(locals) },
{ $project: { messages: 1 } },
{ $unwind: "$messages" },
{ $match: { "messages.from": "assistant" } },
{ $count: "messages" },
])
.toArray()
)[0]?.messages ?? 0;
if (totalMessages > parseInt(MESSAGES_BEFORE_LOGIN)) {
throw error(429, "Exceeded number of messages before login");
}
}
// check if the user is rate limited
const nEvents = Math.max(
await collections.messageEvents.countDocuments({ userId }),
await collections.messageEvents.countDocuments({ ip: getClientAddress() })
);
if (RATE_LIMIT != "" && nEvents > parseInt(RATE_LIMIT)) {
throw error(429, ERROR_MESSAGES.rateLimited);
}
// fetch the model
const model = models.find((m) => m.id === conv.model);
if (!model) {
throw error(410, "Model not available anymore");
}
// finally parse the content of the request
const json = await request.json();
const {
inputs: newPrompt,
response_id: responseId,
id: messageId,
is_retry,
web_search: webSearch,
} = z
.object({
inputs: z.string().trim().min(1),
id: z.optional(z.string().uuid()),
response_id: z.optional(z.string().uuid()),
is_retry: z.optional(z.boolean()),
web_search: z.optional(z.boolean()),
})
.parse(json);
// get the list of messages
// while checking for retries
let messages = (() => {
if (is_retry && messageId) {
// if the message is a retry, replace the message and remove the messages after it
let retryMessageIdx = conv.messages.findIndex((message) => message.id === messageId);
if (retryMessageIdx === -1) {
retryMessageIdx = conv.messages.length;
}
return [
...conv.messages.slice(0, retryMessageIdx),
{ content: newPrompt, from: "user", id: messageId as Message["id"], updatedAt: new Date() },
];
} // else append the message at the bottom
return [
...conv.messages,
{
content: newPrompt,
from: "user",
id: (messageId as Message["id"]) || crypto.randomUUID(),
createdAt: new Date(),
updatedAt: new Date(),
},
];
})() satisfies Message[];
await collections.conversations.updateOne(
{
_id: convId,
},
{
$set: {
messages,
title: conv.title,
updatedAt: new Date(),
},
}
);
// we now build the stream
const stream = new ReadableStream({
async start(controller) {
const updates: MessageUpdate[] = [];
function update(newUpdate: MessageUpdate) {
if (newUpdate.type !== "stream") {
updates.push(newUpdate);
}
controller.enqueue(JSON.stringify(newUpdate) + "\n");
}
update({ type: "status", status: "started" });
if (conv.title === "New Chat" && messages.length === 1) {
try {
conv.title = (await summarize(newPrompt)) ?? conv.title;
update({ type: "status", status: "title", message: conv.title });
} catch (e) {
console.error(e);
}
}
await collections.conversations.updateOne(
{
_id: convId,
},
{
$set: {
messages,
title: conv.title,
updatedAt: new Date(),
},
}
);
let webSearchResults: WebSearch | undefined;
if (webSearch) {
webSearchResults = await runWebSearch(conv, newPrompt, update);
}
messages[messages.length - 1].webSearch = webSearchResults;
conv.messages = messages;
const endpoint = await model.getEndpoint();
for await (const output of await endpoint({ conversation: conv })) {
// if not generated_text is here it means the generation is not done
if (!output.generated_text) {
// else we get the next token
if (!output.token.special) {
update({
type: "stream",
token: output.token.text,
});
// if the last message is not from assistant, it means this is the first token
const lastMessage = messages[messages.length - 1];
if (lastMessage?.from !== "assistant") {
// so we create a new message
messages = [
...messages,
// id doesn't match the backend id but it's not important for assistant messages
// First token has a space at the beginning, trim it
{
from: "assistant",
content: output.token.text.trimStart(),
webSearch: webSearchResults,
updates: updates,
id: (responseId as Message["id"]) || crypto.randomUUID(),
createdAt: new Date(),
updatedAt: new Date(),
},
];
} else {
// abort check
const date = abortedGenerations.get(convId.toString());
if (date && date > promptedAt) {
break;
}
if (!output) {
break;
}
// otherwise we just concatenate tokens
lastMessage.content += output.token.text;
}
}
} else {
// add output.generated text to the last message
messages = [
...messages.slice(0, -1),
{
...messages[messages.length - 1],
content: output.generated_text,
updates: updates,
updatedAt: new Date(),
},
];
}
}
await collections.conversations.updateOne(
{
_id: convId,
},
{
$set: {
messages,
title: conv?.title,
updatedAt: new Date(),
},
}
);
update({
type: "finalAnswer",
text: messages[messages.length - 1].content,
});
controller.close();
},
async cancel() {
await collections.conversations.updateOne(
{
_id: convId,
},
{
$set: {
messages,
title: conv.title,
updatedAt: new Date(),
},
}
);
},
});
// Todo: maybe we should wait for the message to be saved before ending the response - in case of errors
return new Response(stream, {
headers: {
"Content-Type": "application/x-ndjson",
},
});
}
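For reference, the stream above enqueues JavaScript strings, while the Fetch standard describes response body chunks as bytes. The following is a minimal sketch of the same NDJSON framing that enqueues `Uint8Array` chunks instead. The `Update` type and the `encodeUpdate` and `updatesToStream` helpers are hypothetical simplifications and not part of chat-ui, and this may or may not be related to the pending request.

```typescript
// Hypothetical, simplified update shapes; the real MessageUpdate type has more variants.
type Update =
  | { type: "status"; status: string; message?: string }
  | { type: "stream"; token: string }
  | { type: "finalAnswer"; text: string };

const textEncoder = new TextEncoder();

// Serialize one update as a single NDJSON line (JSON followed by "\n"), as UTF-8 bytes.
function encodeUpdate(update: Update): Uint8Array {
  return textEncoder.encode(JSON.stringify(update) + "\n");
}

// Build a byte stream from a fixed list of updates.
// A real handler would enqueue each update as soon as the token arrives.
function updatesToStream(updates: Update[]): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const u of updates) {
        controller.enqueue(encodeUpdate(u));
      }
      controller.close();
    },
  });
}
```

In the handler above, this would amount to replacing the string passed to controller.enqueue() with the result of encodeUpdate(newUpdate).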
Frontend Code (excerpt):
async function writeMessage(message: string, messageId = randomUUID()) {
if (!message.trim()) return;
try {
isAborted = false;
loading = true;
pending = true;
// first we check if the messageId already exists, indicating a retry
let retryMessageIndex = messages.findIndex((msg) => msg.id === messageId);
const isRetry = retryMessageIndex !== -1;
// if it's not a retry we just use the whole array
if (!isRetry) {
retryMessageIndex = messages.length;
}
// slice up to the point of the retry
messages = [
...messages.slice(0, retryMessageIndex),
{ from: "user", content: message, id: messageId },
];
const responseId = randomUUID();
const response = await fetch(`${base}/conversation/${$page.params.id}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
inputs: message,
id: messageId,
response_id: responseId,
is_retry: isRetry,
web_search: $webSearchParameters.useSearch,
}),
});
if (!response.body) {
throw new Error("Body not defined");
}
if (!response.ok) {
error.set((await response.json())?.message);
return;
}
// eslint-disable-next-line no-undef
const encoder = new TextDecoderStream();
const reader = response?.body?.pipeThrough(encoder).getReader();
let finalAnswer = "";
// this is a bit ugly
// we read the stream until we get the final answer
while (finalAnswer === "") {
await new Promise((r) => setTimeout(r, 25));
// check for abort
if (isAborted) {
reader?.cancel();
break;
}
// if there is something to read
await reader?.read().then(async ({ done, value }) => {
// we read, if it's done we cancel
if (done) {
reader.cancel();
return;
}
if (!value) {
return;
}
// if it's not done we parse the value, which contains all messages
const inputs = value.split("\n");
inputs.forEach(async (el: string) => {
try {
const update = JSON.parse(el) as MessageUpdate;
if (update.type === "finalAnswer") {
finalAnswer = update.text;
reader.cancel();
invalidate(UrlDependency.Conversation);
} else if (update.type === "stream") {
pending = false;
let lastMessage = messages[messages.length - 1];
if (lastMessage.from !== "assistant") {
messages = [
...messages,
{ from: "assistant", id: randomUUID(), content: update.token },
];
} else {
lastMessage.content += update.token;
messages = [...messages];
}
} else if (update.type === "webSearch") {
webSearchMessages = [...webSearchMessages, update];
} else if (update.type === "status") {
if (update.status === "title" && update.message) {
const conv = data.conversations.find(({ id }) => id === $page.params.id);
if (conv) {
conv.title = update.message;
$titleUpdate = {
title: update.message,
convId: $page.params.id,
};
}
}
}
} catch (parseError) {
// in case of parsing error we wait for the next message
return;
}
});
});
}
// reset the websearchmessages
webSearchMessages = [];
await invalidate(UrlDependency.ConversationList);
} catch (err) {
if (err instanceof Error && err.message.includes("overloaded")) {
$error = "Too much traffic, please try again.";
} else if (err instanceof Error && err.message.includes("429")) {
$error = ERROR_MESSAGES.rateLimited;
} else if (err instanceof Error) {
$error = err.message;
} else {
$error = ERROR_MESSAGES.default;
}
console.error(err);
} finally {
loading = false;
pending = false;
}
}
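One thing worth noting about the reader loop above: it splits every chunk on "\n" and silently ignores parse errors, so a JSON line that is cut in half by a chunk boundary is dropped on both sides. Below is a minimal sketch of a line buffer that keeps the incomplete tail until the next chunk arrives. `createNdjsonParser` is a hypothetical helper name, and this only makes the parsing robust to arbitrary chunk sizes; it does not by itself explain why the request stays pending.

```typescript
// Accumulates decoded text and returns only complete NDJSON lines as parsed values.
// A trailing partial line stays in the buffer until a later chunk completes it.
function createNdjsonParser<T>() {
  let buffer = "";
  return {
    push(chunk: string): T[] {
      buffer += chunk;
      const lines = buffer.split("\n");
      // The last element is either "" (chunk ended on a newline) or an incomplete line.
      buffer = lines.pop() ?? "";
      const out: T[] = [];
      for (const line of lines) {
        if (line.trim() === "") continue; // skip blank lines
        out.push(JSON.parse(line) as T);
      }
      return out;
    },
  };
}

// Usage: a line split across two chunks is delivered once it is complete.
const parser = createNdjsonParser<{ type: string; token?: string }>();
const first = parser.push('{"type":"stream","tok'); // nothing complete yet
const second = parser.push('en":"Hi"}\n{"type":"fin'); // one complete update
console.log(first.length, second.length);
```

In writeMessage, each decoded value from reader.read() would be passed to parser.push(), and the returned updates handled the same way as before.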
Steps to Reproduce:
- Fork the chat-ui project.
- Make the specified changes related to Azure AD integration, OpenAI API, and container support.
- Run the application on localhost and access it via an IP address.
- Observe the behavior where the API call stays pending until component.close() is reached.
Expected Behavior:
The application should behave consistently whether accessed via localhost or an IP address or hostname. The API call should not stay pending, and the stream should be processed correctly.
Additional Information:
- Before I added component.close() to the stream code, the Windows machine returned no response at all and eventually reported an uncaught promise error.
- Network requests and responses from the browser's developer tools: on localhost, the request starts immediately, the waterfall shows the API receiving the response, and the stream is written to the frontend correctly. With the IP address, the request stays in the pending state, and when the final message arrives everything is written at once.
- Server logs: when the client IP is ::1, the frontend starts writing and processing the stream data right away; when the client IP is '::ffff:10.10.100.106', the request stays pending until, I assume, the stream generation is complete.
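To read the two addresses in the server logs: ::1 is the IPv6 loopback address, and ::ffff:10.10.100.106 is the IPv4-mapped IPv6 form of 10.10.100.106, so the second request really does arrive over the network interface. A small sketch that labels the value returned by getClientAddress() when logging; `describeClientAddress` is a hypothetical helper, not something the project provides.

```typescript
// Label a client address string for easier reading in logs.
function describeClientAddress(address: string): string {
  if (address === "::1" || address === "127.0.0.1") {
    return "loopback";
  }
  // IPv4-mapped IPv6 addresses look like ::ffff:a.b.c.d
  const mapped = /^::ffff:(\d{1,3}(?:\.\d{1,3}){3})$/i.exec(address);
  if (mapped) {
    const ipv4 = mapped[1] ?? "";
    return ipv4.startsWith("127.") ? "loopback" : `IPv4-mapped ${ipv4}`;
  }
  return address;
}

console.log(describeClientAddress("::1"));
console.log(describeClientAddress("::ffff:10.10.100.106"));
```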
Environment:
- Operating System: works fine on macOS, but the issue occurs on Windows
- Browser: Chrome
- Node.js version: >18
Note: Please let me know if additional code snippets or information are needed. Thanks for your help!