Training RL Policies¶
A practical guide to setting up, shaping, scaling, and monitoring reinforcement learning training for robotics policies.
Training Setup¶
Environment Configuration¶
import gymnasium as gym
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
# Single environment
# NOTE(review): Fetch* tasks are registered by the gymnasium-robotics package —
# confirm it is installed and imported before gym.make resolves this id.
env = gym.make('FetchReach-v2')
# Vectorized environments for parallel training (8 copies stepped in lockstep)
env = make_vec_env('FetchReach-v2', n_envs=8)
# Custom environment wrapper
class RobotEnvWrapper(gym.Wrapper):
    """Pass-through wrapper: a place to hook custom observation/action processing."""

    def __init__(self, env):
        super().__init__(env)
        # Add custom observation/action processing setup here.

    def step(self, action):
        # Pre-process `action` here before forwarding to the wrapped env.
        obs, reward, terminated, truncated, info = self.env.step(action)
        # Post-process `obs` here before handing it back to the agent.
        return obs, reward, terminated, truncated, info
Training Script¶
from stable_baselines3 import PPO, SAC
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
# Periodically snapshot the model so training can be resumed after a crash.
checkpoint_cb = CheckpointCallback(
    save_freq=10000,
    save_path='./models/',
    name_prefix='rl_model',
)
# Score the policy on a held-out environment and keep the best checkpoint.
eval_cb = EvalCallback(
    eval_env,
    best_model_save_path='./best_model/',
    log_path='./logs/',
    eval_freq=5000,
    deterministic=True,
)
# PPO with an MLP policy; logs curves to TensorBoard.
model = PPO(
    "MlpPolicy",
    env,
    verbose=1,
    tensorboard_log="./tensorboard/",
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
)
# Run training with both callbacks attached, then persist the final weights.
model.learn(total_timesteps=1_000_000, callback=[checkpoint_cb, eval_cb])
model.save("final_model")
Reward Shaping¶
Design Principles¶
def compute_reward(state, action, next_state, goal, threshold=0.05):
    """Shaped reward combining distance, progress, success bonus and smoothness.

    Args:
        state: dict with an 'achieved_goal' array (pose before the step).
        action: array-like control command; penalized quadratically.
        next_state: dict with an 'achieved_goal' array (pose after the step).
        goal: target goal array, same shape as 'achieved_goal'.
        threshold: success radius around the goal (was an undefined free
            variable in the original; now an explicit parameter).

    Returns:
        float scalar reward.
    """
    # Goal distances before/after the step; compute each norm once.
    old_dist = np.linalg.norm(state['achieved_goal'] - goal)
    new_dist = np.linalg.norm(next_state['achieved_goal'] - goal)
    # 1. Distance-based reward: closer is better.
    dist_reward = -new_dist
    # 2. Progress reward: positive when the step moved toward the goal.
    progress_reward = old_dist - new_dist
    # 3. Sparse success bonus inside the threshold radius.
    success_reward = 100.0 if new_dist < threshold else 0.0
    # 4. Action penalty (encourage smoothness / low effort).
    action_penalty = -0.01 * np.sum(action**2)
    # Weighted combination of the dense and sparse terms.
    reward = (
        0.1 * dist_reward +
        1.0 * progress_reward +
        success_reward +
        action_penalty
    )
    return reward
Curriculum Learning¶
class CurriculumEnv(gym.Wrapper):
    """Curriculum wrapper: scales task difficulty with the recent success rate.

    Fix over the original snippet: it subclassed `gym.Env` while calling
    `super().step(...)` / `super().reset()`, which `gym.Env` does not
    implement. Wrapping a concrete environment makes those calls valid.
    """

    def __init__(self, env):
        super().__init__(env)
        # Difficulty in [0, 1]; start at the easiest setting.
        self.difficulty = 0.0
        # Rolling window of the last 100 episode outcomes (1.0 = success).
        self.success_rate = deque(maxlen=100)

    def step(self, action):
        obs, reward, done, truncated, info = super().step(action)
        # Track success at episode end.
        if done:
            self.success_rate.append(float(info.get('is_success', 0)))
            # Adapt once the window is full: harder when winning, easier when losing.
            if len(self.success_rate) == 100:
                if np.mean(self.success_rate) > 0.8:
                    self.difficulty = min(1.0, self.difficulty + 0.1)
                elif np.mean(self.success_rate) < 0.3:
                    self.difficulty = max(0.0, self.difficulty - 0.1)
        return obs, reward, done, truncated, info

    def reset(self):
        # Apply the current difficulty to the task before each episode.
        self._set_difficulty(self.difficulty)
        return super().reset()
