Friday, 27 August 2021

PySpark: Efficient handling of foreign keys

I'm reading a CSV file that logs some transactions, like this:

l = [("Garfield", "Lasagna", "2021-08-01"),
     ("Calvin", "Tiger Plushie", "2021-08-02"),
     ("Calvin", "Lasagna", "2021-08-03")]
transactions = spark.createDataFrame(l, ["name", "product", "date"])

transactions.show()

+--------+-------------+----------+
|    name|      product|      date|
+--------+-------------+----------+
|Garfield|      Lasagna|2021-08-01|
|  Calvin|Tiger Plushie|2021-08-02|
|  Calvin|      Lasagna|2021-08-03|
+--------+-------------+----------+
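
The hard-coded list is just a stand-in so the example is reproducible; in reality I read the data from a CSV file, roughly like this (the path, header option and explicit schema below are made up for illustration, not the actual ones):

from pyspark.sql import SparkSession, types as T

spark = SparkSession.builder.getOrCreate()

# Illustrative path and schema -- the real file and column types may differ.
schema = T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("product", T.StringType()),
    T.StructField("date", T.StringType()),   # could also be T.DateType()
])

transactions = (spark.read
                .option("header", True)
                .schema(schema)
                .csv("/data/transactions.csv"))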

I want to store these values in a SQL database, in a star schema, with tables created like this:

CREATE TABLE [dbo].[test_customer]
(
    [id] INT IDENTITY PRIMARY KEY, -- Primary Key column
    [name] VARCHAR(50) NOT NULL
);
CREATE TABLE [dbo].[test_product]
(
    [id] INT IDENTITY PRIMARY KEY, -- Primary Key column
    [product] VARCHAR(50) NOT NULL
);
CREATE TABLE [dbo].[test_transaction]
(
    [id] INT IDENTITY PRIMARY KEY, -- Primary Key column
    [fk_customer] INT NOT NULL,
    [fk_product] INT NOT NULL,
    [date] DATE
);

I can write the customer and product information to my database with:

JDBCURL = "jdbc:sqlserver://{}:1433;database={};user={};password={}"

# Distinct customers and products; the ids are generated by the database (IDENTITY columns).
customers = transactions.select("name").drop_duplicates()
customers.write.jdbc(JDBCURL, table="dbo.test_customer", mode="append")

products = transactions.select("product").drop_duplicates()
products.write.jdbc(JDBCURL, table="dbo.test_product", mode="append")
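
For completeness: JDBCURL above is only a format template; before any of these calls run it gets filled in with the real connection details, along the lines of the snippet below (the server, database and credentials are placeholders, and passing the Microsoft JDBC driver class explicitly is an assumption about how the cluster is set up):

# Placeholder connection details -- not the real server or credentials.
JDBCURL = JDBCURL.format("myserver.example.com", "mydb", "etl_user", "etl_password")

# If the SQL Server driver isn't picked up automatically, it can be passed
# explicitly to every read/write call:
connection_properties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
customers.write.jdbc(JDBCURL, table="dbo.test_customer", mode="append",
                     properties=connection_properties)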

But to store the transaction details, I first need to fetch the customer and product tables back from my database and join them to my in-memory DataFrame:

sql_customers = spark.read.format("jdbc").options(url=JDBCURL, dbtable="dbo.test_customer").load()
sql_products = spark.read.format("jdbc").options(url=JDBCURL, dbtable="dbo.test_product").load()

transactions_to_sql = (transactions.join(sql_customers, on="name")
                       .drop("name")
                       .withColumnRenamed("id", "fk_customer")
                       .join(sql_products, on="product")
                       .drop("product")
                       .withColumnRenamed("id", "fk_product")
                       )

transactions_to_sql.write.jdbc(JDBCURL, table="dbo.test_transaction", mode="append")
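
To sanity-check the load I can read the fact table back with the same JDBC options (this is just a verification step, not part of the pipeline):

# Read the fact table back to check that the generated foreign keys look right.
sql_transactions = (spark.read.format("jdbc")
                    .options(url=JDBCURL, dbtable="dbo.test_transaction")
                    .load())
sql_transactions.show()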

Fetching the customer and product tables back and joining them in memory doesn't sound very efficient if (when) I have millions of customers and millions of products.

How can I handle auto-generated identifiers for my customers and products, and foreign keys between my tables?