Friday 19 March 2021

How to write a custom protocol for multi-line input in mrjob

I'm trying to use mrjob with a CSV file. The problem is that each record in the CSV file spans multiple lines.

Searching through the mrjob documentation, I think I need to write a custom protocol to handle the input.

I tried to write my own protocol, multiLineCsvInputProtocol (shown below), but I am already getting an error: TypeError: a bytes-like object is required, not 'str'

Honestly, I think I am in over my head here.

Basically, each new row of data in the multi-line CSV file starts with a date string. I want to read the input line by line, split each line on the commas, and store the values in a list. Whenever a new line starts with a date string, I want to yield the entire list collected so far to the first mapper.

(That or find some other better way to read multi-line csv input)
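The grouping described above can be sketched in plain Python, independent of mrjob, for example as a preprocessing pass that turns each multi-line record into one list. This is only a sketch: the function name group_records and the date pattern (an ISO date such as 2021-03-19 at the start of a row) are assumptions, since the actual date format is not shown here.

```python
import re
from typing import Iterable, Iterator, List

# Assumption: every new record starts with an ISO date like 2021-03-19.
# Adjust the pattern to match the real date format in the file.
DATE_PREFIX = re.compile(r"^\d{4}-\d{2}-\d{2}")


def group_records(lines: Iterable[str]) -> Iterator[List[str]]:
    """Collect comma-separated values until the next line that starts with a date."""
    record: List[str] = []
    for line in lines:
        line = line.rstrip("\r\n")
        if not line:
            continue  # skip blank lines
        if DATE_PREFIX.match(line) and record:
            # A new record begins, so emit the one collected so far.
            yield record
            record = []
        record.extend(field.strip() for field in line.split(","))
    if record:
        # Emit the final record once the input is exhausted.
        yield record


if __name__ == "__main__":
    sample = ["2021-03-19,a,b\n", "c,d\n", "2021-03-20,x\n", "y\n"]
    for rec in group_records(sample):
        print(rec)
```

Writing each grouped record back out as a single line would let the job read it with an ordinary one-line-per-record protocol. Note that a plain split on commas does not handle quoted fields that contain commas.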

Can anyone help me get passed this error?

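import csv
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol

class multiLineCsvInputProtocol(object):
    def read(self, line):
        # Under Python 3, mrjob passes each input line to read() as bytes,
        # so splitting on the str ',' raises the TypeError quoted above.
        key, val = enumerate(line.split(',', 1))
        return key, val


class someTask(MRJob):

    INPUT_PROTOCOL = multiLineCsvInputProtocol

    def mapper1(self, _, row):
        yield (row, 1)

    def steps(self):
        # Register mapper1 explicitly; by default only a method named "mapper" is used.
        return [MRStep(mapper=self.mapper1)]


if __name__ == '__main__':
    # Run the job class defined above.
    someTask.run()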
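One possible way past the TypeError is to decode the bytes before using str methods on them. The sketch below is a hypothetical protocol (the name CsvLineProtocol and the choice of UTF-8 are assumptions) that decodes one raw line and parses it with the standard csv module. As far as I understand, a protocol only ever sees one line at a time, so this fixes the error but does not by itself join records that span several lines; that joining would still have to happen separately, for example by preprocessing the file.

```python
import csv
import io


class CsvLineProtocol(object):
    """Hypothetical protocol: one raw input line in, a list of CSV fields out."""

    def read(self, line):
        # The line arrives as bytes, so decode it before parsing.
        # Assumption: the file is UTF-8 encoded.
        text = line.decode("utf-8")
        fields = next(csv.reader([text]), [])
        # No meaningful key for raw input, so the key is None.
        return None, fields

    def write(self, key, value):
        # Only needed if the class is also used as an output protocol:
        # serialize the list of fields back into one CSV line of bytes.
        buffer = io.StringIO()
        csv.writer(buffer).writerow(value)
        return buffer.getvalue().rstrip("\r\n").encode("utf-8")


if __name__ == "__main__":
    proto = CsvLineProtocol()
    print(proto.read(b'2021-03-19,a,"b,c"'))
```

In the job class this would be attached with INPUT_PROTOCOL = CsvLineProtocol, and the mapper would then receive None as the key and the list of fields as the value.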
    

