Tuesday, 10 May 2022

Celery task log using google-cloud-logging

I'm currently managing my API using Celery tasks and a Kubernetes cluster on Google Cloud Platform.

Celery automatically logs the input and output of each task. I want to keep this, but I would like to use google-cloud-logging's ability to log the input and output as a jsonPayload.

For all other logs I use the following:

import logging

# Imports the Cloud Logging client library
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler, setup_logging

# Instantiates a client and routes the standard logging module
# through Cloud Logging
client = google.cloud.logging.Client()
handler = CloudLoggingHandler(client)
setup_logging(handler)

logger = logging.getLogger(__name__)

data_dict = {"my": "data"}
logger.info("this is an example", extra={"json_fields": data_dict})
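For context on why the `extra` dict works: the standard logging module copies each key of `extra` onto the emitted `LogRecord` as an attribute, which is how `CloudLoggingHandler` can find `json_fields` and promote it to a jsonPayload. A minimal stdlib-only sketch (the capturing handler is invented purely for illustration):

```python
import logging

class CaptureHandler(logging.Handler):
    """Toy handler that stores records so we can inspect them."""
    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)

logger = logging.getLogger("extra-demo")
logger.setLevel(logging.INFO)
capture = CaptureHandler()
logger.addHandler(capture)

logger.info("this is an example", extra={"json_fields": {"my": "data"}})

# Each key of `extra` becomes an attribute on the LogRecord.
record = capture.records[0]
print(record.json_fields)  # {'my': 'data'}
```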

And I use Celery with the following template:

app = Celery(**my_params)

@app.task
def task_test(data):
    # Update dictionary with new data
    data["key1"] = "value1"
    return data

...

detection_task = celery.signature('tasks.task_test', args=([[{"hello": "world"}]]))
r = detection_task.apply_async()
data = r.get()

Here's an example of a log entry I receive from Celery: [screenshot of the Celery task log entry]

The blurred part corresponds to the dict/JSON I would like to have in a jsonPayload instead of a textPayload. (Also note that GCP marks this log entry as ERROR, while Celery logs it as INFO.)

Any idea how I could connect Python's built-in logging, the Celery logger, and the GCP logger?



from Celery task log using google-cloud-logging
