Tuesday, 8 September 2020

PySpark: How To Call APIs/Web Services Without Hitting Rate Limit?

I have a Spark DataFrame with 4 columns: location_string, locality, region, and country. I am using Google Map's Geocode API to parse each location_string and then use the results to fill in the NULL locality, region and country fields.

I have made the function that calls the geocoding library a udf, but the problem I'm facing is that I eventually get an 'OVERLIMIT' response status when I exceed the rate limit of Google's API policy.

Here is an example of the Spark DataFrame:

+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|location_string                                                                                         |locality    |region|country|
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|-Tainan City-Tainan, Taiwan                                                                             |Tainan City |null  |TWN    |
|093 Cicero, IL                                                                                          |null        |null  |null   |
|1005 US 98 Bypass Suite 7 Columbia, MS 39429                                                            |null        |null  |null   |
|10210  Baltimore Avenue, College Park, MD, US 20740                                                     |College Park|MD    |null   |
|12 Braintree - Braintree, MA, 02184                                                                     |null        |null  |null   |
|1215 E.Main St. #1074 Carbondale, IL 62901,                                                             |null        |null  |null   |
|18 Fairview Heights - Fairview Heights, IL, 62208                                                       |null        |null  |null   |
|21000 Hayden Dr, Woodhaven, MI, US 48183                                                                |null        |null  |null   |
|2257 N. Germantown Pkwy in Cordova, TN                                                                  |null        |null  |null   |
|2335 S. Towne Ave., Pomona, CA, US 91766                                                                |Pomona      |CA    |null   |
|2976-Taylor Ave & Harford Rd (Parkville Shopping Center, Parkville, MARYLAND, UNITED STATES             |null        |null  |null   |
|3342 Southwest Military Drive, Texas3342 Southwest Military Drive, San Antonio, TX, 78211, United States|null        |null  |null   |
|444 Cedar St., Suite 201, St. Paul, MN, US 55101                                                        |St. Paul    |MN    |null   |
|4604 Lowe Road, Louisville, KY, US 40220                                                                |Louisville  |KY    |null   |
|4691 Springboro Pike, Moraine, OH, US 45439                                                             |null        |null  |null   |
|50 Hwy 79 Bypass N Ste K Magnolia, AR 71753                                                             |null        |null  |null   |
|5188 Commerce Dr., Baldwin Park, CA, US 91706                                                           |Baldwin Park|CA    |null   |
|55445                                                                                                   |null        |null  |null   |
|5695 Harvey St, Muskegon, MI 49444                                                                      |null        |null  |null   |
|6464 Downing Street, Denver, CO, US 80229                                                               |null        |null  |null   |
+--------------------------------------------------------------------------------------------------------+------------+------+-------+

To navigate around this issue, I have a function like this:

def geocoder_decompose_location(location_string):
    if not location_string:
        return Row('nation', 'state', 'city')(None, None, None)
    
    GOOGLE_GEOCODE_API_KEYS = [key1, key2, key3]
    
    GOOGLE_GEOCODE_API_KEY = random.choice(GOOGLE_GEOCODE_API_KEYS)
    
    attempts = 0
    success = False
    while status != True and attempts < 5:
        result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
        attempts += 1
        status = result.status
        if status == 'OVER_QUERY_LIMIT':
            time.sleep(2)
            
            # retry
            continue
        
        success = True
    
    if attempts == 5:
        print('Daily Limit Reached')
        
    return Row('nation', 'state', 'city')(result.country, result.state, result.city)

But it doesn't appear to be working on a spark dataframe as expected. Any guidance would be much appreciated!



from PySpark: How To Call APIs/Web Services Without Hitting Rate Limit?

No comments:

Post a Comment