Tuesday, 19 January 2021

copy information to few DB in parallel with single read

Currently I have a code- that take any update on "Source DB" and copy it to "Destination DB"

I need to make it that with single read call from the "Source DB" of the Update and copy in parallel to few "Destination DB"

def loop(partial_src_db=None, partial_dst_db=None, collection_name=None, 
**options):

src_db = partial_src_db()

stream = None
while True:
try:

    collection_in = source_db.get_collection(collection_name)
    collection_out = destination_db.get_collection(collection_name)

    with collection_in.watch(full_document='updateLookup') as stream:

        for change in stream:
            oper_type = change['operationType']

            logger.debug(f"{oper_type} received: {collection_name}")

            if oper_type in ignored_ops:
                logger.debug(f"{oper_type} operation ignored")
                continue

            if oper_type == "insert":
                callback = db_insert_callback

            elif oper_type == "replace":
                # db_update_callback(change)
                callback = db_update_callback

            elif oper_type == "delete":
                callback = db_remove_callback

            callback(change, collection_out=collection_out)


except Exception as ex:
    logger.exception(ex)

finally:
    if stream:
        stream.close()

example to insert

def db_insert_callback(insert_change, collection_out):
doc = insert_change['fullDocument']
# logger.info(insert_change)
try:
    collection_out.insert(doc)
except Exception as ex:
    logger.exception(ex)

The main look like that

def main(**options):
partial_src_db = partial(db_connect, db_host=settings.SRC_RB_HOST, 
db_name=settings.SRC_RB_NAME,
                         db_user=settings.SRC_RB_USER, 
db_pass=settings.SRC_RB_PASS,
                         replica=settings.SRC_RB_REPLICA)
partial_dst_db = partial(db_connect, db_host=settings.DEST_RB_HOST, 
db_name=settings.DEST_RB_NAME)
src_db = partial_src_db()
src_db = partial_src_db()
logger.debug("connected to src db")
db_coll_names = src_db.list_collection_names()
cli_collection_names = options.get('collection') or db_coll_names
coll_names = list(set(db_coll_names) & set(cli_collection_names))
for coll_name in coll_names:                                                      
    partial_loop = partial(loop, partial_src_db=partial_dst_db, 
partial_dst_db=environment_list,
                           collection_name=coll_name, **options)
    t = Thread(target=partial_loop, daemon=False, name=coll_name)
    t.start()

hope someone will success help me , thank you



from copy information to few DB in parallel with single read

No comments:

Post a Comment