I'm writing a function that's responsible for updating account balances. To prevent concurrent updates, I first use with_for_update() to grab a lock on the accounts, then calculate the amount, update the balances, and commit the session. To simulate concurrent requests, I spawn two processes and run the function once in each (a sketch of that harness follows the snippet below). Here's the code for calculating and updating the balances:
# Open a new session and begin the transaction at SERIALIZABLE isolation.
session = create_db_session(db_engine)()
session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

print("&" * 80)
print(f"{process_number} entering!")
print("&" * 80)

# Lock the rows with SELECT ... FOR UPDATE and refresh any already-loaded state.
accounts = (
    session.query(Account)
    .filter(Account.id.in_([some account IDs]))
    .with_for_update()
    .populate_existing()
    .all()
)

print("*" * 80)
print(f"{process_number} got here!")
for account in accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
print(hex(id(session)))
print("*" * 80)

# Calculate the total amount outstanding by account.
for account in accounts:
    total_amount = _calculate_total_amount()
    if account.balance >= total_amount:
        # For accounts with sufficient balance, deduct the amount from the balance.
        account.balance -= total_amount
    else:
        # Otherwise, save them for notification. Code omitted.
        pass

print("-" * 80)
print(f"{process_number} committing!")
# line_items_accounts is built in the code omitted above.
for li, account in line_items_accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
print("-" * 80)
session.commit()
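For context, here's roughly how I spawn the two processes. This is a simplified sketch: worker and update_balances are placeholder names, and update_balances stands in for the snippet above.

import multiprocessing

def worker(process_number):
    # Each process opens its own engine/session inside update_balances,
    # which is the balance-updating code shown above.
    update_balances(process_number)

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()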
Here's the output:
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
0 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
1 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
********************************************************************************
0 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65d7e0d0
********************************************************************************
--------------------------------------------------------------------------------
0 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
********************************************************************************
1 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65f930a0
********************************************************************************
--------------------------------------------------------------------------------
1 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
0 and 1 are the process numbers, and the hexadecimal number is the id of the session. You can see that the lock worked (process 0 blocked process 1 until 0 committed), but process 1 read stale data: the balance should have been 19930.01, not 20000, and in the output for process 1, the "Account version" should have been 2, not 1.
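For reference, the Account model looks roughly like this. This is a simplified sketch (the real model has more columns), and it assumes the version column is maintained via SQLAlchemy's version_id_col feature, which is why I expect it to read 2 after the first update:

from sqlalchemy import Column, Integer, Numeric, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Account(Base):
    __tablename__ = "accounts"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    balance = Column(Numeric(precision=20, scale=6), nullable=False)
    version = Column(Integer, nullable=False)

    # version_id_col makes SQLAlchemy bump `version` on every UPDATE.
    __mapper_args__ = {"version_id_col": version}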
I've tried using populate_existing() with no luck, although I suspect it was never going to help anyway, since the two sessions are distinct and the session for process 1 shouldn't have populated anything until the lock was released by process 0. I've also tried the "repeatable read" and "serializable" isolation levels, expecting an exception to be thrown in process 1 due to the concurrent update (a read/write dependency between the transactions), but nothing happened.
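In case it matters, this is how I set the isolation level when experimenting. In the snippet above I set it per connection, but I also tried setting it engine-wide, roughly like this (the DSN is a placeholder):

from sqlalchemy import create_engine

# Engine-wide isolation level; I tried both "SERIALIZABLE" and "REPEATABLE READ".
db_engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost/mydb",  # placeholder DSN
    isolation_level="SERIALIZABLE",
)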
It's also interesting to note that the behavior isn't consistent: the code above works correctly when I run it locally, but almost never works when I build a Docker container with all the code and run it there. The package versions are identical in both environments. I'm using Postgres and psycopg2.
I'm banging my head against the wall trying to figure out what's happening. I feel like I may have overlooked something simple. Any ideas?
from SQLAlchemy with_for_update reads stale data