Source code for bayesopt_server

#!/usr/bin/env python3

import actionlib
import rospy


from bayesopt4ros import BayesianOptimization, util
from bayesopt4ros.msg import BayesOptResult, BayesOptAction
from bayesopt4ros.msg import BayesOptStateResult, BayesOptStateAction


[docs]class BayesOptServer(object): """The Bayesian optimization server node. Acts as a layer between the actual Bayesian optimization and ROS. """
[docs] def __init__( self, config_file: str, server_name: str = "BayesOpt", log_file: str = None, anonymous: bool = True, log_level: int = rospy.INFO, silent: bool = False, node_rate: float = 5.0, ) -> None: """The BayesOptServer class initializer. Parameters ---------- config_file : str File that describes all settings for Bayesian optimization. server_name : str Name of the server that is used for ROS. log_file : str All input/output pairs are logged to this file. anonymous : bool Flag if the node should be anonymous or not (see ROS documentation). log_level : int Controls the log_level of the node's output. silent : bool Controls the verbosity of the node's output. node_rate : float Rate at which the server gives feedback. """ rospy.init_node( self.__class__.__name__, anonymous=anonymous, log_level=log_level, ) self._initialize_bayesopt(config_file) self._initialize_parameter_server(server_name) self._initialize_state_server(server_name + "State") self.parameter_server.start() self.state_server.start() self.request_count = 0 self.log_file = log_file self.config_file = config_file self.silent = silent self.rosrate = rospy.Rate(node_rate) rospy.loginfo(self._log_prefix + "Ready to receive requests.")
[docs] @util.count_requests def next_parameter_callback(self, goal: BayesOptAction) -> None: """Method that gets called when a new parameter vector is requested. The action message (goal/result/feedback) is defined here: ``action/BayesOpt.action`` .. literalinclude:: ../action/BayesOpt.action Parameters ---------- goal : BayesOptAction The action (goal) coming from a client. """ self._print_goal(goal) if not self.silent else None if self._check_final_iter(goal): return # Do not continue once we reached maximum iterations # Obtain the new parameter values. result = BayesOptResult() result.x_new = list(self.bo.next(goal)) self.parameter_server.set_succeeded(result) self._print_result(result) if not self.silent else None
[docs] def state_callback(self, goal) -> None: """Method that gets called when the BayesOpt state is requested. .. note:: We are calling this `state` instead of `result` to avoid confusion with the `result` variable in the action message. The action message (goal/result/feedback) is defined here: ``action/BayesOptState.action`` .. literalinclude:: ../action/BayesOptState.action Parameters ---------- goal : BayesOptStateAction The action (goal) coming from a client. """ state = BayesOptStateResult() # Best observed variables x_best, y_best = self.bo.get_best_observation() state.x_best = list(x_best) state.y_best = y_best # Posterior mean optimum x_opt, f_opt = self.bo.get_optimal_parameters() state.x_opt = list(x_opt) state.f_opt = f_opt self.state_server.set_succeeded(state)
def _initialize_bayesopt(self, config_file): try: self.bo = BayesianOptimization.from_file(config_file) except Exception as e: rospy.logerr(f"[BayesOpt] Something went wrong with initialization: '{e}'") rospy.signal_shutdown("Initialization of BayesOpt failed.") def _initialize_parameter_server(self, server_name): """This server obtains new function values and provides new parameters.""" self.parameter_server = actionlib.SimpleActionServer( server_name, BayesOptAction, execute_cb=self.next_parameter_callback, auto_start=False, ) def _initialize_state_server(self, server_name): """This server provides the current state/results of BO.""" self.state_server = actionlib.SimpleActionServer( server_name, BayesOptStateAction, execute_cb=self.state_callback, auto_start=False, ) def _check_final_iter(self, goal): if self.bo.max_iter and self.request_count > self.bo.max_iter: # Updates model with last function and logs the final GP model rospy.logwarn("[BayesOpt] Max iter reached. No longer responding!") self.bo.update_last_goal(goal) self.parameter_server.set_aborted() return True else: return False def _print_goal(self, goal): if not self.request_count == 1: rospy.loginfo(self._log_prefix + f"New value: {goal.y_new:.3f}") else: rospy.loginfo(self._log_prefix + f"Discard value: {goal.y_new:.3f}") def _print_result(self, result): s = util.iter_to_string(result.x_new, ".3f") rospy.loginfo(self._log_prefix + f"x_new: [{s}]") if self.request_count < self.bo.max_iter: rospy.loginfo(self._log_prefix + "Waiting for new request...") @property def _log_prefix(self) -> str: """Convenience property that pre-fixes the logging strings.""" return f"[{self.__class__.__name__}] Iteration {self.request_count}: "
[docs] @staticmethod def run() -> None: """Simply starts the server.""" rospy.spin()