I am working with AWS Athena to get results. I have to initiate a query and then check to see if its completed.
I am now trying to write a unit test for the various states. Here is a sample code. I generate the athena connection from another function and hand it off to this function, as well as the execution ID.
def check_athena_status(athena, execution):
running = True
print('Checking Athena Execution Running State')
while running:
running_state = athena.get_query_execution(QueryExecutionId=execution)['QueryExecution']['Status']['State']
if running_state == 'SUCCEEDED':
print('Run SUCCEEDED')
running = False
elif running_state == 'RUNNING':
time.sleep(3)
print('Athena Query Still Running')
else:
raise RuntimeError('Athena Query Failed')
return True
I am basically trying to figure out is there a way where I can change the value of running_state from RUNNING to SUCCEEDED. I currently use this as the unit test for a successful run.
athena_succeed = mock.Mock()
execution_id = 'RandomID'
athena_succeed.get_query_execution.return_value = test_data.athena_succeeded
result = inventory_validator.check_athena_status(athena_succeed, execution_id)
assert result == True
where test_data.athena_succeeded is basically a dict
athena_succeed = {'QueryExecution': {
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': '2021-08-08'}
}
}
I also have a "RUNNING" one.
athena_running = {'QueryExecution': {
'Status': {'State': 'RUNNING',
'SubmissionDateTime': '2021-08-08'}
}
}
I am trying to test branches so I want to go from running to succeed. I know I can change the while true value, but I want to change the actual "athena response" in the middle of the loop. I tried with PropertyMock but I am not sure thats the right use case.
from Unit Test a While Loop while checking state
No comments:
Post a Comment