NeuralODE for Univariate Time Series Forecasting¶
In this notebook, we build a NeuralODE model in PyTorch to forecast a univariate time series: the angle of a damped pendulum, which is approximately a $\sin$ wave.
In [ ]:
import dataclasses
In [ ]:
import math
from functools import cached_property
from typing import Dict, List, Tuple
import lightning as L
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from loguru import logger
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchdyn.core import NeuralODE
from ts_dl_utils.datasets.pendulum import Pendulum, PendulumDataModule
from ts_dl_utils.evaluation.evaluator import Evaluator
from ts_dl_utils.naive_forecasters.last_observation import LastObservationForecaster
Data¶
We create a dataset that models a damped pendulum, treated as a damped harmonic oscillator, i.e.,
$$ \theta(t) = \theta(0) \cos(2 \pi t / p)\exp(-\beta t), $$where $\theta(t)$ is the angle of the pendulum at time $t$. The period $p$ is calculated using
$$ p = 2 \pi \sqrt{L / g}, $$with $L$ being the length of the pendulum and $g$ being the surface gravity.
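As a quick numerical check of these formulas, here is a minimal sketch (the helper below is hypothetical and only for illustration; taking g = 9.81 for the surface gravity is an assumption, and the imported Pendulum class encapsulates this logic for us):

def damped_pendulum_angle(t, theta_0, length, beta, g=9.81):
    # Hypothetical helper: evaluate theta(t) = theta(0) cos(2 pi t / p) exp(-beta t).
    period = 2 * math.pi * math.sqrt(length / g)
    return theta_0 * math.cos(2 * math.pi * t / period) * math.exp(-beta * t)

damped_pendulum_angle(1.0, theta_0=1, length=100, beta=0.001)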
In [ ]:
pen = Pendulum(length=100)
In [ ]:
df = pd.DataFrame(pen(10, 400, initial_angle=1, beta=0.001))
Since the damping constant is very small ($\beta = 0.001$), the generated data is essentially a sine wave.
In [ ]:
_, ax = plt.subplots(figsize=(10, 6.18))
df.plot(x="t", y="theta", ax=ax)
Model¶
In this section, we create the NeuralODE model.
In [ ]:
@dataclasses.dataclass
class TSNODEParams:
    """A dataclass that serves as the parameters for the model.

    :param hidden_widths: list of dimensions for the hidden layers
    :param time_span: time points at which the ODE solver evaluates the state
    """

    hidden_widths: List[int]
    time_span: torch.Tensor


class TSNODE(nn.Module):
    """NeuralODE for univariate time series modeling.

    :param history_length: the length of the input history.
    :param horizon: the number of steps to be forecasted.
    :param model_params: the parameters for the NODE network.
    """

    def __init__(self, history_length: int, horizon: int, model_params: TSNODEParams):
        super().__init__()
        self.model_params = model_params
        self.history_length = history_length
        self.horizon = horizon
        self.time_span = model_params.time_span

        # Project the history window to the first hidden width.
        self.regulate_input = nn.Linear(
            self.history_length, self.model_params.hidden_widths[0]
        )

        self.hidden_layers = nn.Sequential(
            *[
                self._linear_block(dim_in, dim_out)
                for dim_in, dim_out in zip(
                    self.model_params.hidden_widths[:-1],
                    self.model_params.hidden_widths[1:],
                )
            ]
        )

        # Project back to the history length so the vector field maps
        # the state to its own dimension.
        self.regulate_output = nn.Linear(
            self.model_params.hidden_widths[-1], self.history_length
        )

        self.network = nn.Sequential(
            *[self.regulate_input, self.hidden_layers, self.regulate_output]
        )

    @property
    def node_config(self):
        return dataclasses.asdict(self.model_params)

    def _linear_block(self, dim_in, dim_out):
        return nn.Sequential(*[nn.Linear(dim_in, dim_out), nn.ReLU()])

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.network(x)
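Since a NeuralODE integrates $\mathrm{d}y/\mathrm{d}t = f(y)$, the vector field $f$ must map the state back to its own dimension; in TSNODE the state is the history window itself, which is why regulate_output projects back to history_length. A minimal shape check, using the same sizes we use later in this notebook:

params = TSNODEParams(hidden_widths=[256], time_span=torch.linspace(0, 1, 101))
model = TSNODE(history_length=100, horizon=1, model_params=params)
x = torch.randn(8, 100)  # a batch of 8 history windows
assert model(x).shape == (8, 100)  # the vector field preserves the state dimension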
Training¶
We use Lightning to train our model.
Training Utilities¶
In [ ]:
history_length_1_step = 100
horizon_1_step = 1
gap = 10
We will use a few utilities:

- To feed the data into our model, a DataFrameDataset class converts the pandas DataFrame into a PyTorch Dataset; a rough sketch of the windowing it performs follows this list.
- To keep the Lightning training code simple, we use a LightningDataModule (PendulumDataModule) and build a LightningModule (NODEForecaster).
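For intuition, the sliding-window pairing we assume these utilities perform looks roughly like the sketch below; the exact semantics of gap live in ts_dl_utils, so treat this as an illustration rather than the imported implementation:

def make_windows(series, history_length, horizon, gap):
    # Hypothetical sliding-window pairing, for illustration only: each
    # history window is paired with the horizon observations that start
    # gap steps after the window ends.
    pairs = []
    for start in range(len(series) - history_length - gap - horizon + 1):
        x = series[start : start + history_length]
        y = series[start + history_length + gap : start + history_length + gap + horizon]
        pairs.append((x, y))
    return pairs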
In [ ]:
class NODEForecaster(L.LightningModule):
    def __init__(self, model: nn.Module):
        super().__init__()
        self.model = model
        self.neural_ode = NeuralODE(
            self.model.network,
            sensitivity="adjoint",
            solver="dopri5",
            atol_adjoint=1e-4,
            rtol_adjoint=1e-4,
        )
        self.time_span = self.model.time_span
        self.horizon = self.model.horizon

    def configure_optimizers(self):
        optimizer = torch.optim.SGD(self.parameters(), lr=1e-3)
        return optimizer

    def training_step(self, batch, batch_idx):
        x, y = batch
        x = x.squeeze(-1).type(self.dtype)
        y = y.squeeze(-1).type(self.dtype)
        t_, y_hat = self.neural_ode(x, self.time_span)
        y_hat = y_hat[-1, ..., -self.horizon :]
        loss = nn.functional.mse_loss(y_hat, y)
        self.log_dict({"train_loss": loss}, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        x = x.squeeze(-1).type(self.dtype)
        y = y.squeeze(-1).type(self.dtype)
        t_, y_hat = self.neural_ode(x, self.time_span)
        y_hat = y_hat[-1, ..., -self.horizon :]
        loss = nn.functional.mse_loss(y_hat, y)
        self.log_dict({"val_loss": loss}, prog_bar=True)
        return loss

    def predict_step(self, batch, batch_idx):
        x, y = batch
        x = x.squeeze(-1).type(self.dtype)
        y = y.squeeze(-1).type(self.dtype)
        t_, y_hat = self.neural_ode(x, self.time_span)
        y_hat = y_hat[-1, ..., -self.horizon :]
        return x, y_hat

    def forward(self, x):
        x = x.squeeze(-1).type(self.dtype)
        t_, y_hat = self.neural_ode(x, self.time_span)
        y_hat = y_hat[-1, ..., -self.horizon :]
        return x, y_hat
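A note on the decoding step above: torchdyn's NeuralODE returns the evaluation times together with the solution trajectory stacked along the first axis, so y_hat[-1] is the state at the end of time_span, and its last horizon entries are read off as the forecast. A quick way to inspect the shapes (the sizes in the comments are what we expect for these settings):

params = TSNODEParams(hidden_widths=[256], time_span=torch.linspace(0, 1, 101))
forecaster = NODEForecaster(model=TSNODE(history_length=100, horizon=1, model_params=params))
t_, traj = forecaster.neural_ode(torch.randn(8, 100), forecaster.time_span)
print(t_.shape, traj.shape)  # expect torch.Size([101]) torch.Size([101, 8, 100])
print(traj[-1, ..., -1:].shape)  # expect torch.Size([8, 1]): the one-step forecast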
Data, Model and Training¶
DataModule¶
In [ ]:
pdm_1_step = PendulumDataModule(
    history_length=history_length_1_step,
    horizon=horizon_1_step,
    dataframe=df[["theta"]],
    gap=gap,
)
LightningModule¶
In [ ]:
ts_model_params_1_step = TSNODEParams(
    hidden_widths=[256], time_span=torch.linspace(0, 1, 101)
)

ts_node_1_step = TSNODE(
    history_length=history_length_1_step,
    horizon=horizon_1_step,
    model_params=ts_model_params_1_step,
)

ts_node_1_step
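Note that time_span = torch.linspace(0, 1, 101) asks the solver to integrate the state from $t = 0$ to $t = 1$ and report it at 101 evenly spaced points; as NODEForecaster shows, only the terminal state y_hat[-1] is used to read off the forecast.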
In [ ]:
node_forecaster_1_step = NODEForecaster(model=ts_node_1_step)
Trainer¶
In [ ]:
logger_1_step = L.pytorch.loggers.TensorBoardLogger(
    save_dir="lightning_logs", name="neuralode_ts_1_step"
)

trainer_1_step = L.Trainer(
    precision="32",
    max_epochs=10,
    min_epochs=5,
    callbacks=[
        EarlyStopping(monitor="val_loss", mode="min", min_delta=1e-4, patience=2)
    ],
    logger=logger_1_step,
)
Fitting¶
In [ ]:
trainer_1_step.fit(model=node_forecaster_1_step, datamodule=pdm_1_step)
Retrieving Predictions¶
In [ ]:
predictions_1_step = trainer_1_step.predict(
    model=node_forecaster_1_step, datamodule=pdm_1_step
)
Naive Forecasters¶
In [ ]:
trainer_naive_1_step = L.Trainer(precision="64")
lobs_forecaster_1_step = LastObservationForecaster(horizon=horizon_1_step)
lobs_1_step_predictions = trainer_naive_1_step.predict(
    model=lobs_forecaster_1_step, datamodule=pdm_1_step
)
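For context, the naive baseline simply carries the last observation forward. A one-line sketch of the idea (not the LastObservationForecaster implementation imported from ts_dl_utils):

def last_observation_forecast(x: torch.Tensor, horizon: int) -> torch.Tensor:
    # Repeat the last observed value for every forecasted step.
    return x[..., -1:].expand(*x.shape[:-1], horizon)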
Evaluations¶
In [ ]:
evaluator_1_step = Evaluator(step=0)
In [ ]:
fig, ax = plt.subplots(figsize=(10, 6.18))
ax.plot(
    evaluator_1_step.y_true(dataloader=pdm_1_step.predict_dataloader()),
    "g-",
    label="truth",
)
ax.plot(evaluator_1_step.y(predictions_1_step), "r--", label="predictions")
ax.plot(evaluator_1_step.y(lobs_1_step_predictions), "b-.", label="naive predictions")
plt.legend()
To quantify the results, we compute a few metrics.
In [ ]:
evaluator_1_step.metrics(predictions_1_step, pdm_1_step.predict_dataloader())
In [ ]:
evaluator_1_step.metrics(lobs_1_step_predictions, pdm_1_step.predict_dataloader())
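As a cross-check, the headline mean squared error can also be computed by hand from the evaluator's outputs (assuming y_true and y return aligned one-dimensional arrays, which is how they are used in the plot above):

y_true = np.asarray(evaluator_1_step.y_true(dataloader=pdm_1_step.predict_dataloader()))
y_pred = np.asarray(evaluator_1_step.y(predictions_1_step))
float(np.mean((y_true - y_pred) ** 2))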
Forecasting (horizon=3)¶
Train a Model¶
In [ ]:
history_length_m_step = 100
horizon_m_step = 3
In [ ]:
pdm_m_step = PendulumDataModule(
    history_length=history_length_m_step,
    horizon=horizon_m_step,
    dataframe=df[["theta"]],
    gap=gap,
)
In [ ]:
ts_model_params_m_step = TSNODEParams(
    hidden_widths=[256], time_span=torch.linspace(0, 1, 101)
)

ts_node_m_step = TSNODE(
    history_length=history_length_m_step,
    horizon=horizon_m_step,
    model_params=ts_model_params_m_step,
)

ts_node_m_step
In [ ]:
node_forecaster_m_step = NODEForecaster(model=ts_node_m_step)
In [ ]:
logger_m_step = L.pytorch.loggers.TensorBoardLogger(
    save_dir="lightning_logs", name="neuralode_ts_m_step"
)

trainer_m_step = L.Trainer(
    precision="32",
    max_epochs=10,
    min_epochs=5,
    callbacks=[
        EarlyStopping(monitor="val_loss", mode="min", min_delta=1e-4, patience=2)
    ],
    logger=logger_m_step,
)
In [ ]:
trainer_m_step.fit(model=node_forecaster_m_step, datamodule=pdm_m_step)
In [ ]:
predictions_m_step = trainer_m_step.predict(
    model=node_forecaster_m_step, datamodule=pdm_m_step
)
Naive Forecaster¶
In [ ]:
trainer_naive_m_step = L.Trainer(precision="64")
lobs_forecaster_m_step = LastObservationForecaster(horizon=horizon_m_step)
lobs_m_step_predictions = trainer_naive_m_step.predict(
    model=lobs_forecaster_m_step, datamodule=pdm_m_step
)
Evaluations¶
In [ ]:
evaluator_m_step = Evaluator(step=2, gap=gap)
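Since the horizon is three steps and step is zero-indexed, step=2 evaluates the last forecasted step, the one furthest from the observed history.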
In [ ]:
fig, ax = plt.subplots(figsize=(10, 6.18))
ax.plot(
    evaluator_m_step.y_true(dataloader=pdm_m_step.predict_dataloader()),
    "g-",
    label="truth",
)
ax.plot(evaluator_m_step.y(predictions_m_step), "r--", label="predictions")
ax.plot(evaluator_m_step.y(lobs_m_step_predictions), "b-.", label="naive predictions")
plt.legend()
In [ ]:
fig, ax = plt.subplots(figsize=(10, 6.18))
for i in np.arange(0, 1000, 120):
    evaluator_m_step.plot_one_sample(ax=ax, predictions=predictions_m_step, idx=i)
In [ ]:
evaluator_m_step.metrics(predictions_m_step, pdm_m_step.predict_dataloader())
In [ ]:
evaluator_m_step.metrics(lobs_m_step_predictions, pdm_m_step.predict_dataloader())