diff --git a/CHANGELOG.md b/CHANGELOG.md
index 517b1b8..3651bc4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 
 ## [2.0.0] - 2024-07-30
+### Changed
+- BREAKING: Migrated from gym to gymnasium.
+
 ### Added
 - Add py.typed marker for mypy
 - screenshot_mode.py to run simulation without visualisations (for nicer screenshots).
@@ -31,6 +34,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - trifinger_simulation does no longer maintain a copy of the robot model files for the
   pure-Python installation. Instead it depends on robot_properties_fingers now for both
   installation types (colcon and pure Python).
+- The example Gym environments have been removed. They still depended on the old
+  gym package, which no longer works with recent Python versions. Since they were
+  mostly meant as examples and are not used directly in our code base, they were
+  simply removed. If you still depend on them, you can retrieve them from an
+  earlier version of the code and maintain them in a separate package.
 
 
 ## [1.4.1] - 2022-06-24
diff --git a/demos/demo_load_gym_env.py b/demos/demo_load_gym_env.py
deleted file mode 100644
index ebe4d0e..0000000
--- a/demos/demo_load_gym_env.py
+++ /dev/null
@@ -1,48 +0,0 @@
-#!/usr/bin/env python3
-"""Minimum example to show how to create the available gym environments."""
-import gym
-import argparse
-
-
-def main():
-    argparser = argparse.ArgumentParser(description=__doc__)
-    argparser.add_argument(
-        "--env",
-        default="push",
-        choices=["reach", "push"],
-        help="Specify which gym env to load, push or reach",
-    )
-    args = argparser.parse_args()
-
-    if args.env == "push":
-        env = gym.make(
-            "trifinger_simulation.gym_wrapper:push-v0",
-            control_rate_s=0.02,
-            finger_type="trifingerone",
-            enable_visualization=True,
-        )
-
-    elif args.env == "reach":
-        smoothing_params = {
-            "num_episodes": 700,
-            "start_after": 3.0 / 7.0,
-            "final_alpha": 0.975,
-            "stop_after": 5.0 / 7.0,
-        }
-        env = gym.make(
-            "trifinger_simulation.gym_wrapper:reach-v0",
-            control_rate_s=0.02,
-            finger_type="trifingerone",
-            smoothing_params=smoothing_params,
-            enable_visualization=True,
-        )
-
-    for episode in range(700):
-        env.reset()
-        for step in range(100):
-            action = env.action_space.sample()
-            observation, reward, done, info = env.step(action)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/demos/demo_random_policy.py b/demos/demo_random_policy.py
deleted file mode 100644
index ef27ce3..0000000
--- a/demos/demo_random_policy.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/usr/bin/env python3
-"""Demo on how to run the simulation using the Gym environment
-
-This demo creates a CubeEnv environment and runs one episode with random
-initialization using a dummy policy which uses random actions.
-""" -import gym - -from trifinger_simulation.gym_wrapper.envs import cube_env - - -class RandomPolicy: - """Dummy policy which uses random actions.""" - - def __init__(self, action_space): - self.action_space = action_space - - def predict(self, observation): - return self.action_space.sample() - - -def main(): - # Use a random initializer with difficulty 1 - initializer = cube_env.RandomInitializer(difficulty=1) - - env = gym.make( - "trifinger_simulation.gym_wrapper:real_robot_challenge_phase_1-v1", - initializer=initializer, - action_type=cube_env.ActionType.POSITION, - frameskip=100, - visualization=True, - ) - - policy = RandomPolicy(env.action_space) - - observation = env.reset() - is_done = False - while not is_done: - action = policy.predict(observation) - observation, reward, is_done, info = env.step(action) - - print("Reward at final step: {:.3f}".format(reward)) - - -if __name__ == "__main__": - main() diff --git a/setup.cfg b/setup.cfg index ed11ebd..1b944fa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,7 +33,7 @@ install_requires = # pinocchio pin >=2.4.7 pybullet >=3.0.8 - gym >=0.23.1 + gymnasium >=1.2.0 opencv-python >=4.2.0.34 pyyaml >=5.3.1 robot_properties_fingers>=2.0.2 @@ -42,9 +42,7 @@ scripts = demos/demo_cameras.py demos/demo_control.py demos/demo_inverse_kinematics.py - demos/demo_load_gym_env.py demos/demo_plain_torque_control.py - demos/demo_random_policy.py demos/demo_trifinger_platform.py scripts/check_position_control_accuracy.py scripts/profiling.py diff --git a/tests/test_cube_env.py b/tests/test_cube_env.py deleted file mode 100755 index cb14855..0000000 --- a/tests/test_cube_env.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python3 -import unittest - -from trifinger_simulation.gym_wrapper.envs import cube_env - - -class TestSample(unittest.TestCase): - """Test the CubeEnv gym environment.""" - - def test_if_observations_are_valid(self): - # Observations need to be contained in the observation space. If this - # is not the case, there is either an issue with the generation of the - # observations or the observation space is not defined properly. - env = cube_env.CubeEnv(cube_env.RandomInitializer(4)) - - observation = env.reset() - self.assertTrue( - env.observation_space.contains(observation), - msg="observation: {}".format(observation), - ) - - for i in range(3000): - action = env.action_space.sample() - observation, _, _, _ = env.step(action) - - self.assertTrue( - env.observation_space.contains(observation), - msg="Invalid observation: {}".format(observation), - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_determinism.py b/tests/test_determinism.py index 3bcbeb0..707297b 100755 --- a/tests/test_determinism.py +++ b/tests/test_determinism.py @@ -2,9 +2,6 @@ import unittest import numpy as np -import gym - -import trifinger_simulation.gym_wrapper # noqa from trifinger_simulation.sim_finger import SimFinger @@ -44,56 +41,6 @@ def run(): np.testing.assert_array_equal(first_run.position, second_run.position) np.testing.assert_array_equal(first_run.velocity, second_run.velocity) - def test_reach_rollouts(self): - smoothing_params = { - "num_episodes": 10, - # ratio of total training time after which smoothing starts - "start_after": 3.0 / 7.0, - # smoothing coeff. 
that shall be reached at end of training - "final_alpha": 0.975, - "stop_after": 5.0 / 7.0, - } - - num_samples = 10 - horizon = 100 - - env = gym.make( - "reach-v0", - control_rate_s=0.02, - enable_visualization=False, - finger_type="fingerone", - smoothing_params=smoothing_params, - ) - - start_position = [0.5, -0.7, -1.5] - - plans = -1.0 + 2.0 * np.random.rand(horizon, num_samples, 3) - - def run(): - states = [] - rewards = [] - - for i in range(num_samples): - env.reset() - env.finger.reset_finger_positions_and_velocities( - start_position - ) - for t in range(horizon): - state, reward, _, _ = env.step(plans[t, i]) - states.append(state) - rewards.append(reward) - states = np.array(states) - rewards = np.array(rewards) - return states, rewards - - first_states, first_rewards = run() - second_states, second_rewards = run() - - np.testing.assert_array_equal(first_states.all(), second_states.all()) - np.testing.assert_array_equal( - first_rewards.all(), second_rewards.all() - ) - if __name__ == "__main__": unittest.main() diff --git a/trifinger_simulation/gym_wrapper/__init__.py b/trifinger_simulation/gym_wrapper/__init__.py index 151db76..e69de29 100644 --- a/trifinger_simulation/gym_wrapper/__init__.py +++ b/trifinger_simulation/gym_wrapper/__init__.py @@ -1,21 +0,0 @@ -from gym.envs.registration import register - -register( - id="reach-v0", - entry_point="trifinger_simulation.gym_wrapper.envs.trifinger_reach:TriFingerReach", -) -register( - id="push-v0", - entry_point="trifinger_simulation.gym_wrapper.envs.trifinger_push:TriFingerPush", -) - -register( - id="real_robot_challenge_phase_1-v1", - entry_point="trifinger_simulation.gym_wrapper.envs.cube_env:CubeEnv", -) - - -register( - id="cube_trajectory-v1", - entry_point="trifinger_simulation.gym_wrapper.envs.cube_trajectory_env:CubeTrajectoryEnv", -) diff --git a/trifinger_simulation/gym_wrapper/envs/__init__.py b/trifinger_simulation/gym_wrapper/envs/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/trifinger_simulation/gym_wrapper/envs/cube_env.py b/trifinger_simulation/gym_wrapper/envs/cube_env.py deleted file mode 100644 index 2b6549c..0000000 --- a/trifinger_simulation/gym_wrapper/envs/cube_env.py +++ /dev/null @@ -1,341 +0,0 @@ -"""Gym environment for the Real Robot Challenge Phase 1 (Simulation).""" - -import enum - -import gym - -from trifinger_simulation import TriFingerPlatform -from trifinger_simulation import visual_objects -from trifinger_simulation.tasks import move_cube - - -class RandomInitializer: - """Initializer that samples random initial states and goals.""" - - def __init__(self, difficulty): - """Initialize. - - Args: - difficulty (int): Difficulty level for sampling goals. - """ - self.difficulty = difficulty - - def get_initial_state(self): - """Get a random initial object pose (always on the ground).""" - return move_cube.sample_goal(difficulty=-1) - - def get_goal(self): - """Get a random goal depending on the difficulty.""" - return move_cube.sample_goal(difficulty=self.difficulty) - - -class FixedInitializer: - """Initializer that uses fixed values for initial pose and goal.""" - - def __init__(self, difficulty, initial_state, goal): - """Initialize. - - Args: - difficulty (int): Difficulty level of the goal. This is still - needed even for a fixed goal, as it is also used for computing - the reward (the cost function is different for the different - levels). - initial_state (move_cube.Pose): Initial pose of the object. - goal (move_cube.Pose): Goal pose of the object. 
- - Raises: - Exception: If initial_state or goal are not valid. See - :meth:`move_cube.validate_goal` for more information. - """ - move_cube.validate_goal(initial_state) - move_cube.validate_goal(goal) - self.difficulty = difficulty - self.initial_state = initial_state - self.goal = goal - - def get_initial_state(self): - """Get the initial state that was set in the constructor.""" - return self.initial_state - - def get_goal(self): - """Get the goal that was set in the constructor.""" - return self.goal - - -class ActionType(enum.Enum): - """Different action types that can be used to control the robot.""" - - #: Use pure torque commands. The action is a list of torques (one per - #: joint) in this case. - TORQUE = enum.auto() - #: Use joint position commands. The action is a list of angular joint - #: positions (one per joint) in this case. Internally a PD controller is - #: executed for each action to determine the torques that are applied to - #: the robot. - POSITION = enum.auto() - #: Use both torque and position commands. In this case the action is a - #: dictionary with keys "torque" and "position" which contain the - #: corresponding lists of values (see above). The torques resulting from - #: the position controller are added to the torques in the action before - #: applying them to the robot. - TORQUE_AND_POSITION = enum.auto() - - -class CubeEnv(gym.Env): - """Gym environment for moving cubes with simulated TriFingerPro.""" - - def __init__( - self, - initializer, - action_type=ActionType.POSITION, - frameskip=1, - visualization=False, - ): - """Initialize. - - Args: - initializer: Initializer class for providing initial cube pose and - goal pose. See :class:`RandomInitializer` and - :class:`FixedInitializer`. - action_type (ActionType): Specify which type of actions to use. - See :class:`ActionType` for details. - frameskip (int): Number of actual control steps to be performed in - one call of step(). - visualization (bool): If true, the pyBullet GUI is run for - visualization. - """ - # Basic initialization - # ==================== - - self.initializer = initializer - self.action_type = action_type - self.visualization = visualization - - # TODO: The name "frameskip" makes sense for an atari environment but - # not really for our scenario. The name is also misleading as - # "frameskip = 1" suggests that one frame is skipped while it actually - # means "do one step per step" (i.e. no skip). 
- if frameskip < 1: - raise ValueError("frameskip cannot be less than 1.") - self.frameskip = frameskip - - # will be initialized in reset() - self.platform = None - - # Create the action and observation spaces - # ======================================== - - spaces = TriFingerPlatform.spaces - - object_state_space = gym.spaces.Dict( - { - "position": spaces.object_position.gym, - "orientation": spaces.object_orientation.gym, - } - ) - - if self.action_type == ActionType.TORQUE: - self.action_space = spaces.robot_torque.gym - elif self.action_type == ActionType.POSITION: - self.action_space = spaces.robot_position.gym - elif self.action_type == ActionType.TORQUE_AND_POSITION: - self.action_space = gym.spaces.Dict( - { - "torque": spaces.robot_torque.gym, - "position": spaces.robot_position.gym, - } - ) - else: - raise ValueError("Invalid action_type") - - self.observation_space = gym.spaces.Dict( - { - "observation": gym.spaces.Dict( - { - "position": spaces.robot_position.gym, - "velocity": spaces.robot_velocity.gym, - "torque": spaces.robot_torque.gym, - } - ), - "desired_goal": object_state_space, - "achieved_goal": object_state_space, - } - ) - - def compute_reward(self, achieved_goal, desired_goal, info): - """Compute the reward for the given achieved and desired goal. - - Args: - achieved_goal (dict): Current pose of the object. - desired_goal (dict): Goal pose of the object. - info (dict): An info dictionary containing a field "difficulty" - which specifies the difficulty level. - - Returns: - float: The reward that corresponds to the provided achieved goal - w.r.t. to the desired goal. Note that the following should always - hold true:: - - ob, reward, done, info = env.step() - assert reward == env.compute_reward( - ob['achieved_goal'], - ob['desired_goal'], - info, - ) - """ - return -move_cube.evaluate_state( - move_cube.Pose.from_dict(desired_goal), - move_cube.Pose.from_dict(achieved_goal), - info["difficulty"], - ) - - def step(self, action): - """Run one timestep of the environment's dynamics. - - When end of episode is reached, you are responsible for calling - ``reset()`` to reset this environment's state. - - Args: - action: An action provided by the agent (depends on the selected - :class:`ActionType`). - - Returns: - tuple: - - - observation (dict): agent's observation of the current - environment. - - reward (float) : amount of reward returned after previous action. - - done (bool): whether the episode has ended, in which case further - step() calls will return undefined results. - - info (dict): info dictionary containing the difficulty level of - the goal. - """ - if self.platform is None: - raise RuntimeError("Call `reset()` before starting to step.") - - if not self.action_space.contains(action): - raise ValueError( - "Given action is not contained in the action space." 
- ) - - num_steps = self.frameskip - - # ensure episode length is not exceeded due to frameskip - step_count_after = self.step_count + num_steps - if step_count_after > move_cube.episode_length: - excess = step_count_after - move_cube.episode_length - num_steps = max(1, num_steps - excess) - - reward = 0.0 - for _ in range(num_steps): - self.step_count += 1 - if self.step_count > move_cube.episode_length: - raise RuntimeError("Exceeded number of steps for one episode.") - - # send action to robot - robot_action = self._gym_action_to_robot_action(action) - t = self.platform.append_desired_action(robot_action) - - # Use observations of step t + 1 to follow what would be expected - # in a typical gym environment. Note that on the real robot, this - # will not be possible - observation = self._create_observation(t + 1) - - reward += self.compute_reward( - observation["achieved_goal"], - observation["desired_goal"], - self.info, - ) - - is_done = self.step_count == move_cube.episode_length - - return observation, reward, is_done, self.info - - def reset(self): - # reset simulation - del self.platform - - # initialize simulation - initial_robot_position = ( - TriFingerPlatform.spaces.robot_position.default - ) - initial_object_pose = self.initializer.get_initial_state() - goal_object_pose = self.initializer.get_goal() - - self.platform = TriFingerPlatform( - visualization=self.visualization, - initial_robot_position=initial_robot_position, - initial_object_pose=initial_object_pose, - ) - - self.goal = { - "position": goal_object_pose.position, - "orientation": goal_object_pose.orientation, - } - - # visualize the goal - if self.visualization: - self.goal_marker = visual_objects.CuboidMarker( - size=(0.02, 0.08, 0.02), - position=goal_object_pose.position, - orientation=goal_object_pose.orientation, - pybullet_client_id=self.platform.simfinger._pybullet_client_id, - ) - - self.info = {"difficulty": self.initializer.difficulty} - - self.step_count = 0 - - return self._create_observation(0) - - def seed(self, seed=None): - """Sets the seed for this env’s random number generator. - - .. note:: - - Spaces need to be seeded separately. E.g. if you want to sample - actions directly from the action space using - ``env.action_space.sample()`` you can set a seed there using - ``env.action_space.seed()``. - - Returns: - List of seeds used by this environment. This environment only uses - a single seed, so the list contains only one element. 
- """ - self.np_random, seed = gym.utils.seeding.np_random(seed) - move_cube.random = self.np_random - return [seed] - - def _create_observation(self, t): - robot_observation = self.platform.get_robot_observation(t) - camera_observation = self.platform.get_camera_observation(t) - object_observation = camera_observation.filtered_object_pose - - observation = { - "observation": { - "position": robot_observation.position, - "velocity": robot_observation.velocity, - "torque": robot_observation.torque, - }, - "desired_goal": self.goal, - "achieved_goal": { - "position": object_observation.position, - "orientation": object_observation.orientation, - }, - } - return observation - - def _gym_action_to_robot_action(self, gym_action): - # construct robot action depending on action type - if self.action_type == ActionType.TORQUE: - robot_action = self.platform.Action(torque=gym_action) - elif self.action_type == ActionType.POSITION: - robot_action = self.platform.Action(position=gym_action) - elif self.action_type == ActionType.TORQUE_AND_POSITION: - robot_action = self.platform.Action( - torque=gym_action["torque"], position=gym_action["position"] - ) - else: - raise ValueError("Invalid action_type") - - return robot_action diff --git a/trifinger_simulation/gym_wrapper/envs/cube_trajectory_env.py b/trifinger_simulation/gym_wrapper/envs/cube_trajectory_env.py deleted file mode 100644 index 81c8366..0000000 --- a/trifinger_simulation/gym_wrapper/envs/cube_trajectory_env.py +++ /dev/null @@ -1,345 +0,0 @@ -"""Gym environment for the Real Robot Challenge Phase 1 (Simulation).""" - -import enum - -import numpy as np -import gym - -from trifinger_simulation import TriFingerPlatform -from trifinger_simulation import visual_objects -from trifinger_simulation.tasks import move_cube_on_trajectory as mct - - -class Initializer: - """Base initializer for getting a trajectory.""" - - def get_trajectory(self) -> mct.Trajectory: - raise NotImplementedError() - - -class RandomInitializer(Initializer): - """Initializer that samples random trajectories.""" - - def get_trajectory(self): - """Get a random trajectory.""" - return mct.sample_goal() - - -class FixedInitializer(Initializer): - """Initializer that uses a fixed trajectory.""" - - def __init__(self, trajectory: mct.Trajectory): - """Initialize. - - Args: - trajectory: The goal trajectory. - - Raises: - Exception: If trajectory is not valid. See - :meth:`mct.validate_goal` for more information. - """ - mct.validate_goal(trajectory) - self.trajectory = trajectory - - def get_trajectory(self): - """Get the trajectory that was set in the constructor.""" - return self.trajectory - - -class ActionType(enum.Enum): - """Different action types that can be used to control the robot.""" - - #: Use pure torque commands. The action is a list of torques (one per - #: joint) in this case. - TORQUE = enum.auto() - #: Use joint position commands. The action is a list of angular joint - #: positions (one per joint) in this case. Internally a PD controller is - #: executed for each action to determine the torques that are applied to - #: the robot. - POSITION = enum.auto() - #: Use both torque and position commands. In this case the action is a - #: dictionary with keys "torque" and "position" which contain the - #: corresponding lists of values (see above). The torques resulting from - #: the position controller are added to the torques in the action before - #: applying them to the robot. 
- TORQUE_AND_POSITION = enum.auto() - - -class CubeTrajectoryEnv(gym.Env): - """Gym environment for moving cubes with simulated TriFingerPro.""" - - def __init__( - self, - initializer: Initializer, - action_type: ActionType = ActionType.POSITION, - step_size: int = 1, - visualization: bool = False, - ): - """Initialize. - - Args: - initializer: Initializer class for providing goal trajectories. - See :class:`RandomInitializer` and :class:`FixedInitializer`. - action_type (ActionType): Specify which type of actions to use. - See :class:`ActionType` for details. - step_size (int): Number of actual control steps to be performed in - one call of step(). - visualization (bool): If true, the pyBullet GUI is run for - visualization. - """ - # Basic initialization - # ==================== - - self.initializer = initializer - self.action_type = action_type - self.visualization = visualization - - if step_size < 1: - raise ValueError("step_size cannot be less than 1.") - self.step_size = step_size - - # will be initialized in reset() - self.platform = None - - # Create the action and observation spaces - # ======================================== - - spaces = TriFingerPlatform.spaces - - if self.action_type == ActionType.TORQUE: - self.action_space = spaces.robot_torque.gym - elif self.action_type == ActionType.POSITION: - self.action_space = spaces.robot_position.gym - elif self.action_type == ActionType.TORQUE_AND_POSITION: - self.action_space = gym.spaces.Dict( - { - "torque": spaces.robot_torque.gym, - "position": spaces.robot_position.gym, - } - ) - else: - raise ValueError("Invalid action_type") - - self.observation_space = gym.spaces.Dict( - { - "observation": gym.spaces.Dict( - { - "position": spaces.robot_position.gym, - "velocity": spaces.robot_velocity.gym, - "torque": spaces.robot_torque.gym, - } - ), - "desired_goal": spaces.object_position.gym, - "achieved_goal": spaces.object_position.gym, - } - ) - - def compute_reward( - self, - achieved_goal: mct.Position, - desired_goal: mct.Position, - info: dict, - ) -> float: - """Compute the reward for the given achieved and desired goal. - - Args: - achieved_goal: Current position of the object. - desired_goal: Goal trajectory of the object. - info: An info dictionary containing a field "time_index" which - contains the time index of the achieved_goal. - - Returns: - The reward that corresponds to the provided achieved goal w.r.t. to - the desired goal. Note that the following should always hold true:: - - ob, reward, done, info = env.step() - assert reward == env.compute_reward( - ob['achieved_goal'], - ob['desired_goal'], - info, - ) - """ - # This is just some sanity check to verify that the given desired_goal - # actually matches with the active goal in the trajectory. - active_goal = np.asarray( - mct.get_active_goal( - self.info["trajectory"], self.info["time_index"] - ) - ) - assert np.all(active_goal == desired_goal), "{}: {} != {}".format( - info["time_index"], active_goal, desired_goal - ) - - return -mct.evaluate_state( - info["trajectory"], info["time_index"], achieved_goal - ) - - def step(self, action): - """Run one timestep of the environment's dynamics. - - When end of episode is reached, you are responsible for calling - ``reset()`` to reset this environment's state. - - Args: - action: An action provided by the agent (depends on the selected - :class:`ActionType`). - - Returns: - tuple: - - - observation (dict): agent's observation of the current - environment. 
- - reward (float): amount of reward returned after previous action. - - done (bool): whether the episode has ended, in which case further - step() calls will return undefined results. - - info (dict): info dictionary containing the current time index. - """ - if self.platform is None: - raise RuntimeError("Call `reset()` before starting to step.") - - if not self.action_space.contains(action): - raise ValueError( - "Given action is not contained in the action space." - ) - - num_steps = self.step_size - - # ensure episode length is not exceeded due to step_size - step_count_after = self.step_count + num_steps - if step_count_after > mct.EPISODE_LENGTH: - excess = step_count_after - mct.EPISODE_LENGTH - num_steps = max(1, num_steps - excess) - - reward = 0.0 - for _ in range(num_steps): - self.step_count += 1 - if self.step_count > mct.EPISODE_LENGTH: - raise RuntimeError("Exceeded number of steps for one episode.") - - # send action to robot - robot_action = self._gym_action_to_robot_action(action) - t = self.platform.append_desired_action(robot_action) - - # update goal visualization - if self.visualization: - goal_position = mct.get_active_goal(self.info["trajectory"], t) - self.goal_marker.set_state(goal_position, (0, 0, 0, 1)) - - # Use observations of step t + 1 to follow what would be expected - # in a typical gym environment. Note that on the real robot, this - # will not be possible - self.info["time_index"] = t + 1 - - # Alternatively use the observation of step t. This is the - # observation from the moment before action_t is applied, i.e. the - # result of that action is not yet visible in this observation. - # - # When using this observation, the resulting cumulative reward - # should match exactly the one computed during replay (with the - # above it will differ slightly). - # - # self.info["time_index"] = t - - observation = self._create_observation(self.info["time_index"]) - - reward += self.compute_reward( - observation["achieved_goal"], - observation["desired_goal"], - self.info, - ) - - is_done = self.step_count >= mct.EPISODE_LENGTH - - return observation, reward, is_done, self.info - - def reset(self): - """Reset the environment.""" - # hard-reset simulation - del self.platform - - # initialize simulation - initial_robot_position = ( - TriFingerPlatform.spaces.robot_position.default - ) - # initialize cube at the centre - initial_object_pose = mct.move_cube.Pose( - position=mct.INITIAL_CUBE_POSITION - ) - - self.platform = TriFingerPlatform( - visualization=self.visualization, - initial_robot_position=initial_robot_position, - initial_object_pose=initial_object_pose, - ) - - # get goal trajectory from the initializer - trajectory = self.initializer.get_trajectory() - - # visualize the goal - if self.visualization: - self.goal_marker = visual_objects.CubeMarker( - width=mct.move_cube._CUBE_WIDTH, - position=trajectory[0][1], - orientation=(0, 0, 0, 1), - pybullet_client_id=self.platform.simfinger._pybullet_client_id, - ) - - self.info = {"time_index": -1, "trajectory": trajectory} - - self.step_count = 0 - - return self._create_observation(0) - - def seed(self, seed=None): - """Sets the seed for this env’s random number generator. - - .. note:: - - Spaces need to be seeded separately. E.g. if you want to sample - actions directly from the action space using - ``env.action_space.sample()`` you can set a seed there using - ``env.action_space.seed()``. - - Returns: - List of seeds used by this environment. 
This environment only uses - a single seed, so the list contains only one element. - """ - self.np_random, seed = gym.utils.seeding.np_random(seed) - mct.move_cube.random = self.np_random - return [seed] - - def _create_observation(self, t): - robot_observation = self.platform.get_robot_observation(t) - camera_observation = self.platform.get_camera_observation(t) - object_observation = camera_observation.filtered_object_pose - - active_goal = np.asarray( - mct.get_active_goal(self.info["trajectory"], t) - ) - - observation = { - "observation": { - "position": robot_observation.position, - "velocity": robot_observation.velocity, - "torque": robot_observation.torque, - }, - "desired_goal": active_goal, - "achieved_goal": object_observation.position, - } - - return observation - - def _gym_action_to_robot_action(self, gym_action): - # construct robot action depending on action type - if self.action_type == ActionType.TORQUE: - robot_action = self.platform.Action(torque=gym_action) - elif self.action_type == ActionType.POSITION: - robot_action = self.platform.Action(position=gym_action) - elif self.action_type == ActionType.TORQUE_AND_POSITION: - robot_action = self.platform.Action( - torque=gym_action["torque"], position=gym_action["position"] - ) - else: - raise ValueError("Invalid action_type") - - return robot_action diff --git a/trifinger_simulation/gym_wrapper/envs/trifinger_push.py b/trifinger_simulation/gym_wrapper/envs/trifinger_push.py deleted file mode 100644 index b60757b..0000000 --- a/trifinger_simulation/gym_wrapper/envs/trifinger_push.py +++ /dev/null @@ -1,270 +0,0 @@ -import numpy as np -import time - -import gym - -from trifinger_simulation.sim_finger import SimFinger -from trifinger_simulation.gym_wrapper.data_logger import DataLogger -from trifinger_simulation.gym_wrapper.finger_spaces import FingerSpaces -from trifinger_simulation.gym_wrapper import utils -from trifinger_simulation import ( - finger_types_data, - collision_objects, - visual_objects, - sample, -) - - -class TriFingerPush(gym.Env): - """A gym environment to enable training on any of the valid robots, - real or simulated, for the task of pushing. - """ - - def __init__( - self, - control_rate_s, - finger_type, - enable_visualization, - ): - """Intializes the constituents of the pushing environment. - - Constructor sets up the finger robot depending on the finger type, - sets up the observation and action spaces, smoothing for - reducing jitter on the robot, and provides for a way to synchronize - robots being trained independently. - - - Args: - control_rate_s (float): the rate (in seconds) at which step method of the env - will run. The actual robot controller may run at a higher rate, - so this is used to compute the number of robot control updates - per environment step. - finger_type (string): Name of the finger type. In order to get - a list of the valid finger types, call - :meth:`.finger_types_data.get_valid_finger_types` - enable_visualization (bool): if the simulation env is to be - visualized - """ - - #: an instance of the simulated robot depending on the desired - #: robot type - self.finger = SimFinger( - finger_type=finger_type, - enable_visualization=enable_visualization, - ) - - self.num_fingers = finger_types_data.get_number_of_fingers(finger_type) - - #: the number of times the same action is to be applied to - #: the robot in one step. 
- self.steps_per_control = int( - round(control_rate_s / self.finger.time_step_s) - ) - assert ( - abs( - control_rate_s - - self.steps_per_control * self.finger.time_step_s - ) - <= 0.000001 - ) - - #: the types of observations that should be a part of the environment's - #: observed state - self.observations_keys = [ - "joint_positions", - "joint_velocities", - "action_joint_positions", - "goal_position", - "object_position", - ] - #: the size of each of the observation type that is part of the - #: observation keys (in the same order) - self.observations_sizes = [ - 3 * self.num_fingers, - 3 * self.num_fingers, - 3 * self.num_fingers, - 3, - 3, - ] - - # sets up the observation and action spaces for the environment, - # unscaled spaces have the custom bounds set up for each observation - # or action type, whereas all the values in the observation and action - # spaces lie between 1 and -1 - self.spaces = FingerSpaces( - num_fingers=self.num_fingers, - observations_keys=self.observations_keys, - observations_sizes=self.observations_sizes, - separate_goals=False, - ) - - self.unscaled_observation_space = ( - self.spaces.get_unscaled_observation_space() - ) - self.unscaled_action_space = self.spaces.get_unscaled_action_space() - - self.observation_space = self.spaces.get_scaled_observation_space() - self.action_space = self.spaces.get_scaled_action_space() - - #: a logger to enable logging of observations if desired - self.logger = DataLogger() - - #: the object that has to be pushed - self.block = collision_objects.Block() - - #: a marker to visualize where the target goal position for the episode - #: is - self.goal_marker = visual_objects.Marker( - number_of_goals=1, - goal_size=0.0325, - initial_position=[0.19, 0.08, 0.0425], - ) - - self.reset() - - def _compute_reward(self, object_position, goal): - """ - The reward function of the environment - - Args: - observation (list): the observation at the - current step - goal (list): the desired goal for the episode - - Returns: - the reward, and the done signal - """ - done = False - dist = utils.compute_distance(object_position, goal) - reward = -dist - done = False - return reward, done - - def _get_state(self, observation, action, log_observation=False): - """ - Get the current observation from the env for the agent - - Args: - log_observation (bool): specify whether you want to - log the observation - - Returns: - observation (list): comprising of the observations corresponding - to the key values in the observation_keys - """ - joint_positions = observation.position - joint_velocities = observation.velocity - tip_positions = self.finger.kinematics.forward_kinematics( - joint_positions - ) - end_effector_position = np.concatenate(tip_positions) - flat_goals = np.concatenate([self.goal] * self.num_fingers) - - if self.num_fingers == 1: - flat_goals = self.goal - - end_effector_to_goal = list( - np.subtract(flat_goals, end_effector_position) - ) - - # populate this observation dict from which you can select which - # observation types to finally choose depending on the keys - # used for constructing the observation space of the environment - observation_dict = {} - observation_dict["joint_positions"] = joint_positions - observation_dict["joint_velocities"] = joint_velocities - observation_dict["end_effector_position"] = end_effector_position - observation_dict["end_effector_to_goal"] = end_effector_to_goal - observation_dict["goal_position"] = self.goal - observation_dict["object_position"], _ = self.block.get_state() - 
observation_dict["action_joint_positions"] = action - - # returns only the observations corresponding to the keys that were - # used for constructing the observation space - if log_observation: - self.logger.append( - observation_dict["joint_positions"], - observation_dict["end_effector_position"], - time.time(), - ) - - observation = [ - v - for key in self.spaces.observations_keys - for v in observation_dict[key] - ] - - return observation - - def step(self, action): - """ - The env step method - - Args: - action (list): the joint positions that have to be achieved - - Returns: - the observation scaled to lie between [-1;1], the reward, - the done signal, and info on if the agent was successful at - the current step - """ - # unscales the action to the ranges of the action space of the - # environment explicitly (as the prediction from the network - # lies in the range [-1;1]) - unscaled_action = utils.unscale(action, self.unscaled_action_space) - - # this is the control loop to send the actions for a few timesteps - # which depends on the actual control rate - finger_action = self.finger.Action(position=unscaled_action) - state = None - for _ in range(self.steps_per_control): - t = self.finger.append_desired_action(finger_action) - observation = self.finger.get_observation(t) - if state is None: - state = self._get_state(observation, unscaled_action, True) - - key_observation = state[self.spaces.key_to_index["object_position"]] - - reward, done = self._compute_reward(key_observation, self.goal) - info = {"is_success": np.float32(done)} - - scaled_observation = utils.scale( - state, self.unscaled_observation_space - ) - print("reward", reward) - - return scaled_observation, reward, done, info - - def reset(self): - """ - Episode reset - - Returns: - the scaled to [-1;1] observation from the env after the reset - """ - # resets the finger to a random position - action = sample.feasible_random_joint_positions_for_reaching( - self.finger, self.spaces.action_bounds - ) - observation = self.finger.reset_finger_positions_and_velocities(action) - - #: the episode target for the agent which is sampled randomly - #: for each episode - self.goal = sample.random_position_in_arena(height_limits=0.0425) - - #: the position from which the object is initialized at the - #: beginning of each episode - self.block_position = sample.random_position_in_arena( - height_limits=0.0425 - ) - - self.goal_marker.set_state([self.goal]) - self.block.set_state(self.block_position, [0, 0, 0, 1]) - - # logs relevant information for replayability - self.logger.new_episode(self.block_position, self.goal) - - return utils.scale( - self._get_state(observation, action, True), - self.unscaled_observation_space, - ) diff --git a/trifinger_simulation/gym_wrapper/envs/trifinger_reach.py b/trifinger_simulation/gym_wrapper/envs/trifinger_reach.py deleted file mode 100644 index d7f487b..0000000 --- a/trifinger_simulation/gym_wrapper/envs/trifinger_reach.py +++ /dev/null @@ -1,383 +0,0 @@ -import math -import numpy as np -import time -import datetime - -import gym - -from trifinger_simulation.sim_finger import SimFinger -from trifinger_simulation.gym_wrapper.data_logger import DataLogger -from trifinger_simulation.gym_wrapper.finger_spaces import FingerSpaces -from trifinger_simulation.gym_wrapper import utils -from trifinger_simulation import visual_objects, sample, finger_types_data - - -class TriFingerReach(gym.Env): - """ - A gym environment to enable training on either the single or - the tri-fingers robots for the task of 
reaching - """ - - def __init__( - self, - control_rate_s, - finger_type, - enable_visualization, - smoothing_params, - use_real_robot=False, - finger_config_suffix="0", - synchronize=False, - ): - """Intializes the constituents of the reaching environment. - - Constructor sets up the finger robot depending on the finger type, and - also whether an instance of the simulated or the real robot is to be - created. Also sets up the observation and action spaces, smoothing for - reducing jitter on the robot, and provides for a way to synchronize - robots being trained independently. - - Args: - control_rate_s (float): the rate (in seconds) at which step method of the env - will run. The actual robot controller may run at a higher rate, - so this is used to compute the number of robot control updates - per environment step. - finger_type (string): Name of the finger type. In order to get - a dictionary of the valid finger types, call - :meth:`.finger_types_data.get_valid_finger_types` - enable_visualization (bool): if the simulation env is to be - visualized - smoothing_params (dict): - num_episodes (int): the total number of episodes for which the - training is performed - start_after (float): the fraction of episodes after which the - smoothing of applied actions to the motors should start - final_alpha (float): smoothing coeff that will be reached at - the end of the smoothing - stop_after (float): the fraction of total episodes by which - final alpha is to be reached, after which the same final - alpha will be used for smoothing in the remainder of - the episodes - is_test (bool, optional): Include this for testing - use_real_robot (bool): if training is to be performed on - the real robot ([default] False) - finger_config_suffix: pass this if only one of - the three fingers is to be trained. Valid choices include - [0, 120, 240] ([default] 0) - synchronize (bool): Set this to True if you want to train - independently on three fingers in separate processes, but - have them synchronized. ([default] False) - """ - #: an instance of a simulated, or a real robot depending on - #: what is desired. - if use_real_robot: - from pybullet_fingers.real_finger import RealFinger - - self.finger = RealFinger( - finger_type=finger_type, - finger_config_suffix=finger_config_suffix, - enable_visualization=enable_visualization, - ) - - else: - self.finger = SimFinger( - finger_type=finger_type, - enable_visualization=enable_visualization, - ) - - self.num_fingers = finger_types_data.get_number_of_fingers(finger_type) - - #: the number of times the same action is to be applied to - #: the robot. 
- self.steps_per_control = int( - round(control_rate_s / self.finger.time_step_s) - ) - assert ( - abs( - control_rate_s - - self.steps_per_control * self.finger.time_step_s - ) - <= 0.000001 - ) - - #: the types of observations that should be a part of the environment's - #: observed state - self.observations_keys = [ - "joint_positions", - "joint_velocities", - "goal_position", - "action_joint_positions", - ] - - self.observations_sizes = [ - 3 * self.num_fingers, - 3 * self.num_fingers, - 3 * self.num_fingers, - 3 * self.num_fingers, - ] - - # sets up the observation and action spaces for the environment, - # unscaled spaces have the custom bounds set up for each observation - # or action type, whereas all the values in the observation and action - # spaces lie between 1 and -1 - self.spaces = FingerSpaces( - num_fingers=self.num_fingers, - observations_keys=self.observations_keys, - observations_sizes=self.observations_sizes, - separate_goals=True, - ) - - self.unscaled_observation_space = ( - self.spaces.get_unscaled_observation_space() - ) - self.unscaled_action_space = self.spaces.get_unscaled_action_space() - - self.observation_space = self.spaces.get_scaled_observation_space() - self.action_space = self.spaces.get_scaled_action_space() - - #: a logger to enable logging of observations if desired - self.logger = DataLogger() - - # sets up smooothing - if "is_test" in smoothing_params: - self.smoothing_start_episode = 0 - self.smoothing_alpha = smoothing_params["final_alpha"] - self.smoothing_increase_step = 0 - self.smoothing_stop_episode = math.inf - else: - self.smoothing_stop_episode = int( - smoothing_params["num_episodes"] - * smoothing_params["stop_after"] - ) - - self.smoothing_start_episode = int( - smoothing_params["num_episodes"] - * smoothing_params["start_after"] - ) - num_smoothing_increase_steps = ( - self.smoothing_stop_episode - self.smoothing_start_episode - ) - self.smoothing_alpha = 0 - self.smoothing_increase_step = ( - smoothing_params["final_alpha"] / num_smoothing_increase_steps - ) - - self.smoothed_action = None - self.episode_count = 0 - - #: a marker to visualize where the target goal position for the episode - #: is to which the tip link(s) of the robot should reach - self.enable_visualization = enable_visualization - if self.enable_visualization: - self.goal_marker = visual_objects.Marker( - number_of_goals=self.num_fingers - ) - - # set up synchronization if it's set to true - self.synchronize = synchronize - if synchronize: - now = datetime.datetime.now() - self.next_start_time = datetime.datetime( - now.year, now.month, now.day, now.hour, now.minute + 1 - ) - else: - self.next_start_time = None - - self.reset() - - def _compute_reward(self, observation, goal): - """ - The reward function of the environment - - Args: - observation (list): the observation at the - current step - goal (list): the desired goal for the episode - - Returns: - the reward, and the done signal - """ - joint_positions = observation[ - self.spaces.key_to_index["joint_positions"] - ] - - end_effector_positions = self.finger.kinematics.forward_kinematics( - np.array(joint_positions) - ) - - # TODO is matrix norm really always same as vector norm on flattend - # matrices? 
- distance_to_goal = utils.compute_distance(end_effector_positions, goal) - - reward = -distance_to_goal - done = False - - return reward * self.steps_per_control, done - - def _get_state(self, observation, action, log_observation=False): - """ - Get the current observation from the env for the agent - - Args: - log_observation (bool): specify whether you want to - log the observation - - Returns: - observation (list): comprising of the observations corresponding - to the key values in the observation_keys - """ - tip_positions = self.finger.kinematics.forward_kinematics( - observation.position - ) - end_effector_position = np.concatenate(tip_positions) - joint_positions = observation.position - joint_velocities = observation.velocity - flat_goals = np.concatenate(self.goal) - end_effector_to_goal = list( - np.subtract(flat_goals, end_effector_position) - ) - - # populate this observation dict from which you can select which - # observation types to finally choose depending on the keys - # used for constructing the observation space of the environment - observation_dict = {} - observation_dict["end_effector_position"] = end_effector_position - observation_dict["joint_positions"] = joint_positions - observation_dict["joint_velocities"] = joint_velocities - observation_dict["end_effector_to_goal"] = end_effector_to_goal - observation_dict["goal_position"] = flat_goals - observation_dict["action_joint_positions"] = action - - if log_observation: - self.logger.append( - joint_positions, end_effector_position, time.time() - ) - - # returns only the observations corresponding to the keys that were - # used for constructing the observation space - observation = [ - v for key in self.observations_keys for v in observation_dict[key] - ] - - return observation - - def step(self, action): - """ - The env step method - - Args: - action (list): the joint positions that have to be achieved - - Returns: - the observation scaled to lie between [-1;1], the reward, - the done signal, and info on if the agent was successful at - the current step - """ - # Unscale the action to the ranges of the action space of the - # environment, explicitly (as the prediction from the network - # lies in the range [-1;1]) - unscaled_action = utils.unscale(action, self.unscaled_action_space) - - # smooth the action by taking a weighted average with the previous - # action, where the weight, ie, the smoothing_alpha is gradually - # increased at every episode reset (see the reset method for details) - if self.smoothed_action is None: - # start with current position - # self.smoothed_action = self.finger.observation.position - self.smoothed_action = unscaled_action - - self.smoothed_action = ( - self.smoothing_alpha * self.smoothed_action - + (1 - self.smoothing_alpha) * unscaled_action - ) - - # this is the control loop to send the actions for a few timesteps - # which depends on the actual control rate - finger_action = self.finger.Action(position=self.smoothed_action) - state = None - for _ in range(self.steps_per_control): - t = self.finger.append_desired_action(finger_action) - observation = self.finger.get_observation(t) - # get observation from first iteration (when action is applied the - # first time) - if state is None: - state = self._get_state( - observation, self.smoothed_action, True - ) - if self.synchronize: - self.observation = observation - reward, done = self._compute_reward(state, self.goal) - info = {"is_success": np.float32(done)} - scaled_observation = utils.scale( - state, self.unscaled_observation_space 
- ) - return scaled_observation, reward, done, info - - def reset(self): - """ - Episode reset - - Returns: - the scaled to [-1;1] observation from the env after the reset - """ - # synchronize episode starts with wall time - # (freeze the robot at the current position before starting the sleep) - if self.next_start_time: - try: - t = self.finger.append_desired_action( - self.finger.Action(position=self.observation.position) - ) - self.finger.get_observation(t) - except Exception: - pass - - utils.sleep_until(self.next_start_time) - self.next_start_time += datetime.timedelta(seconds=4) - - # updates smoothing parameters - self.update_smoothing() - self.episode_count += 1 - self.smoothed_action = None - - # resets the finger to a random position - action = sample.feasible_random_joint_positions_for_reaching( - self.finger, self.spaces.action_bounds - ) - observation = self.finger.reset_finger_positions_and_velocities(action) - - # generates a random goal for the next episode - target_joint_config = np.asarray( - sample.feasible_random_joint_positions_for_reaching( - self.finger, self.spaces.action_bounds - ) - ) - self.goal = self.finger.kinematics.forward_kinematics( - target_joint_config - ) - - if self.enable_visualization: - self.goal_marker.set_state(self.goal) - - # logs relevant information for replayability - self.logger.new_episode(target_joint_config, self.goal) - - return utils.scale( - self._get_state(observation, action=action), - self.unscaled_observation_space, - ) - - def update_smoothing(self): - """ - Update the smoothing coefficient with which the action to be - applied is smoothed - """ - if ( - self.smoothing_start_episode - <= self.episode_count - < self.smoothing_stop_episode - ): - self.smoothing_alpha += self.smoothing_increase_step - print( - "episode: {}, smoothing: {}".format( - self.episode_count, self.smoothing_alpha - ) - ) diff --git a/trifinger_simulation/gym_wrapper/finger_spaces.py b/trifinger_simulation/gym_wrapper/finger_spaces.py index 585139a..3c300dd 100644 --- a/trifinger_simulation/gym_wrapper/finger_spaces.py +++ b/trifinger_simulation/gym_wrapper/finger_spaces.py @@ -1,7 +1,7 @@ import math import numpy as np -from gym import spaces +from gymnasium import spaces class FingerSpaces: diff --git a/trifinger_simulation/tasks/move_cube/evaluate_policy.py b/trifinger_simulation/tasks/move_cube/evaluate_policy.py index ec6f430..5a4136c 100644 --- a/trifinger_simulation/tasks/move_cube/evaluate_policy.py +++ b/trifinger_simulation/tasks/move_cube/evaluate_policy.py @@ -24,7 +24,7 @@ """ import sys -import gym +import gymnasium as gym from trifinger_simulation.gym_wrapper.envs import cube_env from trifinger_simulation.tasks import move_cube @@ -81,11 +81,12 @@ def main(): # environment, this is the case when looping until is_done == True. Make # sure to adjust this in case your custom environment behaves differently! 
is_done = False - observation = env.reset() + observation, info = env.reset() accumulated_reward = 0 while not is_done: action = policy.predict(observation) - observation, reward, is_done, info = env.step(action) + observation, reward, terminated, truncated, info = env.step(action) + is_done = terminated or truncated accumulated_reward += reward print("Accumulated reward: {}".format(accumulated_reward)) diff --git a/trifinger_simulation/tasks/move_cube_on_trajectory/evaluate_policy.py b/trifinger_simulation/tasks/move_cube_on_trajectory/evaluate_policy.py index fe0f182..cc7e0e0 100644 --- a/trifinger_simulation/tasks/move_cube_on_trajectory/evaluate_policy.py +++ b/trifinger_simulation/tasks/move_cube_on_trajectory/evaluate_policy.py @@ -23,7 +23,7 @@ import argparse import json -import gym +import gymnasium as gym from trifinger_simulation.gym_wrapper.envs import cube_trajectory_env @@ -75,11 +75,12 @@ def main(): # environment, this is the case when looping until is_done == True. Make # sure to adjust this in case your custom environment behaves differently! is_done = False - observation = env.reset() + observation, info = env.reset() accumulated_reward = 0 while not is_done: action = policy.predict(observation) - observation, reward, is_done, info = env.step(action) + observation, reward, terminated, truncated, info = env.step(action) + is_done = terminated or truncated accumulated_reward += reward print("Accumulated reward: {}".format(accumulated_reward)) diff --git a/trifinger_simulation/trifinger_platform.py b/trifinger_simulation/trifinger_platform.py index 1354745..e26082b 100755 --- a/trifinger_simulation/trifinger_platform.py +++ b/trifinger_simulation/trifinger_platform.py @@ -2,7 +2,7 @@ import enum import pickle import numpy as np -import gym +import gymnasium as gym from types import SimpleNamespace import typing
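# The hunks above all follow the same gym -> gymnasium migration pattern:
# reset() now returns an (observation, info) tuple and accepts a seed, and
# step() returns five values, with the old `done` flag split into
# `terminated` and `truncated`. The sketch below is a minimal illustration
# of that pattern for downstream code; it is not part of the patch itself,
# and the environment id "YourTriFingerEnv-v0" is a hypothetical placeholder
# (the example environments removed by this change are no longer registered).
import gymnasium as gym

env = gym.make("YourTriFingerEnv-v0")  # hypothetical id -- use your own env

# gymnasium: reset() takes an optional seed and returns (observation, info)
observation, info = env.reset(seed=42)

is_done = False
accumulated_reward = 0.0
while not is_done:
    action = env.action_space.sample()  # stand-in for an actual policy
    # gymnasium: step() returns (obs, reward, terminated, truncated, info)
    observation, reward, terminated, truncated, info = env.step(action)
    accumulated_reward += reward
    is_done = terminated or truncated

env.close()
print("Accumulated reward: {}".format(accumulated_reward))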