Module aqua_blue_hyperopt.hyper

Functions

def default_loss(mp: ModelParams) ‑> Callable[[HyperParams], Output]
Expand source code
def default_loss(mp: ModelParams) -> Callable[[HyperParams], Output]:
    """Build a hyperopt objective bound to the fixed model settings in ``mp``.

    The returned callable builds a ``Model`` from a ``DynamicalReservoir`` and
    the readout class in ``mp``, trains it on the normalized ``mp.time_series``,
    predicts with ``horizon=mp.horizon``, denormalizes the prediction and scores
    it against ``mp.actual_future`` with the root-mean-square error.

    Args:
        mp: Settings that stay constant across hyperparameter samples.

    Returns:
        A function mapping one hyperparameter sample (keys ``spectral_radius``,
        ``leaking_rate``, ``sparsity``, ``rcond``) to a result dict holding
        ``loss`` and ``status``.
    """
    def inner(p : HyperParams) -> Output:
        """Score one hyperparameter sample.

        Returns a ``STATUS_FAIL`` result with a placeholder loss of 1000 when
        training or prediction raises ``np.linalg.LinAlgError``.

        Raises:
            ValueError: If the prediction's shape differs from
                ``mp.actual_future``.
        """
        spectral_radius, leaking_rate, sparsity, rcond = p['spectral_radius'], p['leaking_rate'], p['sparsity'], p['rcond']

        normalizer = Normalizer()
        model = Model(
            reservoir=DynamicalReservoir(
                reservoir_dimensionality = mp.reservoir_dimensionality,
                input_dimensionality = mp.input_dimensionality,
                w_res = mp.w_res,
                w_in = mp.w_in,
                spectral_radius = spectral_radius,
                leaking_rate = leaking_rate,
                sparsity = sparsity
            ),
            readout = mp.readout(rcond = rcond)
        )

        normalized_time_series = normalizer.normalize(mp.time_series)

        try:
            model.train(normalized_time_series)
        except np.linalg.LinAlgError:
            warnings.warn('SVD Error in Training', RuntimeWarning)
            return {
                'loss': 1000,
                'status': hyperopt.STATUS_FAIL
            }

        try:
            prediction = model.predict(horizon = mp.horizon)
        except np.linalg.LinAlgError:
            # Report the stage that actually failed; this path used to reuse
            # the training message, which made the two failures look alike.
            warnings.warn('SVD Error in Prediction', RuntimeWarning)
            return {
                'loss': 1000,
                'status': hyperopt.STATUS_FAIL
            }

        # Compare in the original (un-normalized) units of mp.actual_future.
        prediction = normalizer.denormalize(prediction)

        if prediction.dependent_variable.shape != mp.actual_future.shape:
            raise ValueError('Dimension mismatch between actual future and prediction')

        # Root-mean-square error over every element of the forecast.
        loss = np.sqrt(np.mean((mp.actual_future - prediction.dependent_variable) ** 2))

        out : Output = {
            'loss': loss,
            'status': hyperopt.STATUS_OK
        }

        return out

    return inner

Classes

class Algo (value, names=None, *, module=None, qualname=None, type=None, start=1)
Expand source code
class Algo(Enum):
    """Named hyperopt suggestion algorithms accepted by ``hyperopt.fmin``.

    The values are plain functions, which ``Enum`` treats as methods rather
    than members, so ``Algo.<NAME>`` evaluates to the underlying hyperopt
    ``suggest`` callable and can be passed directly as ``algo=``.
    """
    TREE_PARZEN_ESTIMATOR = hyperopt.tpe.suggest
    # Accurately named entry for hyperopt.rand.suggest (random sampling).
    RANDOM_SEARCH = hyperopt.rand.suggest
    # Kept for backward compatibility: despite the name this is the same
    # random-search callable as RANDOM_SEARCH, not an exhaustive grid search.
    GRID_SEARCH = hyperopt.rand.suggest
    SIMULATED_ANNEALING = hyperopt.anneal.suggest

An enumeration.

Ancestors

  • enum.Enum

Methods

Expand source code
def suggest(new_ids, domain, trials, seed):
    """Create one new trial document per id in ``new_ids`` by random sampling.

    A single generator seeded with ``seed`` is shared by all ids, so the
    draws for successive ids come from one random stream.

    Returns:
        A list with the documents produced by ``trials.new_trial_docs`` for
        each id, in the order of ``new_ids``.
    """
    generator = np.random.default_rng(seed)
    docs = []
    for trial_id in new_ids:
        # Evaluate the domain's sampling graph for this single id.
        memo = {domain.s_new_ids: [trial_id], domain.s_rng: generator}
        idxs, vals = pyll.rec_eval(domain.s_idxs_vals, memo=memo)
        result = domain.new_result()
        misc = {"tid": trial_id, "cmd": domain.cmd, "workdir": domain.workdir}
        miscs_update_idxs_vals([misc], idxs, vals)
        docs += trials.new_trial_docs([trial_id], [None], [result], [misc])
    return docs
def SIMULATED_ANNEALING(new_ids, domain, trials, seed, *args, **kwargs)
Expand source code
def suggest(new_ids, domain, trials, seed, *args, **kwargs):
    """Suggest a trial for the single id in ``new_ids`` via ``AnnealingAlgo``.

    ``new_ids`` must contain exactly one id; unpacking raises ``ValueError``
    otherwise. Extra positional and keyword arguments are forwarded to the
    ``AnnealingAlgo`` constructor.
    """
    (only_id,) = new_ids
    annealer = AnnealingAlgo(domain, trials, seed, *args, **kwargs)
    return annealer(only_id)
def TREE_PARZEN_ESTIMATOR(new_ids,
domain,
trials,
seed,
prior_weight=1.0,
n_startup_jobs=20,
n_EI_candidates=24,
gamma=0.25,
verbose=True)
Expand source code
def suggest(
    new_ids,
    domain,
    trials,
    seed,
    prior_weight=_default_prior_weight,
    n_startup_jobs=_default_n_startup_jobs,
    n_EI_candidates=_default_n_EI_candidates,
    gamma=_default_gamma,
    verbose=True,
):
    """
    Given previous trials and the domain, suggest the best expected hp point
    according to the TPE-EI algo

    Args:
        new_ids: ids for the new trial(s). Only ``new_ids[0]`` is used for the
            TPE suggestion; the random-search fallback receives the full list.
        domain: search domain; supplies the pyll nodes, the parameter
            definitions, ``cmd`` and ``workdir`` read below.
        trials: previous trials; ``trials.trials`` is scanned for losses.
        seed: seed for ``np.random.default_rng`` (also handed to the
            random-search fallback).
        prior_weight: forwarded to ``build_posterior_wrapper``.
        n_startup_jobs: while fewer than this many distinct trials have been
            collected, the suggestion is delegated to ``rand.suggest``.
        n_EI_candidates: number of fake ids (candidate points) evaluated when
            sampling from the posterior.
        gamma: forwarded to ``build_posterior_wrapper``.
        verbose: when true, log timing and trial counts via ``logger.info``.

    Returns:
        The result of ``trials.new_trial_docs`` for ``new_ids[0]``, or the
        result of ``rand.suggest`` during the startup phase.
    """

    t0 = time.time()
    # use build_posterior_wrapper to create the pyll nodes
    observed, observed_loss, posterior = build_posterior_wrapper(
        domain, prior_weight, gamma
    )
    tt = time.time() - t0
    if verbose:
        logger.info("build_posterior_wrapper took %f seconds" % tt)

    # Loop over previous trials to collect best_docs and best_docs_loss
    best_docs = dict()
    best_docs_loss = dict()
    for doc in trials.trials:

        # get either these docs own tid or the one that it's from
        tid = doc["misc"].get("from_tid", doc["tid"])

        # associate infinite loss to new/running/failed jobs
        loss = doc["result"].get("loss")
        loss = float("inf") if loss is None else float(loss)

        # keep the lowest loss seen for this tid (on a tie the later doc
        # replaces the earlier one); the first doc for a tid always registers
        best_docs_loss.setdefault(tid, loss)
        if loss <= best_docs_loss[tid]:
            best_docs_loss[tid] = loss
            best_docs[tid] = doc

    # -- sort docs by order of suggestion
    #    so that linear_forgetting removes the oldest ones
    tid_docs = sorted(best_docs.items())
    losses = [best_docs_loss[tid] for tid, doc in tid_docs]
    tids, docs = list(zip(*tid_docs)) if tid_docs else ([], [])

    if verbose:
        if docs:
            s = "%i/%i trials with best loss %f" % (
                len(docs),
                len(trials),
                np.nanmin(losses),
            )
        else:
            s = "0 trials"
        logger.info("TPE using %s" % s)

    if len(docs) < n_startup_jobs:
        # N.B. THIS SEEDS THE RNG BASED ON THE new_id
        return rand.suggest(new_ids, domain, trials, seed)

    # Sample and compute log-probability.
    first_new_id = new_ids[0]
    if tids:
        # -- the +2 coordinates with an assertion above
        #    to ensure that fake ids are used during sampling
        #    TODO: not sure what assertion this refers to...
        fake_id_0 = max(max(tids), first_new_id) + 2
    else:
        # -- weird - we're running the TPE algo from scratch
        assert n_startup_jobs <= 0
        fake_id_0 = first_new_id + 2

    fake_ids = list(range(fake_id_0, fake_id_0 + n_EI_candidates))

    # -- this dictionary will map pyll nodes to the values
    #    they should take during the evaluation of the pyll program
    memo = {domain.s_new_ids: fake_ids, domain.s_rng: np.random.default_rng(seed)}

    memo[observed_loss["idxs"]] = tids
    memo[observed_loss["vals"]] = losses

    observed_idxs_dict, observed_vals_dict = miscs_to_idxs_vals(
        [doc["misc"] for doc in docs], keys=list(domain.params.keys())
    )
    memo[observed["idxs"]] = observed_idxs_dict
    memo[observed["vals"]] = observed_vals_dict

    # evaluate `n_EI_candidates` pyll nodes in `posterior` using `memo`
    # TODO: it seems to return idxs, vals, all the same. Is this correct?
    idxs, vals = pyll.rec_eval(posterior, memo=memo, print_node_on_error=False)

    # hack to add offset again for randint params
    for label, param in domain.params.items():
        if param.name == "randint" and len(param.pos_args) == 2:
            offset = param.pos_args[0].obj
            vals[label] = [val + offset for val in vals[label]]

    # -- retrieve the best of the samples and form the return tuple

    # specs are deprecated since build_posterior makes all the same
    rval_specs = [None]
    rval_results = [domain.new_result()]
    rval_miscs = [{"tid": first_new_id, "cmd": domain.cmd, "workdir": domain.workdir}]

    # map the first fake id back onto the real new trial id
    miscs_update_idxs_vals(
        rval_miscs,
        idxs,
        vals,
        idxs_map={fake_ids[0]: first_new_id},
        assert_all_vals_used=False,
    )
    # return the doc for the best new trial
    return trials.new_trial_docs([first_new_id], rval_specs, rval_results, rval_miscs)

Given previous trials and the domain, suggest the best expected hp point according to the TPE-EI algo

Args

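prior_weight, gamma: forwarded to build_posterior_wrapper. n_startup_jobs: while fewer than this many trials have been observed, the suggestion is delegated to random search. n_EI_candidates: number of candidate points evaluated per suggestion. verbose: log timing and trial counts. Returns: the new trial documents for the suggested point.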

class HyperParams (*args, **kwargs)
Expand source code
class HyperParams(TypedDict): 
    """Dictionary of the four hyperparameters tuned by the optimizer.

    The fields are annotated as hyperopt pyll expressions, i.e. the form used
    when describing a search space. ``default_loss`` reads the same four keys
    from the sample it is given.
    NOTE(review): at evaluation time the values are presumably sampled
    numbers rather than ``Apply`` nodes -- confirm.
    """
    # Passed to DynamicalReservoir as ``spectral_radius``.
    spectral_radius: hyperopt.pyll.base.Apply
    # Passed to DynamicalReservoir as ``leaking_rate``.
    leaking_rate: hyperopt.pyll.base.Apply
    # Passed to DynamicalReservoir as ``sparsity``.
    sparsity: hyperopt.pyll.base.Apply
    # Passed to the readout constructor as ``rcond``.
    rcond: hyperopt.pyll.base.Apply

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict

Class variables

var leaking_rate : hyperopt.pyll.base.Apply
var rcond : hyperopt.pyll.base.Apply
var sparsity : hyperopt.pyll.base.Apply
var spectral_radius : hyperopt.pyll.base.Apply
class ModelParams (time_series: aqua_blue.time_series.TimeSeries,
input_dimensionality: int,
reservoir_dimensionality: int,
horizon: int,
actual_future: numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]],
readout: Type[aqua_blue.readouts.Readout],
w_in: numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None = None,
w_res: numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None = None)
Expand source code
@dataclass
class ModelParams: 
    """Fixed model and data settings shared by every hyperparameter evaluation.

    ``default_loss`` reads these fields to build, train and score a model for
    each sampled set of hyperparameters.
    """
    # Series that is normalized and used to train the model.
    time_series: TimeSeries
    # Forwarded to DynamicalReservoir as ``input_dimensionality``.
    input_dimensionality: int
    # Forwarded to DynamicalReservoir as ``reservoir_dimensionality``.
    reservoir_dimensionality: int
    # Forwarded to ``model.predict`` as ``horizon``.
    horizon: int
    # Ground truth compared with the denormalized prediction; its shape must
    # equal the shape of ``prediction.dependent_variable``.
    actual_future: NDArray
    # Readout class (not an instance); it is instantiated with ``rcond=...``.
    readout: Type[aqua_blue.readouts.Readout]
    # Optional matrices forwarded to DynamicalReservoir as ``w_in`` / ``w_res``.
    w_in: Optional[NDArray] = None
    w_res: Optional[NDArray] = None

ModelParams(time_series: aqua_blue.time_series.TimeSeries, input_dimensionality: int, reservoir_dimensionality: int, horizon: int, actual_future: numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]], readout: Type[aqua_blue.readouts.Readout], w_in: Optional[numpy.ndarray[Any, numpy.dtype[+_ScalarType_co]]] = None, w_res: Optional[numpy.ndarray[Any, numpy.dtype[+_ScalarType_co]]] = None)

Instance variables

var actual_future : numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]]
var horizon : int
var input_dimensionality : int
var readout : Type[aqua_blue.readouts.Readout]
var reservoir_dimensionality : int
var time_series : aqua_blue.time_series.TimeSeries
var w_in : numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None
var w_res : numpy.ndarray[typing.Any, numpy.dtype[+_ScalarType_co]] | None
class Optimizer (max_evals: int,
fn: Callable[[HyperParams], Output] | Callable[[ModelParams], Callable[[HyperParams], Output]] = <function default_loss>,
space: HyperParams = <factory>,
algo: Algo = <function suggest>,
trials: hyperopt.base.Trials | None = None)
Expand source code
@dataclass
class Optimizer: 
    """Thin configuration wrapper around ``hyperopt.fmin``.

    Attributes:
        max_evals: Passed to ``hyperopt.fmin`` as ``max_evals``.
        fn: Objective passed to ``hyperopt.fmin`` as ``fn``.
            NOTE(review): the default is ``default_loss`` itself, which takes
            a ``ModelParams`` and returns the objective; callers presumably
            need to pass ``default_loss(mp)`` -- confirm.
        space: Search space passed to ``hyperopt.fmin``; defaults to the
            module-level ``default_space`` object.
        algo: Suggestion algorithm passed to ``hyperopt.fmin``.
        trials: Optional ``hyperopt.Trials`` passed to ``hyperopt.fmin``.
    """
    max_evals: int
    fn: ObjectiveLike = default_loss
    space: HyperParams = field(default_factory = lambda: default_space)
    algo: Algo = Algo.GRID_SEARCH
    trials: Optional[hyperopt.Trials] = None

    def optimize(self) -> HyperParams:
        """Run ``hyperopt.fmin`` with the configured settings.

        Emits a ``UserWarning`` flagging the feature as experimental, then
        returns whatever ``hyperopt.fmin`` returns.
        """
        # stacklevel=2 attributes the warning to the caller of optimize()
        # instead of to this line inside the library.
        warnings.warn(
            "This feature is currently experimental and may be unstable or subject to change. Feedback is welcome to help improve future versions.",
            UserWarning,
            stacklevel=2,
        )

        return hyperopt.fmin(
            fn=self.fn,
            space=self.space,
            algo=self.algo,
            max_evals=self.max_evals,
            trials=self.trials
        )

Optimizer(max_evals: int, fn: Union[Callable[[aqua_blue_hyperopt.hyper.HyperParams], aqua_blue_hyperopt.hyper.Output], Callable[[aqua_blue_hyperopt.hyper.ModelParams], Callable[[aqua_blue_hyperopt.hyper.HyperParams], aqua_blue_hyperopt.hyper.Output]]] = <function default_loss>, space: aqua_blue_hyperopt.hyper.HyperParams = <factory>, algo: aqua_blue_hyperopt.hyper.Algo = <function suggest>, trials: Optional[hyperopt.base.Trials] = None)

Instance variables

var max_evals : int
var space : HyperParams
var trials : hyperopt.base.Trials | None

Methods

def algo(new_ids, domain, trials, seed) ‑> Algo
Expand source code
def suggest(new_ids, domain, trials, seed):
    """Create one new trial document per id in ``new_ids`` by random sampling.

    A single generator seeded with ``seed`` is shared by all ids, so the
    draws for successive ids come from one random stream.

    Returns:
        A list with the documents produced by ``trials.new_trial_docs`` for
        each id, in the order of ``new_ids``.
    """
    generator = np.random.default_rng(seed)
    docs = []
    for trial_id in new_ids:
        # Evaluate the domain's sampling graph for this single id.
        memo = {domain.s_new_ids: [trial_id], domain.s_rng: generator}
        idxs, vals = pyll.rec_eval(domain.s_idxs_vals, memo=memo)
        result = domain.new_result()
        misc = {"tid": trial_id, "cmd": domain.cmd, "workdir": domain.workdir}
        miscs_update_idxs_vals([misc], idxs, vals)
        docs += trials.new_trial_docs([trial_id], [None], [result], [misc])
    return docs
def fn(mp: ModelParams) ‑> Callable[[HyperParams], Output]
Expand source code
def default_loss(mp: ModelParams) -> Callable[[HyperParams], Output]:
    """Build a hyperopt objective bound to the fixed model settings in ``mp``.

    The returned callable builds a ``Model`` from a ``DynamicalReservoir`` and
    the readout class in ``mp``, trains it on the normalized ``mp.time_series``,
    predicts with ``horizon=mp.horizon``, denormalizes the prediction and scores
    it against ``mp.actual_future`` with the root-mean-square error.

    Args:
        mp: Settings that stay constant across hyperparameter samples.

    Returns:
        A function mapping one hyperparameter sample (keys ``spectral_radius``,
        ``leaking_rate``, ``sparsity``, ``rcond``) to a result dict holding
        ``loss`` and ``status``.
    """
    def inner(p : HyperParams) -> Output:
        """Score one hyperparameter sample.

        Returns a ``STATUS_FAIL`` result with a placeholder loss of 1000 when
        training or prediction raises ``np.linalg.LinAlgError``.

        Raises:
            ValueError: If the prediction's shape differs from
                ``mp.actual_future``.
        """
        spectral_radius, leaking_rate, sparsity, rcond = p['spectral_radius'], p['leaking_rate'], p['sparsity'], p['rcond']

        normalizer = Normalizer()
        model = Model(
            reservoir=DynamicalReservoir(
                reservoir_dimensionality = mp.reservoir_dimensionality,
                input_dimensionality = mp.input_dimensionality,
                w_res = mp.w_res,
                w_in = mp.w_in,
                spectral_radius = spectral_radius,
                leaking_rate = leaking_rate,
                sparsity = sparsity
            ),
            readout = mp.readout(rcond = rcond)
        )

        normalized_time_series = normalizer.normalize(mp.time_series)

        try:
            model.train(normalized_time_series)
        except np.linalg.LinAlgError:
            warnings.warn('SVD Error in Training', RuntimeWarning)
            return {
                'loss': 1000,
                'status': hyperopt.STATUS_FAIL
            }

        try:
            prediction = model.predict(horizon = mp.horizon)
        except np.linalg.LinAlgError:
            # Report the stage that actually failed; this path used to reuse
            # the training message, which made the two failures look alike.
            warnings.warn('SVD Error in Prediction', RuntimeWarning)
            return {
                'loss': 1000,
                'status': hyperopt.STATUS_FAIL
            }

        # Compare in the original (un-normalized) units of mp.actual_future.
        prediction = normalizer.denormalize(prediction)

        if prediction.dependent_variable.shape != mp.actual_future.shape:
            raise ValueError('Dimension mismatch between actual future and prediction')

        # Root-mean-square error over every element of the forecast.
        loss = np.sqrt(np.mean((mp.actual_future - prediction.dependent_variable) ** 2))

        out : Output = {
            'loss': loss,
            'status': hyperopt.STATUS_OK
        }

        return out

    return inner
def optimize(self) ‑> HyperParams
Expand source code
def optimize(self) -> HyperParams:
    """Run ``hyperopt.fmin`` with the configured settings.

    Emits a ``UserWarning`` flagging the feature as experimental, then
    returns whatever ``hyperopt.fmin`` returns.
    """
    # stacklevel=2 attributes the warning to the caller of optimize()
    # instead of to this line inside the library.
    warnings.warn(
        "This feature is currently experimental and may be unstable or subject to change. Feedback is welcome to help improve future versions.",
        UserWarning,
        stacklevel=2,
    )

    return hyperopt.fmin(
        fn=self.fn,
        space=self.space,
        algo=self.algo,
        max_evals=self.max_evals,
        trials=self.trials
    )
class Output (*args, **kwargs)
Expand source code
class Output(TypedDict, total=False): 
    """Result dictionary returned by an objective function.

    Declared with ``total=False``, so neither key is required by the type.
    """
    # Objective value for the evaluated sample.
    loss: float
    # Evaluation status; default_loss sets hyperopt.STATUS_OK or STATUS_FAIL.
    status: HyperoptStatus

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict

Class variables

var loss : float
var status : Literal['ok', 'fail']