I am using luigi to execute a chain of tasks, like so:
import luigi


class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('test.json')

    def run(self):
        # Parameters are instance attributes, so they must be read via self
        with self.output().open('w') as f:
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()

    def requires(self):
        return Task1(stuff=self.stuff)

    def output(self):
        return luigi.LocalTarget('something-else.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)
This works exactly as desired when I start the entire workflow like so:
luigi.build([Task2(stuff='stuff')])
When using luigi.build you can also run multiple tasks by explicitly passing arguments, as per this example in the documentation.
However, in my situation, I would also like to be able to run the business logic of Task2 completely independently of its involvement in the workflow. This works fine for tasks that do not implement requires, as per this example.
My question is: how can I run this method both as part of the workflow and on its own? Obviously, I could add a new private method like _my_custom_run, which takes the data and returns the result, and then call it from run, but this feels like something that should be baked into the framework, which makes me think I am misunderstanding Luigi's best practices (I am still learning the framework). Any advice is appreciated, thanks!
from Luigi - Overriding Task requires/input