Saturday 12 August 2023

How to append rows to an empty pyspark dataframe when each row is generated by a separate thread?

I have an empty dataframe, created as below with this schema:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

table_schema = StructType([
    StructField('bookingNumber', StringType(), True),
    StructField('bookingProfile', StringType(), True),
    StructField('json_string', StringType(), True),
    StructField('json_ingestion_time', TimestampType(), True)
])

def prepare_empty_df(schema: StructType):
    # empty RDD + explicit schema -> a DataFrame with the right columns but no rows
    empty_rdd = spark.sparkContext.emptyRDD()
    empty_df = spark.createDataFrame(empty_rdd, schema)
    return empty_df

My data comes from an API. Each GET call returns one JSON, and I convert the API response, which is a JSON, into text. I parse this JSON for some attributes and then insert them into a table. Because I have 200k JSONs, I don't want to run 200k insert queries on my table; instead, I want to append all the results of the API JSON calls to an empty dataframe and then ingest that dataframe in one go. The API calls are not sequential but run in parallel threads, i.e. I run 4 parallel API calls at a time and try to append the 4 outputs to the empty dataframe. Further below is how I convert the API JSON and append it to the empty dataframe; first, a rough sketch of the end state I am after.
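
(The booking numbers, profiles and db.target_table here are made up, and spark is assumed to be an existing SparkSession; this is only to show the shape of the final dataframe and the single write I want.)

from pyspark.sql.functions import current_timestamp

# one parsed row per API response, written to the table in a single ingest
rows = [
    ('B001', 'profile-1', '{"id": "profile-1"}'),
    ('B002', 'profile-2', '{"id": "profile-2"}'),
]
final_df = (spark.createDataFrame(rows, ['bookingNumber', 'bookingProfile', 'json_string'])
            .withColumn('json_ingestion_time', current_timestamp()))
final_df.write.mode('append').saveAsTable('db.target_table')  # one write instead of 200k inserts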

Main method:

import threading

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('Raw_Check').getOrCreate()
    batch_size = 4
    booking_ids = []
    initial_df = prepare_empty_df(schema=table_schema)

    initial_load = True

    cquery = f'select booking_id from db.table limit 20'
    booking_ids = get_booking_ids(spark=spark, query=cquery) # returns a list of bookings

    for i in range(0, len(booking_ids), batch_size):
        sub_list = booking_ids[i:i + batch_size]
        threads = []
        for index in range(len(sub_list)):
            t = threading.Thread(target=get_json, name=str(index), args=(spark, sub_list[index], initial_df))
            threads.append(t)
            t.start()
        for thread in threads:
            thread.join()

    print('Final Dataframe count')
    print(initial_df.count())
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')
    print('Final Dataframe contents')
    initial_df.show()
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')

get_json method:

import requests

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit, current_timestamp


# one GET call per booking number; the response JSON becomes one row
def get_json(spark: SparkSession, booking_number: str, init_df: DataFrame):
    headers = {"Content-type": "some_content_type"}
    token = doing_something_to_get_token

    token_headers = {'Authorization': f"Bearer {token}"}
    api_response = requests.get(f'https://api_url?booking_number={booking_number}', headers=token_headers)
    json_data = spark.sparkContext.parallelize([api_response.text])
    df = spark.read.json(json_data)

    api_df = (df.select('id').withColumnRenamed('id', 'bookingProfile')
        .withColumn('bookingNumber', lit(booking_number))
        .withColumn('json_string', lit(api_response.text))
        .withColumn('json_ingestion_time', lit(current_timestamp()))
    )
    api_df.show()
    init_df.unionAll(api_df)

I am unioning every row from the API output onto the initial_df I created in the main method. I can also see data from api_df, thanks to api_df.show(), while the script runs. Four parallel threads are launched and I can see 4 API calls running at a time. But at the end of the script, the initial_df I created is still empty: the count is zero and it basically prints nothing when I display its contents.

-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe count
0
-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe contents
+--------------+-------------------+-----------+------------------------+
|bookingNumber |bookingProfile     |json_string|     json_ingestion_time|
+--------------+-------------------+-----------+------------------------+
+--------------+-------------------+-----------+------------------------+
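
Is the problem that unionAll returns a new DataFrame instead of modifying init_df in place? A tiny standalone example of what I mean (nothing here is from my actual job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
a = spark.createDataFrame([(1,)], ['x'])
b = spark.createDataFrame([(2,)], ['x'])

a.unionAll(b)      # returns a new DataFrame; a itself is unchanged
print(a.count())   # still 1

a = a.unionAll(b)  # the result has to be reassigned to be kept
print(a.count())   # now 2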

Could anyone let me know what mistake I am making here and how I can correct it? Any help is massively appreciated.



