Module pydbsmgr.lightest

Functions

def process_dates(x: str, format_type: str, auxiliary_type: str = None, errors: str = 'ignore') ‑> str
Expand source code
def process_dates(
    x: str, format_type: str, auxiliary_type: str = None, errors: str = "ignore"
) -> str:
    """Auxiliary function in date type string processing."""
    x = str(x)
    if format_type in ["dayfirst", "monthfirst"] and len(x) < 10:
        separator = "/" if "/" in x else "-"
        parts = x.split(separator)
        if format_type == "dayfirst":
            day, month, year = parts[0], parts[1], parts[-1]
        elif format_type == "monthfirst":
            month, day, year = parts[0], parts[1], parts[-1]

        day = f"{int(day):02d}"
        month = f"{int(month):02d}"
        try:
            date = pd.to_datetime(f"{year}{month}{day}", format="%Y%m%d", errors="coerce")
        except ValueError:
            if auxiliary_type:
                date = pd.to_datetime(x, format=auxiliary_type, errors="coerce")
            elif errors == "raise":
                raise ValueError("Date value does not match the expected format.")
    else:
        x = x.replace("/", "").replace("-", "")

        if len(x) == 8:
            try:
                date = pd.to_datetime(x, format=format_type, errors="coerce")
            except ValueError:
                if auxiliary_type:
                    date = pd.to_datetime(x, format=auxiliary_type, errors="coerce")
                elif errors == "raise":
                    raise ValueError("Date value does not match the expected format.")
        else:
            try:
                date = pd.to_datetime(x[:8], format=format_type, errors="coerce")
            except ValueError:
                if auxiliary_type:
                    date = pd.to_datetime(x[:8], format=auxiliary_type, errors="coerce")
                elif errors == "raise":
                    raise ValueError("Date value does not match the expected format.")

    if not pd.isnull(date):
        return date.strftime("%Y-%m-%d")
    else:
        return x  # Return original string if no valid date is found

Auxiliary function for processing date-type strings. Returns the date normalized to YYYY-MM-DD, or the original string when no valid date is found.
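
A minimal usage sketch (the format strings and values below are illustrative; inside clean_frame the format is normally inferred with get_date_format):

from pydbsmgr.lightest import process_dates

# Short day-first string: the parts are split on the separator and
# reassembled as YYYYMMDD before being parsed with pandas.
process_dates("1/2/2023", format_type="dayfirst")    # -> "2023-02-01"

# Explicit strptime format: separators are stripped first, then the
# eight-digit string is parsed with the given format.
process_dates("31-12-2023", format_type="%d%m%Y")    # -> "2023-12-31"

The errors keyword ("ignore" by default) controls whether a value that does not match the expected format raises a ValueError ("raise") or the string is passed through unchanged.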

Classes

class LightCleaner (df_: pandas.core.frame.DataFrame)
Expand source code
class LightCleaner:
    """Performs a light cleaning on the table."""

    __slots__ = ["df", "dict_dtypes"]

    def __init__(self, df_: pd.DataFrame):
        self.df = pl.from_pandas(df_)
        self.dict_dtypes = {"float": pl.Float64, "int": pl.Int64, "str": pl.String}

    def clean_frame(
        self,
        sample_frac: float = 0.1,
        fast_execution: bool = True,
        two_date_formats: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """DataFrame cleaning main function

        Parameters
        ----------
        sample_frac : `float`
            The fraction of rows to use for date type inference. Default is 0.1 i.e., 10%.
        fast_execution : `bool`
            If `False` use `applymap` pandas for extra text cleanup. Default is `True`.

        Keyword Arguments:
        ----------
        no_emoji : `bool`
            By default it is set to `False`. If `True`, removes all emojis from text data. Works only when `fast_execution` = `False`.
        title_mode : `bool`
            By default it is set to `True`. If `False`, converts the text to lowercase. Works only when `fast_execution` = `False`. By default, converts everything to `title`.
        """
        table = self.df.clone()
        cols = table.columns
        errors = kwargs.get("errors", "ignore")

        if sample_frac != 1.0:
            table_sample = table.sample(frac=sample_frac, with_replacement=False)
        else:
            table_sample = table.clone()

        for column_index, datatype in enumerate(table.dtypes):
            if datatype == pl.String:
                datetype_column = (
                    table_sample[cols[column_index]]
                    .map_elements(check_if_contains_dates, return_dtype=pl.Boolean)
                    .any()
                )

                if datetype_column:
                    main_type, auxiliary_type = most_repeated_item(
                        list(
                            filter(
                                lambda item: item is not None,
                                table_sample[cols[column_index]].map_elements(
                                    get_date_format, return_dtype=pl.String
                                ),
                            )
                        ),
                        two_date_formats,
                    )

                    format_type = auxiliary_type or main_type

                    partial_dates = partial(
                        process_dates,
                        format_type=format_type,
                        auxiliary_type=None,
                        errors=errors,
                    )
                    vpartial_dates = np.vectorize(partial_dates)

                    _serie = pl.Series(
                        cols[column_index], vpartial_dates(table[cols[column_index]].to_list())
                    )
                    table = table.with_columns(_serie)
                    table = table.with_columns(
                        pl.col(cols[column_index]).str.strptime(
                            pl.Datetime, format="%Y-%m-%d", strict=False
                        )
                    )

                else:
                    try:
                        partial_clean = partial(clean)
                        vpartial_clean = np.vectorize(partial_clean)

                        table = table.with_columns(
                            pl.Series(
                                cols[column_index],
                                vpartial_clean(table[cols[column_index]].to_list()),
                            )
                        )
                    except AttributeError as e:
                        msg = f"It was not possible to perform the cleaning, the column {cols[column_index]} is duplicated. Error: {e}"
                        logging.warning(msg)
                        sys.exit("Perform correction manually")

                    if not fast_execution:
                        no_emoji = kwargs.get("no_emoji", False)
                        title_mode = kwargs.get("title_mode", True)

                        partial_clean = partial(clean, no_emoji=no_emoji, title_mode=title_mode)
                        vpartial_clean = np.vectorize(partial_clean)

                        table = table.with_columns(
                            pl.Series(
                                cols[column_index],
                                vpartial_clean(table[cols[column_index]].to_list()),
                            )
                        )

        table = self._remove_duplicate_columns(table)
        self.df = table.clone()
        return self.df.to_pandas()

    def _correct_type(self, value, datatype):
        """General type correction function."""
        val_type = type(value).__name__
        if self.dict_dtypes[val_type] != datatype:
            try:
                return {"float": float, "int": int, "str": str}[datatype](value)
            except ValueError:
                return np.nan if datatype in ["float", "int"] else ""
        return value

    def _remove_duplicate_columns(self, df: pl.DataFrame) -> pl.DataFrame:
        """Remove duplicate columns based on column name."""
        seen = set()
        unique_cols = [col for col in df.columns if not (col in seen or seen.add(col))]
        return df.select(unique_cols)

Performs a light cleaning on the table.
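
LightCleaner converts the input pandas DataFrame to polars internally; clean_frame returns the cleaned result as a pandas DataFrame again. A short usage sketch (the column names and values are illustrative only):

import pandas as pd

from pydbsmgr.lightest import LightCleaner

raw = pd.DataFrame(
    {
        "customer_name": ["  alice smith ", "BOB JONES"],
        "signup_date": ["1/2/2023", "15/3/2023"],
    }
)

cleaner = LightCleaner(raw)

# sample_frac=1.0 uses every row when inferring date formats; string
# columns are cleaned and date-like columns are converted to datetimes.
clean_df = cleaner.clean_frame(sample_frac=1.0)
print(clean_df.dtypes)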

Instance variables

var df
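The working table as a polars.DataFrame, created from the input pandas DataFrame in __init__.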
var dict_dtypes
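Mapping from Python type names ("float", "int", "str") to the corresponding polars dtypes, used by the internal type-correction helper _correct_type.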

Methods

def clean_frame(self,
sample_frac: float = 0.1,
fast_execution: bool = True,
two_date_formats: bool = True,
**kwargs) ‑> pandas.core.frame.DataFrame

Main DataFrame cleaning function.

Parameters

sample_frac : float
The fraction of rows used to infer date formats. Default is 0.1, i.e. 10%.
fast_execution : bool
If False, an additional (slower) text-cleanup pass is applied to string columns. Default is True.

Keyword Arguments

no_emoji : bool
Default is False. If True, removes all emojis from text data. Only used when fast_execution = False.
title_mode : bool
Default is True, which converts text to title case. If False, converts text to lowercase. Only used when fast_execution = False.
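
A sketch of the slower path. fast_execution=False is the only case in which no_emoji and title_mode are consulted; the errors keyword is forwarded to process_dates for date-like columns:

import pandas as pd

from pydbsmgr.lightest import LightCleaner

cleaner = LightCleaner(pd.DataFrame({"note": ["Hello WORLD 🙂", "  second row  "]}))

result = cleaner.clean_frame(
    sample_frac=1.0,
    fast_execution=False,
    no_emoji=True,      # strip emojis from text columns
    title_mode=False,   # lowercase text instead of title-casing it
    errors="ignore",    # forwarded to process_dates for date columns
)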