Friday, 19 August 2022

Dask SQLAlchemy query fails in Docker workers: Exception: cannot pickle 'weakref' object

I have a Dask distributed application whose workers run in Docker containers. When I run dd.read_sql_query with an SQLAlchemy statement, the workers raise: Exception: cannot pickle 'weakref' object

Note: this runs fine with local workers; it only fails with Docker.

This is the reduced code:

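import dask.dataframe as dd
from sqlalchemy import Column, Integer, PrimaryKeyConstraint, String, select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import aliased

Base = declarative_base()

class test_loans(Base):
  __tablename__ = 'test_loans'
  loan_id = Column(String)
  fico_score = Column(Integer)
  __table_args__ = (PrimaryKeyConstraint('fico_score'),)

# Connection string as it appears in the log output (credentials redacted)
db_str = 'mariadb://user:xxxxx@host.docker.internal:3306/bank_0001'

t = aliased(test_loans)
stmt2 = select([t.loan_id, t.fico_score])
ddf = dd.read_sql_query(stmt2, con=db_str, index_col='fico_score', npartitions=20)
ddf.compute()  # <----- fails here when the statement is triggered in the workers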

And this is the exception:

2022-08-15 20:37:26,984 - distributed.protocol.pickle - INFO - Failed to serialize (<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5430>, 'mariadb://user:xxxxx@host.docker.internal:3306/bank_0001', Empty DataFrame
Columns: [loan_id]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'fico_score']])). Exception: cannot pickle 'weakref' object
2022-08-15 20:37:26,987 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 100, in _encode_default
    frames.extend(create_serialized_sub_frames(obj))
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 60, in create_serialized_sub_frames
    sub_header, sub_frames = serialize_and_split(
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 444, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 266, in serialize
    return serialize(
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 316, in serialize
    headers_frames = [
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 317, in <listcomp>
    serialize(
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple', "(<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5430>, 'mariadb://xxxx:yyyy@host.docker.internal:3306/bank_0001', Empty DataFrame\nColumns: [loan_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'fico_score']]))")
2022-08-15 20:37:26,989 - distributed.comm.utils - INFO - Unserializable Message: [{'op': 'update-graph-hlg', 'hlg': {'layers': [{'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {"('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 2)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 2)>, "('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 8)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 8)>, "('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 5)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 5)>, "('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 2)": <Serialize: (subgraph_callable-8219b5cb-9a04-436c-ad29-d643f53e22d3, (<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5430>, 'mariadb://xxxx:yyyy@host.docker.internal:3306/bank_0001', Empty DataFrame
Columns: [loan_id]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'fico_score']])))>, "('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 1)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 1)>, "('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 5)": <Serialize: (subgraph_callable-8219b5cb-9a04-436c-ad29-d643f53e22d3, (<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5bb0>, 'mariadb://xxxx:yyyy@host.docker.internal:3306/bank_0001', Empty DataFrame
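
To narrow down which argument in the failing tuple is the problem, a small standard-library check along these lines might help. This is only a sketch: try_pickle and Dummy are hypothetical names of mine, not part of Dask or SQLAlchemy, and the weak reference is a stand-in for whatever the Select object holds internally. Dask's own serialization may try other picklers too, so a failure here is a hint rather than proof.

```python
import pickle
import weakref


def try_pickle(obj):
    """Return (True, None) if obj pickles, else (False, error text)."""
    try:
        pickle.dumps(obj)
        return True, None
    except Exception as exc:  # pickle can raise TypeError, PicklingError, etc.
        return False, f"{type(exc).__name__}: {exc}"


class Dummy:
    """Stand-in object (hypothetical), only used as a weakref target."""


target = Dummy()  # keep a strong reference so the weakref stays alive

# Objects to test; in the real application these would be stmt2, db_str, etc.
checks = {
    "plain string": "mariadb://user:xxxxx@host/db",
    "weak reference": weakref.ref(target),
}

results = {name: try_pickle(value) for name, value in checks.items()}
for name, (ok, err) in results.items():
    print(name, "->", "ok" if ok else err)
```

Running the same check on stmt2 in both the local environment and inside a worker container would show whether the statement itself is unpicklable in only one of them.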

Is this a defect, or am I doing something wrong?

Package versions:

dask                         2022.8.0
dask-glm                     0.2.0
dask-ml                      2022.5.27
dask-xgboost                 0.2.0
distributed                  2022.8.0
SQLAlchemy                   1.4.40
cloudpickle                  2.1.0
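
Since the code works locally and fails only in Docker, one thing I have not ruled out is a version difference between the two environments (this is an assumption, not a confirmed cause). A sketch like the following, using only importlib.metadata from the standard library, could be run on the client and inside a worker container to compare. installed_versions and PACKAGES are hypothetical names of mine.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each package name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Package names taken from the version list in this post.
PACKAGES = ["dask", "distributed", "SQLAlchemy", "cloudpickle"]

report = installed_versions(PACKAGES)
for name, version in report.items():
    print(f"{name:<28} {version}")
```

If the two reports differ for any of these packages, aligning the Docker image with the client environment would be the first thing to try.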

