I have a PySpark application that needs to read files from an Azure blob storage account where the files are partitioned into folders every 5 minutes in this format:
\Root\yyyy\mm\dd\HH\MM\files.csv
I have a process that runs every hour and wants to process all the files since it last ran (which could be longer than an hour if a run was missed). I manage a high watermark which tells me the last folder time processed.
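For reference, the window I pass around is derived roughly like this (a minimal sketch; the watermark variable and how it is persisted are placeholders, not my real code):

from datetime import datetime

# Hypothetical illustration: the stored high watermark becomes the window start
# and the current run time, truncated to the hour, becomes the window end.
lastProcessedFolderTime = datetime(2018, 12, 31, 22, 0)  # loaded from wherever the watermark is stored
runTime = datetime(2019, 1, 1, 1, 0)                     # e.g. utcnow() truncated to the hour

startDateTime = lastProcessedFolderTime
endDateTime = runTime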
Inside the file there is also a datetime field which matches the path datetime (with additional precision, down to the second).
Note that I cannot change the folder structure to Spark's preferred partitioning scheme of year=yyyy\month=mm etc.
I've written this function:
from datetime import datetime

def folderDateTimeRange(startDateTime, endDateTime, levels=5):
    # Only wildcard the folder levels that can differ between the start and
    # end of the window; keep the common prefix as a literal path.
    if startDateTime.year != endDateTime.year:
        return '/{*}' * levels
    elif startDateTime.month != endDateTime.month:
        return datetime.strftime(startDateTime, '%Y') + '/{*}' * (levels - 1)
    elif startDateTime.day != endDateTime.day:
        return datetime.strftime(startDateTime, '%Y/%m') + '/{*}' * (levels - 2)
    elif startDateTime.hour != endDateTime.hour:
        return datetime.strftime(startDateTime, '%Y/%m/%d') + '/{*}' * (levels - 3)
    else:
        return ""
This limits the number of folders read in most cases. I still need to filter the data that is read by the same start and end times that are passed into the function, because a range like 23:00 to 01:00 the next day returns {*} in the day and hour portions, so I think this could be more efficient.
In the worst case you pass in start = 2018-12-31 22:00:00 and end = 2019-01-01 01:00:00, which crosses a year boundary and causes all data for all years to be read.
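To make the filtering step concrete, this is roughly the read I do today (a sketch only: the storage path and the event_time column name are placeholders rather than my real values):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Placeholder root path for the blob container.
root = 'wasbs://container@account.blob.core.windows.net/Root/'

# Append the glob produced above and read the matching minute folders.
glob = folderDateTimeRange(startDateTime, endDateTime).lstrip('/')
df = spark.read.option('header', 'true').csv(root + glob)

# The in-file datetime column ('event_time' here is an illustrative name) is
# still needed to trim rows from folders that the glob over-matched.
df = df.filter(
    (col('event_time').cast('timestamp') >= startDateTime) &
    (col('event_time').cast('timestamp') < endDateTime)
)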
My knowledge of globs is limited, but is it possible to pass a range rather than {*}?