From ff6d962b7dbf61957008a55298b76d2710b94c48 Mon Sep 17 00:00:00 2001 From: Joris Gillis Date: Sun, 23 Feb 2025 11:03:23 +0100 Subject: [PATCH] Testing FM scripts --- .github/workflows/code.yml | 6 + NN_MPC/Controllers.py | 470 ++++++++++++++++++++++++++ NN_MPC/README.md | 4 + NN_MPC/Utility.py | 216 ++++++++++++ NN_MPC/models/cart_pendulum.yaml | 22 ++ NN_MPC/models/cart_pendulum2.yaml | 11 + NN_MPC/models/furuta.yaml | 39 +++ NN_MPC/models/furuta_pendulum.yaml | 39 +++ NN_MPC/models/spring_mass_damper.yaml | 18 + NN_MPC/nnmpc_tutorial.py | 266 +++++++++++++++ 10 files changed, 1091 insertions(+) create mode 100644 NN_MPC/Controllers.py create mode 100644 NN_MPC/README.md create mode 100644 NN_MPC/Utility.py create mode 100644 NN_MPC/models/cart_pendulum.yaml create mode 100644 NN_MPC/models/cart_pendulum2.yaml create mode 100644 NN_MPC/models/furuta.yaml create mode 100644 NN_MPC/models/furuta_pendulum.yaml create mode 100644 NN_MPC/models/spring_mass_damper.yaml create mode 100644 NN_MPC/nnmpc_tutorial.py diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index 2b07a2d..a707084 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -56,3 +56,9 @@ jobs: conda activate workshop_dirac echo 'y' | python test.py + - name: Run tests + shell: bash -l {0} + run: | + conda activate workshop_dirac + cd NN_MPC + python nmpc_tutorial.py diff --git a/NN_MPC/Controllers.py b/NN_MPC/Controllers.py new file mode 100644 index 0000000..6712cc7 --- /dev/null +++ b/NN_MPC/Controllers.py @@ -0,0 +1,470 @@ +from abc import ABC, abstractmethod +import numpy as np +import copy +import torch +import torch.nn as nn +import torch.optim as optim +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import MinMaxScaler +import tqdm +import plotly.graph_objects as go +from plotly.subplots import make_subplots +import warnings +import contextlib +import os + +import impact +import casadi + + +class Controller(ABC): + + @abstractmethod + def control_action(self, x0 : np.array, xf : np.array, param:dict ): + pass + + @abstractmethod + def control_plan(self, x0 : np.array, xf : np.array, param:dict): + pass + + +class MPC(impact.MPC, Controller): + + impact_model : impact.Model = None + bound_states : np.ndarray = None + bound_inputs : np.ndarray = None + OCP_mode : bool = False + planned_control : np.ndarray= None + solver_options = { "ipopt.print_level":0, + "print_time":0, + "expand": True, + "verbose": False, + "print_time": False, + "error_on_fail": False, + "ipopt": {"sb":"yes", "tol": 1e-8, "max_iter":1e3}} + + def __init__(self): + # # setting up boundary constraints + # self.x_current = impact_mpc.parameter('x_current',impact_model.nx) + # self.x_final = impact_mpc.parameter('x_final',impact_model.nx) + # impact_mpc.subject_to(impact_mpc.at_t0(impact_model.x) == self.x_current) + # impact_mpc.subject_to(impact_mpc.at_tf(impact_model.x) == self.x_final) + + impact.MPC.__init__(self) + self.solver('ipopt', self.solver_options) + + return None + + def add_model(self, model_name, path): + model = super().add_model(model_name, path) + self.impact_model = model + self.method(impact.MultipleShooting(N=20, M = 1, intg='rk')) + model.sim_step = self._method.discrete_system(self) + + self.bound_states = np.array([[-np.inf, np.inf]] * model.nx) + self.bound_inputs = np.array([[-np.inf, np.inf]] * model.nu) + return model + + def control_action(self, x0: np.array, xf: np.array, param:dict): + # U, X = self.control_plan(x0, xf, param) + # u = U[0] + + if self.OCP_mode and self.planned_control is not None: + U = self.planned_control + k = param["current_index"] + u = U[k] + else: + U, X = self.control_plan(x0, xf, param) + u = U[0] + + return u + + def control_plan(self, x0: np.array, xf: np.array, param:dict): + # set control duration + # mpc = copy.deepcopy(self.impact_model) + # mpc = self.impact_mpc + model = self.impact_model + dt = param["dt"] + time2go = param["time2go"] + + self.clear_constraints() + + N_steps = int(np.ceil(time2go/dt)) + self.set_T(N_steps*dt) + + X = np.full((N_steps + 1, model.nx), np.nan) + U = np.full((N_steps + 1, model.nu), np.nan) + + # set initial conditions + self.subject_to(self.at_t0(model.x) == x0) + # self.set_value(self.x_current, x0) + + # set final conditions using soft constraint if necessary + if N_steps > model.nx: + self.subject_to(self.at_tf(model.x) == xf) + # self.set_value(self.x_final, xf) + else: + for i in range(model.nx): + self.add_objective( 1e9 * (self.at_tf(model.x[i]) - xf[i])**2 ) + + # set bounds + self.subject_to(self.bound_states[:,0]<= ( model.x <= self.bound_states[:,1]) ) + self.subject_to(self.bound_inputs[:,0]<= ( model.u <= self.bound_inputs[:,1]) ) + + # Solve + self.method(impact.MultipleShooting(N=N_steps, M = 1, intg='rk')) + + # if 'init_guess_X' in param.keys(): + # print('using init_guess') + # mpc.set_initial(model.x, param["init_guess_X"]) + # mpc.set_initial(model.u, param["init_guess_U"]) + # else: + # print('using linear interp') + + self.set_initial(model.x, x0+self.t/self.T*(xf-x0)) + + try: + # solve mpc (while preventing annoing prints) + with open(os.devnull, 'w') as f, contextlib.redirect_stdout(f): + sol = self.solve() + + ts, X = sol.sample(model.x, grid='integrator') + ts, U = sol.sample(model.u, grid='integrator') + + assert np.allclose(X[0], x0, rtol=1.e-3, atol=1.e1), f"Initial state mismatch: {np.round(X[0], 2).tolist()} != {np.round(x0, 2).tolist()}" + #assert np.allclose(X[-1], xf, rtol=1.e-3, atol=1.e-3), f"Final state mismatch: {X[-1]} != {xf}" + if not np.allclose(X[-1], xf, rtol=1.e-3, atol=1.e1): + print(f"Final state mismatch: {np.round(X[-1], 2).tolist()} != {np.round(xf, 2).tolist()}") + except: + print(f"MPC failed! Continuing...") + + # param["init_guess_X"] = X[1:] + # param["init_guess_U"] = U[1:] + + self.planned_control = U + + return U, X + +class NNMPC(Controller): + n_inputs = None + n_outputs = None + selected_inputs = None + use_error = False + nn_model = None + scalerIn = None + scalerOut = None + loss_history = [] + + def __init__(self, impact_model : impact.MPC = None, selected_inputs = None, use_error = False): + if impact_model is not None: + if selected_inputs is None: + selected_inputs = [True] * impact_model.nx + + if use_error: + self.n_inputs = sum(selected_inputs)+1 + else: + self.n_inputs = sum(selected_inputs)*2+1 + self.n_outputs = impact_model.nu + + self.use_error = use_error + self.selected_inputs = selected_inputs + + return None + + def load_nnmpc(path): + nnmpc = NNMPC() + path = path + '.pt' + nnmpc.nn_model = nn.Sequential() + + load_temp = torch.load(path, weights_only = False) + if type(load_temp) == dict and "nn_model" in load_temp.keys(): + nnmpc.nn_model = load_temp["nn_model"] + if "loss_history" in load_temp.keys(): + nnmpc.loss_history = load_temp["loss_history"] + else: + nnmpc.nn_model = load_temp + + nnmpc.n_inputs = nnmpc.nn_model[0].in_features + nnmpc.n_outputs = nnmpc.nn_model[-1].out_features + + if nnmpc.selected_inputs is None: + if nnmpc.use_error: + nnmpc.selected_inputs = [True] * (nnmpc.n_inputs-1) + else: + nnmpc.selected_inputs = [True] * ((nnmpc.n_inputs-1)//2) + + print('NNMPC loaded from:', path) + return nnmpc + + def save_nnmpc(self, path): + path = path + '.pt' + torch.save({'nn_model': self.nn_model, 'loss_history': self.loss_history}, path) + # torch.save(self.nn_model, path) + return print('NNMPC saved as:', path) + + def control_action(self, x0: np.array, xf: np.array, param:dict): + time2go = param["time2go"] + x0 = x0[self.selected_inputs] + xf = xf[self.selected_inputs] + if self.use_error: + inputs = np.concatenate((xf-x0, np.array([time2go]))) + else: + inputs = np.concatenate((x0, xf, np.array([time2go]))) + + scaled_inputs = self.scalerIn.transform(inputs.reshape(1, -1)) + scaled_u = self.nn_model(torch.tensor(scaled_inputs, dtype=torch.float32)).detach().numpy() + u = self.scalerOut.inverse_transform(scaled_u) + + return u + + def control_plan(self, x0: np.array, xf: np.array, param:dict): + return None + + def define_nn_structure(self, depth:int, width:int, activation = nn.Tanh()): + model = nn.Sequential() + model.add_module('input', nn.Linear(self.n_inputs, width)) + model.add_module('act', activation) + for i in range(depth-1): + model.add_module(f'hidden_{i}', nn.Linear(width, width)) + model.add_module(f'act_{i}', activation) + model.add_module('output', nn.Linear(width, self.n_outputs)) + + self.nn_model = model + return model + + def arrange_sim_data(self, X_series, U_series, R_series, T_series): + X_flat = X_series.reshape(-1, X_series.shape[-1]) + R_flat = R_series.reshape(-1, R_series.shape[-1]) + T_flat = T_series.reshape(-1, 1) + T2go_flat = np.max(T_flat)-T_flat + U_flat = U_series.reshape(-1, U_series.shape[-1]) + + X_flat = X_flat[:,self.selected_inputs] + R_flat = R_flat[:,self.selected_inputs] + + assert U_flat.shape[1] == self.n_outputs + assert X_flat.shape[0] == R_flat.shape[0] == T2go_flat.shape[0] == U_flat.shape[0] + + if self.use_error: + E_flat = R_flat - X_flat + assert E_flat.shape[1]+1 == self.n_inputs + inputs = np.concatenate((E_flat, T2go_flat), axis=1) + else: + assert X_flat.shape[1]+R_flat.shape[1]+1 == self.n_inputs + inputs = np.concatenate((X_flat, R_flat, T2go_flat), axis=1) + + outputs = U_flat + + return inputs, outputs + + def clean_dataset(self, inputs:np.ndarray, outputs:np.ndarray, out_thresh:np.number=1.5): + io = np.concatenate([inputs,outputs], axis=1) + + # Remove NANs + mask = ~np.isnan(io).any(axis=1) + io = io[mask] + n_nans = np.sum(~mask) + + # Remove outliers using IQR + Q1 = np.percentile(io, 25, axis=0) + Q3 = np.percentile(io, 75, axis=0) + IQR = Q3 - Q1 + lower_bound = Q1 - out_thresh * IQR + upper_bound = Q3 + out_thresh * IQR + + mask = np.all((io >= lower_bound) & (io <= upper_bound), axis=1) + io = io[mask] + n_outliers = np.sum(~mask) + + inputs = io[:,0:self.n_inputs] + outputs = io[:,self.n_inputs:] + + self.scalerIn = MinMaxScaler() + self.scalerIn.fit(inputs) + self.scalerOut = MinMaxScaler() + self.scalerOut.fit(outputs) + + return inputs, outputs, n_nans, n_outliers + + def train_nn(self, inputs_train, outputs_train, n_epochs=1000, lr=0.001): + # Normalize data + inputs_train = self.scalerIn.transform(inputs_train) + outputs_train = self.scalerOut.transform(outputs_train) + + # Convert to torch tensors + inputs_train = torch.tensor((inputs_train), dtype=torch.float32) + outputs_train = torch.tensor((outputs_train), dtype=torch.float32) + # inputs_test = torch.tensor((inputs_test), dtype=torch.float32) + # outputs_test = torch.tensor((outputs_test), dtype=torch.float32) + + loss_fn = nn.MSELoss(reduction='mean') + optimizer = optim.Adam(self.nn_model.parameters(), lr=lr) + self.nn_model.train(True) + best_mse = np.min(self.loss_history) if len(self.loss_history) > 0 else np.inf + best_weights = copy.deepcopy(self.nn_model.state_dict()) + # fig = go.FigureWidget() + # fig.display() + print(f"Training: {inputs_train.shape[0]} data points, learning rate={lr}") + with tqdm.tqdm(range(n_epochs), unit = "epoch", mininterval = 0, disable = False) as bar: + for epoch in bar: + bar.set_description(f"Epoch {epoch}") + #forward pass + y_pred = self.nn_model(inputs_train) + loss = loss_fn(y_pred, outputs_train) + + # backward pass + optimizer.zero_grad() + loss.backward() + # update weights + optimizer.step() + # track progress + mse = float(loss) + bar.set_postfix(mse=mse) + self.loss_history.append(mse) + + if mse 0: + fig_train.add_trace(go.Scatter(y=self.loss_history, mode='lines', name='loss', showlegend=False)) + fig_train.update_yaxes(type="log") + fig_train.update_layout(title='NNMPC training', xaxis_title='Epoch', yaxis_title='Loss [MSE]') + else: + fig_train.data[0].y = self.loss_history + + return fig_train + + def plot_approximation(self, inputs:np.ndarray, outputs:np.ndarray, fig=None): + + # Remove NANs + io = np.concatenate([inputs,outputs], axis=1) + mask = ~np.isnan(io).any(axis=1) + inputs = inputs[mask] + outputs = outputs[mask] + + if outputs.size > 10000: + factor = outputs.size // 10000 + inputs = inputs[::factor] + outputs = outputs[::factor] + + # Compute predictions + scaled_inputs = self.scalerIn.transform(inputs) + scaled_inputs = torch.tensor(scaled_inputs, dtype=torch.float32) + scaled_y_pred = self.nn_model(scaled_inputs).detach().numpy() + y_pred = self.scalerOut.inverse_transform(scaled_y_pred) + + # Fitting error + nmse = np.linalg.norm(outputs - y_pred)**2 / np.linalg.norm(outputs - np.mean(outputs))**2 + + # Plot + if not fig: + fig = make_subplots(rows=1, cols=self.n_outputs) + + for i in range(self.n_outputs): + u_range = [np.min(outputs[:,i]), np.max(outputs[:,i])] + fig.add_trace( + go.Scatter(x=outputs[:,i], y=y_pred[:,i], mode='markers', name=f'Output {i}'), + row=1, col=i+1) + fig.add_trace(go.Scatter(x=u_range,y=u_range,line=dict(color='black')),row=1, col=i+1) + fig.add_annotation( + xref="paper", yref="paper", + x=0.1, y=0.9, + showarrow=False, + text=f"NMSE = {nmse*100:.1f}%", + font=dict(size=14) + ) + fig.update_xaxes(scaleanchor="y", scaleratio=1,title_text=f'u {i+1}', row=i+1, col=1) + fig.update_yaxes(scaleanchor="x", scaleratio=1,title_text=f'pred u {i+1}', row=i+1, col=1) + fig.update_layout(height=500, width=500*self.n_outputs,title='NNMPC approximation', showlegend=False) + + return fig, nmse + +class NullController(Controller): + nu = None + + def __init__(self, nu = None): + self.nu = nu + return + + def control_action(self, x0 : np.array, xf : np.array, param:dict ): + return np.zeros(self.nu) + + def control_plan(self, x0 : np.array, xf : np.array, param:dict): + pass \ No newline at end of file diff --git a/NN_MPC/README.md b/NN_MPC/README.md new file mode 100644 index 0000000..27c508c --- /dev/null +++ b/NN_MPC/README.md @@ -0,0 +1,4 @@ +# NN_MPC +### NNMPC tutorial for IMPACT workshop + +1) Open NN_MPC folder in your IDE and run `nnmpc_tutorial.py`. diff --git a/NN_MPC/Utility.py b/NN_MPC/Utility.py new file mode 100644 index 0000000..de2c9c5 --- /dev/null +++ b/NN_MPC/Utility.py @@ -0,0 +1,216 @@ +import numpy as np +import plotly.graph_objects as go +from plotly.subplots import make_subplots +from itertools import product + +import impact + +import Controllers + +class Simulator(): + Tf : np.number + dt : np.number + verbose : int = 2 + + def simulate(self, impact_model : impact.Model, controller : Controllers.Controller, x0:np.ndarray, xf:np.ndarray, closed_loop:bool=True): + N_steps = int(np.ceil(self.Tf/self.dt)) + t = np.linspace(0, self.Tf, N_steps+1) + param = {"time2go":self.Tf, "dt":self.dt} + + if closed_loop: + # impact_mpc.method(impact.MultipleShooting(N=N_steps, M = 1, intg='rk')) + # impact_sim = impact_mpc._method.discrete_system(impact_mpc) + impact_sim = impact_model.sim_step + X = np.ndarray([len(t), impact_model.nx]) * np.nan + U = np.zeros([len(t), impact_model.nu]) * np.nan + X[0] = x0 + + for now in range(len(t)-1): + time2go = self.Tf - t[now] + param["time2go"] = time2go + param["current_index"] = now + U[now] = controller.control_action(X[now], xf, param) + temp = impact_sim(x0 = X[now], u = U[now], T = self.dt) + X[now+1] = temp['xf'].full().flatten() + + U[-1] = U[-2] + + else: + U, X = controller.control_plan(x0, xf, param) + X = X.reshape((N_steps+1), impact_model.nx) + U = U.reshape((N_steps+1), impact_model.nu) + + return X, U, t + + def simulate_multi(self, impact_model : impact.Model, controller : Controllers.Controller, x0_combinations:np.ndarray, xf_combinations:np.ndarray, closed_loop:bool=True): + # Compute all combinations of initial and final conditions + N_trajectories = x0_combinations.shape[0]*xf_combinations.shape[0] + N_points = int(np.ceil(self.Tf/self.dt)) + cnt = 0 + + X_series = np.ndarray([N_trajectories, (N_points+1), impact_model.nx]) * np.nan + R_series = np.ndarray([N_trajectories, (N_points+1), impact_model.nx]) * np.nan + U_series = np.ndarray([N_trajectories, (N_points+1), impact_model.nu]) * np.nan + T_series = np.ndarray([N_trajectories, (N_points+1)]) * np.nan + + for i in range(x0_combinations.shape[0]): + for j in range(xf_combinations.shape[0]): + if Simulator.verbose > 1: + print('> Solving trajectory ',cnt+1,' of ', N_trajectories,': ', np.round(x0_combinations[i],2),' -> ',np.round(xf_combinations[j],2)) + + x0 = x0_combinations[i] + xf = xf_combinations[j] + + X, U, t = self.simulate(impact_model, controller, x0, xf, closed_loop) + + X_series[cnt] = X + U_series[cnt] = U + R_series[cnt] = np.tile(xf, (N_points+1, 1)) + T_series[cnt] = t + + cnt = cnt + 1 + + # Remove NANs + mask = ~(np.isnan(X_series).any(axis=(1,2))) + X_series = X_series[mask] + U_series = U_series[mask] + R_series = R_series[mask] + T_series = T_series[mask] + + n_nans = np.sum(~mask) + + if Simulator.verbose > 0: + print('Simulations completed: ', n_nans, 'failed, ', X_series.shape[0], 'succeeded') + + return X_series, U_series, R_series, T_series + + + +## Utility functions + +def save_sims(path, X_series:np.ndarray, U_series:np.ndarray, R_series:np.ndarray, T_series:np.ndarray): + np.savez(path,X_series = X_series,U_series = U_series,R_series = R_series, + T_series=T_series) + return print('Simulation data saved in:', path) + +def load_sim_data(path): + data = np.load(path) + X_series = data['X_series'] + U_series = data['U_series'] + R_series = data['R_series'] + T_series = data['T_series'] + return X_series, U_series, R_series, T_series + +def get_states_grid(model:impact.MPC, n_points:int, x_range:list): + x_values = [] + for i in range(len(x_range)): + if len(x_range[i]) > 1: + x_values.append(np.linspace(x_range[i][0], x_range[i][1], n_points)) + + x_values = np.array(x_values) + x_combinations = np.array(list(product(*x_values))) + + if x_combinations.shape[1] < model.nx: + x_combinations = np.concatenate(([x_combinations, np.zeros([x_combinations.shape[0], model.nx-x_combinations.shape[1]])]), axis=1) + + for i in range(len(x_range)): + if len(x_range[i]) == 1: + x_combinations[:,i] = x_range[i] + + return x_combinations + +def get_random_state(model:impact.MPC, x_range:np.ndarray): + x_range = np.array(x_range) + x = np.random.uniform(x_range[:, 0], x_range[:, 1]) + + if x.shape[0] < model.nx: + x = np.concatenate(([x, np.zeros([model.nx-x.shape[0]])]), axis=0) + + return x + +def plot_io_samples(inputs:np.ndarray, outputs:np.ndarray, colors = ["blue","red","black"], names=["in","out"], bounds=None, max_points=1e5): + + if inputs.ndim <= 2: + inputs_flat = inputs + inputs = np.expand_dims(inputs, axis=0) + outputs_flat = outputs + outputs = np.expand_dims(outputs, axis=0) + else: + inputs_flat = inputs.reshape(-1, inputs.shape[-1]) + outputs_flat = outputs.reshape(-1, outputs.shape[-1]) + + if inputs_flat.size > max_points: + factor = inputs_flat.size // max_points + inputs_flat = inputs_flat[::factor] + outputs_flat = outputs_flat[::factor] + + n_in = inputs.shape[-1] + n_out = outputs.shape[-1] + + fig = make_subplots(rows=n_out, cols=n_in) + + for i in range(n_in): + for j in range(n_out): + fig.add_trace( + go.Scatter(x=inputs_flat[:, i], y=outputs_flat[:, j], mode='lines', line=dict(color=colors[0]), showlegend=False), + row=j+1, col=i+1 + ) + fig.add_trace( + go.Scatter(x=inputs[:, 0, i], y=outputs[:, 0, j], mode='markers', marker=dict(symbol='circle-open'), line=dict(color=colors[1]), name="Initial", showlegend=(i==0 and j==0)), + row=j+1, col=i+1 + ) + fig.add_trace( + go.Scatter(x=inputs[:, -1, i], y=outputs[:, -1, j], mode='markers', line=dict(color=colors[2]), name="Final", showlegend=(i==0 and j==0)), + row=j+1, col=i+1 + ) + + # plot bounds + if bounds: + if bounds[0].ndim < 2: + bounds[0] = np.expand_dims(bounds[0], axis=0) + + if bounds[1].ndim < 2: + bounds[1] = np.expand_dims(bounds[1], axis=0) + + if i < bounds[0].shape[0]: + fig.add_vline(x=bounds[0][i,0], line=dict(color=colors[2], dash='dash'), row=j+1, col=i+1) + fig.add_vline(x=bounds[0][i,1], line=dict(color=colors[2], dash='dash'), row=j+1, col=i+1) + + if j < bounds[1].shape[0]: + fig.add_hline(y=bounds[1][j,0], line=dict(color=colors[2], dash='dash'), row=j+1, col=i+1) + fig.add_hline(y=bounds[1][j,1], line=dict(color=colors[2], dash='dash'), row=j+1, col=i+1) + + # set axis labels + if n_in + n_out == 2: + fig.update_xaxes(title_text=names[0], row=j+1, col=i+1) + fig.update_yaxes(title_text=names[1], row=j+1, col=i+1) + else: + fig.update_xaxes(title_text=f'{names[0]} {i+1}', row=j+1, col=i+1) + fig.update_yaxes(title_text=f'{names[1]} {j+1}', row=j+1, col=i+1) + + fig.update_layout(height=300*n_out, width=300*n_in, title_text="Inputs vs Outputs Scatter Plots") + return fig + +def plot_time_evolution(X, U, R, t, fig=None, colors = ["blue","red","black"], name="x"): + nx = X.shape[-1] + nu = U.shape[-1] + + if R.ndim > 1: + Rf = R[-1] + else: + Rf = R + + if not fig: + fig = make_subplots(rows=nx + nu, cols=1) + + for i in range(nx): + fig.add_trace(go.Scatter(x=t, y=X[:,i], mode='lines', line=dict(color=colors[0]), name=name, legendgroup=i+1), row=i+1, col=1) + fig.add_trace(go.Scatter(x=[t[-1]], y=np.array(Rf[i]), line=dict(color=colors[2]), showlegend=False), row=i+1, col=1) + fig.update_yaxes(title_text=f'x {i+1}', row=i+1, col=1) + fig.update_xaxes(range=[0, t[-1]], row=i+1, col=1) + for i in range(nu): + fig.add_trace(go.Scatter(x=t, y=U[:,i], mode='lines', line=dict(color=colors[1]), name=name, legendgroup=nx+i+1), row=nx+i+1, col=1) + fig.update_yaxes(title_text=f'u {i+1}', row=nx+i+1, col=1) + fig.update_xaxes(title_text=f'time [s]', row=nx+i+1, col=1) + fig.update_layout(showlegend=False, height=100*(nx+nu+2), width=700, legend_tracegroupgap=40) + return fig \ No newline at end of file diff --git a/NN_MPC/models/cart_pendulum.yaml b/NN_MPC/models/cart_pendulum.yaml new file mode 100644 index 0000000..8b75e28 --- /dev/null +++ b/NN_MPC/models/cart_pendulum.yaml @@ -0,0 +1,22 @@ +equations: + inline: + ode: + pos: dpos + theta: dtheta + dpos: (-m*L*sin(theta)*dtheta*dtheta + m*g*cos(theta)*sin(theta)+F)/(mcart + m - m*cos(theta)*cos(theta)) + dtheta: (-m*L*cos(theta)*sin(theta)*dtheta*dtheta + F*cos(theta)+(mcart+m)*g*sin(theta))/(L*(mcart + m - m*cos(theta)*cos(theta))) +differential_states: + - name: pos + - name: theta + - name: dpos + - name: dtheta +controls: + - name: F +constants: + inline: + mcart: 0.5 # cart mass [kg] + m: 1 # pendulum mass [kg] + L: 2 # pendulum length [m] + g: 9.81 # gravitation [m/s^2] + + diff --git a/NN_MPC/models/cart_pendulum2.yaml b/NN_MPC/models/cart_pendulum2.yaml new file mode 100644 index 0000000..9e3b9de --- /dev/null +++ b/NN_MPC/models/cart_pendulum2.yaml @@ -0,0 +1,11 @@ +equations: + external: + type: casadi_serialized + file_name: cart_pendulum_equations.casadi +differential_states: + - name: pos + - name: theta + - name: dpos + - name: dtheta +controls: + - name: F diff --git a/NN_MPC/models/furuta.yaml b/NN_MPC/models/furuta.yaml new file mode 100644 index 0000000..d95c233 --- /dev/null +++ b/NN_MPC/models/furuta.yaml @@ -0,0 +1,39 @@ +# Simplified Futura Pendulum Model for tutorial +# Cazzolato, B. S., & Prime, Z. (2011). On the dynamics of the furuta pendulum. Journal of Control Science and Engineering, 2011(1), 528341. +# equation 33 and 34 +equations: + inline: + ode: + theta1: dtheta1 + theta2: dtheta2 + dtheta1: ddtheta1 + dtheta2: ddtheta2 + ddtheta1: ( (-J2h*b1*dtheta1) + (m2*L1*l2*cos(theta2)*b2*dtheta2) - (J2h**2*sin(2*theta2)*dtheta1*dtheta2) - (0.5*J2h*m2*L1*l2*cos(theta2)*sin(2*theta2)*dtheta1**2) + (J2h*m2*L1*l2*sin(theta2)*dtheta2**2) + (J2h*Torque1) - (m2*L1*l2*cos(theta2)*Torque2) + (0.5*g*m2**2*l2**2*L1*sin(2*theta2)) ) / ( (J0h*J2h) + (J2h**2*sin(theta2)**2) - (m2**2*L1**2*l2**2*cos(theta2)**2) ) + ddtheta2: ( (m2*L1*l2*cos(theta2)*b1*dtheta1) - (b2*(J0h + J2h*sin(theta2)**2)*dtheta2) + (m2*L1*l2*J2h*cos(theta2)*sin(2*theta2)*dtheta1*dtheta2) - (0.5*sin(2*theta2)*(J0h*J2h + J2h**2*sin(theta2)**2)*dtheta1**2) - (0.5*m2**2*L1**2*l2**2*sin(2*theta2)*dtheta2**2) -(m2*L1*l2*cos(theta2)*Torque1) + (J0h + J2h*sin(theta2)**2*Torque2) - (m2*g*l2*sin(theta2)*(J0h + J2h*sin(theta2)**2))) /( (J0h*J2h) + (J2h**2*sin(theta2)**2) - (m2**2*L1**2*l2**2*cos(theta2)**2) ) +differential_states: + - name: theta1 + - name: theta2 + - name: dtheta1 + - name: dtheta2 + - name: ddtheta1 + - name: ddtheta2 +controls: + - name: Torque1 +constants: + inline: + # parameters from the section 8 of the paper + L1: 0.278 # m + L2: 0.300 # m + l1: 0.150 # m + l2: 0.148 # m + m1: 0.300 # kg + m2: 0.075 # kg + J1: 0.0248 # kg m^2 + J2: 0.00386 # kg m^2 + b1: 0.0001 # N m s + b2: 0.00028 # N m s + J0h: 0.0373 # kg m^2 + J1h: 0.0316 # kg m^2 + J2h: 0.0055 # kg m^2 + g: 9.81 # m/s^2 + Torque2: 0.0 # N m \ No newline at end of file diff --git a/NN_MPC/models/furuta_pendulum.yaml b/NN_MPC/models/furuta_pendulum.yaml new file mode 100644 index 0000000..057d507 --- /dev/null +++ b/NN_MPC/models/furuta_pendulum.yaml @@ -0,0 +1,39 @@ +# Simplified Futura Pendulum Model for tutorial +# Cazzolato, B. S., & Prime, Z. (2011). On the dynamics of the furuta pendulum. Journal of Control Science and Engineering, 2011(1), 528341. +# equation 33 and 34 +equations: + inline: + ode: + theta1: dtheta1 + theta2: dtheta2 + dtheta1: ddtheta1 + dtheta2: ddtheta2 + ddtheta1: ( (-J2h*b1*dtheta1) + (m2*L1*l2*cos(theta2)*b2*dtheta2) - (J2h**2*sin(2*theta2)*dtheta1*dtheta2) - (0.5*J2h*m2*L1*l2*cos(theta2)*sin(2*theta2)*dtheta1**2) + (J2h*m2*L1*l2*sin(theta2)*dtheta2**2) + (J2h*Torque1) - (m2*L1*l2*cos(theta2)*Torque2) + (0.5*g*m2**2*l2**2*L1*sin(2*theta2)) ) / ( (J0h*J2h) + (J2h**2*sin(theta2)**2) - (m2**2*L1**2*l2**2*cos(theta2)**2) ) + ddtheta2: ( (m2*L1*l2*cos(theta2)*b1*dtheta1) - (b2*(J0h + J2h*sin(theta2)**2)*dtheta2) + (m2*L1*l2*J2h*cos(theta2)*sin(2*theta2)*dtheta1*dtheta2) - (0.5*sin(2*theta2)*(J0h*J2h + J2h**2*sin(theta2)**2)*dtheta1**2) - (0.5*m2**2*L1**2*l2**2*sin(2*theta2)*dtheta2**2) -(m2*L1*l2*cos(theta2)*Torque1) + (J0h + J2h*sin(theta2)**2*Torque2) - (m2*g*l2*sin(theta2)*(J0h + J2h*sin(theta2)**2))) /( (J0h*J2h) + (J2h**2*sin(theta2)**2) - (m2**2*L1**2*l2**2*cos(theta2)**2) ) +differential_states: + - name: theta1 + - name: theta2 + - name: dtheta1 + - name: dtheta2 + - name: ddtheta1 + - name: ddtheta2 +controls: + - name: Torque1 + - name: Torque2 +constants: + inline: + # parameters from the section 8 of the paper + L1: 0.278 # m + L2: 0.300 # m + l1: 0.150 # m + l2: 0.148 # m + m1: 0.300 # kg + m2: 0.075 # kg + J1: 0.0248 # kg m^2 + J2: 0.00386 # kg m^2 + b1: 0.0001 # N m s + b2: 0.00028 # N m s + J0h: 0.0373 # kg m^2 + J1h: 0.0316 # kg m^2 + J2h: 0.0055 # kg m^2 + g: 9.81 # m/s^2 \ No newline at end of file diff --git a/NN_MPC/models/spring_mass_damper.yaml b/NN_MPC/models/spring_mass_damper.yaml new file mode 100644 index 0000000..601ff94 --- /dev/null +++ b/NN_MPC/models/spring_mass_damper.yaml @@ -0,0 +1,18 @@ +equations: + inline: + ode: + pos: dpos + dpos: (u/m)-(dpos*b/m)-(((pos**3)*stiff)/m)-g +differential_states: + - name: pos + - name: dpos +controls: + - name: F +constants: + inline: + stiff: 5.0 # N/m + m: 1.0 # kg + b: 0.1 # Ns/m + g: 9.81 # m/s^2 + + diff --git a/NN_MPC/nnmpc_tutorial.py b/NN_MPC/nnmpc_tutorial.py new file mode 100644 index 0000000..4419495 --- /dev/null +++ b/NN_MPC/nnmpc_tutorial.py @@ -0,0 +1,266 @@ +# NN-MPC tutorial +# By: Taranjit Singh and Andrea Giusti +# Date : 2021-06-01 + +# %% +# 0) Bootstrap + +import matplotlib.pyplot as plt +import numpy as np +import copy +import pandas as pd +import torch +import torch.nn as nn +import torch.optim as optim +import tqdm +import time +import plotly.graph_objects as go +from plotly.subplots import make_subplots +from itertools import product +import contextlib +import importlib +import os +from sklearn.model_selection import train_test_split + +import casadi +import rockit +import impact + +import Controllers +import Utility +importlib.reload(Controllers) +importlib.reload(Utility) + +# OUTLINE +# 1) Define plant model and parameters +# - Define dynamical model +# - Define control problem +# 2) Generate MPC solutions +# - Define grid of initial/final conditions +# - Solve MPC problems +# - Store data +# - Visualize data +# 3) Build Neural Network MPC +# - Define NN structure +# - Train NN +# - Visualize training +# - Save NNMPC +# - Export NNMPC as casadi function and Simulink block +# 4) Validate NNMPC in closed loop +# - Open loop validation +# - Closed loop validation + + +# %% ----------------------------------------------- +# 1) Define plant model and parameters + +# Options +generate_mpc_data = True # generate new MPC data, if False load saved ones +train_nn = True # train new NNMPC, if False load saved one +save_outputs = True # save outputs simulations and NN model + +# parameters for mass-spring-damper +model_name = 'spring_mass_damper' # name of the model (corresponding yaml file must be in the models folder) +# Bounds +u_max = 20 # max control input [N] +x_range = np.array([[-1, 1], [-4, 4]]) # state bounds +# MPC parameters +n_points = 5 # number of grid points for each state +x0_range = [[-0.5, 0.5], [-2, 2]] # initial state range +xf_range = x0_range # final state range +# NN parameters +# NN_inputs = None +# NN_use_err = False +nn_epochs = 10000 # number of training epochs +nn_depth = 3 # number of activation function layers +nn_width = 150 # number of neurons per layer +test_size = 0.4 # fraction of data used for validation +lr = 0.01 # learning rate +# Simulation parameters +sim = Utility.Simulator() +sim.Tf = 1.0 # total simulation time [s] +sim.dt = 0.025 # sampling time [s] + +# Init +sims_path = "results/"+model_name+"_MPC_sims.npz" +NN_path = "results/"+model_name+"_NNMPC" + +mpc = Controllers.MPC() +model = mpc.add_model(model_name, "models/"+model_name+".yaml") +print("Loaded",model_name,"model with",model.nx,"states and",model.nu, "inputs") + +# Define control problem +# impact_mpc.add_objective(impact_mpc.integral(casadi.sumsqr(model.u))) +mpc.add_objective(mpc.integral(model.u[0]**2)) + +# %% ----------------------------------------------- +# 2) Generate MPC solutions + +# Define MPC controller for the given impact model +mpc.bound_states[0:len(x0_range)] = x_range +mpc.bound_inputs[0] = np.array([-u_max, u_max]) + +if generate_mpc_data: + # Define grid of initial/final conditions + x0_combinations = Utility.get_states_grid(model, n_points, x0_range) + xf_combinations = Utility.get_states_grid(model, n_points, xf_range) + + # Simulate MPC solutions + X_series, U_series, R_series, T_series = sim.simulate_multi(model, mpc, x0_combinations, xf_combinations, closed_loop=False) + + # Save MPC simulation data + if save_outputs: + Utility.save_sims(sims_path, X_series, U_series, R_series, T_series) + +else: + X_series, U_series, R_series, T_series = Utility.load_sim_data(sims_path) + print("Loaded "+ str(X_series.shape[0]) +" simulations data from "+sims_path) + + +# Visualize MPC data +indx = 0 +fig = Utility.plot_time_evolution(X_series[indx],U_series[indx],R_series[indx],T_series[indx]) +fig.update_layout(title='Example MPC trajectory') +fig.show() +if model.nx <= 2: + fig = Utility.plot_io_samples(X_series[:,:,0:1], X_series[:,:,1:2], names=["x1","x2"], bounds=[x_range[0],x_range[1]]) +else: + fig = Utility.plot_io_samples(X_series[:,:,0:4], X_series[:,:,0:4], names=["x","x"], bounds=[x_range[0:4],x_range[0:4]]) +fig.update_layout(title_text="MPC States", height=600, width=600) +fig.show() +fig = Utility.plot_io_samples(X_series, U_series, names=["x","u"], bounds=[x_range, np.array([-u_max, u_max])]) +fig.update_layout(title_text="MPC States vs Inputs", ) +fig.show() + +# %% ----------------------------------------------- +# 3) Generate Neural Network MPC + +if train_nn: + # Define NN-MPC controller for the given impact model + nnmpc = Controllers.NNMPC(model) + + # Define NN structure + nnmpc.define_nn_structure(depth=nn_depth, width=nn_width) + struct = nnmpc.get_structure() + print("\nBuilt NNMPC with:") + [print(f"> {key}: {value}") for key, value in struct.items()] + + # Train NN from simulation data + print("\nTraining NNMPC from simulation data...") + inputs, outputs = nnmpc.arrange_sim_data(X_series, U_series, R_series, T_series) + inputs, outputs, n_nans, n_outliers = nnmpc.clean_dataset(inputs,outputs, out_thresh=5) + print(f"{inputs.shape[0]} valid datapoints. Removed {n_nans} NaNs and {n_outliers} outliers.") + + # Split dataset into training and test sets + if test_size == 0: + inputs_train, outputs_train = inputs, outputs + inputs_test, outputs_test = inputs, outputs + else: + inputs_train, inputs_test, outputs_train, outputs_test = train_test_split(inputs, outputs, test_size=test_size, random_state=0) + + # fig = Utility.plot_io_samples(inputs, outputs) + # fig.show() + + # Train NN + loss = nnmpc.train_nn(inputs_train, outputs_train, n_epochs=nn_epochs, lr=lr) + + +else: + # Load existing NN model and data + nnmpc = Controllers.NNMPC.load_nnmpc(path=NN_path) + struct = nnmpc.get_structure() + print("NNMPC has:") + [print(f"> {key}: {value}") for key, value in struct.items()] + inputs, outputs = nnmpc.arrange_sim_data(X_series, U_series, R_series, T_series) + inputs, outputs, n_nans, n_outliers = nnmpc.clean_dataset(inputs,outputs, out_thresh=5) + print(f"{inputs.shape[0]} valid datapoints. Removed {n_nans} NaNs and {n_outliers} outliers.") + # Split dataset into training and test sets + inputs_train, inputs_test, outputs_train, outputs_test = train_test_split(inputs, outputs, test_size=test_size, random_state=0) + +# Visualize training +fig_train = go.Figure() # training figure +fig_train.add_trace(go.Scatter(y=nnmpc.loss_history, mode='lines', name='loss')) +fig_train.update_yaxes(type="log") +fig_train.update_layout(title='NNMPC training', xaxis_title='Epoch', yaxis_title='Loss [MSE]') +fig_train.show() + +# visualize NN approximation +fig, NMSE = nnmpc.plot_approximation(inputs_train, outputs_train) +fig.update_layout(title='NNMPC approximation: training set') +fig.show() +fig, NMSE = nnmpc.plot_approximation(inputs_test, outputs_test) +fig.update_layout(title='NNMPC approximation: test set') +fig.show() + + +if save_outputs: + # Save NN model + nnmpc.save_nnmpc(NN_path) + + # Export NN model as casadi function + nnmpc_casadi = nnmpc.export2casadi(NN_path) + + # Export NN model to Simulink + try: + with open(os.devnull, 'w') as f, contextlib.redirect_stdout(f): + mpc.clear_constraints() + x_current = mpc.parameter('x_current',model.nx) + mpc.set_value(x_current, np.zeros(model.nx)) + mpc.add_function(nnmpc_casadi) + name = NN_path.split('/')[-1] + mpc.export(name, src_dir=NN_path+"_build_dir") + print(f"NNMPC exported to {name}_build_dir for Simulink. ", + "To use it:\n", + f"1) Open Matlab and navigate to {name}_build_dir.\n", + "2) Add casadi to Matlab path.\n", + "3) Run build.m (can take few minutes).\n", + f"4) Copy the {name} block from library_{name}.xls to your Simulink model.\n", + "5) The block gets [state, target_state, time2go] and returns [control_output]." + ) + except: + print("Error exporting NNMPC to Simulink. Try setting generate_mpc_data = False.") + +# %% ----------------------------------------------- +# 4) Validate NN + +# Select initial and final condition for validation +# indx=X_series.shape[0]//2 +indx=0 +x0 = X_series[indx,0,:] +# x0 = np.zeros(model.nx) +# x0 = Utility.get_random_state(model, x0_range) + +xf = X_series[indx,-1,:] +# xf = np.zeros_like(x0) +# xf = Utility.get_random_state(model, x0_range) + +print('\nRunning validation...') + +# MPC reference trajectory +X_mpc, U_mpc, t_mpc = sim.simulate(model, mpc, x0, xf, closed_loop=False) + + +# open loop validation +u_nn = [] +t = t_mpc +for i in range(X_series.shape[1]): + x = X_mpc[i] + time2go = sim.Tf - i*sim.dt + param = {"time2go": time2go} + u_nn.append(nnmpc.control_action(x, xf, param)) + +u_nn = np.array(u_nn) +fig = Utility.plot_time_evolution(np.array([]), U_mpc, xf,t_mpc, colors=["black","black","black"], name="MPC") +fig = Utility.plot_time_evolution(np.array([]), u_nn[:,:,0], xf,t, fig, colors=["blue","red","black"], name="NNMPC") +fig.update_layout(title='Open loop validation: NNMPC vs MPC', showlegend=True) +fig.show() + +# closed loop validation +X_nn, U_nn, t_nn = sim.simulate(model, nnmpc, x0, xf) + +fig = Utility.plot_time_evolution(X_mpc[:,0:4],U_mpc, xf,t_mpc, colors=["black","black","black"], name="MPC") +fig = Utility.plot_time_evolution(X_nn[:,0:4],U_nn, xf,t_nn, fig, colors=["blue","red","black"], name="NNMPC") +fig.update_layout(title='Closed loop validation: NNMPC vs MPC', showlegend=True) +fig.show() + +# %% -----------------------------------------------