I'm trying to use mrjob with a CSV file. The problem is that a single record in the CSV file can span multiple lines.
After searching through the mrjob documentation, I think I need to write a custom protocol to handle the input.
I tried to write my own protocol, multiLineCsvInputProtocol (below), but I am already getting an error: TypeError: a bytes-like object is required, not 'str'
I'll admit I'm in over my head here.
Basically, each new row of data in the multi-line CSV file starts with a datestring. I want to read the input line by line, split each line on the commas, store the values in a list, and, whenever a new line starts with a datestring, yield the entire list to the first mapper.
(That, or find some better way to read multi-line CSV input.)
Can anyone help me get past this error?
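The grouping described above can be sketched in plain Python, independent of mrjob. This is only an illustration under stated assumptions: the date pattern (YYYY-MM-DD) and the name group_records are hypothetical, since the actual datestring format is not shown, and the naive split on commas ignores quoted fields.

```python
import re

# Assumption: a record starts with an ISO-style date such as 2017-03-01.
# The real datestring format is not shown, so adjust this pattern.
DATE_PREFIX = re.compile(r'^\d{4}-\d{2}-\d{2}')


def group_records(lines):
    """Yield one list of comma-separated values per logical record."""
    current = []
    for line in lines:
        line = line.rstrip('\r\n')
        if not line:
            # Skip blank lines; they carry no values.
            continue
        if DATE_PREFIX.match(line) and current:
            # A new datestring means the previous record is complete.
            yield current
            current = []
        # Naive split: quoted fields containing commas are not handled.
        current.extend(line.split(','))
    if current:
        # Flush the last record, which has no following datestring.
        yield current
```

Because a protocol's read() is handed one line at a time, buffering like this would probably have to live in the mapper rather than in the protocol. Note also that a record straddling an input split boundary may not be reassembled when the job runs on Hadoop.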
import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol


class multiLineCsvInputProtocol(object):

    def read(self, line):
        key, val = enumerate(line.split(',', 1))
        return key, val


class someTask(MRJob):

    INPUT_PROTOCOL = multiLineCsvInputProtocol

    def mapper1(self, _, row):
        yield (row, 1)


if __name__ == '__main__':
    someTask.run()
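One likely cause of the TypeError, assuming Python 3 and a recent mrjob release: read() receives each input line as bytes, and calling split(',') on bytes with a str separator raises exactly this message. A minimal sketch of a protocol that decodes first is shown below. The UTF-8 encoding and the None key are assumptions, and this version still handles only one line per call, so it does not by itself merge multi-line records.

```python
class multiLineCsvInputProtocol(object):
    """Sketch of a line protocol that decodes bytes before splitting."""

    def read(self, line):
        # Assumption: the line arrives as bytes and is UTF-8 encoded.
        # Decoding first avoids splitting bytes with a str separator.
        text = line.decode('utf_8').rstrip('\r\n')
        # Raw input has no natural key, so None is used here.
        return None, text.split(',')

    def write(self, key, value):
        # Return bytes, mirroring what read() accepts.
        return ','.join(str(v) for v in value).encode('utf_8')
```

With this in place, the mapper would receive a list of strings as its value for each physical line. The enumerate() call from the original read() is dropped because unpacking it into key, val only works when the split yields exactly two pieces, and it would bind each name to an (index, text) tuple rather than to a key and a value.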
from how to write a custom protocol for multiple line input into mrJobs