Transient Heat Diffusion Example#
This example solves transient heat diffusion on the unit square with zero Dirichlet boundaries and constant forcing, builds a POD-reduced state, and trains a linear structure-preserving NN-OpInf model with:
LinearAffineSpdTensorOperator(acts_on=x, depends_on=(), positive=False)for dissipative diffusion dynamics.VectorOffsetOperatorfor the constant forcing term.
Training uses ADAM with LBFGS acceleration:
(training_settings["optimizer"] = "ADAM",
training_settings["LBFGS-acceleration"] = True).
Run it:
python examples/diffusion/heat_diffusion_end_to_end.py --kappa 0.75 --forcing 1.0
Key outputs:
Plot:
examples/diffusion/heat_diffusion_solution.pdfTrained models:
examples/diffusion/ml-models/
Code#
import argparse
import os
import numpy as np
import torch
from matplotlib import pyplot as plt
import nnopinf
import nnopinf.models as models
import nnopinf.training
import nnopinf.training.trainers
def laplacian_dirichlet(u, n, h):
"""Five-point Laplacian on an n x n interior grid with zero Dirichlet boundaries."""
u2d = u.reshape(n, n)
upad = np.pad(u2d, ((1, 1), (1, 1)), mode="constant")
lap = (
upad[2:, 1:-1]
+ upad[:-2, 1:-1]
+ upad[1:-1, 2:]
+ upad[1:-1, :-2]
- 4.0 * upad[1:-1, 1:-1]
) / (h**2)
return lap.reshape(-1)
class HeatDiffusionFOM:
def __init__(self, n, forcing_value):
self.n = n
self.h = 1.0 / (n + 1)
self.x = np.linspace(self.h, 1.0 - self.h, n)
xx, yy = np.meshgrid(self.x, self.x, indexing="ij")
self.u0 = (
np.sin(np.pi * xx) * np.sin(np.pi * yy)
+ 0.25 * np.sin(2.0 * np.pi * xx) * np.sin(np.pi * yy)
).reshape(-1)
self.forcing = forcing_value * np.ones(self.n * self.n)
def velocity(self, u, kappa):
return kappa * laplacian_dirichlet(u, self.n, self.h) + self.forcing
def solve(self, dt, end_time, kappa):
u = self.u0.copy()
t = 0.0
rk4const = np.array([1.0 / 4.0, 1.0 / 3.0, 1.0 / 2.0, 1.0])
snapshots = []
time = []
while t <= end_time + 0.5 * dt:
snapshots.append(u.copy())
time.append(t)
u0 = u.copy()
for i in range(4):
f = self.velocity(u, kappa)
u = u0 + dt * rk4const[i] * f
t += dt
return np.array(snapshots).T, np.array(time)
class NnOpInfRom:
def __init__(self, model):
self.model_ = model
self.inputs_ = {}
def velocity(self, u):
self.inputs_["x"] = torch.tensor(u[None], dtype=torch.float64)
return self.model_.forward(self.inputs_)[0].detach().numpy()
def solve(self, u0, dt, end_time):
u = u0.copy()
t = 0.0
rk4const = np.array([1.0 / 4.0, 1.0 / 3.0, 1.0 / 2.0, 1.0])
snapshots = []
time = []
while t <= end_time + 0.5 * dt:
snapshots.append(u.copy())
time.append(t)
u0_l = u.copy()
for i in range(4):
f = self.velocity(u)
u = u0_l + dt * rk4const[i] * f
t += dt
return np.array(snapshots).T, np.array(time)
def flatten_for_training(q):
return np.reshape(q, (q.shape[0], q.shape[1] * q.shape[2])).T
def main():
parser = argparse.ArgumentParser(
description="Transient heat diffusion on the unit square with a linear SPD operator."
)
parser.add_argument("--grid-size", type=int, default=32, help="Interior points per direction.")
parser.add_argument("--dt", type=float, default=2.0e-4, help="Time step for RK4.")
parser.add_argument("--end-time", type=float, default=1.0, help="Final simulation time.")
parser.add_argument("--rom-dim", type=int, default=20, help="Requested POD rank.")
parser.add_argument("--kappa", type=float, default=0.75, help="Diffusion coefficient.")
parser.add_argument("--forcing", type=float, default=1.0, help="Constant forcing amplitude.")
parser.add_argument("--num-epochs", type=int, default=500, help="Training epochs.")
parser.add_argument("--tr-delta0", type=float, default=1.0, help="Initial trust-region radius.")
parser.add_argument("--tr-cg-max-iters", type=int, default=200, help="Maximum CG iterations per TR step.")
parser.add_argument("--tr-batch-size", type=int, default=50, help="Batch size used by TR optimizer.")
args = parser.parse_args()
torch.manual_seed(1)
np.random.seed(1)
output_dir = os.path.dirname(os.path.abspath(__file__))
model_dir = os.path.join(output_dir, "ml-models")
os.makedirs(model_dir, exist_ok=True)
fom = HeatDiffusionFOM(args.grid_size, args.forcing)
u, _ = fom.solve(args.dt, args.end_time, args.kappa)
snapshots_all = u[..., None]
# POD basis from all training trajectories.
snapshots_matrix = np.reshape(
snapshots_all, (snapshots_all.shape[0], snapshots_all.shape[1] * snapshots_all.shape[2])
)
phi, singular_values, _ = np.linalg.svd(snapshots_matrix, full_matrices=False)
relative_energy = np.cumsum(singular_values**2) / np.sum(singular_values**2)
pod_rank = np.searchsorted(relative_energy, 0.999999999) + 1
rom_dim = min(args.rom_dim, int(pod_rank))
phi = phi[:, :rom_dim]
print(f"Using ROM dimension K={rom_dim}")
# Reduced snapshots and time derivatives.
uhat = np.einsum("ij,ikn->jkn", phi, snapshots_all)
uhat_dot = (uhat[:, 2:, :] - uhat[:, :-2, :]) / (2.0 * args.dt)
uhat = uhat[:, 1:-1, :]
x_input = nnopinf.variables.Variable(size=rom_dim, name="x", normalization_strategy="MaxAbs")
target = nnopinf.variables.Variable(size=rom_dim, name="y", normalization_strategy="MaxAbs")
diffusion_operator = nnopinf.operators.LinearAffineSpdTensorOperator(
acts_on=x_input, depends_on=(), positive=False
)
forcing_operator = nnopinf.operators.VectorOffsetOperator(n_outputs=rom_dim)
model = models.Model([diffusion_operator, forcing_operator])
training_settings = nnopinf.training.get_default_settings()
training_settings["output-path"] = model_dir
training_settings["optimizer"] = "ADAM"
training_settings["batch-size"] = 5000
training_settings["num-epochs"] = args.num_epochs
training_settings["weight-decay"] = 1.0e-6
training_settings["LBFGS-acceleration"] = True
x_input.set_data(flatten_for_training(uhat))
target.set_data(flatten_for_training(uhat_dot))
print("Training linear SPD diffusion model")
nnopinf.training.trainers.train(
model, variables=[x_input], y=target, training_settings=training_settings
)
# Evaluate for the same diffusion coefficient used to generate training data.
test_kappa = args.kappa
u_fom, t = fom.solve(args.dt, args.end_time, test_kappa)
u0_r = phi.T @ fom.u0
rom = NnOpInfRom(model)
u_rom_r, _ = rom.solve(u0_r, args.dt, args.end_time)
u_rom = phi @ u_rom_r
relative_error = np.linalg.norm(u_rom - u_fom) / np.linalg.norm(u_fom)
print(f"Relative trajectory error (kappa={test_kappa:.3f}): {relative_error:.4e}")
u_fom_final = u_fom[:, -1].reshape(args.grid_size, args.grid_size)
u_rom_final = u_rom[:, -1].reshape(args.grid_size, args.grid_size)
diff_final = np.abs(u_fom_final - u_rom_final)
fig, axes = plt.subplots(1, 3, figsize=(12, 3.8), constrained_layout=True)
extent = (0.0, 1.0, 0.0, 1.0)
vmin = min(np.min(u_fom_final), np.min(u_rom_final))
vmax = max(np.max(u_fom_final), np.max(u_rom_final))
im0 = axes[0].imshow(u_fom_final, origin="lower", extent=extent, cmap="viridis", vmin=vmin, vmax=vmax)
axes[0].set_title("FOM final state")
axes[0].set_xlabel("x")
axes[0].set_ylabel("y")
im1 = axes[1].imshow(u_rom_final, origin="lower", extent=extent, cmap="viridis", vmin=vmin, vmax=vmax)
axes[1].set_title("NN-OpInf final state")
axes[1].set_xlabel("x")
axes[1].set_ylabel("y")
im2 = axes[2].imshow(diff_final, origin="lower", extent=extent, cmap="magma")
axes[2].set_title("Absolute error")
axes[2].set_xlabel("x")
axes[2].set_ylabel("y")
fig.colorbar(im0, ax=axes[0], fraction=0.046, pad=0.04)
fig.colorbar(im1, ax=axes[1], fraction=0.046, pad=0.04)
fig.colorbar(im2, ax=axes[2], fraction=0.046, pad=0.04)
fig.suptitle(f"Transient heat diffusion, kappa={test_kappa:.2f}, rel. error={relative_error:.2e}")
fig.savefig(os.path.join(output_dir, "heat_diffusion_solution.pdf"))
fig.savefig(os.path.join(output_dir, "heat_diffusion_solution.png"), dpi=200)
plt.close(fig)
if __name__ == "__main__":
main()
Final plot#
Transient heat diffusion on the unit square: final-state comparison and absolute error.#