I'm trying to implement an email queue, but it's not quite working as expected. The idea is to respond from an endpoint as quickly as possible and then have the queue manager process the queue in the background, which in this case means sending out an email.
One of the problems is that the interval keeps executing when this.running is set to true. Another problem is that an email is sometimes sent out multiple times, when it should only be sent out once.
I've also tried using Redis for this because I recognise the closure issue with initialize, and I also saw Bull as a potential solution, but I'm trying to avoid as many unnecessary libraries as possible. Ideally, I'd like to make some sort of base class that I can reuse for different queues, for example an SMS queue, not just an email queue. But for now, I guess I'm just trying to get this email queue to work.
What am I doing wrong here? Here's my code:
import emailClient from 'services/emailClient';
import randomUuid from 'services/randomUuid'
export interface EmailClientConfig {
to: string;
}
const MAX_ATTEMPTS_FOR_EMAILS = 5;
class EmailQueueManager {
private running = false;
private emailQueue = new Map();
private handleRemoveSentEmailsFromQueue(sentEmailUuids: string[]) {
// delete each sent out email from queue
sentEmailUuids.forEach((sentEmailUuid) => {
this.emailQueue.delete(sentEmailUuid);
});
// set state to "ready to process"
this.running = false;
}
private async handleProcessEmailQueue() {
// determines if queue is currently being executed
this.running = true;
const processedEmails: Record<string, boolean> = {};
// list of successfully sent emails
const sentEmailUuids: string[] = [];
// do nothing if currently being executed, so as
// to not send out duplicate emails
if (!this.emailQueue.size) {
this.running = false;
return;
}
const emails = [...this.emailQueue];
for (let i = 0; i < emails.length; i++) {
const [emailUuid, { attempts, emailLogEvent, ...emailConfig }] = emails[i];
emailClient(emailConfig)
.then((result) => {
sentEmailUuids.push(emailUuid);
// maybe do something else
})
.catch((err) => {
if (attempts > MAX_ATTEMPTS_FOR_EMAILS) {
// remove from queue permanently if
// attempt count exceeds what is allowed
this.emailQueue.delete(emailUuid);
} else {
// send back to queue and attempt to send email out again
this.emailQueue.set(emailUuid, {
attempts: attempts + 1,
emailLogEvent,
...emailConfig,
});
}
})
.finally(() => {
// flag email as "processed", regardless of whether
// or not is was successfully sent
processedEmails[emailUuid] = true;
});
}
// check every 250ms if all emails in queue
// were processed (not necessarily successfully sent out)
const checkProcessedEmailsInterval = setInterval(() => {
if (Object.values(processedEmails).every(Boolean)) {
this.handleRemoveSentEmailsFromQueue(sentEmailUuids);
clearInterval(checkProcessedEmailsInterval);
}
}, 250);
}
public addEmailToQueue({
emailConfig,
emailLogEvent,
}: {
emailConfig: EmailClientConfig;
emailLogEvent: string;
}) {
// add email to queue
this.emailQueue.set(randomUuid(), { attempts: 0, emailLogEvent, ...emailConfig });
// and immediately process the queue
this.handleProcessEmailQueue();
}
public initialize() {
// check every minute if there are
// emails in queue that need to be processed
setInterval(() => {
if (!this.running) {
this.handleProcessEmailQueue();
}
}, 1000 * 60);
}
}
const emailQueueManager = new EmailQueueManager();
// instantiate the interval
emailQueueManager.initialize();
export default emailQueueManager;
And I use this like so:
import emailQueueManager from 'src/managers/queues/emailQueue';
. . .
async function someEndpoint(req, res) {
// do stuff
emailQueueManager.addEmailToQueue({
emailConfig: {
to: 'test@test.com',
},
emailLogEvent: 'sent_email',
});
// do more stuff
res.end()
}
. . .
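For comparison, here is a minimal sketch of one way both symptoms might be avoided. In the code above, handleProcessEmailQueue sets this.running to true without first checking it, addEmailToQueue calls it unconditionally, and the sends are not awaited, so overlapping passes can pick up the same email. The sketch checks the flag before setting it, removes each item from the map before sending it, and awaits each send. All names here (BaseQueue, QueueItem, EmailQueue, send, start, maxAttempts) are hypothetical, and the send body is only a stand-in for the real emailClient call.

```typescript
// Hypothetical sketch: none of these names come from the original code.
interface QueueItem<T> {
  payload: T;
  attempts: number;
}

abstract class BaseQueue<T> {
  private running = false;
  private nextId = 0;
  private readonly items = new Map<string, QueueItem<T>>();
  private readonly maxAttempts: number;

  constructor(maxAttempts = 5) {
    // Assumption: an item is sent at most maxAttempts times in total.
    this.maxAttempts = maxAttempts;
  }

  // Subclasses implement the actual delivery (email, SMS, ...).
  protected abstract send(payload: T): Promise<void>;

  get size(): number {
    return this.items.size;
  }

  add(payload: T): Promise<void> {
    this.items.set(String(this.nextId++), { payload, attempts: 0 });
    return this.process();
  }

  async process(): Promise<void> {
    // Check the flag BEFORE setting it, so overlapping calls return early.
    if (this.running) return;
    this.running = true;
    const failed: Array<[string, QueueItem<T>]> = [];
    try {
      while (true) {
        const next = this.items.entries().next();
        if (next.done) break;
        const [id, item] = next.value;
        // Claim the item first so no other pass can send it again.
        this.items.delete(id);
        try {
          await this.send(item.payload);
        } catch {
          const attempts = item.attempts + 1;
          // Keep for a later pass unless the attempt budget is used up.
          if (attempts < this.maxAttempts) failed.push([id, { ...item, attempts }]);
        }
      }
    } finally {
      // Failed items go back only after the pass, so they are retried
      // on the next tick rather than immediately.
      for (const [id, item] of failed) this.items.set(id, item);
      this.running = false;
    }
  }

  // Periodic retry; the caller can clearInterval() the returned handle.
  start(intervalMs = 60 * 1000): ReturnType<typeof setInterval> {
    return setInterval(() => void this.process(), intervalMs);
  }
}

class EmailQueue extends BaseQueue<{ to: string }> {
  readonly sent: string[] = [];

  protected async send(config: { to: string }): Promise<void> {
    // Stand-in for the real `emailClient(config)` call.
    this.sent.push(config.to);
  }
}
```

The endpoint would call add(...) without awaiting it, as in the usage above. Items are sent one at a time here for simplicity; sending them in parallel would need the same claim-before-send step to stay free of duplicates.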
from Email queue keeps sending out duplicate emails