Sunday, 28 February 2021

How to asynchronously distribute product-yielding from a dataframe in scrapy spider

Is there a way to utilize Scrapy's asynchronous architecture when yielding products from a dataframe?


Overview

I have a spider in one of my Scrapy projects whose logic differs from that of a typical spider as follows:

  1. Crawls an online file directory to get the most recent versions of two zip files that both contain multiple csv files
  2. Extracts the csv files to the current working directory
  3. Utilizes pandas.read_csv to read each csv into its own dataframe
  4. Performs pandas.merge operations to combine the data into two final dataframes (one is the main dataframe and the other is a supporting dataframe, with a one-to-many (main-to-supporting) row relationship; see the toy illustration after this list)
  5. Finally, the spider loops through the main dataframe, starts populating a Scrapy item, gathers the additional data from the supporting dataframe, and yields a complete item
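To make the one-to-many relationship in step 4 concrete, here is a toy illustration (made-up data, but using the same column names as the real code further down, such as id, supporting_id, name, amount and units):

import pandas as pd

# Toy stand-ins for the two final dataframes described in step 4
main_df = pd.DataFrame({
    'id': [1, 2],
    'name': ['Product A', 'Product B'],
    'supporting_id': [100, 200],
})
supporting_df = pd.DataFrame({
    'id': [100, 100, 100, 200],  # each main row fans out to many supporting rows
    'name': ['Protein', 'Total Fat', 'Sugars', 'Protein'],
    'amount': [10.0, 2.5, 30.0, 4.0],
    'units': ['g', 'g', 'g', 'g'],
})

# One product (main) row maps to several nutrient (supporting) rows:
print(supporting_df[supporting_df.id == main_df.iloc[0].supporting_id])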

The logic works, but with roughly 500k items the process takes about 5.5 hours to complete: once the spider starts yielding items from the dataframes there are no further requests to make, so there is no network I/O left for Scrapy's asynchronous machinery to overlap, and the remaining CPU-bound pandas work effectively runs synchronously.
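(For scale: 500,000 items in about 5.5 hours works out to roughly 500,000 / 19,800 s ≈ 25 items per second, i.e. around 40 ms spent on each item.)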

Below is the code I'm using to accomplish all of this. The bottleneck happens in the process_csvs function.

from ..items.redacted import Loader, REDACTEDItem
from scrapy.exceptions import CloseSpider
from datetime import datetime
import pandas as pd
import numpy as np
import zipfile
import scrapy
import json
import os


class REDACTEDSpider(scrapy.Spider):
    name = 'REDACTED'
    allowed_domains = ['REDACTED']
    start_urls = ['https://REDACTED/datasets/']
    # custom_settings = dict(TESTING_MODE=True, LOG_LEVEL='DEBUG')
    zip_filename = 'temp_redacted_data.zip'

    def parse(self, response):
        main_file_date = supporting_file_date = datetime.min
        main_file = supporting_file = None
        for link in response.xpath('//a[contains(@href, "primary_csv")]/@href').getall():
            link_date = datetime.strptime(link.rstrip('.zip')[-10:], '%Y-%m-%d')
            if link_date > main_file_date:
                main_file = link
                main_file_date = link_date
        if not main_file:
            raise CloseSpider('primary_csv zip file not found')
        self.logger.info('Found latest primary_csv file link (%s)' % main_file)
        main_file = f"https://REDACTED/datasets/{main_file}"
        for link in response.xpath('//a[contains(@href, "supporting_csv")]/@href').getall():
            link_date = datetime.strptime(link.rstrip('.zip')[-10:], '%Y-%m-%d')
            if link_date > supporting_file_date:
                supporting_file = link
                supporting_file_date = link_date
        if not supporting_file:
            raise CloseSpider('supporting_csv zip file not found')
        self.logger.info('Found latest supporting_csv file link (%s)' % supporting_file)
        supporting_file = f"https://REDACTED/datasets/{supporting_file}"

        # we pass supporting_file to essentially download the files sequentially
        # and so that we can make sure the files are downloaded before moving on to ingesting them
        self.logger.info('Downloading primary_csv zip file')
        yield scrapy.Request(main_file, callback=self.handle_zip, cb_kwargs=dict(supporting_file=supporting_file))

    def handle_zip(self, response, supporting_file=None):
        file_alias = 'primary_csv' if supporting_file else 'supporting_csv'
        # download zip - if this is the second time this function is called it will overwrite the first zip file
        # since we've already extracted the files we need from it
        self.logger.info(f"Saving {file_alias} zip file")
        with open(self.zip_filename, 'wb') as usda_file:
            usda_file.write(response.body)
        # extract zip contents
        self.logger.info(f"Extracting files from {file_alias} zip file")
        with zipfile.ZipFile(self.zip_filename, 'r') as zfile:
            if supporting_file:
                # we're extracting the first file, and still need to queue up the supporting_file
                zfile.extract('primary_csv_file_1.csv')
                zfile.extract('primary_csv_file_2.csv')
                zfile.extract('primary_csv_file_3.csv')
            else:
                # we're extracting the supporting_file now
                zfile.extract('supporting_csv_file.csv')

        if supporting_file:
            self.logger.info('Downloading supporting_csv zip file')
            yield scrapy.Request(supporting_file, callback=self.handle_zip)
        else:
            # remove the zipfile since we no longer need it
            # this will free up some storage space in case we need extra for the staging db
            os.remove(self.zip_filename)
            # both files have been unzipped, so we can move onto processing the csvs
            self.logger.info('Processing CSV files')
            
            # FIXME: This essentially bottlenecks into yielding items from a single thread
            yield from self.process_csvs()
    
    def process_csvs(self):
        primary_csv_file_1 = pd.read_csv('primary_csv_file_1.csv', usecols=[...], dtype=dict(...))
        primary_csv_file_2 = pd.read_csv('primary_csv_file_2.csv', usecols=[...], dtype=dict(...))
        primary_csv_file_3 = pd.read_csv('primary_csv_file_3.csv', usecols=[...], dtype=dict(...))
        supporting_csv_file = pd.read_csv('supporting_csv_file.csv', usecols=[...], dtype=dict(...))

        # Join the above four files into two pandas dataframes
        # Step 1: Join primary_csv_file_2.csv into primary_csv_file_1.csv
        primary_csv_file_1 = pd.merge(primary_csv_file_1, primary_csv_file_2, on='id', how='left')
        primary_csv_file_1.replace(np.nan, '', regex=True, inplace=True)
        # primary_csv_file_1 should now have most of the essential columns needed to create a full item
        # Step 2: Join supporting_csv_file.csv into primary_csv_file_3.csv
        primary_csv_file_3 = pd.merge(primary_csv_file_3, supporting_csv_file, left_on='supporting_id', right_on='id', how='left')
        primary_csv_file_3.replace(np.nan, '', regex=True, inplace=True)
        # primary_csv_file_3 should now have an additional column from supporting_csv_file

        # TODO: This is where I would like to fork the function in order to take full advantage of Scrapy's asynchronous processing
        for product in primary_csv_file_1.itertuples():
            loader = Loader(item=REDACTEDItem())
            loader.add_value('url', 'REDACTED')
            loader.add_value('category', product.category)
            loader.add_value('upc', product.upc)
            loader.add_value('brand', product.brand)
            loader.add_value('product_name', product.name)

            # Filter primary_csv_file_3 by id to get all nutrients and nutrient values for this product
            p_nutrients = primary_csv_file_3[primary_csv_file_3.id == product.supporting_id]
            nutrients = []
            for nutrient in p_nutrients.itertuples():
                nutrients.append(dict(
                    alias=nutrient.name,
                    value=nutrient.amount,
                    unit_of_measure=nutrient.units
                ))
            loader.add_value('nutrition', json.dumps(nutrients))

            yield loader.load_item()

        # remove the csv files to free up space
        os.remove('primary_csv_file_1.csv')
        os.remove('primary_csv_file_2.csv')
        os.remove('primary_csv_file_3.csv')
        os.remove('supporting_csv_file.csv')
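
For context, here is a minimal, self-contained sketch (my own rough idea, not code from the project above) of the kind of "fork" the TODO in process_csvs is pointing at: split the merged dataframe into chunks and yield one lightweight request per chunk, so that each chunk's items are produced from its own callback and Scrapy's scheduler can interleave them with any other pending work. Note that this alone does not parallelize the CPU-bound pandas code, which still runs on the single reactor thread; the spider name, URL, and dataframe below are placeholders.

import numpy as np
import pandas as pd
import scrapy


class ChunkedSketchSpider(scrapy.Spider):
    # Hypothetical name/URL for illustration only
    name = 'chunked_sketch'
    start_urls = ['https://example.com/']

    def parse(self, response):
        # Stand-in for the merged main dataframe built in process_csvs
        df = pd.DataFrame({'name': [f'Product {i}' for i in range(1000)]})
        for chunk in np.array_split(df, 10):
            # dont_filter=True because every chunk reuses the same URL
            # (each chunk re-fetches the page, which is wasteful but keeps the sketch simple)
            yield scrapy.Request(
                response.url,
                callback=self.parse_chunk,
                cb_kwargs=dict(chunk=chunk),
                dont_filter=True,
            )

    def parse_chunk(self, response, chunk=None):
        for product in chunk.itertuples():
            # build and yield the real item here
            yield {'product_name': product.name}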

