I have a dataframe with a status (integer) and a timestamp. Since I receive a lot of "duplicated" status messages, I want to thin the dataframe out by removing every row that repeats a status within the 24h window opened by a "new" status, meaning:
- The first 24h window starts with the first message of a specific status.
- The next 24h window for that status starts with the next message that comes after that first 24h window (the windows are not back-to-back).
Given the example:
import datetime

from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

data = [
    (10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-01 04:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-01 23:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 06:00:00", "%Y-%m-%d %H:%M:%S")),
    (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S")),
]

myschema = StructType(
    [
        StructField("status", IntegerType()),
        StructField("ts", TimestampType()),
    ]
)

df = spark.createDataFrame(data=data, schema=myschema)
- The first 24h window for status 10 is from 2022-01-01 00:00:00 until 2022-01-02 00:00:00.
- The second 24h window for status 10 is from 2022-01-02 05:00:00 until 2022-01-03 05:00:00.
- The first 24h window for status 20 is from 2022-01-01 03:00:00 until 2022-01-02 03:00:00.
As a result, I want to keep the messages:
data = [
    (10, datetime.datetime.strptime("2022-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")),
    (10, datetime.datetime.strptime("2022-01-02 05:00:00", "%Y-%m-%d %H:%M:%S")),
    (20, datetime.datetime.strptime("2022-01-01 03:00:00", "%Y-%m-%d %H:%M:%S")),
]
I know how to do this in plain Python by looping over the rows and keeping track of when the latest window started, and I suspect the Spark side needs a Window function with partitionBy + orderBy, but I cannot figure out the details... any help is appreciated.
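For reference, this is roughly the looping version I have in mind (a minimal sketch that assumes all rows fit in memory, so it only illustrates the intended semantics; the function name keep_first_per_window is just for illustration):

import datetime

def keep_first_per_window(rows, window=datetime.timedelta(hours=24)):
    # rows: iterable of (status, ts) tuples
    window_end = {}  # per status: end of the currently open 24h window
    kept = []
    for status, ts in sorted(rows, key=lambda r: r[1]):
        # Keep the row if no window is open for this status yet, or if the
        # previous window has already expired; a kept row opens a fresh 24h
        # window starting at its own timestamp (windows are not back-to-back).
        if status not in window_end or ts >= window_end[status]:
            kept.append((status, ts))
            window_end[status] = ts + window
    return kept

print(keep_first_per_window(data))  # keeps exactly the three rows listed above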