Module aqua_blue_hyperopt.hyper
Functions
def default_loss(mp: ModelParams) -> Callable[[HyperParams], Output]
```python
def default_loss(mp: ModelParams) -> Callable[[HyperParams], Output]:
    def inner(p: HyperParams) -> Output:
        spectral_radius, leaking_rate, sparsity, rcond = (
            p['spectral_radius'], p['leaking_rate'], p['sparsity'], p['rcond']
        )
        normalizer = Normalizer()
        model = Model(
            reservoir=DynamicalReservoir(
                reservoir_dimensionality=mp.reservoir_dimensionality,
                input_dimensionality=mp.input_dimensionality,
                w_res=mp.w_res,
                w_in=mp.w_in,
                spectral_radius=spectral_radius,
                leaking_rate=leaking_rate,
                sparsity=sparsity
            ),
            readout=mp.readout(rcond=rcond)
        )
        normalized_time_series = normalizer.normalize(mp.time_series)
        try:
            model.train(normalized_time_series)
        except np.linalg.LinAlgError:
            warnings.warn('SVD Error in Training', RuntimeWarning)
            return {'loss': 1000, 'status': hyperopt.STATUS_FAIL}
        try:
            prediction = model.predict(horizon=mp.horizon)
        except np.linalg.LinAlgError:
            warnings.warn('SVD Error in Prediction', RuntimeWarning)
            return {'loss': 1000, 'status': hyperopt.STATUS_FAIL}
        prediction = normalizer.denormalize(prediction)
        if prediction.dependent_variable.shape != mp.actual_future.shape:
            raise ValueError('Dimension mismatch between actual future and prediction')
        loss = np.sqrt(np.mean((mp.actual_future - prediction.dependent_variable) ** 2))
        out: Output = {'loss': loss, 'status': hyperopt.STATUS_OK}
        return out
    return inner
```
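For orientation, here is a minimal sketch of binding default_loss to a ModelParams instance (documented below). The synthetic series, the train/future split, and the LinearReadout class name are illustrative assumptions, not part of this module; the TimeSeries field names are inferred from the prediction object used above.

```python
import numpy as np
from aqua_blue.time_series import TimeSeries
from aqua_blue.readouts import LinearReadout  # assumed readout class accepting rcond

# Illustrative two-channel series split into a training window
# and a known future used to score predictions.
t = np.linspace(0, 20, 1000)
values = np.column_stack((np.sin(t), np.cos(t)))

mp = ModelParams(
    # field names dependent_variable/times are assumed from usage above
    time_series=TimeSeries(dependent_variable=values[:900], times=t[:900]),
    input_dimensionality=2,
    reservoir_dimensionality=100,
    horizon=100,
    actual_future=values[900:],
    readout=LinearReadout,
)

objective = default_loss(mp)  # HyperParams -> Output, ready for hyperopt.fmin
```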
Classes
class Algo (value, names=None, *, module=None, qualname=None, type=None, start=1)
```python
class Algo(Enum):
    TREE_PARZEN_ESTIMATOR = hyperopt.tpe.suggest
    GRID_SEARCH = hyperopt.rand.suggest
    SIMULATED_ANNEALING = hyperopt.anneal.suggest
```

An enumeration of the supported search algorithms. Each attribute resolves to a hyperopt suggest function; note that GRID_SEARCH is backed by hyperopt.rand.suggest, i.e. random sampling rather than an exhaustive grid.
Ancestors
- enum.Enum
Methods
def GRID_SEARCH(new_ids, domain, trials, seed)
```python
def suggest(new_ids, domain, trials, seed):
    rng = np.random.default_rng(seed)
    rval = []
    for ii, new_id in enumerate(new_ids):
        # -- sample new specs, idxs, vals
        idxs, vals = pyll.rec_eval(
            domain.s_idxs_vals,
            memo={domain.s_new_ids: [new_id], domain.s_rng: rng}
        )
        new_result = domain.new_result()
        new_misc = dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir)
        miscs_update_idxs_vals([new_misc], idxs, vals)
        rval.extend(trials.new_trial_docs([new_id], [None], [new_result], [new_misc]))
    return rval
```

def SIMULATED_ANNEALING(new_ids, domain, trials, seed, *args, **kwargs)
```python
def suggest(new_ids, domain, trials, seed, *args, **kwargs):
    (new_id,) = new_ids
    return AnnealingAlgo(domain, trials, seed, *args, **kwargs)(new_id)
```

def TREE_PARZEN_ESTIMATOR(new_ids, domain, trials, seed, prior_weight=1.0, n_startup_jobs=20, n_EI_candidates=24, gamma=0.25, verbose=True)
```python
def suggest(
    new_ids,
    domain,
    trials,
    seed,
    prior_weight=_default_prior_weight,
    n_startup_jobs=_default_n_startup_jobs,
    n_EI_candidates=_default_n_EI_candidates,
    gamma=_default_gamma,
    verbose=True,
):
    """
    Given previous trials and the domain, suggest the best expected
    hyperparameter point according to the TPE-EI algorithm.
    """
    t0 = time.time()
    # use build_posterior_wrapper to create the pyll nodes
    observed, observed_loss, posterior = build_posterior_wrapper(
        domain, prior_weight, gamma
    )
    tt = time.time() - t0
    if verbose:
        logger.info("build_posterior_wrapper took %f seconds" % tt)

    # Loop over previous trials to collect best_docs and best_docs_loss
    best_docs = dict()
    best_docs_loss = dict()
    for doc in trials.trials:
        # get either these docs own tid or the one that it's from
        tid = doc["misc"].get("from_tid", doc["tid"])

        # associate infinite loss to new/running/failed jobs
        loss = doc["result"].get("loss")
        loss = float("inf") if loss is None else float(loss)

        # if set, update loss for this tid if it's higher than current loss
        # otherwise, set it
        best_docs_loss.setdefault(tid, loss)
        if loss <= best_docs_loss[tid]:
            best_docs_loss[tid] = loss
            best_docs[tid] = doc

    # -- sort docs by order of suggestion
    #    so that linear_forgetting removes the oldest ones
    tid_docs = sorted(best_docs.items())
    losses = [best_docs_loss[tid] for tid, doc in tid_docs]
    tids, docs = list(zip(*tid_docs)) if tid_docs else ([], [])

    if verbose:
        if docs:
            s = "%i/%i trials with best loss %f" % (
                len(docs),
                len(trials),
                np.nanmin(losses),
            )
        else:
            s = "0 trials"
        logger.info("TPE using %s" % s)

    if len(docs) < n_startup_jobs:
        # N.B. THIS SEEDS THE RNG BASED ON THE new_id
        return rand.suggest(new_ids, domain, trials, seed)

    # Sample and compute log-probability.
    first_new_id = new_ids[0]
    if tids:
        # -- the +2 coordinates with an assertion above
        #    to ensure that fake ids are used during sampling
        # TODO: not sure what assertion this refers to...
        fake_id_0 = max(max(tids), first_new_id) + 2
    else:
        # -- weird - we're running the TPE algo from scratch
        assert n_startup_jobs <= 0
        fake_id_0 = first_new_id + 2

    fake_ids = list(range(fake_id_0, fake_id_0 + n_EI_candidates))

    # -- this dictionary will map pyll nodes to the values
    #    they should take during the evaluation of the pyll program
    memo = {domain.s_new_ids: fake_ids, domain.s_rng: np.random.default_rng(seed)}

    memo[observed_loss["idxs"]] = tids
    memo[observed_loss["vals"]] = losses

    observed_idxs_dict, observed_vals_dict = miscs_to_idxs_vals(
        [doc["misc"] for doc in docs], keys=list(domain.params.keys())
    )
    memo[observed["idxs"]] = observed_idxs_dict
    memo[observed["vals"]] = observed_vals_dict

    # evaluate `n_EI_candidates` pyll nodes in `posterior` using `memo`
    # TODO: it seems to return idxs, vals, all the same. Is this correct?
    idxs, vals = pyll.rec_eval(posterior, memo=memo, print_node_on_error=False)

    # hack to add offset again for randint params
    for label, param in domain.params.items():
        if param.name == "randint" and len(param.pos_args) == 2:
            offset = param.pos_args[0].obj
            vals[label] = [val + offset for val in vals[label]]

    # -- retrieve the best of the samples and form the return tuple
    # specs are deprecated since build_posterior makes all the same
    rval_specs = [None]
    rval_results = [domain.new_result()]
    rval_miscs = [{"tid": first_new_id, "cmd": domain.cmd, "workdir": domain.workdir}]

    miscs_update_idxs_vals(
        rval_miscs,
        idxs,
        vals,
        idxs_map={fake_ids[0]: first_new_id},
        assert_all_vals_used=False,
    )

    # return the doc for the best new trial
    return trials.new_trial_docs([first_new_id], rval_specs, rval_results, rval_miscs)
```

Given previous trials and the domain, suggest the best expected hyperparameter point according to the TPE-EI algorithm.
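Because each Algo attribute resolves to a plain hyperopt suggest function, it can be passed directly wherever hyperopt expects an algo. A sketch, with objective and space standing in for values built as shown elsewhere on this page:

```python
import hyperopt

# `objective` and `space` are placeholders for an objective built with
# default_loss and a HyperParams search space (see below).
best = hyperopt.fmin(
    fn=objective,
    space=space,
    algo=Algo.TREE_PARZEN_ESTIMATOR,  # resolves to hyperopt.tpe.suggest
    max_evals=50,
)
```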
class HyperParams (*args, **kwargs)
```python
class HyperParams(TypedDict):
    spectral_radius: hyperopt.pyll.base.Apply
    leaking_rate: hyperopt.pyll.base.Apply
    sparsity: hyperopt.pyll.base.Apply
    rcond: hyperopt.pyll.base.Apply
```

The hyperparameters tuned by the search, each declared as a hyperopt search-space node: spectral_radius, leaking_rate, and sparsity for the reservoir, and rcond, which is passed to the readout.
Ancestors
- builtins.dict
Class variables
var leaking_rate : hyperopt.pyll.base.Apply
var rcond : hyperopt.pyll.base.Apply
var sparsity : hyperopt.pyll.base.Apply
var spectral_radius : hyperopt.pyll.base.Apply
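A search space of this shape can be built from hyperopt's hp primitives, each of which returns a pyll Apply node. The ranges below are illustrative guesses, not defaults from this module:

```python
import numpy as np
from hyperopt import hp

space: HyperParams = {
    'spectral_radius': hp.uniform('spectral_radius', 0.5, 1.5),
    'leaking_rate': hp.uniform('leaking_rate', 0.1, 1.0),
    'sparsity': hp.uniform('sparsity', 0.0, 0.9),
    # log-uniform spreads the cutoff across orders of magnitude
    'rcond': hp.loguniform('rcond', np.log(1e-10), np.log(1e-2)),
}
```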
class ModelParams (time_series: aqua_blue.time_series.TimeSeries,
input_dimensionality: int,
reservoir_dimensionality: int,
horizon: int,
actual_future: numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]],
readout: Type[aqua_blue.readouts.Readout],
w_in: numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None = None,
w_res: numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None = None)
```python
@dataclass
class ModelParams:
    time_series: TimeSeries
    input_dimensionality: int
    reservoir_dimensionality: int
    horizon: int
    actual_future: NDArray
    readout: Type[aqua_blue.readouts.Readout]
    w_in: Optional[NDArray] = None
    w_res: Optional[NDArray] = None
```

The fixed (non-tuned) model configuration handed to default_loss: the training series, input and reservoir dimensions, prediction horizon, the known future used to score predictions, the readout class, and optional pre-built input and reservoir weight matrices.
Instance variables
var actual_future : numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]]
var horizon : int
var input_dimensionality : int
var readout : Type[aqua_blue.readouts.Readout]
var reservoir_dimensionality : int
var time_series : aqua_blue.time_series.TimeSeries
var w_in : numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None
var w_res : numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None
class Optimizer (max_evals: int,
fn: Callable[[HyperParams], Output] | Callable[[ModelParams], Callable[[HyperParams], Output]] = <function default_loss>,
space: HyperParams = <factory>,
algo: Algo = <function suggest>,
trials: hyperopt.base.Trials | None = None)
```python
@dataclass
class Optimizer:
    max_evals: int
    fn: ObjectiveLike = default_loss
    space: HyperParams = field(default_factory=lambda: default_space)
    algo: Algo = Algo.GRID_SEARCH
    trials: Optional[hyperopt.Trials] = None

    def optimize(self) -> HyperParams:
        warnings.warn("This feature is currently experimental and may be unstable or subject to change. Feedback is welcome to help improve future versions.", UserWarning)
        return hyperopt.fmin(
            fn=self.fn,
            space=self.space,
            algo=self.algo,
            max_evals=self.max_evals,
            trials=self.trials
        )
```

A thin dataclass wrapper around hyperopt.fmin; optimize() runs the search and returns the best hyperparameters found.

Instance variables
var max_evals : int
var space : HyperParams
var trials : hyperopt.base.Trials | None
Methods
def algo(new_ids, domain, trials, seed) -> Algo
The default value is Algo.GRID_SEARCH, which resolves to hyperopt.rand.suggest; its source is shown above under Algo.GRID_SEARCH.

def fn(mp: ModelParams) -> Callable[[HyperParams], Output]
The default value is default_loss; its source is shown above under Functions.

def optimize(self) -> HyperParams
```python
def optimize(self) -> HyperParams:
    warnings.warn("This feature is currently experimental and may be unstable or subject to change. Feedback is welcome to help improve future versions.", UserWarning)
    return hyperopt.fmin(
        fn=self.fn,
        space=self.space,
        algo=self.algo,
        max_evals=self.max_evals,
        trials=self.trials
    )
```
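Putting the pieces together, a sketch of a full run. Note that hyperopt.fmin calls fn directly on each sampled HyperParams dict, so the already-bound objective default_loss(mp) is passed rather than default_loss itself; mp and space are the illustrative values from the sketches above.

```python
import hyperopt

opt = Optimizer(
    max_evals=50,
    fn=default_loss(mp),               # bound objective: HyperParams -> Output
    space=space,
    algo=Algo.TREE_PARZEN_ESTIMATOR,
    trials=hyperopt.Trials(),          # optional: keeps per-trial history
)
best = opt.optimize()                  # dict of the best hyperparameters found
```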
class Output (*args, **kwargs)
```python
class Output(TypedDict, total=False):
    loss: float
    status: HyperoptStatus
```

The dictionary returned by an objective function, in the shape hyperopt.fmin expects: a scalar loss and a status flag ('ok' or 'fail').
Ancestors
- builtins.dict
Class variables
var loss : float
var status : Literal['ok', 'fail']
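For reference, a sketch of a well-formed Output value; the loss figure is illustrative:

```python
import hyperopt

# Successful trials report a finite loss with STATUS_OK ('ok');
# default_loss reports STATUS_FAIL ('fail') with a sentinel loss of 1000
# when training or prediction raises a LinAlgError.
result: Output = {'loss': 0.042, 'status': hyperopt.STATUS_OK}
```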