Monday 27 November 2023

Email queue keeps sending out duplicate emails

I'm trying to implement an email queue, but it's not quite working as expected. The idea is to respond from an endpoint as quickly as possible and then have the queue manager process the queue in the background, which in this case means sending out an email.

One of the problems is that the interval keeps executing even when this.running is set to true. Another is that an email is sometimes sent multiple times when it should only be sent once.

I've also tried using Redis for this, because I recognise the closure issue with initialize, and I saw Bull as a potential solution, but I'm trying to avoid as many unnecessary libraries as possible. Ideally, I'd like to make some sort of base class that I can reuse for different queues, for example an SMS queue, not just an email queue. But for now, I'm just trying to get this email queue to work.

What am I doing wrong here? Here's my code:

import emailClient from 'services/emailClient';
import randomUuid from 'services/randomUuid';

export interface EmailClientConfig {
    to: string;
}

const MAX_ATTEMPTS_FOR_EMAILS = 5;
class EmailQueueManager {
    private running = false;
    private emailQueue = new Map();

    private handleRemoveSentEmailsFromQueue(sentEmailUuids: string[]) {
        // delete each sent out email from queue
        sentEmailUuids.forEach((sentEmailUuid) => {
            this.emailQueue.delete(sentEmailUuid);
        });

        // set state to "ready to process"
        this.running = false;
    }

    private async handleProcessEmailQueue() {
        // mark the queue as currently being processed
        this.running = true;
        const processedEmails: Record<string, boolean> = {};
        // list of successfully sent emails
        const sentEmailUuids: string[] = [];

        // nothing to process if the queue is empty
        if (!this.emailQueue.size) {
            this.running = false;
            return;
        }

        const emails = [...this.emailQueue];

        for (let i = 0; i < emails.length; i++) {
            const [emailUuid, { attempts, emailLogEvent, ...emailConfig }] = emails[i];

            emailClient(emailConfig)
                .then((result) => {
                    sentEmailUuids.push(emailUuid);

                    // maybe do something else
                })
                .catch((err) => {
                    if (attempts > MAX_ATTEMPTS_FOR_EMAILS) {
                        // remove from queue permanently if
                        // attempt count exceeds what is allowed
                        this.emailQueue.delete(emailUuid);
                    } else {
                        // send back to queue and attempt to send email out again
                        this.emailQueue.set(emailUuid, {
                            attempts: attempts + 1,
                            emailLogEvent,
                            ...emailConfig,
                        });
                    }
                })
                .finally(() => {
                    // flag email as "processed", regardless of whether
                    // or not it was successfully sent
                    processedEmails[emailUuid] = true;
                });
        }

        // check every 250ms if all emails in queue
        // were processed (not necessarily successfully sent out)
        const checkProcessedEmailsInterval = setInterval(() => {
            if (Object.values(processedEmails).every(Boolean)) {
                this.handleRemoveSentEmailsFromQueue(sentEmailUuids);
                clearInterval(checkProcessedEmailsInterval);
            }
        }, 250);
    }
    public addEmailToQueue({
        emailConfig,
        emailLogEvent,
    }: {
        emailConfig: EmailClientConfig;
        emailLogEvent: string;
    }) {
        // add email to queue
        this.emailQueue.set(randomUuid(), { attempts: 0, emailLogEvent, ...emailConfig });
        // and immediately process the queue
        this.handleProcessEmailQueue();
    }

    public initialize() {
        // check every minute if there are
        // emails in queue that need to be processed
        setInterval(() => {
            if (!this.running) {
                this.handleProcessEmailQueue();
            }
        }, 1000 * 60);
    }
}

const emailQueueManager = new EmailQueueManager();

// start the interval
emailQueueManager.initialize();

export default emailQueueManager;

And I use this like so:

import emailQueueManager from 'src/managers/queues/emailQueue';

. . .
async function someEndpoint(req, res) {
    // do stuff
    emailQueueManager.addEmailToQueue({
        emailConfig: {
            to: 'test@test.com',
        },
        emailLogEvent: 'sent_email',
    });
    // do more stuff

    res.end();
}
. . .
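
Two things stand out in the code above. handleProcessEmailQueue never checks this.running before starting, and addEmailToQueue calls it unconditionally, so overlapping passes can each snapshot and send the same queued email. Also, Object.values(processedEmails).every(Boolean) is true for an empty object, so the 250ms check can fire before any send has settled. One possible direction, offered as a minimal sketch rather than a definitive fix, is a reusable queue that guards against re-entry and awaits every send before clearing the flag. QueueManager, Job, handler, maxAttempts, add and process are hypothetical names, and a counter stands in for randomUuid():

```typescript
// Hypothetical generic queue; names here are illustrative, not from the question.
type Job<T> = { attempts: number; payload: T };

class QueueManager<T> {
    private queue = new Map<string, Job<T>>();
    private running = false;
    private nextId = 0; // simple counter standing in for randomUuid()
    private handler: (payload: T) => Promise<unknown>;
    private maxAttempts: number;

    constructor(handler: (payload: T) => Promise<unknown>, maxAttempts = 5) {
        this.handler = handler;
        this.maxAttempts = maxAttempts;
    }

    // number of jobs still waiting (or waiting to be retried)
    get size(): number {
        return this.queue.size;
    }

    add(payload: T): void {
        this.queue.set(String(this.nextId++), { attempts: 0, payload });
        // fire and forget; process() catches handler errors itself
        void this.process();
    }

    async process(): Promise<void> {
        // re-entrancy guard: a call made while a pass is running is a no-op
        if (this.running) return;
        this.running = true;
        try {
            const batch = Array.from(this.queue.entries());
            // wait until every job in this batch has settled
            await Promise.all(
                batch.map(async ([id, job]) => {
                    try {
                        await this.handler(job.payload);
                        this.queue.delete(id); // sent: remove from queue
                    } catch {
                        job.attempts += 1;
                        // give up once the attempt budget is used up
                        if (job.attempts >= this.maxAttempts) this.queue.delete(id);
                    }
                }),
            );
        } finally {
            // only now may another pass start
            this.running = false;
        }
    }
}
```

Under these assumptions the email queue would be new QueueManager<EmailClientConfig>((config) => emailClient(config)), and an SMS queue would differ only in the handler passed in. Jobs added while a pass is running, and jobs that failed, wait for the next process() call, so a periodic timer like the one in initialize would still be needed.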

