Module likelihood.graph.nn
Functions
def cal_adjacency_matrix(df: pandas.core.frame.DataFrame,
                         exclude_subset: List[str] = [],
                         sparse: bool = True,
                         **kwargs) ‑> Tuple[dict, numpy.ndarray]
-
Expand source code
def cal_adjacency_matrix(
    df: pd.DataFrame, exclude_subset: List[str] = [], sparse: bool = True, **kwargs
) -> Tuple[dict, np.ndarray]:
    """
    Calculates the adjacency matrix for a given DataFrame using parallel processing.

    Parameters
    ----------
    df : `DataFrame`
        The input DataFrame containing the features.
    exclude_subset : `List[str]`, `optional`
        A list of features to exclude from the calculation of the adjacency matrix.
    sparse : `bool`, `optional`
        Whether to return a sparse matrix or a dense matrix.
    **kwargs : `dict`
        Additional keyword arguments to pass to the `compare_similarity` function.

    Returns
    -------
    adj_dict : `dict`
        A dictionary containing the features.
    adjacency_matrix : `ndarray`
        The adjacency matrix.

    Keyword Arguments
    -----------------
    similarity : `int`
        The minimum number of features that must be the same in both arrays
        to be considered similar.
    threshold : `float`
        The threshold value used in the `compare_similarity` function. Default is 0.05.
    """
    if len(exclude_subset) > 0:
        columns = [col for col in df.columns if col not in exclude_subset]
        df_ = df[columns].copy()
    else:
        df_ = df.copy()

    assert len(df_) > 0

    similarity = kwargs.get("similarity", len(df_.columns) - 1)
    threshold = kwargs.get("threshold", 0.05)

    assert similarity <= df_.shape[1]

    data = df_.to_numpy()
    n = len(data)
    adj_dict = {i: data[i].tolist() for i in range(n)}

    def pair_generator():
        for i in range(n):
            for j in range(i, n):
                yield (i, j)

    with Pool(cpu_count()) as pool:
        results = pool.starmap(
            compare_pair, ((pair, data, similarity, threshold) for pair in pair_generator())
        )

    adjacency_matrix = np.zeros((n, n), dtype=np.uint8)
    for i, j, val in results:
        if val:
            adjacency_matrix[i, j] = 1
            adjacency_matrix[j, i] = 1

    if sparse:
        num_nodes = adjacency_matrix.shape[0]
        indices = np.argwhere(adjacency_matrix != 0.0)
        indices = tf.constant(indices, dtype=tf.int64)
        values = tf.constant(adjacency_matrix[indices[:, 0], indices[:, 1]], dtype=tf.float32)
        adjacency_matrix = tf.sparse.SparseTensor(
            indices=indices, values=values, dense_shape=(num_nodes, num_nodes)
        )

    return adj_dict, adjacency_matrix
Calculates the adjacency matrix for a given DataFrame using parallel processing.

Parameters

df : DataFrame
- The input DataFrame containing the features.
exclude_subset : List[str], optional
- A list of features to exclude from the calculation of the adjacency matrix.
sparse : bool, optional
- Whether to return a sparse matrix or a dense matrix.
**kwargs : dict
- Additional keyword arguments to pass to the compare_similarity function.

Returns

adj_dict : dict
- A dictionary containing the features.
adjacency_matrix : ndarray
- The adjacency matrix.

Keyword Arguments

similarity : int
- The minimum number of features that must be the same in both arrays to be considered similar. Defaults to the number of feature columns minus one.
threshold : float
- The threshold value used in the compare_similarity function. Default is 0.05.

def compare_pair(pair, data, similarity, threshold)
-
Expand source code
def compare_pair(pair, data, similarity, threshold):
    i, j = pair
    sim = compare_similarity_np(data[i], data[j], threshold=threshold)
    return (i, j, 1 if sim >= similarity else 0)
def compare_similarity_np(arr1: numpy.ndarray, arr2: numpy.ndarray, threshold: float = 0.05) ‑> int
-
Expand source code
def compare_similarity_np(arr1: np.ndarray, arr2: np.ndarray, threshold: float = 0.05) -> int:
    """Vectorized similarity comparison between two numeric/categorical arrays."""
    arr1 = np.asarray(arr1)
    arr2 = np.asarray(arr2)

    is_numeric = np.vectorize(
        lambda a, b: isinstance(a, (int, float)) and isinstance(b, (int, float))
    )(arr1, arr2)

    similarity = np.zeros_like(arr1, dtype=bool)

    if np.any(is_numeric):
        a_num = arr1[is_numeric].astype(float)
        b_num = arr2[is_numeric].astype(float)
        both_zero = (a_num == 0) & (b_num == 0)
        nonzero = ~both_zero & (a_num != 0) & (b_num != 0)

        ratio = np.zeros_like(a_num)
        ratio[nonzero] = np.maximum(a_num[nonzero], b_num[nonzero]) / np.minimum(
            a_num[nonzero], b_num[nonzero]
        )

        numeric_similar = both_zero | ((1 - threshold <= ratio) & (ratio <= 1 + threshold))
        similarity[is_numeric] = numeric_similar

    similarity[~is_numeric] = arr1[~is_numeric] == arr2[~is_numeric]

    return np.count_nonzero(similarity)
Vectorized similarity comparison between two numeric/categorical arrays.
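A minimal usage sketch of the two functions above. The DataFrame and its columns are illustrative, not part of the module; because cal_adjacency_matrix uses a multiprocessing pool internally, run it under an if __name__ == "__main__": guard on spawn-based platforms.

import pandas as pd
from likelihood.graph.nn import cal_adjacency_matrix, compare_similarity_np

# Hypothetical toy table: two numeric features and one categorical feature.
df = pd.DataFrame({
    "age": [30.0, 31.0, 55.0],
    "height": [1.70, 1.71, 1.95],
    "city": ["NY", "NY", "LA"],
})

# Rows become adjacent when at least `similarity` features agree:
# numeric values within the relative threshold, categoricals by equality.
adj_dict, adjacency = cal_adjacency_matrix(df, sparse=False, similarity=2, threshold=0.05)
print(adjacency)  # 3x3 0/1 matrix; rows 0 and 1 are linked (plus self-loops)

# The per-pair score simply counts matching features:
print(compare_similarity_np(df.iloc[0].to_numpy(), df.iloc[1].to_numpy()))  # 3 for this data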
Classes
class Data (df: pandas.core.frame.DataFrame,
target: str | None = None,
exclude_subset: List[str] = [],
**kwargs)
-
Expand source code
class Data:
    def __init__(
        self,
        df: DataFrame,
        target: str | None = None,
        exclude_subset: List[str] = [],
        **kwargs,
    ):
        sparse = kwargs.get("sparse", True)
        threshold = kwargs.get("threshold", 0.05)
        _, adjacency = cal_adjacency_matrix(
            df, exclude_subset=exclude_subset, sparse=sparse, threshold=threshold
        )
        if target is not None:
            X = df.drop(columns=[target] + exclude_subset)
        else:
            X = df.drop(columns=exclude_subset)
        self.columns = X.columns
        X = X.to_numpy()
        self.x = np.asarray(X).astype(np.float32)
        self.adjacency = adjacency
        if target is not None:
            self.y = np.asarray(df[target].values).astype(np.int32)
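A short construction sketch for Data; the feature columns and the label column below are hypothetical:

import numpy as np
import pandas as pd
from likelihood.graph.nn import Data

df = pd.DataFrame({
    "f1": np.random.rand(100).round(2),
    "f2": np.random.rand(100).round(2),
    "label": np.random.randint(0, 2, size=100),
})

data = Data(df, target="label", threshold=0.05)  # sparse adjacency by default
print(data.x.shape)    # (100, 2) float32 node features (label column dropped)
print(data.y.shape)    # (100,) int32 labels
print(data.adjacency)  # a tf.SparseTensor when sparse=True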
class VanillaGNN (dim_in, dim_h, dim_out, rank=2, **kwargs)
-
Expand source code
@tf.keras.utils.register_keras_serializable(package="Custom", name="VanillaGNN")
class VanillaGNN(tf.keras.Model):
    def __init__(self, dim_in, dim_h, dim_out, rank=2, **kwargs):
        super(VanillaGNN, self).__init__(**kwargs)
        self.dim_in = dim_in
        self.dim_h = dim_h
        self.dim_out = dim_out
        self.rank = rank
        self.gnn1 = VanillaGNNLayer(self.dim_in, self.dim_h, self.rank)
        self.gnn2 = VanillaGNNLayer(self.dim_h, self.dim_h, self.rank)
        self.gnn3 = VanillaGNNLayer(self.dim_h, self.dim_out, None)

    def call(self, x, adjacency):
        h = self.gnn1(x, adjacency)
        h = tf.nn.tanh(h)
        h = self.gnn2(h, adjacency)
        h = self.gnn3(h, adjacency)
        return tf.nn.softmax(h, axis=1)

    def f1_macro(self, y_true, y_pred):
        return f1_score(y_true, y_pred, average="macro")

    def compute_f1_score(self, logits, labels):
        predictions = tf.argmax(logits, axis=1, output_type=tf.int32)
        true_labels = tf.cast(labels, tf.int32)
        return self.f1_macro(true_labels.numpy(), predictions.numpy())

    def evaluate(self, x, adjacency, y):
        y = tf.cast(y, tf.int32)
        out = self(x, adjacency)
        loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=out)
        loss = tf.reduce_mean(loss)
        f1 = round(self.compute_f1_score(out, y), 4)
        return loss.numpy(), f1

    def test(self, data):
        out = self(data.x, data.adjacency)
        test_f1 = self.compute_f1_score(out, data.y)
        return round(test_f1, 4)

    def predict(self, data):
        out = self(data.x, data.adjacency)
        return tf.argmax(out, axis=1, output_type=tf.int32).numpy()

    def get_config(self):
        config = {
            "dim_in": self.dim_in,
            "dim_h": self.dim_h,
            "dim_out": self.dim_out,
            "rank": self.rank,
        }
        base_config = super(VanillaGNN, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    @classmethod
    def from_config(cls, config):
        return cls(
            dim_in=config["dim_in"],
            dim_h=config["dim_h"],
            dim_out=config["dim_out"],
            rank=config["rank"],
        )

    @tf.function
    def train_step(self, batch_x, batch_adjacency, batch_y, optimizer):
        with tf.GradientTape() as tape:
            out = self(batch_x, batch_adjacency)
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=batch_y, logits=out)
            loss = tf.reduce_mean(loss)
        gradients = tape.gradient(loss, self.trainable_variables)
        optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return loss

    def fit(self, data, epochs, batch_size, test_size=0.2, optimizer="adam"):
        optimizers = {
            "sgd": tf.keras.optimizers.SGD(),
            "adam": tf.keras.optimizers.Adam(),
            "adamw": tf.keras.optimizers.AdamW(),
            "adadelta": tf.keras.optimizers.Adadelta(),
            "rmsprop": tf.keras.optimizers.RMSprop(),
        }
        optimizer = optimizers[optimizer]
        train_losses = []
        train_f1_scores = []
        val_losses = []
        val_f1_scores = []

        num_nodes = len(data.x)
        split_index = int((1 - test_size) * num_nodes)
        X_train, X_test = data.x[:split_index], data.x[split_index:]
        y_train, y_test = data.y[:split_index], data.y[split_index:]
        adjacency_train = tf.sparse.slice(data.adjacency, [0, 0], [split_index, split_index])
        adjacency_test = tf.sparse.slice(
            data.adjacency,
            [split_index, split_index],
            [num_nodes - split_index, num_nodes - split_index],
        )

        batch_starts = np.arange(0, len(X_train), batch_size)
        for epoch in range(epochs):
            np.random.shuffle(batch_starts)
            for start in batch_starts:
                end = start + batch_size
                batch_x = X_train[start:end, :]
                batch_adjacency = tf.sparse.slice(
                    adjacency_train, [start, start], [batch_size, batch_size]
                )
                batch_y = y_train[start:end]
                train_loss = self.train_step(batch_x, batch_adjacency, batch_y, optimizer)

            train_loss, train_f1 = self.evaluate(X_train, adjacency_train, y_train)
            train_losses.append(train_loss)
            train_f1_scores.append(train_f1)

            if epoch % 5 == 0:
                clear_output(wait=True)
                val_loss, val_f1 = self.evaluate(X_test, adjacency_test, y_test)
                val_losses.append(val_loss)
                val_f1_scores.append(val_f1)
                print(
                    f"Epoch {epoch:>3} | Train Loss: {train_loss:.4f} | Train F1: {train_f1:.4f} | Val Loss: {val_loss:.4f} | Val F1: {val_f1:.4f}"
                )

        return train_losses, train_f1_scores, val_losses, val_f1_scores
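A minimal end-to-end training sketch, reusing the hypothetical data object from the Data example above; the hidden width, epoch count and batch size are illustrative choices:

from likelihood.graph.nn import VanillaGNN

model = VanillaGNN(dim_in=data.x.shape[1],  # input feature dimension
                   dim_h=16,                # hidden width
                   dim_out=2,               # number of classes
                   rank=2)                  # low-rank option used by the first two layers

# fit() performs its own train/validation split along the node ordering.
train_losses, train_f1, val_losses, val_f1 = model.fit(
    data, epochs=20, batch_size=20, test_size=0.2, optimizer="adam"
)

print(model.test(data))     # macro F1 over all nodes
print(model.predict(data))  # per-node class predictions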
A model grouping layers into an object with training/inference features.
There are three ways to instantiate a Model:

With the "Functional API"

You start from Input, you chain layer calls to specify the model's forward pass, and finally, you create your model from inputs and outputs:

inputs = keras.Input(shape=(37,))
x = keras.layers.Dense(32, activation="relu")(inputs)
outputs = keras.layers.Dense(5, activation="softmax")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

Note: Only dicts, lists, and tuples of input tensors are supported. Nested inputs are not supported (e.g. lists of lists or dicts of dicts).

A new Functional API model can also be created by using the intermediate tensors. This enables you to quickly extract sub-components of the model.

Example:

inputs = keras.Input(shape=(None, None, 3))
processed = keras.layers.RandomCrop(width=128, height=128)(inputs)
conv = keras.layers.Conv2D(filters=32, kernel_size=3)(processed)
pooling = keras.layers.GlobalAveragePooling2D()(conv)
feature = keras.layers.Dense(10)(pooling)

full_model = keras.Model(inputs, feature)
backbone = keras.Model(processed, conv)
activations = keras.Model(conv, feature)

Note that the backbone and activations models are not created with keras.Input objects, but with the tensors that originate from keras.Input objects. Under the hood, the layers and weights will be shared across these models, so that the user can train the full_model, and use backbone or activations to do feature extraction. The inputs and outputs of the model can be nested structures of tensors as well, and the created models are standard Functional API models that support all the existing APIs.

By subclassing the Model class

In that case, you should define your layers in __init__() and you should implement the model's forward pass in call().

class MyModel(keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = keras.layers.Dense(32, activation="relu")
        self.dense2 = keras.layers.Dense(5, activation="softmax")

    def call(self, inputs):
        x = self.dense1(inputs)
        return self.dense2(x)

model = MyModel()

If you subclass Model, you can optionally have a training argument (boolean) in call(), which you can use to specify a different behavior in training and inference:

class MyModel(keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = keras.layers.Dense(32, activation="relu")
        self.dense2 = keras.layers.Dense(5, activation="softmax")
        self.dropout = keras.layers.Dropout(0.5)

    def call(self, inputs, training=False):
        x = self.dense1(inputs)
        x = self.dropout(x, training=training)
        return self.dense2(x)

model = MyModel()

Once the model is created, you can configure the model with losses and metrics with model.compile(), train the model with model.fit(), or use the model to do prediction with model.predict().

With the Sequential class

In addition, keras.Sequential is a special case of model where the model is purely a stack of single-input, single-output layers.

model = keras.Sequential([
    keras.Input(shape=(None, None, 3)),
    keras.layers.Conv2D(filters=32, kernel_size=3),
])
Ancestors
- keras.src.models.model.Model
- keras.src.backend.tensorflow.trainer.TensorFlowTrainer
- keras.src.trainers.trainer.Trainer
- keras.src.layers.layer.Layer
- keras.src.backend.tensorflow.layer.TFLayer
- keras.src.backend.tensorflow.trackable.KerasAutoTrackable
- tensorflow.python.trackable.autotrackable.AutoTrackable
- tensorflow.python.trackable.base.Trackable
- keras.src.ops.operation.Operation
- keras.src.saving.keras_saveable.KerasSaveable
Static methods
def from_config(config)
-
Creates an operation from its config.
This method is the reverse of get_config, capable of instantiating the same operation from the config dictionary.

Note: If you override this method, you might receive a serialized dtype config, which is a dict. You can deserialize it as follows:

if "dtype" in config and isinstance(config["dtype"], dict):
    policy = dtype_policies.deserialize(config["dtype"])
Args
config
- A Python dictionary, typically the output of get_config.
Returns
An operation instance.
Methods
def call(self, x, adjacency)
-
Expand source code
def call(self, x, adjacency):
    h = self.gnn1(x, adjacency)
    h = tf.nn.tanh(h)
    h = self.gnn2(h, adjacency)
    h = self.gnn3(h, adjacency)
    return tf.nn.softmax(h, axis=1)
def compute_f1_score(self, logits, labels)
-
Expand source code
def compute_f1_score(self, logits, labels):
    predictions = tf.argmax(logits, axis=1, output_type=tf.int32)
    true_labels = tf.cast(labels, tf.int32)
    return self.f1_macro(true_labels.numpy(), predictions.numpy())
def evaluate(self, x, adjacency, y)
-
Expand source code
def evaluate(self, x, adjacency, y):
    y = tf.cast(y, tf.int32)
    out = self(x, adjacency)
    loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=out)
    loss = tf.reduce_mean(loss)
    f1 = round(self.compute_f1_score(out, y), 4)
    return loss.numpy(), f1
Returns the loss value & metrics values for the model in test mode.
Computation is done in batches (see the batch_size arg).

Args

x
- Input data. It can be:
  - A NumPy array (or array-like), or a list of arrays (in case the model has multiple inputs).
  - A backend-native tensor, or a list of tensors (in case the model has multiple inputs).
  - A dict mapping input names to the corresponding array/tensors, if the model has named inputs.
  - A keras.utils.PyDataset returning (inputs, targets) or (inputs, targets, sample_weights).
  - A tf.data.Dataset yielding (inputs, targets) or (inputs, targets, sample_weights).
  - A torch.utils.data.DataLoader yielding (inputs, targets) or (inputs, targets, sample_weights).
  - A Python generator function yielding (inputs, targets) or (inputs, targets, sample_weights).
y
- Target data. Like the input data x, it can be either NumPy array(s) or backend-native tensor(s). If x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or a Python generator function, y should not be specified since targets will be obtained from x.
batch_size
- Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 32. Do not specify the batch_size if your input data x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function since they generate batches.
verbose
- "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g. in a production environment). Defaults to "auto".
sample_weight
- Optional NumPy array or tensor of weights for the training samples, used for weighting the loss function (during training only). You can either pass a flat (1D) NumPy array or tensor with the same length as the input samples (1:1 mapping between weights and samples), or in the case of temporal data, you can pass a 2D NumPy array or tensor with shape (samples, sequence_length) to apply a different weight to every timestep of every sample. This argument is not supported when x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function. Instead, provide sample_weights as the third element of x. Note that sample weighting does not apply to metrics specified via the metrics argument in compile(). To apply sample weighting to your metrics, you can specify them via the weighted_metrics in compile() instead.
steps
- Integer or None. Total number of steps (batches of samples) to draw before declaring the evaluation round finished. If steps is None, it will run until x is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely.
callbacks
- List of keras.callbacks.Callback instances. List of callbacks to apply during evaluation.
return_dict
- If True, loss and metric results are returned as a dict, with each key being the name of the metric. If False, they are returned as a list.

Returns

Scalar test loss (if the model has a single output and no metrics) or list of scalars (if the model has multiple outputs and/or metrics). The attribute model.metrics_names will give you the display labels for the scalar outputs.

def f1_macro(self, y_true, y_pred)
-
Expand source code
def f1_macro(self, y_true, y_pred):
    return f1_score(y_true, y_pred, average="macro")
def fit(self, data, epochs, batch_size, test_size=0.2, optimizer='adam')
-
Expand source code
def fit(self, data, epochs, batch_size, test_size=0.2, optimizer="adam"):
    optimizers = {
        "sgd": tf.keras.optimizers.SGD(),
        "adam": tf.keras.optimizers.Adam(),
        "adamw": tf.keras.optimizers.AdamW(),
        "adadelta": tf.keras.optimizers.Adadelta(),
        "rmsprop": tf.keras.optimizers.RMSprop(),
    }
    optimizer = optimizers[optimizer]
    train_losses = []
    train_f1_scores = []
    val_losses = []
    val_f1_scores = []

    num_nodes = len(data.x)
    split_index = int((1 - test_size) * num_nodes)
    X_train, X_test = data.x[:split_index], data.x[split_index:]
    y_train, y_test = data.y[:split_index], data.y[split_index:]
    adjacency_train = tf.sparse.slice(data.adjacency, [0, 0], [split_index, split_index])
    adjacency_test = tf.sparse.slice(
        data.adjacency,
        [split_index, split_index],
        [num_nodes - split_index, num_nodes - split_index],
    )

    batch_starts = np.arange(0, len(X_train), batch_size)
    for epoch in range(epochs):
        np.random.shuffle(batch_starts)
        for start in batch_starts:
            end = start + batch_size
            batch_x = X_train[start:end, :]
            batch_adjacency = tf.sparse.slice(
                adjacency_train, [start, start], [batch_size, batch_size]
            )
            batch_y = y_train[start:end]
            train_loss = self.train_step(batch_x, batch_adjacency, batch_y, optimizer)

        train_loss, train_f1 = self.evaluate(X_train, adjacency_train, y_train)
        train_losses.append(train_loss)
        train_f1_scores.append(train_f1)

        if epoch % 5 == 0:
            clear_output(wait=True)
            val_loss, val_f1 = self.evaluate(X_test, adjacency_test, y_test)
            val_losses.append(val_loss)
            val_f1_scores.append(val_f1)
            print(
                f"Epoch {epoch:>3} | Train Loss: {train_loss:.4f} | Train F1: {train_f1:.4f} | Val Loss: {val_loss:.4f} | Val F1: {val_f1:.4f}"
            )

    return train_losses, train_f1_scores, val_losses, val_f1_scores
Trains the model for a fixed number of epochs (dataset iterations).
Args
x
- Input data. It can be:
  - A NumPy array (or array-like), or a list of arrays (in case the model has multiple inputs).
  - A backend-native tensor, or a list of tensors (in case the model has multiple inputs).
  - A dict mapping input names to the corresponding array/tensors, if the model has named inputs.
  - A keras.utils.PyDataset returning (inputs, targets) or (inputs, targets, sample_weights).
  - A tf.data.Dataset yielding (inputs, targets) or (inputs, targets, sample_weights).
  - A torch.utils.data.DataLoader yielding (inputs, targets) or (inputs, targets, sample_weights).
  - A Python generator function yielding (inputs, targets) or (inputs, targets, sample_weights).
y
- Target data. Like the input data x, it can be either NumPy array(s) or backend-native tensor(s). If x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or a Python generator function, y should not be specified since targets will be obtained from x.
batch_size
- Integer or None. Number of samples per gradient update. If unspecified, batch_size will default to 32. Do not specify the batch_size if your input data x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function since they generate batches.
epochs
- Integer. Number of epochs to train the model. An epoch is an iteration over the entire x and y data provided (unless the steps_per_epoch flag is set to something other than None). Note that in conjunction with initial_epoch, epochs is to be understood as "final epoch". The model is not trained for a number of iterations given by epochs, but merely until the epoch of index epochs is reached.
verbose
- "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g., in a production environment). Defaults to "auto".
callbacks
- List of keras.callbacks.Callback instances. List of callbacks to apply during training. See keras.callbacks. Note keras.callbacks.ProgbarLogger and keras.callbacks.History callbacks are created automatically and need not be passed to model.fit(). keras.callbacks.ProgbarLogger is created or not based on the verbose argument in model.fit().
validation_split
- Float between 0 and 1. Fraction of the training data to be used as validation data. The model will set apart this fraction of the training data, will not train on it, and will evaluate the loss and any model metrics on this data at the end of each epoch. The validation data is selected from the last samples in the x and y data provided, before shuffling. This argument is only supported when x and y are made of NumPy arrays or tensors. If both validation_data and validation_split are provided, validation_data will override validation_split.
validation_data
- Data on which to evaluate the loss and any model metrics at the end of each epoch. The model will not be trained on this data. Thus, note the fact that the validation loss of data provided using validation_split or validation_data is not affected by regularization layers like noise and dropout. validation_data will override validation_split. It can be:
  - A tuple (x_val, y_val) of NumPy arrays or tensors.
  - A tuple (x_val, y_val, val_sample_weights) of NumPy arrays.
  - A keras.utils.PyDataset, a tf.data.Dataset, a torch.utils.data.DataLoader yielding (inputs, targets) or a Python generator function yielding (x_val, y_val) or (inputs, targets, sample_weights).
shuffle
- Boolean, whether to shuffle the training data before each epoch. This argument is ignored when x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function.
class_weight
- Optional dictionary mapping class indices (integers) to a weight (float) value, used for weighting the loss function (during training only). This can be useful to tell the model to "pay more attention" to samples from an under-represented class. When class_weight is specified and targets have a rank of 2 or greater, either y must be one-hot encoded, or an explicit final dimension of 1 must be included for sparse class labels.
sample_weight
- Optional NumPy array or tensor of weights for the training samples, used for weighting the loss function (during training only). You can either pass a flat (1D) NumPy array or tensor with the same length as the input samples (1:1 mapping between weights and samples), or in the case of temporal data, you can pass a 2D NumPy array or tensor with shape (samples, sequence_length) to apply a different weight to every timestep of every sample. This argument is not supported when x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function. Instead, provide sample_weights as the third element of x. Note that sample weighting does not apply to metrics specified via the metrics argument in compile(). To apply sample weighting to your metrics, you can specify them via the weighted_metrics in compile() instead.
initial_epoch
- Integer. Epoch at which to start training (useful for resuming a previous training run).
steps_per_epoch
- Integer or None. Total number of steps (batches of samples) before declaring one epoch finished and starting the next epoch. When training with input tensors or NumPy arrays, the default None means that the value used is the number of samples in your dataset divided by the batch size, or 1 if that cannot be determined. If x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function, the epoch will run until the input dataset is exhausted. When passing an infinitely repeating dataset, you must specify the steps_per_epoch argument, otherwise the training will run indefinitely.
validation_steps
- Integer or None. Only relevant if validation_data is provided. Total number of steps (batches of samples) to draw before stopping when performing validation at the end of every epoch. If validation_steps is None, validation will run until the validation_data dataset is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely. If validation_steps is specified and only part of the dataset is consumed, the evaluation will start from the beginning of the dataset at each epoch. This ensures that the same validation samples are used every time.
validation_batch_size
- Integer or None. Number of samples per validation batch. If unspecified, will default to batch_size. Do not specify the validation_batch_size if your data is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function since they generate batches.
validation_freq
- Only relevant if validation data is provided. Specifies how many training epochs to run before a new validation run is performed, e.g. validation_freq=2 runs validation every 2 epochs.

Unpacking behavior for iterator-like inputs: A common pattern is to pass an iterator-like object such as a tf.data.Dataset or a keras.utils.PyDataset to fit(), which will in fact yield not only features (x) but optionally targets (y) and sample weights (sample_weight). Keras requires that the output of such iterator-likes be unambiguous. The iterator should return a tuple of length 1, 2, or 3, where the optional second and third elements will be used for y and sample_weight respectively. Any other type provided will be wrapped in a length-one tuple, effectively treating everything as x. When yielding dicts, they should still adhere to the top-level tuple structure, e.g. ({"x0": x0, "x1": x1}, y). Keras will not attempt to separate features, targets, and weights from the keys of a single dict. A notable unsupported data type is the namedtuple. The reason is that it behaves like both an ordered datatype (tuple) and a mapping datatype (dict). So given a namedtuple of the form namedtuple("example_tuple", ["y", "x"]), it is ambiguous whether to reverse the order of the elements when interpreting the value. Even worse is a tuple of the form namedtuple("other_tuple", ["x", "y", "z"]), where it is unclear if the tuple was intended to be unpacked into x, y, and sample_weight or passed through as a single element to x.

Returns

A History object. Its History.history attribute is a record of training loss values and metrics values at successive epochs, as well as validation loss values and validation metrics values (if applicable).

def get_config(self)
-
Expand source code
def get_config(self):
    config = {
        "dim_in": self.dim_in,
        "dim_h": self.dim_h,
        "dim_out": self.dim_out,
        "rank": self.rank,
    }
    base_config = super(VanillaGNN, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))
Returns the config of the object.
An object config is a Python dictionary (serializable) containing the information needed to re-instantiate it.
def predict(self, data)
-
Expand source code
def predict(self, data):
    out = self(data.x, data.adjacency)
    return tf.argmax(out, axis=1, output_type=tf.int32).numpy()
Generates output predictions for the input samples.
Computation is done in batches. This method is designed for batch processing of large numbers of inputs. It is not intended for use inside of loops that iterate over your data and process small numbers of inputs at a time.
For small numbers of inputs that fit in one batch, directly use __call__() for faster execution, e.g., model(x), or model(x, training=False) if you have layers such as BatchNormalization that behave differently during inference.

Note: See this FAQ entry for more details about the difference between Model methods predict() and __call__().

Args

x
- Input data. It can be:
  - A NumPy array (or array-like), or a list of arrays (in case the model has multiple inputs).
  - A backend-native tensor, or a list of tensors (in case the model has multiple inputs).
  - A dict mapping input names to the corresponding array/tensors, if the model has named inputs.
  - A keras.utils.PyDataset.
  - A tf.data.Dataset.
  - A torch.utils.data.DataLoader.
  - A Python generator function.
batch_size
- Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 32. Do not specify the batch_size if your input data x is a keras.utils.PyDataset, tf.data.Dataset, torch.utils.data.DataLoader or Python generator function since they generate batches.
verbose
- "auto", 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = single line. "auto" becomes 1 for most cases. Note that the progress bar is not particularly useful when logged to a file, so verbose=2 is recommended when not running interactively (e.g. in a production environment). Defaults to "auto".
steps
- Total number of steps (batches of samples) to draw before declaring the prediction round finished. If steps is None, it will run until x is exhausted. In the case of an infinitely repeating dataset, it will run indefinitely.
callbacks
- List of keras.callbacks.Callback instances. List of callbacks to apply during prediction.
Returns
NumPy array(s) of predictions.
def test(self, data)
-
Expand source code
def test(self, data):
    out = self(data.x, data.adjacency)
    test_f1 = self.compute_f1_score(out, data.y)
    return round(test_f1, 4)
def train_step(self, batch_x, batch_adjacency, batch_y, optimizer)
-
Expand source code
@tf.function
def train_step(self, batch_x, batch_adjacency, batch_y, optimizer):
    with tf.GradientTape() as tape:
        out = self(batch_x, batch_adjacency)
        loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=batch_y, logits=out)
        loss = tf.reduce_mean(loss)
    gradients = tape.gradient(loss, self.trainable_variables)
    optimizer.apply_gradients(zip(gradients, self.trainable_variables))
    return loss
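fit() is essentially a mini-batched wrapper around this method. A full-batch sketch that drives train_step directly, reusing the hypothetical data and model objects from the earlier examples:

import tensorflow as tf

optimizer = tf.keras.optimizers.Adam()
for epoch in range(10):
    # One gradient update on the whole graph (no mini-batch slicing of the adjacency).
    loss = model.train_step(data.x, data.adjacency, data.y, optimizer)
    print(epoch, float(loss))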
class VanillaGNNLayer (dim_in, dim_out, rank=None, kernel_initializer='glorot_uniform', **kwargs)
-
Expand source code
@tf.keras.utils.register_keras_serializable(package="Custom", name="VanillaGNNLayer")
class VanillaGNNLayer(tf.keras.layers.Layer):
    def __init__(self, dim_in, dim_out, rank=None, kernel_initializer="glorot_uniform", **kwargs):
        super(VanillaGNNLayer, self).__init__(**kwargs)
        self.dim_out = dim_out
        self.rank = rank
        self.kernel_initializer = kernel_initializer
        self.linear = None

    def build(self, input_shape):
        if self.rank:
            self.linear = LoRALayer(self.dim_out, rank=self.rank)
        else:
            self.linear = tf.keras.layers.Dense(
                self.dim_out, use_bias=False, kernel_initializer=self.kernel_initializer
            )
        super(VanillaGNNLayer, self).build(input_shape)

    def call(self, x, adjacency):
        x = self.linear(x)
        x = tf.sparse.sparse_dense_matmul(adjacency, x)
        return x

    def get_config(self):
        config = super(VanillaGNNLayer, self).get_config()
        config.update(
            {
                "dim_out": self.dim_out,
                "rank": self.rank,
                "kernel_initializer": (
                    None
                    if self.rank
                    else tf.keras.initializers.serialize(self.linear.kernel_initializer)
                ),
            }
        )
        return config
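A small standalone sketch of the layer using the rank=None (plain Dense) path; the toy features and the identity adjacency are hypothetical:

import numpy as np
import tensorflow as tf
from likelihood.graph.nn import VanillaGNNLayer

x = tf.constant(np.random.rand(3, 4), dtype=tf.float32)  # 3 nodes, 4 features each
adjacency = tf.sparse.eye(3)                              # sparse identity: self-loops only

layer = VanillaGNNLayer(dim_in=4, dim_out=8)  # rank=None -> bias-free Dense projection
h = layer(x, adjacency)                       # aggregates neighbours: A @ (X W)
print(h.shape)                                # (3, 8)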
This is the class from which all layers inherit.
A layer is a callable object that takes as input one or more tensors and that outputs one or more tensors. It involves computation, defined in the call() method, and a state (weight variables). State can be created:

- in __init__(), for instance via self.add_weight();
- in the optional build() method, which is invoked by the first __call__() to the layer, and supplies the shape(s) of the input(s), which may not have been known at initialization time.

Layers are recursively composable: If you assign a Layer instance as an attribute of another Layer, the outer layer will start tracking the weights created by the inner layer. Nested layers should be instantiated in the __init__() method or build() method.

Users will just instantiate a layer and then treat it as a callable.

Args

trainable
- Boolean, whether the layer's variables should be trainable.
name
- String name of the layer.
dtype
- The dtype of the layer's computations and weights. Can also be a keras.DTypePolicy, which allows the computation and weight dtype to differ. Defaults to None. None means to use keras.config.dtype_policy(), which is a float32 policy unless set to a different value (via keras.config.set_dtype_policy()).

Attributes

name
- The name of the layer (string).
dtype
- Dtype of the layer's weights. Alias of layer.variable_dtype.
variable_dtype
- Dtype of the layer's weights.
compute_dtype
- The dtype of the layer's computations. Layers automatically cast inputs to this dtype, which causes the computations and output to also be in this dtype. When mixed precision is used with a keras.DTypePolicy, this will be different than variable_dtype.
trainable_weights
- List of variables to be included in backprop.
non_trainable_weights
- List of variables that should not be included in backprop.
weights
- The concatenation of the lists trainable_weights and non_trainable_weights (in this order).
trainable
- Whether the layer should be trained (boolean), i.e. whether its potentially-trainable weights should be returned as part of layer.trainable_weights.
input_spec
- Optional (list of) InputSpec object(s) specifying the constraints on inputs that can be accepted by the layer.

We recommend that descendants of Layer implement the following methods:

- __init__(): Defines custom layer attributes, and creates layer weights that do not depend on input shapes, using add_weight(), or other state.
- build(self, input_shape): This method can be used to create weights that depend on the shape(s) of the input(s), using add_weight(), or other state. __call__() will automatically build the layer (if it has not been built yet) by calling build().
- call(self, *args, **kwargs): Called in __call__ after making sure build() has been called. call() performs the logic of applying the layer to the input arguments. Two reserved keyword arguments you can optionally use in call() are: 1. training (boolean, whether the call is in inference mode or training mode). 2. mask (boolean tensor encoding masked timesteps in the input, used e.g. in RNN layers). A typical signature for this method is call(self, inputs), and the user could optionally add training and mask if the layer needs them.
- get_config(self): Returns a dictionary containing the configuration used to initialize this layer. If the keys differ from the arguments in __init__(), then override from_config(self) as well. This method is used when saving the layer or a model that contains this layer.

Examples:

Here's a basic example: a layer with two variables, w and b, that returns y = w . x + b. It shows how to implement build() and call(). Variables set as attributes of a layer are tracked as weights of the layers (in layer.weights).

class SimpleDense(Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    # Create the state of the layer (weights)
    def build(self, input_shape):
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="glorot_uniform",
            trainable=True,
            name="kernel",
        )
        self.bias = self.add_weight(
            shape=(self.units,),
            initializer="zeros",
            trainable=True,
            name="bias",
        )

    # Defines the computation
    def call(self, inputs):
        return ops.matmul(inputs, self.kernel) + self.bias

# Instantiates the layer.
linear_layer = SimpleDense(4)

# This will also call `build(input_shape)` and create the weights.
y = linear_layer(ops.ones((2, 2)))
assert len(linear_layer.weights) == 2

# These weights are trainable, so they're listed in `trainable_weights`:
assert len(linear_layer.trainable_weights) == 2

Besides trainable weights, updated via backpropagation during training, layers can also have non-trainable weights. These weights are meant to be updated manually during call(). Here's an example layer that computes the running sum of its inputs:

class ComputeSum(Layer):
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        # Create a non-trainable weight.
        self.total = self.add_weight(
            shape=(),
            initializer="zeros",
            trainable=False,
            name="total",
        )

    def call(self, inputs):
        self.total.assign(self.total + ops.sum(inputs))
        return self.total

my_sum = ComputeSum(2)
x = ops.ones((2, 2))
y = my_sum(x)

assert my_sum.weights == [my_sum.total]
assert my_sum.non_trainable_weights == [my_sum.total]
assert my_sum.trainable_weights == []
Ancestors
- keras.src.layers.layer.Layer
- keras.src.backend.tensorflow.layer.TFLayer
- keras.src.backend.tensorflow.trackable.KerasAutoTrackable
- tensorflow.python.trackable.autotrackable.AutoTrackable
- tensorflow.python.trackable.base.Trackable
- keras.src.ops.operation.Operation
- keras.src.saving.keras_saveable.KerasSaveable
Methods
def build(self, input_shape)
-
Expand source code
def build(self, input_shape):
    if self.rank:
        self.linear = LoRALayer(self.dim_out, rank=self.rank)
    else:
        self.linear = tf.keras.layers.Dense(
            self.dim_out, use_bias=False, kernel_initializer=self.kernel_initializer
        )
    super(VanillaGNNLayer, self).build(input_shape)
def call(self, x, adjacency)
-
Expand source code
def call(self, x, adjacency):
    x = self.linear(x)
    x = tf.sparse.sparse_dense_matmul(adjacency, x)
    return x
def get_config(self)
-
Expand source code
def get_config(self):
    config = super(VanillaGNNLayer, self).get_config()
    config.update(
        {
            "dim_out": self.dim_out,
            "rank": self.rank,
            "kernel_initializer": (
                None
                if self.rank
                else tf.keras.initializers.serialize(self.linear.kernel_initializer)
            ),
        }
    )
    return config
Returns the config of the object.
An object config is a Python dictionary (serializable) containing the information needed to re-instantiate it.