Scenario: I've written an AWS Lambda function that fires when a CSV file is uploaded to an S3 bucket and stream-splits the file into chunks of x MB, writing each chunk out as a gzipped Parquet file (the number of output files matching the number of slices on a Redshift cluster, for evenly distributed processing/loading). The idea is that if I have a Lambda function with 3 GB of memory and receive an 8 GB (or bigger) CSV file, I should be able to process it in, say, 1 GB chunks without reading the whole 8 GB into memory and exceeding the 3 GB limit.
import sys
import io

import boto3
import pandas as pd
import awswrangler as wr

s3 = boto3.client('s3')


def split_file(file_size_in_MB, source_bucket, source_bucket_key):
    body = s3.get_object(Bucket=source_bucket, Key=source_bucket_key)['Body']  # streaming body
    chunk_size = 1024 * 1024 * file_size_in_MB  # bytes
    newline = '\r\n'.encode()
    partial_chunk = b''
    counter = 0
    while True:
        data = body.read(chunk_size)
        if counter == 0:
            header = data[0:data.find(newline)]  # determine header on first pass
            chunk = partial_chunk + data
        else:
            chunk = header + partial_chunk + data
        if chunk == b'':
            break
        last_newline = chunk.rfind(newline)
        result = chunk[0:last_newline + 1].decode('utf-8')
        print('1 mem size of chunk', round(sys.getsizeof(result) / 1024 / 1024, 2))
        if len(result) != 0:
            df = pd.read_csv(io.StringIO(result))
            print('2 mem size of df', round(sys.getsizeof(df) / 1024 / 1024, 2))
            wr.s3.to_parquet(df=df * 1,
                             path=f's3://{target_stage_bucket}/results{counter}.parquet.gzip',
                             compression='gzip')
        else:
            break
        partial_chunk = chunk[last_newline + 1:]  # carry the trailing partial row into the next pass
        counter += 1


# source_bucket, source_bucket_key and target_stage_bucket are defined elsewhere
# (in the deployed version the bucket and key come from the S3 event)
split_file(file_size_in_MB=50,
           source_bucket=source_bucket,
           source_bucket_key=source_bucket_key)
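The Lambda entry point isn't shown above. For reference, a minimal sketch of the kind of lambda_handler that wires the S3 event into split_file might look like this (the field access assumes the standard S3 put-event notification shape; this is illustrative, not the exact deployed code):

from urllib.parse import unquote_plus

def lambda_handler(event, context):
    # Pull the bucket and object key out of the S3 put-event notification
    record = event['Records'][0]['s3']
    source_bucket = record['bucket']['name']
    source_bucket_key = unquote_plus(record['object']['key'])

    split_file(file_size_in_MB=100,
               source_bucket=source_bucket,
               source_bucket_key=source_bucket_key)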
Disclaimer: I understand there are improvements that can be made to this code, such as the newline splitting, the while True, and the potential timeouts that will need to be handled. I get it, but please remember this is dev code; I would like to focus on the specific problem, which is the apparent memory leak that occurs when the function fires in AWS Lambda (see below):
If I run this function locally on a 1 GB file streamed in 100 MB chunks, I can see the size of each chunk on every pass and its pandas equivalent (with a little overhead, as expected):
running...
1 mem size of chunk 100.0
2 mem size of df 132.02
1 mem size of chunk 100.0
2 mem size of df 131.97
.....
1 mem size of chunk 100.0
2 mem size of df 132.06
1 mem size of chunk 24.0
2 mem size of df 31.68
1 mem size of chunk 0.0
completed in 0:02:38.995711
Here you can see that the memory trajectory is relatively flat for the duration of the script, with the expected spikes from each chunk being processed:
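(The chart itself isn't embedded here, but the trajectory can be reproduced locally with something like the sketch below, which samples the process RSS in a background thread while split_file runs; psutil and the one-second interval are assumptions, not part of the Lambda code:)

import threading
import time

import psutil

def sample_rss(stop_event, samples, interval=1.0):
    # Record this process's resident set size (in MB) once per interval
    proc = psutil.Process()
    while not stop_event.is_set():
        samples.append(proc.memory_info().rss / 1024 / 1024)
        time.sleep(interval)

stop_event = threading.Event()
samples = []
sampler = threading.Thread(target=sample_rss, args=(stop_event, samples))
sampler.start()

split_file(file_size_in_MB=100,
           source_bucket=source_bucket,
           source_bucket_key=source_bucket_key)

stop_event.set()
sampler.join()
print('peak RSS (MB):', round(max(samples), 2))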
However, the problem is that when I run this same code in a Lambda allocated 512 MB of memory, I receive the following error:
{ "errorType": "MemoryError", "stackTrace": [ " File \"/var/task/lambda_function.py\", line 38, in lambda_handler\n split_file(file_size_in_MB=100,\n", " File \"/var/task/lambda_function.py\", line 79, in split_file\n result = chunk[0:last_newline+1].decode('utf-8')\n" ] }
and the following log output, where you can see that the code only makes it through the first 100 MB loop:
1 mem size of chunk 100.0
[ERROR] MemoryError
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 38, in lambda_handler
    split_file(file_size_in_MB=100,
  File "/var/task/lambda_function.py", line 79, in split_file
    result = chunk[0:last_newline+1].decode('utf-8')
END
So my question is: what is happening here? I would think that 512 MB should be plenty of allocated memory to process these 100 MB chunks, but in Lambda I run out of memory on the first pass. Any ideas?
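(To make that reasoning concrete, here is a minimal sketch that isolates a single pass of the loop locally and reports the peak allocations tracemalloc can see; the synthetic 100 MB of CSV bytes is a stand-in for a real chunk, not the actual file:)

import io
import sys
import tracemalloc

import pandas as pd

# Build roughly 100 MB of synthetic CSV bytes (an assumption standing in for one chunk)
row = b'1,2.5,hello world\r\n'
chunk = b'a,b,c\r\n' + row * (100 * 1024 * 1024 // len(row))

tracemalloc.start()

# Mirror one pass of split_file: slice to the last newline, decode, wrap, and parse
result = chunk[0:chunk.rfind(b'\r\n') + 1].decode('utf-8')
df = pd.read_csv(io.StringIO(result))

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print('chunk bytes :', round(len(chunk) / 1024 / 1024, 2), 'MB')
print('decoded str :', round(sys.getsizeof(result) / 1024 / 1024, 2), 'MB')
print('dataframe   :', round(sys.getsizeof(df) / 1024 / 1024, 2), 'MB')
print('peak traced :', round(peak / 1024 / 1024, 2), 'MB')

Comparing the reported peak against the sizes of the individual objects gives a rough idea of how much headroom a single pass needs beyond the 100 MB chunk itself.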
from Python in AWS Lambda not properly garbage collecting?