Thursday, 26 May 2022

Using Python flock() across multiple processes

I have some legacy code that needs to access the same data file across multiple threads and processes. I am trying to implement locking to protect the data.

Multithreaded

import contextlib
import threading

FILE_PATH = "foo.txt"

USE_GLOBAL_LOCK = False

if USE_GLOBAL_LOCK:
    global_lock = threading.Lock()
else:
    global_lock = contextlib.nullcontext()

def do_write_then_read(results) -> None:
    # Write to disk
    data = "FOO"
    with global_lock:
        with open(FILE_PATH, "w") as f:
            f.write(data)

    # Read from disk
    data = None
    with global_lock:
        with open(FILE_PATH, "r") as f:
            data = f.read()
    results.append(data)

def run_multithreaded() -> None:
    results = []

    threads = []
    for _ in range(10):
        threads.append(threading.Thread(target=do_write_then_read, args=[results]))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(results)

The output is usually correct:

['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']

but sometimes there is missing data. I believe this is due to a race condition between the read() and the write(): opening the file in "w" mode truncates it immediately, so a reader that gets in before the write sees an empty file:

['', '', 'FOO', '', '', 'FOO', '', 'FOO', 'FOO', 'FOO']
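To convince myself of the truncation behaviour, here's a tiny standalone illustration (demo.txt is just a throwaway file name for this sketch):

# "w" mode truncates the file at open() time, before any write() happens.
with open("demo.txt", "w") as f:
    f.write("FOO")                    # demo.txt now contains "FOO"

with open("demo.txt", "w") as f:      # truncated right here; nothing written yet
    with open("demo.txt", "r") as g:
        print(repr(g.read()))         # prints '' -- a reader at this moment sees an empty file
    f.write("FOO")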

Setting USE_GLOBAL_LOCK = True, so that the write and the read are each protected by a threading.Lock, does indeed fix the problem.

Multithreaded and multiprocess

However, running this across multiple processes again results in missing data.

Here's some test code that spawns subprocesses, each of which imports the module above (saved as foo.py) and invokes run_multithreaded():

import fcntl
import subprocess

def run_multiprocess() -> None:
    processes = []
    for _ in range(3):
        CMD = "python3 -c 'import foo; foo.run_multithreaded()'"
        processes.append(subprocess.Popen(CMD, shell=True))
    for p in processes:
        p.wait()
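
run_multiprocess() still needs to be invoked somehow; a __main__ guard at the bottom of foo.py is one way to wire it up (this part is assumed rather than shown in the snippets above):

if __name__ == "__main__":
    # Launch the three subprocesses; each one re-imports foo and runs
    # the multithreaded test against the shared data file.
    run_multiprocess()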

Output with missing data:

['', '', 'FOO', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['', '', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']

Here we redefine do_write_then_read() to add a filesystem-based lock (flock) so that access to the file can be serialized across processes as well as threads:

def do_write_then_read(results) -> None:
    # write
    data = "FOO"
    with global_lock:
        with open(FILE_PATH, "w") as f:
            # Acquire file lock
            fd = f.fileno()
            fcntl.flock(fd, fcntl.LOCK_EX)

            f.write(data)
            # Exiting the context closes file and releases lock

    # read
    data = None
    with global_lock:
        with open(FILE_PATH, "r") as f:
            # Acquire file lock
            fd = f.fileno()
            fcntl.flock(fd, fcntl.LOCK_EX)

            data = f.read()
            # Exiting the context closes file and releases lock
    results.append(data)

However, this doesn't fix the problem, and I can't figure out why, no matter what I try :P

I'm on macOS / Linux with Python 3.9.
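
For reference, one variant I'm considering is taking the flock on a separate, dedicated lock file before the data file is opened at all, so that the truncating open() of foo.txt itself happens while the lock is held. A rough, untested sketch (LOCK_PATH is just a placeholder name introduced here):

import fcntl

LOCK_PATH = "foo.lock"  # placeholder: dedicated lock file, not part of the original code

def do_write_then_read(results) -> None:
    # Write to disk
    data = "FOO"
    with global_lock:
        # Acquire the flock on the lock file *before* touching the data file
        with open(LOCK_PATH, "a") as lock_file:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
            with open(FILE_PATH, "w") as f:
                f.write(data)
            # Closing lock_file releases the flock

    # Read from disk
    data = None
    with global_lock:
        with open(LOCK_PATH, "a") as lock_file:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
            with open(FILE_PATH, "r") as f:
                data = f.read()
    results.append(data)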


