Currently I have a code- that take any update on "Source DB" and copy it to "Destination DB"
I need to make it that with single read call from the "Source DB" of the Update and copy in parallel to few "Destination DB"
def loop(partial_src_db=None, partial_dst_db=None, collection_name=None,
**options):
src_db = partial_src_db()
stream = None
while True:
try:
collection_in = source_db.get_collection(collection_name)
collection_out = destination_db.get_collection(collection_name)
with collection_in.watch(full_document='updateLookup') as stream:
for change in stream:
oper_type = change['operationType']
logger.debug(f"{oper_type} received: {collection_name}")
if oper_type in ignored_ops:
logger.debug(f"{oper_type} operation ignored")
continue
if oper_type == "insert":
callback = db_insert_callback
elif oper_type == "replace":
# db_update_callback(change)
callback = db_update_callback
elif oper_type == "delete":
callback = db_remove_callback
callback(change, collection_out=collection_out)
except Exception as ex:
logger.exception(ex)
finally:
if stream:
stream.close()
example to insert
def db_insert_callback(insert_change, collection_out):
doc = insert_change['fullDocument']
# logger.info(insert_change)
try:
collection_out.insert(doc)
except Exception as ex:
logger.exception(ex)
The main look like that
def main(**options):
partial_src_db = partial(db_connect, db_host=settings.SRC_RB_HOST,
db_name=settings.SRC_RB_NAME,
db_user=settings.SRC_RB_USER,
db_pass=settings.SRC_RB_PASS,
replica=settings.SRC_RB_REPLICA)
partial_dst_db = partial(db_connect, db_host=settings.DEST_RB_HOST,
db_name=settings.DEST_RB_NAME)
src_db = partial_src_db()
src_db = partial_src_db()
logger.debug("connected to src db")
db_coll_names = src_db.list_collection_names()
cli_collection_names = options.get('collection') or db_coll_names
coll_names = list(set(db_coll_names) & set(cli_collection_names))
for coll_name in coll_names:
partial_loop = partial(loop, partial_src_db=partial_dst_db,
partial_dst_db=environment_list,
collection_name=coll_name, **options)
t = Thread(target=partial_loop, daemon=False, name=coll_name)
t.start()
hope someone will success help me , thank you
from copy information to few DB in parallel with single read
No comments:
Post a Comment