Module pydbsmgr.lightest
Functions
def process_dates(x: str, format_type: str, auxiliary_type: str = None, errors: str = 'ignore') ‑> str
-
Expand source code
def _fallback_date(text: str, auxiliary_type: str, errors: str, cause: Exception):
    """Resolve a value whose primary parse failed, honouring `auxiliary_type` and `errors`."""
    if auxiliary_type:
        return pd.to_datetime(text, format=auxiliary_type, errors="coerce")
    if errors == "raise":
        raise ValueError("Date value does not match the expected format.") from cause
    # Best-effort mode: report "no date" so the caller hands the text back.
    return pd.NaT


def process_dates(
    x: str, format_type: str, auxiliary_type: str = None, errors: str = "ignore"
) -> str:
    """Auxiliary function in date type string processing.

    Parameters
    ----------
    x : `str`
        Value to convert. It is coerced with `str` first.
    format_type : `str`
        Either the marker `"dayfirst"` / `"monthfirst"` (used for values shorter
        than 10 characters, split on `/` or `-`) or a `strptime` pattern applied
        to the first 8 characters of the value once `/` and `-` are removed.
    auxiliary_type : `str`, optional
        Alternative `strptime` pattern tried when the primary attempt fails.
    errors : `str`
        With `"raise"`, a failed attempt without `auxiliary_type` raises
        `ValueError`. Any other value returns the text instead.

    Returns
    -------
    `str`
        The date formatted as `%Y-%m-%d`, or the text when no valid date is
        found. In the pattern branch the returned text has `/` and `-` removed.

    Raises
    ------
    ValueError
        When the value cannot be parsed, `auxiliary_type` is not given and
        `errors == "raise"`.
    """
    x = str(x)
    if format_type in ("dayfirst", "monthfirst") and len(x) < 10:
        separator = "/" if "/" in x else "-"
        parts = x.split(separator)
        try:
            # Values such as "None" or "nan" have too few pieces or non-numeric
            # pieces; treat them like any other failed parse instead of crashing.
            first, second, year = int(parts[0]), int(parts[1]), parts[-1]
            day, month = (first, second) if format_type == "dayfirst" else (second, first)
            date = pd.to_datetime(
                f"{year}{month:02d}{day:02d}", format="%Y%m%d", errors="coerce"
            )
        except (IndexError, ValueError) as err:
            date = _fallback_date(x, auxiliary_type, errors, err)
    else:
        x = x.replace("/", "").replace("-", "")
        # Slicing also covers the exact-length case: x[:8] == x when len(x) == 8.
        candidate = x[:8]
        try:
            date = pd.to_datetime(candidate, format=format_type, errors="coerce")
        except ValueError as err:
            date = _fallback_date(candidate, auxiliary_type, errors, err)

    if pd.isnull(date):
        return x  # No valid date found: hand the text back
    return date.strftime("%Y-%m-%d")
