Thursday, 28 June 2018

Implementing luigi dynamic graph configuration

I am new to luigi. I came across it while designing a pipeline for our ML efforts, and although it wasn't tailored to my particular use case, it had so many extra features that I decided to adapt it to my needs. Basically, what I was looking for was a way to persist a custom-built pipeline so that its results are repeatable. After reading most of the online tutorials, I tried to implement my serialization using the existing luigi.cfg configuration and command-line mechanisms. That might have sufficed for the tasks' parameters, but it provided no way of serializing the DAG connectivity of my pipeline. So I decided to have a WrapperTask that receives a JSON config file, creates all the task instances and connects all the plumbing. I hereby enclose a small test program for your scrutiny:

import random
import time

import luigi


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []  # dependencies are injected later via set_required()

    def set_required(self, required=None):
        # set the dependencies; None is treated as "no dependencies"
        self.required = required or []
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):

    def process(self):
        time.sleep(1)


class SlowNode(TaskNode):

    def process(self):
        time.sleep(2)


# This WrapperTask builds all the nodes and wires up their dependencies
class All(luigi.WrapperTask):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        num_nodes = 513

        classes = TaskNode.__subclasses__()
        self.nodes = []
        for i in reversed(range(num_nodes)):
            cls = random.choice(classes)

            # each node depends on roughly 1/35 of the nodes created before it
            dependencies = random.sample(self.nodes, (num_nodes - i) // 35)

            # an empty dependency list simply means "no dependencies"
            obj = cls(i=i).set_required(required=dependencies)

            # delete any existing output so that every node is rebuilt on each run
            if obj.output().exists():
                obj.output().remove()

            self.nodes.append(obj)

    def requires(self):
        return self.nodes


if __name__ == '__main__':
    luigi.run()

So basically, as stated in the question title, this focuses on the dynamic dependencies: it generates a 513-node dependency DAG with a connectivity of roughly p=1/35 (each node depends on about 1/35 of the nodes created before it), and it makes the All WrapperTask require all nodes. (I have a version that only connects All to the heads of the connected DAG components, but I didn't want to overcomplicate things.)
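A minimal sketch of one way to make the generated graph itself repeatable: the sampling rule from All.__init__ is reproduced with a seeded random.Random, and the result is returned as a plain, JSON-serializable dict instead of live task objects, so it can be written to a file and reloaded later. The function names random_dag and heads, the seed parameter and the {"type": ..., "requires": [...]} layout are hypothetical choices for illustration; heads() shows how the "heads" mentioned above (nodes that nothing else depends on) fall out of such a description.

```python
import json
import random


def random_dag(num_nodes=513, density=35, class_names=("fast", "slow"), seed=0):
    """Return a JSON-serializable DAG: {node_id: {"type": ..., "requires": [...]}}.

    Mirrors the sampling rule of All.__init__, but uses a seeded generator,
    so the same seed always yields the same graph (hypothetical helper).
    """
    rng = random.Random(seed)
    nodes = {}
    created = []  # ids of the nodes built so far (all have higher ids)
    for i in reversed(range(num_nodes)):
        node_type = rng.choice(class_names)
        # each node depends on roughly 1/density of the nodes created before it
        deps = rng.sample(created, (num_nodes - i) // density)
        nodes[str(i)] = {"type": node_type, "requires": [str(d) for d in deps]}
        created.append(i)
    return nodes


def heads(nodes):
    """Ids of the nodes that no other node depends on (the DAG's heads)."""
    required = {dep for spec in nodes.values() for dep in spec["requires"]}
    return sorted((n for n in nodes if n not in required), key=int)
```

Persisting the pipeline then amounts to something like json.dump(random_dag(seed=42), f): the same seed gives the same graph, whereas the original program draws from the unseeded module-level random and builds a different DAG on every run. Because dependencies always point to nodes with higher ids, the description is acyclic by construction.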

Is there a more standard way of implementing this? Note especially the not-so-pretty complication with the TaskNode __init__ and set_required methods. I only did it this way because receiving extra (non-luigi) arguments in __init__ somehow clashes with the way luigi registers parameters. I also tried several other ways, but this was the most decent one that worked.

If there isn't a standard way, I'd still love to hear any insights you have on the approach I plan to take before I implement the entire piping framework.



