Monday, 30 December 2019

Slowly Changing Lookup Cache from BigQuery - Dataflow Python Streaming SDK

I am trying to follow the design pattern for Slowly Changing Lookup Cache (https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1) for a streaming pipeline using the Python SDK for Apache Beam on DataFlow.

Our reference table for the lookup cache sits in BigQuery, and we are able to read and pass it in as a Side Input to the ParDo operation but it does not refresh regardless of how we set up the trigger/windows.

class FilterAlertDoFn(beam.DoFn):
  def process(self, element, alertlist):

    print len(alertlist)
    print alertlist

    …  # function logic

alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
                        | ‘alert_side_input’ >> beam.WindowInto(
                            beam.window.GlobalWindows(),
                            trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(
                                late=trigger.AfterCount(1)
                            )),
                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
                          )
                       | beam.Map(lambda elem: elem[‘SOMEKEY’])
)

...


main_input | ‘alerts’ >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))

Based on the I/O page here (https://beam.apache.org/documentation/io/built-in/) it says Python SDK supports streaming for the BigQuery Sink only, does that mean that BQ reads are a bounded source and therefore can’t be refreshed in this method?

Trying to set non-global windows on the source results in an empty PCollection in the Side Input.


UPDATE: When trying to implement the strategy suggested by Pablo's answer, the ParDo operation that uses the side input wont run.

There is a single input source that goes to two output's, one of then using the Side Input. The Non-SideInput will still reach it's destination and the SideInput pipeline wont enter the FilterAlertDoFn().

By substituting the side input for a dummy value the pipeline will enter the function. Is it perhaps waiting for a suitable window that doesn't exist?

With the same FilterAlertDoFn() as above, my side_input and call now look like this:

def refresh_side_input(_):
   query = 'select col from table'
   client = bigquery.Client(project='gcp-project')
   query_job = client.query(query)

   return query_job.result()


trigger_input = ( p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
            subscription=known_args.trigger_subscription))


bigquery_side_input = beam.pvalue.AsSingleton((trigger_input
         | beam.WindowInto(beam.window.GlobalWindows(),
                           trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
         | beam.Map(refresh_side_input)
        ))

...

# Passing this as side input doesn't work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)

# Passing dummy variable as side input does work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])

I tried a few different versions of refresh_side_input(), They report the expect result when checking the return inside the function.


UPDATE 2:

I made some minor modifications to Pablo's code, and I get the same behaviour - the DoFn never executes.

In the below example I will see 'in_load_conversion_data' whenever I post to some_other_topic but will never see 'in_DoFn' when posting to some_topic

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def load_my_conversion_data():
    return {'EURUSD': 1.1, 'USDMXN': 4.4}


def load_conversion_data(_):
    # I will suppose that these are currency conversions. E.g.
    # {'EURUSD': 1.1, 'USDMXN' 20,}
    print 'in_load_conversion_data'
    return load_my_conversion_data()


class ConvertTo(beam.DoFn):
    def __init__(self, target_currency):
        self.target_currency = target_currency

    def process(self, elm, rates):
        print 'in_DoFn'
        elm = elm.attributes
        if elm['currency'] == self.target_currency:
            yield elm
        elif ' % s % s' % (elm['currency'], self.target_currency) in rates:
            rate = rates[' % s % s' % (elm['currency'], self.target_currency)]
            result = {}.update(elm).update({'currency': self.target_currency,
            'value': elm['value']*rate})
             yield result
         else:
             return  # We drop that value


pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)

some_topic = 'projects/some_project/topics/some_topic'
some_other_topic = 'projects/some_project/topics/some_other_topic'

with beam.Pipeline(options=pipeline_options) as p:

    table_pcv = beam.pvalue.AsSingleton((
      p
      | 'some_other_topic' >>  beam.io.ReadFromPubSub(topic=some_other_topic,  with_attributes=True)
      | 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
                        trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                        accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | beam.Map(load_conversion_data)))


    _ = (p | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic)
         | 'some_window' >> beam.WindowInto(window.FixedWindows(1))
         | beam.ParDo(ConvertTo('USD'), rates=table_pcv))


from Slowly Changing Lookup Cache from BigQuery - Dataflow Python Streaming SDK

No comments:

Post a Comment