celery_utils module

configure_task_logger(task_id=None, task=None, args=None, **kwargs)

Dynamically configures logging for each task based on its job_id: Celery worker logs are appended to the job-specific log file and also emitted to the console.
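
The signature matches Celery's task_prerun signal handler, so one plausible wiring is to connect it to that signal (a minimal sketch; whether the module registers the handler itself is an assumption):

    from celery.signals import task_prerun

    from celery_utils import configure_task_logger

    # Invoke the handler before every task so that each task's log
    # records are routed to its job-specific file. This wiring is
    # hypothetical; the module may already connect the signal itself.
    task_prerun.connect(configure_task_logger)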

get_task_details(task_id)

Fetches task details for the specified task ID.

Parameters:

task_id (str) – Unique identifier for the task.

Returns:

The AsyncResult handle for the task, exposing its state and result.

Return type:

celery.result.AsyncResult
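
For example, the returned AsyncResult can be polled for the task's state and outcome (a minimal sketch; the task ID shown is hypothetical):

    from celery_utils import get_task_details

    result = get_task_details("example-task-id")
    print(result.state)       # e.g. "PENDING", "STARTED", "SUCCESS", "FAILURE"
    if result.ready():        # True once the task has finished
        print(result.result)  # the task's return value, or its exception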

retry(tries, delay)

Decorator to retry a task if it fails.

Parameters:
  • tries (int) – Number of times to retry the task.

  • delay (float) – Time to wait between retries, in seconds.

Returns:

Decorated function with retry logic.

Return type:

callable
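
A typical use wraps a flaky operation so that failures trigger a bounded number of retries (a minimal sketch; fetch_rows and its connection object are hypothetical):

    from celery_utils import retry

    @retry(tries=3, delay=5.0)
    def fetch_rows(connection, table):
        # Re-invoked up to 3 times, waiting 5 seconds between attempts,
        # whenever the query raises an exception.
        return connection.execute(f"SELECT * FROM {table}").fetchall()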

run_pipeline(self, job_id, job_name, job_type, source_connection, target_connection, source_table, target_table, source_schema, target_schema, spark_config, hadoop_config, batch_size, **kwargs)

Runs a pipeline that moves data from the source table to the target table with the specified configuration.

Parameters:
  • job_id (str) – Unique identifier for the job.

  • job_name (str) – Name of the job.

  • job_type (str) – Type of the job.

  • source_connection (dict) – Connection details for the source database.

  • target_connection (dict) – Connection details for the target database.

  • source_table (str) – Source table name.

  • target_table (str) – Target table name.

  • source_schema (str) – Source schema name.

  • target_schema (str) – Target schema name.

  • spark_config (dict) – Spark configuration.

  • hadoop_config (dict) – Hadoop configuration.

  • batch_size (int) – Batch size for processing.

  • **kwargs (dict) – Additional keyword arguments.

Raises:

Exception – If there is an error running the pipeline.
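
Because the first parameter is self, run_pipeline is presumably registered as a bound Celery task and dispatched asynchronously. A minimal dispatch sketch (all connection details, table names, and configuration values are hypothetical):

    from celery_utils import run_pipeline

    # Queue the pipeline on a Celery worker; the returned AsyncResult can
    # later be inspected via get_task_details(result.id).
    result = run_pipeline.delay(
        job_id="job-001",
        job_name="orders_sync",
        job_type="full_load",
        source_connection={"host": "src-db", "port": 5432},
        target_connection={"host": "tgt-db", "port": 5432},
        source_table="orders",
        target_table="orders",
        source_schema="public",
        target_schema="analytics",
        spark_config={"spark.executor.memory": "4g"},
        hadoop_config={},
        batch_size=10000,
    )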