Module pydbsmgr.fast_upload

Classes

class DataFrameToSQL (connection_string: str)

Allows creation of a table from a DataFrame and uploading data to the database

Expand source code
class DataFrameToSQL(ColumnsCheck):
    """Create a table from a ``DataFrame`` and upload its rows to the database.

    The connection is opened with ``autocommit=True``, so each statement is
    committed as soon as it runs.

    NOTE(review): table and column names are interpolated into the SQL text
    unquoted, so they must come from a trusted source.
    """

    def __init__(self, connection_string: str) -> None:
        """Open a ``pyodbc`` connection for ``connection_string`` and a cursor on it."""
        self._connection_string = connection_string
        self._con = pyodbc.connect(self._connection_string, autocommit=True)
        self._cur = self._con.cursor()

    def import_table(
        self,
        df: DataFrame,
        table_name: str,
        overwrite: bool = True,
        char_length: int = 512,
        override_length: bool = True,
        close_connection: bool = True,
        verbose: bool = False,
    ) -> None:
        """Create ``table_name`` from the schema of ``df`` and insert its rows.

        Args:
            df: Data to import; it is passed through ``_preprocess_dataframe`` first.
            table_name: Name of the table to create.
            overwrite: If ``CREATE TABLE`` fails, drop the table and create it
                again. When ``False`` a warning is printed instead and the
                insert is still attempted against whatever table exists.
            char_length: ``VARCHAR`` length used for text columns.
            override_length: Use ``char_length`` for every text column instead
                of the longest value found in the column.
            close_connection: Close the connection when done, including when
                the import fails.
            verbose: Print a confirmation message after a successful import.

        Raises:
            ValueError: If a column dtype cannot be mapped to an SQL type.
            pyodbc.Error: If inserting the rows fails.
        """

        df = self._preprocess_dataframe(df)

        if self._con.closed:
            self._reconnect()

        try:
            create_query = self._create_table_query(table_name, df, char_length, override_length)
            try:
                self._cur.execute(create_query)
            except pyodbc.Error as e:
                if overwrite:
                    self._drop_and_recreate_table(table_name, create_query)
                else:
                    print(f"UserWarning: Could not create table {table_name}. Error: {e}")

            self._insert_rows(table_name, df)
        finally:
            # Honour ``close_connection`` on the failure path too, so a failed
            # import does not leave the connection open.
            if close_connection and not self._con.closed:
                self._con.close()

        if verbose:
            print(f"Table {table_name} successfully imported!")

    def upload_table(
        self, df: DataFrame, table_name: str, close_connection: bool = True, verbose: bool = False
    ) -> None:
        """Append the rows of ``df`` to the existing table ``table_name``.

        A ``pyodbc.Error`` raised by the insert is reported with a printed
        warning rather than propagated.

        Args:
            df: Data to insert; it is passed through ``_preprocess_dataframe`` first.
            table_name: Name of the table receiving the rows.
            close_connection: Close the connection when done.
            verbose: Print a confirmation message, only when the insert succeeded.
        """

        df = self._preprocess_dataframe(df)

        if self._con.closed:
            self._reconnect()

        try:
            self._insert_rows(table_name, df)
        except pyodbc.Error as e:
            print(f"UserWarning: Could not upload data to table {table_name}. Error: {e}")
        else:
            # Only claim success when the insert actually went through.
            if verbose:
                print(f"Data successfully uploaded to table {table_name}!")
        finally:
            if close_connection and not self._con.closed:
                self._con.close()

    def _insert_rows(self, table_name: str, df: DataFrame) -> None:
        """Bulk-insert every row of ``df`` into ``table_name``; no-op for an empty frame."""
        rows = self._prepare_data_for_insertion(df)
        if not rows:
            # pyodbc rejects an empty parameter sequence in ``executemany``.
            return
        self._cur.fast_executemany = True
        self._cur.executemany(self._insert_table_query(table_name, df), rows)

    def _preprocess_dataframe(self, df: DataFrame) -> DataFrame:
        """Run ``df`` through the base class and map placeholder values to ``None``."""
        # The base class is re-initialised per frame; ``get_frame`` is defined
        # there (presumably returning the checked frame -- not visible here).
        super().__init__(df)
        return self.get_frame().replace([" ", "<NA>", np.datetime64("NaT")], None)

    def _reconnect(self):
        """Open a fresh connection and cursor from the stored connection string."""
        self._con = pyodbc.connect(self._connection_string, autocommit=True)
        self._cur = self._con.cursor()

    def _drop_and_recreate_table(self, table_name: str, query: str) -> None:
        """Drop ``table_name`` and run ``query`` to create it again.

        Failures are reported with a printed warning and not raised.
        """
        try:
            self._cur.execute(f"DROP TABLE {table_name}")
            self._cur.execute(query)
        except pyodbc.Error as e:
            print(f"UserWarning: Could not recreate table {table_name}. Error: {e}")

    def _create_table_query(
        self, table_name: str, df: DataFrame, char_length: int, override_length: bool
    ) -> str:
        """Build the ``CREATE TABLE`` statement matching the columns of ``df``."""
        columns = ", ".join(
            f"{col} {self._infer_schema(col, df, char_length, override_length)}"
            for col in df.columns
        )
        return f"CREATE TABLE {table_name} ({columns})"

    def _insert_table_query(self, table_name: str, df: DataFrame) -> str:
        """Build a parameterised ``INSERT`` with one ``?`` per column of ``df``."""
        columns = ", ".join(df.columns)
        placeholders = ", ".join("?" * len(df.columns))
        return f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

    def _infer_schema(
        self, column: str, df: DataFrame, char_length: int, override_length: bool
    ) -> str:
        """Map the pandas dtype of ``df[column]`` to an SQL column type.

        Raises:
            ValueError: If the dtype matches none of the supported families.
        """
        dtype = str(df[column].dtype).lower()
        if "float" in dtype:
            return "FLOAT"
        elif "int" in dtype:
            return "BIGINT" if "64" in dtype else "INT"
        elif "datetime" in dtype:
            return "DATE"
        elif "object" in dtype or "category" in dtype:
            if override_length:
                # The measured length would be discarded, so skip the scan.
                return f"VARCHAR({char_length})"
            max_length = df[column].astype(str).str.len().max()
            # ``max()`` of an empty column is NaN; fall back to ``char_length``
            # rather than emitting ``VARCHAR(nan)``.
            if np.isnan(max_length) or max_length == 0:
                return f"VARCHAR({char_length})"
            return f"VARCHAR({int(max_length)})"
        elif "bool" in dtype:
            return "BIT"
        raise ValueError(f"Data type of column {column} could not be inferred: {dtype}")

    def _prepare_data_for_insertion(self, df: DataFrame) -> list:
        """Return the rows of ``df`` as nested lists with float NaN replaced by ``None``."""
        return [
            [None if (isinstance(value, float) and np.isnan(value)) else value for value in row]
            for row in df.values.tolist()
        ]

