Inverse Reinforcement Learning¶
Inverse Reinforcement Learning (IRL) learns the reward function that explains expert behavior, enabling generalization beyond demonstrations.
Overview¶
Inverse RL inverts the reinforcement learning problem:
- Standard RL: Given reward → Learn policy
- Inverse RL: Given policy (expert demos) → Learn reward
Key insight: Expert behavior reveals the expert's underlying objectives
Advantages:

- ✓ Learn a transferable reward function
- ✓ Generalize to new states/tasks
- ✓ Understand expert intentions
- ✓ Enable reward shaping

Challenges:

- ✗ Reward ambiguity (many rewards explain the same behavior)
- ✗ Computationally expensive (requires RL in the inner loop)
- ✗ Assumes the expert is optimal
- ✗ Requires environment dynamics

Key algorithms:

- MaxEnt IRL (Maximum Entropy IRL)
- GAIL (Generative Adversarial Imitation Learning)
- AIRL (Adversarial Inverse RL)
- IQ-Learn (Inverse soft-Q Learning)
Mathematical Foundation¶
Problem Formulation¶
Given:

- MDP without a reward function: \((S, A, P, \gamma)\)
- Expert demonstrations: \(\tau^* = \{(s_0, a_0), (s_1, a_1), \ldots\}\)

Find a reward function \(r(s, a)\) under which the expert policy \(\pi^*\) is optimal.
Reward Ambiguity¶
Problem: Infinitely many rewards explain the same policy!
Examples:

- \(r(s, a) = 0\) everywhere (degenerate)
- \(r(s, a) = c\) (constant) for all expert actions
- \(r'(s, a) = \alpha \cdot r(s, a)\) for any \(\alpha > 0\) (positive scaling)
Solution approaches:

1. Maximum margin: Require the expert to outperform every other policy by a margin
2. Maximum entropy: Among all rewards that explain the demonstrations, prefer the one whose optimal policy has maximum entropy
3. Apprenticeship learning: Match discounted feature expectations between expert and learner (sketched below)
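As a concrete illustration of the feature-expectation idea, here is a minimal sketch of the quantity apprenticeship learning tries to match. The trajectory format (`[(state, action), ...]`) and the feature map `phi` are illustrative placeholders, not part of the implementation later in this page:

import numpy as np

def discounted_feature_expectations(trajectories, phi, gamma=0.99):
    """Estimate mu(pi) = E[sum_t gamma^t * phi(s_t, a_t)] from sampled trajectories."""
    mu = None
    for traj in trajectories:
        for t, (state, action) in enumerate(traj):
            feat = (gamma ** t) * np.asarray(phi(state, action))
            mu = feat if mu is None else mu + feat
    return mu / len(trajectories)

# Apprenticeship learning looks for a reward under which the learner's
# feature expectations match the expert's:
# mu_expert  = discounted_feature_expectations(expert_trajs, phi)
# mu_learner = discounted_feature_expectations(learner_trajs, phi)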
Maximum Entropy IRL¶
Key idea: Model the expert as Boltzmann-rational, so trajectories are sampled with probability proportional to their exponentiated return:

$$ P(\tau | r) = \frac{\exp(r(\tau))}{Z(r)}, \qquad Z(r) = \sum_{\tau} \exp(r(\tau)) $$

Objective: Maximize the likelihood of the expert demonstrations:

$$ \max_r \; \mathcal{L}(r) = \mathbb{E}_{\tau \sim \text{expert}} \left[ \log P(\tau | r) \right] $$

Where \(P(\tau|r) \propto \exp(r(\tau))\) and \(\pi_r^*\) is the max-ent optimal policy for reward \(r\).
Gradient: $$ \nabla_r \mathcal{L} = \mathbb{E}_{\tau \sim \text{expert}} \left[ \nabla_r r(\tau) \right] - \mathbb{E}_{\tau \sim \pi_r^*} \left[ \nabla_r r(\tau) \right] $$
Intuition: Increase reward on expert trajectories, decrease on learned policy trajectories.
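For a linear reward \(r_w(s, a) = w^\top \phi(s, a)\), this gradient reduces to a difference of feature expectations. A minimal numerical sketch, with `phi`, `expert_trajs`, and `policy_trajs` as illustrative placeholders (they are not part of the implementation below):

import numpy as np

def feature_expectations(trajectories, phi):
    """E_tau[sum_t phi(s_t, a_t)], estimated by averaging per-trajectory feature sums."""
    sums = [np.sum([phi(s, a) for (s, a) in traj], axis=0) for traj in trajectories]
    return np.mean(sums, axis=0)

def maxent_gradient_step(w, expert_trajs, policy_trajs, phi, lr=0.1):
    """One ascent step on the MaxEnt log-likelihood for r_w(s, a) = w . phi(s, a):
    grad = E_expert[features] - E_{pi_w}[features]."""
    grad = feature_expectations(expert_trajs, phi) - feature_expectations(policy_trajs, phi)
    return w + lr * grad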
Complete Implementation¶
MaxEnt IRL¶
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym  # needed for the gym.Wrapper subclasses below
class MaxEntIRL:
"""
Maximum Entropy Inverse Reinforcement Learning
Reference: Ziebart et al., AAAI 2008
"""
def __init__(
self,
state_dim,
action_dim,
reward_hidden_dims=[256, 256],
rl_algorithm='SAC', # or 'PPO'
lr=1e-3,
gamma=0.99,
device='cuda'
):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.device = device
# Reward network (parameterized reward function)
self.reward_net = self._build_reward_network(
state_dim, action_dim, reward_hidden_dims
).to(device)
self.reward_optimizer = optim.Adam(self.reward_net.parameters(), lr=lr)
# RL algorithm for learning policy from current reward
if rl_algorithm == 'SAC':
from stable_baselines3 import SAC
self.rl_algorithm_class = SAC
elif rl_algorithm == 'PPO':
from stable_baselines3 import PPO
self.rl_algorithm_class = PPO
else:
raise ValueError(f"Unknown RL algorithm: {rl_algorithm}")
def _build_reward_network(self, state_dim, action_dim, hidden_dims):
"""Build reward network r(s, a)"""
layers = []
input_dim = state_dim + action_dim
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
prev_dim = hidden_dim
# Output single scalar reward
layers.append(nn.Linear(prev_dim, 1))
return nn.Sequential(*layers)
def get_reward(self, state, action):
"""Compute reward for (state, action) pair"""
state_tensor = torch.FloatTensor(state).to(self.device)
action_tensor = torch.FloatTensor(action).to(self.device)
x = torch.cat([state_tensor, action_tensor], dim=-1)
reward = self.reward_net(x)
return reward
def train(
self,
env,
expert_trajectories,
num_iterations=100,
rl_steps_per_iteration=10000
):
"""
Train IRL
Args:
env: Gym environment (with reward function wrapped)
expert_trajectories: List of expert trajectories
num_iterations: Number of IRL iterations
rl_steps_per_iteration: RL training steps per iteration
"""
print("Starting MaxEnt IRL training...")
for iteration in range(num_iterations):
print(f"\n=== IRL Iteration {iteration + 1}/{num_iterations} ===")
# Step 1: Train policy with current reward estimate
print("Training policy with current reward...")
# Wrap environment to use learned reward
wrapped_env = RewardWrapper(env, self.reward_net, self.device)
# Train RL agent
policy = self.rl_algorithm_class(
"MlpPolicy",
wrapped_env,
verbose=0
)
policy.learn(total_timesteps=rl_steps_per_iteration)
# Step 2: Collect trajectories from learned policy
print("Collecting trajectories from learned policy...")
learned_trajectories = self._collect_trajectories(
wrapped_env, policy, num_trajectories=len(expert_trajectories)
)
# Step 3: Update reward function
print("Updating reward function...")
reward_loss = self._update_reward(
expert_trajectories, learned_trajectories
)
print(f"Reward loss: {reward_loss:.6f}")
# Step 4: Evaluate
if iteration % 10 == 0:
eval_reward = self._evaluate_policy(env, policy, num_episodes=10)
print(f"Evaluation reward: {eval_reward:.2f}")
return self.reward_net, policy
def _collect_trajectories(self, env, policy, num_trajectories=10):
"""Collect trajectories from policy"""
trajectories = []
for _ in range(num_trajectories):
trajectory = []
obs = env.reset()
done = False
while not done:
action, _ = policy.predict(obs, deterministic=False)
trajectory.append((obs, action))
obs, reward, done, info = env.step(action)
trajectories.append(trajectory)
return trajectories
def _update_reward(self, expert_trajectories, learned_trajectories):
"""
Update reward function using MaxEnt IRL gradient
Gradient: E[∇r|expert] - E[∇r|learned policy]
"""
# Compute expert feature expectations
expert_states, expert_actions = self._flatten_trajectories(expert_trajectories)
expert_states = torch.FloatTensor(expert_states).to(self.device)
expert_actions = torch.FloatTensor(expert_actions).to(self.device)
# Compute learned policy feature expectations
learned_states, learned_actions = self._flatten_trajectories(learned_trajectories)
learned_states = torch.FloatTensor(learned_states).to(self.device)
learned_actions = torch.FloatTensor(learned_actions).to(self.device)
# Forward pass
expert_x = torch.cat([expert_states, expert_actions], dim=-1)
expert_rewards = self.reward_net(expert_x)
learned_x = torch.cat([learned_states, learned_actions], dim=-1)
learned_rewards = self.reward_net(learned_x)
# MaxEnt IRL loss: maximize expert rewards, minimize learned policy rewards
loss = -expert_rewards.mean() + learned_rewards.mean()
# Add regularization (optional)
l2_reg = 0.01 * sum(p.pow(2).sum() for p in self.reward_net.parameters())
loss += l2_reg
# Backward pass
self.reward_optimizer.zero_grad()
loss.backward()
self.reward_optimizer.step()
return loss.item()
def _flatten_trajectories(self, trajectories):
"""Flatten list of trajectories into states and actions"""
states = []
actions = []
for trajectory in trajectories:
for state, action in trajectory:
states.append(state)
actions.append(action)
return np.array(states), np.array(actions)
def _evaluate_policy(self, env, policy, num_episodes=10):
"""Evaluate policy in true environment"""
total_rewards = []
for _ in range(num_episodes):
obs = env.reset()
episode_reward = 0
done = False
while not done:
action, _ = policy.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
episode_reward += reward
total_rewards.append(episode_reward)
return np.mean(total_rewards)
class RewardWrapper(gym.Wrapper):
"""Wrap environment to use learned reward function"""
def __init__(self, env, reward_net, device):
super().__init__(env)
self.reward_net = reward_net
self.device = device
self.last_obs = None
def reset(self):
self.last_obs = self.env.reset()
return self.last_obs
def step(self, action):
obs, true_reward, done, info = self.env.step(action)
# Compute learned reward
state_tensor = torch.FloatTensor(self.last_obs).unsqueeze(0).to(self.device)
action_tensor = torch.FloatTensor(action).unsqueeze(0).to(self.device)
x = torch.cat([state_tensor, action_tensor], dim=-1)
with torch.no_grad():
learned_reward = self.reward_net(x).item()
self.last_obs = obs
# Return learned reward instead of true reward
return obs, learned_reward, done, info
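A hypothetical usage sketch for the class above. `collect_expert_demos` is a placeholder for whatever produces expert trajectories in the `[(obs, action), ...]` format that `MaxEntIRL.train` expects; the environment name is likewise only an example:

import gym

env = gym.make("Pendulum-v1")
# Placeholder: any function returning a list of [(obs, action), ...] trajectories
expert_trajectories = collect_expert_demos(env, num_trajectories=20)

irl = MaxEntIRL(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    rl_algorithm='SAC',
    device='cpu',
)
reward_net, policy = irl.train(
    env,
    expert_trajectories,
    num_iterations=50,
    rl_steps_per_iteration=10_000,
)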
GAIL: Generative Adversarial Imitation Learning¶
GAIL is a more practical IRL variant using adversarial training:
Paper: Ho & Ermon, "Generative Adversarial Imitation Learning", NeurIPS 2016
Key Idea¶
Train discriminator to distinguish expert from policy, train policy to fool discriminator:
- Discriminator: \(D: S \times A \rightarrow [0, 1]\), trained to output 1 on expert pairs and 0 on policy pairs
- Policy: Maximize \(\mathbb{E}_{\pi} [-\log(1 - D(s, a))]\), i.e., fool the discriminator into classifying its samples as expert
Reward signal: \(r(s, a) = -\log(1 - D(s, a))\)
Implementation¶
class GAIL:
"""
Generative Adversarial Imitation Learning
Reference: Ho & Ermon, NeurIPS 2016
"""
def __init__(
self,
state_dim,
action_dim,
disc_hidden_dims=[256, 256],
lr_disc=3e-4,
lr_policy=3e-4,
device='cuda'
):
self.device = device
# Discriminator: D(s,a) ∈ [0,1]
# Output 1 for expert, 0 for policy
self.discriminator = self._build_discriminator(
state_dim, action_dim, disc_hidden_dims
).to(device)
self.disc_optimizer = optim.Adam(
self.discriminator.parameters(), lr=lr_disc
)
# Policy (use any RL algorithm, e.g., PPO)
from stable_baselines3 import PPO
self.policy_class = PPO
def _build_discriminator(self, state_dim, action_dim, hidden_dims):
"""Build discriminator network"""
layers = []
input_dim = state_dim + action_dim
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.Tanh())
prev_dim = hidden_dim
# Output probability: expert (1) vs policy (0)
layers.append(nn.Linear(prev_dim, 1))
layers.append(nn.Sigmoid())
return nn.Sequential(*layers)
def train(
self,
env,
expert_trajectories,
num_iterations=100,
policy_steps=2048,
disc_steps=4
):
"""Train GAIL"""
print("Starting GAIL training...")
# Create policy with GAIL reward wrapper
gail_env = GAILRewardWrapper(env, self.discriminator, self.device)
policy = self.policy_class("MlpPolicy", gail_env, verbose=1)
for iteration in range(num_iterations):
print(f"\n=== GAIL Iteration {iteration + 1}/{num_iterations} ===")
# Step 1: Collect policy trajectories
policy.learn(total_timesteps=policy_steps)
policy_trajectories = self._collect_trajectories(
env, policy, num_trajectories=len(expert_trajectories)
)
# Step 2: Update discriminator
for _ in range(disc_steps):
disc_loss = self._update_discriminator(
expert_trajectories, policy_trajectories
)
print(f"Discriminator loss: {disc_loss:.6f}")
# Step 3: Evaluate
if iteration % 10 == 0:
eval_reward = self._evaluate(env, policy, num_episodes=10)
print(f"Evaluation reward: {eval_reward:.2f}")
return policy
def _update_discriminator(self, expert_traj, policy_traj):
"""
Update discriminator using binary cross-entropy
Expert samples → label 1
Policy samples → label 0
"""
# Flatten trajectories
expert_states, expert_actions = self._flatten_trajectories(expert_traj)
policy_states, policy_actions = self._flatten_trajectories(policy_traj)
expert_states = torch.FloatTensor(expert_states).to(self.device)
expert_actions = torch.FloatTensor(expert_actions).to(self.device)
policy_states = torch.FloatTensor(policy_states).to(self.device)
policy_actions = torch.FloatTensor(policy_actions).to(self.device)
# Discriminator predictions
expert_x = torch.cat([expert_states, expert_actions], dim=-1)
expert_preds = self.discriminator(expert_x)
policy_x = torch.cat([policy_states, policy_actions], dim=-1)
policy_preds = self.discriminator(policy_x)
# Binary cross-entropy loss
expert_loss = nn.BCELoss()(expert_preds, torch.ones_like(expert_preds))
policy_loss = nn.BCELoss()(policy_preds, torch.zeros_like(policy_preds))
loss = expert_loss + policy_loss
# Gradient penalty (optional, for stability)
gp = self._gradient_penalty(expert_x, policy_x)
loss += 10.0 * gp
# Backward pass
self.disc_optimizer.zero_grad()
loss.backward()
self.disc_optimizer.step()
return loss.item()
def _gradient_penalty(self, expert_data, policy_data):
"""Gradient penalty for discriminator stability"""
        # Expert and policy batches may differ in size; truncate to a common length
        n = min(expert_data.size(0), policy_data.size(0))
        expert_data, policy_data = expert_data[:n], policy_data[:n]
        alpha = torch.rand(n, 1, device=self.device)
        interpolates = alpha * expert_data + (1 - alpha) * policy_data
interpolates.requires_grad_(True)
disc_interpolates = self.discriminator(interpolates)
gradients = torch.autograd.grad(
outputs=disc_interpolates,
inputs=interpolates,
grad_outputs=torch.ones_like(disc_interpolates),
create_graph=True,
retain_graph=True
)[0]
gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
return gradient_penalty
def _collect_trajectories(self, env, policy, num_trajectories):
"""Collect trajectories from policy"""
trajectories = []
for _ in range(num_trajectories):
trajectory = []
obs = env.reset()
done = False
while not done:
action, _ = policy.predict(obs, deterministic=False)
trajectory.append((obs, action))
obs, reward, done, info = env.step(action)
trajectories.append(trajectory)
return trajectories
def _flatten_trajectories(self, trajectories):
"""Flatten trajectories to states and actions"""
states = []
actions = []
for traj in trajectories:
for state, action in traj:
states.append(state)
actions.append(action)
return np.array(states), np.array(actions)
def _evaluate(self, env, policy, num_episodes):
"""Evaluate policy"""
rewards = []
for _ in range(num_episodes):
obs = env.reset()
episode_reward = 0
done = False
while not done:
action, _ = policy.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
episode_reward += reward
rewards.append(episode_reward)
return np.mean(rewards)
class GAILRewardWrapper(gym.Wrapper):
"""Wrap environment to use GAIL discriminator as reward"""
def __init__(self, env, discriminator, device):
super().__init__(env)
self.discriminator = discriminator
self.device = device
self.last_obs = None
def reset(self):
self.last_obs = self.env.reset()
return self.last_obs
def step(self, action):
obs, true_reward, done, info = self.env.step(action)
# GAIL reward: -log(1 - D(s,a))
state_tensor = torch.FloatTensor(self.last_obs).unsqueeze(0).to(self.device)
action_tensor = torch.FloatTensor(action).unsqueeze(0).to(self.device)
x = torch.cat([state_tensor, action_tensor], dim=-1)
with torch.no_grad():
d = self.discriminator(x).item()
gail_reward = -np.log(max(1 - d, 1e-8)) # Avoid log(0)
self.last_obs = obs
return obs, gail_reward, done, info
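A hypothetical usage sketch for the GAIL class above, again with a placeholder environment and with `expert_trajectories` assumed to be a list of `[(obs, action), ...]` trajectories collected elsewhere:

import gym

env = gym.make("Pendulum-v1")

gail = GAIL(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    device='cpu',
)
policy = gail.train(
    env,
    expert_trajectories,  # expert data collected elsewhere (placeholder)
    num_iterations=200,
    policy_steps=2048,
    disc_steps=4,
)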
AIRL: Adversarial Inverse RL¶
AIRL recovers a disentangled reward function (it separates the reward from the dynamics):
Paper: Fu et al., "Learning Robust Rewards with Adversarial Inverse Reinforcement Learning", ICLR 2018
Key Advantage¶
GAIL's reward is entangled with the environment dynamics; AIRL structures the discriminator so that a transferable reward can be extracted:

$$ D(s, a, s') = \frac{\exp(f(s, a, s'))}{\exp(f(s, a, s')) + \pi(a|s)} $$

Where \(f(s, a, s') = r(s, a) + \gamma V(s') - V(s)\); the \(r\) term is the disentangled reward and \(V\) absorbs the dynamics-dependent shaping.
Implementation (Simplified)¶
class AIRL:
"""
Adversarial Inverse Reinforcement Learning
Learns disentangled reward function
"""
def __init__(self, state_dim, action_dim, device='cuda'):
self.device = device
# Reward function r(s, a)
self.reward_net = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.Tanh(),
nn.Linear(256, 256),
nn.Tanh(),
nn.Linear(256, 1)
).to(device)
# Value function V(s)
self.value_net = nn.Sequential(
nn.Linear(state_dim, 256),
nn.Tanh(),
nn.Linear(256, 256),
nn.Tanh(),
nn.Linear(256, 1)
).to(device)
self.optimizer = optim.Adam(
list(self.reward_net.parameters()) +
list(self.value_net.parameters()),
lr=3e-4
)
def discriminator(self, state, action, next_state):
"""AIRL discriminator"""
state_tensor = torch.FloatTensor(state).to(self.device)
action_tensor = torch.FloatTensor(action).to(self.device)
next_state_tensor = torch.FloatTensor(next_state).to(self.device)
# Compute f(s, a, s') = r(s,a) + γV(s') - V(s)
sa = torch.cat([state_tensor, action_tensor], dim=-1)
r = self.reward_net(sa)
v = self.value_net(state_tensor)
v_next = self.value_net(next_state_tensor)
        f = r + 0.99 * v_next - v  # discount factor hard-coded to 0.99 for brevity
        # Discriminator output
        # Exact form: D = exp(f) / (exp(f) + π(a|s)) = sigmoid(f - log π(a|s))
        # Here the log π(a|s) term is dropped for simplicity, giving D ≈ sigmoid(f)
        d = torch.sigmoid(f)
        return d
# Training similar to GAIL but using discriminator with next_state
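As a hedged sketch of what that training step might look like (not the paper's full algorithm): assume `expert_batch` and `policy_batch` are `(states, actions, next_states)` tensor triples and `log_pi_*` are the policy's log-probabilities of the sampled actions, all shaped `(N, 1)`. The discriminator uses the logistic form \(D = \sigma(f(s, a, s') - \log \pi(a|s))\); the policy is then updated, as in GAIL, with reward \(\log D - \log(1 - D) = f - \log \pi(a|s)\).

def airl_discriminator_update(airl, expert_batch, policy_batch,
                              log_pi_expert, log_pi_policy):
    """One binary-logistic update of the AIRL discriminator (illustrative only)."""
    def logits(batch, log_pi):
        states, actions, next_states = batch
        sa = torch.cat([states, actions], dim=-1)
        f = airl.reward_net(sa) + 0.99 * airl.value_net(next_states) - airl.value_net(states)
        return f - log_pi  # D = sigmoid(f - log pi(a|s))

    expert_logits = logits(expert_batch, log_pi_expert)
    policy_logits = logits(policy_batch, log_pi_policy)

    # Expert triples get label 1, policy triples get label 0
    bce = nn.BCEWithLogitsLoss()
    loss = bce(expert_logits, torch.ones_like(expert_logits)) + \
           bce(policy_logits, torch.zeros_like(policy_logits))

    airl.optimizer.zero_grad()
    loss.backward()
    airl.optimizer.step()
    return loss.item()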
Practical Tips¶
Choosing an IRL Method¶
| Method | Computational Cost | Reward Transfer | Best For |
|---|---|---|---|
| MaxEnt IRL | Very High (needs RL in loop) | Good | Small state spaces |
| GAIL | Medium | Poor (entangled) | General imitation |
| AIRL | Medium-High | Excellent | Transfer learning |
| IQ-Learn | Low | Good | Large-scale problems |
Hyperparameter Tuning¶
# GAIL hyperparameters
gail_config = {
'disc_hidden_dims': [256, 256],
'lr_disc': 3e-4,
'policy_steps': 2048, # Per iteration
'disc_steps': 4, # Discriminator updates per policy update
'gradient_penalty_coef': 10.0
}
# For sparse reward tasks
gail_config['policy_steps'] = 4096 # More exploration
gail_config['disc_steps'] = 8 # More discriminator training
Common Issues¶
Problem: Discriminator overfits
# Solution: Add gradient penalty, dropout
def build_discriminator_with_dropout(input_dim):
    return nn.Sequential(
        nn.Linear(input_dim, 256),
        nn.Tanh(),
        nn.Dropout(0.2),  # Dropout regularizes the discriminator
        nn.Linear(256, 256),
        nn.Tanh(),
        nn.Dropout(0.2),
        nn.Linear(256, 1),
        nn.Sigmoid()
    )
Problem: Training unstable
# Solution: Spectral normalization
from torch.nn.utils import spectral_norm
def build_stable_discriminator(input_dim):
    return nn.Sequential(
        spectral_norm(nn.Linear(input_dim, 256)),
        nn.Tanh(),
        spectral_norm(nn.Linear(256, 256)),
        nn.Tanh(),
        spectral_norm(nn.Linear(256, 1)),
        nn.Sigmoid()
    )
References¶
Papers¶
- MaxEnt IRL: Ziebart et al., "Maximum Entropy Inverse Reinforcement Learning", AAAI 2008
- GAIL: Ho & Ermon, "Generative Adversarial Imitation Learning", NeurIPS 2016 (arXiv)
- AIRL: Fu et al., "Learning Robust Rewards with Adversarial Inverse RL", ICLR 2018 (arXiv)
- IQ-Learn: Garg et al., "IQ-Learn: Inverse soft-Q Learning for Imitation", NeurIPS 2021
- Apprenticeship Learning: Abbeel & Ng, "Apprenticeship Learning via Inverse Reinforcement Learning", ICML 2004 (foundational)
Code¶
- Imitation Library: https://github.com/HumanCompatibleAI/imitation
- Includes GAIL, AIRL implementations
- IQ-Learn: https://github.com/Div99/IQ-Learn
- SQIL: https://github.com/saxenasaurabh/SQIL
Next Steps¶
- GAIL Details - Deep dive into GAIL
- BC - Simpler alternative
- DAgger - Interactive alternative
- Diffusion Policies - Modern IL methods