database_utils module

This module contains utility functions for working with SQLAlchemy engine connections.

Class: - SQLAlchemyEngine: Represents a class to connect with any database using SQLAlchemy.

Methods: - __init__: Initializes the class with database connection details. - test: Tests the connection to the database. - get_metadata: Retrieves schema metadata from the connection. - execute_query: Executes a SQL query against the connection. - get_metadata_df: Retrieves schema metadata in a dataframe format.

class DatabaseUtils(engine=None, hostname=None, username=None, password=None, port=None, database=None, connection_name=None, connection_type=None)

Bases: object

A class to connect with any database using SQLAlchemy.

Attributes: - engine (string): Sqlalchemy dialect - hostname (string): Your database hostname - username (string): Your database username - password (string): Your database password - port (string): Your database port - database (string): Your database - connection_name (string, optional): Description of the connection. Defaults to None. - connection_type (string, optional): Description of the connection type. Defaults to None.

Methods: - __init__: Initializes the class with database connection details. - test: Tests the connection to the database. - get_metadata: Retrieves schema metadata from the connection. - execute_query: Executes a SQL query against the connection. - get_metadata_df: Retrieves schema metadata in a dataframe format. - dataframe_details: Generates details about each column in a DataFrame. - create_table: Creates a new table in the database. - fill_na_based_on_dtype: Replaces NaN values in a DataFrame based on column data types. - alter_table_column_add_or_drop: Alters a table by adding or dropping a column. - drop_table: Drops the specified table from the database. - truncate_table: Truncates a table by deleting all rows. - cast_columns: Casts columns in a DataFrame to specific data types. - map_to_spark_type: Maps Pandas DataFrame data types to Spark DataFrame data types. - match_pandas_schema_to_spark: Matches data types of columns in a Spark DataFrame to a specified schema. - write_data: Writes data to a table in the database. - create_session: Creates a new session and initializes metadata and base. - commit_changes: Commits the changes made within the session. - close_session: Closes the session and sets the session close flag. - __enter__: Enters the ‘with’ context and creates a new session. - __exit__: Exits the ‘with’ context, commits changes, and closes the session. - __dispose__: Disposes the session and engine. - create_schema_if_not_exists: Creates a schema in the database if it does not already exist. - alter_table_column_add_primary_key: Alters a table by adding a primary key to a specified column. - create_document_table: Creates a document table in the database. - create_batch_table: Creates a batch table in the database. - fetch_rows: Executes a select query on the specified table with provided conditions. - fetch_document: Fetches a single document based on the specified table, schema, and conditions. - write_document: Writes a document to the specified table in the database. - get_created_connections: Returns a list of created connections for the specified connector type. - insert_openetl_batch: Inserts a new OpenETLBatch instance into the database. - update_openetl_batch: Updates an OpenETLBatch instance in the database. - get_dashboard_data: Retrieves dashboard data including total counts and integration details.

alter_table_column_add_or_drop(table_name, column_name=None, column_details=None, action=ColumnActions.ADD)

Alters a SQLAlchemy table by either adding or dropping a column.

Parameters:
  • action (ColumnActions)

  • table_name (str) – The name of the table to be altered.

  • column_name (str) – The name of the column to be added or dropped. Required if drop_column is False.

  • column_details (str) – The details of the column to be added. Required if drop_column is False.

Returns:

A tuple containing a boolean indicating success or failure and a message.

Return type:

tuple

alter_table_column_add_primary_key(table_name, column_name='id', schema_name='open_etl')

Alter a table by adding a primary key to a specified column.

Parameters:
  • table_name (str) – The name of the table to alter.

  • column_name (str) – The name of the column to set as the primary key. Defaults to ‘id’.

  • database. (Currently not working changes do not reflect in the)

Returns:

True if the primary key addition is successful, False otherwise.

Return type:

bool

cast_columns(df)

Function to cast columns in a DataFrame to specific data types based on the majority of data types in the columns.

Parameters: - self: The object instance - df: The DataFrame containing the columns to be cast

Returns: - df: The DataFrame with columns cast to specific data types

close_session()

Close the session and set the session close flag.

commit_changes()

Commit the changes made within the session.

create_integration(integration_name, integration_type, target_schema, source_schema, spark_config, hadoop_config, cron_expression, source_connection, target_connection, source_table, target_table, batch_size)
create_integration_history(**kwargs)
create_schema_if_not_exists(schema_name='open_etl')

