Module pydbsmgr.utils.tools
Functions
def coerce_datetime(x: str) ‑> numpy.datetime64
def column_coincidence(df1: pandas.core.frame.DataFrame, df2: pandas.core.frame.DataFrame) ‑> float
-
Return the percentage of coincident columns between two pandas DataFrames.
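A minimal usage sketch; whether the "percentage" is returned on a 0-1 or 0-100 scale is not stated, so the printed value below is left uninterpreted:

import pandas as pd
from pydbsmgr.utils.tools import column_coincidence

df_a = pd.DataFrame({"id": [1], "name": ["ana"], "age": [30]})
df_b = pd.DataFrame({"id": [2], "name": ["bob"], "city": ["NY"]})

# Two of the column names ("id", "name") appear in both frames.
print(column_coincidence(df_a, df_b))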
def create_directories_from_yaml(yaml_file)
-
Reads a yaml file and creates directories based on its content.
def create_directory(data, parent_path='')
-
Creates the directory tree from a yaml file.
def disableprints(func: Callable) ‑> Callable
-
Decorator to temporarily suppress print statements in a function.
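A hedged sketch covering create_directories_from_yaml and disableprints; the nested-mapping YAML layout is an assumption, since the expected schema is not documented here:

from pydbsmgr.utils.tools import create_directories_from_yaml, disableprints

# Hypothetical layout: nested mapping keys become nested directories (assumption).
yaml_text = "data:\n  raw:\n  processed:\n"
with open("folders.yml", "w", encoding="utf-8") as f:
    f.write(yaml_text)

create_directories_from_yaml("folders.yml")

@disableprints
def noisy_step() -> int:
    print("suppressed while the decorator is active")
    return 42

print(noisy_step())  # only the return value reaches stdout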
def erase_files(format: str = 'log') ‑> None
-
Erase all files with the given format.
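A short sketch; the signature suggests format is a file extension, and the directory that gets scanned (assumed here to be the current working directory) is not documented:

from pydbsmgr.utils.tools import erase_files

erase_files()              # removes *.log files (default format)
erase_files(format="tmp")  # removes *.tmp files instead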
def generate_secure_password(pass_len: int = 24) ‑> str
-
Generate a secure password of the specified length.
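Usage follows directly from the signature; a sketch:

from pydbsmgr.utils.tools import generate_secure_password

password = generate_secure_password()    # 24 characters by default
short_pw = generate_secure_password(16)  # explicit length
print(len(password), len(short_pw))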
def get_extraction_date(filename: str | List[str], REGEX_PATTERN: str = '\\d{4}-\\d{2}-\\d{2}') ‑> str | List[str]
-
Extracts the extraction date according to the directory within the storage account.
Parameters
filename : Union[str, List[str]]
    File path inside the storage account.
REGEX_PATTERN : str, optional
    Regular expression pattern used to extract the date.
Returns
Union[str, List[str]]
    The date that was extracted, if found in the file path.
def merge_by_coincidence(df1: pandas.core.frame.DataFrame, df2: pandas.core.frame.DataFrame, tol: float = 0.9) ‑> pandas.core.frame.DataFrame
-
Merge two pandas DataFrames by finding the most similar columns based on their names.
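A sketch of both calls; it assumes get_extraction_date returns the matched date string (or a list of them for a list input) and that merge_by_coincidence pairs columns whose names are at least tol similar:

import pandas as pd
from pydbsmgr.utils.tools import get_extraction_date, merge_by_coincidence

# Pull the YYYY-MM-DD date embedded in a blob path.
date = get_extraction_date("bronze/sales/2024-01-31/sales.parquet")
print(date)  # expected "2024-01-31" under the default pattern (assumption)

# Merge frames whose column names differ only in casing/formatting.
left = pd.DataFrame({"Customer_ID": [1, 2], "Total": [10.0, 20.0]})
right = pd.DataFrame({"customer_id": [1, 2], "Region": ["N", "S"]})
merged = merge_by_coincidence(left, right, tol=0.8)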
def most_repeated_item(items: List[str], two_most_common: bool = False) ‑> Tuple[str, str | None]
-
Returns a Tuple with the most common elements of a list.
Parameters
items : List[str]
    The list containing the items to be evaluated.
two_most_common : bool, optional
    If False, returns only one element. Defaults to False.
Returns
Tuple[str, str | None]
    The two most common elements.
def terminate_process(file_path: str) ‑> None
-
Terminate the process holding the file.
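A sketch of most_repeated_item; the ordering of the returned pair and the None second element in the single-item case are assumptions based on the signature. terminate_process is not exercised here because killing the process that holds a file is environment-dependent:

from pydbsmgr.utils.tools import most_repeated_item

items = ["csv", "parquet", "csv", "xlsx", "csv", "parquet"]

top, second = most_repeated_item(items)
# top == "csv"; second is assumed to be None when two_most_common is False

top, runner_up = most_repeated_item(items, two_most_common=True)
# top == "csv", runner_up == "parquet" (assumed ordering by frequency)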
Classes
class ColumnsCheck (df: pandas.core.frame.DataFrame)
-
Performs checks on the columns of a DataFrame
class ColumnsCheck:
    """Performs checks on the columns of a DataFrame"""

    def __init__(self, df: DataFrame):
        self.df = df

    def get_frame(self, surrounding: bool = True) -> DataFrame:
        return self._process_columns(surrounding)

    def _process_columns(self, surrounding: bool) -> DataFrame:
        df = self.df.copy()
        df.columns = (
            df.columns.str.lower()
            .str.replace("[.,]", "", regex=True)
            .str.replace(r"[^a-zA-Z0-9ñáéíóú_]", "_", regex=True)
            .str.replace("_+", "_", regex=True)
            .str.strip()
            .str.rstrip("_")
        )
        if surrounding:
            df.columns = [f"[{col}]" for col in df.columns]
        return df
Subclasses
Methods
def get_frame(self, surrounding: bool = True) ‑> pandas.core.frame.DataFrame
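Based on the source above, a usage sketch: names are lower-cased, punctuation is dropped, other special characters collapse to single underscores, and each name is wrapped in square brackets unless surrounding=False:

import pandas as pd
from pydbsmgr.utils.tools import ColumnsCheck

df = pd.DataFrame({"First Name": ["Ana"], "Total ($)": [10.5]})

checker = ColumnsCheck(df)
bracketed = checker.get_frame()               # columns: "[first_name]", "[total]"
plain = checker.get_frame(surrounding=False)  # columns: "first_name", "total"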
class ColumnsDtypes (df_: pandas.core.frame.DataFrame)
-
Convert all columns to specified dtype.
class ColumnsDtypes:
    """Convert all columns to specified dtype."""

    def __init__(self, df_: DataFrame):
        self.df = df_.copy()

    def correct(
        self,
        drop_values: bool = False,
        drop_rows: bool = False,
        sample_frac: float = 0.1,
    ) -> DataFrame:
        self._check_int_float(drop_values, drop_rows)
        self._check_datetime(sample_frac)
        return self.df

    def get_frame(self) -> DataFrame:
        return self.df

    def _check_int_float(self, drop_values: bool, drop_rows: bool) -> None:
        """Check and correct the data types of columns in a `DataFrame`."""

        def check_value(value):
            try:
                if float(value).is_integer():
                    return int(value)
                return float(value)
            except ValueError:
                return np.nan

        if len(self.df) < 1e5 or not drop_values:
            for col in self.df.columns:
                value = str(self.df[col].iloc[0])
                if is_number_regex(value):
                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        self.df[col] = list(executor.map(check_value, self.df[col]))
                    try:
                        self.df[col] = pd.to_numeric(self.df[col], errors="coerce")
                        print(f"Successfully transformed the '{col}' column into numeric.")
                    except ValueError:
                        self.df[col] = self.df[col].astype("object")
                        print(f"Failed to transform the '{col}' column, setting to object.")
        if drop_rows:
            self.df.dropna(inplace=True)

    def _check_datetime(self, sample_frac: float) -> None:
        """Check and convert date-time string columns to `datetime` objects."""
        df_sample = self.df.sample(frac=sample_frac)
        for col in self.df.columns:
            if pd.api.types.is_string_dtype(df_sample[col]):
                if (df_sample[col].apply(check_if_contains_dates)).any():
                    try:
                        with concurrent.futures.ThreadPoolExecutor() as executor:
                            self.df[col] = list(executor.map(coerce_datetime, self.df[col]))
                        print(f"Successfully transformed the '{col}' column into datetime64[ns].")
                    except ValueError:
                        print(f"Failed to transform the '{col}' column into datetime.")
Methods
def correct(self, drop_values: bool = False, drop_rows: bool = False, sample_frac: float = 0.1) ‑> pandas.core.frame.DataFrame
def get_frame(self) ‑> pandas.core.frame.DataFrame
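A usage sketch based on the source above; numeric-looking string columns are coerced with pandas.to_numeric and date-like string columns to datetime64[ns]. sample_frac=1.0 is passed here because the default 0.1 sample of a tiny frame can miss the date column:

import pandas as pd
from pydbsmgr.utils.tools import ColumnsDtypes

df = pd.DataFrame({
    "amount": ["1", "2.5", "3"],                            # numeric strings
    "created": ["2024-01-01", "2024-02-01", "2024-03-01"],  # date strings
    "name": ["a", "b", "c"],                                # left as object
})

fixer = ColumnsDtypes(df)
typed = fixer.correct(sample_frac=1.0)
print(typed.dtypes)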
class ControllerFeatures (container_client)
-
class ControllerFeatures:
    def __init__(self, container_client):
        self.container_client = container_client

    def write_pyarrow(
        self,
        directory_name: str,
        pytables: List[Table],
        names: List[str],
        overwrite: bool = True,
    ) -> List[str] | None:
        """Write PyArrow tables as Parquet format."""
        return self._write_tables(directory_name, pytables, names, "parquet", overwrite)

    def write_parquet(
        self,
        directory_name: str,
        dfs: List[DataFrame],
        names: List[str],
        overwrite: bool = True,
        upload: bool = True,
    ) -> List[str] | List[bytes] | None:
        """Write DataFrames as Parquet format by converting them to bytes first."""
        pytables = [pa.Table.from_pandas(df) for df in dfs]
        return self._write_tables(directory_name, pytables, names, "parquet", overwrite, upload)

    def _write_tables(
        self,
        directory_name: str,
        tables: List[Table],
        names: List[str],
        format_type: str,
        overwrite: bool = True,
        upload: bool = True,
    ) -> List[str] | List[bytes] | None:
        files_not_loaded, collected_files = [], []
        for table, blob_name in zip(tables, names):
            if table is not None:
                # Serialize the table to Parquet bytes in memory.
                buf = pa.BufferOutputStream()
                pq.write_table(table, buf)
                parquet_data = buf.getvalue().to_pybytes()
                blob_path_name = os.path.join(directory_name, f"{blob_name}.{format_type}")
                if upload:
                    self.container_client.upload_blob(
                        name=blob_path_name,
                        data=parquet_data,
                        overwrite=overwrite,
                    )
                else:
                    collected_files.append(parquet_data)
            else:
                files_not_loaded.append(blob_name)
        # Names that could not be written take priority; otherwise return the
        # collected payloads when not uploading, or None when everything uploaded.
        return files_not_loaded or (collected_files if not upload else None)
Subclasses
Methods
def write_parquet(self, directory_name: str, dfs: List[pandas.core.frame.DataFrame], names: List[str], overwrite: bool = True, upload: bool = True) ‑> List[str] | List[bytes] | None
-
Write DataFrames as Parquet format by converting them to bytes first.
def write_pyarrow(self, directory_name: str, pytables: List[pyarrow.lib.Table], names: List[str], overwrite: bool = True) ‑> List[str] | None
-
Write PyArrow tables as Parquet format.
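A sketch wiring the class to an Azure Blob Storage ContainerClient, which the upload_blob call in the source suggests is the expected client; the connection string and container name are placeholders:

import pandas as pd
from azure.storage.blob import BlobServiceClient
from pydbsmgr.utils.tools import ControllerFeatures

service = BlobServiceClient.from_connection_string("<connection-string>")
container_client = service.get_container_client("<container-name>")

controller = ControllerFeatures(container_client)
df = pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})

# Uploads bronze/example.parquet; returns the names that failed, or None on success.
failed = controller.write_parquet("bronze", [df], ["example"])

# With upload=False the Parquet payloads are returned as bytes instead of uploaded.
payloads = controller.write_parquet("bronze", [df], ["example"], upload=False)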