Auxiliary function in date type string processing.
Classes
class LightCleaner (df_: pandas.core.frame.DataFrame)
-
Expand source code
class LightCleaner:
    """Performs a light cleaning on the table.

    The pandas frame given at construction is converted to a polars frame and
    kept on `df`. `clean_frame` works on a clone, stores the cleaned polars
    frame back on `df` and returns it as pandas.
    """

    __slots__ = ["df", "dict_dtypes"]

    def __init__(self, df_: pd.DataFrame):
        self.df = pl.from_pandas(df_)
        # Python type name -> polars dtype.
        self.dict_dtypes = {"float": pl.Float64, "int": pl.Int64, "str": pl.String}

    def clean_frame(
        self,
        sample_frac: float = 0.1,
        fast_execution: bool = True,
        two_date_formats: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """DataFrame cleaning main function

        Parameters
        ----------
        sample_frac : `float`
            The fraction of rows to use for date type inference. Default is 0.1 i.e., 10%.
        fast_execution : `bool`
            If `False`, run a second `clean` pass on text columns that receives
            `no_emoji` and `title_mode`. Default is `True`.
        two_date_formats : `bool`
            Forwarded to `most_repeated_item` when inferring the date format.
            Default is `True`.

        Keyword Arguments:
        ----------
        errors : `str`
            Forwarded to `process_dates`. By default it is set to `"ignore"`.
        no_emoji : `bool`
            By default it is set to `False`. If `True`, removes all emojis from text data.
            Works only when `fast_execution` = `False`.
        title_mode : `bool`
            By default it is set to `True`. If `False`, converts the text to lowercase.
            Works only when `fast_execution` = `False`. By default, converts everything to `title`.

        Returns
        -------
        `pd.DataFrame`
            The cleaned table. The polars version is also stored on `self.df`.
        """
        table = self.df.clone()
        cols = table.columns
        errors = kwargs.get("errors", "ignore")

        # Date detection and format inference only look at this sample.
        if sample_frac != 1.0:
            # polars names this keyword `fraction`; `frac` is the old, renamed spelling.
            table_sample = table.sample(fraction=sample_frac, with_replacement=False)
        else:
            table_sample = table.clone()

        for column_name, datatype in zip(cols, table.dtypes):
            if datatype == pl.String:
                sampled = table_sample[column_name]
                datetype_column = sampled.map_elements(
                    check_if_contains_dates, return_dtype=pl.Boolean
                ).any()
                if datetype_column:
                    detected_formats = [
                        item
                        for item in sampled.map_elements(
                            get_date_format, return_dtype=pl.String
                        )
                        if item is not None
                    ]
                    main_type, auxiliary_type = most_repeated_item(
                        detected_formats, two_date_formats
                    )
                    # NOTE(review): the auxiliary format wins whenever it is set and no
                    # fallback is handed to process_dates - confirm this is intended.
                    format_type = auxiliary_type or main_type
                    partial_dates = partial(
                        process_dates,
                        format_type=format_type,
                        auxiliary_type=None,
                        errors=errors,
                    )
                    vpartial_dates = np.vectorize(partial_dates)
                    _serie = pl.Series(column_name, vpartial_dates(table[column_name].to_list()))
                    table = table.with_columns(_serie)
                    # strict=False: values that are not "%Y-%m-%d" become null instead of raising.
                    table = table.with_columns(
                        pl.col(column_name).str.strptime(
                            pl.Datetime, format="%Y-%m-%d", strict=False
                        )
                    )
                else:
                    try:
                        vpartial_clean = np.vectorize(partial(clean))
                        table = table.with_columns(
                            pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                        )
                    except AttributeError as e:
                        msg = f"It was not possible to perform the cleaning, the column {column_name} is duplicated. Error: {e}"
                        logging.warning(msg)
                        sys.exit("Perform correction manually")
                    if not fast_execution:
                        # Second pass with the caller-selected cleaning options.
                        no_emoji = kwargs.get("no_emoji", False)
                        title_mode = kwargs.get("title_mode", True)
                        vpartial_clean = np.vectorize(
                            partial(clean, no_emoji=no_emoji, title_mode=title_mode)
                        )
                        table = table.with_columns(
                            pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                        )

        table = self._remove_duplicate_columns(table)
        self.df = table.clone()
        return self.df.to_pandas()

    def _correct_type(self, value, datatype):
        """General type correction function.

        Parameters
        ----------
        value
            The value to convert.
        datatype
            Target type, given as `"float"`, `"int"` or `"str"`, or as the
            matching polars dtype from `dict_dtypes`.

        Returns
        -------
        `value` unchanged when it already has the target type, otherwise the
        converted value. A failed conversion yields `np.nan` for numeric
        targets and `""` for `"str"`.

        Raises
        ------
        KeyError
            When `datatype` is not one of the supported targets.
        """
        converters = {"float": float, "int": int, "str": str}
        if not isinstance(datatype, str):
            # A polars dtype was supplied: translate it to its Python type name.
            datatype = next(
                (name for name, dtype in self.dict_dtypes.items() if dtype == datatype),
                datatype,
            )
        convert = converters[datatype]
        if type(value).__name__ == datatype:
            return value
        try:
            return convert(value)
        except (TypeError, ValueError):
            # TypeError covers values such as None, which int()/float() reject.
            return np.nan if datatype in ("float", "int") else ""

    def _remove_duplicate_columns(self, df: "pl.DataFrame") -> "pl.DataFrame":
        """Remove duplicate columns based on column name."""
        # dict.fromkeys keeps the first occurrence of every name, in order.
        unique_cols = list(dict.fromkeys(df.columns))
        return df.select(unique_cols)
Performs a light cleaning on the table.
Instance variables
var df
-
Expand source code
class LightCleaner:
    """Performs a light cleaning on the table.

    The pandas frame given at construction is converted to a polars frame and
    kept on `df`. `clean_frame` works on a clone, stores the cleaned polars
    frame back on `df` and returns it as pandas.
    """

    __slots__ = ["df", "dict_dtypes"]

    def __init__(self, df_: pd.DataFrame):
        self.df = pl.from_pandas(df_)
        # Python type name -> polars dtype.
        self.dict_dtypes = {"float": pl.Float64, "int": pl.Int64, "str": pl.String}

    def clean_frame(
        self,
        sample_frac: float = 0.1,
        fast_execution: bool = True,
        two_date_formats: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """DataFrame cleaning main function

        Parameters
        ----------
        sample_frac : `float`
            The fraction of rows to use for date type inference. Default is 0.1 i.e., 10%.
        fast_execution : `bool`
            If `False`, run a second `clean` pass on text columns that receives
            `no_emoji` and `title_mode`. Default is `True`.
        two_date_formats : `bool`
            Forwarded to `most_repeated_item` when inferring the date format.
            Default is `True`.

        Keyword Arguments:
        ----------
        errors : `str`
            Forwarded to `process_dates`. By default it is set to `"ignore"`.
        no_emoji : `bool`
            By default it is set to `False`. If `True`, removes all emojis from text data.
            Works only when `fast_execution` = `False`.
        title_mode : `bool`
            By default it is set to `True`. If `False`, converts the text to lowercase.
            Works only when `fast_execution` = `False`. By default, converts everything to `title`.

        Returns
        -------
        `pd.DataFrame`
            The cleaned table. The polars version is also stored on `self.df`.
        """
        table = self.df.clone()
        cols = table.columns
        errors = kwargs.get("errors", "ignore")

        # Date detection and format inference only look at this sample.
        if sample_frac != 1.0:
            # polars names this keyword `fraction`; `frac` is the old, renamed spelling.
            table_sample = table.sample(fraction=sample_frac, with_replacement=False)
        else:
            table_sample = table.clone()

        for column_name, datatype in zip(cols, table.dtypes):
            if datatype == pl.String:
                sampled = table_sample[column_name]
                datetype_column = sampled.map_elements(
                    check_if_contains_dates, return_dtype=pl.Boolean
                ).any()
                if datetype_column:
                    detected_formats = [
                        item
                        for item in sampled.map_elements(
                            get_date_format, return_dtype=pl.String
                        )
                        if item is not None
                    ]
                    main_type, auxiliary_type = most_repeated_item(
                        detected_formats, two_date_formats
                    )
                    # NOTE(review): the auxiliary format wins whenever it is set and no
                    # fallback is handed to process_dates - confirm this is intended.
                    format_type = auxiliary_type or main_type
                    partial_dates = partial(
                        process_dates,
                        format_type=format_type,
                        auxiliary_type=None,
                        errors=errors,
                    )
                    vpartial_dates = np.vectorize(partial_dates)
                    _serie = pl.Series(column_name, vpartial_dates(table[column_name].to_list()))
                    table = table.with_columns(_serie)
                    # strict=False: values that are not "%Y-%m-%d" become null instead of raising.
                    table = table.with_columns(
                        pl.col(column_name).str.strptime(
                            pl.Datetime, format="%Y-%m-%d", strict=False
                        )
                    )
                else:
                    try:
                        vpartial_clean = np.vectorize(partial(clean))
                        table = table.with_columns(
                            pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                        )
                    except AttributeError as e:
                        msg = f"It was not possible to perform the cleaning, the column {column_name} is duplicated. Error: {e}"
                        logging.warning(msg)
                        sys.exit("Perform correction manually")
                    if not fast_execution:
                        # Second pass with the caller-selected cleaning options.
                        no_emoji = kwargs.get("no_emoji", False)
                        title_mode = kwargs.get("title_mode", True)
                        vpartial_clean = np.vectorize(
                            partial(clean, no_emoji=no_emoji, title_mode=title_mode)
                        )
                        table = table.with_columns(
                            pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                        )

        table = self._remove_duplicate_columns(table)
        self.df = table.clone()
        return self.df.to_pandas()

    def _correct_type(self, value, datatype):
        """General type correction function.

        Parameters
        ----------
        value
            The value to convert.
        datatype
            Target type, given as `"float"`, `"int"` or `"str"`, or as the
            matching polars dtype from `dict_dtypes`.

        Returns
        -------
        `value` unchanged when it already has the target type, otherwise the
        converted value. A failed conversion yields `np.nan` for numeric
        targets and `""` for `"str"`.

        Raises
        ------
        KeyError
            When `datatype` is not one of the supported targets.
        """
        converters = {"float": float, "int": int, "str": str}
        if not isinstance(datatype, str):
            # A polars dtype was supplied: translate it to its Python type name.
            datatype = next(
                (name for name, dtype in self.dict_dtypes.items() if dtype == datatype),
                datatype,
            )
        convert = converters[datatype]
        if type(value).__name__ == datatype:
            return value
        try:
            return convert(value)
        except (TypeError, ValueError):
            # TypeError covers values such as None, which int()/float() reject.
            return np.nan if datatype in ("float", "int") else ""

    def _remove_duplicate_columns(self, df: "pl.DataFrame") -> "pl.DataFrame":
        """Remove duplicate columns based on column name."""
        # dict.fromkeys keeps the first occurrence of every name, in order.
        unique_cols = list(dict.fromkeys(df.columns))
        return df.select(unique_cols)
var dict_dtypes
-
Expand source code
class LightCleaner:
    """Performs a light cleaning on the table.

    The pandas frame given at construction is converted to a polars frame and
    kept on `df`. `clean_frame` works on a clone, stores the cleaned polars
    frame back on `df` and returns it as pandas.
    """

    __slots__ = ["df", "dict_dtypes"]

    def __init__(self, df_: pd.DataFrame):
        self.df = pl.from_pandas(df_)
        # Python type name -> polars dtype.
        self.dict_dtypes = {"float": pl.Float64, "int": pl.Int64, "str": pl.String}

    def clean_frame(
        self,
        sample_frac: float = 0.1,
        fast_execution: bool = True,
        two_date_formats: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """DataFrame cleaning main function

        Parameters
        ----------
        sample_frac : `float`
            The fraction of rows to use for date type inference. Default is 0.1 i.e., 10%.
        fast_execution : `bool`
            If `False`, run a second `clean` pass on text columns that receives
            `no_emoji` and `title_mode`. Default is `True`.
        two_date_formats : `bool`
            Forwarded to `most_repeated_item` when inferring the date format.
            Default is `True`.

        Keyword Arguments:
        ----------
        errors : `str`
            Forwarded to `process_dates`. By default it is set to `"ignore"`.
        no_emoji : `bool`
            By default it is set to `False`. If `True`, removes all emojis from text data.
            Works only when `fast_execution` = `False`.
        title_mode : `bool`
            By default it is set to `True`. If `False`, converts the text to lowercase.
            Works only when `fast_execution` = `False`. By default, converts everything to `title`.

        Returns
        -------
        `pd.DataFrame`
            The cleaned table. The polars version is also stored on `self.df`.
        """
        table = self.df.clone()
        cols = table.columns
        errors = kwargs.get("errors", "ignore")

        # Date detection and format inference only look at this sample.
        if sample_frac != 1.0:
            # polars names this keyword `fraction`; `frac` is the old, renamed spelling.
            table_sample = table.sample(fraction=sample_frac, with_replacement=False)
        else:
            table_sample = table.clone()

        for column_name, datatype in zip(cols, table.dtypes):
            if datatype == pl.String:
                sampled = table_sample[column_name]
                datetype_column = sampled.map_elements(
                    check_if_contains_dates, return_dtype=pl.Boolean
                ).any()
                if datetype_column:
                    detected_formats = [
                        item
                        for item in sampled.map_elements(
                            get_date_format, return_dtype=pl.String
                        )
                        if item is not None
                    ]
                    main_type, auxiliary_type = most_repeated_item(
                        detected_formats, two_date_formats
                    )
                    # NOTE(review): the auxiliary format wins whenever it is set and no
                    # fallback is handed to process_dates - confirm this is intended.
                    format_type = auxiliary_type or main_type
                    partial_dates = partial(
                        process_dates,
                        format_type=format_type,
                        auxiliary_type=None,
                        errors=errors,
                    )
                    vpartial_dates = np.vectorize(partial_dates)
                    _serie = pl.Series(column_name, vpartial_dates(table[column_name].to_list()))
                    table = table.with_columns(_serie)
                    # strict=False: values that are not "%Y-%m-%d" become null instead of raising.
                    table = table.with_columns(
                        pl.col(column_name).str.strptime(
                            pl.Datetime, format="%Y-%m-%d", strict=False
                        )
                    )
                else:
                    try:
                        vpartial_clean = np.vectorize(partial(clean))
                        table = table.with_columns(
                            pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                        )
                    except AttributeError as e:
                        msg = f"It was not possible to perform the cleaning, the column {column_name} is duplicated. Error: {e}"
                        logging.warning(msg)
                        sys.exit("Perform correction manually")
                    if not fast_execution:
                        # Second pass with the caller-selected cleaning options.
                        no_emoji = kwargs.get("no_emoji", False)
                        title_mode = kwargs.get("title_mode", True)
                        vpartial_clean = np.vectorize(
                            partial(clean, no_emoji=no_emoji, title_mode=title_mode)
                        )
                        table = table.with_columns(
                            pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                        )

        table = self._remove_duplicate_columns(table)
        self.df = table.clone()
        return self.df.to_pandas()

    def _correct_type(self, value, datatype):
        """General type correction function.

        Parameters
        ----------
        value
            The value to convert.
        datatype
            Target type, given as `"float"`, `"int"` or `"str"`, or as the
            matching polars dtype from `dict_dtypes`.

        Returns
        -------
        `value` unchanged when it already has the target type, otherwise the
        converted value. A failed conversion yields `np.nan` for numeric
        targets and `""` for `"str"`.

        Raises
        ------
        KeyError
            When `datatype` is not one of the supported targets.
        """
        converters = {"float": float, "int": int, "str": str}
        if not isinstance(datatype, str):
            # A polars dtype was supplied: translate it to its Python type name.
            datatype = next(
                (name for name, dtype in self.dict_dtypes.items() if dtype == datatype),
                datatype,
            )
        convert = converters[datatype]
        if type(value).__name__ == datatype:
            return value
        try:
            return convert(value)
        except (TypeError, ValueError):
            # TypeError covers values such as None, which int()/float() reject.
            return np.nan if datatype in ("float", "int") else ""

    def _remove_duplicate_columns(self, df: "pl.DataFrame") -> "pl.DataFrame":
        """Remove duplicate columns based on column name."""
        # dict.fromkeys keeps the first occurrence of every name, in order.
        unique_cols = list(dict.fromkeys(df.columns))
        return df.select(unique_cols)
Methods
def clean_frame(self,
sample_frac: float = 0.1,
fast_execution: bool = True,
two_date_formats: bool = True,
**kwargs) ‑> pandas.core.frame.DataFrame-
Expand source code
def clean_frame(
    self,
    sample_frac: float = 0.1,
    fast_execution: bool = True,
    two_date_formats: bool = True,
    **kwargs,
) -> pd.DataFrame:
    """DataFrame cleaning main function

    Parameters
    ----------
    sample_frac : `float`
        The fraction of rows to use for date type inference. Default is 0.1 i.e., 10%.
    fast_execution : `bool`
        If `False`, run a second `clean` pass on text columns that receives
        `no_emoji` and `title_mode`. Default is `True`.
    two_date_formats : `bool`
        Forwarded to `most_repeated_item` when inferring the date format.
        Default is `True`.

    Keyword Arguments:
    ----------
    errors : `str`
        Forwarded to `process_dates`. By default it is set to `"ignore"`.
    no_emoji : `bool`
        By default it is set to `False`. If `True`, removes all emojis from text data.
        Works only when `fast_execution` = `False`.
    title_mode : `bool`
        By default it is set to `True`. If `False`, converts the text to lowercase.
        Works only when `fast_execution` = `False`. By default, converts everything to `title`.

    Returns
    -------
    `pd.DataFrame`
        The cleaned table. The polars version is also stored on `self.df`.
    """
    table = self.df.clone()
    cols = table.columns
    errors = kwargs.get("errors", "ignore")

    # Date detection and format inference only look at this sample.
    if sample_frac != 1.0:
        # polars names this keyword `fraction`; `frac` is the old, renamed spelling.
        table_sample = table.sample(fraction=sample_frac, with_replacement=False)
    else:
        table_sample = table.clone()

    for column_name, datatype in zip(cols, table.dtypes):
        if datatype == pl.String:
            sampled = table_sample[column_name]
            datetype_column = sampled.map_elements(
                check_if_contains_dates, return_dtype=pl.Boolean
            ).any()
            if datetype_column:
                detected_formats = [
                    item
                    for item in sampled.map_elements(get_date_format, return_dtype=pl.String)
                    if item is not None
                ]
                main_type, auxiliary_type = most_repeated_item(
                    detected_formats, two_date_formats
                )
                # NOTE(review): the auxiliary format wins whenever it is set and no
                # fallback is handed to process_dates - confirm this is intended.
                format_type = auxiliary_type or main_type
                partial_dates = partial(
                    process_dates,
                    format_type=format_type,
                    auxiliary_type=None,
                    errors=errors,
                )
                vpartial_dates = np.vectorize(partial_dates)
                _serie = pl.Series(column_name, vpartial_dates(table[column_name].to_list()))
                table = table.with_columns(_serie)
                # strict=False: values that are not "%Y-%m-%d" become null instead of raising.
                table = table.with_columns(
                    pl.col(column_name).str.strptime(
                        pl.Datetime, format="%Y-%m-%d", strict=False
                    )
                )
            else:
                try:
                    vpartial_clean = np.vectorize(partial(clean))
                    table = table.with_columns(
                        pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                    )
                except AttributeError as e:
                    msg = f"It was not possible to perform the cleaning, the column {column_name} is duplicated. Error: {e}"
                    logging.warning(msg)
                    sys.exit("Perform correction manually")
                if not fast_execution:
                    # Second pass with the caller-selected cleaning options.
                    no_emoji = kwargs.get("no_emoji", False)
                    title_mode = kwargs.get("title_mode", True)
                    vpartial_clean = np.vectorize(
                        partial(clean, no_emoji=no_emoji, title_mode=title_mode)
                    )
                    table = table.with_columns(
                        pl.Series(column_name, vpartial_clean(table[column_name].to_list()))
                    )

    table = self._remove_duplicate_columns(table)
    self.df = table.clone()
    return self.df.to_pandas()
DataFrame cleaning main function
Parameters
sample_frac
:float
- The fraction of rows to use for date type inference. Default is 0.1 i.e., 10%.
fast_execution
:bool
- If `False`, use pandas `applymap` for extra text cleanup. Default is `True`.
Keyword Arguments:
no_emoji : `bool`
By default it is set to `False`. If `True`, removes all emojis from text data. Works only when `fast_execution` = `False`.
title_mode : `bool`
By default it is set to `True`. If `False`, converts the text to lowercase. Works only when `fast_execution` = `False`. By default, converts everything to `title`.