pipeline_utils module

This module contains utility functions related to Airflow tasks and workflows.

Functions:
  • run_pipeline – Runs the specified pipeline based on the provided parameters.
  • get_data – Retrieves data based on the specified parameters.
  • preprocess_data – Preprocesses the data before further processing.
  • post_process_data – Performs post-processing tasks on the data.
  • validate_data – Validates the data to ensure quality and consistency.

create_airflow_dag(config)

Stores the pipeline configuration in the .local/pipelines directory.

Parameters:

config (str) – Name of the pipeline.

Returns:

(bool, config) – True if the configuration was stored, together with the pipeline configuration.

Return type:

tuple
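
Example (a minimal sketch; the pipeline name and the import path are illustrative assumptions, not part of the documented API):

    from pipeline_utils import create_airflow_dag

    # Returns a (bool, config) tuple; True indicates the configuration was stored.
    stored, pipeline_config = create_airflow_dag("daily_sales_pipeline")
    if stored:
        print("Configuration written to .local/pipelines for", pipeline_config)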

extract_xcom_value(task_id, **context)
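
Example (a minimal sketch; this function has no description above, so the behaviour shown, pulling the XCom value pushed by the given task_id from the Airflow context, is an assumption, and the task id is illustrative):

    from pipeline_utils import extract_xcom_value

    def downstream_callable(**context):
        # Assumed behaviour: fetch the value the "read_source" task pushed to XCom.
        value = extract_xcom_value("read_source", **context)
        return value
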
read_data(connector_name, auth_values, auth_type, table, connection_type, schema='public', config={}, batch_size=100000, logger=None)

Reads data from a source using the specified connection type.

Parameters:
  • connection_type (str) – The type of connection to use. Valid values are “database” or “api”.

  • table (str) – The name of the table to read from.

  • schema (str) – The schema of the table.

  • connector_name (str) – The name of the connector.

Returns:

If the connection type is “database”, returns a Spark DataFrame containing the data. If the connection type is “api”, returns a pandas DataFrame containing the data.

Return type:

Union[pyspark.sql.DataFrame, pandas.DataFrame]

Raises:

ValueError – If the connection type is not “database” or “api”.
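
Example (a minimal sketch; the connector name, credentials, table, and the shape of auth_values/auth_type are illustrative assumptions, since they are not documented above):

    from pipeline_utils import read_data

    df = read_data(
        connector_name="postgres_prod",                       # illustrative
        auth_values={"user": "analyst", "password": "***"},   # assumed credential layout
        auth_type="basic",                                     # assumed value
        table="orders",
        connection_type="database",
        schema="sales",
        batch_size=50000,
    )
    df.show(5)  # a Spark DataFrame when connection_type is "database"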

run_pipeline(spark_config=None, hadoop_config=None, job_name=None, job_id=None, job_type=None, source_table=None, source_schema=None, target_table=None, target_schema=None, source_connection_details=None, target_connection_details=None, batch_size=100000, logger=None)

Runs a pipeline with the specified configurations; used primarily from the Airflow DAG.

Parameters:
  • spark_config (dict, optional) – The Spark configuration. Defaults to None.

  • hadoop_config (dict, optional) – The Hadoop configuration. Defaults to None.

  • job_name

  • job_id

  • job_type

  • source_table (str) – The name of the source table.

  • source_schema (str) – The schema of the source table.

  • target_table (str) – The name of the target table.

  • target_schema (str) – The schema of the target table.

  • source_connection_details

  • target_connection_details

  • batch_size

  • logger

Raises:
  • Exception – If no data is found in the source table.

  • NotImplementedError – If the target connection type is API.
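
Example (a minimal sketch, as called from a DAG task; table names, job identifiers, and the shape of the connection-details dictionaries are illustrative assumptions):

    from pipeline_utils import run_pipeline

    run_pipeline(
        job_name="orders_sync",
        job_id="orders_sync_20240101",
        job_type="full_load",                                                      # assumed value
        source_table="orders",
        source_schema="public",
        target_table="orders",
        target_schema="analytics",
        source_connection_details={"type": "database", "name": "postgres_prod"},  # assumed shape
        target_connection_details={"type": "database", "name": "warehouse"},      # assumed shape
        batch_size=100000,
    )
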

run_pipeline_target(df, integration_id, spark_class, job_id, job_name, con_string, target_table, driver, spark_session, db_class, logger, batch_size=100000)
update_integration_in_db(celery_task_id, integration, error_message, run_status, start_date, row_count=0)
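
Example (a minimal sketch; update_integration_in_db is undocumented above, so the assumption that it records the outcome of a run, the shape of the integration argument, and every value shown are illustrative):

    from datetime import datetime, timezone
    from pipeline_utils import update_integration_in_db

    integration = {"id": 42}  # assumed shape; the real integration object is not documented

    update_integration_in_db(
        celery_task_id="3f2b9c1e-ff00-4a7d-9a59-0d5c8b1a2c3d",  # illustrative Celery task id
        integration=integration,
        error_message=None,
        run_status="success",                                    # assumed status value
        start_date=datetime.now(timezone.utc),
        row_count=120000,
    )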