Creates a schema in the database if it does not already exist.

Parameters:

schema_name (str) – The name of the schema to create. Defaults to ‘open_etl’.

Returns:

None

create_session()

Create a new session and initialize metadata and base.

create_table(table_name, df, target_schema='public')

Create a new table in the database. If already exists, skip creation.

Parameters:
  • table_name (str) – The name of the table.

  • schema_details (dict) – column_name with python datatypes. Valid values are str, float, bool, int, list.

Returns:

A tuple indicating the success status and a message.

Return type:

tuple

create_table_from_base(target_schema='public', base=None)
dataframe_details(df)

Generate a dictionary containing details about each column in the DataFrame.

Parameters:

df (DataFrame) – The input DataFrame for which details are to be generated.

Returns:

A dictionary where keys are column names and values are their data types.

Return type:

dict

delete_document(document_id=None)

Deletes a document from the specified table in the database.

Parameters:

document_id (int)

Returns:

True if the document is successfully deleted, False otherwise.

Return type:

bool

Raises:

Exception – If an error occurs while deleting the document. The error message is logged.

delete_integration(record_id)
delete_oauth_token(connection_id)
drop_table(table_name)

Drop the specified table from the database.

Parameters:

table_name (str) – The name of the table to drop.

Returns:

A tuple indicating the success status and a message.

Return type:

tuple

execute_query(query)

Execute query against the connection

Parameters:

query (string) – Valid SQL query

Returns:

Pandas dataframe of your query results

Return type:

Dataframe

fetch_document(table_name='openetl_documents', schema_name='open_etl', conditions={})

Fetches a single document based on the specified table, schema, and conditions.

Parameters:
  • table_name (str, optional) – The name of the table to fetch the document from. Defaults to ‘openetl_documents’.

  • schema_name (str, optional) – The schema of the table. Defaults to ‘open_etl’.

  • conditions (dict, optional) – The conditions to filter the document retrieval. Defaults to {}.

Returns:

A dictionary representing the fetched document with column names as keys.

Return type:

dict

fetch_rows(table_name='openetl_documents', schema_name='open_etl', conditions={})

Executes a select query on the specified table with the provided conditions.

Parameters:
  • table_name (str, optional) – The name of the table to fetch rows from. Defaults to ‘openetl_documents’.

  • schema_name (str, optional) – The schema of the table. Defaults to ‘open_etl’.

  • conditions (dict, optional) – The conditions to filter the rows. Defaults to {}.

Returns:

The result of the select query.

Return type:

ResultProxy

fill_na_based_on_dtype(df)

Replace NaN values in a Pandas DataFrame based on the data type of each column.

Parameters:

df – Pandas DataFrame.

Returns:

DataFrame with NaN values replaced based on column data types.

Return type:

DataFrame

get_all_integration(page=1, per_page=30, integration_id=None)

Get all integrations paginated.

Parameters:
  • page (int, optional) – The page number. Defaults to 1.

  • per_page (int, optional) – The number of items per page. Defaults to 30.

Returns:

A dictionary containing the paginated results.

Return type:

dict

get_created_connections(connector_type=None, connection_name=None, id=None)

Returns a list of created connections for the specified connector type.

Parameters:

connector_type (ConnectionType) – The value of type of connector, defaults to ConnectionType.DATABASE.

Returns:

A dataframe of created connections.

Return type:

list

get_dashboard_data(page=1, per_page=30)

Retrieves dashboard data including total counts and paginated integration details.

Parameters:
  • page (int, optional) – The page number for integrations. Defaults to 1.

  • per_page (int, optional) – The number of items per page for integrations. Defaults to 30.

Returns:

A dictionary containing total counts and paginated integration details.

Return type:

dict

get_integration_history(integration_id, page=1, per_page=30)
get_integrations_to_schedule()
Return type:

list[Type[OpenETLIntegrations]]

get_metadata()

Get schema metadata from the connection

Returns:

{“tables”: tables,”schema”:[]}

Return type:

dict

get_metadata_df()

Get your schema metadata in a dataframe

Returns:

Pandas dataframe of your schema

Return type:

Dataframe

get_oauth_token(connection_id)
insert_openetl_batch(start_date, integration_id, batch_type, batch_status, batch_id, integration_name, rows_count=0, end_date=None)

Inserts a new OpenETLBatch instance into the database.

