Module pydbsmgr.fast_upload

Classes

class DataFrameToSQL (connection_string: str, max_pool_size: int = 5)
Expand source code
class DataFrameToSQL(ColumnsCheck):
    """Optimized class for creating tables from DataFrames and uploading data to SQL databases.

    This class provides efficient methods for importing DataFrames into SQL tables with
    automatic schema inference, connection pooling, and robust error handling.

    Parameters
    ----------
    connection_string : `str`
        Database connection string
    max_pool_size : `int`, `optional`
        Maximum number of connections in the pool. Defaults to `5`.
    """

    def __init__(self, connection_string: str, max_pool_size: int = 5) -> None:
        """Initialize the DataFrameToSQL instance.

        Parameters
        ----------
        connection_string : `str`
            Database connection string
        max_pool_size : `int`, `optional`
            Maximum number of connections in the pool. Defaults to `5`.
        """
        self._connection_string = connection_string
        self._max_pool_size = max_pool_size
        # Idle connections kept open for reuse by `_get_connection`.
        self._connection_pool = []
        # Number of inserted rows after which `_batch_insert_data` commits when
        # transactions are enabled.
        self._transaction_batch_size = 1000

    @contextmanager
    def _get_connection(self):
        """Context manager for database connections with pooling.

        An open connection is taken from the pool when one is available (closed
        pooled connections are discarded); otherwise a new connection is opened
        with ``autocommit=False``. On a clean exit the connection goes back to the
        pool if there is room, otherwise it is closed. If the ``with`` body raises,
        pending work is rolled back, the connection is closed (never pooled) and
        the exception is re-raised.

        Yields
        ------
        `pyodbc.Connection`
            Database connection object for use within context
        """
        conn = None
        try:
            # Skip any pooled connections that were closed in the meantime.
            while conn is None and self._connection_pool:
                candidate = self._connection_pool.pop()
                if not candidate.closed:
                    conn = candidate
            if conn is None:
                conn = pyodbc.connect(self._connection_string, autocommit=False)

            yield conn

        except Exception as e:
            if conn is not None and not conn.closed:
                try:
                    conn.rollback()
                except pyodbc.Error as rollback_error:
                    # A failed rollback (e.g. broken link) must not mask the original error.
                    logger.warning(f"Rollback failed: {rollback_error}")
                finally:
                    conn.close()
            logger.error(f"Connection error: {e}")
            raise
        else:
            if conn.closed:
                return
            if len(self._connection_pool) < self._max_pool_size:
                self._connection_pool.append(conn)
            else:
                conn.close()

    def import_table(
        self,
        df: DataFrame,
        table_name: str,
        overwrite: bool = True,
        char_length: int = 512,
        override_length: bool = True,
        batch_size: int = 1000,
        use_transactions: bool = True,
        verbose: bool = False,
    ) -> Dict[str, Any]:
        """Import a DataFrame into the database as a new table with optimized performance.

        The DataFrame is cleaned, the table is (re)created from an inferred schema,
        and the rows are inserted in batches.

        Parameters
        ----------
        df : `DataFrame`
            DataFrame to import
        table_name : `str`
            Name of the target table
        overwrite : `bool`, `optional`
            Whether to drop and recreate the table if it already exists. Defaults to `True`.
        char_length : `int`, `optional`
            Default length for VARCHAR columns. Defaults to `512`.
        override_length : `bool`, `optional`
            Whether to use `char_length` instead of the measured text length. Defaults to `True`.
        batch_size : `int`, `optional`
            Number of rows to insert per batch. Defaults to `1000`.
        use_transactions : `bool`, `optional`
            Whether to use explicit transactions (commit/rollback). When `False` the
            connection runs in autocommit mode. Defaults to `True`.
        verbose : `bool`, `optional`
            Whether to log progress information. Defaults to `False`.

        Returns
        -------
        `Dict[str, Any]`
            Dictionary with operation statistics

        Raises
        ------
        ValueError
            If the table exists and `overwrite` is `False`.
        """
        start_time = datetime.now()
        stats = {
            "table_name": table_name,
            "rows_processed": 0,
            "rows_inserted": 0,
            "execution_time": 0,
            "errors": [],
            "warnings": [],
        }

        try:
            df = self._preprocess_dataframe(df)
            stats["rows_processed"] = len(df)

            with self._get_connection() as conn:
                # Set the commit mode explicitly: a pooled connection keeps whatever
                # mode a previous call left, and with autocommit off and no
                # transactions requested nothing would ever be committed.
                conn.autocommit = not use_transactions
                cursor = conn.cursor()

                try:
                    if self._table_exists(cursor, table_name):
                        if overwrite:
                            if verbose:
                                logger.info(f"Dropping existing table {table_name}")
                            cursor.execute(f"DROP TABLE {self._sanitize_identifier(table_name)}")
                        else:
                            raise ValueError(
                                f"Table {table_name} already exists and overwrite=False"
                            )

                    create_query = self._create_table_query(
                        table_name, df, char_length, override_length
                    )
                    if verbose:
                        logger.info(f"Creating table {table_name}")
                    cursor.execute(create_query)

                    if use_transactions:
                        conn.commit()

                except Exception:
                    if use_transactions:
                        conn.rollback()
                    raise

                if len(df) > 0:
                    insert_stats = self._batch_insert_data(
                        conn, cursor, table_name, df, batch_size, use_transactions, verbose
                    )
                    stats.update(insert_stats)

                if verbose:
                    execution_time = (datetime.now() - start_time).total_seconds()
                    logger.info(
                        f"Table {table_name} successfully imported in {execution_time:.2f} seconds"
                    )

        except Exception as e:
            logger.error(f"Failed to import table {table_name}: {e}")
            stats["errors"].append(str(e))
            raise
        finally:
            stats["execution_time"] = (datetime.now() - start_time).total_seconds()

        return stats

    def upload_table(
        self,
        df: DataFrame,
        table_name: str,
        batch_size: int = 1000,
        use_transactions: bool = True,
        verbose: bool = False,
    ) -> Dict[str, Any]:
        """Append the rows of a DataFrame to an existing table with optimized performance.

        Parameters
        ----------
        df : `DataFrame`
            DataFrame to upload
        table_name : `str`
            Name of the target table
        batch_size : `int`, `optional`
            Number of rows to insert per batch. Defaults to `1000`.
        use_transactions : `bool`, `optional`
            Whether to use explicit transactions (commit/rollback). When `False` the
            connection runs in autocommit mode. Defaults to `True`.
        verbose : `bool`, `optional`
            Whether to log progress information. Defaults to `False`.

        Returns
        -------
        `Dict[str, Any]`
            Dictionary with operation statistics

        Raises
        ------
        ValueError
            If the target table does not exist.
        """
        start_time = datetime.now()
        stats = {
            "table_name": table_name,
            "rows_processed": 0,
            "rows_inserted": 0,
            "execution_time": 0,
            "errors": [],
        }

        try:
            df = self._preprocess_dataframe(df)
            stats["rows_processed"] = len(df)

            with self._get_connection() as conn:
                # Pooled connections may carry a previous call's commit mode.
                conn.autocommit = not use_transactions
                cursor = conn.cursor()
                if not self._table_exists(cursor, table_name):
                    raise ValueError(f"Table {table_name} does not exist")

                if len(df) > 0:
                    insert_stats = self._batch_insert_data(
                        conn, cursor, table_name, df, batch_size, use_transactions, verbose
                    )
                    stats.update(insert_stats)

                if verbose:
                    execution_time = (datetime.now() - start_time).total_seconds()
                    logger.info(
                        f"Data successfully uploaded to table {table_name} in {execution_time:.2f} seconds"
                    )

        except Exception as e:
            logger.error(f"Failed to upload data to table {table_name}: {e}")
            stats["errors"].append(str(e))
            raise
        finally:
            stats["execution_time"] = (datetime.now() - start_time).total_seconds()

        return stats

    def _preprocess_dataframe(self, df: DataFrame) -> DataFrame:
        """Clean and preprocess a DataFrame for database upload.

        Placeholder strings (``""``, ``" "``, ``"<NA>"``, ``"NULL"``, ``"nan"``) and
        infinities are treated as missing, and missing values are replaced by
        ``None`` where the column dtype allows it. The input is not modified.

        Parameters
        ----------
        df : `DataFrame`
            Input DataFrame to clean

        Returns
        -------
        `DataFrame`
            Cleaned DataFrame with null values handled
        """
        df_cleaned = df.copy()

        df_cleaned.replace(
            ["", " ", "<NA>", "NULL", "nan"],
            np.nan,
            inplace=True,
        )

        df_cleaned.replace([np.inf, -np.inf], np.nan, inplace=True)

        df_cleaned = df_cleaned.where(pd.notna(df_cleaned), None)

        return df_cleaned

    def _batch_insert_data(
        self,
        conn: pyodbc.Connection,
        cursor: pyodbc.Cursor,
        table_name: str,
        df: DataFrame,
        batch_size: int,
        use_transactions: bool,
        verbose: bool,
    ) -> Dict[str, Any]:
        """Insert DataFrame data in optimized batches.

        With transactions enabled, work is committed every time at least
        ``self._transaction_batch_size`` rows have been inserted since the last
        commit, and once more at the end. A failure rolls back only the rows
        inserted since the last commit.

        Parameters
        ----------
        conn : `pyodbc.Connection`
            Database connection object
        cursor : `pyodbc.Cursor`
            Database cursor for executing queries
        table_name : `str`
            Name of the target table
        df : `DataFrame`
            DataFrame containing data to insert
        batch_size : `int`
            Number of rows to insert per batch (must be positive)
        use_transactions : `bool`
            Whether to use explicit transactions; otherwise autocommit is enabled
        verbose : `bool`
            Whether to log progress information

        Returns
        -------
        `Dict[str, Any]`
            Dictionary with insertion statistics (rows_inserted, batches_processed)

        Raises
        ------
        ValueError
            If `batch_size` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")

        stats = {"rows_inserted": 0, "batches_processed": 0}
        # Without transactions the rows must be autocommitted, otherwise they
        # would stay in an open transaction that is never committed.
        conn.autocommit = not use_transactions

        total_rows = len(df)
        rows_since_commit = 0
        start_idx = 0
        try:
            insert_query = self._insert_table_query(table_name, df)
            cursor.fast_executemany = True

            for start_idx in range(0, total_rows, batch_size):
                end_idx = min(start_idx + batch_size, total_rows)
                batch_data = self._prepare_data_for_insertion(df.iloc[start_idx:end_idx])

                cursor.executemany(insert_query, batch_data)
                stats["rows_inserted"] += len(batch_data)
                stats["batches_processed"] += 1
                rows_since_commit += len(batch_data)

                if use_transactions and rows_since_commit >= self._transaction_batch_size:
                    conn.commit()
                    rows_since_commit = 0

                if verbose:
                    progress = (end_idx / total_rows) * 100
                    logger.info(f"Progress: {progress:.1f}% ({end_idx}/{total_rows} rows)")

            if use_transactions:
                conn.commit()

        except Exception as e:
            if use_transactions:
                conn.rollback()
            logger.error(f"Batch insert failed at row {start_idx}: {e}")
            raise

        return stats

    def _table_exists(self, cursor: pyodbc.Cursor, table_name: str) -> bool:
        """Check if a table exists in the database.

        The name is sanitized exactly as for CREATE/DROP so that the lookup refers
        to the same table. A ``schema.table`` name also filters on the schema.
        Query errors are logged and reported as "does not exist".

        Parameters
        ----------
        cursor : `pyodbc.Cursor`
            Database cursor for executing queries
        table_name : `str`
            Name of the table to check

        Returns
        -------
        `bool`
            True if table exists, False otherwise
        """
        parts = self._sanitize_identifier(table_name).split(".")
        query = "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ?"
        params = [parts[-1]]
        if len(parts) > 1 and parts[-2]:
            query += " AND TABLE_SCHEMA = ?"
            params.append(parts[-2])

        try:
            cursor.execute(query, tuple(params))
            result = cursor.fetchone()
            return bool(result[0])
        except Exception as e:
            logger.error(f"Error checking if table {table_name} exists: {e}")
            return False

    def _sanitize_identifier(self, identifier: str) -> str:
        """Sanitize SQL identifiers to prevent injection attacks.

        Quotes, ``;`` and ``--`` are removed, a leading ``_`` is added when the name
        does not start with a letter or underscore, and spaces are removed.
        Non-string identifiers (e.g. integer column labels) are converted with
        ``str`` first. Note that other characters (brackets, commas, parentheses)
        are passed through unchanged.

        Parameters
        ----------
        identifier : `str`
            SQL identifier (table name, column name) to sanitize

        Returns
        -------
        `str`
            Sanitized identifier safe for SQL queries

        Raises
        ------
        ValueError
            If nothing is left after removing the forbidden characters.
        """
        sanitized = (
            str(identifier).replace("'", "").replace('"', "").replace(";", "").replace("--", "")
        )
        if not sanitized:
            raise ValueError(f"Identifier {identifier!r} is empty after sanitization")
        if not sanitized[0].isalpha() and sanitized[0] != "_":
            sanitized = "_" + sanitized
        return sanitized.replace(" ", "")

    def _create_table_query(
        self, table_name: str, df: DataFrame, char_length: int, override_length: bool
    ) -> str:
        """Generate CREATE TABLE query with optimized data type inference.

        Parameters
        ----------
        table_name : `str`
            Name of the table to create
        df : `DataFrame`
            DataFrame containing column definitions
        char_length : `int`
            Default length for VARCHAR columns
        override_length : `bool`
            Whether to override column lengths

        Returns
        -------
        `str`
            CREATE TABLE SQL query string
        """
        columns = [
            f"{self._sanitize_identifier(col)} "
            f"{self._infer_schema(col, df, char_length, override_length)}"
            for col in df.columns
        ]
        return f"CREATE TABLE {self._sanitize_identifier(table_name)} ({', '.join(columns)})"

    def _insert_table_query(self, table_name: str, df: DataFrame) -> str:
        """Generate INSERT INTO query with proper parameterization.

        Parameters
        ----------
        table_name : `str`
            Name of the target table
        df : `DataFrame`
            DataFrame containing column definitions

        Returns
        -------
        `str`
            INSERT INTO SQL query string with one ``?`` placeholder per column
        """
        columns = [self._sanitize_identifier(col) for col in df.columns]
        placeholders = ", ".join("?" for _ in columns)
        return f"INSERT INTO {self._sanitize_identifier(table_name)} ({', '.join(columns)}) VALUES ({placeholders})"

    def _infer_schema(
        self, column: str, df: DataFrame, char_length: int, override_length: bool
    ) -> str:
        """Enhanced data type inference with better SQL type mapping.

        Parameters
        ----------
        column : `str`
            Name of the column to infer schema for
        df : `DataFrame`
            DataFrame containing the column data
        char_length : `int`
            Default length for VARCHAR columns
        override_length : `bool`
            Whether to use `char_length` instead of the measured text length

        Returns
        -------
        `str`
            SQL data type string (e.g., VARCHAR, INT, BIGINT, FLOAT, DATETIME2, BIT)
        """
        series = df[column]
        dtype = str(series.dtype).lower()

        try:
            non_null_values = series.dropna()
            if non_null_values.empty:
                return f"VARCHAR({char_length})"

            if "float" in dtype:
                # Whole-number floats are stored as BIGINT only if they fit its range.
                if (
                    non_null_values.apply(lambda x: float(x).is_integer()).all()
                    and non_null_values.abs().max() < 2**63
                ):
                    return "BIGINT"
                return "FLOAT"
            elif "int" in dtype:
                if (
                    "64" in dtype
                    or non_null_values.max() > 2147483647
                    or non_null_values.min() < -2147483648
                ):
                    return "BIGINT"
                return "INT"
            elif "datetime" in dtype:
                return "DATETIME2"
            elif "object" in dtype or "category" in dtype or dtype.startswith("str"):
                # Text-like columns (object, category, pandas "string"/"str").
                max_length = non_null_values.astype(str).str.len().max()
                length = (
                    char_length if override_length or max_length == 0 else int(max_length * 1.2)
                )  # Add 20% buffer
                # Cap at reasonable maximum
                length = min(length, 4000)
                return f"VARCHAR({length})"
            elif "bool" in dtype:
                return "BIT"
            else:
                return f"VARCHAR({char_length})"

        except Exception as e:
            logger.warning(f"Could not infer schema for column {column}: {e}")
            return f"VARCHAR({char_length})"

    @staticmethod
    def _to_sql_value(value: Any) -> Any:
        """Convert one cell to a value pyodbc can bind.

        Missing markers (``None``, ``pd.NaT``, ``pd.NA``, NaT datetimes, NaN and
        infinite floats) become ``None``; timestamps become ``datetime.datetime``;
        numpy scalars (which pyodbc rejects) become the matching Python scalar.
        """
        if value is None or value is pd.NaT or value is pd.NA:
            return None
        # Handled before the generic numpy case: np.datetime64.item() can yield an int.
        if isinstance(value, (pd.Timestamp, np.datetime64)):
            return None if pd.isna(value) else pd.Timestamp(value).to_pydatetime()
        if isinstance(value, np.generic):
            value = value.item()
        if isinstance(value, float) and (np.isnan(value) or np.isinf(value)):
            return None
        return value

    def _prepare_data_for_insertion(self, df: DataFrame) -> List[List[Any]]:
        """Prepare DataFrame data for SQL insertion with enhanced type handling.

        Parameters
        ----------
        df : `DataFrame`
            DataFrame containing data to prepare for insertion

        Returns
        -------
        `List[List[Any]]`
            List of lists with processed values ready for SQL insertion
        """
        convert = self._to_sql_value
        # itertuples is far faster than iterrows and does not coerce mixed rows.
        return [
            [convert(value) for value in row]
            for row in df.itertuples(index=False, name=None)
        ]

Optimized class for creating tables from DataFrames and uploading data to SQL databases.

This class provides efficient methods for importing DataFrames into SQL tables with automatic schema inference, connection pooling, and robust error handling.

Parameters

connection_string : str
Database connection string
max_pool_size : int, optional
Maximum number of connections in the pool. Defaults to 5.

Initialize the DataFrameToSQL instance.

Parameters

connection_string : str
Database connection string
max_pool_size : int, optional
Maximum number of connections in the pool. Defaults to 5.

Ancestors

ColumnsCheck

Subclasses

UploadToSQL

Methods

def import_table(self,
df: pandas.core.frame.DataFrame,
table_name: str,
overwrite: bool = True,
char_length: int = 512,
override_length: bool = True,
batch_size: int = 1000,
use_transactions: bool = True,
verbose: bool = False) -> Dict[str, Any]
Expand source code
def import_table(
    self,
    df: DataFrame,
    table_name: str,
    overwrite: bool = True,
    char_length: int = 512,
    override_length: bool = True,
    batch_size: int = 1000,
    use_transactions: bool = True,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Import a DataFrame into the database as a new table with optimized performance.

    The DataFrame is cleaned, the table is (re)created from an inferred schema,
    and the rows are inserted in batches.

    Parameters
    ----------
    df : `DataFrame`
        DataFrame to import
    table_name : `str`
        Name of the target table
    overwrite : `bool`, `optional`
        Whether to drop and recreate the table if it already exists. Defaults to `True`.
    char_length : `int`, `optional`
        Default length for VARCHAR columns. Defaults to `512`.
    override_length : `bool`, `optional`
        Whether to use `char_length` instead of the measured text length. Defaults to `True`.
    batch_size : `int`, `optional`
        Number of rows to insert per batch. Defaults to `1000`.
    use_transactions : `bool`, `optional`
        Whether to use explicit transactions (commit/rollback). When `False` the
        connection runs in autocommit mode. Defaults to `True`.
    verbose : `bool`, `optional`
        Whether to log progress information. Defaults to `False`.

    Returns
    -------
    `Dict[str, Any]`
        Dictionary with operation statistics

    Raises
    ------
    ValueError
        If the table exists and `overwrite` is `False`.
    """
    start_time = datetime.now()
    stats = {
        "table_name": table_name,
        "rows_processed": 0,
        "rows_inserted": 0,
        "execution_time": 0,
        "errors": [],
        "warnings": [],
    }

    try:
        df = self._preprocess_dataframe(df)
        stats["rows_processed"] = len(df)

        with self._get_connection() as conn:
            # Set the commit mode explicitly: a pooled connection keeps whatever
            # mode a previous call left, and with autocommit off and no
            # transactions requested nothing would ever be committed.
            conn.autocommit = not use_transactions
            cursor = conn.cursor()

            try:
                if self._table_exists(cursor, table_name):
                    if overwrite:
                        if verbose:
                            logger.info(f"Dropping existing table {table_name}")
                        cursor.execute(f"DROP TABLE {self._sanitize_identifier(table_name)}")
                    else:
                        raise ValueError(
                            f"Table {table_name} already exists and overwrite=False"
                        )

                create_query = self._create_table_query(
                    table_name, df, char_length, override_length
                )
                if verbose:
                    logger.info(f"Creating table {table_name}")
                cursor.execute(create_query)

                if use_transactions:
                    conn.commit()

            except Exception:
                if use_transactions:
                    conn.rollback()
                raise

            if len(df) > 0:
                insert_stats = self._batch_insert_data(
                    conn, cursor, table_name, df, batch_size, use_transactions, verbose
                )
                stats.update(insert_stats)

            if verbose:
                execution_time = (datetime.now() - start_time).total_seconds()
                logger.info(
                    f"Table {table_name} successfully imported in {execution_time:.2f} seconds"
                )

    except Exception as e:
        logger.error(f"Failed to import table {table_name}: {e}")
        stats["errors"].append(str(e))
        raise
    finally:
        stats["execution_time"] = (datetime.now() - start_time).total_seconds()

    return stats

