Module likelihood.tools.models_tools
Functions
def apply_lora(model, rank=4)
Expand source code
def apply_lora(model, rank=4):
    # Rebuild the model graph, swapping every Dense layer for a LoRALayer of the same width.
    inputs = tf.keras.Input(shape=model.input_shape[1:])
    x = inputs
    for layer in model.layers:
        if isinstance(layer, tf.keras.layers.Dense):
            print(f"Applying LoRA to layer {layer.name}")
            x = LoRALayer(units=layer.units, rank=rank)(x)
        else:
            x = layer(x)
    new_model = tf.keras.Model(inputs=inputs, outputs=x)
    return new_model
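A short usage sketch, assuming the function is imported from likelihood.tools.models_tools and the base model uses only Dense layers; the shapes and widths below are illustrative, and the rank must not exceed the input dimension or any replaced layer's width (see the checks in LoRALayer.build).

import tensorflow as tf
from likelihood.tools.models_tools import apply_lora

# Illustrative base model: two Dense layers wide enough for the chosen rank.
base = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(8,)),
        tf.keras.layers.Dense(16, activation="relu"),
        tf.keras.layers.Dense(4),
    ]
)

# Each Dense layer is replaced by a LoRALayer with the same number of units and rank 2.
lora_model = apply_lora(base, rank=2)
lora_model.summary()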
def graph_metrics(adj_matrix: numpy.ndarray, eigenvector_threshold: float = 1e-06) -> pandas.core.frame.DataFrame
Expand source code
def graph_metrics(adj_matrix: np.ndarray, eigenvector_threshold: float = 1e-6) -> DataFrame:
    """
    Calculate various graph metrics based on the given adjacency matrix and return them in a single DataFrame.

    Parameters
    ----------
    adj_matrix : `np.ndarray`
        The adjacency matrix representing the graph, where each element denotes the presence/weight of an edge between nodes.
    eigenvector_threshold : `float`
        A threshold for the eigenvector centrality calculation, used to determine the cutoff for small eigenvalues. Default is `1e-6`.

    Returns
    ----------
    DataFrame :
        A DataFrame containing the following graph metrics as columns.

        - `Degree Centrality`: Degree centrality values for each node, indicating the number of direct connections each node has.
        - `Clustering Coefficient`: Clustering coefficient values for each node, representing the degree to which nodes cluster together.
        - `Eigenvector Centrality`: Eigenvector centrality values, indicating the influence of a node in the graph based on the eigenvectors of the adjacency matrix.
        - `Degree`: The degree of each node, representing the number of edges connected to each node.
        - `Betweenness Centrality`: Betweenness centrality values, representing the extent to which a node lies on the shortest paths between other nodes.
        - `Closeness Centrality`: Closeness centrality values, indicating the inverse of the average shortest path distance from a node to all other nodes in the graph.
        - `Assortativity`: The assortativity coefficient of the graph, measuring the tendency of nodes to connect to similar nodes.

    Notes
    ----------
    The returned DataFrame will have one row for each node and one column for each of the computed metrics.
    """
    adj_matrix = adj_matrix.astype(int)
    G = nx.from_numpy_array(adj_matrix)

    degree_centrality = nx.degree_centrality(G)
    clustering_coeff = nx.clustering(G)
    try:
        eigenvector_centrality = nx.eigenvector_centrality(G, max_iter=500)
    except nx.PowerIterationFailedConvergence:
        print("Power iteration failed to converge. Returning NaN for eigenvector centrality.")
        eigenvector_centrality = {node: float("nan") for node in G.nodes()}
    for node, centrality in eigenvector_centrality.items():
        if centrality < eigenvector_threshold:
            eigenvector_centrality[node] = 0.0
    degree = dict(G.degree())
    betweenness_centrality = nx.betweenness_centrality(G)
    closeness_centrality = nx.closeness_centrality(G)
    assortativity = nx.degree_assortativity_coefficient(G)

    metrics_df = pd.DataFrame(
        {
            "Degree": degree,
            "Degree Centrality": degree_centrality,
            "Clustering Coefficient": clustering_coeff,
            "Eigenvector Centrality": eigenvector_centrality,
            "Betweenness Centrality": betweenness_centrality,
            "Closeness Centrality": closeness_centrality,
        }
    )
    metrics_df["Assortativity"] = assortativity
    return metrics_df
Calculate various graph metrics based on the given adjacency matrix and return them in a single DataFrame.

Parameters
adj_matrix : np.ndarray
    The adjacency matrix representing the graph, where each element denotes the presence/weight of an edge between nodes.
eigenvector_threshold : float
    A threshold for the eigenvector centrality calculation, used to determine the cutoff for small eigenvalues. Default is 1e-6.

Returns
DataFrame
    A DataFrame containing the following graph metrics as columns:
    - Degree Centrality: Degree centrality values for each node, indicating the number of direct connections each node has.
    - Clustering Coefficient: Clustering coefficient values for each node, representing the degree to which nodes cluster together.
    - Eigenvector Centrality: Eigenvector centrality values, indicating the influence of a node in the graph based on the eigenvectors of the adjacency matrix.
    - Degree: The degree of each node, representing the number of edges connected to each node.
    - Betweenness Centrality: Betweenness centrality values, representing the extent to which a node lies on the shortest paths between other nodes.
    - Closeness Centrality: Closeness centrality values, indicating the inverse of the average shortest path distance from a node to all other nodes in the graph.
    - Assortativity: The assortativity coefficient of the graph, measuring the tendency of nodes to connect to similar nodes.

Notes
The returned DataFrame will have one row for each node and one column for each of the computed metrics.
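A short usage sketch, assuming numpy, networkx, and pandas are installed; the adjacency matrix below is illustrative.

import numpy as np
from likelihood.tools.models_tools import graph_metrics

# 4-node undirected graph: a triangle (0-1-2) plus a pendant node (3).
adj = np.array(
    [
        [0, 1, 1, 0],
        [1, 0, 1, 0],
        [1, 1, 0, 1],
        [0, 0, 1, 0],
    ]
)

metrics = graph_metrics(adj)
print(metrics)  # one row per node; the Assortativity column repeats the global coefficient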
def remove_collinearity(df: pandas.core.frame.DataFrame, threshold: float = 0.9)
Expand source code
def remove_collinearity(df: DataFrame, threshold: float = 0.9):
    """
    Removes highly collinear features from the DataFrame based on a correlation threshold.

    This function calculates the correlation matrix of the DataFrame and removes columns
    that are highly correlated with any other column in the DataFrame. It uses an absolute
    correlation value greater than the specified threshold to identify which columns to drop.

    Parameters
    ----------
    df : `DataFrame`
        The input DataFrame containing numerical data.
    threshold : `float`
        The correlation threshold above which features will be removed. Default is `0.9`.

    Returns
    ----------
    DataFrame:
        A DataFrame with highly collinear features removed.
    """
    corr_matrix = df.corr().abs()
    upper_triangle = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
    to_drop = [
        column for column in upper_triangle.columns if any(upper_triangle[column] > threshold)
    ]
    df_reduced = df.drop(columns=to_drop)
    return df_reduced
Removes highly collinear features from the DataFrame based on a correlation threshold.

This function calculates the correlation matrix of the DataFrame and removes columns that are highly correlated with any other column in the DataFrame. It uses an absolute correlation value greater than the specified threshold to identify which columns to drop.

Parameters
df : DataFrame
    The input DataFrame containing numerical data.
threshold : float
    The correlation threshold above which features will be removed. Default is 0.9.

Returns
DataFrame
    A DataFrame with highly collinear features removed.
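A minimal usage sketch; the DataFrame below is illustrative, with column "b" constructed to be nearly proportional to "a".

import numpy as np
import pandas as pd
from likelihood.tools.models_tools import remove_collinearity

rng = np.random.default_rng(0)
a = rng.normal(size=100)
df = pd.DataFrame(
    {
        "a": a,
        "b": 2 * a + 0.01 * rng.normal(size=100),  # almost perfectly correlated with "a"
        "c": rng.normal(size=100),
    }
)

reduced = remove_collinearity(df, threshold=0.9)
print(reduced.columns.tolist())  # "b" is dropped because |corr(a, b)| > 0.9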
def suppress_warnings(func)
Expand source code
def suppress_warnings(func):
    # Decorator that silences all warnings raised while the wrapped function runs.
    @wraps(func)
    def wrapper(*args, **kwargs):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return func(*args, **kwargs)

    return wrapper
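A brief usage sketch of the decorator; noisy_mean is an illustrative helper, not part of the module.

import warnings
from likelihood.tools.models_tools import suppress_warnings

@suppress_warnings
def noisy_mean(values):
    warnings.warn("this warning will not be shown")
    return sum(values) / len(values)

print(noisy_mean([1, 2, 3]))  # runs without emitting the warning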
def train_and_insights(x_data: numpy.ndarray,
y_act: numpy.ndarray,
model: keras.src.models.model.Model,
patience: int = 3,
reg: bool = False,
frac: float = 1.0,
**kwargs: Dict | None) -> keras.src.models.model.Model
Expand source code
def train_and_insights(
    x_data: np.ndarray,
    y_act: np.ndarray,
    model: tf.keras.Model,
    patience: int = 3,
    reg: bool = False,
    frac: float = 1.0,
    **kwargs: Optional[Dict],
) -> tf.keras.Model:
    """
    Train a Keras model and provide insights on the training and validation metrics.

    Parameters
    ----------
    x_data : `np.ndarray`
        Input data for training the model.
    y_act : `np.ndarray`
        Actual labels corresponding to x_data.
    model : `tf.keras.Model`
        The Keras model to train.
    patience : `int`
        The patience parameter for early stopping callback (default is 3).
    reg : `bool`
        Flag to determine if residual analysis should be performed (default is `False`).
    frac : `float`
        Fraction of data to use (default is 1.0).

    Keyword Arguments:
    ----------
    Additional keyword arguments passed to the `model.fit` function, such as validation split and callbacks.

    Returns
    ----------
    `tf.keras.Model`
        The trained model after fitting.
    """
    validation_split = kwargs.get("validation_split", 0.2)
    callback = kwargs.get(
        "callback", [tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=patience)]
    )
    for key in ["validation_split", "callback"]:
        if key in kwargs:
            del kwargs[key]

    history = model.fit(
        x_data,
        y_act,
        validation_split=validation_split,
        verbose=False,
        callbacks=callback,
        **kwargs,
    )
    hist = pd.DataFrame(history.history)
    hist["epoch"] = history.epoch
    columns = hist.columns
    train_err, train_metric = columns[0], columns[1]
    val_err, val_metric = columns[2], columns[3]
    train_err, val_err = hist[train_err].values, hist[val_err].values

    with suppress_prints():
        n = int(len(x_data) * frac)
        y_pred = model.predict(x_data[:n])
        y_act = y_act[:n]

    if reg:
        residual(y_act, y_pred)
        residual_hist(y_act, y_pred)

    act_pred(y_act, y_pred)
    loss_curve(hist["epoch"].values, train_err, val_err)
    return model
Train a Keras model and provide insights on the training and validation metrics.

Parameters
x_data : np.ndarray
    Input data for training the model.
y_act : np.ndarray
    Actual labels corresponding to x_data.
model : tf.keras.Model
    The Keras model to train.
patience : int
    The patience parameter for the early stopping callback (default is 3).
reg : bool
    Flag to determine if residual analysis should be performed (default is False).
frac : float
    Fraction of data to use (default is 1.0).

Keyword Arguments
Additional keyword arguments passed to the model.fit function, such as validation split and callbacks.

Returns
tf.keras.Model
    The trained model after fitting.
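A minimal usage sketch on toy regression data; the data, model, and epoch count are illustrative. Unless overridden via kwargs, the function applies a 20% validation split and early stopping on val_loss, and it expects the compiled model to track a loss plus one metric so the history has four columns.

import numpy as np
import tensorflow as tf
from likelihood.tools.models_tools import train_and_insights

# Toy regression data.
x = np.random.rand(200, 4).astype("float32")
y = x.sum(axis=1, keepdims=True)

model = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(4,)),
        tf.keras.layers.Dense(8, activation="relu"),
        tf.keras.layers.Dense(1),
    ]
)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# reg=True additionally produces the residual plots alongside the actual-vs-predicted and loss curves.
model = train_and_insights(x, y, model, patience=3, reg=True, epochs=20)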
Classes
class LoRALayer (units, rank=4, **kwargs)
Expand source code
@tf.keras.utils.register_keras_serializable(package="Custom", name="LoRALayer")
class LoRALayer(tf.keras.layers.Layer):
    def __init__(self, units, rank=4, **kwargs):
        super(LoRALayer, self).__init__(**kwargs)
        self.units = units
        self.rank = rank

    def build(self, input_shape):
        input_dim = input_shape[-1]
        print(f"Input shape: {input_shape}")
        if self.rank > input_dim:
            raise ValueError(
                f"Rank ({self.rank}) cannot be greater than input dimension ({input_dim})."
            )
        if self.rank > self.units:
            raise ValueError(
                f"Rank ({self.rank}) cannot be greater than number of units ({self.units})."
            )
        self.A = self.add_weight(
            shape=(input_dim, self.rank), initializer="random_normal", trainable=True, name="A"
        )
        self.B = self.add_weight(
            shape=(self.rank, self.units), initializer="random_normal", trainable=True, name="B"
        )
        print(f"Dense weights shape: {input_dim}x{self.units}")
        print(f"LoRA weights shape: A{self.A.shape}, B{self.B.shape}")

    def call(self, inputs):
        lora_output = tf.matmul(tf.matmul(inputs, self.A), self.B)
        return lora_output
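A short usage sketch of the layer on its own; the shapes below are illustrative, and the rank must not exceed the input dimension or the number of units.

import tensorflow as tf
from likelihood.tools.models_tools import LoRALayer

# Project an 8-dimensional input to 16 units through a rank-4 bottleneck (A: 8x4, B: 4x16).
layer = LoRALayer(units=16, rank=4)
x = tf.random.normal((2, 8))
y = layer(x)
print(y.shape)  # (2, 16)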
This is the class from which all layers inherit.

A layer is a callable object that takes as input one or more tensors and that outputs one or more tensors. It involves computation, defined in the call() method, and a state (weight variables). State can be created:

- in __init__(), for instance via self.add_weight();
- in the optional build() method, which is invoked by the first __call__() to the layer, and supplies the shape(s) of the input(s), which may not have been known at initialization time.

Layers are recursively composable: if you assign a Layer instance as an attribute of another Layer, the outer layer will start tracking the weights created by the inner layer. Nested layers should be instantiated in the __init__() method or build() method.

Users will just instantiate a layer and then treat it as a callable.
Args
trainable
    Boolean, whether the layer's variables should be trainable.
name
    String name of the layer.
dtype
    The dtype of the layer's computations and weights. Can also be a keras.DTypePolicy, which allows the computation and weight dtype to differ. Defaults to None. None means to use keras.config.dtype_policy(), which is a float32 policy unless set to a different value (via keras.config.set_dtype_policy()).
Attributes
name
    The name of the layer (string).
dtype
    Dtype of the layer's weights. Alias of layer.variable_dtype.
variable_dtype
    Dtype of the layer's weights.
compute_dtype
    The dtype of the layer's computations. Layers automatically cast inputs to this dtype, which causes the computations and output to also be in this dtype. When mixed precision is used with a keras.DTypePolicy, this will be different than variable_dtype.
trainable_weights
    List of variables to be included in backprop.
non_trainable_weights
    List of variables that should not be included in backprop.
weights
    The concatenation of the lists trainable_weights and non_trainable_weights (in this order).
trainable
    Whether the layer should be trained (boolean), i.e. whether its potentially-trainable weights should be returned as part of layer.trainable_weights.
input_spec
    Optional (list of) InputSpec object(s) specifying the constraints on inputs that can be accepted by the layer.
We recommend that descendants of Layer implement the following methods:

- __init__(): Defines custom layer attributes, and creates layer weights that do not depend on input shapes, using add_weight(), or other state.
- build(self, input_shape): This method can be used to create weights that depend on the shape(s) of the input(s), using add_weight(), or other state. __call__() will automatically build the layer (if it has not been built yet) by calling build().
- call(self, *args, **kwargs): Called in __call__ after making sure build() has been called. call() performs the logic of applying the layer to the input arguments. Two reserved keyword arguments you can optionally use in call() are: 1. training (boolean, whether the call is in inference mode or training mode); 2. mask (boolean tensor encoding masked timesteps in the input, used e.g. in RNN layers). A typical signature for this method is call(self, inputs), and the user could optionally add training and mask if the layer needs them.
- get_config(self): Returns a dictionary containing the configuration used to initialize this layer. If the keys differ from the arguments in __init__(), then override from_config(self) as well. This method is used when saving the layer or a model that contains this layer.
Examples:

Here's a basic example: a layer with two variables, w and b, that returns y = w . x + b. It shows how to implement build() and call(). Variables set as attributes of a layer are tracked as weights of the layers (in layer.weights).

class SimpleDense(Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    # Create the state of the layer (weights)
    def build(self, input_shape):
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="glorot_uniform",
            trainable=True,
            name="kernel",
        )
        self.bias = self.add_weight(
            shape=(self.units,),
            initializer="zeros",
            trainable=True,
            name="bias",
        )

    # Defines the computation
    def call(self, inputs):
        return ops.matmul(inputs, self.kernel) + self.bias

# Instantiates the layer.
linear_layer = SimpleDense(4)

# This will also call `build(input_shape)` and create the weights.
y = linear_layer(ops.ones((2, 2)))
assert len(linear_layer.weights) == 2

# These weights are trainable, so they're listed in `trainable_weights`:
assert len(linear_layer.trainable_weights) == 2
Besides trainable weights, updated via backpropagation during training, layers can also have non-trainable weights. These weights are meant to be updated manually during call(). Here's an example layer that computes the running sum of its inputs:

class ComputeSum(Layer):
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        # Create a non-trainable weight.
        self.total = self.add_weight(
            shape=(),
            initializer="zeros",
            trainable=False,
            name="total",
        )

    def call(self, inputs):
        self.total.assign(self.total + ops.sum(inputs))
        return self.total

my_sum = ComputeSum(2)
x = ops.ones((2, 2))
y = my_sum(x)

assert my_sum.weights == [my_sum.total]
assert my_sum.non_trainable_weights == [my_sum.total]
assert my_sum.trainable_weights == []
Ancestors
- keras.src.layers.layer.Layer
- keras.src.backend.tensorflow.layer.TFLayer
- keras.src.backend.tensorflow.trackable.KerasAutoTrackable
- tensorflow.python.trackable.autotrackable.AutoTrackable
- tensorflow.python.trackable.base.Trackable
- keras.src.ops.operation.Operation
- keras.src.saving.keras_saveable.KerasSaveable
Methods
def build(self, input_shape)
Expand source code
def build(self, input_shape):
    input_dim = input_shape[-1]
    print(f"Input shape: {input_shape}")
    if self.rank > input_dim:
        raise ValueError(
            f"Rank ({self.rank}) cannot be greater than input dimension ({input_dim})."
        )
    if self.rank > self.units:
        raise ValueError(
            f"Rank ({self.rank}) cannot be greater than number of units ({self.units})."
        )
    self.A = self.add_weight(
        shape=(input_dim, self.rank), initializer="random_normal", trainable=True, name="A"
    )
    self.B = self.add_weight(
        shape=(self.rank, self.units), initializer="random_normal", trainable=True, name="B"
    )
    print(f"Dense weights shape: {input_dim}x{self.units}")
    print(f"LoRA weights shape: A{self.A.shape}, B{self.B.shape}")
def call(self, inputs)
Expand source code
def call(self, inputs):
    lora_output = tf.matmul(tf.matmul(inputs, self.A), self.B)
    return lora_output
class suppress_prints
Expand source code
class suppress_prints:
    def __enter__(self):
        self.original_stdout = sys.stdout
        sys.stdout = open(os.devnull, "w")

    def __exit__(self, exc_type, exc_value, traceback):
        sys.stdout.close()
        sys.stdout = self.original_stdout
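A brief usage sketch of the context manager; the print statements are illustrative.

from likelihood.tools.models_tools import suppress_prints

with suppress_prints():
    print("this is redirected to os.devnull and never shown")
print("stdout is restored here")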