Sunday, 7 October 2018

multiprocessing / psycopg2 TypeError: can't pickle _thread.RLock objects

I followed the code in the article below to implement a parallel select query on a Postgres database:

https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/

My basic problem is that I have ~6k queries to execute, and I am trying to optimise the execution of these select queries. Initially it was a single query whose where id in (...) clause contained all 6k IDs, but that query used more than 4GB of RAM on the machine it ran on. So I split it into 6k individual queries, which keep memory usage steady when run synchronously. However, they take a lot longer to run, which is less of an issue for my use case. Even so, I am trying to reduce the run time as much as possible.

This is what my code looks like:

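import logging
import multiprocessing
from itertools import chain

import psycopg2
from sqlalchemy import create_engine

LOGGER = logging.getLogger(__name__)


class PostgresConnector(object):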
    def __init__(self, db_url):
        self.db_url = db_url
        self.engine = self.init_connection()
        self.pool = self.init_pool()

    def init_pool(self):
        CPUS = multiprocessing.cpu_count()
        return multiprocessing.Pool(CPUS)

    def init_connection(self):
        LOGGER.info('Creating Postgres engine')
        return create_engine(self.db_url)

    def run_parallel_queries(self, queries):
        results = []
        try:
            for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
                results.append(i)
        except Exception as exception:
            LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
            raise
        finally:
            self.pool.close()
            self.pool.join()

        LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

        return list(chain.from_iterable(results))

    def execute_parallel_query(self, query):
        con = psycopg2.connect(self.db_url)
        cur = con.cursor()
        cur.execute(query)
        records = cur.fetchall()
        con.close()

        return list(records)

However, whenever this runs, I get the following error:

TypeError: can't pickle _thread.RLock objects
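
A likely cause, offered as an assumption rather than something the post confirms: multiprocessing has to pickle the callable it sends to the workers, and pickling a bound method such as self.execute_parallel_query also pickles self, including any attribute that holds a lock. The sketch below reproduces that mechanism with the standard library only; Connector and pickle_error are hypothetical names for illustration, and the exact wording of the error message varies between Python versions.

```python
import pickle
import threading


class Connector:
    """Hypothetical stand-in for a class that stores a lock-bearing attribute."""

    def __init__(self):
        # Stands in for an attribute that owns a lock internally.
        self.lock = threading.RLock()

    def work(self, value):
        return value * 2


def pickle_error(obj):
    """Return a description of the pickling failure, or None if obj pickles."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return "{}: {}".format(type(exc).__name__, exc)
    return None


# A bare RLock cannot be pickled.
print(pickle_error(threading.RLock()))
# Pickling the bound method pickles the instance, and with it the RLock.
print(pickle_error(Connector().work))
# A plain query string pickles without trouble.
print(pickle_error("select 1"))
```

If this is what happens in the class above, the attributes to look at are self.engine and self.pool, since both travel with self whenever the bound method is pickled.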

I've read lots of similar questions regarding the use of multiprocessing and picklable objects, but I can't for the life of me figure out what I am doing wrong.

The pool is generally one per process (which I believe is best practice) but shared per instance of the connector class, so that it's not creating a pool for each use of the run_parallel_queries method.

The top answer to a similar question:

Accessing a MySQL connection pool from Python multiprocessing

shows an almost identical implementation to my own, except using MySQL instead of Postgres.

Am I doing something wrong?

Thanks!

EDIT:

I've found this answer:

Python Postgres psycopg2 ThreadedConnectionPool exhausted

which is incredibly detailed, and it looks as though I have misunderstood what multiprocessing.Pool gives me compared with a connection pool such as ThreadedConnectionPool. However, the first link doesn't mention needing any connection pools. This solution seems good, but it seems like A LOT of code for what I think is a fairly simple problem?

EDIT 2:

So the above link solves another problem, which I would likely have run into anyway, so I'm glad I found it. But it doesn't solve the initial issue of not being able to use imap_unordered because of the pickling error. Very frustrating.

Lastly, I think it's probably worth noting that this runs on Heroku, on a worker dyno, using Redis rq for scheduling, background tasks etc. and a hosted instance of Postgres as the database.
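
One possible way around the pickling error, sketched on the assumption that it comes from pickling self along with the bound method: move the per-query work into a module-level function, so that only plain strings cross the process boundary. The names execute_query and build_tasks and the processes parameter are hypothetical, and the sketch uses starmap (results in input order) where the original used imap_unordered.

```python
import multiprocessing


def execute_query(db_url, query):
    """Run one query on its own connection and return the fetched rows."""
    # Imported lazily so the sketch loads even where the driver is missing.
    import psycopg2

    con = psycopg2.connect(db_url)
    try:
        with con.cursor() as cur:
            cur.execute(query)
            return cur.fetchall()
    finally:
        con.close()


def build_tasks(db_url, queries):
    """Pair each query with the URL; these strings are all a worker receives."""
    return [(db_url, query) for query in queries]


def run_parallel_queries(db_url, queries, processes=None):
    """Hypothetical function-based variant of the method in the post."""
    tasks = build_tasks(db_url, queries)
    # The pool is a local variable here, not an attribute of a pickled object.
    with multiprocessing.Pool(processes) as pool:
        results = pool.starmap(execute_query, tasks)
    # Flatten the per-query row lists into a single list of rows.
    return [row for rows in results for row in rows]
```

On platforms that start workers with spawn, the call to run_parallel_queries would need to sit under an if __name__ == "__main__": guard. Each worker still opens one connection per query, so the connection limit of the hosted Postgres instance remains a separate concern.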


