Friday, 1 February 2019

How can I maintain a temporary dictionary in a PySpark application?

I want to use a pretrained embedding model (fastText) in a PySpark application.

If I broadcast the file (.bin), the following exception is thrown:

Traceback (most recent call last):

cPickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 2 GiB
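For reference, the broadcast attempt looked roughly like this (a sketch only; the exact code that triggered the error is not shown in the post, and it assumes the same SparkContext sc as in test.py below):

from gensim.models.fasttext import FastText as FT_gensim

# wiki.en.bin is several GB, which exceeds the 2 GiB limit for pickling
# a single broadcast value in Spark with Python 2.
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")
model_bc = sc.broadcast(model)  # raises cPickle.PicklingError / OverflowError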

Instead, I tried to use sc.addFile(modelpath), where modelpath = path/to/model.bin, as follows:

I created a file called fasttextSpark.py:

import gensim
from gensim.models.fasttext import FastText as FT_gensim
# Load model (loads when this library is being imported)
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")

# This is the function we use in a UDF to get the embedding vector of a given msg
def get_vector(msg):
    pred = model[msg]
    return pred

and testSubmit.sh:

#!/bin/bash
#SBATCH -N 2
#SBATCH -t 00:10:00
#SBATCH --mem 20000
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 32
module load python/2.7.14
source "/project/6008168/bib/ENV2.7.14/bin/activate"
module load spark/2.3.0
spark-submit /project/6008168/bib/test.py

and the test.py:

from __future__ import print_function
import sys
import time
import math
import csv
import datetime
import StringIO
import pyspark
import gensim
from operator import add
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from gensim.models.fasttext import FastText as FT_gensim
appName = "bib"
modelpath = "/project/6008168/bib/wiki.en.bin"
conf = (SparkConf()
         .setAppName(appName)
         .set("spark.executor.memory", "12G")
         .set("spark.network.timeout", "800s")
         .set("spark.executor.heartbeatInterval", "20s")
         .set("spark.driver.maxResultSize", "12g")
         .set("spark.executor.instances", 2)
         .set("spark.executor.cores", 30)
         )
sc = SparkContext(conf = conf)
#model = FT_gensim.load_fasttext_format(modelpath)
sc.addFile(modelpath)
sc.addPyFile("/project/6008168/bib/fasttextSpark.py")

# Import our custom fastText language classifier lib
import fasttextSpark
print ("nights = ", fasttextSpark.get_vector("nights"))
print ("done")

Now, each node will have a copy of the pretrained model. Some words are out of vocabulary, so each time I encounter such a word I want to create a random but fixed vector for it and add the word and its vector to a dictionary.

So, how can I maintain such a dictionary on each node?
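One way to get a random-but-fixed vector per out-of-vocabulary word is to keep a module-level dictionary in fasttextSpark.py and derive the random seed from a hash of the word, so every Python worker process computes the same vector without any coordination. Below is a sketch of how get_vector could be extended under that assumption; the name oov_vectors, the md5-based seed, and dim=300 are illustrative additions, and the dictionary lives per worker process, not cluster-wide:

import hashlib
import numpy as np

# Module-level cache: one dict per Python worker process on each executor.
oov_vectors = {}

def get_vector(word, dim=300):
    """Return the fastText vector for word, or a random-but-fixed vector
    if the word (and its character n-grams) is out of vocabulary."""
    try:
        return model[word]  # gensim raises KeyError when it cannot build a vector
    except KeyError:
        if word not in oov_vectors:
            # Seed from an md5 hash of the word so every worker derives the
            # same vector for the same word.
            seed = int(hashlib.md5(word.encode("utf-8")).hexdigest(), 16) % (2 ** 32)
            rng = np.random.RandomState(seed)
            oov_vectors[word] = rng.uniform(-0.25, 0.25, dim).astype(np.float32)
        return oov_vectors[word]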

Indeed, suppose my RDD is my_rdd = (id, sentence) and I want to find the embedding vector of a sentence by summing up the vectors of its words. How many times will the embedding model be loaded? For example:

Suppose rdd = ("id1", "motorcycle parts"). Does my implementation load the model two times: once for "motorcycle" and once for "parts"? If yes, is my approach inefficient? In that case, what would be the best approach to apply?
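As I understand it, the model is loaded once per Python worker process, at the moment fasttextSpark is first imported in that process, not once per word or per record. A sketch of computing sentence vectors by summing word vectors, assuming the get_vector helper above and an RDD of (id, sentence) pairs (sentence_vector and the whitespace tokenization are illustrative):

import numpy as np

def sentence_vector(pair):
    # Importing here means the model is loaded at most once per Python worker
    # process (the first time this import runs there) and then reused for
    # every subsequent record handled by that process.
    import fasttextSpark
    sent_id, sentence = pair
    vectors = [fasttextSpark.get_vector(w) for w in sentence.split()]
    return (sent_id, np.sum(vectors, axis=0))

my_rdd = sc.parallelize([("id1", "motorcycle parts")])
sentence_vectors = my_rdd.map(sentence_vector)
print(sentence_vectors.collect())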



from How can I maintain a temporary dictionary in a PySpark application?
