Tuesday, 24 August 2021

PySpark - Loop in ForEachBatch leads to "SparkContext should only be created and accessed on the driver" Error

I am trying to read data transformations from a Python dictionary, apply them to a DataFrame inside foreachBatch, and then write the data out in PySpark.

I am calling writeStream in Spark Streaming as follows:

pipeline.writeStream \
        .queryName(self.pipeline_name) \
        .foreachBatch(self.transformation_chain()) \
        .start()

where transformation_chain is this method:

    def transformation_chain(self):
        def chain(df, epoch_id):
            df_in_edit = df
            df_in_edit = filter_out_null(df_in_edit)
            df_in_edit = split_one_stream_into_two(df_in_edit, "direction", "velocity")
            # for activity in self.data_transformations_workflow:
            #     df_in_edit = activity["transformation"](df_in_edit, *activity["args"])

            df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')

        return chain

However, I want to apply a list of generic data transformations which take a DataFrame as input and return a DataFrame, and therefore specify

    data_transformations_workflow = [                              
                                    {
                                      "transformation": filter_out_sensor_columns_presence_validation,
                                      "args": ("value", "direction velocity")
                                    },
                                    {
                                      "transformation": split_one_stream_into_two,
                                      "args": ("direction", "velocity")
                                    }
                                ]

and then iterate over it as:

    def transformation_chain(self):
        def chain(df, epoch_id):
            df_in_edit = df
            for activity in self.data_transformations_workflow:
                df_in_edit = activity["transformation"](df_in_edit, *activity["args"])

            df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')

        return chain
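
For context, each entry in data_transformations_workflow is expected to be a plain function that takes a DataFrame (plus a few string arguments) and returns a DataFrame. The following is a minimal sketch of that contract, purely for illustration; the real bodies of filter_out_null and split_one_stream_into_two are not shown in this post:

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F

    # Hypothetical bodies, only meant to illustrate the (DataFrame, *args) -> DataFrame contract.
    def filter_out_null(df: DataFrame) -> DataFrame:
        # e.g. drop rows whose parsed value column is null
        return df.filter(F.col("value").isNotNull())

    def split_one_stream_into_two(df: DataFrame, first_key: str, second_key: str) -> DataFrame:
        # e.g. keep only rows whose key matches one of the two requested sensors
        return df.filter(F.col("key").isin(first_key, second_key))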

However I receive this error in PySpark:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-30b1f701-8caa-47db-b5fe-5408ccfe09b2;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
    found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
    found org.apache.kafka#kafka-clients;2.4.1 in central
    found com.github.luben#zstd-jni;1.4.4-3 in central
    found org.lz4#lz4-java;1.7.1 in central
    found org.xerial.snappy#snappy-java;1.1.7.5 in central
    found org.slf4j#slf4j-api;1.7.30 in central
    found org.spark-project.spark#unused;1.0.0 in central
    found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 697ms :: artifacts dl 13ms
    :: modules in use:
    com.github.luben#zstd-jni;1.4.4-3 from central in [default]
    org.apache.commons#commons-pool2;2.6.2 from central in [default]
    org.apache.kafka#kafka-clients;2.4.1 from central in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in [default]
    org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from central in [default]
    org.lz4#lz4-java;1.7.1 from central in [default]
    org.slf4j#slf4j-api;1.7.30 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-30b1f701-8caa-47db-b5fe-5408ccfe09b2
    confs: [default]
    0 artifacts copied, 9 already retrieved (0kB/16ms)
21/08/18 09:40:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/usr/local/spark/python/lib/pyspark.zip/pyspark/shell.py:42: UserWarning: Failed to initialize Spark session.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shell.py", line 38, in <module>
    spark = SparkSession._create_shell_session()  # type: ignore
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 543, in _create_shell_session
    return SparkSession.builder\
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 384, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
    SparkContext._assert_on_driver()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 1277, in _assert_on_driver
    raise Exception("SparkContext should only be created and accessed on the driver.")
Exception: SparkContext should only be created and accessed on the driver.

It seems that Spark is being reinitialized on every iteration. I only receive the error if I do the CSV write operation after the for loop; it works if I put the write before the loop (but that is obviously not what I intend to achieve).

So it seems that the combination of this loop, which executes exactly the same functions as before, and the CSV write leads to the error?

I appreciate any hints! I don't find the error message very helpful, since I am not knowingly using the SparkContext on an executor.

Thank you very much and best regards!

UPDATE 1:

        def chain(df, epoch_id):
            df_in_edit = df
            data_transformations_workflow_local = [
                {
                    "transformation": filter_out_sensor_columns_presence_validation,
                    "args": ("value", "direction velocity")
                },
                {
                    "transformation": split_one_stream_into_two,
                    "args": ("direction", "velocity")
                }
            ]
            for activity in data_transformations_workflow_local:
                df_in_edit = activity["transformation"](df_in_edit, *activity["args"])

            df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')

        return chain

Declaring the list locally does not help. self in fact refers to an instance of a class I designed myself; here is that class:

class DataPipelineGenerator:
    pipeline_name = "pipeline"
    subscribed_topic = "subscribed"
    data_transformations_workflow = [                              
                                        {
                                          "transformation": filter_out_sensor_columns_presence_validation,
                                          "args": ("value", "direction velocity")
                                        },
                                        {
                                          "transformation": split_one_stream_into_two,
                                          "args": ("direction", "velocity")
                                        }
                                    ]
    output_path = "/home/jovyan/work/notebooks/pipelines/"
    output_format = "csv"
    read_stream = []

    def __init__(self, pipeline_name, subscribed_topic, data_transformations_workflow, output_path, output_format,spark):
        self.pipeline_name = pipeline_name
        self.subscribed_topic = subscribed_topic
        self.output_path = output_path
        self.output_format = output_format
        self.read_stream = self.init_read_stream(spark)

    def init_read_stream(self, spark):
        return spark \
            .readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', bootstrap_servers) \
            .option("startingOffsets", "earliest") \
            .option('subscribe', self.subscribed_topic) \
            .load() \
            .selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)') \
            .select(json_tuple(col("value"), "ts", "key", "value")) \
            .selectExpr("c0 as ts", "c1 as key", "c2 as value") \
            .withColumn("key_temp", expr("substring(key, {}, length(key))".format(clipping_prefix))) \
            .drop("key") \
            .withColumnRenamed("key_temp", "key")

    def init_write_stream(self):
        return self.read_stream.writeStream \
            .queryName(self.pipeline_name) \
            .foreachBatch(self.transformation_chain()) \
            .start()

    def transformation_chain(self):
        def chain(df, epoch_id):
            df_in_edit = df
            data_transformations_workflow_local = [
                {
                    "transformation": filter_out_sensor_columns_presence_validation,
                    "args": ("value", "direction velocity")
                },
                {
                    "transformation": split_one_stream_into_two,
                    "args": ("direction", "velocity")
                }
            ]
            for activity in data_transformations_workflow_local:
                df_in_edit = activity["transformation"](df_in_edit, *activity["args"])

            df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')

        return chain
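
In the same spirit as UPDATE 1, a variant of transformation_chain that avoids referencing self inside chain altogether, by binding the path, format and workflow to local variables first, would look roughly like the sketch below. This is only an illustration using the attribute names from the class above, not a confirmed fix:

    def transformation_chain(self):
        # Copy everything the closure needs into locals so that `chain`
        # does not reference `self` at all.
        output_path = self.output_path
        output_format = self.output_format
        workflow = self.data_transformations_workflow

        def chain(df, epoch_id):
            df_in_edit = df
            for activity in workflow:
                df_in_edit = activity["transformation"](df_in_edit, *activity["args"])
            df_in_edit.coalesce(1).write.save(path=output_path, format=output_format, mode='append')

        return chain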

UPDATE 2: I think I might have found the problem. In the Python file from which I imported the data transformations, the following was called at module level:

        emp_RDD = spark.sparkContext.emptyRDD()

Therefore, on every import/use of this file, an attempt was made to create a new context. I am still unsure, however, why this problem did not occur when I was using the methods from the file individually (calling them one by one instead of via the for loop).
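
One way to avoid this, assuming the empty RDD is only needed on the driver, is to make the module side-effect free at import time and create the RDD lazily inside a function. A sketch, not the actual code of that module:

    from pyspark.sql import SparkSession

    # Hypothetical rewrite of the imported transformations module: nothing here
    # touches the SparkContext at import time, so merely importing this module
    # in a worker process no longer tries to create or access a SparkContext.
    def make_empty_rdd(spark: SparkSession):
        # called explicitly on the driver when an empty RDD is actually needed
        return spark.sparkContext.emptyRDD()

Callers on the driver can then request the empty RDD explicitly when they actually need it.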



from PySpark - Loop in ForEachBatch leads to "SparkContext should only be created and accessed on the driver" Error
