Thursday, 27 August 2020

Evaluate XQuery in PySpark on RDD elements

We are trying to read a large number of XML files and run XQuery on them in PySpark (for example, the books XML below). We are using the spark-xml-utils library.

  • We want to feed the directory containing the XML files to PySpark.
  • We want to run XQuery on all of them to get our results.

Answer used as a reference: Calling scala code in pyspark for XSLT transformations

The XQuery processor is created as follows, where xquery is a string holding the XQuery expression:

proc = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(xquery)

We are reading the files in a directory using:

sc.wholeTextFiles("xmls/test_files")

This gives us an RDD of (filename, content) tuples, one per file. Collected, it looks like:

[(Filename1, File1ContentAsAString), (Filename2, File2ContentAsAString)]
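To make the shape of that data concrete without a cluster, here is a small local sketch. read_whole_text_files and demo are hypothetical helpers, not part of Spark or spark-xml-utils; they only mimic the (filename, content) pairs. Spark's own keys are typically fully qualified paths (with a file: prefix for local files), and its ordering may differ, so treat this as an illustration of the structure only.

```python
import tempfile
from pathlib import Path
from typing import List, Tuple


def read_whole_text_files(directory: str) -> List[Tuple[str, str]]:
    """Hypothetical local stand-in for sc.wholeTextFiles(directory).collect().

    Returns one (path, whole_file_content) tuple per regular file, sorted by
    path. Spark's key format and ordering may differ.
    """
    pairs: List[Tuple[str, str]] = []
    for path in sorted(Path(directory).iterdir()):
        if path.is_file():
            pairs.append((str(path), path.read_text(encoding="utf-8")))
    return pairs


def demo() -> List[Tuple[str, str]]:
    # Write two tiny XML files to a throwaway directory and read them back.
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "a.xml").write_text("<a/>", encoding="utf-8")
        Path(tmp, "b.xml").write_text("<b>1</b>", encoding="utf-8")
        return read_whole_text_files(tmp)
```

Each element is a pair whose second item is the entire file as one string, which is exactly what proc.evaluate() expects.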

The XQuery evaluates correctly and returns results when we run it on one file's content string after collecting the files to the driver:

whole_files = sc.wholeTextFiles("xmls/test_files").collect()
proc.evaluate(whole_files[1][1])
# Returns the expected XQuery result for the second file (index 1)

Problem:

If we instead call proc.evaluate() inside a lambda passed to map() on the RDD, it fails:

test_file = sc.wholeTextFiles("xmls/test_files")
test_file.map(lambda x: proc.evaluate(x[1])).collect()

# Expected: a list of XQuery results, one per file

Error:

PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
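A likely explanation, offered as a hedged sketch rather than a confirmed diagnosis: PySpark serializes the function passed to map() together with every object it references (it uses a cloudpickle-based serializer), so the lambda drags proc along with it. proc is a Py4J proxy for a JVM object living in the driver; it references the driver's gateway client, and somewhere in that object graph there is a thread lock, which is what the error message reports. The stdlib-only reproduction below uses a hypothetical FakeJavaObjectProxy class holding an RLock to show the same failure mode; it does not use Py4J itself.

```python
import pickle
import threading


class FakeJavaObjectProxy:
    """Hypothetical stand-in for a Py4J proxy such as proc.

    The real proxy references the driver's gateway client, whose object graph
    contains thread locks; a single RLock attribute mimics that here.
    """

    def __init__(self) -> None:
        self._lock = threading.RLock()

    def evaluate(self, xml: str) -> str:
        # Placeholder: a real proxy would forward this call to the JVM.
        return xml


def try_pickle(obj: object) -> str:
    """Return "ok" if obj can be pickled, otherwise the exception's type name."""
    try:
        pickle.dumps(obj)
        return "ok"
    except Exception as exc:  # locks raise TypeError when pickled
        return type(exc).__name__


# Plain data, like the (filename, content) tuples, pickles fine...
print(try_pickle(("a.xml", "<book/>")))  # ok
# ...but an object that holds a lock does not.
print(try_pickle(FakeJavaObjectProxy()))
```

Even if the proxy could be pickled, the Python worker processes have no Py4J gateway of their own, so there would be nothing on the worker side to forward evaluate() to.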

These operations do work, unlike the evaluate call above:

Return the content the XQuery is applied to:

test_file.map(lambda x: x[1]).collect()

# Returns each file's content; with x[0] instead, returns the list of filenames

Return the length (number of characters) of each file's content:

test_file.map(lambda x: len(x[1])).collect()
# Output: [15274, 13689, 13696]
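Those two lambdas succeed because they only touch the plain strings inside each tuple, and the same should hold for any parsing done in pure Python on the workers. As a sketch, assuming Python's built-in xml.etree.ElementTree (with its limited XPath subset) is an acceptable substitute for XQuery in a given case, the hypothetical helper evaluate_pair below works on one (filename, content) pair. It returns (filename, None) for files that are not well-formed, so a single bad file among many does not fail the whole job.

```python
import xml.etree.ElementTree as ET
from typing import List, Optional, Tuple


def evaluate_pair(pair: Tuple[str, str], path: str) -> Tuple[str, Optional[List[str]]]:
    """Apply an ElementTree path to one (filename, content) pair.

    Only plain data (the pair and the path string) is involved, so a lambda
    that calls this function can be serialized and shipped to the workers.
    """
    filename, content = pair
    try:
        root = ET.fromstring(content)
    except ET.ParseError:
        # Not well-formed XML: mark the file instead of raising.
        return filename, None
    # String value of each matching element (all descendant text joined).
    return filename, ["".join(node.itertext()) for node in root.findall(path)]


# Usage with the RDD from the question (not executed here):
#   test_file.map(lambda pair: evaluate_pair(pair, "book/title")).collect()
```

The path is relative to the document element, so "book/title" selects every title of every book directly under the root.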

Books example for reference:

books_xquery = """for $x in /bookstore/book
where $x/price>30
return $x/title/data()"""

proc_books = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(books_xquery)

books_xml = sc.wholeTextFiles("xmls/books.xml")
books_xml.map(lambda x: proc_books.evaluate(x[1])).collect()
# Error
# I can share the stacktrace if you guys want
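If the queries are simple enough, the books XQuery can be approximated in pure Python so that nothing JVM-side has to be shipped to the workers. This is a swapped-in technique, not spark-xml-utils: titles_over_price, string_value and SAMPLE_BOOKS are hypothetical names, the sample document is made up for illustration (it is not the question's books.xml), and ElementTree only implements a small XPath subset, so queries that need full XQuery would need a different engine.

```python
import xml.etree.ElementTree as ET
from typing import List

# Made-up sample document for illustration (not the question's books.xml).
SAMPLE_BOOKS = """<bookstore>
  <book><title lang="en">Cheap Book</title><price>12.50</price></book>
  <book><title lang="en">Pricey Book</title><price>45.00</price></book>
  <book><title lang="en">Borderline Book</title><price>30.00</price></book>
</bookstore>"""


def string_value(elem: ET.Element) -> str:
    # Approximates XQuery data() on an untyped element: all descendant text joined.
    return "".join(elem.itertext())


def titles_over_price(xml_text: str, threshold: float = 30.0) -> List[str]:
    """Approximate: for $x in /bookstore/book where $x/price > 30 return $x/title/data()"""
    root = ET.fromstring(xml_text)
    if root.tag != "bookstore":
        return []  # /bookstore/book matches nothing unless the root is <bookstore>
    titles: List[str] = []
    for book in root.findall("book"):
        # XQuery's general comparison is existential: any <price> child over the threshold.
        if any(float(string_value(p)) > threshold for p in book.findall("price")):
            titles.extend(string_value(t) for t in book.findall("title"))
    return titles
```

On the RDD this would be used as books_xml.map(lambda pair: titles_over_price(pair[1])).collect(); the lambda should then capture only a Python function and no Py4J objects.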


