DAgger & Interactive Imitation Learning
DAgger (Dataset Aggregation) solves behavioral cloning's distribution shift problem through iterative expert feedback.
Overview
DAgger addresses the fundamental limitation of behavioral cloning: distribution shift between expert and learned policy.
Key idea: Train on states visited by the learned policy (not just expert states):
- Train policy \(\pi\) on current dataset \(\mathcal{D}\)
- Execute \(\pi\) to collect states \(s \sim \pi\)
- Query expert for actions \(a^* = \pi^*(s)\) at those states
- Aggregate: \(\mathcal{D} \leftarrow \mathcal{D} \cup \{(s, a^*)\}\)
- Repeat
Advantages over BC:
- ✓ Mitigates distribution shift (trains on the learned policy's own state distribution)
- ✓ Error bound grows as \(O(\epsilon T)\) instead of \(O(\epsilon T^2)\)
- ✓ Needs fewer initial expert demos
- ✓ Learns to recover from its own mistakes

Challenges:
- ✗ Requires the expert to be available during training
- ✗ Can be expensive (expert labeling cost)
- ✗ Assumes the expert can label any state (may be infeasible)
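Mathematical Foundation
Error Bounds
Behavioral Cloning error: $$ \mathbb{E}[\text{cost}(\pi_{BC})] \leq \mathbb{E}[\text{cost}(\pi^*)] + O(\epsilon T^2) $$
DAgger error (for the best policy found over \(N\) iterations, with \(N\) on the order of \(T\) up to log factors): $$ \mathbb{E}[\text{cost}(\pi_{DAgger})] \leq \mathbb{E}[\text{cost}(\pi^*)] + O(u \epsilon T) $$
Where:
- \(\epsilon\) = supervised learning error (for DAgger, measured on the states the learned policies actually visit)
- \(T\) = horizon length
- \(N\) = number of DAgger iterations
- \(u\) = bound on how much one deviation from the expert can increase the expert's cost-to-go (small when the expert can recover quickly)

Key insight: the dependence on the horizon improves from quadratic to linear. More iterations do not shrink the \(\epsilon T\) term; they shrink the additional no-regret term, which vanishes as \(N\) grows.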
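To make the gap concrete, the two horizon-dependent terms can be evaluated for a few horizons. This is only an arithmetic illustration: constants hidden by the \(O(\cdot)\) notation are dropped, and the helper names `bc_error_term` and `dagger_error_term` are made up for this sketch.

```python
def bc_error_term(eps, horizon):
    """Horizon-dependent term in the BC bound: eps * T^2 (constants dropped)."""
    return eps * horizon ** 2


def dagger_error_term(eps, horizon):
    """Horizon-dependent term in the DAgger bound: eps * T (constants dropped)."""
    return eps * horizon


eps = 0.01  # assumed per-step supervised error, chosen only for illustration
for horizon in (10, 100, 1000):
    bc = bc_error_term(eps, horizon)
    dg = dagger_error_term(eps, horizon)
    # The ratio between the two terms is exactly the horizon T
    print(f"T={horizon:5d}  BC term={bc:10.1f}  DAgger term={dg:6.1f}  ratio={bc / dg:.0f}")
```

The ratio of the two terms equals \(T\), so the advantage of the linear bound grows with task length.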
Algorithm
Initialize dataset D with expert demonstrations
for iteration i = 1 to N do:
    Train policy π_i on D
    Execute π_i to collect trajectories
    Query expert for actions at visited states
    Aggregate: D ← D ∪ {(s, a*)}
end for
return π_N
Complete Implementation
Core DAgger Algorithm
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque


class DAgger:
    """
    Dataset Aggregation (DAgger)
    Reference: Ross et al., AISTATS 2011
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        expert_policy,
        hidden_dims=(256, 256),
        lr=1e-3,
        batch_size=256,
        device=None
    ):
        # Fall back to CPU when no GPU is available
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.expert_policy = expert_policy
        self.batch_size = batch_size
        self.device = device
        # Build policy network (same as BC)
        self.policy = self._build_network(
            state_dim, action_dim, hidden_dims
        ).to(device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        # Aggregated dataset of (state, expert_action) pairs
        self.dataset_states = []
        self.dataset_actions = []

    def _build_network(self, input_dim, output_dim, hidden_dims):
        """Build MLP policy"""
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim))
        return nn.Sequential(*layers)
    def dagger_iteration(self, env, num_rollouts=50, mixing_ratio=1.0):
        """
        Single DAgger iteration
        Args:
            env: Environment (Gymnasium API: reset() -> (obs, info),
                step() -> (obs, reward, terminated, truncated, info))
            num_rollouts: Number of trajectories to collect
            mixing_ratio: β parameter, the per-step probability of executing
                the expert's action (1.0 = always expert, 0.0 = never expert)
        """
        new_states = []
        new_actions = []
        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            while not done:
                # Always query expert for the label
                expert_action = self.expert_policy(state)
                # Decide whether to execute the expert or the learned policy
                use_expert = np.random.random() < mixing_ratio
                if use_expert:
                    action = expert_action
                else:
                    action = self.predict(state)
                # Store (state, expert_action) pair
                new_states.append(state)
                new_actions.append(expert_action)
                # Environment step
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
        # Aggregate into dataset
        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)
        print(f"Dataset size: {len(self.dataset_states)}")
    def train_on_aggregated_dataset(self, num_epochs=10):
        """Train policy on full aggregated dataset"""
        states = torch.FloatTensor(np.array(self.dataset_states)).to(self.device)
        actions = torch.FloatTensor(np.array(self.dataset_actions)).to(self.device)
        dataset = torch.utils.data.TensorDataset(states, actions)
        dataloader = torch.utils.data.DataLoader(
            dataset, batch_size=self.batch_size, shuffle=True
        )
        loss_fn = nn.MSELoss()
        # predict() switches the network to eval mode, so switch back here
        self.policy.train()
        for epoch in range(num_epochs):
            total_loss = 0.0
            for batch_states, batch_actions in dataloader:
                # Forward pass
                pred_actions = self.policy(batch_states)
                # MSE loss against the expert labels
                loss = loss_fn(pred_actions, batch_actions)
                # Backward pass
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()
            if epoch % 5 == 0:
                avg_loss = total_loss / len(dataloader)
                print(f"Epoch {epoch}: Loss = {avg_loss:.6f}")
    def train(
        self,
        env,
        num_iterations=10,
        rollouts_per_iteration=50,
        epochs_per_iteration=10,
        mixing_schedule='linear'
    ):
        """
        Full DAgger training loop
        Args:
            env: Environment
            num_iterations: Number of DAgger iterations
            rollouts_per_iteration: Trajectories per iteration
            epochs_per_iteration: Training epochs per iteration
            mixing_schedule: 'linear', 'exponential', or 'constant'
        """
        print("Starting DAgger training...")
        for iteration in range(num_iterations):
            print(f"\n=== DAgger Iteration {iteration + 1}/{num_iterations} ===")
            # Compute mixing ratio (β); every schedule starts at β = 1 (pure expert)
            if mixing_schedule == 'linear':
                mixing_ratio = 1.0 - (iteration / num_iterations)
            elif mixing_schedule == 'exponential':
                mixing_ratio = 0.5 ** iteration
            else:  # constant
                mixing_ratio = 1.0
            print(f"Mixing ratio (β): {mixing_ratio:.3f}")
            # Collect data
            self.dagger_iteration(
                env,
                num_rollouts=rollouts_per_iteration,
                mixing_ratio=mixing_ratio
            )
            # Train on aggregated dataset
            self.train_on_aggregated_dataset(num_epochs=epochs_per_iteration)
            # Evaluate
            if iteration % 2 == 0:
                avg_reward = self.evaluate(env, num_episodes=10)
                print(f"Average reward: {avg_reward:.2f}")
        print("\nDAgger training complete!")
    def predict(self, state):
        """Predict action for a single (unbatched) state"""
        self.policy.eval()
        with torch.no_grad():
            state_tensor = torch.as_tensor(
                state, dtype=torch.float32
            ).unsqueeze(0).to(self.device)
            action = self.policy(state_tensor).cpu().numpy()[0]
        return action

    def evaluate(self, env, num_episodes=10):
        """Evaluate current policy (Gymnasium API)"""
        total_rewards = []
        for episode in range(num_episodes):
            state, _ = env.reset()
            episode_reward = 0
            done = False
            while not done:
                action = self.predict(state)
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                episode_reward += reward
            total_rewards.append(episode_reward)
        return np.mean(total_rewards)

    def save(self, path):
        """Save policy and aggregated dataset"""
        torch.save({
            'policy_state_dict': self.policy.state_dict(),
            'dataset_states': self.dataset_states,
            'dataset_actions': self.dataset_actions
        }, path)

    def load(self, path):
        """Load policy and aggregated dataset"""
        # weights_only=False is needed to unpickle the stored dataset lists
        # on recent PyTorch versions; only load checkpoints you trust.
        checkpoint = torch.load(path, map_location=self.device, weights_only=False)
        self.policy.load_state_dict(checkpoint['policy_state_dict'])
        self.dataset_states = checkpoint['dataset_states']
        self.dataset_actions = checkpoint['dataset_actions']
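The β schedules computed inside `train` can be pulled out into a standalone helper, which makes it easy to inspect the values before committing to a run. This is a minimal sketch: the function name `mixing_ratio_for` and the `decay` parameter are assumptions for illustration, and the three schedules mirror the ones in the class above.

```python
def mixing_ratio_for(iteration, num_iterations, schedule="linear", decay=0.5):
    """Return beta, the probability of executing the expert at this iteration."""
    if schedule == "linear":
        # Decays from 1.0 toward 0.0 over the run
        return max(0.0, 1.0 - iteration / num_iterations)
    if schedule == "exponential":
        # Multiplies beta by `decay` each iteration
        return decay ** iteration
    if schedule == "constant":
        # Always executes the expert, which reduces to behavioral cloning
        return 1.0
    raise ValueError(f"Unknown mixing schedule: {schedule!r}")


for name in ("linear", "exponential", "constant"):
    betas = [round(mixing_ratio_for(i, 5, name), 3) for i in range(5)]
    print(f"{name:12s} {betas}")
```

Raising on an unknown schedule name, instead of silently falling back to the constant schedule, is a design choice of this sketch rather than the behavior of the class above.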
Example Usage
import gymnasium as gym

# Create environment (HalfCheetah requires the MuJoCo extras to be installed)
env = gym.make('HalfCheetah-v4')

# Define expert policy (e.g., pre-trained RL agent)
def expert_policy(state):
    # Your expert implementation
    # Could be: pre-trained SAC, human teleop, etc.
    # expert_model is a placeholder; it must be defined before training.
    return expert_model.predict(state)

# Create DAgger agent
dagger = DAgger(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    expert_policy=expert_policy,
    hidden_dims=[256, 256],
    lr=1e-3
)

# Train
dagger.train(
    env=env,
    num_iterations=10,
    rollouts_per_iteration=50,
    epochs_per_iteration=10,
    mixing_schedule='linear'
)

# Evaluate
final_reward = dagger.evaluate(env, num_episodes=100)
print(f"Final average reward: {final_reward:.2f}")

# Save
dagger.save('dagger_policy.pth')
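For a quick smoke test without MuJoCo or a trained expert, a tiny stand-in environment and a scripted expert can be useful. The sketch below is hypothetical: `PointEnv` and `point_expert` are invented for illustration and only mimic the Gymnasium return convention (`reset()` returns `(obs, info)`, `step()` returns five values), so they suit any loop that unpacks results that way.

```python
import numpy as np


class PointEnv:
    """Toy 2-D point that should be driven to the origin (illustrative only)."""

    def __init__(self, horizon=20, seed=0):
        self.horizon = horizon
        self.rng = np.random.default_rng(seed)
        self.state = np.zeros(2, dtype=np.float32)
        self.t = 0

    def reset(self):
        # Start from a random point in the square [-1, 1]^2
        self.state = self.rng.uniform(-1.0, 1.0, size=2).astype(np.float32)
        self.t = 0
        return self.state.copy(), {}

    def step(self, action):
        action = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
        self.state = self.state + 0.1 * action
        self.t += 1
        reward = -float(np.linalg.norm(self.state))  # closer to origin is better
        terminated = False                            # no terminal states
        truncated = self.t >= self.horizon            # time limit reached
        return self.state.copy(), reward, terminated, truncated, {}


def point_expert(state):
    """Scripted expert: move straight toward the origin."""
    return np.clip(-np.asarray(state, dtype=np.float32), -1.0, 1.0)


env = PointEnv(horizon=5)
state, _ = env.reset()
done = False
while not done:
    state, reward, terminated, truncated, _ = env.step(point_expert(state))
    done = terminated or truncated
print("final distance to origin:", float(np.linalg.norm(state)))
```

With `state_dim=2` and `action_dim=2`, an environment like this runs in well under a second per rollout, which helps when checking that the data-collection and training loops are wired together correctly.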
Variants & Improvements
SafeDAgger
Safe exploration during data collection. The version below is a simplified gate that compares the learned action with the expert's action at every step; the original SafeDAgger instead trains a separate safety classifier so that the expert does not have to be queried each time:
class SafeDAgger(DAgger):
    """DAgger with safety constraints"""

    def dagger_iteration(
        self,
        env,
        num_rollouts=50,
        mixing_ratio=1.0,
        safety_threshold=0.1
    ):
        """DAgger iteration with safety fallback"""
        new_states = []
        new_actions = []
        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            while not done:
                # Get learned policy action
                learned_action = self.predict(state)
                # Get expert action
                expert_action = self.expert_policy(state)
                # Check if learned action is safe
                action_diff = np.linalg.norm(learned_action - expert_action)
                if action_diff > safety_threshold:
                    # Use expert action (unsafe deviation)
                    action = expert_action
                else:
                    # Use learned action (safe), subject to the usual β mixing
                    use_expert = np.random.random() < mixing_ratio
                    action = expert_action if use_expert else learned_action
                # Store expert label
                new_states.append(state)
                new_actions.append(expert_action)
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)
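The gate above needs the expert's action at every step. In SafeDAgger as published, a separate safety classifier is trained to predict whether the learned policy will deviate from the expert, so that the expert is needed only when the classifier flags a state. A minimal sketch of how the classifier's training labels could be built from already-labeled data follows; the function name `safety_labels` and the threshold value are assumptions for illustration.

```python
import numpy as np


def safety_labels(learned_actions, expert_actions, threshold=0.1):
    """Label each state 1 (safe) if the learned action is within `threshold`
    of the expert action in Euclidean norm, else 0 (unsafe)."""
    learned = np.asarray(learned_actions, dtype=np.float64)
    expert = np.asarray(expert_actions, dtype=np.float64)
    deviations = np.linalg.norm(learned - expert, axis=1)
    return (deviations <= threshold).astype(np.int64)


learned = [[0.0, 0.0], [1.0, 0.0], [0.3, 0.4]]
expert = [[0.0, 0.05], [0.0, 0.0], [0.3, 0.4]]
print(safety_labels(learned, expert))  # first and third states are labeled safe
```

A binary classifier trained on `(state, label)` pairs can then stand in for the direct comparison at rollout time.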
HG-DAgger (Human-Gated DAgger)
In HG-DAgger as published, the human expert decides when to take over control. The sketch below instead approximates that gate automatically, querying the expert only when ensemble disagreement is high (closer in spirit to EnsembleDAgger):
class HGDAgger(DAgger):
    """Uncertainty-gated DAgger - query expert selectively"""

    def __init__(
        self,
        state_dim,
        action_dim,
        expert_policy,
        uncertainty_threshold=0.1,
        hidden_dims=(256, 256),
        **kwargs
    ):
        super().__init__(
            state_dim, action_dim, expert_policy,
            hidden_dims=hidden_dims, **kwargs
        )
        self.uncertainty_threshold = uncertainty_threshold
        # Build ensemble for uncertainty estimation.
        # NOTE: the members must be trained (e.g. on bootstrap resamples of
        # the dataset) for the estimate to be meaningful; that step is not
        # shown here.
        self.ensemble = nn.ModuleList([
            self._build_network(state_dim, action_dim, hidden_dims)
            for _ in range(5)
        ]).to(self.device)

    def estimate_uncertainty(self, state):
        """Estimate uncertainty using ensemble disagreement"""
        state_tensor = torch.as_tensor(
            state, dtype=torch.float32
        ).unsqueeze(0).to(self.device)
        predictions = []
        for model in self.ensemble:
            model.eval()
            with torch.no_grad():
                pred = model(state_tensor).cpu().numpy()[0]
            predictions.append(pred)
        predictions = np.array(predictions)
        # Mean (over action dimensions) of the per-dimension std across members
        uncertainty = np.std(predictions, axis=0).mean()
        return uncertainty

    def dagger_iteration(self, env, num_rollouts=50, mixing_ratio=None):
        """Query expert only when uncertain.

        mixing_ratio is accepted for compatibility with DAgger.train()
        and ignored.
        """
        new_states = []
        new_actions = []
        num_queries = 0
        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            while not done:
                # Get learned action
                action = self.predict(state)
                # Estimate uncertainty
                uncertainty = self.estimate_uncertainty(state)
                # Query expert if uncertain
                if uncertainty > self.uncertainty_threshold:
                    expert_action = self.expert_policy(state)
                    new_states.append(state)
                    new_actions.append(expert_action)
                    num_queries += 1
                    # Use expert action
                    action = expert_action
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
        print(f"Expert queries: {num_queries}")
        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)
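Ensemble disagreement is only informative once the members have been fit to data, and the class above does not show that step. The sketch below illustrates the idea with bootstrap resampling, swapping the neural networks for linear least-squares models so it stays short and dependency-light; `fit_bootstrap_ensemble` and `ensemble_uncertainty` are hypothetical helper names.

```python
import numpy as np


def fit_bootstrap_ensemble(states, actions, num_members=5, seed=0):
    """Fit one linear model per bootstrap resample of the dataset."""
    rng = np.random.default_rng(seed)
    # Append a constant column so each model learns a bias term
    features = np.hstack([states, np.ones((len(states), 1))])
    members = []
    for _ in range(num_members):
        idx = rng.integers(0, len(states), size=len(states))  # sample with replacement
        weights, *_ = np.linalg.lstsq(features[idx], actions[idx], rcond=None)
        members.append(weights)
    return members


def ensemble_uncertainty(members, state):
    """Mean over action dims of the std of member predictions."""
    x = np.append(state, 1.0)
    preds = np.stack([x @ w for w in members])
    return float(preds.std(axis=0).mean())


rng = np.random.default_rng(1)
states = rng.uniform(-1.0, 1.0, size=(50, 1))
actions = -states + 0.1 * rng.normal(size=(50, 1))  # noisy "expert" labels
members = fit_bootstrap_ensemble(states, actions)
print("in-distribution:", ensemble_uncertainty(members, np.array([0.0])))
print("far from data:  ", ensemble_uncertainty(members, np.array([100.0])))
```

Disagreement grows for states far from the training data, which is the property the uncertainty gate relies on. With neural networks, the same pattern applies: train each member on its own resample (or with its own random initialization) after every aggregation step.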
DART (Disturbances for Augmenting Robot Trajectories)
Add noise injection for robustness:
class DART(DAgger):
    """
    DAgger-style data collection with Gaussian noise injection
    Reference: Laskey et al., "DART: Noise Injection for Robust
    Imitation Learning", CoRL 2017

    Note: the original DART injects noise into the expert's actions
    while collecting demonstrations. This variant perturbs the learned
    policy's actions instead and labels the visited states with the expert.
    """

    def dagger_iteration(
        self,
        env,
        num_rollouts=50,
        mixing_ratio=None,
        noise_level=0.1,
        noise_schedule='constant'
    ):
        """DAgger iteration with injected action noise.

        mixing_ratio is accepted for compatibility with DAgger.train()
        and ignored.
        """
        new_states = []
        new_actions = []
        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            timestep = 0
            while not done:
                # Choose the noise scale for this step
                if noise_schedule == 'decay':
                    current_noise = noise_level * (0.9 ** timestep)
                else:
                    current_noise = noise_level
                # Get policy action
                base_action = self.predict(state)
                # Add Gaussian noise
                noise = np.random.normal(0, current_noise, size=base_action.shape)
                noisy_action = base_action + noise
                # Assumes actions are bounded in [-1, 1]
                noisy_action = np.clip(noisy_action, -1, 1)
                # Query expert
                expert_action = self.expert_policy(state)
                # Store expert label
                new_states.append(state)
                new_actions.append(expert_action)
                # Execute noisy action
                state, reward, terminated, truncated, info = env.step(noisy_action)
                done = terminated or truncated
                timestep += 1
        self.dataset_states.extend(new_states)
        self.dataset_actions.extend(new_actions)
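In DART as described by Laskey et al., the noise is applied to the expert's executed actions during demonstration collection, and the dataset stores the expert's clean action at each visited state. A minimal sketch of that collection loop is shown below; `collect_noisy_expert_rollout`, the `step_fn` dynamics callback, and the fixed noise scale are assumptions for illustration (the paper also optimizes the noise distribution, which is not shown).

```python
import numpy as np


def collect_noisy_expert_rollout(step_fn, expert, init_state, horizon, noise_std, rng):
    """Execute the expert with Gaussian action noise; record clean expert labels."""
    states, labels = [], []
    state = np.asarray(init_state, dtype=np.float64)
    for _ in range(horizon):
        clean_action = expert(state)
        noisy_action = clean_action + rng.normal(0.0, noise_std, size=clean_action.shape)
        # The label is the clean expert action, not the perturbed one
        states.append(state.copy())
        labels.append(clean_action)
        # The noisy action drives the system, widening the visited states
        state = step_fn(state, noisy_action)
    return np.array(states), np.array(labels)


rng = np.random.default_rng(0)
states, labels = collect_noisy_expert_rollout(
    step_fn=lambda s, a: s + 0.1 * a,   # toy single-integrator dynamics
    expert=lambda s: -s,                # toy expert: push toward the origin
    init_state=[1.0, -1.0],
    horizon=10,
    noise_std=0.2,
    rng=rng,
)
print(states.shape, labels.shape)
```

Because the expert stays in control, this scheme needs no on-policy rollouts of a partially trained learner, which is the main practical difference from DAgger.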
Practical Considerations
Expert Query Budget
Limit expert labeling cost:
class BudgetedDAgger(HGDAgger):
    """DAgger with limited expert budget.

    Inherits from HGDAgger to reuse estimate_uncertainty().
    """

    def __init__(self, *args, total_budget=10000, **kwargs):
        super().__init__(*args, **kwargs)
        self.total_budget = total_budget
        self.queries_used = 0

    def dagger_iteration(
        self, env, num_rollouts=50, mixing_ratio=None, prioritize_states=True
    ):
        """Prioritize which states to query (mixing_ratio is ignored)"""
        # Collect states by rolling out the learned policy
        collected_states = []
        collected_info = []
        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            timestep = 0
            while not done:
                action = self.predict(state)
                # Store state and metadata
                collected_states.append(state)
                collected_info.append({
                    'timestep': timestep,
                    'rollout': rollout
                })
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                timestep += 1
        # Prioritize states to query
        if prioritize_states:
            # Query states with high uncertainty first (using ensemble)
            uncertainties = [
                self.estimate_uncertainty(s) for s in collected_states
            ]
            sorted_indices = np.argsort(uncertainties)[::-1]
        else:
            # Random sampling
            sorted_indices = np.random.permutation(len(collected_states))
        # Query up to the remaining budget
        queries_remaining = max(0, self.total_budget - self.queries_used)
        num_to_query = min(queries_remaining, len(collected_states))
        for idx in sorted_indices[:num_to_query]:
            state = collected_states[idx]
            expert_action = self.expert_policy(state)
            self.dataset_states.append(state)
            self.dataset_actions.append(expert_action)
            self.queries_used += 1
        print(f"Queries used: {self.queries_used}/{self.total_budget}")
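The selection step can be tested in isolation from the rollout logic. The sketch below extracts it into a hypothetical helper, `select_queries`, which returns the indices of the most uncertain states without exceeding the remaining budget.

```python
import numpy as np


def select_queries(uncertainties, budget_remaining):
    """Return indices of the highest-uncertainty states, at most `budget_remaining`."""
    k = max(0, min(int(budget_remaining), len(uncertainties)))
    order = np.argsort(uncertainties)[::-1]  # most uncertain first
    return order[:k].tolist()


scores = [0.1, 0.9, 0.5]
print(select_queries(scores, 2))   # [1, 2]
print(select_queries(scores, 0))   # []
print(select_queries(scores, 10))  # [1, 2, 0]
```

Spending the whole budget in the first iteration leaves nothing for later, better-trained policies, so splitting `budget_remaining` across the planned number of iterations may be worth considering.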
Asynchronous Expert Queries
For human experts (slower feedback):
import time


class AsyncDAgger(DAgger):
    """DAgger with asynchronous expert labeling"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_queue = deque()
        self.labeled_queue = deque()

    def collect_states_for_labeling(self, env, num_rollouts=50):
        """Collect states, queue for expert labeling"""
        for rollout in range(num_rollouts):
            state, _ = env.reset()
            done = False
            while not done:
                action = self.predict(state)
                # Add to query queue
                self.query_queue.append({
                    'state': state.copy(),
                    'timestamp': time.time()
                })
                state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
        print(f"Queued {len(self.query_queue)} states for labeling")

    def process_expert_labels(self, labeled_data):
        """Process expert-labeled data"""
        for item in labeled_data:
            state = item['state']
            action = item['action']
            self.dataset_states.append(state)
            self.dataset_actions.append(action)
        print(f"Processed {len(labeled_data)} expert labels")

    def train_async(self, env, check_interval=60):
        """Asynchronous training loop (runs until interrupted)"""
        iteration = 0
        while True:
            # Collect states
            self.collect_states_for_labeling(env, num_rollouts=10)
            # Wait for expert labels
            print(f"Waiting for expert labels... (checking every {check_interval}s)")
            time.sleep(check_interval)
            # Check for new labels. check_for_new_labels() is a placeholder:
            # implement it for your labeling interface, and have it remove
            # the labeled items from self.query_queue.
            labeled_data = check_for_new_labels()
            if len(labeled_data) > 0:
                # Process labels
                self.process_expert_labels(labeled_data)
                # Train on updated dataset
                self.train_on_aggregated_dataset(num_epochs=10)
                iteration += 1
                print(f"Completed iteration {iteration}")
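One way to fill in the labeling interface is a background worker that consumes a thread-safe query queue and pushes labeled items to a second queue that the training loop drains. The following is a minimal sketch using only the standard library; `labeling_worker`, `drain`, and the stand-in expert are hypothetical, and a real human-in-the-loop setup would replace `expert_fn` with a call into a labeling UI.

```python
import queue
import threading


def labeling_worker(query_queue, labeled_queue, expert_fn):
    """Label queued states until a None sentinel arrives."""
    while True:
        state = query_queue.get()
        if state is None:  # sentinel: no more work
            query_queue.task_done()
            break
        labeled_queue.put({'state': state, 'action': expert_fn(state)})
        query_queue.task_done()


def drain(labeled_queue):
    """Return all labels currently available without blocking."""
    items = []
    while True:
        try:
            items.append(labeled_queue.get_nowait())
        except queue.Empty:
            return items


query_q, labeled_q = queue.Queue(), queue.Queue()
worker = threading.Thread(
    target=labeling_worker,
    args=(query_q, labeled_q, lambda s: -s),  # stand-in expert
    daemon=True,
)
worker.start()
for s in (1.0, 2.0, 3.0):
    query_q.put(s)
query_q.put(None)  # tell the worker to stop
worker.join()
labels = drain(labeled_q)
print(labels)
```

`queue.Queue` handles locking internally, so the rollout thread and the labeling thread can share it safely, whereas concurrent use of a plain list would need explicit synchronization.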
Comparison with Other Methods
DAgger vs BC vs DAgger Variants
| Method | Sample Complexity | Expert Queries | Stability | Best For |
|---|---|---|---|---|
| BC | High | 0 (after initial) | High | Large offline datasets |
| DAgger | Medium | Many | Medium | Interactive expert available |
| SafeDAgger | Medium | Many | Very High | Safety-critical applications |
| HG-DAgger | Low | Few | Medium | Limited expert budget |
| DART | Medium | Many | Very High | Robust real-world deployment |
Common Issues & Solutions
Problem: Expert Fatigue
Symptoms: A human expert's label quality degrades over long labeling sessions
Solutions:
# 1. Batch queries
batch_size = 100
query_expert_in_batches(states, batch_size)
# 2. Prioritize important queries (HG-DAgger)
query_only_uncertain_states()
# 3. Limit session length
max_queries_per_session = 500
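The helper names in the snippet above are placeholders. One possible shape for a batched query helper with a per-session cap is sketched below; the signature, including the extra `expert_fn` and `max_queries` parameters, is an assumption rather than an established API.

```python
def iter_batches(items, batch_size):
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def query_expert_in_batches(states, batch_size, expert_fn, max_queries=None):
    """Label states batch by batch, stopping once `max_queries` labels exist."""
    labels = []
    for batch in iter_batches(states, batch_size):
        for state in batch:
            if max_queries is not None and len(labels) >= max_queries:
                return labels  # session cap reached; leave the rest unlabeled
            labels.append(expert_fn(state))
        # A natural place to pause and give a human expert a break
    return labels


states = [1, 2, 3, 4, 5]
print(list(iter_batches(states, 2)))  # [[1, 2], [3, 4], [5]]
print(query_expert_in_batches(states, 2, lambda s: s * 10, max_queries=3))  # [10, 20, 30]
```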
Problem: Policy Oscillation
Symptoms: Policy keeps changing between iterations and does not converge
Solutions:
# 1. Reduce learning rate over time
def get_lr_schedule(iteration):
    initial_lr = 1e-3
    return initial_lr * (0.9 ** iteration)

# 2. Use exponential moving average of the policy weights
class EMAPolicy:
    def __init__(self, policy, alpha=0.1):
        self.policy = policy
        # Detached copies of the current weights
        self.ema_params = [p.detach().clone() for p in policy.parameters()]
        self.alpha = alpha

    def update_ema(self):
        # ema <- alpha * param + (1 - alpha) * ema
        for ema_param, param in zip(self.ema_params, self.policy.parameters()):
            ema_param.mul_(1 - self.alpha).add_(param.detach(), alpha=self.alpha)

# 3. Early stopping (inside the DAgger iteration loop; the helper is a placeholder)
for iteration in range(num_iterations):
    ...
    if validation_loss_not_improving_for_n_iterations(n=3):
        break
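The early-stopping check can be implemented as a small function over the history of validation losses. The sketch below is one possible definition; the name `not_improving` and the `min_delta` tolerance are assumptions for illustration.

```python
def not_improving(val_losses, n=3, min_delta=0.0):
    """True if none of the last `n` losses beats the earlier best by `min_delta`."""
    if len(val_losses) <= n:
        return False  # not enough history to decide
    best_before = min(val_losses[:-n])
    return min(val_losses[-n:]) > best_before - min_delta


print(not_improving([1.0, 0.5, 0.6, 0.7, 0.8], n=3))  # True: 0.5 was never beaten
print(not_improving([1.0, 0.9, 0.8, 0.7], n=3))       # False: still improving
```

In a DAgger loop, the validation loss is best measured on held-out states from the aggregated dataset, since the training distribution changes every iteration.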
Problem: Expensive Expert
Symptoms: Expert labeling is prohibitively expensive
Solutions:
# 1. Use BC initialization
# Start with large BC dataset, then fine-tune with DAgger

# 2. Active learning (HG-DAgger)
# Query only uncertain/important states

# 3. Synthetic expert
# Use simulation/planning as "expert"
def synthetic_expert(state):
    # MPC, sampling-based planning, etc. (mpc_planner is a placeholder)
    return mpc_planner.plan(state)
When to Use DAgger
Use DAgger when:
- ✓ Expert available during training
- ✓ Expert can label any state (even off-distribution)
- ✓ Long-horizon tasks (BC fails)
- ✓ Limited initial demonstration data

Use HG-DAgger when:
- ✓ Expert expensive/limited
- ✓ Want to minimize expert queries
- ✓ Can estimate uncertainty

Use DART when:
- ✓ Need robustness to perturbations
- ✓ Deploying to real world
- ✓ Safety is critical
References
Papers
- DAgger: Ross et al., "A Reduction of Imitation Learning and Structured Prediction to No-Regret Online Learning", AISTATS 2011
- SafeDAgger: Zhang & Cho, "Query-Efficient Imitation Learning for End-to-End Simulated Driving", AAAI 2017
- HG-DAgger: Kelly et al., "HG-DAgger: Interactive Imitation Learning with Human Experts", ICRA 2019
- DART: Laskey et al., "DART: Noise Injection for Robust Imitation Learning", CoRL 2017
Books
- Sutton & Barto, "Reinforcement Learning: An Introduction", 2nd Edition
  - Background on MDPs, value functions, and policy evaluation
Code
- Imitation Library: https://github.com/HumanCompatibleAI/imitation
  - Includes DAgger implementation
- IRIS: https://github.com/NVlabs/IRIS
Tutorials
- Interactive IL Tutorial: https://sites.google.com/view/icra19-ilbc
Next Steps
- Behavioral Cloning - Simpler offline alternative
- Inverse RL - Learn reward function
- GAIL - Generative adversarial imitation
- Diffusion Policies - State-of-the-art IL