
Behavioral Cloning

Behavioral Cloning (BC) is the simplest imitation learning method: directly learn a policy from expert demonstrations using supervised learning.

Overview

Behavioral Cloning treats imitation learning as supervised learning:

  • Input: Expert demonstrations \((s, a)\) pairs
  • Output: Policy \(\pi_\theta(a|s)\) that mimics expert
  • Training: Standard supervised learning (cross-entropy, MSE)

Advantages:

  • ✓ Simple to implement (just supervised learning)
  • ✓ No reward function needed
  • ✓ Works well with large datasets
  • ✓ Offline learning (safe)

Challenges:

  • ✗ Distribution shift (compounding errors)
  • ✗ Requires large, diverse datasets
  • ✗ Doesn't generalize beyond demonstrations
  • ✗ Causal confusion

Mathematical Foundation

Basic Formulation

Given expert demonstrations \(\mathcal{D} = \{(s_i, a_i)\}_{i=1}^N\), learn policy \(\pi_\theta\) by minimizing:

For continuous actions: $$ \mathcal{L}(\theta) = \mathbb{E}_{(s,a) \sim \mathcal{D}} \left[ \| a - \pi_\theta(s) \|^2 \right] $$

For discrete actions: $$ \mathcal{L}(\theta) = \mathbb{E}_{(s,a) \sim \mathcal{D}} \left[ -\log \pi_\theta(a|s) \right] $$
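
As a minimal self-contained sketch (with randomly generated tensors standing in for a real expert batch), both objectives are one line each in PyTorch:

import torch
import torch.nn as nn
import torch.nn.functional as F

B, state_dim, action_dim, num_actions = 32, 8, 2, 4
states = torch.randn(B, state_dim)

# Continuous actions: regression with MSE
cont_policy = nn.Linear(state_dim, action_dim)
expert_actions = torch.randn(B, action_dim)
mse_loss = F.mse_loss(cont_policy(states), expert_actions)

# Discrete actions: classification with cross-entropy (negative log-likelihood)
disc_policy = nn.Linear(state_dim, num_actions)
expert_labels = torch.randint(0, num_actions, (B,))
ce_loss = F.cross_entropy(disc_policy(states), expert_labels)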

Distribution Shift Problem

The key challenge: Training distribution \(p_{expert}(s)\) ≠ Deployment distribution \(p_{\pi_\theta}(s)\)

If the policy makes a small error at state \(s_1\):

  • It reaches a slightly different state \(s_2'\) that is not in the training data
  • There it makes another error → reaches \(s_3'\)
  • Errors compound over the episode, pushing the policy further off-distribution

Result: the expected cost of behavioral cloning grows as \(O(\epsilon T^2)\), where \(T\) is the horizon length and \(\epsilon\) the per-step error rate (Ross et al., 2011); that is quadratic in the horizon, versus the \(O(\epsilon T)\) of standard supervised learning.
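
To see where the quadratic growth comes from, here is a toy Monte Carlo sketch (not part of the original analysis): assume the policy errs with probability \(\epsilon\) at each step and, once off the expert distribution, pays cost 1 for every remaining step. The expected cost then grows roughly as \(\epsilon T^2 / 2\):

import numpy as np

def expected_bc_cost(eps, T, trials=20_000, seed=0):
    """Estimate the expected cost when each per-step error (probability
    eps) takes the policy off-distribution for the rest of the episode."""
    rng = np.random.default_rng(seed)
    total = 0.0
    for _ in range(trials):
        off_track = False
        for t in range(T):
            off_track = off_track or (rng.random() < eps)
            total += off_track  # cost 1 on every off-distribution step
    return total / trials

for T in (10, 50, 100):
    # grows roughly like eps * T^2 / 2, i.e., quadratically in the horizon
    print(f"T={T:3d}: expected cost ≈ {expected_bc_cost(0.01, T):.2f}")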

Complete Implementation

Continuous Control

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader

class BehavioralCloning:
    """
    Behavioral Cloning for continuous control

    Simple supervised learning approach to imitation learning
    """
    def __init__(
        self,
        state_dim,
        action_dim,
        hidden_dims=[256, 256],
        activation='relu',
        lr=1e-3,
        batch_size=256,
        device='cuda' if torch.cuda.is_available() else 'cpu'
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.batch_size = batch_size
        self.device = device

        # Build policy network
        self.policy = self._build_network(
            state_dim, action_dim, hidden_dims, activation
        ).to(device)

        # Optimizer
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

    def _build_network(self, input_dim, output_dim, hidden_dims, activation):
        """Build MLP policy network"""
        layers = []

        # Activation function
        if activation == 'relu':
            act_fn = nn.ReLU
        elif activation == 'tanh':
            act_fn = nn.Tanh
        elif activation == 'elu':
            act_fn = nn.ELU
        else:
            raise ValueError(f"Unknown activation: {activation}")

        # Hidden layers
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(act_fn())
            prev_dim = hidden_dim

        # Output layer
        layers.append(nn.Linear(prev_dim, output_dim))

        # Optional: Add tanh to bound actions to [-1, 1]
        # layers.append(nn.Tanh())

        return nn.Sequential(*layers)

    def train(self, expert_dataset, num_epochs=100, val_split=0.1):
        """
        Train BC policy on expert demonstrations

        Args:
            expert_dataset: Dataset of (state, action) pairs
            num_epochs: Number of training epochs
            val_split: Validation split fraction
        """
        # Split into train/val
        val_size = int(len(expert_dataset) * val_split)
        train_size = len(expert_dataset) - val_size
        train_dataset, val_dataset = torch.utils.data.random_split(
            expert_dataset, [train_size, val_size]
        )

        train_loader = DataLoader(
            train_dataset, batch_size=self.batch_size, shuffle=True
        )
        val_loader = DataLoader(
            val_dataset, batch_size=self.batch_size, shuffle=False
        )

        best_val_loss = float('inf')

        for epoch in range(num_epochs):
            # Training
            train_loss = self._train_epoch(train_loader)

            # Validation
            val_loss = self._validate(val_loader)

            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                self.save('best_bc_policy.pth')

            if epoch % 10 == 0:
                print(f"Epoch {epoch}/{num_epochs}: "
                      f"Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}")

        return train_loss, val_loss

    def _train_epoch(self, dataloader):
        """Single training epoch"""
        self.policy.train()
        total_loss = 0.0

        for states, actions in dataloader:
            states = states.to(self.device)
            actions = actions.to(self.device)

            # Forward pass
            pred_actions = self.policy(states)

            # MSE loss
            loss = nn.MSELoss()(pred_actions, actions)

            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            total_loss += loss.item()

        return total_loss / len(dataloader)

    def _validate(self, dataloader):
        """Validation"""
        self.policy.eval()
        total_loss = 0.0

        with torch.no_grad():
            for states, actions in dataloader:
                states = states.to(self.device)
                actions = actions.to(self.device)

                pred_actions = self.policy(states)
                loss = nn.MSELoss()(pred_actions, actions)

                total_loss += loss.item()

        return total_loss / len(dataloader)

    def predict(self, state):
        """Predict action for given state"""
        self.policy.eval()
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            action = self.policy(state_tensor).cpu().numpy()[0]
        return action

    def save(self, path):
        """Save policy"""
        torch.save({
            'policy_state_dict': self.policy.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
        }, path)

    def load(self, path):
        """Load policy"""
        checkpoint = torch.load(path, map_location=self.device)
        self.policy.load_state_dict(checkpoint['policy_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])


class ExpertDataset(Dataset):
    """Dataset of expert demonstrations"""
    def __init__(self, states, actions):
        self.states = torch.FloatTensor(states)
        self.actions = torch.FloatTensor(actions)

    def __len__(self):
        return len(self.states)

    def __getitem__(self, idx):
        return self.states[idx], self.actions[idx]


def collect_expert_data(env, expert_policy, num_episodes=100):
    """Collect expert demonstrations"""
    states = []
    actions = []

    for episode in range(num_episodes):
        state = env.reset()  # classic gym API: reset() returns the initial state
        done = False

        while not done:
            # Get expert action
            action = expert_policy(state)

            # Store transition
            states.append(state)
            actions.append(action)

            # Environment step
            state, reward, done, info = env.step(action)

    return np.array(states), np.array(actions)


def train_bc_from_demonstrations(states, actions, state_dim, action_dim):
    """Complete BC training pipeline"""
    # Create dataset
    dataset = ExpertDataset(states, actions)

    # Create BC agent
    bc = BehavioralCloning(
        state_dim=state_dim,
        action_dim=action_dim,
        hidden_dims=[256, 256],
        lr=1e-3,
        batch_size=256
    )

    # Train
    train_loss, val_loss = bc.train(dataset, num_epochs=100)

    print(f"Training complete. Final validation loss: {val_loss:.4f}")

    return bc
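
Putting the pipeline together (a hypothetical usage sketch: `env` and `expert_policy` stand in for your own environment and demonstrator):

# Hypothetical end-to-end usage; `env` (classic gym API) and
# `expert_policy(state) -> action` are assumed to be defined elsewhere.
states, actions = collect_expert_data(env, expert_policy, num_episodes=100)
bc = train_bc_from_demonstrations(
    states, actions,
    state_dim=states.shape[1],
    action_dim=actions.shape[1],
)
print(bc.predict(states[0]))  # query the cloned policy on a single state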

Discrete Actions

For discrete action spaces (e.g., robotics with discrete primitives):

class BehavioralCloningDiscrete:
    """BC for discrete actions (classification)"""

    def __init__(self, state_dim, num_actions, hidden_dims=[256, 256], lr=1e-3, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.device = device

        # Build classifier
        layers = []
        prev_dim = state_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim

        layers.append(nn.Linear(prev_dim, num_actions))

        self.policy = nn.Sequential(*layers).to(device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

    def train_epoch(self, dataloader):
        """Training with cross-entropy loss"""
        self.policy.train()
        total_loss = 0.0
        total_correct = 0
        total_samples = 0

        for states, actions in dataloader:
            states = states.to(self.device)
            actions = actions.to(self.device).long()

            # Forward pass
            logits = self.policy(states)

            # Cross-entropy loss
            loss = nn.CrossEntropyLoss()(logits, actions)

            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            # Accuracy
            pred_actions = torch.argmax(logits, dim=1)
            correct = (pred_actions == actions).sum().item()

            total_loss += loss.item()
            total_correct += correct
            total_samples += len(actions)

        accuracy = total_correct / total_samples
        return total_loss / len(dataloader), accuracy

    def predict(self, state):
        """Predict action (greedy)"""
        self.policy.eval()
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            logits = self.policy(state_tensor)
            action = torch.argmax(logits, dim=1).item()
        return action
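
A usage sketch for the discrete variant, assuming `states` and integer `actions` arrays from your own demonstrations (`num_actions=4` is illustrative):

# ExpertDataset stores actions as floats; train_epoch casts them back
# to long for the cross-entropy loss.
dataset = ExpertDataset(states, actions)  # actions: shape (N,), int labels
loader = DataLoader(dataset, batch_size=256, shuffle=True)

bc_discrete = BehavioralCloningDiscrete(state_dim=states.shape[1], num_actions=4)
for epoch in range(100):
    loss, acc = bc_discrete.train_epoch(loader)
    if epoch % 10 == 0:
        print(f"epoch {epoch}: loss={loss:.4f}, accuracy={acc:.2%}")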

Improvements to Basic BC

1. Data Augmentation

Increase dataset diversity:

def augment_state(state, noise_std=0.01):
    """Add noise to states for data augmentation"""
    noise = np.random.normal(0, noise_std, size=state.shape)
    return state + noise

def augment_dataset(states, actions, augmentation_factor=3):
    """Augment dataset by adding noisy copies"""
    aug_states = [states]
    aug_actions = [actions]

    for _ in range(augmentation_factor - 1):
        noisy_states = augment_state(states, noise_std=0.01)
        aug_states.append(noisy_states)
        aug_actions.append(actions)

    return np.concatenate(aug_states), np.concatenate(aug_actions)

2. Dropout Regularization

Prevent overfitting:

class BCWithDropout(nn.Module):
    """BC policy with dropout"""
    def __init__(self, state_dim, action_dim, dropout_rate=0.1):
        super().__init__()

        self.network = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(256, action_dim)
        )

    def forward(self, state):
        return self.network(state)

3. Action Regularization

Encourage smooth actions:

def train_with_action_regularization(model, dataloader, optimizer, reg_coef=0.01):
    """Train with action smoothness regularization"""
    for states, actions in dataloader:
        # Prediction loss
        pred_actions = model(states)
        pred_loss = nn.MSELoss()(pred_actions, actions)

        # Action regularization (L2 norm)
        action_norm = torch.mean(pred_actions ** 2)

        # Total loss
        loss = pred_loss + reg_coef * action_norm

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

4. Ensemble Methods

Use ensemble to improve robustness:

class BCEnsemble:
    """Ensemble of BC policies"""
    def __init__(self, state_dim, action_dim, num_models=5, hidden_dims=[256, 256]):
        self.models = [
            BehavioralCloning(state_dim, action_dim, hidden_dims)
            for _ in range(num_models)
        ]

    def train_ensemble(self, dataset, num_epochs=100):
        """Train all models in ensemble"""
        for i, model in enumerate(self.models):
            print(f"Training model {i+1}/{len(self.models)}")
            model.train(dataset, num_epochs=num_epochs)

    def predict(self, state):
        """Predict by averaging ensemble"""
        predictions = [model.predict(state) for model in self.models]
        return np.mean(predictions, axis=0)

    def predict_with_uncertainty(self, state):
        """Predict with uncertainty estimate"""
        predictions = np.array([model.predict(state) for model in self.models])
        mean = np.mean(predictions, axis=0)
        std = np.std(predictions, axis=0)
        return mean, std
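
One way to use the uncertainty estimate at deployment time; the threshold and fallback action below are illustrative placeholders, not part of the class:

# Gate on ensemble disagreement to flag likely off-distribution states.
mean_action, std = ensemble.predict_with_uncertainty(state)
if np.max(std) > 0.2:  # illustrative threshold; tune per task
    action = np.zeros_like(mean_action)  # conservative fallback, e.g., hold still
else:
    action = mean_action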

Advanced Techniques

Goal-Conditioned BC

For multi-task imitation:

class GoalConditionedBC(nn.Module):
    """BC with goal conditioning"""
    def __init__(self, state_dim, goal_dim, action_dim):
        super().__init__()

        # Concatenate state and goal
        input_dim = state_dim + goal_dim

        self.network = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )

    def forward(self, state, goal):
        x = torch.cat([state, goal], dim=-1)
        return self.network(x)

# Training
def train_goal_conditioned(model, states, goals, actions, num_epochs=100):
    """Train goal-conditioned policy"""
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(num_epochs):
        # Forward pass
        pred_actions = model(states, goals)

        # Loss
        loss = nn.MSELoss()(pred_actions, actions)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

BC from Observations (BCO)

Learn from observational data (no actions):

class BCO:
    """
    Behavioral Cloning from Observations

    Learn policy from state-only trajectories using
    inverse dynamics model
    """
    def __init__(self, state_dim, action_dim):
        # Forward dynamics: (s_t, a_t) -> s_{t+1}
        self.forward_model = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, state_dim)
        )

        # Inverse dynamics: (s_t, s_{t+1}) -> a_t
        self.inverse_model = nn.Sequential(
            nn.Linear(state_dim * 2, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )

        # Policy: s_t -> a_t
        self.policy = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )

    def train_inverse_model(self, states, next_states, actions, num_epochs=100):
        """Train inverse dynamics model"""
        optimizer = optim.Adam(self.inverse_model.parameters(), lr=1e-3)

        for epoch in range(num_epochs):
            # Predict actions from state transitions
            state_pairs = torch.cat([states, next_states], dim=-1)
            pred_actions = self.inverse_model(state_pairs)

            loss = nn.MSELoss()(pred_actions, actions)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    def train_policy_from_observations(self, state_trajectories, num_epochs=100):
        """Train policy from observation-only data"""
        optimizer = optim.Adam(self.policy.parameters(), lr=1e-3)

        for epoch in range(num_epochs):
            for traj in state_trajectories:
                states = traj[:-1]
                next_states = traj[1:]

                # Infer actions using inverse model
                state_pairs = torch.cat([states, next_states], dim=-1)
                with torch.no_grad():
                    inferred_actions = self.inverse_model(state_pairs)

                # Train policy to match inferred actions
                pred_actions = self.policy(states)
                loss = nn.MSELoss()(pred_actions, inferred_actions)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
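
In practice the pieces above are alternated in an outer loop: roll out the current policy to gather labeled \((s, a, s')\) triples, refit the inverse model, then re-label the expert's state-only trajectories. A hedged sketch, where `rollout`, `env`, and `expert_obs_trajs` (lists of state tensors) are placeholders:

# Hedged sketch of the outer BCO loop; `rollout`, `env`, and
# `expert_obs_trajs` are placeholders for this example.
bco = BCO(state_dim, action_dim)
for iteration in range(10):
    # 1. Roll out the current policy to collect labeled (s, a, s') triples
    states, actions, next_states = rollout(env, bco.policy, num_steps=10_000)
    # 2. Fit the inverse dynamics model on the policy's own experience
    bco.train_inverse_model(states, next_states, actions, num_epochs=50)
    # 3. Re-label expert observations and clone the inferred actions
    bco.train_policy_from_observations(expert_obs_trajs, num_epochs=50)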

Practical Tips

Dataset Requirements

Minimum dataset size (rule of thumb):

  • Simple tasks (CartPole, Reacher): ~10-50 demos
  • Manipulation (pick-and-place): ~100-500 demos
  • Locomotion (quadruped walking): ~500-2,000 demos
  • Complex tasks (mobile manipulation): ~2,000-10,000 demos

Data quality matters more than quantity:

  • ✓ Diverse start states
  • ✓ Successful demonstrations only, no failures (see the filtering snippet below)
  • ✓ Consistent expert behavior
  • ✓ Cover the full state distribution
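
For instance, keeping only successful demonstrations is a one-liner if each recorded trajectory carries a success flag (the field names here are illustrative):

# Filter failed trajectories before building the dataset.
clean = [traj for traj in raw_trajectories if traj["success"]]
states = np.concatenate([t["states"] for t in clean])
actions = np.concatenate([t["actions"] for t in clean])
dataset = ExpertDataset(states, actions)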

Hyperparameter Tuning

# (state_dim and action_dim are assumed to be defined for your task)

# Conservative (prevent overfitting)
bc_conservative = BehavioralCloning(
    state_dim=state_dim, action_dim=action_dim,
    hidden_dims=[128, 128],
    lr=1e-4,
    batch_size=128
)

# Standard (balanced)
bc_standard = BehavioralCloning(
    state_dim=state_dim, action_dim=action_dim,
    hidden_dims=[256, 256],
    lr=1e-3,
    batch_size=256
)

# Aggressive (large dataset)
bc_aggressive = BehavioralCloning(
    state_dim=state_dim, action_dim=action_dim,
    hidden_dims=[512, 512, 256],
    lr=3e-3,
    batch_size=512
)

Evaluation

def evaluate_bc_policy(policy, env, num_episodes=10):
    """Evaluate BC policy"""
    successes = 0
    total_rewards = []

    for episode in range(num_episodes):
        state = env.reset()  # classic gym API, as in collect_expert_data above
        episode_reward = 0
        done = False

        while not done:
            action = policy.predict(state)
            state, reward, done, info = env.step(action)
            episode_reward += reward

        total_rewards.append(episode_reward)

        if info.get('is_success', False):
            successes += 1

    success_rate = successes / num_episodes
    mean_reward = np.mean(total_rewards)

    print(f"Success Rate: {success_rate:.2%}")
    print(f"Mean Reward: {mean_reward:.2f}")

    return success_rate, mean_reward

Common Issues & Solutions

Problem: Poor Generalization

Symptoms: Works on training states, fails on novel states

Solutions:

# 1. Data augmentation
states_aug, actions_aug = augment_dataset(states, actions, augmentation_factor=5)

# 2. More diverse demonstrations
# Collect demos from different start states, experts, conditions

# 3. Regularization
model_with_dropout = BCWithDropout(state_dim, action_dim, dropout_rate=0.2)

# 4. Ensemble
ensemble = BCEnsemble(state_dim, action_dim, num_models=5)

Problem: Compounding Errors

Symptoms: Policy diverges over long horizons

Solutions:

# 1. Use DAgger instead (collect on-policy data)
# See dagger.md

# 2. Limit horizon
max_timesteps = 100  # Cap episode length during deployment

# 3. Add noise injection during training
def train_with_noise_injection(policy, states, actions, noise_std=0.05):
    for state, action in zip(states, actions):
        # Add noise to simulate distribution shift
        noisy_state = state + np.random.normal(0, noise_std, state.shape)
        # Train on the noisy state; train_step is a placeholder for one
        # supervised update on this (state, action) pair
        loss = policy.train_step(noisy_state, action)

Problem: Causal Confusion

Symptoms: Policy learns spurious correlations

Example: Robot learns to "open door when door is open" instead of "open door by turning handle"

Solutions:

# 1. Remove confounding observations
# Only include task-relevant observations

# 2. Temporal data augmentation
# Shuffle temporal order to break spurious correlations

# 3. Counterfactual data augmentation
def counterfactual_augmentation(states, actions, background_dims):
    # Modify task-irrelevant state dimensions while keeping actions the
    # same, to break spurious correlations (background_dims: indices of
    # the task-irrelevant dimensions, supplied by the user)
    aug_states = states.copy()
    # E.g., randomize the background while keeping task objects the same
    aug_states[:, background_dims] = np.random.random(
        (len(states), len(background_dims))
    )
    return aug_states, actions

When to Use Behavioral Cloning

Use BC when:

  • ✓ You have a large, diverse expert dataset
  • ✓ The task is relatively simple (short horizon)
  • ✓ The expert policy is deterministic
  • ✓ Safety is critical (offline learning)
  • ✓ No reward function is available

Don't use BC when:

  • ✗ Demonstration data is limited
  • ✗ The task is long-horizon (use DAgger or RL)
  • ✗ The task is highly stochastic
  • ✗ You need to exceed expert performance

References

Papers

  1. Behavioral Cloning: Pomerleau, "ALVINN: An Autonomous Land Vehicle in a Neural Network", NIPS 1988
  2. Distribution Shift: Ross et al., "A Reduction of Imitation Learning and Structured Prediction to No-Regret Online Learning", AISTATS 2011
  3. BCO: Torabi et al., "Behavioral Cloning from Observation", IJCAI 2018

Books

  1. Sutton & Barto, "Reinforcement Learning: An Introduction", 2nd Edition, 2018

Code

  • Imitation Library: https://github.com/HumanCompatibleAI/imitation
  • CleanRL BC: https://github.com/vwxyzjn/cleanrl/blob/master/cleanrl/bc.py
  • SB3 BC: Part of Imitation library above

Next Steps

  • DAgger - Fix distribution shift problem
  • Inverse RL - Learn reward function from demonstrations
  • Diffusion Policies - State-of-the-art IL with diffusion
  • ACT - Action chunking for better performance