I'm trying to call a list of pyspark modules dynamically from one main.py Python script, using importlib.import_module and subprocess. The child modules I'm calling do not return anything; they just do their ETL operations. I want my main.py program to wait until each child process completes. In the code below, every time I try to call the child process I end up with the error "TypeError: 'NoneType' object is not iterable". There is one other problem: after initiating subprocess.Popen, I thought the flow would continue in main.py to the next line until it hit j1.wait(), but the immediate print statement (print("etl_01_job is running")) is not executing. Am I missing anything?
I googled and tried a lot of other ways, but nothing worked. Can anyone shed some light on what I am doing wrong? Once I can successfully call the child process, I have to add a few other conditions based on its return code, but at this point I just want to fix this issue. Thanks.
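For reference, this is how I understand subprocess.Popen is normally used; the script name below is just a hypothetical placeholder, not a file from my project:

import subprocess

# Popen receives the command to execute (a list of strings) and starts it
# in a new process without blocking the caller.
proc = subprocess.Popen(["python", "some_child_script.py"])
print("child is running")    # I expect this to print immediately
return_code = proc.wait()    # block here until the child exits
print("child finished with return code", return_code)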
main.py
import json
import importlib
import subprocess
from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
def main():
    with open('C:/Pyspark/test/config/config.json', 'r') as config_file:
        config = json.load(config_file)
    spark = SparkSession.builder\
        .appName(config.get("app_name"))\
        .getOrCreate()

    job_module1 = importlib.import_module("etl_01_job")
    print("main calling time :", datetime.now())
    j1 = subprocess.Popen(job_module1.run_etl_01_job(spark, config))
    print("etl_01_job is running")
    j1.wait()  # I'm expecting main.py to wait here until the child process finishes
    print("etl_01_job finished")

    job_module2 = importlib.import_module("etl_02_job")
    j2 = subprocess.Popen(job_module2.run_etl_02_job(spark, config))

if __name__ == "__main__":
    main()
Child pyspark job: etl_01_job.py (not the original code, just a sample script):
from datetime import datetime
import time
import sys

def etl_01_job(spark, config):
    print("I'm in 01etljob")
    print(config)
    print(config.get("app_name"))
    time.sleep(10)
    print("etljob 1 ending time :", datetime.now())

def run_etl_01_job(spark, config):
    etl_01_job(spark, config)
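One of the other ways I considered was skipping importlib entirely and launching the child job as its own script in a separate Python process, roughly like this (the path is a placeholder, and I realize the sample script above would also need a __main__ entry point and its own SparkSession for this to do anything). Is that the direction I should be going?

import subprocess
import sys

# Hypothetical variant: run the child job file as a separate Python process
# and wait for it to exit.
j1 = subprocess.Popen([sys.executable, "C:/Pyspark/test/etl_01_job.py"])
print("etl_01_job is running")
j1.wait()    # blocks until the child Python process exits
print("etl_01_job finished with return code", j1.returncode)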
The error I'm getting is below (this traceback is from a variant where I used subprocess.run instead of subprocess.Popen, but both fail the same way):
Traceback (most recent call last):
  File "C:/py_spark/src/main.py", line 49, in <module>
    main()
  File "C:/py_spark/src/main.py", line 38, in main
    p1 = subprocess.run(job_module1.run_etl_01_job(spark, config))
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 489, in run
    with Popen(*popenargs, **kwargs) as process:
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 854, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 1247, in _execute_child
    args = list2cmdline(args)
  File "C:\ProgramData\Anaconda3\lib\subprocess.py", line 549, in list2cmdline
    for arg in map(os.fsdecode, seq):
TypeError: 'NoneType' object is not iterable
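If it helps narrow things down, I can reproduce the same TypeError on Windows with the minimal snippet below. It makes me suspect Popen is receiving None: run_etl_01_job has no return statement, so the call inside Popen(...) would run the whole job in the current process first (including the time.sleep(10)) and then evaluate to None, and the exception would then fire before my print("etl_01_job is running") line is ever reached. Is that what's happening?

import subprocess

def run_etl_01_job(spark, config):
    print("doing the ETL work")    # runs in the current process, not a child
    # no return statement, so the call evaluates to None

args = run_etl_01_job(None, {})    # the job executes right here; args is None
subprocess.Popen(args)             # TypeError: 'NoneType' object is not iterable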