Monday 1 May 2023

PySpark remove duplicated messages within a 24h window after an initial new value

I have a DataFrame with a status (integer) and a timestamp. Since I get a lot of "duplicated" status messages, I want to reduce the DataFrame by removing any row that repeats a previous status within the 24h window that follows a "new" status, meaning:

  • The first 24h window starts with the first message of a specific status.
  • The next 24h window for that status starts with the next message that comes after that first 24h window (the windows are not back-to-back).

Given the example:

import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

spark = SparkSession.builder.getOrCreate()

data = [(10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-01 04:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-01 23:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),

        (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
       ]

myschema = StructType(
    [
        StructField("status", IntegerType()),
        StructField("ts", TimestampType())
    ]
)
df = spark.createDataFrame(data=data, schema=myschema)

  • The first 24h window for status 10 is from 2022-01-01 00:00:00 until 2022-01-02 00:00:00.
  • The second 24h window for status 10 is from 2022-01-02 05:00:00 until 2022-01-03 05:00:00.
  • The first 24h window for status 20 is from 2022-01-01 03:00:00 until 2022-01-02 03:00:00.

As a result, I want to keep only these messages:

data = [(10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
        (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),

        (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S"))
      ]

I know how to do this in plain Python by looping over the rows and keeping track of the latest window start per status, and I think the PySpark version needs a Window function with partitionBy + orderBy, but I cannot figure out the details... any help is appreciated.
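
For reference, here is a minimal sketch of the plain-Python loop I have in mind (dedupe and window_start are just illustrative names; it assumes all rows fit in memory):

import datetime

def dedupe(rows):
    # Keep a row only if it opens a new 24h window for its status.
    window_start = {}  # status -> timestamp that opened the current window
    kept = []
    for status, ts in sorted(rows, key=lambda r: r[1]):
        start = window_start.get(status)
        if start is None or ts >= start + datetime.timedelta(hours=24):
            # Either the first message ever seen for this status, or the
            # first message after the current 24h window has expired:
            # it opens a new window and is kept.
            window_start[status] = ts
            kept.append((status, ts))
    return kept

Running dedupe(data) on the example above returns exactly the three rows listed in the expected result.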



