On-Policy Reinforcement Learning

On-policy algorithms learn from data collected by the current policy, providing stable and reliable training for robotics.

Overview

On-policy means the agent learns from experiences collected by the current policy:

  1. Collect data with policy \(\pi_\theta\)
  2. Update \(\theta\) to improve the policy
  3. Discard the old data and collect fresh data with \(\pi_{\theta'}\)
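This loop can be written schematically; `collect_rollout` and `policy_gradient_step` are hypothetical placeholders for the machinery that the full PPO implementation below fills in:

```python
def train_on_policy(policy_params, num_iterations, collect_rollout, policy_gradient_step):
    """Schematic on-policy loop: collect with the current policy, update, discard."""
    for _ in range(num_iterations):
        rollout = collect_rollout(policy_params)                      # data from pi_theta
        policy_params = policy_gradient_step(policy_params, rollout)  # improve theta
        # rollout goes out of scope here: on-policy data is used exactly once
    return policy_params
```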

Key algorithms:

  • PPO (Proximal Policy Optimization): the most popular choice
  • TRPO (Trust Region Policy Optimization): theoretically grounded
  • A3C/A2C (Advantage Actor-Critic): parallelizable

Proximal Policy Optimization (PPO)

PPO is the most widely used RL algorithm in robotics due to its simplicity, stability, and performance.

Paper: Schulman et al., "Proximal Policy Optimization Algorithms", 2017

Mathematical Foundation

PPO optimizes a clipped surrogate objective:

\[ L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min(r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) \hat{A}_t) \right] \]

Where:

  • \(r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}\) is the probability ratio
  • \(\hat{A}_t\) is the advantage estimate
  • \(\epsilon\) is the clip parameter (typically 0.2)

Intuition: Prevent large policy updates by clipping the ratio.
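A quick numeric check of the clipping behavior (the ratio and advantage values here are made up for illustration):

```python
import torch

# Hypothetical single transition: the new policy assigns 1.5x the old
# probability to an action whose advantage estimate is +2.0.
epsilon = 0.2
ratio = torch.tensor(1.5)
advantage = torch.tensor(2.0)

surr1 = ratio * advantage                                         # unclipped: 3.0
surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantage  # clipped: 1.2 * 2.0 = 2.4
objective = torch.min(surr1, surr2)                               # 2.4: the incentive is capped
```

With a positive advantage, the clip caps the reward for pushing the ratio beyond \(1+\epsilon\); the outer min keeps the objective pessimistic in both directions, so there is no incentive for large policy updates.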

Complete Implementation

import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
import numpy as np

class PPO:
    """
    Proximal Policy Optimization for continuous control

    Reference: Schulman et al., 2017
    """
    def __init__(
        self,
        state_dim,
        action_dim,
        hidden_dim=256,
        lr=3e-4,
        gamma=0.99,
        gae_lambda=0.95,
        clip_epsilon=0.2,
        value_coef=0.5,
        entropy_coef=0.01,
        max_grad_norm=0.5,
        ppo_epochs=10,
        mini_batch_size=64
    ):
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        self.ppo_epochs = ppo_epochs
        self.mini_batch_size = mini_batch_size

        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
        )
        self.actor_mean = nn.Linear(hidden_dim, action_dim)
        self.actor_logstd = nn.Parameter(torch.zeros(action_dim))

        # Critic network (value function)
        self.critic = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )

        # Optimizer
        self.optimizer = optim.Adam(
            list(self.actor.parameters()) +
            list(self.actor_mean.parameters()) +
            [self.actor_logstd] +
            list(self.critic.parameters()),
            lr=lr
        )

    def get_action(self, state, deterministic=False):
        """Sample action from policy"""
        state = torch.FloatTensor(state)

        # Actor forward pass
        features = self.actor(state)
        mean = self.actor_mean(features)
        std = torch.exp(self.actor_logstd)

        if deterministic:
            return mean.detach().numpy()

        # Sample from Gaussian policy
        dist = Normal(mean, std)
        action = dist.sample()

        return action.detach().numpy()

    def evaluate_actions(self, states, actions):
        """Evaluate log prob and entropy of actions"""
        features = self.actor(states)
        mean = self.actor_mean(features)
        std = torch.exp(self.actor_logstd)

        dist = Normal(mean, std)
        log_probs = dist.log_prob(actions).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)

        return log_probs, entropy

    def compute_gae(self, rewards, values, dones, next_value):
        """
        Generalized Advantage Estimation (GAE)

        Paper: Schulman et al., "High-Dimensional Continuous Control Using
               Generalized Advantage Estimation", 2016

        GAE(λ) trades off bias against variance:
        λ=0: high bias, low variance (one-step TD)
        λ=1: low bias, high variance (Monte Carlo)
        """
        advantages = torch.zeros_like(rewards)
        last_gae = 0.0

        # Append the bootstrap value so values[t + 1] is defined at the last step
        values = torch.cat([values, next_value.reshape(1)])

        for t in reversed(range(len(rewards))):
            # TD error: delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]

            # GAE recursion: A_t = delta_t + gamma * lambda * A_{t+1}
            advantages[t] = last_gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * last_gae

        returns = advantages + values[:-1]

        return advantages, returns

    def update(self, rollout_buffer):
        """
        PPO update using clipped objective

        Performs multiple epochs of minibatch updates
        """
        states = torch.FloatTensor(rollout_buffer['states'])
        actions = torch.FloatTensor(rollout_buffer['actions'])
        old_log_probs = torch.FloatTensor(rollout_buffer['log_probs'])
        rewards = torch.FloatTensor(rollout_buffer['rewards'])
        dones = torch.FloatTensor(rollout_buffer['dones'])
        values = torch.FloatTensor(rollout_buffer['values'])
        next_value = torch.FloatTensor([rollout_buffer['next_value']])

        # Compute advantages and returns
        advantages, returns = self.compute_gae(rewards, values, dones, next_value)

        # Normalize advantages (important for stability)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # PPO epochs
        for epoch in range(self.ppo_epochs):
            # Generate random minibatches
            indices = np.random.permutation(len(states))

            for start in range(0, len(states), self.mini_batch_size):
                end = start + self.mini_batch_size
                batch_indices = indices[start:end]

                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_advantages = advantages[batch_indices]
                batch_returns = returns[batch_indices]

                # Evaluate current policy
                log_probs, entropy = self.evaluate_actions(batch_states, batch_actions)
                values_pred = self.critic(batch_states).squeeze(-1)

                # Compute ratio
                ratio = torch.exp(log_probs - batch_old_log_probs)

                # Clipped surrogate objective
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                # Value loss (MSE)
                value_loss = nn.MSELoss()(values_pred, batch_returns)

                # Entropy bonus (encourages exploration)
                entropy_loss = -entropy.mean()

                # Total loss
                loss = actor_loss + self.value_coef * value_loss + self.entropy_coef * entropy_loss

                # Optimization step
                self.optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(
                    list(self.actor.parameters()) +
                    list(self.critic.parameters()),
                    self.max_grad_norm
                )
                self.optimizer.step()

        return {
            'actor_loss': actor_loss.item(),
            'value_loss': value_loss.item(),
            'entropy': -entropy_loss.item()
        }


class RolloutBuffer:
    """Buffer for collecting on-policy rollouts"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.states = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.values = []
        self.log_probs = []
        self.next_value = None

    def add(self, state, action, reward, done, value, log_prob):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.dones.append(done)
        self.values.append(value)
        self.log_probs.append(log_prob)

    def get(self):
        return {
            'states': np.array(self.states),
            'actions': np.array(self.actions),
            'rewards': np.array(self.rewards),
            'dones': np.array(self.dones),
            'values': np.array(self.values),
            'log_probs': np.array(self.log_probs),
            'next_value': self.next_value
        }


def train_ppo(env, num_timesteps=1_000_000, rollout_length=2048):
    """Training loop for PPO"""

    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]

    ppo = PPO(state_dim, action_dim)
    buffer = RolloutBuffer()

    state = env.reset()
    episode_reward = 0
    episode_length = 0

    for timestep in range(num_timesteps):
        # Collect rollout
        if timestep % rollout_length == 0 and timestep > 0:
            # Compute next value for GAE
            with torch.no_grad():
                buffer.next_value = ppo.critic(torch.FloatTensor(state)).item()

            # Update policy
            metrics = ppo.update(buffer.get())
            buffer.reset()

            # Logging (this branch only runs at rollout boundaries, so log every update)
            print(f"Timestep {timestep}:")
            print(f"  Actor Loss: {metrics['actor_loss']:.4f}")
            print(f"  Value Loss: {metrics['value_loss']:.4f}")
            print(f"  Entropy: {metrics['entropy']:.4f}")

        # Get action
        action = ppo.get_action(state)

        # Evaluate for PPO
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state)
            features = ppo.actor(state_tensor)
            mean = ppo.actor_mean(features)
            std = torch.exp(ppo.actor_logstd)
            dist = Normal(mean, std)
            action_tensor = torch.FloatTensor(action)
            log_prob = dist.log_prob(action_tensor).sum().item()
            value = ppo.critic(state_tensor).item()

        # Environment step
        next_state, reward, done, info = env.step(action)

        # Store transition
        buffer.add(state, action, reward, done, value, log_prob)

        episode_reward += reward
        episode_length += 1

        if done:
            print(f"Episode: Reward={episode_reward:.2f}, Length={episode_length}")
            state = env.reset()
            episode_reward = 0
            episode_length = 0
        else:
            state = next_state

    return ppo

Hyperparameter Tuning

Critical hyperparameters:

| Parameter       | Typical Range | Robotics Default | Notes                                  |
|-----------------|---------------|------------------|----------------------------------------|
| learning_rate   | 1e-5 to 1e-3  | 3e-4             | Lower for fine control                 |
| clip_epsilon    | 0.1 to 0.3    | 0.2              | Larger = more aggressive updates       |
| gamma           | 0.95 to 0.999 | 0.99             | Higher for long-horizon tasks          |
| gae_lambda      | 0.9 to 0.98   | 0.95             | Trades off bias vs. variance           |
| ppo_epochs      | 3 to 15       | 10               | More epochs reuse data, but can overfit the rollout |
| mini_batch_size | 32 to 128     | 64               | Depends on rollout size                |

Tuning tips:

# state_dim and action_dim come from the target environment

# For manipulation tasks (short episodes)
ppo_manipulation = PPO(
    state_dim, action_dim,
    lr=3e-4,
    gamma=0.99,
    gae_lambda=0.95,
    clip_epsilon=0.2,
    ppo_epochs=10
)

# For locomotion tasks (long episodes)
ppo_locomotion = PPO(
    state_dim, action_dim,
    lr=1e-4,
    gamma=0.995,
    gae_lambda=0.97,
    clip_epsilon=0.2,
    ppo_epochs=15
)

# For highly stochastic environments
ppo_stochastic = PPO(
    state_dim, action_dim,
    lr=3e-4,
    gamma=0.99,
    gae_lambda=0.9,    # Lower lambda reduces variance
    clip_epsilon=0.3,  # Larger clip range
    entropy_coef=0.05  # More exploration
)

Trust Region Policy Optimization (TRPO)

TRPO guarantees monotonic policy improvement using a trust region constraint.

Paper: Schulman et al., "Trust Region Policy Optimization", ICML 2015

Mathematical Foundation

TRPO maximizes:

\[ \max_\theta \mathbb{E}_t \left[ \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)} \hat{A}_t \right] \]

Subject to the trust-region constraint:

\[ \mathbb{E}_t \left[ D_{KL}\left( \pi_{\theta_{old}}(\cdot|s_t) \,\|\, \pi_\theta(\cdot|s_t) \right) \right] \leq \delta \]

Advantage over PPO: theoretically guaranteed monotonic improvement.
Disadvantage: more complex; requires conjugate gradient.

Key Difference from PPO

| Feature        | PPO        | TRPO          |
|----------------|------------|---------------|
| Constraint     | Clip ratio | KL divergence |
| Implementation | Simple     | Complex (CG)  |
| Performance    | Similar    | Similar       |
| Computation    | Fast       | Slower        |

Recommendation: Use PPO for most robotics applications.
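For intuition about the constraint itself, the KL between two diagonal Gaussian policies can be computed directly with `torch.distributions`; the means, stds, and δ below are illustrative only:

```python
import torch
from torch.distributions import Normal, kl_divergence

# Old and new action distributions for a 2-D continuous action space
# (hypothetical parameters, not tied to any trained network).
old_dist = Normal(torch.tensor([0.0, 0.0]), torch.tensor([1.0, 1.0]))
new_dist = Normal(torch.tensor([0.1, 0.0]), torch.tensor([1.0, 1.0]))

# For a factorized Gaussian policy, sum the per-dimension KL terms
kl = kl_divergence(old_dist, new_dist).sum()

delta = 0.01  # a typical trust-region radius
within_trust_region = kl.item() <= delta
```

TRPO enforces this bound on the average over visited states; PPO's clipping achieves a similar effect without computing the KL at all.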

Actor-Critic (A2C/A3C)

Simpler on-policy algorithm using parallel workers.

class A2C(nn.Module):
    """Advantage Actor-Critic (synchronous), here for discrete actions"""
    def __init__(self, state_dim, action_dim):
        super().__init__()
        # Actor and critic share a backbone
        self.shared = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU()
        )

        self.actor = nn.Linear(256, action_dim)
        self.critic = nn.Linear(256, 1)

        self.optimizer = optim.Adam(self.parameters(), lr=1e-3)

    def forward(self, state):
        features = self.shared(state)
        action_logits = self.actor(features)
        value = self.critic(features)
        return action_logits, value

    def update(self, states, actions, rewards, next_states, dones):
        """A2C update with one-step advantages"""
        logits, values = self.forward(states)
        values = values.squeeze(-1)

        # Bootstrap targets without tracking gradients through the next state
        with torch.no_grad():
            _, next_values = self.forward(next_states)
            next_values = next_values.squeeze(-1)

        # One-step advantage: r + gamma * V(s') - V(s)
        advantages = rewards + 0.99 * next_values * (1 - dones) - values

        # Log-probabilities of the actions actually taken
        dist = torch.distributions.Categorical(logits=logits)
        log_probs = dist.log_prob(actions)

        # Actor loss (policy gradient); stop gradients through the advantage
        actor_loss = -(advantages.detach() * log_probs).mean()

        # Critic loss (squared TD error)
        critic_loss = advantages.pow(2).mean()

        # Combined update
        loss = actor_loss + 0.5 * critic_loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

Practical Tips

When to Use On-Policy

Use when:

  • You need stable, reliable training
  • Sample efficiency is not critical (you can collect lots of data)
  • You are training in simulation
  • Policy stability matters (safety)

Avoid when:

  • Sample collection is expensive (real robot)
  • You need maximum sample efficiency
  • The environment is very high-dimensional

Common Issues

Problem: Training is unstable
Solution: Reduce the learning rate and decrease clip_epsilon (more conservative updates)

Problem: No improvement
Solution: Check reward scaling; increase the entropy coefficient

Problem: Catastrophic forgetting
Solution: Reduce PPO epochs; use smaller mini-batches
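Reward scaling is often handled by dividing rewards by a running estimate of the discounted return's standard deviation. A minimal sketch (the RunningScaler name is hypothetical; Stable-Baselines3's VecNormalize implements a similar idea):

```python
import numpy as np

class RunningScaler:
    """Scale rewards by a running std of the discounted return (Welford's method)."""
    def __init__(self, gamma=0.99, eps=1e-8):
        self.gamma = gamma
        self.eps = eps
        self.ret = 0.0    # running discounted return
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0     # sum of squared deviations

    def scale(self, reward, done):
        # Track the discounted return, resetting at episode boundaries
        self.ret = self.gamma * self.ret * (1 - done) + reward
        # Welford update of the return's variance
        self.count += 1
        delta = self.ret - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (self.ret - self.mean)
        var = self.m2 / self.count if self.count > 1 else 1.0
        return reward / (np.sqrt(var) + self.eps)
```

Note that this scales without centering: shifting rewards by a constant would change the optimal policy in episodic tasks, while pure scaling does not.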

References

Papers

  1. PPO: Schulman et al., "Proximal Policy Optimization Algorithms", arXiv 2017
  2. TRPO: Schulman et al., "Trust Region Policy Optimization", ICML 2015
  3. GAE: Schulman et al., "High-Dimensional Continuous Control Using Generalized Advantage Estimation", ICLR 2016

Books

  1. Sutton & Barto, "Reinforcement Learning: An Introduction", 2nd Edition, 2018
  2. Bertsekas, "Reinforcement Learning and Optimal Control", 2019

Code Implementations

  • Stable-Baselines3: https://github.com/DLR-RM/stable-baselines3
  • CleanRL: https://github.com/vwxyzjn/cleanrl
  • SpinningUp: https://spinningup.openai.com/

Tutorials

  • OpenAI Spinning Up: https://spinningup.openai.com/en/latest/
  • Lil'Log PPO Tutorial: https://lilianweng.github.io/posts/2018-04-08-policy-gradient/

Next Steps

Framework Guides