mcmc_gaussian_process.py
from __future__ import annotations

from typing import Any, Optional, TypeVar, cast

import logging
import warnings
from copy import deepcopy

import emcee
import numpy as np
from ConfigSpace import ConfigurationSpace
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Kernel

from smac.model.gaussian_process.abstract_gaussian_process import (
    AbstractGaussianProcess,
)
from smac.model.gaussian_process.gaussian_process import GaussianProcess
from smac.model.gaussian_process.priors.abstract_prior import AbstractPrior

__copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI"
__license__ = "3-clause BSD"

logger = logging.getLogger(__name__)

Self = TypeVar("Self", bound="MCMCGaussianProcess")


class MCMCGaussianProcess(AbstractGaussianProcess):
    """Implementation of a Gaussian process model which integrates out its hyperparameters via
    Markov chain Monte Carlo (MCMC). If you use this class, make sure that you also use an integrated acquisition
    function to integrate over the GP's hyperparameters, as proposed by Snoek et al.

    This code is based on the implementation of RoBO:

    Klein, A. and Falkner, S. and Mansur, N. and Hutter, F.
    RoBO: A Flexible and Robust Bayesian Optimization Framework in Python
    In: NIPS 2017 Bayesian Optimization Workshop

    Parameters
    ----------
    configspace : ConfigurationSpace
    kernel : Kernel
        Kernel which is used for the Gaussian process.
    n_mcmc_walkers : int, defaults to 20
        The number of hyperparameter samples. This also determines the number of walkers for MCMC sampling, as each
        walker returns one hyperparameter sample.
    chain_length : int, defaults to 50
        The length of the MCMC chain. We start `n_mcmc_walkers` walkers for `chain_length` steps and use the last
        sample in the chain as a hyperparameter sample.
    burning_steps : int, defaults to 50
        The number of burn-in steps before the actual MCMC sampling starts.
    mcmc_sampler : str, defaults to "emcee"
        Choose a self-tuning MCMC sampler. Can be either ``emcee`` or ``nuts``.
    average_samples : bool, defaults to False
        Whether to average the sampled hyperparameter configurations and train a single Gaussian process on the
        average instead of one Gaussian process per sample.
    normalize_y : bool, defaults to True
        Zero mean unit variance normalization of the output values.
    instance_features : dict[str, list[int | float]] | None, defaults to None
        Features (list of int or floats) of the instances (str). The features are incorporated into the X data,
        on which the model is trained.
    pca_components : int | None, defaults to 7
        Number of components to keep when using PCA to reduce the dimensionality of the instance features.
    seed : int
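
    Example
    -------
    A minimal usage sketch (illustrative only; it assumes the kernel wrappers exported by
    ``smac.model.gaussian_process.kernels`` and user-provided arrays ``X``, ``y`` and ``X_test``)::

        from ConfigSpace import ConfigurationSpace

        from smac.model.gaussian_process.kernels import ConstantKernel, MaternKernel
        from smac.model.gaussian_process.mcmc_gaussian_process import MCMCGaussianProcess

        cs = ConfigurationSpace({"x": (0.0, 1.0)})
        kernel = ConstantKernel() * MaternKernel()
        gp = MCMCGaussianProcess(configspace=cs, kernel=kernel, seed=0)

        # gp.train(X, y) draws kernel hyperparameter samples via MCMC and fits one GP per sample;
        # gp.predict(X_test) then averages the per-sample predictive means and variances.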
"""

    def __init__(
        self,
        configspace: ConfigurationSpace,
        kernel: Kernel,
        n_mcmc_walkers: int = 20,
        chain_length: int = 50,
        burning_steps: int = 50,
        mcmc_sampler: str = "emcee",
        average_samples: bool = False,
        normalize_y: bool = True,
        instance_features: dict[str, list[int | float]] | None = None,
        pca_components: int | None = 7,
        seed: int = 0,
    ):
        if mcmc_sampler not in ["emcee", "nuts"]:
            raise ValueError(f"MCMC Gaussian process does not support the sampler `{mcmc_sampler}`.")

        super().__init__(
            configspace=configspace,
            kernel=kernel,
            instance_features=instance_features,
            pca_components=pca_components,
            seed=seed,
        )

        self._n_mcmc_walkers = n_mcmc_walkers
        self._chain_length = chain_length
        self._burning_steps = burning_steps
        self._models: list[GaussianProcess] = []
        self._normalize_y = normalize_y
        self._mcmc_sampler = mcmc_sampler
        self._average_samples = average_samples
        self._set_has_conditions()

        # Internal statistics
        self._n_ll_evals = 0
        self._burned = False
        self._is_trained = False
        self._samples: np.ndarray | None = None

    @property
    def meta(self) -> dict[str, Any]:  # noqa: D102
        meta = super().meta
        meta.update(
            {
                "n_mcmc_walkers": self._n_mcmc_walkers,
                "chain_length": self._chain_length,
                "burning_steps": self._burning_steps,
                "mcmc_sampler": self._mcmc_sampler,
                "average_samples": self._average_samples,
                "normalize_y": self._normalize_y,
            }
        )

        return meta

    @property
    def models(self) -> list[GaussianProcess]:
        """Returns the internally used Gaussian processes."""
        return self._models

    def _train(
        self: Self,
        X: np.ndarray,
        y: np.ndarray,
        optimize_hyperparameters: bool = True,
    ) -> Self:
        """Performs MCMC sampling to sample hyperparameter configurations from the likelihood and
        trains a Gaussian process on X and y for each sample.

        Parameters
        ----------
        X : np.ndarray [#samples, #hyperparameters + #features]
            Input data points.
        y : np.ndarray [#samples, #objectives]
            The corresponding target values.
        optimize_hyperparameters : bool
            If set to true, we perform MCMC sampling. Otherwise, we just use the hyperparameters specified in the
            kernel.
        """
        X = self._impute_inactive(X)

        if self._normalize_y:
            # A note on normalization for the Gaussian process with MCMC:
            # Scikit-learn uses a different "normalization" than we use in SMAC3. Scikit-learn normalizes the data
            # to have zero mean, while we normalize it to have zero mean and unit variance. To make sure the
            # scikit-learn GP behaves the same whether we use it directly or indirectly (through the
            # gaussian_process.py file), we normalize the data here. Then, after the individual GPs are fit, we
            # inject the statistics into them, so they unnormalize the data at prediction time.
            y = self._normalize(y)

        self._gp = self._get_gaussian_process()

        if optimize_hyperparameters:
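            # The initial fit is required so that `log_marginal_likelihood(theta)` can be evaluated for
            # arbitrary `theta` during sampling; the regressor is created with `optimizer=None`, so no
            # hyperparameter optimization happens here.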
            self._gp.fit(X, y)
            self._all_priors = self._get_all_priors(
                add_bound_priors=True,
                add_soft_bounds=True if self._mcmc_sampler == "nuts" else False,
            )
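            # Soft (differentiable) bound priors are only requested for the NUTS sampler; NUTS is
            # gradient-based, so smooth priors are preferable to hard cut-offs there.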

            if self._mcmc_sampler == "emcee":
                sampler = emcee.EnsembleSampler(self._n_mcmc_walkers, len(self._kernel.theta), self._ll)
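                # One walker per requested hyperparameter sample; each walker moves through the
                # `len(self._kernel.theta)`-dimensional space of log kernel hyperparameters, using `self._ll`
                # (marginal log likelihood plus log prior) as the log-probability function.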
                sampler.random_state = self._rng.get_state()

                # Do a burn-in in the first iteration
                if not self._burned:
                    # Initialize the walkers by sampling from the prior
                    dim_samples = []

                    prior: AbstractPrior | list[AbstractPrior] | None = None
                    for dim, prior in enumerate(self._all_priors):
                        # Always sample from the first prior
                        if isinstance(prior, list):
                            if len(prior) == 0:
                                prior = None
                            else:
                                prior = prior[0]

                        prior = cast(Optional[AbstractPrior], prior)
                        if prior is None:
                            raise NotImplementedError()
                        else:
                            dim_samples.append(prior.sample_from_prior(self._n_mcmc_walkers).flatten())

                    self.p0 = np.vstack(dim_samples).transpose()

                    # Run MCMC sampling
                    with warnings.catch_warnings():
                        warnings.filterwarnings("ignore", r"invalid value encountered in double_scalars.*")
                        self.p0, _, _ = sampler.run_mcmc(self.p0, self._burning_steps)

                    self._burned = True

                # Start sampling & save the current position; it will be the start point in the next iteration
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore", r"invalid value encountered in double_scalars.*")
                    self.p0, _, _ = sampler.run_mcmc(self.p0, self._chain_length)

                # Take the last samples from each walker
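                # (`sampler.get_chain()` has shape (n_steps, n_walkers, n_dims), so indexing the last step
                # yields one hyperparameter vector per walker, i.e. `n_mcmc_walkers` samples in total.)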
                self._samples = sampler.get_chain()[-1]
            elif self._mcmc_sampler == "nuts":
                # Originally published as:
                # http://www.stat.columbia.edu/~gelman/research/published/nuts.pdf
                # A good explanation of HMC:
                # https://theclevermachine.wordpress.com/2012/11/18/mcmc-hamiltonian-monte-carlo-a-k-a-hybrid-monte-carlo/
                # A good explanation of HMC and NUTS can be found in:
                # https://besjournals.onlinelibrary.wiley.com/doi/full/10.1111/2041-210X.12681

                # Do not require the installation of NUTS for SMAC
                # This requires NUTS from https://github.com/mfeurer/NUTS
                import nuts.nuts  # type: ignore

                # Use the kernel's current hyperparameters as theta0 for the first MCMC run
                if not self._burned:
                    theta0 = self._gp.kernel.theta
                    self._burned = True
                else:
                    theta0 = self.p0

                samples, _, _ = nuts.nuts.nuts6(
                    f=self._ll_w_grad,
                    Madapt=self._burning_steps,
                    M=self._chain_length,
                    theta0=theta0,
                    # Increasing this value results in longer running times
                    delta=0.5,
                    adapt_mass=False,
                    # Rather low max depth to keep the number of required gradient steps low
                    max_depth=10,
                    rng=self._rng,
                )
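                # Thin the chain: keep 10 evenly spaced samples, including the first and the last position.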
                indices = [int(np.rint(ind)) for ind in np.linspace(start=0, stop=len(samples) - 1, num=10)]
                self._samples = samples[indices]

                assert self._samples is not None
                self.p0 = self._samples.mean(axis=0)
            else:
                raise ValueError(self._mcmc_sampler)

            if self._average_samples:
                assert self._samples is not None
                self._samples = [self._samples.mean(axis=0)]  # type: ignore
        else:
            self._samples = self._gp.kernel.theta
            self._samples = [self._samples]  # type: ignore

        self._models = []

        assert self._samples is not None
        for sample in self._samples:
            if (sample < -50).any():
                sample[sample < -50] = -50
            if (sample > 50).any():
                sample[sample > 50] = 50

            # Instantiate a GP for each hyperparameter configuration
            kernel = deepcopy(self._kernel)
            kernel.theta = sample
            model = GaussianProcess(
                configspace=self._configspace,
                kernel=kernel,
                normalize_y=False,
                seed=self._rng.randint(low=0, high=10000),
            )

            try:
                model._train(X, y, optimize_hyperparameters=False)
                self._models.append(model)
            except np.linalg.LinAlgError:
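                # Skip hyperparameter samples for which the kernel matrix cannot be factorized
                # (e.g. because it is not positive definite).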
                pass
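
        # If no sample yielded a numerically stable model, fall back to the current chain position `p0`.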
        if len(self._models) == 0:
            kernel = deepcopy(self._kernel)
            kernel.theta = self.p0
            model = GaussianProcess(
                configspace=self._configspace,
                kernel=kernel,
                normalize_y=False,
                seed=self._rng.randint(low=0, high=10000),
            )
            model._train(X, y, optimize_hyperparameters=False)
            self._models.append(model)

        if self._normalize_y:
            # Inject the normalization statistics into the individual models. Setting normalize_y to True makes the
            # individual GPs unnormalize the data at predict time.
            for model in self._models:
                model._normalize_y = True
                model.mean_y_ = self.mean_y_
                model.std_y_ = self.std_y_

        if not self._models:
            self._is_trained = False
        else:
            self._is_trained = all(model.is_trained for model in self._models)

        return self

    def _get_gaussian_process(self) -> GaussianProcessRegressor:
        return GaussianProcessRegressor(
            kernel=self._kernel,
            normalize_y=False,  # We do not use scikit-learn's normalize routine
            optimizer=None,
            n_restarts_optimizer=0,  # We do not use scikit-learn's optimization routine
            alpha=0,  # Governed by the kernel
            random_state=self._rng,
        )

    def _ll(self, theta: np.ndarray) -> float:
        """Returns the marginal log likelihood (+ the prior) for a hyperparameter configuration
        theta.

        Parameters
        ----------
        theta : np.ndarray
            Hyperparameter vector. Note that all hyperparameters are on a log scale.
        """
        self._n_ll_evals += 1

        # Bound the hyperparameter space to keep things sane. Note that all
        # hyperparameters live on a log scale.
        if (theta < -50).any():
            theta[theta < -50] = -50
        if (theta > 50).any():
            theta[theta > 50] = 50

        try:
            lml = self._gp.log_marginal_likelihood(theta)
        except ValueError:
            return -np.inf

        # Add prior
        for dim, priors in enumerate(self._all_priors):
            for prior in priors:
                lml += prior.get_log_probability(theta[dim])

        if not np.isfinite(lml):
            return -np.inf
        else:
            return lml

    def _ll_w_grad(self, theta: np.ndarray) -> tuple[float, np.ndarray]:
        """Returns the marginal log likelihood (+ the prior) and its gradient for a hyperparameter
        configuration theta.

        Parameters
        ----------
        theta : np.ndarray
            Hyperparameter vector. Note that all hyperparameters are on a log scale.
        """
        self._n_ll_evals += 1

        # Bound the hyperparameter space to keep things sane. Note that all hyperparameters live on a log scale.
        if (theta < -50).any():
            theta[theta < -50] = -50
        if (theta > 50).any():
            theta[theta > 50] = 50

        lml = 0.0
        grad = np.zeros(theta.shape)

        # Add prior
        for dim, priors in enumerate(self._all_priors):
            for prior in priors:
                lml += prior.get_log_probability(theta[dim])
                grad[dim] += prior.get_gradient(theta[dim])

        # Check if one of the priors is invalid; if so, there is no need to compute the log marginal likelihood
        if lml < -1e24:
            return -1e25, np.zeros(theta.shape)

        try:
            lml_, grad_ = self._gp.log_marginal_likelihood(theta, eval_gradient=True)
            lml += lml_
            grad += grad_
        except ValueError:
            return -1e25, np.zeros(theta.shape)

        # Guard against non-finite values in the log likelihood or its gradient
        if not np.isfinite(lml) or (~np.isfinite(grad)).any():
            return -1e25, np.zeros(theta.shape)
        else:
            return lml, grad

    def _predict(
        self,
        X: np.ndarray,
        covariance_type: str | None = "diagonal",
    ) -> tuple[np.ndarray, np.ndarray | None]:
        r"""Returns the predictive mean and variance of the objective function at X, averaged over all
        hyperparameter samples.

        The mean is computed by

        .. math:: \mu(x) = \frac{1}{M} \sum_{m=1}^{M} \mu_m(x)

        and the variance by

        .. math:: \sigma^2(x) = \frac{1}{M} \sum_{m=1}^{M} \left( \sigma_m^2(x) + \mu_m(x)^2 \right) - \mu(x)^2.
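
        Parameters
        ----------
        X : np.ndarray [#samples, #hyperparameters + #features]
            Input data points.
        covariance_type : str | None, defaults to "diagonal"
            Only "diagonal" is supported by this model.

        Returns
        -------
        means : np.ndarray
            Predictive mean for each input point, averaged over all hyperparameter samples.
        vars : np.ndarray
            Predictive variance for each input point, combining the mean per-model variance and the
            variance of the per-model means.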
"""
        if not self._is_trained:
            raise Exception("Model has to be trained first!")

        if covariance_type != "diagonal":
            raise ValueError("`covariance_type` can only take `diagonal` for this model.")

        X_test = self._impute_inactive(X)

        mu = np.zeros([len(self._models), X_test.shape[0]])
        var = np.zeros([len(self._models), X_test.shape[0]])
        for i, model in enumerate(self._models):
            mu_tmp, var_tmp = model.predict(X_test)
            assert var_tmp is not None
            mu[i] = mu_tmp.flatten()
            var[i] = var_tmp.flatten()

        m = mu.mean(axis=0)

        # See the Algorithm Runtime Prediction paper by Hutter et al.
        # for the derivation of the total variance
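        # (law of total variance: Var[y] = E[Var[y | theta]] + Var[E[y | theta]], estimated over the
        # hyperparameter samples)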
        v = np.var(mu, axis=0) + np.mean(var, axis=0)

        # Clip negative variances and set them to the smallest
        # positive float value
        if v.shape[0] == 1:
            v = np.clip(v, np.finfo(v.dtype).eps, np.inf)
        else:
            v = np.clip(v, np.finfo(v.dtype).eps, np.inf)
            v[np.where((v < np.finfo(v.dtype).eps) & (v > -np.finfo(v.dtype).eps))] = 0

        return m, v