I have a Dask distributed application running workers in Docker containers. The problem is that when I run a read_sql_query with a SQLAlchemy statement, the workers raise Exception: cannot pickle 'weakref' object
Note: this runs fine with local workers; it only fails with the Docker workers.
This is the reduced code:
import dask.dataframe as dd
from sqlalchemy import Column, Integer, String, PrimaryKeyConstraint, select
from sqlalchemy.orm import aliased
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class test_loans(Base):
    __tablename__ = 'test_loans'
    loan_id = Column(String)
    fico_score = Column(Integer)
    __table_args__ = (PrimaryKeyConstraint('fico_score'),)

t = aliased(test_loans)
stmt2 = select([t.loan_id, t.fico_score])

# db_str is the MariaDB connection URL, e.g. 'mariadb://user:***@host.docker.internal:3306/bank_0001'
ddf = dd.read_sql_query(stmt2, con=db_str, index_col='fico_score', npartitions=20)
ddf.compute()  # <----- fails here when the statement is triggered in the workers
And this is the exception:
2022-08-15 20:37:26,984 - distributed.protocol.pickle - INFO - Failed to serialize (<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5430>, 'mariadb://user:xxxxx@host.docker.internal:3306/bank_0001', Empty DataFrame
Columns: [loan_id]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'fico_score']])). Exception: cannot pickle 'weakref' object
2022-08-15 20:37:26,987 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/opt/conda/lib/python3.8/site-packages/msgpack/__init__.py", line 38, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 100, in _encode_default
frames.extend(create_serialized_sub_frames(obj))
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/core.py", line 60, in create_serialized_sub_frames
sub_header, sub_frames = serialize_and_split(
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 444, in serialize_and_split
header, frames = serialize(x, serializers, on_error, context)
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 266, in serialize
return serialize(
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 316, in serialize
headers_frames = [
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 317, in <listcomp>
serialize(
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 366, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple', "(<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5430>, 'mariadb://xxxx:yyyy@host.docker.internal:3306/bank_0001', Empty DataFrame\nColumns: [loan_id]\nIndex: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'fico_score']]))")
2022-08-15 20:37:26,989 - distributed.comm.utils - INFO - Unserializable Message: [{'op': 'update-graph-hlg', 'hlg': {'layers': [{'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {"('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 2)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 2)>, "('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 8)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 8)>, "('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 5)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 5)>, "('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 2)": <Serialize: (subgraph_callable-8219b5cb-9a04-436c-ad29-d643f53e22d3, (<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5430>, 'mariadb://xxxx:yyyy@host.docker.internal:3306/bank_0001', Empty DataFrame
Columns: [loan_id]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'fico_score']])))>, "('from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 1)": <Serialize: ('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 1)>, "('read_sql_chunk-from-delayed-45f566e88ca4c6d12a8197c6d219b27b', 5)": <Serialize: (subgraph_callable-8219b5cb-9a04-436c-ad29-d643f53e22d3, (<function apply at 0x7fcfee313160>, <function _read_sql_chunk at 0x7fcfbc7115e0>, [<sqlalchemy.sql.selectable.Select object at 0x7fcf6edc5bb0>, 'mariadb://xxxx:yyyy@host.docker.internal:3306/bank_0001', Empty DataFrame
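The error named in the log can be reproduced in isolation with the standard library alone: Python's pickle refuses weakref objects outright, and my reading of the traceback (not something the log states directly) is that the ORM-level Select above carries such a weakref internally. A minimal sketch of the root cause:

```python
import pickle
import weakref

class Payload:
    pass

obj = Payload()
ref = weakref.ref(obj)  # a weak reference, like those held inside ORM constructs

try:
    pickle.dumps(ref)  # pickle has no reduction for weakref objects
except TypeError as e:
    print(e)  # -> cannot pickle 'weakref' object
```

Anything reachable from the task graph that holds such a reference will fail in exactly this way when distributed tries to ship it to a worker.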
Is this a defect, or am I doing something wrong?
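One avenue worth probing, since the weakref appears to come from the ORM layer: express the same query with SQLAlchemy Core (plain Table and Column objects, no declarative_base or aliased), which may yield a statement that serializes cleanly. A hedged sketch only; the table definition mirrors the model above, but I have not verified it against this exact Dask/Docker setup:

```python
import sqlalchemy as sa

# Core-level table definition, mirroring the ORM model above
meta = sa.MetaData()
test_loans = sa.Table(
    "test_loans", meta,
    sa.Column("loan_id", sa.String),
    sa.Column("fico_score", sa.Integer, primary_key=True),
)

# Positional-column form of select(); accepted on SQLAlchemy 1.4 and 2.x alike
stmt2 = sa.select(test_loans.c.loan_id, test_loans.c.fico_score)

# stmt2 would then be passed to dd.read_sql_query exactly as before:
# ddf = dd.read_sql_query(stmt2, con=db_str, index_col='fico_score', npartitions=20)
```

The idea is simply to remove the ORM machinery from the object that Dask has to pickle into the task graph.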
Package versions:
dask 2022.8.0
dask-glm 0.2.0
dask-ml 2022.5.27
dask-xgboost 0.2.0
distributed 2022.8.0
SQLAlchemy 1.4.40
cloudpickle 2.1.0
From: Dask SQLAlchemy query fails in Docker workers: Exception: cannot pickle 'weakref' object