Ancestors

Subclasses

Methods

def import_table(self,
df: pandas.core.frame.DataFrame,
table_name: str,
overwrite: bool = True,
char_length: int = 512,
override_length: bool = True,
close_connection: bool = True,
verbose: bool = False) ‑> None

Imports a DataFrame into the database as a new table

def upload_table(self,
df: pandas.core.frame.DataFrame,
table_name: str,
close_connection: bool = True,
verbose: bool = False) ‑> None

Appends the rows of a DataFrame to an existing table (rows are inserted; existing rows are not modified)

class UploadToSQL (connection_string: str)

Efficiently imports/updates a table from a DataFrame using the DataFrameToSQL class.

Establishes the connection to the database.

Expand source code
class UploadToSQL(DataFrameToSQL):
    """Efficiently imports/updates a table from a `DataFrame` using the `DataFrameToSQL` class.

    The frame is split into chunks; the first chunk creates the table (for
    ``method="override"``) and the remaining chunks are inserted one by one.
    """

    def __init__(self, connection_string: str) -> None:
        """Establishes the connection to the database."""
        super().__init__(connection_string)
        self._verbose = True

    def execute(
        self,
        df: DataFrame,
        table_name: str,
        chunk_size: int,
        method: str = "override",  # or append
        char_length: int = 512,
        override_length: bool = True,
        close_connection: bool = True,
        auto_resolve: bool = True,
        frac: float = 0.01,
        verbose: bool = False,
    ) -> None:
        """Executes the import/update operation based on the specified method.

        Args:
            df: Data to upload.
            table_name: Target table.
            chunk_size: Passed to ``np.array_split`` as the number of sections,
                i.e. the number of chunks ``df`` is split into. Must be smaller
                than ``len(df)``.
            method: ``"override"`` drops any existing table and recreates it;
                ``"append"`` inserts into a table that must already exist.
            char_length: ``VARCHAR`` length forwarded to ``import_table``.
            override_length: Forwarded to ``import_table``.
            close_connection: Close the connection when done, including when
                the operation fails.
            auto_resolve: For frames of 500,000 rows or more, ignore
                ``chunk_size`` and use chunks of ``len(df) * frac`` rows.
            frac: Fraction of the frame per chunk when ``auto_resolve`` applies.
            verbose: Forwarded to ``import_table``.

        Raises:
            ValueError: If ``chunk_size`` is not smaller than ``len(df)``, if
                ``method`` is unknown, or if ``"append"`` targets a missing table.
        """
        if len(df) <= chunk_size:
            raise ValueError(
                "'chunk_size' cannot be greater than or equal to the length of the 'DataFrame'. Change the 'chunk_size'."
            )

        # Get chunks of DataFrame
        if auto_resolve and len(df) >= 0.5e6:
            # A very small ``frac`` must not truncate to 0: ``range`` rejects a zero step.
            n = max(1, int(len(df) * frac))
            df_chunks = [df[i : i + n] for i in range(0, len(df), n)]
        else:
            df_chunks = np.array_split(df, chunk_size)

        # Reject an unknown method before touching the database.
        if method not in ("override", "append"):
            raise ValueError(
                'Invalid value for argument "method". Choose from ["override", "append"].'
            )

        try:
            table_exists = self._check_table_exists(table_name)

            if method == "override":
                if table_exists:
                    print("Table exists, executing OVERRIDE...")
                    self._drop_table(table_name)
                else:
                    print("Table does not exist, proceeding with CREATE TABLE.")

                # Create table with the first chunk
                self.import_table(
                    df=df_chunks[0],
                    table_name=table_name,
                    overwrite=True,
                    char_length=char_length,
                    override_length=override_length,
                    close_connection=False,
                    verbose=verbose,
                )

                # Insert the rest of the chunks
                for chunk in df_chunks[1:]:
                    self.upload_table(chunk, table_name, close_connection=False)
            else:
                if not table_exists:
                    raise ValueError("Method 'append' requires an existing table.")
                for chunk in df_chunks:
                    self.upload_table(chunk, table_name, close_connection=False)
        finally:
            # Release the connection on failure as well as on success.
            if close_connection and not self._con.closed:
                self._con.close()

    def _drop_table(self, table_name: str) -> None:
        """Drop ``table_name`` if it exists; failures are printed, not raised."""
        query = f"DROP TABLE IF EXISTS {table_name}"
        if self._con.closed:
            self._reconnect()
        try:
            self._cur.execute(query)
            if self.verbose:
                print(f"Table '{table_name}' dropped successfully.")
        except Exception as e:
            print(f"Failed to drop table '{table_name}'. Error message:\n{str(e)}")

    def _check_table_exists(self, table_name: str) -> bool:
        """Return whether ``INFORMATION_SCHEMA.TABLES`` lists ``table_name``.

        The connection is left open so that callers can keep using it without
        reconnecting.

        Raises:
            ValueError: If the lookup query fails.
        """
        if self._con.closed:
            self._reconnect()
        # Bind the name as a parameter instead of splicing it into the SQL text.
        query = "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ?"
        try:
            self._cur.execute(query, (table_name,))
            result = self._cur.fetchone()
            return bool(result[0])
        except Exception as e:
            print("Error checking if table exists.")
            raise ValueError(f"Query Error: {str(e)}") from e

    def _execute_query(self, query: str):
        """Run ``query`` and return its column names, rows and row count as a dict.

        Raises:
            ValueError: If the query fails or produces no result set.
        """
        try:
            self._cur.execute(query)
            results = {"columns": [desc[0] for desc in self._cur.description]}
            results["data"] = self._cur.fetchall()
            results["count"] = len(results["data"])
            return results
        except Exception as e:
            print("Error executing SQL Query")
            raise ValueError(f"Query Error: {str(e)}") from e

    @property
    def verbose(self) -> bool:
        """Whether ``_drop_table`` prints a confirmation after a successful drop."""
        return self._verbose

    @verbose.setter
    def verbose(self, value: bool):
        self._verbose = value

Ancestors

Instance variables

prop verbose : bool
Expand source code
@property
def verbose(self) -> bool:
    """Return the verbosity flag stored in ``_verbose``."""
    return self._verbose

Methods

def execute(self,
df: pandas.core.frame.DataFrame,
table_name: str,
chunk_size: int,
method: str = 'override',
char_length: int = 512,
override_length: bool = True,
close_connection: bool = True,
auto_resolve: bool = True,
frac: float = 0.01,
verbose: bool = False) ‑> None

Executes the import/update operation based on the specified method.

Inherited members