Module likelihood.models.hmm
Classes
class HMM (n_states: int, n_observations: int)
Expand source code
import logging
import os
import pickle
from typing import List

import numpy as np
from IPython.display import clear_output


class HMM:
    def __init__(self, n_states: int, n_observations: int):
        self.n_states = n_states
        self.n_observations = n_observations
        # Initialize parameters with random values
        self.pi = np.random.dirichlet(np.ones(n_states), size=1)[0]
        self.A = np.random.dirichlet(np.ones(n_states), size=n_states)
        self.B = np.random.dirichlet(np.ones(n_observations), size=n_states)

    def save_model(self, filename: str = "./hmm") -> None:
        filename = filename if filename.endswith(".pkl") else filename + ".pkl"
        with open(filename, "wb") as f:
            pickle.dump(self, f)

    @staticmethod
    def load_model(filename: str = "./hmm") -> "HMM":
        filename = filename + ".pkl" if not filename.endswith(".pkl") else filename
        with open(filename, "rb") as f:
            return pickle.load(f)

    def forward(self, sequence: List[int]) -> np.ndarray:
        T = len(sequence)
        alpha = np.zeros((T, self.n_states))
        epsilon = 1e-10  # Small smoothing constant to avoid log(0)

        # Initialization (log-space)
        alpha[0] = np.log(self.pi + epsilon) + np.log(self.B[:, sequence[0]] + epsilon)
        alpha[0] -= np.log(np.sum(np.exp(alpha[0])))  # Normalization (log-space)

        # Recursion (log-space)
        for t in range(1, T):
            for i in range(self.n_states):
                alpha[t, i] = np.log(
                    np.sum(np.exp(alpha[t - 1] + np.log(self.A[:, i] + epsilon)))
                ) + np.log(self.B[i, sequence[t]] + epsilon)
            alpha[t] -= np.log(np.sum(np.exp(alpha[t])))  # Normalization

        # Return in linear space so alpha composes directly with backward()'s
        # beta in baum_welch() and state_probabilities().
        return np.exp(alpha)

    def backward(self, sequence: List[int]) -> np.ndarray:
        T = len(sequence)
        beta = np.ones((T, self.n_states))

        # Backward recursion (the final row stays all ones)
        for t in range(T - 2, -1, -1):
            for i in range(self.n_states):
                beta[t, i] = np.sum(self.A[i] * self.B[:, sequence[t + 1]] * beta[t + 1])

        return beta

    def viterbi(self, sequence: List[int]) -> np.ndarray:
        T = len(sequence)
        delta = np.zeros((T, self.n_states))
        psi = np.zeros((T, self.n_states), dtype=int)

        # Initialization
        delta[0] = self.pi * self.B[:, sequence[0]]

        # Recursion
        for t in range(1, T):
            for i in range(self.n_states):
                delta[t, i] = np.max(delta[t - 1] * self.A[:, i]) * self.B[i, sequence[t]]
                psi[t, i] = np.argmax(delta[t - 1] * self.A[:, i])

        # Reconstruct the most probable path
        state_sequence = np.zeros(T, dtype=int)
        state_sequence[T - 1] = np.argmax(delta[T - 1])
        for t in range(T - 2, -1, -1):
            state_sequence[t] = psi[t + 1, state_sequence[t + 1]]

        return state_sequence

    def baum_welch(
        self, sequences: List[List[int]], n_iterations: int, verbose: bool = False
    ) -> None:
        for iteration in range(n_iterations):
            # Initialize accumulators
            A_num = np.zeros((self.n_states, self.n_states))
            B_num = np.zeros((self.n_states, self.n_observations))
            pi_num = np.zeros(self.n_states)

            for sequence in sequences:
                T = len(sequence)
                alpha = self.forward(sequence)
                beta = self.backward(sequence)

                # Update pi
                gamma = (alpha * beta) / np.sum(alpha * beta, axis=1, keepdims=True)
                pi_num += gamma[0]

                # Update A and B
                for t in range(T - 1):
                    xi = np.zeros((self.n_states, self.n_states))
                    denom = np.sum(alpha[t] * self.A * self.B[:, sequence[t + 1]] * beta[t + 1])
                    for i in range(self.n_states):
                        for j in range(self.n_states):
                            xi[i, j] = (
                                alpha[t, i]
                                * self.A[i, j]
                                * self.B[j, sequence[t + 1]]
                                * beta[t + 1, j]
                            ) / denom
                        A_num[i] += xi[i]
                    B_num[:, sequence[t]] += gamma[t]

                # For the last step of the sequence
                B_num[:, sequence[-1]] += gamma[-1]

            # Normalize and update parameters
            self.pi = pi_num / len(sequences)
            self.A = A_num / np.sum(A_num, axis=1, keepdims=True)
            self.B = B_num / np.sum(B_num, axis=1, keepdims=True)

            # Log parameters every 10 iterations
            if iteration % 10 == 0 and verbose:
                os.system("cls" if os.name == "nt" else "clear")
                clear_output(wait=True)
                logging.info(f"Iteration {iteration}:")
                logging.info("Pi: %s", self.pi)
                logging.info("A:\n%s", self.A)
                logging.info("B:\n%s", self.B)

    def decoding_accuracy(self, sequences: List[List[int]], true_states: List[List[int]]) -> float:
        correct_predictions = 0
        total_predictions = 0

        for sequence, true_state in zip(sequences, true_states):
            predicted_states = self.viterbi(sequence)
            correct_predictions += np.sum(predicted_states == true_state)
            total_predictions += len(sequence)

        accuracy = (correct_predictions / total_predictions) * 100
        return accuracy

    def state_probabilities(self, sequence: List[int]) -> np.ndarray:
        """
        Returns the smoothed probabilities of the hidden states at each
        time step, computed from both forward and backward probabilities.
        """
        alpha = self.forward(sequence)
        beta = self.backward(sequence)

        # Compute smoothed probabilities (gamma)
        smoothed_probs = (alpha * beta) / np.sum(alpha * beta, axis=1, keepdims=True)
        return smoothed_probs

    def sequence_probability(self, sequence: List[int]) -> np.ndarray:
        # Posterior distribution over hidden states at the final time step
        return self.state_probabilities(sequence)[-1]
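A minimal usage sketch, assuming the module is importable as likelihood.models.hmm (per the module name above); the sizes are arbitrary:

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)

# All parameters are randomly initialized row-stochastic distributions:
# pi has shape (2,), A has shape (2, 2), B has shape (2, 3).
print(model.pi.shape, model.A.shape, model.B.shape)
print(model.A.sum(axis=1))  # each row of the transition matrix sums to 1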
Static methods
def load_model(filename: str = './hmm') -> HMM
Expand source code
@staticmethod
def load_model(filename: str = "./hmm") -> "HMM":
    filename = filename + ".pkl" if not filename.endswith(".pkl") else filename
    with open(filename, "rb") as f:
        return pickle.load(f)
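A round trip through save_model and load_model, with an illustrative filename:

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
model.save_model("./my_hmm")           # writes ./my_hmm.pkl
restored = HMM.load_model("./my_hmm")  # ".pkl" is appended automatically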
Methods
def backward(self, sequence: List[int]) -> numpy.ndarray
Expand source code
def backward(self, sequence: List[int]) -> np.ndarray:
    T = len(sequence)
    beta = np.ones((T, self.n_states))

    # Backward recursion (the final row stays all ones)
    for t in range(T - 2, -1, -1):
        for i in range(self.n_states):
            beta[t, i] = np.sum(self.A[i] * self.B[:, sequence[t + 1]] * beta[t + 1])

    return beta
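Here beta[t, i] is the (unscaled) probability of the observations after time t given state i at time t; the final row is all ones by construction. A quick sketch with a made-up sequence:

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
beta = model.backward([0, 2, 1])
print(beta.shape)  # (3, 2): one row per time step
print(beta[-1])    # [1. 1.]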
def baum_welch(self, sequences: List[List[int]], n_iterations: int, verbose: bool = False) -> None
Expand source code
def baum_welch(
    self, sequences: List[List[int]], n_iterations: int, verbose: bool = False
) -> None:
    for iteration in range(n_iterations):
        # Initialize accumulators
        A_num = np.zeros((self.n_states, self.n_states))
        B_num = np.zeros((self.n_states, self.n_observations))
        pi_num = np.zeros(self.n_states)

        for sequence in sequences:
            T = len(sequence)
            alpha = self.forward(sequence)
            beta = self.backward(sequence)

            # Update pi
            gamma = (alpha * beta) / np.sum(alpha * beta, axis=1, keepdims=True)
            pi_num += gamma[0]

            # Update A and B
            for t in range(T - 1):
                xi = np.zeros((self.n_states, self.n_states))
                denom = np.sum(alpha[t] * self.A * self.B[:, sequence[t + 1]] * beta[t + 1])
                for i in range(self.n_states):
                    for j in range(self.n_states):
                        xi[i, j] = (
                            alpha[t, i]
                            * self.A[i, j]
                            * self.B[j, sequence[t + 1]]
                            * beta[t + 1, j]
                        ) / denom
                    A_num[i] += xi[i]
                B_num[:, sequence[t]] += gamma[t]

            # For the last step of the sequence
            B_num[:, sequence[-1]] += gamma[-1]

        # Normalize and update parameters
        self.pi = pi_num / len(sequences)
        self.A = A_num / np.sum(A_num, axis=1, keepdims=True)
        self.B = B_num / np.sum(B_num, axis=1, keepdims=True)

        # Log parameters every 10 iterations
        if iteration % 10 == 0 and verbose:
            os.system("cls" if os.name == "nt" else "clear")
            clear_output(wait=True)
            logging.info(f"Iteration {iteration}:")
            logging.info("Pi: %s", self.pi)
            logging.info("A:\n%s", self.A)
            logging.info("B:\n%s", self.B)
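A toy training run (the sequences below are made up for illustration):

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
sequences = [[0, 1, 2, 1], [2, 2, 0, 1], [1, 0, 0, 2]]
model.baum_welch(sequences, n_iterations=50)

# The re-estimated parameters remain valid distributions.
print(model.A.sum(axis=1), model.B.sum(axis=1))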
def decoding_accuracy(self, sequences: List[List[int]], true_states: List[List[int]]) -> float
Expand source code
def decoding_accuracy(self, sequences: List[List[int]], true_states: List[List[int]]) -> float:
    correct_predictions = 0
    total_predictions = 0

    for sequence, true_state in zip(sequences, true_states):
        predicted_states = self.viterbi(sequence)
        correct_predictions += np.sum(predicted_states == true_state)
        total_predictions += len(sequence)

    accuracy = (correct_predictions / total_predictions) * 100
    return accuracy
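For example, against synthetic ground-truth paths (labels invented for illustration):

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
sequences   = [[0, 1, 2], [2, 0, 1]]
true_states = [[0, 0, 1], [1, 1, 0]]
print(model.decoding_accuracy(sequences, true_states))  # percentage in [0, 100]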
def forward(self, sequence: List[int]) -> numpy.ndarray
Expand source code
def forward(self, sequence: List[int]) -> np.ndarray:
    T = len(sequence)
    alpha = np.zeros((T, self.n_states))
    epsilon = 1e-10  # Small smoothing constant to avoid log(0)

    # Initialization (log-space)
    alpha[0] = np.log(self.pi + epsilon) + np.log(self.B[:, sequence[0]] + epsilon)
    alpha[0] -= np.log(np.sum(np.exp(alpha[0])))  # Normalization (log-space)

    # Recursion (log-space)
    for t in range(1, T):
        for i in range(self.n_states):
            alpha[t, i] = np.log(
                np.sum(np.exp(alpha[t - 1] + np.log(self.A[:, i] + epsilon)))
            ) + np.log(self.B[i, sequence[t]] + epsilon)
        alpha[t] -= np.log(np.sum(np.exp(alpha[t])))  # Normalization

    # Return in linear space so alpha composes directly with backward()'s
    # beta in baum_welch() and state_probabilities().
    return np.exp(alpha)
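Each row of the returned array is the filtered state distribution at that time step, so rows sum to 1. A short sketch:

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
alpha = model.forward([0, 2, 1, 1])
print(alpha.shape)        # (4, 2)
print(alpha.sum(axis=1))  # [1. 1. 1. 1.]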
def save_model(self, filename: str = './hmm') -> None
Expand source code
def save_model(self, filename: str = "./hmm") -> None:
    filename = filename if filename.endswith(".pkl") else filename + ".pkl"
    with open(filename, "wb") as f:
        pickle.dump(self, f)
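The ".pkl" extension is appended only when missing, so these two calls write the same file:

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
model.save_model("./hmm")      # writes ./hmm.pkl
model.save_model("./hmm.pkl")  # same path; extension already present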
def sequence_probability(self, sequence: List[int]) -> numpy.ndarray
Expand source code
def sequence_probability(self, sequence: List[int]) -> np.ndarray:
    # Posterior distribution over hidden states at the final time step
    return self.state_probabilities(sequence)[-1]
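Note that, despite the name, this returns the smoothed state distribution at the final time step rather than the likelihood of the whole sequence:

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
final_posterior = model.sequence_probability([0, 1, 2])
print(final_posterior.shape)  # (2,): one probability per hidden state
print(final_posterior.sum())  # 1.0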
def state_probabilities(self, sequence: List[int]) -> numpy.ndarray
Expand source code
def state_probabilities(self, sequence: List[int]) -> np.ndarray:
    """
    Returns the smoothed probabilities of the hidden states at each
    time step, computed from both forward and backward probabilities.
    """
    alpha = self.forward(sequence)
    beta = self.backward(sequence)

    # Compute smoothed probabilities (gamma)
    smoothed_probs = (alpha * beta) / np.sum(alpha * beta, axis=1, keepdims=True)
    return smoothed_probs
Returns the smoothed probabilities of the hidden states at each time step. This is done by using both forward and backward probabilities.
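Each row of the result is a distribution over hidden states (the gamma of the forward-backward algorithm), so rows sum to 1:

from likelihood.models.hmm import HMM

model = HMM(n_states=2, n_observations=3)
gamma = model.state_probabilities([0, 2, 1])
print(gamma.shape)        # (3, 2): one row per time step
print(gamma.sum(axis=1))  # [1. 1. 1.]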
def viterbi(self, sequence: List[int]) -> numpy.ndarray
Expand source code
def viterbi(self, sequence: List[int]) -> np.ndarray:
    T = len(sequence)
    delta = np.zeros((T, self.n_states))
    psi = np.zeros((T, self.n_states), dtype=int)

    # Initialization
    delta[0] = self.pi * self.B[:, sequence[0]]

    # Recursion
    for t in range(1, T):
        for i in range(self.n_states):
            delta[t, i] = np.max(delta[t - 1] * self.A[:, i]) * self.B[i, sequence[t]]
            psi[t, i] = np.argmax(delta[t - 1] * self.A[:, i])

    # Reconstruct the most probable path
    state_sequence = np.zeros(T, dtype=int)
    state_sequence[T - 1] = np.argmax(delta[T - 1])
    for t in range(T - 2, -1, -1):
        state_sequence[t] = psi[t + 1, state_sequence[t + 1]]

    return state_sequence
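Decoding yields one hidden-state index per observation (the observation sequence below is made up):

from likelihood.models.hmm import HMM

obs = [0, 2, 1, 1, 0]
model = HMM(n_states=2, n_observations=3)
path = model.viterbi(obs)
print(path)  # e.g. [0 1 1 1 0]; values lie in {0, ..., n_states - 1}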