Import a DataFrame into the database as a new table with optimized performance.

Parameters

df : DataFrame
DataFrame to import
table_name : str
Name of the target table
overwrite : bool, optional
Whether to drop and recreate the table if it already exists. Defaults to True.
char_length : int, optional
Default length for VARCHAR columns. Defaults to 512.
override_length : bool, optional
Whether to override column lengths. Defaults to True.
batch_size : int, optional
Number of rows to insert per batch. Defaults to 1000.
use_transactions : bool, optional
Whether to use transactions for data integrity. Defaults to True.
verbose : bool, optional
Whether to log progress information. Defaults to False.

Returns

Dict[str, Any]
Dictionary with operation statistics

def upload_table(self,
df: pandas.core.frame.DataFrame,
table_name: str,
batch_size: int = 1000,
use_transactions: bool = True,
verbose: bool = False) -> Dict[str, Any]
Expand source code
def upload_table(
    self,
    df: DataFrame,
    table_name: str,
    batch_size: int = 1000,
    use_transactions: bool = True,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Append the rows of a DataFrame to an existing table with optimized performance.

    Parameters
    ----------
    df : `DataFrame`
        DataFrame to upload
    table_name : `str`
        Name of the target table
    batch_size : `int`, `optional`
        Number of rows to insert per batch. Defaults to `1000`.
    use_transactions : `bool`, `optional`
        Whether to use explicit transactions (commit/rollback). When `False` the
        connection runs in autocommit mode. Defaults to `True`.
    verbose : `bool`, `optional`
        Whether to log progress information. Defaults to `False`.

    Returns
    -------
    `Dict[str, Any]`
        Dictionary with operation statistics

    Raises
    ------
    ValueError
        If the target table does not exist.
    """
    start_time = datetime.now()
    stats = {
        "table_name": table_name,
        "rows_processed": 0,
        "rows_inserted": 0,
        "execution_time": 0,
        "errors": [],
    }

    try:
        df = self._preprocess_dataframe(df)
        stats["rows_processed"] = len(df)

        with self._get_connection() as conn:
            # Pooled connections may carry a previous call's commit mode.
            conn.autocommit = not use_transactions
            cursor = conn.cursor()
            if not self._table_exists(cursor, table_name):
                raise ValueError(f"Table {table_name} does not exist")

            if len(df) > 0:
                insert_stats = self._batch_insert_data(
                    conn, cursor, table_name, df, batch_size, use_transactions, verbose
                )
                stats.update(insert_stats)

            if verbose:
                execution_time = (datetime.now() - start_time).total_seconds()
                logger.info(
                    f"Data successfully uploaded to table {table_name} in {execution_time:.2f} seconds"
                )

    except Exception as e:
        logger.error(f"Failed to upload data to table {table_name}: {e}")
        stats["errors"].append(str(e))
        raise
    finally:
        stats["execution_time"] = (datetime.now() - start_time).total_seconds()

    return stats

Append rows from a DataFrame to an existing table with optimized performance.

Parameters

df : DataFrame
DataFrame to upload
table_name : str
Name of the target table
batch_size : int, optional
Number of rows to insert per batch. Defaults to 1000.
use_transactions : bool, optional
Whether to use transactions for data integrity. Defaults to True.
verbose : bool, optional
Whether to log progress information. Defaults to False.

Returns

Dict[str, Any]
Dictionary with operation statistics

class UploadToSQL (connection_string: str, max_pool_size: int = 5)
Expand source code
class UploadToSQL(DataFrameToSQL):
    """Enhanced class for efficiently importing/updating tables from DataFrames.

    This class extends DataFrameToSQL with chunking, automatic batch size optimization,
    and comprehensive monitoring capabilities.

    Parameters
    ----------
    connection_string : `str`
        Database connection string
    max_pool_size : `int`, `optional`
        Maximum number of connections in the pool. Defaults to `5`.
    """

    def __init__(self, connection_string: str, max_pool_size: int = 5) -> None:
        """Set up the pooled connection handling plus upload-specific flags.

        Parameters
        ----------
        connection_string : `str`
            Database connection string
        max_pool_size : `int`, `optional`
            Maximum number of connections in the pool. Defaults to `5`.
        """
        super().__init__(connection_string=connection_string, max_pool_size=max_pool_size)
        # NOTE(review): neither flag is read by any method visible in this chunk —
        # confirm whether they are still used further down.
        self._verbose, self._auto_batch_size = True, True

    def execute(
        self,
        df: DataFrame,
        table_name: str,
        chunk_size: Optional[int] = None,
        method: str = "override",
        char_length: int = 512,
        override_length: bool = True,
        use_transactions: bool = True,
        auto_resolve: bool = True,
        frac: float = 0.01,
        verbose: bool = False,
    ) -> Dict[str, Any]:
        """Execute the import/update operation with intelligent chunking and optimization.

        Parameters
        ----------
        df : `DataFrame`
            DataFrame to process
        table_name : `str`
            Name of the target table
        chunk_size : `int`, `optional`
            Number of chunks to split the DataFrame into (auto-calculated if None).
            Ignored when auto-resolution applies.
        method : `str`, `optional`
            Operation method ("override" or "append"). Defaults to "override".
        char_length : `int`, `optional`
            Default length for VARCHAR columns. Defaults to 512.
        override_length : `bool`, `optional`
            Whether to override column lengths. Defaults to True.
        use_transactions : `bool`, `optional`
            Whether to use transactions. Defaults to True.
        auto_resolve : `bool`, `optional`
            Whether DataFrames with at least 500,000 rows are split into chunks of
            ``max(int(len(df) * frac), 1000)`` rows instead of `chunk_size` chunks.
            Defaults to True.
        frac : `float`, `optional`
            Fraction of the total rows per chunk used by auto-resolution. Defaults to 0.01.
        verbose : `bool`, `optional`
            Whether to log progress information. Defaults to False.

        Returns
        -------
        `Dict[str, Any]`
            Dictionary with comprehensive operation statistics

        Raises
        ------
        ValueError
            If `df` is empty, `method` is unknown or `chunk_size` is not positive.
        """
        start_time = datetime.now()
        stats = {
            "operation": method,
            "table_name": table_name,
            "total_rows": len(df),
            "chunks_processed": 0,
            "execution_time": 0,
            "errors": [],
            "warnings": [],
            "chunk_stats": [],
        }

        try:
            if df.empty:
                raise ValueError("DataFrame is empty")

            if method not in ["override", "append"]:
                raise ValueError('Invalid method. Choose from ["override", "append"]')

            if chunk_size is None:
                chunk_size = self._calculate_optimal_chunk_size(len(df))

            if chunk_size <= 0:
                raise ValueError("chunk_size must be positive")

            if auto_resolve and len(df) >= 500000:  # 0.5M rows
                n = max(int(len(df) * frac), 1000)  # Minimum 1000 rows per chunk
                df_chunks = [df.iloc[i : i + n] for i in range(0, len(df), n)]
                if verbose:
                    logger.info(f"Auto-resolved to {len(df_chunks)} chunks of ~{n} rows each")
            else:
                # Split row positions instead of the DataFrame itself:
                # np.array_split on a DataFrame relies on the deprecated
                # DataFrame.swapaxes. Chunk sizes match np.array_split exactly
                # (including empty chunks when chunk_size > len(df)).
                df_chunks = [
                    df.iloc[positions[0] : positions[-1] + 1] if len(positions) else df.iloc[0:0]
                    for positions in np.array_split(np.arange(len(df)), chunk_size)
                ]

            stats["chunks_processed"] = len(df_chunks)

            if method == "override":
                stats.update(
                    self._execute_override(
                        df_chunks,
                        table_name,
                        char_length,
                        override_length,
                        use_transactions,
                        verbose,
                    )
                )
            elif method == "append":
                stats.update(self._execute_append(df_chunks, table_name, use_transactions, verbose))

            if verbose:
                execution_time = (datetime.now() - start_time).total_seconds()
                logger.info(f"Operation completed in {execution_time:.2f} seconds")

        except Exception as e:
            logger.error(f"Operation failed: {e}")
            stats["errors"].append(str(e))
            raise
        finally:
            stats["execution_time"] = (datetime.now() - start_time).total_seconds()

        return stats

    def _calculate_optimal_chunk_size(self, total_rows: int) -> int:
        """Pick how many chunks to split a DataFrame of `total_rows` rows into.

        Fewer than 10,000 rows use 1 chunk, fewer than 100,000 use 4, fewer than
        1,000,000 use 10, and anything larger uses 20.

        Parameters
        ----------
        total_rows : `int`
            Total number of rows in the DataFrame

        Returns
        -------
        `int`
            Optimal number of chunks for processing
        """
        # (exclusive upper bound on the row count, number of chunks)
        thresholds = ((10000, 1), (100000, 4), (1000000, 10))
        for row_limit, n_chunks in thresholds:
            if total_rows < row_limit:
                return n_chunks
        return 20

    def _execute_override(
        self,
        df_chunks: List[DataFrame],
        table_name: str,
        char_length: int,
        override_length: bool,
        use_transactions: bool,
        verbose: bool,
    ) -> Dict[str, Any]:
        """Replace ``table_name`` with the rows contained in ``df_chunks``.

        If the table already exists, it is dropped and the drop is committed on a
        pooled connection. The first chunk is then written with ``import_table``
        (``overwrite=True``), which is expected to create the table. Every
        following chunk is appended with ``upload_table``. Both calls use a batch
        size of 1000.

        Parameters
        ----------
        df_chunks : `List[DataFrame]`
            Chunks to write, in order
        table_name : `str`
            Name of the target table
        char_length : `int`
            Default length for VARCHAR columns (forwarded to ``import_table``)
        override_length : `bool`
            Whether to override column lengths (forwarded to ``import_table``)
        use_transactions : `bool`
            Whether to use transactions for data integrity
        verbose : `bool`
            Whether to log the drop and per-chunk progress

        Returns
        -------
        `Dict[str, Any]`
            ``{"chunk_stats": [...]}``: one dict per processed chunk holding
            ``chunk_index``, ``rows`` and the keys returned by the upload call.
            The chunk that failed holds an ``error`` message instead.

        Raises
        ------
        Exception
            Any exception raised while writing a chunk is logged, recorded in
            that chunk's stats and re-raised.
        """
        per_chunk: List[Dict[str, Any]] = []

        with self._get_connection() as conn:
            cur = conn.cursor()
            if self._table_exists(cur, table_name):
                if verbose:
                    logger.info(f"Table {table_name} exists, dropping...")
                cur.execute(f"DROP TABLE {self._sanitize_identifier(table_name)}")
                conn.commit()

        for idx, chunk in enumerate(df_chunks):
            entry = {"chunk_index": idx, "rows": len(chunk)}
            try:
                if idx != 0:
                    # Later chunks append to the table created by the first chunk.
                    outcome = self.upload_table(
                        df=chunk,
                        table_name=table_name,
                        batch_size=1000,
                        use_transactions=use_transactions,
                        verbose=False,
                    )
                else:
                    # import_table only logs for a single-chunk run; multi-chunk
                    # runs report progress below instead.
                    outcome = self.import_table(
                        df=chunk,
                        table_name=table_name,
                        overwrite=True,
                        char_length=char_length,
                        override_length=override_length,
                        batch_size=1000,
                        use_transactions=use_transactions,
                        verbose=verbose and len(df_chunks) == 1,
                    )
                entry.update(outcome)
                per_chunk.append(entry)

                if verbose and len(df_chunks) > 1:
                    done = idx + 1
                    logger.info(
                        f"Progress: {done / len(df_chunks) * 100:.1f}% ({done}/{len(df_chunks)} chunks)"
                    )
            except Exception as exc:
                entry["error"] = str(exc)
                per_chunk.append(entry)
                logger.error(f"Chunk {idx} failed: {exc}")
                raise

        return {"chunk_stats": per_chunk}

    def _execute_append(
        self, df_chunks: List[DataFrame], table_name: str, use_transactions: bool, verbose: bool
    ) -> Dict[str, Any]:
        """Append every chunk in ``df_chunks`` to an already existing table.

        The table's existence is checked once up front on a pooled connection.
        Each chunk is then written with ``upload_table``, using a batch size of
        1000 and with that call's own logging disabled.

        Parameters
        ----------
        df_chunks : `List[DataFrame]`
            Chunks to append, in order
        table_name : `str`
            Name of the target table
        use_transactions : `bool`
            Whether to use transactions for data integrity
        verbose : `bool`
            Whether to log per-chunk progress

        Returns
        -------
        `Dict[str, Any]`
            ``{"chunk_stats": [...]}``: one dict per processed chunk holding
            ``chunk_index``, ``rows`` and the keys returned by ``upload_table``.
            The chunk that failed holds an ``error`` message instead.

        Raises
        ------
        ValueError
            If ``table_name`` does not exist. It is raised inside the
            ``_get_connection`` context; presumably it propagates unchanged
            (not visible here).
        Exception
            Any exception raised while writing a chunk is logged, recorded in
            that chunk's stats and re-raised.
        """
        per_chunk: List[Dict[str, Any]] = []

        with self._get_connection() as conn:
            cur = conn.cursor()
            table_found = self._table_exists(cur, table_name)
            if not table_found:
                raise ValueError(f"Table {table_name} does not exist for append operation")

        for idx, chunk in enumerate(df_chunks):
            entry = {"chunk_index": idx, "rows": len(chunk)}
            try:
                outcome = self.upload_table(
                    df=chunk,
                    table_name=table_name,
                    batch_size=1000,
                    use_transactions=use_transactions,
                    verbose=False,
                )
                entry.update(outcome)
                per_chunk.append(entry)

                if verbose and len(df_chunks) > 1:
                    done = idx + 1
                    logger.info(
                        f"Progress: {done / len(df_chunks) * 100:.1f}% ({done}/{len(df_chunks)} chunks)"
                    )
            except Exception as exc:
                entry["error"] = str(exc)
                per_chunk.append(entry)
                logger.error(f"Chunk {idx} failed: {exc}")
                raise

        return {"chunk_stats": per_chunk}

    @property
    def verbose(self) -> bool:
        """Whether progress information should be logged.

        Returns
        -------
        `bool`
            The value last assigned through the setter, or `False` if it has
            never been assigned. `False` matches the default of ``execute``'s
            ``verbose`` argument.
        """
        # ``_verbose`` is only created by the setter, so fall back to a default
        # instead of raising AttributeError on a fresh instance.
        return getattr(self, "_verbose", False)

    @verbose.setter
    def verbose(self, value: bool) -> None:
        """Store the verbosity flag exactly as given (no coercion to `bool`)."""
        self._verbose = value

Enhanced class for efficiently importing/updating tables from DataFrames.

This class extends DataFrameToSQL with chunking, automatic batch size optimization, and comprehensive monitoring capabilities.

Parameters

connection_string : str
Database connection string
max_pool_size : int, optional
Maximum number of connections in the pool. Defaults to 5.

Initialize the UploadToSQL instance.

Parameters

connection_string : str
Database connection string
max_pool_size : int, optional
Maximum number of connections in the pool. Defaults to 5.

Ancestors

Instance variables

prop verbose : bool
Expand source code
@property
def verbose(self) -> bool:
    """Whether progress information should be logged.

    Returns the value last assigned through the setter, or `False` if it has
    never been assigned, instead of raising AttributeError.
    """
    return getattr(self, "_verbose", False)

Methods

def execute(self,
df: pandas.core.frame.DataFrame,
table_name: str,
chunk_size: int | None = None,
method: str = 'override',
char_length: int = 512,
override_length: bool = True,
use_transactions: bool = True,
auto_resolve: bool = True,
frac: float = 0.01,
verbose: bool = False) -> Dict[str, Any]
Expand source code
def execute(
    self,
    df: DataFrame,
    table_name: str,
    chunk_size: Optional[int] = None,
    method: str = "override",
    char_length: int = 512,
    override_length: bool = True,
    use_transactions: bool = True,
    auto_resolve: bool = True,
    frac: float = 0.01,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Execute the import/update operation with intelligent chunking and optimization.

    The DataFrame is split into consecutive, non-overlapping row chunks by
    position. The chunks are then handed to ``_execute_override`` (drop and
    recreate the table) or ``_execute_append`` (append to an existing table).

    Parameters
    ----------
    df : `DataFrame`
        DataFrame to process
    table_name : `str`
        Name of the target table
    chunk_size : `int`, `optional`
        Number of chunks to split DataFrame into (auto-calculated if None).
        It is capped at the number of rows, so no empty chunk is produced.
        It is ignored when the auto-resolve path below is taken.
    method : `str`, `optional`
        Operation method ("override" or "append"). Defaults to "override".
    char_length : `int`, `optional`
        Default length for VARCHAR columns. Defaults to 512.
    override_length : `bool`, `optional`
        Whether to override column lengths. Defaults to True.
    use_transactions : `bool`, `optional`
        Whether to use transactions. Defaults to True.
    auto_resolve : `bool`, `optional`
        Whether to auto-resolve large DataFrames (500,000 rows or more) into
        chunks of ``frac`` of the rows each. Defaults to True.
    frac : `float`, `optional`
        Fraction of the rows per chunk for auto-resolution, with at least
        1000 rows per chunk. Defaults to 0.01.
    verbose : `bool`, `optional`
        Whether to print progress information. Defaults to False.

    Returns
    -------
    `Dict[str, Any]`
        Dictionary with comprehensive operation statistics

    Raises
    ------
    ValueError
        If ``df`` is empty, ``method`` is not "override"/"append", or
        ``chunk_size`` is not positive. Errors from the chunk writers are
        logged, recorded in ``stats["errors"]`` and re-raised.
    """
    start_time = datetime.now()
    stats = {
        "operation": method,
        "table_name": table_name,
        "total_rows": len(df),
        "chunks_processed": 0,
        "execution_time": 0,
        "errors": [],
        "warnings": [],
        "chunk_stats": [],
    }

    try:
        if df.empty:
            raise ValueError("DataFrame is empty")

        if method not in ("override", "append"):
            raise ValueError('Invalid method. Choose from ["override", "append"]')

        if chunk_size is None:
            chunk_size = self._calculate_optimal_chunk_size(len(df))

        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")

        total_rows = len(df)
        if auto_resolve and total_rows >= 500000:  # 0.5M rows
            n = max(int(total_rows * frac), 1000)  # Minimum 1000 rows per chunk
            # iloc keeps slicing positional; df[i:j] is label-based on a float index
            # and would produce overlapping chunks.
            df_chunks = [df.iloc[i : i + n] for i in range(0, total_rows, n)]
            if verbose:
                logger.info(f"Auto-resolved to {len(df_chunks)} chunks of ~{n} rows each")
        else:
            # Same boundaries as np.array_split: the first `extra` chunks get one
            # extra row. Never use more chunks than rows, so no empty DataFrame
            # reaches the writers.
            n_chunks = max(1, min(int(chunk_size), total_rows))
            base, extra = divmod(total_rows, n_chunks)
            bounds = [0]
            for k in range(n_chunks):
                bounds.append(bounds[-1] + base + (1 if k < extra else 0))
            df_chunks = [df.iloc[lo:hi] for lo, hi in zip(bounds, bounds[1:])]

        stats["chunks_processed"] = len(df_chunks)

        if method == "override":
            stats.update(
                self._execute_override(
                    df_chunks,
                    table_name,
                    char_length,
                    override_length,
                    use_transactions,
                    verbose,
                )
            )
        else:  # method == "append" (validated above)
            stats.update(self._execute_append(df_chunks, table_name, use_transactions, verbose))

        if verbose:
            execution_time = (datetime.now() - start_time).total_seconds()
            logger.info(f"Operation completed in {execution_time:.2f} seconds")

    except Exception as e:
        logger.error(f"Operation failed: {e}")
        stats["errors"].append(str(e))
        raise
    finally:
        stats["execution_time"] = (datetime.now() - start_time).total_seconds()

    return stats

Execute the import/update operation with intelligent chunking and optimization.

Parameters

df : DataFrame
DataFrame to process
table_name : str
Name of the target table
chunk_size : int, optional
Number of chunks to split DataFrame into (auto-calculated if None)
method : str, optional
Operation method ("override" or "append"). Defaults to "override".
char_length : int, optional
Default length for VARCHAR columns. Defaults to 512.
override_length : bool, optional
Whether to override column lengths. Defaults to True.
use_transactions : bool, optional
Whether to use transactions. Defaults to True.
auto_resolve : bool, optional
Whether to auto-resolve large DataFrames. Defaults to True.
frac : float, optional
Fraction for auto-resolution. Defaults to 0.01.
verbose : bool, optional
Whether to print progress information. Defaults to False.

Returns

Dict[str, Any]
Dictionary with comprehensive operation statistics

Inherited members