Parameters:
  • integration_id – The ID of the integration.

  • start_date (datetime) – The start date of the batch.

  • batch_type (str) – The type of the batch.

  • batch_status (str) – The status of the batch.

  • batch_id (str) – The unique identifier of the batch.

  • integration_name (str) – The name of the integration.

  • rows_count (int, optional) – The number of rows in the batch. Defaults to 0.

  • end_date (datetime, optional) – The end date of the batch. Defaults to None.

Returns:

The newly created OpenETLBatch instance.

Return type:

OpenETLBatch

map_to_spark_type(pandas_dtype)

Map Pandas DataFrame data types to equivalent Spark DataFrame data types.

Parameters:

pandas_dtype (str) – Pandas DataFrame data type.

Returns:

Spark DataFrame data type.

match_pandas_schema_to_spark(spark_df, schema_details=None)

Match the data types of columns in a Spark DataFrame to a specified schema.

Parameters:
  • schema_details

  • spark_df (DataFrame) – Spark DataFrame.

Returns:

Spark DataFrame with matched data types.

Return type:

DataFrame

save_oauth_token(access_token, refresh_token, expires_in, scope, connection_id)
Parameters:
  • access_token

  • refresh_token

  • expires_in

  • scope

  • connection_id

test()

Tests the database connection by attempting to establish a connection using the configured SQLAlchemy engine.

This method calls self.engine.connect() to verify that a connection to the database can be opened. If the connection attempt fails, an exception raised by the SQLAlchemy engine will propagate. On success, it returns True.

Returns:

True if the connection was successfully established.

Return type:

bool

truncate_table(table_name)

Truncates a table by deleting all rows.

Parameters:

table_name (str) – The name of the table to truncate.

Returns:

A tuple indicating the success status and a message.

The success status is True if truncation is successful, False otherwise. The message provides information about the truncation result.

Return type:

tuple

update_integration(record_id, **kwargs)
update_integration_runtime(job_id, **kwargs)
update_openetl_batch(batch_id, integration_id, **kwargs)

Updates an OpenETLBatch object in the database with the specified batch_id.

Parameters:
  • batch_id (int) – The ID of the batch to update.

  • **kwargs – Keyword arguments specifying the fields to update and their new values.

Returns:

The updated OpenETLBatch object.

Return type:

OpenETLBatch

Raises:

Exception – If no OpenETLBatch object with the specified batch_id is found.

update_openetl_document(document_id, **kwargs)

Updates an OpenETLBatch object in the database with the specified batch_id.

Parameters:
  • document_id (int) – The ID of the batch to update.

  • **kwargs – Keyword arguments specifying the fields to update and their new values.

Returns:

The updated OpenETLBatch object.

Return type:

OpenETLBatch

Raises:

Exception – If no OpenETLBatch object with the specified batch_id is found.

write_data(data, table_name='openetl_batches', if_exists='append', schema='public')

Writes data to a table in etl_batches.

Parameters:
  • df (DataFrame) – The DataFrame to write to the table.

  • table_name (str) – The name of the table to write to.

write_document(document, table_name='openetl_documents', schema_name='open_etl')

Writes a document to the specified table in the database.

Parameters:
  • document (dict) – The document to be written. It should contain a ‘document’ key with the document content.

  • table_name (str, optional) – The name of the table to write the document to. Defaults to ‘openetl_documents’.

  • schema_name (str, optional) – The schema of the table. Defaults to ‘open_etl’.

Returns:

True if the document is successfully written, False otherwise.

Return type:

bool

Raises:

Exception – If an error occurs while writing the document. The error message is logged.

generate_cron_expression(schedule_time, schedule_dates=None, frequency=None)

Generates a cron expression based on provided scheduling details.

Parameters:
  • schedule_time – Time string in ‘HH:MM:SS’ format.

  • schedule_dates – List of date strings in ‘YYYY-MM-DD’ format or None.

  • frequency – A string defining the frequency (‘daily’, ‘weekly’, ‘hourly’) or None.

Returns:

List of cron expression strings.

get_open_etl_document_connection_details(url=False)

Get connection details for OpenETL Document

parse_cron_expression(cron_expr)

Parses a cron expression into its components, determines the next execution time, and provides a human-readable explanation.

Args: - cron_expr (str): A standard 5-part cron expression.

Returns: - dict: Parsed cron details and next execution time.