Distributed Training¶
Multi-GPU Training¶
import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPOConfig
# Initialize Ray (local cluster by default).
ray.init()
# Configure distributed PPO: 4 GPUs for learning, 32 CPU rollout workers,
# each stepping 4 envs, so 128 environments collect experience in parallel.
# NOTE(review): .rollouts() / sgd_minibatch_size follow the older RLlib
# builder API — confirm names against the installed Ray version.
config = (
PPOConfig()
.environment("FetchReach-v2")
.framework("torch")
.resources(num_gpus=4)
.rollouts(
num_rollout_workers=32,
num_envs_per_worker=4
)
.training(
train_batch_size=8192,
sgd_minibatch_size=512,
num_sgd_iter=10
)
)
# Train until 10M env steps, checkpointing every 10 training iterations.
tuner = tune.Tuner(
"PPO",
param_space=config.to_dict(),
run_config=ray.air.RunConfig(
stop={"timesteps_total": 10_000_000},
checkpoint_config=ray.air.CheckpointConfig(
checkpoint_frequency=10
)
)
)
results = tuner.fit()
Monitoring Training¶
TensorBoard Logging¶
# SB3 logs losses, episode rewards, etc. automatically when
# tensorboard_log is set; learn() writes events under ./tb_logs/.
model = PPO("MlpPolicy", env, tensorboard_log="./tb_logs/")
model.learn(total_timesteps=1_000_000)
# View the curves in a browser:
# tensorboard --logdir=./tb_logs/
Custom Metrics¶
from stable_baselines3.common.callbacks import BaseCallback
class MetricsCallback(BaseCallback):
    """Log success rate and mean episode length from SB3's episode-info buffer."""

    def _on_step(self) -> bool:
        episodes = self.model.ep_info_buffer
        if len(episodes) > 0:
            # Success rate over episodes that report an 'is_success' flag.
            successes = [ep['is_success'] for ep in episodes if 'is_success' in ep]
            if successes:
                self.logger.record('rollout/success_rate', np.mean(successes))
            # Mean episode length ('l' is SB3's length key).
            self.logger.record('rollout/ep_len_mean', np.mean([ep['l'] for ep in episodes]))
        return True
Hyperparameter Optimization¶
Optuna Integration¶
import optuna
from optuna.pruners import MedianPruner
def optimize_ppo(trial):
    """Optuna objective: sample PPO hyperparameters, train briefly, return mean eval reward."""
    # Sample the search space for this trial.
    params = {
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True),
        'n_steps': trial.suggest_categorical('n_steps', [512, 1024, 2048]),
        'batch_size': trial.suggest_categorical('batch_size', [32, 64, 128]),
        'ent_coef': trial.suggest_float('ent_coef', 0.0, 0.1),
    }
    # Build the candidate model with the sampled configuration.
    model = PPO("MlpPolicy", env, verbose=0, **params)
    # Short training budget, then score on the held-out eval env.
    model.learn(total_timesteps=100_000)
    mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=10)
    return mean_reward
# Run the optimization: maximize mean eval reward; the median pruner
# stops trials that underperform the running median early.
study = optuna.create_study(
direction='maximize',
pruner=MedianPruner()
)
study.optimize(optimize_ppo, n_trials=50)
print(f"Best hyperparameters: {study.best_params}")
Troubleshooting¶
| Issue | Symptoms | Solutions |
|---|---|---|
| Not learning | Flat reward curve | Check reward function, increase learning rate |
| Unstable | Reward oscillates | Lower learning rate, smaller batch size |
| Overfitting | Good in train, bad in test | More diverse environments, regularization |
| Slow | Training takes forever | Parallelize, use GPU, simpler network |
Best Practices¶
- Start simple: Test with simple environment first
- Monitor everything: Log rewards, success rates, losses
- Iterate on rewards: Reward shaping is critical
- Use curriculum: Start easy, increase difficulty
- Parallelize: Use vectorized environments
- Save frequently: Checkpoint every N steps
- Evaluate separately: Use separate eval environment
Next Steps¶
- Evaluation - Evaluate trained policies
- Algorithms - Try different algorithms
- Simulators - Train in simulation