celery_utils module
- configure_task_logger(task_id=None, task=None, args=None, **kwargs)
Dynamically configures logging for each task based on its job_id, appending Celery worker logs to the job-specific log file and mirroring them to the console.
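The signature matches a Celery `task_prerun` signal handler. Below is a minimal sketch of how such a handler could be wired up; the log path, formatter, and the assumption that `job_id` arrives as the first positional task argument are illustrative, not taken from the module.

```python
import logging
from celery.signals import task_prerun

@task_prerun.connect
def configure_task_logger(task_id=None, task=None, args=None, **kwargs):
    # Assumed: job_id is the first positional argument of the task;
    # fall back to the Celery task_id when no args are present.
    job_id = args[0] if args else task_id
    logger = logging.getLogger(task.name if task else __name__)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    # Append worker logs to the job-specific log file (path assumed).
    file_handler = logging.FileHandler(f"logs/{job_id}.log")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Also emit the same records to the console.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    # (A production handler would likely guard against attaching
    # duplicate handlers across repeated task runs.)
```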
- get_task_details(task_id)
Fetches task details for the specified task ID (see the usage sketch below).
- Parameters:
task_id (str) – Unique identifier for the task.
- Returns:
The result handle for the task, exposing its current state and eventual return value.
- Return type:
celery.result.AsyncResult
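A brief usage sketch; the task ID is an invented placeholder, and the fields read are the standard `celery.result.AsyncResult` attributes.

```python
details = get_task_details("c0ffee00-0000-0000-0000-000000000000")
print(details.state)       # e.g. "PENDING", "STARTED", "SUCCESS", "FAILURE"
if details.ready():        # True once the task has finished
    print(details.result)  # the return value, or the raised exception
```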
- retry(tries, delay)
Decorator to retry a task if it fails (one common implementation is sketched after this entry).
- Parameters:
tries (int) – Number of times to retry the task before giving up.
delay (float) – Time to wait between retries in seconds.
- Returns:
Decorated function with retry logic.
- Return type:
callable
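A sketch of one common way such a decorator is implemented, assuming a fixed delay and a blanket `except Exception`; the module's actual version may differ (e.g. exponential backoff or narrower exception types).

```python
import functools
import time

def retry(tries, delay):
    """Retry the wrapped callable on failure.

    This sketch reads `tries` as the number of retries after the first
    attempt; the module's own reading may differ.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(tries + 1):  # initial call + `tries` retries
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    last_exc = exc
                    if attempt < tries:
                        time.sleep(delay)  # wait before the next attempt
            raise last_exc
        return wrapper
    return decorator

@retry(tries=3, delay=2.0)
def flaky_fetch():
    ...  # any call that may fail transiently
```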
- run_pipeline(self, job_id, job_name, job_type, source_connection, target_connection, source_table, target_table, source_schema, target_schema, spark_config, hadoop_config, batch_size, **kwargs)
Runs a pipeline with the specified configurations; see the dispatch sketch after the parameter list.
- Parameters:
job_id (str) – Unique identifier for the job.
job_name (str) – Name of the job.
job_type (str) – Type of the job.
source_connection (dict) – Connection details for the source database.
target_connection (dict) – Connection details for the target database.
source_table (str) – Source table name.
target_table (str) – Target table name.
source_schema (str) – Source schema name.
target_schema (str) – Target schema name.
spark_config (dict) – Spark configuration.
hadoop_config (dict) – Hadoop configuration.
batch_size (int) – Batch size for processing.
**kwargs (dict) – Additional keyword arguments.
- Raises:
Exception – If there is an error running the pipeline.
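The `self` parameter suggests `run_pipeline` is registered as a bound Celery task (`bind=True`), so it would typically be dispatched to a worker asynchronously. A call sketch follows; every connection, table, and config value is an invented placeholder.

```python
# Dispatch the pipeline to a worker; all values below are placeholders.
result = run_pipeline.delay(
    job_id="job-001",
    job_name="orders_sync",
    job_type="batch",
    source_connection={"host": "src-db", "port": 5432, "user": "etl"},
    target_connection={"host": "tgt-db", "port": 5432, "user": "etl"},
    source_table="orders",
    target_table="orders",
    source_schema="public",
    target_schema="staging",
    spark_config={"spark.executor.memory": "4g"},
    hadoop_config={"fs.defaultFS": "hdfs://namenode:8020"},
    batch_size=10_000,
)

# The returned AsyncResult can be polled directly, or looked up later
# via get_task_details(result.id).
print(result.id, result.state)
```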