pipeline_utils module
This module contains utility functions related to Airflow tasks and workflows.
Functions:
- create_airflow_dag: Stores a pipeline configuration for use by an Airflow DAG.
- extract_xcom_value: Extracts an XCom value pushed by a given task.
- read_data: Reads data from a database or API connection.
- run_pipeline: Runs the specified pipeline based on the provided parameters.
- run_pipeline_target: Writes pipeline output to the target table.
- update_integration_in_db: Updates the integration record with the run status and metadata.
- create_airflow_dag(config)
Stores the pipeline configuration in the .local/pipelines directory (a usage sketch follows below).
- Parameters:
config (str) – Name of the pipeline.
- Returns:
A (bool, config) pair: True together with the stored configuration if it was stored successfully.
- Return type:
tuple
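A minimal usage sketch, assuming pipeline_utils is importable; the pipeline name is illustrative:

    from pipeline_utils import create_airflow_dag

    # "daily_orders_sync" is a hypothetical pipeline name.
    stored, config = create_airflow_dag("daily_orders_sync")
    if stored:
        print(f"Configuration stored under .local/pipelines: {config}")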
- extract_xcom_value(task_id, **context)
Extracts the XCom value pushed by the task identified by task_id, using the Airflow task context.
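The module's implementation is not shown here; as a hedged sketch, a helper with this signature typically wraps Airflow's standard xcom_pull call (the body below is an assumption, not the actual source):

    def extract_xcom_value(task_id, **context):
        # Airflow injects the TaskInstance into the context as "ti";
        # xcom_pull retrieves the value pushed by the named upstream task.
        return context["ti"].xcom_pull(task_ids=task_id)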
- read_data(connector_name, auth_values, auth_type, table, connection_type, schema='public', config={}, batch_size=100000, logger=None)
Reads data from a specified connection type (see the example below).
- Parameters:
connector_name (str) – The name of the connector to use.
auth_values – Authentication credentials for the connection.
auth_type – The authentication scheme to use with the connector.
table (str) – The name of the table to read from.
connection_type (str) – The type of connection to use. Valid values are "database" or "api".
schema (str, optional) – The schema of the table. Defaults to 'public'.
config (dict, optional) – Additional connector configuration. Defaults to {}.
batch_size (int, optional) – The number of rows to read per batch. Defaults to 100000.
logger (optional) – Logger used for progress and error messages.
- Returns:
If the connection type is "database", returns a Spark DataFrame containing the data. If the connection type is "api", returns a pandas DataFrame containing the data.
- Return type:
Union[pyspark.sql.DataFrame, pandas.DataFrame]
- Raises:
ValueError – If the connection type is not "database" or "api".
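A hedged call sketch for the database path; the connector name, credentials, and table are illustrative, and the exact shape of auth_values is an assumption:

    # Hypothetical database read; connection_type="database" yields a Spark DataFrame.
    df = read_data(
        connector_name="postgres_prod",  # illustrative connector name
        auth_values={"user": "etl_user", "password": "..."},  # assumed credential shape
        auth_type="basic",  # assumed auth scheme name
        table="orders",
        connection_type="database",
        schema="sales",
        batch_size=50000,
    )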
- run_pipeline(spark_config=None, hadoop_config=None, job_name=None, job_id=None, job_type=None, source_table=None, source_schema=None, target_table=None, target_schema=None, source_connection_details=None, target_connection_details=None, batch_size=100000, logger=None)
Runs a pipeline with the specified configurations; used in particular from the Airflow DAG to execute a pipeline run (see the sketch below).
- Parameters:
spark_config (dict, optional) – The Spark configuration. Defaults to None.
hadoop_config (dict, optional) – The Hadoop configuration. Defaults to None.
job_name – Name of the job.
job_id – Identifier of the job run.
job_type – Type of the job.
source_table (str) – The name of the source table.
source_schema (str) – The schema of the source table.
target_table – The name of the target table.
target_schema – The schema of the target table.
source_connection_details – Connection details for the source system.
target_connection_details – Connection details for the target system.
batch_size – The number of rows to process per batch. Defaults to 100000.
logger – Logger used for progress and error messages.
- Raises:
Exception – If no data is found in the source table.
NotImplementedError – If the target connection type is API.
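A sketch of wiring run_pipeline into a DAG with Airflow's PythonOperator; the DAG id, schedule, tables, and the shape of the connection-details dicts are illustrative assumptions:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    from pipeline_utils import run_pipeline

    with DAG(
        dag_id="orders_sync",  # illustrative DAG id
        start_date=datetime(2024, 1, 1),
        schedule="@daily",  # the "schedule" argument requires Airflow 2.4+
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="run_orders_pipeline",
            python_callable=run_pipeline,
            op_kwargs={
                "job_name": "orders_sync",
                "job_id": "orders-001",  # illustrative identifier
                "job_type": "full_load",  # assumed job type value
                "source_table": "orders",
                "source_schema": "public",
                "target_table": "orders",
                "target_schema": "analytics",
                "source_connection_details": {"connector_name": "postgres_prod"},  # assumed shape
                "target_connection_details": {"connector_name": "warehouse_dw"},  # assumed shape
            },
        )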
- run_pipeline_target(df, integration_id, spark_class, job_id, job_name, con_string, target_table, driver, spark_session, db_class, logger, batch_size=100000)
Writes the DataFrame read from the source to the target table in batches, using the supplied Spark session, connection string, and driver; typically invoked by run_pipeline during the write phase.
- update_integration_in_db(celery_task_id, integration, error_message, run_status, start_date, row_count=0)
Updates the integration record in the database with the run status, error message (if any), start date, and processed row count for the given Celery task (see the sketch below).
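A hedged call sketch recording a successful run; the integration object and all field values are illustrative:

    from datetime import datetime, timezone

    # "integration" stands in for the integration record being updated.
    update_integration_in_db(
        celery_task_id="9f1c2d3e-uuid",  # illustrative Celery task id
        integration=integration,
        error_message=None,  # no error on a successful run
        run_status="success",  # assumed status value
        start_date=datetime.now(timezone.utc),
        row_count=12500,
    )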