Module pydbsmgr.utils.tools

Functions

def coerce_datetime(x: str) ‑> numpy.datetime64
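
A minimal usage sketch, assuming coerce_datetime parses a date-like string into numpy.datetime64 (the exact string formats accepted are implementation-defined):

from pydbsmgr.utils.tools import coerce_datetime

# Coerce an ISO-style date string; the result is a numpy.datetime64 value.
dt = coerce_datetime("2024-01-31")
print(dt, type(dt))
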
def column_coincidence(df1: pandas.core.frame.DataFrame, df2: pandas.core.frame.DataFrame) ‑> float

Return the percentage of coincident columns between two pandas DataFrames.
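
A usage sketch, assuming the comparison is based on column names; whether the result is a 0-1 fraction or a 0-100 percentage is implementation-defined:

import pandas as pd
from pydbsmgr.utils.tools import column_coincidence

df1 = pd.DataFrame(columns=["id", "name", "amount"])
df2 = pd.DataFrame(columns=["id", "name", "created_at"])

# "id" and "name" coincide, so a partial-overlap score is expected.
print(column_coincidence(df1, df2))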

def create_directories_from_yaml(yaml_file)

Reads a YAML file and creates directories based on its content.
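
A sketch of the expected workflow, assuming yaml_file is a path and each YAML mapping key becomes a directory (the schema shown here is an assumption):

from pydbsmgr.utils.tools import create_directories_from_yaml

# Hypothetical layout: top-level keys become directories,
# nested keys become sub-directories.
with open("directories.yml", "w") as f:
    f.write("data:\n  raw:\n  processed:\nlogs:\n")

create_directories_from_yaml("directories.yml")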

def create_directory(data, parent_path='')

Creates the directory tree described by the parsed YAML data, rooted at parent_path.

def disableprints(func: Callable) ‑> Callable

Decorator that temporarily suppresses print statements while the decorated function runs.
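
A usage sketch; stdout is assumed to be suppressed only while the decorated function runs:

from pydbsmgr.utils.tools import disableprints

@disableprints
def noisy_step() -> int:
    print("this line should not reach the console")
    return 42

result = noisy_step()  # runs silently
print(result)          # printing works again outside the decorated call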

def erase_files(format: str = 'log') ‑> None

Erase all files with the given file extension.

def generate_secure_password(pass_len: int = 24) ‑> str

Generate a secure password of the specified length.
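
A minimal sketch; the character set the password is drawn from is implementation-defined:

from pydbsmgr.utils.tools import generate_secure_password

password = generate_secure_password()               # default length of 24
short_password = generate_secure_password(pass_len=16)  # custom length
print(len(password), len(short_password))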

def get_extraction_date(filename: str | List[str], REGEX_PATTERN: str = '\\d{4}-\\d{2}-\\d{2}') ‑> str | List[str]

Extracts the extraction date from a file path inside the storage account, based on the directory naming.

Parameters

filename : Union[str, List[str]]
File path (or list of paths) inside the storage account.
REGEX_PATTERN : str, optional
Regular expression pattern used to extract the date.

Returns

Union[str, List[str]] The extracted date(s), if found in the file path(s).
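
A usage sketch, assuming the default pattern matches a YYYY-MM-DD fragment embedded in the blob path:

from pydbsmgr.utils.tools import get_extraction_date

# Single path: the date fragment is pulled out of the directory structure.
print(get_extraction_date("container/sales/2024-05-31/sales.parquet"))

# A list of paths is assumed to return a list of extracted dates.
paths = [
    "container/sales/2024-05-31/sales.parquet",
    "container/stock/2024-06-01/stock.parquet",
]
print(get_extraction_date(paths))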

def merge_by_coincidence(df1: pandas.core.frame.DataFrame,
df2: pandas.core.frame.DataFrame,
tol: float = 0.9) ‑> pandas.core.frame.DataFrame

Merge two pandas DataFrames by finding the most similar columns based on their names.
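
A usage sketch; tol is assumed to be the minimum column-name coincidence required for the merge:

import pandas as pd
from pydbsmgr.utils.tools import merge_by_coincidence

df_orders = pd.DataFrame({"id": [1, 2], "amount": [10.0, 20.5]})
df_clients = pd.DataFrame({"id": [1, 2], "name": ["Ana", "Luis"]})

# Only "id" is shared, so the tolerance is lowered for this illustration.
merged = merge_by_coincidence(df_orders, df_clients, tol=0.5)
print(merged)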

def most_repeated_item(items: List[str], two_most_common: bool = False) ‑> Tuple[str, str | None]

Returns a tuple with the most common element(s) of a list.

Parameters

items : List[str]
The list containing the items to be evaluated.
two_most_common : bool, optional
If True, also returns the second most common element; otherwise the second tuple slot is None. Defaults to False.

Returns

Tuple[str, str | None] The most common element and, if requested, the second most common (otherwise None).
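
A quick sketch of both modes; the comments describe the documented behavior:

from pydbsmgr.utils.tools import most_repeated_item

items = ["csv", "parquet", "csv", "xlsx", "csv", "parquet"]

print(most_repeated_item(items))                        # most common item only
print(most_repeated_item(items, two_most_common=True))  # two most common items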

def terminate_process(file_path: str) ‑> None

Terminate the process holding the file.

Classes

class ColumnsCheck (df: pandas.core.frame.DataFrame)

Performs checks on the columns of a DataFrame

Source code
class ColumnsCheck:
    """Performs checks on the columns of a DataFrame"""

    def __init__(self, df: DataFrame):
        self.df = df

    def get_frame(self, surrounding: bool = True) -> DataFrame:
        return self._process_columns(surrounding)

    def _process_columns(self, surrounding: bool) -> DataFrame:
        df = self.df.copy()
        df.columns = (
            df.columns.str.lower()
            .str.replace("[.,]", "", regex=True)
            .str.replace(r"[^a-zA-Z0-9ñáéíóú_]", "_", regex=True)
            .str.replace("_+", "_", regex=True)
            .str.strip()
            .str.rstrip("_")
        )
        if surrounding:
            df.columns = [f"[{col}]" for col in df.columns]
        return df

Methods

def get_frame(self, surrounding: bool = True) ‑> pandas.core.frame.DataFrame
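
A usage sketch of the column normalization implemented above; the example frame is illustrative:

import pandas as pd
from pydbsmgr.utils.tools import ColumnsCheck

df = pd.DataFrame({"Fecha de Corte": [1], "Monto, Total": [2]})

checker = ColumnsCheck(df)
print(checker.get_frame().columns.tolist())                   # bracketed names, e.g. "[fecha_de_corte]"
print(checker.get_frame(surrounding=False).columns.tolist())  # plain normalized names
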
class ColumnsDtypes (df_: pandas.core.frame.DataFrame)

Infer and correct the data types of a DataFrame's columns (numeric and datetime).

Source code
class ColumnsDtypes:
    """Convert all columns to specified dtype."""

    def __init__(self, df_: DataFrame):
        self.df = df_.copy()

    def correct(
        self,
        drop_values: bool = False,
        drop_rows: bool = False,
        sample_frac: float = 0.1,
    ) -> DataFrame:
        self._check_int_float(drop_values, drop_rows)
        self._check_datetime(sample_frac)
        return self.df

    def get_frame(self) -> DataFrame:
        return self.df

    def _check_int_float(self, drop_values: bool, drop_rows: bool) -> None:
        """Check and correct the data types of columns in a `DataFrame`."""

        def check_value(value):
            try:
                if float(value).is_integer():
                    return int(value)
                return float(value)
            except ValueError:
                return np.nan

        if len(self.df) < 1e5 or not drop_values:
            for col in self.df.columns:
                value = str(self.df[col].iloc[0])
                if is_number_regex(value):
                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        self.df[col] = list(executor.map(check_value, self.df[col]))
                    try:
                        self.df[col] = pd.to_numeric(self.df[col], errors="coerce")
                        print(f"Successfully transformed the '{col}' column into numeric.")
                    except ValueError:
                        self.df[col] = self.df[col].astype("object")
                        print(f"Failed to transform the '{col}' column, setting to object.")

        if drop_rows:
            self.df.dropna(inplace=True)

    def _check_datetime(self, sample_frac: float) -> None:
        """Check and convert date-time string columns to `datetime` objects."""
        df_sample = self.df.sample(frac=sample_frac)
        for col in self.df.columns:
            if pd.api.types.is_string_dtype(df_sample[col]):
                if (df_sample[col].apply(check_if_contains_dates)).any():
                    try:
                        with concurrent.futures.ThreadPoolExecutor() as executor:
                            self.df[col] = list(executor.map(coerce_datetime, self.df[col]))
                        print(f"Successfully transformed the '{col}' column into datetime64[ns].")
                    except ValueError:
                        print(f"Failed to transform the '{col}' column into datetime.")

Methods

def correct(self,
drop_values: bool = False,
drop_rows: bool = False,
sample_frac: float = 0.1) ‑> pandas.core.frame.DataFrame
def get_frame(self) ‑> pandas.core.frame.DataFrame
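
A usage sketch of the dtype-correction flow; the per-column messages are printed by the class itself:

import pandas as pd
from pydbsmgr.utils.tools import ColumnsDtypes

df = pd.DataFrame(
    {
        "amount": ["10", "20.5", "30"],  # numeric values stored as strings
        "created_at": ["2024-01-01", "2024-02-15", "2024-03-31"],
    }
)

fixer = ColumnsDtypes(df)
corrected = fixer.correct(sample_frac=1.0)  # sample the whole frame since it is tiny
print(corrected.dtypes)
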
class ControllerFeatures (container_client)
Source code
class ControllerFeatures:
    def __init__(self, container_client):
        self.container_client = container_client

    def write_pyarrow(
        self,
        directory_name: str,
        pytables: List[Table],
        names: List[str],
        overwrite: bool = True,
    ) -> List[str] | None:
        """Write PyArrow tables as Parquet format."""
        return self._write_tables(directory_name, pytables, names, "parquet", overwrite)

    def write_parquet(
        self,
        directory_name: str,
        dfs: List[DataFrame],
        names: List[str],
        overwrite: bool = True,
        upload: bool = True,
    ) -> List[str] | List[bytes] | None:
        """Write DataFrames as Parquet format by converting them to bytes first."""
        pytables = [pa.Table.from_pandas(df) for df in dfs]
        return self._write_tables(directory_name, pytables, names, "parquet", overwrite, upload)

    def _write_tables(
        self,
        directory_name: str,
        tables: List[Table],
        names: List[str],
        format_type: str,
        overwrite: bool = True,
        upload: bool = True,
    ) -> List[str] | List[bytes] | None:
        files_not_loaded, collected_files = [], []
        for table, blob_name in zip(tables, names):
            if table is not None:
                buf = pa.BufferOutputStream()
                pq.write_table(table, buf)
                parquet_data = buf.getvalue().to_pybytes()
                blob_path_name = os.path.join(directory_name, f"{blob_name}.{format_type}")
                if upload:
                    self.container_client.upload_blob(
                        name=blob_path_name,
                        data=parquet_data,
                        overwrite=overwrite,
                    )
                else:
                    collected_files.append(parquet_data)
            else:
                files_not_loaded.append(blob_name)

        return files_not_loaded or (collected_files if not upload else None)

Methods

def write_parquet(self,
directory_name: str,
dfs: List[pandas.core.frame.DataFrame],
names: List[str],
overwrite: bool = True,
upload: bool = True) ‑> List[str] | List[bytes] | None

Write DataFrames in Parquet format by converting them to bytes first.

def write_pyarrow(self,
directory_name: str,
pytables: List[pyarrow.lib.Table],
names: List[str],
overwrite: bool = True) ‑> List[str] | None

Write PyArrow tables in Parquet format.
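
A sketch assuming container_client is an azure.storage.blob ContainerClient (which matches the upload_blob call in the source above); the connection string and container name are placeholders:

import pandas as pd
from azure.storage.blob import BlobServiceClient
from pydbsmgr.utils.tools import ControllerFeatures

service = BlobServiceClient.from_connection_string("<connection-string>")
container_client = service.get_container_client("<container-name>")

controller = ControllerFeatures(container_client)
dfs = [pd.DataFrame({"id": [1, 2], "amount": [10.0, 20.5]})]

# Uploads the frame under processed/2024 as sales.parquet; with upload=False
# the Parquet bytes are returned instead of being sent to the container.
controller.write_parquet("processed/2024", dfs, names=["sales"])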