Training RL Policies

Practical guide to training reinforcement learning policies for robotics.

Training Setup

Environment Configuration

import gymnasium as gym
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env

# Single environment (Fetch tasks require the gymnasium-robotics package)
env = gym.make('FetchReach-v2')

# Vectorized environments for parallel training (subprocess workers)
env = make_vec_env('FetchReach-v2', n_envs=8, vec_env_cls=SubprocVecEnv)

# Custom environment wrapper
class RobotEnvWrapper(gym.Wrapper):
    def __init__(self, env):
        super().__init__(env)
        # Add custom observation/action processing

    def step(self, action):
        # Process action
        obs, reward, terminated, truncated, info = self.env.step(action)
        # Process observation
        return obs, reward, terminated, truncated, info
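A common form of observation and reward processing is running normalization. Rather than hand-rolling it in a wrapper, SB3's VecNormalize can be stacked on top of the vectorized env; a minimal sketch (the clipping value and freezing step are illustrative):

from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize

# Normalize observations and rewards with running statistics
venv = make_vec_env('FetchReach-v2', n_envs=8)
venv = VecNormalize(venv, norm_obs=True, norm_reward=True, clip_obs=10.0)

# After training, freeze the statistics before evaluating:
# venv.training = False
# venv.norm_reward = False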

Training Script

from stable_baselines3 import PPO, SAC
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback

# Callbacks
checkpoint_callback = CheckpointCallback(
    save_freq=10000,
    save_path='./models/',
    name_prefix='rl_model'
)

# Separate environment for periodic evaluation
eval_env = make_vec_env('FetchReach-v2', n_envs=1)

eval_callback = EvalCallback(
    eval_env,
    best_model_save_path='./best_model/',
    log_path='./logs/',
    eval_freq=5000,
    deterministic=True
)

# Create model (Fetch envs have Dict observations, so use MultiInputPolicy)
model = PPO(
    "MultiInputPolicy",
    env,
    verbose=1,
    tensorboard_log="./tensorboard/",
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64
)

# Train
model.learn(
    total_timesteps=1_000_000,
    callback=[checkpoint_callback, eval_callback]
)

# Save
model.save("final_model")

Reward Shaping

Design Principles

import numpy as np

def compute_reward(state, action, next_state, goal, threshold=0.05):
    # 1. Distance-based reward
    dist_reward = -np.linalg.norm(next_state['achieved_goal'] - goal)

    # 2. Progress reward
    old_dist = np.linalg.norm(state['achieved_goal'] - goal)
    new_dist = np.linalg.norm(next_state['achieved_goal'] - goal)
    progress_reward = old_dist - new_dist

    # 3. Success bonus (threshold: success radius, e.g. 0.05 m for Fetch tasks)
    success = new_dist < threshold
    success_reward = 100.0 if success else 0.0

    # 4. Action penalty (encourage smoothness)
    action_penalty = -0.01 * np.sum(action**2)

    # Combine
    reward = (
        0.1 * dist_reward +
        1.0 * progress_reward +
        success_reward +
        action_penalty
    )

    return reward
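
A quick sanity check with toy values makes the weighting concrete (the arrays below are made up):

state = {'achieved_goal': np.array([0.0, 0.0, 0.0])}
next_state = {'achieved_goal': np.array([0.0, 0.0, 0.04])}
goal = np.array([0.0, 0.0, 0.05])
action = np.array([0.0, 0.0, 0.5, 0.0])

# Moving from 5 cm to 1 cm away and landing inside the 5 cm threshold
# yields a large positive reward dominated by the success bonus.
print(compute_reward(state, action, next_state, goal))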

Curriculum Learning

from collections import deque
import numpy as np

class CurriculumEnv(gym.Wrapper):
    def __init__(self, env):
        super().__init__(env)
        self.difficulty = 0.0
        self.success_rate = deque(maxlen=100)

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)

        # Track success at the end of each episode
        if terminated or truncated:
            self.success_rate.append(float(info.get('is_success', 0)))

            # Increase difficulty if doing well, back off if struggling
            if len(self.success_rate) == 100:
                if np.mean(self.success_rate) > 0.8:
                    self.difficulty = min(1.0, self.difficulty + 0.1)
                elif np.mean(self.success_rate) < 0.3:
                    self.difficulty = max(0.0, self.difficulty - 0.1)

        return obs, reward, terminated, truncated, info

    def reset(self, **kwargs):
        # Adjust task difficulty before the next episode
        self._set_difficulty(self.difficulty)
        return self.env.reset(**kwargs)
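
The _set_difficulty helper is task-specific and left undefined above. One plausible implementation, sketched under the assumption that the wrapped env exposes a configurable goal_range attribute (hypothetical here), interpolates the goal sampling radius with difficulty:

    # Inside CurriculumEnv:
    def _set_difficulty(self, difficulty):
        # Interpolate between an easy and a hard goal radius as
        # difficulty goes from 0.0 to 1.0.
        easy_radius, hard_radius = 0.05, 0.30
        radius = easy_radius + difficulty * (hard_radius - easy_radius)
        # Assumes the underlying env exposes a `goal_range` attribute.
        self.env.unwrapped.goal_range = radius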

Distributed Training

Multi-GPU Training

import ray
from ray import air, tune
from ray.rllib.algorithms.ppo import PPOConfig

# Initialize Ray
ray.init()

# Configure distributed training
config = (
    PPOConfig()
    .environment("FetchReach-v2")
    .framework("torch")
    .resources(num_gpus=4)
    .rollouts(
        num_rollout_workers=32,
        num_envs_per_worker=4
    )
    .training(
        train_batch_size=8192,
        sgd_minibatch_size=512,
        num_sgd_iter=10
    )
)

# Train
tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    run_config=ray.air.RunConfig(
        stop={"timesteps_total": 10_000_000},
        checkpoint_config=ray.air.CheckpointConfig(
            checkpoint_frequency=10
        )
    )
)

results = tuner.fit()
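
After the run, the best checkpoint can be pulled from the result grid; a small sketch, assuming the episode_reward_mean metric reported by the older RLlib API stack:

best_result = results.get_best_result(metric="episode_reward_mean", mode="max")
print(best_result.checkpoint)  # location of the best checkpoint
print(best_result.metrics.get("episode_reward_mean"))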

Monitoring Training

TensorBoard Logging

# Automatic logging with SB3
model = PPO("MlpPolicy", env, tensorboard_log="./tb_logs/")
model.learn(total_timesteps=1_000_000)

# View logs
# tensorboard --logdir=./tb_logs/

Custom Metrics

import numpy as np
from stable_baselines3.common.callbacks import BaseCallback

class MetricsCallback(BaseCallback):
    def _on_step(self) -> bool:
        # Log custom metrics
        if len(self.model.ep_info_buffer) > 0:
            # Success rate
            successes = [ep['is_success'] for ep in self.model.ep_info_buffer
                        if 'is_success' in ep]
            if successes:
                self.logger.record('rollout/success_rate', np.mean(successes))

            # Episode length
            ep_lens = [ep['l'] for ep in self.model.ep_info_buffer]
            self.logger.record('rollout/ep_len_mean', np.mean(ep_lens))

        return True
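
For 'is_success' to appear in ep_info_buffer, it has to be recorded by the Monitor wrapper; a sketch, assuming the env reports is_success in its info dict (as the Fetch tasks do):

from stable_baselines3.common.env_util import make_vec_env

# Ask the Monitor wrapper to copy `is_success` from info into episode records
env = make_vec_env(
    'FetchReach-v2',
    n_envs=8,
    monitor_kwargs=dict(info_keywords=("is_success",)),
)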

Hyperparameter Optimization

Optuna Integration

import optuna
from optuna.pruners import MedianPruner
from stable_baselines3.common.evaluation import evaluate_policy

def optimize_ppo(trial):
    # Suggest hyperparameters (env and eval_env come from the setup above)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True)
    n_steps = trial.suggest_categorical('n_steps', [512, 1024, 2048])
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128])
    ent_coef = trial.suggest_float('ent_coef', 0.0, 0.1)

    # Create model
    model = PPO(
        "MlpPolicy",
        env,
        learning_rate=learning_rate,
        n_steps=n_steps,
        batch_size=batch_size,
        ent_coef=ent_coef,
        verbose=0
    )

    # Train and evaluate
    model.learn(total_timesteps=100_000)
    mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=10)

    return mean_reward

# Run optimization
study = optuna.create_study(
    direction='maximize',
    pruner=MedianPruner()
)
study.optimize(optimize_ppo, n_trials=50)

print(f"Best hyperparameters: {study.best_params}")

Troubleshooting

| Issue        | Symptoms                   | Solutions                                                            |
|--------------|----------------------------|----------------------------------------------------------------------|
| Not learning | Flat reward curve          | Check reward function, increase learning rate                        |
| Unstable     | Reward oscillates          | Lower learning rate (see the schedule sketch below), smaller batches |
| Overfitting  | Good in train, bad in test | More diverse environments, regularization                            |
| Slow         | Training takes forever     | Parallelize, use GPU, simpler network                                |
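
For the learning-rate fixes above, SB3 accepts a callable in place of a constant learning rate; a minimal sketch of a linear decay schedule (the 3e-4 starting value is illustrative):

def linear_schedule(initial_value):
    # progress_remaining goes from 1.0 (start) to 0.0 (end of training)
    def schedule(progress_remaining):
        return progress_remaining * initial_value
    return schedule

model = PPO("MultiInputPolicy", env, learning_rate=linear_schedule(3e-4))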

Best Practices

  1. Start simple: Test on a simple environment first
  2. Monitor everything: Log rewards, success rates, and losses
  3. Iterate on rewards: Reward shaping is critical
  4. Use a curriculum: Start easy and increase difficulty over time
  5. Parallelize: Use vectorized environments
  6. Save frequently: Checkpoint every N steps
  7. Evaluate separately: Use a dedicated eval environment

Next Steps