Module likelihood.models.regression
Classes
class AbstractArima(datapoints: numpy.ndarray, noise: float = 0, tol: float = 0.0001)
class AbstractArima(FeaturesArima):
    """A class that implements the autoregressive ARIMA(1, 0, 0) model.

    Parameters
    ----------
    datapoints : `np.ndarray`
        The input data points for training.
    noise : `float`, optional
        Noise level for the model, by default 0
    tol : `float`, optional
        Tolerance for convergence, by default 1e-4

    Attributes
    ----------
    datapoints : `np.ndarray`
        The input data points for training.
    n_steps : `int`
        Number of steps to predict.
    noise : `float`
        Noise level for the model.
    p : `int`
        Order of the autoregressive part.
    q : `int`
        Order of the moving average part.
    tol : `float`
        Tolerance for convergence.
    nwalkers : `int`
        Number of walkers for sampling.
    mov : `int`
        Maximum number of iterations.
    theta_trained : `np.ndarray`
        Trained parameters of the model.
    """

    __slots__ = [
        "datapoints",
        "n_steps",
        "noise",
        "p",
        "q",
        "tol",
        "nwalkers",
        "mov",
        "theta_trained",
    ]

    def __init__(self, datapoints: np.ndarray, noise: float = 0, tol: float = 1e-4):
        """Initialize the ARIMA model.

        Parameters
        ----------
        datapoints : `np.ndarray`
            The input data points for training.
        noise : `float`, optional
            Noise level for the model, by default 0
        tol : `float`, optional
            Tolerance for convergence, by default 1e-4
        """
        self.datapoints = datapoints
        self.noise = noise
        self.p = datapoints.shape[0]
        self.q = 0
        self.tol = tol
        self.n_steps = 0

    def model(self, datapoints: np.ndarray, theta: list, mode=True):
        """Compute the model forward pass.

        Parameters
        ----------
        datapoints : `np.ndarray`
            The input data points.
        theta : `list`
            Model parameters.
        mode : `bool`, optional
            Forward pass mode, by default True

        Returns
        -------
        `np.ndarray`
            Model output.
        """
        datapoints = self.datapoints
        noise = self.noise
        self.theta_trained = theta
        return super().forward(datapoints, theta, mode, noise)

    def xvec(self, datapoints: np.ndarray, n_steps: int = 0):
        """Extract vector of data points.

        Parameters
        ----------
        datapoints : `np.ndarray`
            The input data points.
        n_steps : `int`, optional
            Number of steps to consider, by default 0

        Returns
        -------
        `np.ndarray`
            Extracted data points vector.
        """
        datapoints = self.datapoints
        self.n_steps = n_steps
        return datapoints[n_steps:]

    def train(self, nwalkers: int = 10, mov: int = 200, weights: bool = False):
        """Train the model using a sampling method.

        Parameters
        ----------
        nwalkers : `int`, optional
            Number of walkers for sampling, by default 10
        mov : `int`, optional
            Maximum number of iterations, by default 200
        weights : `bool`, optional
            Whether to use weights in sampling, by default False
        """
        datapoints = self.datapoints
        xvec = self.xvec
        self.nwalkers = nwalkers
        self.mov = mov
        assert self.nwalkers <= self.mov, "nwalkers must be less than or equal to mov"
        model = self.model
        n = self.p + self.q
        theta = np.random.rand(n)
        x_vec = xvec(datapoints)
        if weights:
            # Warm start: restart the walkers from the previously trained parameters.
            par, error = walkers(
                nwalkers,
                x_vec,
                datapoints,
                model,
                theta=self.theta_trained,
                mov=mov,
                tol=self.tol,
                figname=None,
            )
        else:
            par, error = walkers(
                nwalkers, x_vec, datapoints, model, theta, mov=mov, tol=self.tol, figname=None
            )
        index = np.where(error == np.min(error))[0][0]
        trained = np.array(par[index])
        self.theta_trained = trained

    def predict(self, n_steps: int = 0):
        """Make predictions for future steps.

        Parameters
        ----------
        n_steps : `int`, optional
            Number of steps to predict, by default 0

        Returns
        -------
        `np.ndarray`
            Predicted values.
        """
        self.n_steps = n_steps
        datapoints = self.datapoints
        model = self.model
        theta_trained = self.theta_trained
        y_pred = model(datapoints, theta_trained)
        for i in range(n_steps):
            # model() reads self.datapoints, which is shifted forward each step.
            self.datapoints = y_pred[i:]
            y_new = model(datapoints, theta_trained, mode=False)
            y_pred = y_pred.tolist()
            y_pred.append(y_new)
            y_pred = np.array(y_pred)
        return np.array(y_pred)

    def save_model(self, name: str = "model"):
        """Save the trained parameters to `name`.pkl."""
        with open(name + ".pkl", "wb") as file:
            pickle.dump(self.theta_trained, file)

    def load_model(self, name: str = "model"):
        """Load trained parameters from `name`.pkl."""
        with open(name + ".pkl", "rb") as file:
            self.theta_trained = pickle.load(file)

    def eval(self, y_val: np.ndarray, y_pred: np.ndarray):
        """Print RMSE and an accuracy score for the predictions."""
        rmse = np.sqrt(np.mean((y_pred - y_val) ** 2))
        square_error = np.sqrt((y_pred - y_val) ** 2)  # element-wise absolute error
        accuracy = np.sum(square_error[np.where(square_error < rmse)])
        accuracy /= np.sum(square_error)
        print("Accuracy: {:.4f}".format(accuracy))
        print("RMSE: {:.4f}".format(rmse))

    def plot_pred(
        self, y_real: np.ndarray, y_pred: np.ndarray, ci: float = 0.90, mode: bool = True
    ):
        """Plot predicted vs. real values with a confidence band."""
        sns.set_theme(style="whitegrid")
        plt.figure(figsize=(5, 3))
        n = self.n_steps
        y_mean = np.mean(y_pred, axis=0)  # computed but not used below
        y_std = np.std(y_pred, axis=0)
        # Rescale the standard normal quantiles 1.64 (90%) and 1.96 (95%) by ci.
        if ci < 0.95:
            Z = (ci / 0.90) * 1.64
        else:
            Z = (ci / 0.95) * 1.96
        plt.plot(y_pred, label="Predicted", linewidth=2, color=sns.color_palette("deep")[1])
        plt.plot(
            y_real, ".--", label="Real", color=sns.color_palette("deep")[0], alpha=0.6, markersize=6
        )
        plt.fill_between(
            range(y_pred.shape[0])[-n:],
            (y_pred - Z * y_std)[-n:],
            (y_pred + Z * y_std)[-n:],
            alpha=0.2,
            color=sns.color_palette("deep")[1],
        )
        plt.title("Predicted vs Real Values with Confidence Interval", fontsize=12)
        plt.xlabel("Time Steps", fontsize=12)
        plt.ylabel("y", fontsize=12)
        plt.grid(True, linestyle="--", alpha=0.7)
        plt.xticks(fontsize=10)
        plt.yticks(fontsize=10)
        print(f"Confidence Interval: ±{Z * y_std:.4f}")
        plt.legend(loc="upper left", fontsize=9)
        if mode:
            plt.savefig(f"pred_{n}.png", dpi=300)
        plt.tight_layout()
        plt.show()

    def summary(self):
        """Print basic statistics of the trained parameters."""
        print("\nSummary:")
        print("-----------------------")
        print("Length of theta: {}".format(len(self.theta_trained)))
        print("Mean of theta: {:.4f}".format(np.mean(self.theta_trained)))
        print("-----------------------")

A class that implements the autoregressive ARIMA(1, 0, 0) model.
Parameters
datapoints : np.ndarray - The input data points for training.
noise : float, optional - Noise level for the model, by default 0.
tol : float, optional - Tolerance for convergence, by default 1e-4.
Attributes
datapoints : np.ndarray - The input data points for training.
n_steps : int - Number of steps to predict.
noise : float - Noise level for the model.
p : int - Order of the autoregressive part.
q : int - Order of the moving average part.
tol : float - Tolerance for convergence.
nwalkers : int - Number of walkers for sampling.
mov : int - Maximum number of iterations.
theta_trained : np.ndarray - Trained parameters of the model.
Initialize the ARIMA model.
Parameters
datapoints : np.ndarray - The input data points for training.
noise : float, optional - Noise level for the model, by default 0.
tol : float, optional - Tolerance for convergence, by default 1e-4.
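A minimal usage sketch on synthetic data (the toy series, seed, and walker settings below are illustrative, not part of the library):

    import numpy as np
    from likelihood.models.regression import AbstractArima

    rng = np.random.default_rng(0)
    # Toy series: a noisy sine wave standing in for real training data.
    series = np.sin(np.linspace(0, 6, 60)) + 0.1 * rng.standard_normal(60)

    model = AbstractArima(series, noise=0.0, tol=1e-4)
    model.train(nwalkers=10, mov=200)  # train() asserts nwalkers <= mov
    y_pred = model.predict(n_steps=5)  # in-sample fit plus 5 recursive forecasts
    model.summary()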
Ancestors

FeaturesArima
Subclasses
Instance variables
var datapoints : np.ndarray - The input data points for training.
var mov : int - Maximum number of iterations.
var n_steps : int - Number of steps to predict.
var noise : float - Noise level for the model.
var nwalkers : int - Number of walkers for sampling.
var p : int - Order of the autoregressive part.
var q : int - Order of the moving average part.
var theta_trained : np.ndarray - Trained parameters of the model.
var tol : float - Tolerance for convergence.
Methods
def eval(self, y_val: numpy.ndarray, y_pred: numpy.ndarray)
def eval(self, y_val: np.ndarray, y_pred: np.ndarray):
    """Print RMSE and an accuracy score for the predictions."""
    rmse = np.sqrt(np.mean((y_pred - y_val) ** 2))
    square_error = np.sqrt((y_pred - y_val) ** 2)  # element-wise absolute error
    accuracy = np.sum(square_error[np.where(square_error < rmse)])
    accuracy /= np.sum(square_error)
    print("Accuracy: {:.4f}".format(accuracy))
    print("RMSE: {:.4f}".format(rmse))
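To make the two printed metrics concrete, a worked example with hypothetical arrays (numbers chosen purely for illustration):

    import numpy as np

    y_val = np.array([1.0, 2.0, 3.0, 4.0])
    y_pred = np.array([1.1, 1.9, 3.2, 4.4])
    rmse = np.sqrt(np.mean((y_pred - y_val) ** 2))  # ~0.2345
    abs_err = np.sqrt((y_pred - y_val) ** 2)        # [0.1, 0.1, 0.2, 0.4]
    accuracy = abs_err[abs_err < rmse].sum() / abs_err.sum()  # 0.4 / 0.8 = 0.5

So "Accuracy" is the fraction of total absolute error contributed by points whose error falls below the RMSE.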
def model(self, datapoints: numpy.ndarray, theta: list, mode=True)-
Compute the model forward pass.
Parameters
datapoints : np.ndarray - The input data points.
theta : list - Model parameters.
mode : bool, optional - Forward pass mode, by default True
Returns
np.ndarray - Model output.
def plot_pred(self,
y_real: numpy.ndarray,
y_pred: numpy.ndarray,
ci: float = 0.9,
mode: bool = True)-
def predict(self, n_steps: int = 0)-
Make predictions for future steps.
Parameters
n_steps : int, optional - Number of steps to predict, by default 0
Returns
np.ndarray - Predicted values.
def save_model(self, name: str = 'model')-
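Only theta_trained is pickled by save_model()/load_model(); a round-trip sketch under the same assumptions as the example above:

arima.save_model("arima_run1")        # writes arima_run1.pkl

restored = AbstractArima(y)           # same data, parameters not yet set
restored.load_model("arima_run1")     # restores theta_trained from arima_run1.pkl
y_pred = restored.predict(n_steps=5)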
def summary(self)-
def train(self, nwalkers: int = 10, mov: int = 200, weights: bool = False)-
Train the model using a sampling method.
Parameters
nwalkers : int, optional - Number of walkers for sampling, by default 10
mov : int, optional - Maximum number of iterations, by default 200
weights : bool, optional - Whether to use weights in sampling, by default False
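Note that the weights=True branch restarts the sampler from self.theta_trained, so it presumes a previous train() or load_model() call; a short sketch:

arima.train(nwalkers=10, mov=200)                 # first pass, random initial theta
arima.train(nwalkers=10, mov=200, weights=True)   # refine from the stored theta_trained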
def xvec(self, datapoints: numpy.ndarray, n_steps: int = 0)-
Extract vector of data points.
Parameters
datapoints : np.ndarray - The input data points.
n_steps : int, optional - Number of steps to consider, by default 0
Returns
np.ndarray - Extracted data points vector.
Inherited members
class Arima (datapoints: numpy.ndarray,
p: float = 1,
d: int = 0,
q: float = 0,
noise: float = 0,
tol: float = 1e-05)-
Expand source code
class Arima(AbstractArima):
    """A class that implements the (p, d, q) ARIMA model.

    Parameters
    ----------
    datapoints : np.ndarray
        A set of points to train the ARIMA model.
    p : float
        Number of auto-regressive terms (ratio). By default it is set to `1`.
    d : int
        Degree of differencing. By default it is set to `0`.
    q : float
        Number of forecast errors in the model (ratio). By default it is set to `0`.
    n_steps : int
        Number of steps to predict ahead.
    noise : float
        Amount of noise added during training.
    tol : float
        Tolerance for convergence checks.

    Returns
    -------
    None

    Notes
    -----
    The values of `p`, `q` are scaled based on the length of `datapoints`.
    """

    __slots__ = ["datapoints", "noise", "p", "d", "q", "tol", "theta_trained"]

    def __init__(
        self,
        datapoints: np.ndarray,
        p: float = 1,
        d: int = 0,
        q: float = 0,
        noise: float = 0,
        tol: float = 1e-5,
    ):
        """Initializes the ARIMA model with given parameters.

        Parameters
        ----------
        datapoints : np.ndarray
            A set of points to train the ARIMA model.
        p : float
            Auto-regressive term (scaled by length of data).
        d : int
            Degree of differencing.
        q : float
            Moving average term (scaled by length of data).
        noise : float
            Noise level for training.
        tol : float
            Tolerance for numerical convergence.

        Returns
        -------
        None
        """
        self.datapoints = datapoints
        self.noise = noise
        assert p > 0 and p <= 1, "p must be greater than 0 and at most 1"
        self.p = int(p * len(datapoints))  # ratio -> absolute AR order
        assert d >= 0 and d <= 1, "d must be 0 or 1"
        self.d = d
        self.q = int(q * len(datapoints))  # ratio -> absolute MA order
        self.tol = tol

    def model(self, datapoints: np.ndarray, theta: list, mode: bool = True):
        """Computes the prior probability or prediction based on the ARIMA model.

        Parameters
        ----------
        datapoints : np.ndarray
            The input data used for modeling.
        theta : list
            Model parameters.
        mode : bool
            If True, computes in forward mode; otherwise in backward mode.

        Returns
        -------
        y_vec : np.ndarray
            Predicted values according to the ARIMA model.
        """
        datapoints = self.datapoints
        noise = self.noise
        self.theta_trained = theta
        assert type(self.d) == int, "d must be 0 or 1"
        if self.d != 0 or self.q != 0:
            if self.d != 0:
                # Integrated part: cumulative series, renormalized to the data scale.
                y_sum = super().integrated(datapoints)
                norm_datapoints = np.linalg.norm(datapoints)
                norm_y_sum = np.linalg.norm(y_sum)
                if norm_y_sum != 0 and norm_datapoints != 0:
                    y_sum = cal_average(
                        np.abs(y_sum * (norm_datapoints / norm_y_sum)) * np.sign(datapoints), 0.05
                    )
            else:
                y_sum = datapoints.copy()
            y_sum_regr = y_sum[-self.p :]
            y_regr_vec = super().forward(y_sum_regr, theta[0 : self.p], mode, 0)
            if self.q != 0:
                # Moving-average part, rescaled so it cannot dominate the AR part.
                y_sum_average = super().average(y_sum[-self.q :])
                y_vec_magnitude = np.linalg.norm(y_regr_vec.copy())
                y_sum_average_magnitude = np.linalg.norm(y_sum_average)
                if y_sum_average_magnitude > y_vec_magnitude:
                    scaling_factor = y_vec_magnitude / y_sum_average_magnitude
                    y_sum_average = y_sum_average * scaling_factor
                theta_mean = np.mean(theta[-self.q :])
                if abs(theta_mean) > 1:
                    additional_scaling_factor = 1.0 - abs(theta_mean)
                    y_sum_average = y_sum_average * additional_scaling_factor
                y_average_vec = super().forward(y_sum_average, theta[-self.q :], mode, 0)
                if mode:
                    y_vec = y_regr_vec.copy()
                    for i in reversed(range(y_average_vec.shape[0])):
                        y_vec[i] += y_average_vec[i]
                else:
                    y_vec = y_regr_vec + y_average_vec
            else:
                y_vec = y_regr_vec
            return y_vec
        else:
            return super().forward(datapoints, theta, mode, noise)

A class that implements the (p, d, q) ARIMA model.
Parameters
datapoints : np.ndarray - A set of points to train the ARIMA model.
p : float - Number of auto-regressive terms (ratio). By default it is set to 1.
d : int - Degree of differencing. By default it is set to 0.
q : float - Number of forecast errors in the model (ratio). By default it is set to 0.
n_steps : int - Number of steps to predict ahead.
noise : float - Amount of noise added during training.
tol : float - Tolerance for convergence checks.
Returns
None
Notes
The values of p and q are scaled based on the length of datapoints.
Initializes the ARIMA model with given parameters.
Parameters
datapoints : np.ndarray - A set of points to train the ARIMA model.
p : float - Auto-regressive term (scaled by length of data).
d : int - Degree of differencing.
q : float - Moving average term (scaled by length of data).
noise : float - Noise level for training.
tol : float - Tolerance for numerical convergence.
Returns
None
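Since p and q are ratios of the series length, the absolute orders depend on len(datapoints); a sketch (hypothetical 1-D series y of length 100):

from likelihood.models.regression import Arima

# p=0.3 -> 30 AR terms, d=1 -> one differencing pass, q=0.1 -> 10 MA terms.
model = Arima(y, p=0.3, d=1, q=0.1, noise=0, tol=1e-5)
model.train(nwalkers=10, mov=200)   # train() and predict() are inherited from AbstractArima
forecast = model.predict(n_steps=5)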
Ancestors
AbstractArima
FeaturesArima
Instance variables
var d-
var datapoints-
var noise-
var p-
var q-
var theta_trained-
var tol-
Methods
def model(self, datapoints: numpy.ndarray, theta: list, mode: bool = True)-
Computes the prior probability or prediction based on the ARIMA model.
Parameters
datapoints : np.ndarray - The input data used for modeling.
theta : list - Model parameters.
mode : bool - If True, computes in forward mode; otherwise in backward mode.
Returns
y_vec : np.ndarray - Predicted values according to the ARIMA model.
Inherited members
class FourierRegression (datapoints: numpy.ndarray)-
Expand source code
class FourierRegression(AbstractArima):
    """A class that implements the ARIMA model with FFT noise filtering.

    Parameters
    ----------
    datapoints : np.ndarray
        A set of points to train the ARIMA model.

    Returns
    -------
    new_datapoints : np.ndarray
        The predicted points. fit() must be applied before predict(n_steps).
    """

    __slots__ = ["datapoints_", "sigma", "mode", "mov", "n_walkers", "name"]

    def __init__(self, datapoints: np.ndarray):
        self.datapoints_ = datapoints

    def fit(self, sigma: int = 0, mov: int = 200, mode: bool = False):
        self.sigma = sigma
        self.mode = mode
        self.mov = mov
        datapoints = self.datapoints_
        # Replace the raw series with their FFT-denoised versions.
        self.datapoints_, _ = fft_denoise(datapoints, sigma, mode)

    def predict(
        self, n_steps: int, n_walkers: int = 1, name: str = "fourier_model", save: bool = True
    ):
        self.n_walkers = n_walkers
        self.name = name
        mov = self.mov
        assert self.n_walkers <= mov, "n_walkers must be less than or equal to mov"
        new_datapoints = []
        for i in range(self.datapoints_.shape[0]):
            # Train one AbstractArima model per row of the input array.
            super().__init__(self.datapoints_[i, :])
            super().train(n_walkers, mov)
            if save:
                super().save_model(str(i) + "_" + name)
            y_pred_ = super().predict(n_steps)
            new_datapoints.append(y_pred_)
        new_datapoints = np.array(new_datapoints)
        new_datapoints = np.reshape(new_datapoints, (len(new_datapoints), -1))
        return new_datapoints

    def load_predict(self, n_steps: int, name: str = "fourier_model"):
        new_datapoints = []
        for i in range(self.datapoints_.shape[0]):
            # Reload the per-row model saved by predict(..., save=True).
            super().__init__(self.datapoints_[i, :])
            super().load_model(str(i) + "_" + name)
            y_pred_ = super().predict(n_steps)
            new_datapoints.append(y_pred_)
        new_datapoints = np.array(new_datapoints)
        new_datapoints = np.reshape(new_datapoints, (len(new_datapoints), -1))
        return new_datapoints

A class that implements the ARIMA model with FFT noise filtering.
Parameters
datapoints : np.ndarray - A set of points to train the ARIMA model.
Returns
new_datapoints : np.ndarray - The predicted points; fit() must be applied before predict(n_steps).
Initialize the ARIMA model.
Parameters
datapoints : np.ndarray - The input data points for training.
noise : float, optional - Noise level for the model, by default 0
tol : float, optional - Tolerance for convergence, by default 1e-4
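A row-wise usage sketch (hypothetical 2-D array data of shape (n_series, n_points); fit() must run before predict(), since predict() reads the mov attribute that fit() sets):

import numpy as np
from likelihood.models.regression import FourierRegression

data = np.random.default_rng(1).normal(size=(3, 64))  # hypothetical input

reg = FourierRegression(data)
reg.fit(sigma=0, mov=200)                    # FFT denoising; also stores mov
preds = reg.predict(n_steps=5, n_walkers=1)  # one fitted-plus-forecast row per input row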
Ancestors
AbstractArima
FeaturesArima
Instance variables
var datapoints_-
var mode-
var mov-
var n_walkers-
var name-
var sigma-
Methods
def fit(self, sigma: int = 0, mov: int = 200, mode: bool = False)-
def load_predict(self, n_steps: int, name: str = 'fourier_model')-
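load_predict() reloads the per-row models that predict(..., save=True) wrote as 0_name.pkl, 1_name.pkl, and so on; a sketch continuing the example above:

reg2 = FourierRegression(data)
reg2.fit(sigma=0, mov=200)  # reapply the same denoising so the rows match training
preds = reg2.load_predict(n_steps=5, name="fourier_model")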
Inherited members