Module pydbsmgr.utils.tools
Functions
def coerce_datetime(x: str) -> numpy.datetime64
def coerce_datetime(x: str) -> np.datetime64:
    try:
        x = x.replace("-", "")
        return pd.to_datetime(x, format="%Y%m%d")
    except ValueError:
        return np.datetime64("NaT")
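A quick usage sketch. The second call falls back to NaT because the string does not match %Y%m%d after the hyphens are stripped:

from pydbsmgr.utils.tools import coerce_datetime

coerce_datetime("2024-01-15")   # Timestamp('2024-01-15 00:00:00')
coerce_datetime("15/01/2024")   # numpy.datetime64('NaT')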
def column_coincidence(df1: pandas.core.frame.DataFrame, df2: pandas.core.frame.DataFrame) -> float
def column_coincidence(df1: DataFrame, df2: DataFrame) -> float:
    """Return the percentage of coincident columns between two pandas DataFrames."""
    set_columns1 = set(df1.columns)
    set_columns2 = set(df2.columns)
    common_columns = set_columns1.intersection(set_columns2)
    total_columns = set_columns1.union(set_columns2)
    return len(common_columns) / len(total_columns)
Return the proportion of coincident columns between two pandas DataFrames: the number of shared columns divided by the total number of distinct columns (a value between 0 and 1).
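For example, with two frames sharing two of four distinct column names:

import pandas as pd
from pydbsmgr.utils.tools import column_coincidence

df_a = pd.DataFrame(columns=["id", "name", "date"])
df_b = pd.DataFrame(columns=["id", "name", "amount"])
column_coincidence(df_a, df_b)  # 0.5 -> {'id', 'name'} shared out of 4 distinct columns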
def create_directories_from_yaml(yaml_file)
def create_directories_from_yaml(yaml_file):
    """Reads a `yaml` file and creates directories based on its content."""
    with open(yaml_file, "r") as file:
        data = yaml.safe_load(file)
        create_directory(data)
Reads a yaml file and creates directories based on its content.
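A sketch with a hypothetical directories.yml (the file name and keys are illustrative):

# directories.yml
# data:
#   raw:
#   processed:
# logs:

from pydbsmgr.utils.tools import create_directories_from_yaml

create_directories_from_yaml("directories.yml")
# creates ./data/raw, ./data/processed and ./logs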
def create_directory(data, parent_path='')
def create_directory(data, parent_path=""):
    """Creates the directory tree from a `yaml` file."""
    for key, value in data.items():
        path = os.path.join(parent_path, key)
        if isinstance(value, dict):
            os.makedirs(path, exist_ok=True)
            create_directory(value, path)
        else:
            os.makedirs(path, exist_ok=True)
Creates the directory tree from a yaml file.

def disableprints(func: Callable) -> Callable
def disableprints(func: Callable) -> Callable:
    """Decorator to temporarily suppress print statements in a function"""

    def wrapper(*args, **kwargs):
        with open(os.devnull, "w") as devnull:
            old_stdout = sys.stdout
            sys.stdout = devnull
            try:
                result = func(*args, **kwargs)
            finally:
                # restore stdout even if the wrapped function raises
                sys.stdout = old_stdout
        return result

    if func.__doc__ is not None:
        wrapper.__doc__ = func.__doc__
    return wrapper
Decorator to temporarily suppress print statements in a function
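A usage sketch (noisy_sum is a made-up function):

from pydbsmgr.utils.tools import disableprints

@disableprints
def noisy_sum(a, b):
    print("debugging...")
    return a + b

noisy_sum(2, 3)  # returns 5; the print output goes to os.devnull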
def erase_files(format: str = 'log') -> None
def erase_files(format: str = "log") -> None:
    """Erase all files with the given format."""
    for filename in glob.glob(f"*.{format}"):
        try:
            os.remove(filename)
        except OSError:
            terminate_process(filename)
            os.remove(filename)
Erase all files with the given extension in the current working directory. If removal fails because another process holds a file open, that process is terminated and the removal retried.
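For example (this deletes files, so run it only in a scratch directory):

from pydbsmgr.utils.tools import erase_files

erase_files("tmp")  # removes every *.tmp file in the current working directory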
def generate_secure_password(pass_len: int = 24) -> str
def generate_secure_password(pass_len: int = 24) -> str:
    """Generate a secure password with the length specified"""
    config = parse_config(load_config("./pydbsmgr/utils/config.ini"))
    char_matrix = config["security"]["char_matrix"]
    return "".join(random.choice(char_matrix) for _ in range(pass_len))
Generate a password of the specified length, drawing characters from the char_matrix defined in the package's config.ini. Note that characters are chosen with random.choice, which is not a cryptographically secure source of randomness.
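A usage sketch; the config path is hard-coded relative to the working directory, so this assumes ./pydbsmgr/utils/config.ini is reachable:

from pydbsmgr.utils.tools import generate_secure_password

pwd = generate_secure_password(16)
len(pwd)  # 16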
def get_extraction_date(filename: str | List[str], REGEX_PATTERN: str = '\\d{4}-\\d{2}-\\d{2}') -> str | List[str]
def get_extraction_date(
    filename: str | List[str], REGEX_PATTERN: str = r"\d{4}-\d{2}-\d{2}"
) -> str | List[str]:
    """Extracts the date of extraction according to the directory within the storage account.

    Parameters
    ----------
    filename : Union[str, List[str]]
        file path inside the storage account
    REGEX_PATTERN : `str`, `optional`
        regular expression pattern to extract the date.

    Returns
    -------
    `Union[str, List[str]]`
        the date that was extracted if found in the file path.
    """

    def sub_extraction_date(filename: str) -> str:
        extraction_date = re.findall(REGEX_PATTERN, filename)
        return extraction_date[0] if extraction_date else ""

    if isinstance(filename, list):
        return [sub_extraction_date(name) for name in filename]
    return sub_extraction_date(filename)
Extracts the date of extraction according to the directory within the storage account.
Parameters

filename : Union[str, List[str]]
    File path inside the storage account.
REGEX_PATTERN : str, optional
    Regular expression pattern used to extract the date.

Returns

Union[str, List[str]]
    The date that was extracted, if found in the file path.
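A usage sketch with illustrative paths:

from pydbsmgr.utils.tools import get_extraction_date

get_extraction_date("landing/sales/2023-11-01/sales.parquet")
# '2023-11-01'
get_extraction_date(["a/2022-01-31/x.csv", "b/no_date/y.csv"])
# ['2022-01-31', '']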
def merge_by_coincidence(df1: pandas.core.frame.DataFrame,
                         df2: pandas.core.frame.DataFrame,
                         tol: float = 0.9) -> pandas.core.frame.DataFrame
def merge_by_coincidence(df1: DataFrame, df2: DataFrame, tol: float = 0.9) -> DataFrame:
    """Merge two pandas DataFrames by finding the most similar columns based on their names."""
    percentage = column_coincidence(df1, df2)
    if percentage < tol:
        print(
            f"The following columns were missed with a match percentage of {percentage*100:.2f}%: "
            f"{set(df1.columns).union(set(df2.columns)) - set(df1.columns).intersection(set(df2.columns))}"
        )
    # materialize the set as a list: pandas does not accept sets as indexers
    common_cols = list(set(df1.columns).intersection(set(df2.columns)))
    df_combined = pd.concat([df1[common_cols], df2[common_cols]], ignore_index=True)
    return df_combined
Merge two pandas DataFrames by stacking their rows over the columns they share; a warning is printed when the coincidence ratio falls below tol.
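For example, reusing the frames from column_coincidence above:

import pandas as pd
from pydbsmgr.utils.tools import merge_by_coincidence

df_a = pd.DataFrame({"id": [1, 2], "name": ["x", "y"], "date": ["d1", "d2"]})
df_b = pd.DataFrame({"id": [3], "name": ["z"], "amount": [9.5]})
merge_by_coincidence(df_a, df_b, tol=0.9)
# prints a warning (coincidence 0.5 < 0.9), then returns the three rows
# stacked over the shared columns 'id' and 'name'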
def most_repeated_item(items: List[str], two_most_common: bool = False) -> Tuple[str, str | None]
def most_repeated_item(items: List[str], two_most_common: bool = False) -> Tuple[str, str | None]:
    """Returns a `Tuple` with the most common elements of a `list`.

    Parameters
    ----------
    items : `List[str]`
        The list containing the items to be evaluated.
    two_most_common : `bool`, optional
        If `False`, returns only one element. Defaults to `False`.

    Returns
    -------
    Tuple[`str`, `str` | `None`]
        The two most common elements.
    """
    counter = Counter(items)
    most_common = counter.most_common(2)
    if two_most_common:
        return tuple(item for item, _ in most_common) + (None,) * (2 - len(most_common))
    else:
        # counter.most_common returns (item, count) pairs; keep only the item
        return most_common[0][0], None
Returns a Tuple with the most common elements of a list.

Parameters

items : List[str]
    The list containing the items to be evaluated.
two_most_common : bool, optional
    If False, returns only one element. Defaults to False.

Returns

Tuple[str, str | None]
    The two most common elements.
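For example:

from pydbsmgr.utils.tools import most_repeated_item

most_repeated_item(["a", "b", "a", "c", "b", "a"])
# ('a', None)
most_repeated_item(["a", "b", "a"], two_most_common=True)
# ('a', 'b')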
def terminate_process(file_path: str) -> None
def terminate_process(file_path: str) -> None:
    """Terminate the process holding the file."""
    for proc in psutil.process_iter(["pid", "open_files"]):
        try:
            if any(file_info.path == file_path for file_info in proc.info["open_files"] or []):
                print(f"Terminating process {proc.pid} holding the file.")
                proc.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
Terminate the process holding the file.
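A usage sketch ('app.log' is illustrative); psutil reports open files with absolute paths, so resolve the path first:

import os
from pydbsmgr.utils.tools import terminate_process

terminate_process(os.path.abspath("app.log"))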
Classes
class ColumnsCheck (df: pandas.core.frame.DataFrame)
class ColumnsCheck:
    """Performs checks on the columns of a DataFrame"""

    def __init__(self, df: DataFrame):
        self.df = df

    def get_frame(self, surrounding: bool = True) -> DataFrame:
        return self._process_columns(surrounding)

    def _process_columns(self, surrounding: bool) -> DataFrame:
        df = self.df.copy()
        df.columns = (
            df.columns.str.lower()
            .str.replace("[.,]", "", regex=True)
            .str.replace(r"[^a-zA-Z0-9ñáéíóú_]", "_", regex=True)
            .str.replace("_+", "_", regex=True)
            .str.strip()
            .str.rstrip("_")
        )
        if surrounding:
            df.columns = [f"[{col}]" for col in df.columns]
        return df
Performs checks and normalization on the columns of a DataFrame: names are lowercased, punctuation is removed, and remaining non-alphanumeric characters are collapsed into single underscores; optionally each name is wrapped in square brackets.
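For example:

import pandas as pd
from pydbsmgr.utils.tools import ColumnsCheck

df = pd.DataFrame(columns=["Fecha de Corte", "Total ($)"])
ColumnsCheck(df).get_frame(surrounding=False).columns
# Index(['fecha_de_corte', 'total'], dtype='object')
ColumnsCheck(df).get_frame().columns
# Index(['[fecha_de_corte]', '[total]'], dtype='object')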
Methods
def get_frame(self, surrounding: bool = True) -> pandas.core.frame.DataFrame
def get_frame(self, surrounding: bool = True) -> DataFrame:
    return self._process_columns(surrounding)
class ColumnsDtypes (df_: pandas.core.frame.DataFrame)
class ColumnsDtypes:
    """Convert all columns to specified dtype."""

    def __init__(self, df_: DataFrame):
        self.df = df_.copy()

    def correct(
        self,
        drop_values: bool = False,
        drop_rows: bool = False,
        sample_frac: float = 0.1,
    ) -> DataFrame:
        self._check_int_float(drop_values, drop_rows)
        self._check_datetime(sample_frac)
        return self.df

    def get_frame(self) -> DataFrame:
        return self.df

    def _check_int_float(self, drop_values: bool, drop_rows: bool) -> None:
        """Check and correct the data types of columns in a `DataFrame`."""

        def check_value(value):
            try:
                if float(value).is_integer():
                    return int(value)
                return float(value)
            except ValueError:
                return np.nan

        if len(self.df) < 1e5 or not drop_values:
            for col in self.df.columns:
                value = str(self.df[col].iloc[0])
                if is_number_regex(value):
                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        self.df[col] = list(executor.map(check_value, self.df[col]))
                    try:
                        self.df[col] = pd.to_numeric(self.df[col], errors="coerce")
                        print(f"Successfully transformed the '{col}' column into numeric.")
                    except ValueError:
                        self.df[col] = self.df[col].astype("object")
                        print(f"Failed to transform the '{col}' column, setting to object.")
        if drop_rows:
            self.df.dropna(inplace=True)

    def _check_datetime(self, sample_frac: float) -> None:
        """Check and convert date-time string columns to `datetime` objects."""
        df_sample = self.df.sample(frac=sample_frac)
        for col in self.df.columns:
            if pd.api.types.is_string_dtype(df_sample[col]):
                if (df_sample[col].apply(check_if_contains_dates)).any():
                    try:
                        with concurrent.futures.ThreadPoolExecutor() as executor:
                            self.df[col] = list(executor.map(coerce_datetime, self.df[col]))
                        print(f"Successfully transformed the '{col}' column into datetime64[ns].")
                    except ValueError:
                        print(f"Failed to transform the '{col}' column into datetime.")
Infers and corrects column dtypes: numeric-looking string columns are converted with pd.to_numeric, and date-like string columns are converted to datetime64[ns].
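A usage sketch; sample_frac=1.0 makes the date check sample every row of this tiny frame (is_number_regex and check_if_contains_dates are helpers from elsewhere in the package):

import pandas as pd
from pydbsmgr.utils.tools import ColumnsDtypes

df = pd.DataFrame({"qty": ["1", "2", "3"],
                   "when": ["2024-01-01", "2024-02-01", "2024-03-01"]})
fixed = ColumnsDtypes(df).correct(sample_frac=1.0)
fixed.dtypes
# qty               int64
# when     datetime64[ns]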
Methods
def correct(self,
            drop_values: bool = False,
            drop_rows: bool = False,
            sample_frac: float = 0.1) -> pandas.core.frame.DataFrame
def correct(
    self,
    drop_values: bool = False,
    drop_rows: bool = False,
    sample_frac: float = 0.1,
) -> DataFrame:
    self._check_int_float(drop_values, drop_rows)
    self._check_datetime(sample_frac)
    return self.df
def get_frame(self) -> pandas.core.frame.DataFrame
def get_frame(self) -> DataFrame:
    return self.df
class ControllerFeatures (container_client)
class ControllerFeatures:
    def __init__(self, container_client):
        self.container_client = container_client

    def write_pyarrow(
        self,
        directory_name: str,
        pytables: List[Table],
        names: List[str],
        overwrite: bool = True,
    ) -> List[str] | None:
        """Write PyArrow tables as Parquet format."""
        return self._write_tables(directory_name, pytables, names, "parquet", overwrite)

    def write_parquet(
        self,
        directory_name: str,
        dfs: List[DataFrame],
        names: List[str],
        overwrite: bool = True,
        upload: bool = True,
    ) -> List[str] | List[bytes] | None:
        """Write DataFrames as Parquet format by converting them to bytes first."""
        pytables = [pa.Table.from_pandas(df) for df in dfs]
        return self._write_tables(directory_name, pytables, names, "parquet", overwrite, upload)

    def _write_tables(
        self,
        directory_name: str,
        tables: List[Table],
        names: List[str],
        format_type: str,
        overwrite: bool = True,
        upload: bool = True,
    ) -> List[str] | List[bytes] | None:
        files_not_loaded, collected_files = [], []
        for table, blob_name in zip(tables, names):
            if table is not None:
                buf = pa.BufferOutputStream()
                pq.write_table(table, buf)
                parquet_data = buf.getvalue().to_pybytes()
                blob_path_name = os.path.join(directory_name, f"{blob_name}.{format_type}")
                if upload:
                    self.container_client.upload_blob(
                        name=blob_path_name,
                        data=parquet_data,
                        overwrite=overwrite,
                    )
                else:
                    collected_files.append(parquet_data)
            else:
                files_not_loaded.append(blob_name)
        return files_not_loaded or (collected_files if not upload else None)
Methods
def write_parquet(self,
                  directory_name: str,
                  dfs: List[pandas.core.frame.DataFrame],
                  names: List[str],
                  overwrite: bool = True,
                  upload: bool = True) -> List[str] | List[bytes] | None
def write_parquet(
    self,
    directory_name: str,
    dfs: List[DataFrame],
    names: List[str],
    overwrite: bool = True,
    upload: bool = True,
) -> List[str] | List[bytes] | None:
    """Write DataFrames as Parquet format by converting them to bytes first."""
    pytables = [pa.Table.from_pandas(df) for df in dfs]
    return self._write_tables(directory_name, pytables, names, "parquet", overwrite, upload)
Write DataFrames as Parquet format by converting them to bytes first.
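A usage sketch assuming an azure-storage-blob ContainerClient; the connection string and container name are placeholders:

import pandas as pd
from azure.storage.blob import ContainerClient
from pydbsmgr.utils.tools import ControllerFeatures

container = ContainerClient.from_connection_string("<connection-string>", "<container>")
ctrl = ControllerFeatures(container)

df = pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})
ctrl.write_parquet("processed/2024", [df], ["metrics"])
# uploads processed/2024/metrics.parquet; returns None when every table uploads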
def write_pyarrow(self,
                  directory_name: str,
                  pytables: List[pyarrow.lib.Table],
                  names: List[str],
                  overwrite: bool = True) -> List[str] | None
def write_pyarrow(
    self,
    directory_name: str,
    pytables: List[Table],
    names: List[str],
    overwrite: bool = True,
) -> List[str] | None:
    """Write PyArrow tables as Parquet format."""
    return self._write_tables(directory_name, pytables, names, "parquet", overwrite)
Write PyArrow tables as Parquet format.
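Continuing the sketch above, pre-built Arrow tables can be uploaded directly:

import pyarrow as pa

table = pa.Table.from_pandas(df)
ctrl.write_pyarrow("processed/2024", [table], ["metrics_arrow"])
# uploads processed/2024/metrics_arrow.parquet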