Module pydbsmgr.utils.azure_sdk

Functions

def get_connection_string() ‑> str

Return the connection string, after loading environment variables from a `.env` file.

Classes

class StorageController (connection_string: str, container_name: str)

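Read and write blobs (Parquet, CSV and Excel files) in an Azure Blob Storage container, organized by virtual directory.

The constructor creates the blob service client and the container client from the connection string.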

Expand source code
class StorageController(ControllerFeatures):
    """Read and write blobs in a single Azure Blob Storage container.

    Blobs are organized in "virtual directories": a directory name is turned
    into a ``"<directory>/"`` prefix, and only the items one level below that
    prefix are listed or read (``walk_blobs`` with ``delimiter="/"``).
    """

    def __init__(self, connection_string: str, container_name: str):
        """Create the blob service client and the container client.

        Parameters
        ----------
        connection_string : str
            Azure Storage connection string.
        container_name : str
            Container that every operation of this controller targets.
        """
        self.__connection_string = connection_string
        self.container_name = container_name

        self._blob_service_client = BlobServiceClient.from_connection_string(
            self.__connection_string
        )
        self._container_client = self._blob_service_client.get_container_client(self.container_name)
        # The parent class receives the container client; its use there is
        # defined outside this module.
        super().__init__(self._container_client)

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _dir_prefix(directory_name: str) -> str:
        """Return ``directory_name`` with exactly one trailing ``"/"``.

        Accepting ``"dir"`` and ``"dir/"`` alike avoids double slashes in
        listing prefixes and in uploaded blob names.
        """
        return directory_name.rstrip("/") + "/"

    def _walk_directory(self, directory_name: str):
        """Iterate the blobs and sub-directory prefixes directly under a directory."""
        return self._container_client.walk_blobs(
            name_starts_with=self._dir_prefix(directory_name), delimiter="/"
        )

    @staticmethod
    def _check_same_length(dfs, blob_names) -> None:
        """Raise ``ValueError`` unless every DataFrame has exactly one blob name."""
        if len(dfs) != len(blob_names):
            raise ValueError(f"Got {len(dfs)} DataFrames but {len(blob_names)} blob names")

    # ------------------------------------------------------------------ listing

    def get_blob_list(self, directory_name: str) -> List[str]:
        """Return the sorted names of the items directly under ``directory_name``.

        Items include blobs and virtual sub-directory prefixes (the latter end
        with ``"/"``), as yielded by ``walk_blobs``.
        """
        return sorted(blob.name for blob in self._get_blob_prefix(directory_name))

    def _get_blob_prefix(self, directory_name: str):
        """Return, as a list, the items directly under ``directory_name``."""
        return list(self._walk_directory(directory_name))

    # ------------------------------------------------------------------ parquet

    def get_parquet(
        self, directory_name: str, regex: str, manual_mode: bool = False
    ) -> Tuple[List[DataFrame], List[str]]:
        """Read the Parquet blobs of a directory into DataFrames.

        Parameters
        ----------
        directory_name : str
            Directory whose direct children are scanned (unused when
            ``manual_mode`` is true).
        regex : str
            Case-insensitive pattern searched in each full blob name; only
            matching blobs are read, and all of them are parsed as Parquet.
        manual_mode : bool
            When true, read the items in ``self.file_list`` instead of listing
            the directory. ``file_list`` is not set in this class; presumably
            it is provided by ``ControllerFeatures`` (confirm).

        Returns
        -------
        Tuple[List[DataFrame], List[str]]
            The DataFrames and, in the same order, their names: the blob base
            name without a ``.gz``/``.gzip`` suffix and without its extension.
        """
        file_list = self.file_list if manual_mode else self._walk_directory(directory_name)
        return self._read_files(file_list, regex, "parquet")

    def upload_parquet(
        self,
        directory_name: str,
        dfs: List[DataFrame],
        blob_names: List[str],
        format_type: str = "parquet",
        compression: bool = True,
        overwrite: bool = True,
    ) -> None:
        """Upload DataFrames as Parquet blobs into a directory.

        ``dfs[i]`` is stored as ``<directory>/<blob_names[i]>.<format_type>``,
        with an extra ``.gz`` suffix when ``compression`` is true.
        ``compression`` selects Parquet's internal gzip codec; the blob is
        still a plain Parquet file, not a gzip archive.

        Raises
        ------
        ValueError
            If ``dfs`` and ``blob_names`` have different lengths. This avoids
            silently dropping unmatched items.
        """
        self._check_same_length(dfs, blob_names)
        prefix = self._dir_prefix(directory_name)
        suffix = f".{format_type}.gz" if compression else f".{format_type}"
        for df, blob_name in zip(dfs, blob_names):
            parquet_data = df.to_parquet(
                index=False, engine="pyarrow", compression="gzip" if compression else None
            )
            self._container_client.upload_blob(
                name=f"{prefix}{blob_name}{suffix}",
                data=parquet_data,
                overwrite=overwrite,
            )

    # ---------------------------------------------------------------- csv/excel

    def get_excel_csv(
        self, directory_name: str, regex: str, manual_mode: bool = False
    ) -> Tuple[List[DataFrame], List[str]]:
        """Read the ``.csv`` and ``.xlsx`` blobs of a directory into DataFrames.

        Parameters are as for ``get_parquet``. A CSV yields one DataFrame
        named after the file. A workbook yields one DataFrame per sheet,
        named ``"<file>-<sheet>"``. Blobs with other extensions are skipped.
        """
        file_list = self.file_list if manual_mode else self._walk_directory(directory_name)
        return self._read_files(file_list, regex, "excel_csv")

    def upload_excel_csv(
        self,
        directory_name: str,
        dfs: List[DataFrame],
        blob_names: List[str],
        format_type: str = "csv",
        encoding: str = "utf-8",
        overwrite: bool = True,
    ) -> None:
        """Upload DataFrames as ``.csv`` or ``.xlsx`` blobs into a directory.

        Parameters
        ----------
        format_type : str
            ``"csv"`` or ``"xlsx"``.
        encoding : str
            Text encoding of the uploaded CSV bytes (ignored for ``xlsx``).

        Raises
        ------
        ValueError
            If ``format_type`` is unsupported, or if ``dfs`` and
            ``blob_names`` have different lengths.
        """
        if format_type not in ("csv", "xlsx"):
            raise ValueError(f"Unsupported format: {format_type}")
        self._check_same_length(dfs, blob_names)
        prefix = self._dir_prefix(directory_name)
        for df, blob_name in zip(dfs, blob_names):
            if format_type == "csv":
                # to_csv() without a path returns str and ignores `encoding`,
                # so encode explicitly to honour the requested encoding.
                data = df.to_csv(index=False).encode(encoding)
            else:
                with BytesIO() as buffer:
                    df.to_excel(buffer, index=False)
                    data = buffer.getvalue()
            self._container_client.upload_blob(
                name=f"{prefix}{blob_name}.{format_type}", data=data, overwrite=overwrite
            )

    # ------------------------------------------------------------------ reading

    def _read_files(self, file_list, regex, file_type):
        """Download and parse the items of ``file_list`` whose name matches ``regex``.

        Parameters
        ----------
        file_list : iterable
            Items exposing ``.name`` (e.g. results of ``walk_blobs``).
        regex : str
            Case-insensitive pattern searched in each full blob name.
        file_type : str
            ``"parquet"`` or ``"excel_csv"``.

        Returns
        -------
        Tuple[List[DataFrame], List[str]]
            DataFrames and their names, in matching order.

        Raises
        ------
        ValueError
            If ``file_type`` is not one of the supported values.
        """
        readers = {
            "parquet": self._frames_from_parquet,
            "excel_csv": self._frames_from_excel_csv,
        }
        if file_type not in readers:
            raise ValueError(f"Unsupported file type: {file_type}")
        reader = readers[file_type]
        pattern = re.compile(regex, re.IGNORECASE)

        dataframes = []
        dataframe_names = []
        for file in file_list:
            if not pattern.search(file.name):
                print(f"Ignoring {file.name}, does not match {regex}")
                continue
            if file.name.endswith("/"):
                # walk_blobs(delimiter="/") also yields virtual sub-directory
                # prefixes; they are not downloadable blobs.
                continue
            for name, df in reader(file.name):
                dataframe_names.append(name)
                dataframes.append(df)

        return dataframes, dataframe_names

    def _frames_from_parquet(self, blob_name: str):
        """Download one Parquet blob and return it as a single ``(name, df)`` pair."""
        base = blob_name.rsplit("/", 1)[-1]
        # Strip a compression suffix, then the extension, from the basename only,
        # so "a.b.parquet.gz" -> "a.b" (the name upload_parquet was given) and
        # dots in directory names do not matter.
        stem, ext = os.path.splitext(base)
        if ext.lower() in (".gz", ".gzip"):
            base = stem
        df_name = os.path.splitext(base)[0]
        with BytesIO(self._download_blob(blob_name)) as buffer:
            df = pq.read_table(buffer).to_pandas()
        return [(df_name, df)]

    def _frames_from_excel_csv(self, blob_name: str):
        """Download a ``.csv``/``.xlsx`` blob and return its ``(name, df)`` pairs.

        Other extensions yield an empty list and are not downloaded.
        """
        filename, extension = os.path.splitext(blob_name.rsplit("/", 1)[-1])
        extension = extension.lower()
        if extension == ".csv":
            blob_data = self._download_blob(blob_name)
            try:
                # utf-8-sig also accepts plain UTF-8 and drops a leading BOM
                # that would otherwise end up in the first column name.
                blob_str = blob_data.decode("utf-8-sig")
            except UnicodeDecodeError:
                blob_str = blob_data.decode("latin-1")
            with StringIO(blob_str) as csv_file:
                df = read_csv(csv_file, index_col=None, low_memory=False)
            return [(filename, df)]
        if extension == ".xlsx":
            with BytesIO(self._download_blob(blob_name)) as xlsx_buffer:
                all_sheets = read_excel(xlsx_buffer, sheet_name=None, index_col=None)
            return [
                (f"{filename}-{sheet_name}", df.reset_index(drop=True))
                for sheet_name, df in all_sheets.items()
            ]
        return []

    def _download_blob(self, blob_name):
        """Download a blob of this container and return its content as bytes."""
        blob_client = self._blob_service_client.get_blob_client(
            container=self.container_name, blob=blob_name
        )
        return blob_client.download_blob().readall()

    # ---------------------------------------------------------------- inspection

    def show_all_blobs(self) -> None:
        """Print the names of all blobs in the container that sit inside a directory.

        Blobs at the container root (names without ``"/"``) are not printed.
        """
        print(f"Container Name: {self.container_name}")
        for blob in self._container_client.list_blobs():
            if "/" in blob.name:
                print(f"\tBlob name : {blob.name}")

    def get_all_blob(self, filter_criteria: str = None) -> List[str]:
        """Return the names of all blobs in the container that sit inside a directory.

        Parameters
        ----------
        filter_criteria : str, optional
            When given and non-empty, keep only names containing this substring.
        """
        blob_names = [
            blob.name for blob in self._container_client.list_blobs() if "/" in blob.name
        ]
        return self._list_filter(blob_names, filter_criteria) if filter_criteria else blob_names

    def show_blobs(self, directory_name: str) -> None:
        """Print the base names of the items directly under a directory."""
        print(f"Container Name: {self.container_name}")
        print(f"\tDirectory Name: {directory_name}")
        for file in self._walk_directory(directory_name):
            print(f"\t\tBlob name: {file.name.rsplit('/', 1)[-1]}")

    def _list_filter(self, elements: list, character: str) -> List[str]:
        """Return the elements that contain the substring ``character``."""
        return [element for element in elements if character in element]

Ancestors

ControllerFeatures

Methods

def get_all_blob(self, filter_criteria: str = None) ‑> List[str]

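Return the names of all blobs in the container that sit inside a directory (names containing "/"), optionally keeping only those that contain filter_criteria.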

def get_blob_list(self, directory_name: str) ‑> List[str]

Return the sorted names of the blobs and sub-directory prefixes directly under directory_name.

def get_excel_csv(self, directory_name: str, regex: str, manual_mode: bool = False) ‑> Tuple[List[pandas.core.frame.DataFrame], List[str]]

Read the .xlsx and .csv files of a container directory whose names match regex into DataFrames.

def get_parquet(self, directory_name: str, regex: str, manual_mode: bool = False) ‑> Tuple[List[pandas.core.frame.DataFrame], List[str]]

Read the Parquet files of a container directory whose names match regex into DataFrames.

def show_all_blobs(self) ‑> None

Print the names of all blobs in the container that sit inside a directory.

def show_blobs(self, directory_name: str) ‑> None

Print the names of the blobs directly under a directory.

def upload_excel_csv(self,
directory_name: str,
dfs: List[pandas.core.frame.DataFrame],
blob_names: List[str],
format_type: str = 'csv',
encoding: str = 'utf-8',
overwrite: bool = True) ‑> None

Upload DataFrames as .csv or .xlsx files to a container directory.

def upload_parquet(self,
directory_name: str,
dfs: List[pandas.core.frame.DataFrame],
blob_names: List[str],
format_type: str = 'parquet',
compression: bool = True,
overwrite: bool = True) ‑> None

Upload DataFrames as .parquet files (named .parquet.gz when compression is enabled) to a container directory.

Inherited members