Skip to content

Human Demonstrations for Robot Learning

High-quality demonstrations are the foundation of successful imitation learning and data-driven robot policies.

Why Demonstrations Matter

Quality demonstrations directly impact:

  • Policy Performance: Better demos → better learned policies
  • Sample Efficiency: High-quality demos require fewer samples
  • Generalization: Diverse demos improve out-of-distribution performance
  • Safety: Safe demonstrations teach safe behaviors

Demonstration Quality Criteria

1. Task Success

Every demonstration should successfully complete the task:

class DemonstrationValidator:
    """Validate demonstration quality against success, smoothness,
    length, and state-coverage criteria.

    A demo is expected to be a sequence of step dicts with at least
    'action' and 'observation' -> {'state': ...} entries.
    """
    def __init__(self, success_checker):
        # Callable taking a demo and returning True iff the task succeeded.
        self.success_checker = success_checker
        self.metrics = {
            'success_rate': [],
            'trajectory_length': [],
            'smoothness': [],
            'diversity': []
        }

    def validate_demo(self, demo):
        """Check if demonstration meets quality criteria.

        Returns:
            (bool, str): validity flag and a human-readable reason.
        """
        # Task success
        success = self.success_checker(demo)

        if not success:
            return False, "Task not completed successfully"

        # Reasonable length. Checked BEFORE the smoothness metric so that
        # the np.diff-based computation below always sees >= 2 actions and
        # never reduces an empty array (which would emit a RuntimeWarning
        # and produce NaN).
        if len(demo) < 10:
            return False, "Demonstration too short"
        if len(demo) > 1000:
            return False, "Demonstration too long (likely inefficient)"

        # Trajectory smoothness: mean magnitude of first action differences
        actions = np.array([step['action'] for step in demo])
        action_diff = np.diff(actions, axis=0)
        jerk = np.linalg.norm(action_diff, axis=1).mean()

        if jerk > 0.5:  # Threshold for jerkiness
            return False, f"Trajectory too jerky: {jerk:.3f}"

        # State coverage
        states = np.array([step['observation']['state'] for step in demo])
        state_variance = states.var(axis=0).mean()

        if state_variance < 0.01:
            return False, "Insufficient state coverage"

        return True, "Valid demonstration"

    def log_metrics(self, demo, success):
        """Track demonstration metrics for the final report."""
        self.metrics['success_rate'].append(int(success))
        self.metrics['trajectory_length'].append(len(demo))

        actions = np.array([step['action'] for step in demo])
        smoothness = self._compute_smoothness(actions)
        self.metrics['smoothness'].append(smoothness)

    def _compute_smoothness(self, actions):
        """Compute trajectory smoothness (lower is better)."""
        # Second derivative (acceleration) needs at least 3 samples;
        # return 0.0 instead of a NaN mean over an empty diff.
        if len(actions) < 3:
            return 0.0
        accel = np.diff(actions, n=2, axis=0)
        return np.linalg.norm(accel, axis=1).mean()

    def report(self):
        """Generate quality report over all logged demos."""
        print(f"Success Rate: {np.mean(self.metrics['success_rate']) * 100:.1f}%")
        print(f"Avg Length: {np.mean(self.metrics['trajectory_length']):.1f} steps")
        print(f"Avg Smoothness: {np.mean(self.metrics['smoothness']):.4f}")

2. Consistency

Multiple demonstrations of the same task should show consistent strategies:

def measure_demonstration_consistency(demos):
    """Measure consistency across demonstrations.

    Returns the mean pairwise similarity in (0, 1], where values near 1
    mean near-identical waypoint sequences. With fewer than two demos
    there are no pairs to compare, so return 1.0 (trivially consistent)
    instead of letting np.mean over an empty list produce NaN with a
    RuntimeWarning.
    """
    if len(demos) < 2:
        return 1.0

    # Extract key waypoints from each demo
    waypoints_list = [extract_waypoints(demo) for demo in demos]

    # Compare pairwise distances, mapped to a (0, 1] similarity
    consistency_scores = []
    for i in range(len(waypoints_list)):
        for j in range(i + 1, len(waypoints_list)):
            # Dynamic Time Warping distance
            # NOTE(review): dtw_distance is not defined in this file's
            # visible portion — confirm it is imported elsewhere.
            distance = dtw_distance(waypoints_list[i], waypoints_list[j])
            consistency_scores.append(1.0 / (1.0 + distance))

    return np.mean(consistency_scores)

def extract_waypoints(demo, threshold=0.1):
    """Extract key waypoints based on action magnitude changes.

    Keeps the start state, every state where the action changed by more
    than `threshold` (Euclidean norm), and the end state.
    """
    acts = np.array([step['action'] for step in demo])
    obs_states = np.array([step['observation']['state'] for step in demo])

    keep = [obs_states[0]]  # always include the start state

    # Magnitude of consecutive action changes; index i corresponds to the
    # transition into step i.
    deltas = np.linalg.norm(np.diff(acts, axis=0), axis=1)
    for idx, delta in enumerate(deltas, start=1):
        if delta > threshold:
            keep.append(obs_states[idx])

    keep.append(obs_states[-1])  # always include the end state

    return np.array(keep)

3. Diversity

Demonstrations should cover variations in initial conditions and execution:

class DiversityTracker:
    """Track and encourage demonstration diversity."""
    def __init__(self, state_dim):
        self.state_dim = state_dim
        self.demo_states = []  # one (T_i, state_dim) array per added demo

    def add_demonstration(self, demo):
        """Add the demo's state trajectory to the pool."""
        states = np.array([step['observation']['state'] for step in demo])
        self.demo_states.append(states)

    def compute_coverage(self, bins=10):
        """Compute state space coverage: mean fraction of occupied
        histogram bins per dimension.

        Returns 0.0 when no demonstrations have been added (np.vstack on
        an empty list would otherwise raise ValueError).
        """
        if not self.demo_states:
            return 0.0
        all_states = np.vstack(self.demo_states)

        # Discretize each dimension
        coverage = []
        for dim in range(self.state_dim):
            hist, _ = np.histogram(all_states[:, dim], bins=bins)
            occupied_bins = np.sum(hist > 0)
            coverage.append(occupied_bins / bins)

        return np.mean(coverage)

    def compute_diversity_score(self):
        """Measure diversity using log-determinant of the state covariance
        (volume of the covariance ellipsoid).

        Returns -inf for an empty pool or a degenerate covariance.
        """
        if not self.demo_states:
            return -np.inf
        all_states = np.vstack(self.demo_states)
        # atleast_2d: np.cov returns a 0-d array when state_dim == 1,
        # which slogdet rejects.
        cov = np.atleast_2d(np.cov(all_states.T))

        # Log determinant (volume of covariance ellipsoid)
        sign, logdet = np.linalg.slogdet(cov)
        return logdet if sign > 0 else -np.inf

    def suggest_initial_conditions(self, n_suggestions=5):
        """Suggest diverse initial conditions for next demos."""
        all_initial_states = np.array([demo[0] for demo in self.demo_states])

        # Find underexplored regions using k-means
        from sklearn.cluster import KMeans

        # KMeans raises when n_clusters exceeds the number of samples.
        n_clusters = min(n_suggestions * 2, len(all_initial_states))
        kmeans = KMeans(n_clusters=n_clusters)
        kmeans.fit(all_initial_states)

        # Suggest points far from existing demos
        suggestions = []
        for center in kmeans.cluster_centers_:
            # Distance from this center to the nearest existing demo start
            distances = np.linalg.norm(all_initial_states - center, axis=1)
            min_dist = distances.min()

            # Perturb cluster center to create diverse starting point
            perturbation = np.random.randn(self.state_dim) * 0.1
            suggestion = center + perturbation
            suggestions.append((suggestion, min_dist))

        # Sort by distance (prioritize far regions)
        suggestions.sort(key=lambda x: x[1], reverse=True)

        return [s[0] for s in suggestions[:n_suggestions]]

Collection Strategies

Strategy 1: Expert Demonstrations

When to use: Initial policy training, high-stakes tasks

Characteristics:

  • Optimal or near-optimal behavior
  • Consistent execution
  • Fast task completion
  • Higher cost per demonstration

class ExpertDemoCollector:
    """Collect high-quality expert demonstrations with validation."""
    def __init__(self, robot, expert_id):
        self.robot = robot
        self.expert_id = expert_id
        # Validator reuses this collector's task-specific success check.
        self.validator = DemonstrationValidator(self.check_success)

    def collect_session(self, task, num_demos=50, max_attempts=100):
        """Collect expert demonstrations with validation.

        Records demos until `num_demos` valid ones are gathered or
        `max_attempts` is exhausted; returns the list of valid demos.
        """
        successful_demos = []
        attempts = 0

        print(f"Expert {self.expert_id} collecting {num_demos} demos for: {task}")

        while len(successful_demos) < num_demos and attempts < max_attempts:
            attempts += 1

            # Reset to varied initial condition
            self.robot.reset(randomize=True)

            print(f"\nAttempt {attempts}: ", end='')

            # Record demonstration
            demo = self.record_demonstration()

            # Validate
            is_valid, message = self.validator.validate_demo(demo)

            if is_valid:
                successful_demos.append(demo)
                print(f"✓ Valid ({len(successful_demos)}/{num_demos})")
                # NOTE(review): save_demo is not defined in this class —
                # presumably provided by a subclass or mixin; confirm.
                self.save_demo(demo, len(successful_demos))
            else:
                print(f"✗ Invalid: {message}")

        # Final report. max(attempts, 1) guards num_demos <= 0, where the
        # loop never runs and attempts stays 0 (previously a
        # ZeroDivisionError).
        success_rate = len(successful_demos) / max(attempts, 1) * 100
        print(f"\n{'='*60}")
        print(f"Collection Complete!")
        print(f"Success Rate: {success_rate:.1f}%")
        print(f"Valid Demos: {len(successful_demos)}")
        print(f"Total Attempts: {attempts}")

        return successful_demos

    def record_demonstration(self):
        """Record a single demonstration at ~30 Hz until the robot is done."""
        demo = []

        input("Press Enter to start demonstration...")

        while not self.robot.is_done():
            step_data = {
                'observation': self.robot.get_observation(),
                'action': self.robot.get_current_action(),
                'timestamp': time.time()
            }
            demo.append(step_data)
            time.sleep(0.03)  # 30 Hz

        return demo

    def check_success(self, demo):
        """Task-specific success check based on the final observation."""
        # An empty recording can never be a success (and demo[-1] would
        # raise IndexError).
        if not demo:
            return False
        # Example: Check if object was grasped and placed
        final_obs = demo[-1]['observation']
        return self.robot.check_task_success(final_obs)

Strategy 2: Non-Expert Demonstrations

When to use: Large-scale data collection, diverse behaviors

Characteristics:

  • More variable quality
  • Natural variation in strategies
  • Lower cost, higher volume
  • Requires filtering

class CrowdsourcedDemoCollector:
    """Collect demonstrations from multiple non-expert operators."""
    def __init__(self, robot):
        self.robot = robot
        self.all_demos = []       # dicts: {'demo', 'operator_id', 'quality'}
        self.operator_stats = {}  # operator_id -> {'count', 'avg_quality'}

    def collect_with_feedback(self, operator_id, task):
        """Collect demo with real-time feedback."""
        print(f"\nOperator {operator_id}: {task}")
        print("Tips: Smooth motions, complete the task successfully")

        # NOTE(review): record_with_guidance is not defined in this class —
        # confirm it is provided elsewhere.
        demo = self.record_with_guidance()

        # Immediate feedback
        quality_score = self.assess_quality(demo)

        if quality_score > 0.7:
            print(f"✓ Great demo! Quality: {quality_score:.2f}")
            self._store_demo(demo, operator_id, quality_score)
        elif quality_score > 0.4:
            print(f"○ Acceptable demo. Quality: {quality_score:.2f}")
            print("Try to be smoother and more efficient next time!")
            self._store_demo(demo, operator_id, quality_score)
        else:
            print(f"✗ Demo needs improvement. Quality: {quality_score:.2f}")
            print("Issues detected:")
            self.provide_feedback(demo)

        # Update operator's running-average quality.
        stats = self.operator_stats.setdefault(
            operator_id, {'count': 0, 'avg_quality': 0}
        )
        stats['count'] += 1
        stats['avg_quality'] = (
            (stats['avg_quality'] * (stats['count'] - 1) + quality_score) /
            stats['count']
        )

    def _store_demo(self, demo, operator_id, quality_score):
        """Append an accepted demo to the pool (dedupes the dict literal
        previously repeated in both accept branches)."""
        self.all_demos.append({
            'demo': demo,
            'operator_id': operator_id,
            'quality': quality_score
        })

    def assess_quality(self, demo):
        """Compute quality score in [0, 1] from success, smoothness, length."""
        scores = []

        # Task completion (most important)
        # NOTE(review): check_task_completion / compute_smoothness are not
        # defined in this class — confirm they are provided elsewhere.
        success = self.check_task_completion(demo)
        scores.append(1.0 if success else 0.0)

        # Trajectory smoothness
        actions = np.array([s['action'] for s in demo])
        smoothness = self.compute_smoothness(actions)
        smoothness_score = np.exp(-smoothness)  # Lower is better
        scores.append(smoothness_score)

        # Efficiency (trajectory length); max(...) guards an empty demo
        # against ZeroDivisionError.
        efficiency = min(1.0, 100 / max(len(demo), 1))  # Prefer ~100 steps
        scores.append(efficiency)

        # Weighted average
        weights = [0.5, 0.3, 0.2]  # Success is most important
        return np.average(scores, weights=weights)

    def provide_feedback(self, demo):
        """Give specific feedback on failed demo."""
        actions = np.array([s['action'] for s in demo])

        # Check for excessive jerking. The second difference needs >= 3
        # actions; short demos would otherwise yield a NaN mean.
        if len(actions) >= 3:
            jerk = np.diff(actions, n=2, axis=0)
            jerk_norm = np.linalg.norm(jerk, axis=1)

            if jerk_norm.mean() > 0.3:
                print("  - Movements were too jerky. Try smoother motions.")

        # Check if task was completed
        if not self.check_task_completion(demo):
            print("  - Task not completed successfully. Ensure object reaches target.")

        # Check for efficiency
        if len(demo) > 500:
            print("  - Demonstration was too long. Try a more direct approach.")

    def filter_best_demos(self, top_k=None, quality_threshold=0.6):
        """Filter to the best demonstrations, sorted by quality descending."""
        # Sort by quality
        sorted_demos = sorted(
            self.all_demos,
            key=lambda x: x['quality'],
            reverse=True
        )

        # Filter by threshold
        filtered = [d for d in sorted_demos if d['quality'] >= quality_threshold]

        # Take top k if specified
        if top_k is not None:
            filtered = filtered[:top_k]

        print(f"\nFiltered {len(filtered)} / {len(self.all_demos)} demonstrations")
        # Guard: filtered[-1]/filtered[0] raised IndexError when nothing
        # passed the threshold.
        if filtered:
            print(f"Quality range: {filtered[-1]['quality']:.2f} - {filtered[0]['quality']:.2f}")

        return [d['demo'] for d in filtered]

Strategy 3: Intervention-Based Collection

Collect demos where expert intervenes only when needed:

class InterventionCollector:
    """Collect demos with expert interventions."""
    def __init__(self, robot, base_policy=None):
        self.robot = robot
        self.base_policy = base_policy  # Can be random or partially trained

    def collect_with_interventions(self, num_episodes=100):
        """Collect demos with interventions.

        Returns (all_data, intervention_stats) where intervention_stats
        holds the per-episode fraction of expert overrides.
        """
        all_data = []
        intervention_stats = []

        for ep in range(num_episodes):
            steps, n_override = self._run_episode()

            rate = n_override / len(steps)
            intervention_stats.append(rate)
            all_data.extend(steps)

            print(f"Episode {ep+1}: {len(steps)} steps, "
                  f"{rate*100:.1f}% interventions")

        # Analyze intervention patterns
        print(f"\nAverage intervention rate: {np.mean(intervention_stats)*100:.1f}%")

        return all_data, intervention_stats

    def _run_episode(self):
        """Roll out one episode, consulting the expert at every step.

        Returns (step records, number of expert overrides).
        """
        steps = []
        n_override = 0

        obs = self.robot.reset()
        done = False

        while not done:
            # Policy (or random sampling) proposes an action
            if self.base_policy:
                suggestion = self.base_policy.predict(obs)
            else:
                suggestion = self.robot.sample_random_action()

            # Expert can either accept the proposal or override it
            action, intervened = self.get_expert_action(obs, suggestion)
            if intervened:
                n_override += 1

            # Execute action
            next_obs, reward, done, info = self.robot.step(action)

            steps.append({
                'observation': obs,
                'action': action,
                'intervened': intervened,
                'proposed_action': suggestion
            })

            obs = next_obs

        return steps, n_override

    def get_expert_action(self, obs, proposed_action):
        """Get expert action (either accept or override)."""
        # Display proposed action
        self.robot.visualize_proposed_action(proposed_action)

        # Expert input with timeout
        override = self.check_for_expert_override(timeout=0.1)

        if override is None:
            return proposed_action, False  # Accepted
        return override, True  # Intervened

Multi-Modal Demonstrations

Modern robot learning benefits from multi-modal data:

class MultiModalDemoRecorder:
    """Record demonstrations with multiple modalities (robot state,
    multi-camera images, and optionally audio and force/torque)."""
    def __init__(self, robot, cameras, audio=False, force_torque=False):
        self.robot = robot
        self.cameras = cameras          # dict: camera_id -> camera object
        self.record_audio = audio
        self.record_force = force_torque

    def record_demo(self):
        """Record a multi-modal demonstration at ~30 Hz until stop.

        Returns a dict of observations, actions, per-camera image lists,
        optional audio / force-torque streams, and timing metadata.
        """
        demo = {
            'observations': [],
            'actions': [],
            'images': {cam_id: [] for cam_id in self.cameras.keys()},
            'metadata': {
                'start_time': time.time(),
                'operator_id': input("Operator ID: "),
                'task_description': input("Task description: ")
            }
        }

        # NOTE(review): start_audio_recording / get_audio_chunk /
        # check_stop_condition / stop_audio_recording are not defined in
        # this class — confirm they are provided elsewhere.
        if self.record_audio:
            demo['audio'] = []
            self.start_audio_recording()

        if self.record_force:
            demo['force_torque'] = []

        print("Starting recording in 3...")
        time.sleep(1)
        print("2...")
        time.sleep(1)
        print("1...")
        time.sleep(1)
        print("Recording!")

        frame_idx = 0

        while True:
            frame_start = time.time()

            # Robot state
            obs = self.robot.get_state()
            action = self.robot.get_current_action()

            demo['observations'].append(obs)
            demo['actions'].append(action)

            # Multi-camera images
            for cam_id, camera in self.cameras.items():
                image = camera.get_image()
                demo['images'][cam_id].append(image)

            # Force/torque sensors
            if self.record_force:
                ft_reading = self.robot.get_force_torque()
                demo['force_torque'].append(ft_reading)

            # Audio (for language instructions)
            if self.record_audio:
                audio_chunk = self.get_audio_chunk()
                demo['audio'].append(audio_chunk)

            # Count the frame we just recorded BEFORE the stop check.
            # Previously the increment came after the break, so the final
            # frame was recorded but never counted (off-by-one in
            # num_frames vs len(demo['observations'])).
            frame_idx += 1

            # Check for stop condition
            if self.check_stop_condition():
                break

            # Maintain 30 Hz
            elapsed = time.time() - frame_start
            if elapsed < 1/30:
                time.sleep(1/30 - elapsed)

        demo['metadata']['end_time'] = time.time()
        demo['metadata']['duration'] = demo['metadata']['end_time'] - demo['metadata']['start_time']
        demo['metadata']['num_frames'] = frame_idx

        print(f"Recording complete: {frame_idx} frames, {demo['metadata']['duration']:.1f}s")

        if self.record_audio:
            self.stop_audio_recording()
            # Transcribe audio to text
            demo['language_annotation'] = self.transcribe_audio(demo['audio'])

        return demo

    def transcribe_audio(self, audio_data):
        """Transcribe audio to text using Whisper or similar."""
        # Placeholder - integrate with Whisper/speech-to-text
        print("Transcribing audio...")
        # transcription = whisper.transcribe(audio_data)
        # return transcription
        return "Manual annotation required"

Failure Recovery Demonstrations

Teach the policy how to recover from failures:

class FailureRecoveryCollector:
    """Collect demonstrations including failure recovery."""
    def __init__(self, robot):
        self.robot = robot

    def collect_with_failures(self, num_demos=50):
        """Collect demos that include and recover from failures."""
        demos = []

        for demo_idx in range(num_demos):
            print(f"\nDemo {demo_idx+1}/{num_demos}")
            print("Include intentional failures and recoveries")

            record = {
                'trajectory': [],
                'failure_points': [],
                'recovery_points': []
            }

            current_obs = self.robot.reset()
            t = 0

            while True:
                action = self.get_operator_action()

                # Operator may flag the current step as a failure...
                if self.check_failure_marker():
                    record['failure_points'].append(t)
                    print(f"  [Step {t}] Failure marked")

                # ...or as the moment of recovery.
                if self.check_recovery_marker():
                    record['recovery_points'].append(t)
                    print(f"  [Step {t}] Recovery marked")

                new_obs, _, done, _ = self.robot.step(action)

                record['trajectory'].append({
                    'obs': current_obs,
                    'action': action,
                    'step': t
                })

                current_obs = new_obs
                t += 1

                if done or self.check_stop():
                    break

            # If any failures were marked, pair them with recoveries.
            if len(record['failure_points']) > 0:
                print(f"  Recorded {len(record['failure_points'])} failures")
                print(f"  Recorded {len(record['recovery_points'])} recoveries")

                record['recovery_examples'] = self.extract_recovery_sequences(record)

            demos.append(record)

        return demos

    def extract_recovery_sequences(self, demo):
        """Extract subsequences showing failure → recovery."""
        recoveries = []

        for fail_idx in demo['failure_points']:
            # First recovery marker strictly after this failure, if any.
            later = [r for r in demo['recovery_points'] if r > fail_idx]
            if not later:
                continue

            recovery_idx = later[0]
            recoveries.append({
                'failure_step': fail_idx,
                'recovery_step': recovery_idx,
                'sequence': demo['trajectory'][fail_idx:recovery_idx + 1],
                'duration': recovery_idx - fail_idx
            })

        return recoveries

Data Augmentation for Demonstrations

Increase demo diversity through augmentation:

class DemonstrationAugmenter:
    """Augment demonstrations to increase diversity."""
    def __init__(self):
        pass

    def augment_vision(self, demo, augmentations=None):
        """Apply visual augmentations to every frame's image.

        NOTE(review): color_jitter / brightness / gaussian_blur /
        add_noise / interpolate_steps are not defined in this class —
        confirm they are provided elsewhere.
        """
        if augmentations is None:
            augmentations = ['color_jitter', 'brightness', 'blur']

        augmented_demo = copy.deepcopy(demo)

        for step in augmented_demo:
            image = step['observation']['image']

            for aug in augmentations:
                if aug == 'color_jitter':
                    image = self.color_jitter(image)
                elif aug == 'brightness':
                    image = self.brightness(image)
                elif aug == 'blur':
                    image = self.gaussian_blur(image)
                elif aug == 'noise':
                    image = self.add_noise(image)

            step['observation']['image'] = image

        return augmented_demo

    def time_warp(self, demo, warp_factor=1.2):
        """Time warp: slow down (warp_factor > 1) or speed up (< 1).

        BUG FIX: the slow-down branch previously always doubled the
        trajectory length, ignoring the actual warp_factor value. Now
        midpoints are inserted (via interpolate_steps) until the
        trajectory reaches at least len(demo) * warp_factor steps, then
        it is subsampled to exactly that target length.
        """
        if warp_factor > 1.0:
            # A single-step demo cannot be interpolated any further.
            if len(demo) < 2:
                return list(demo)

            target_len = max(len(demo), int(round(len(demo) * warp_factor)))

            # Repeatedly insert midpoints until we have enough steps.
            warped = list(demo)
            while len(warped) < target_len:
                doubled = []
                for i in range(len(warped) - 1):
                    doubled.append(warped[i])
                    doubled.append(self.interpolate_steps(warped[i], warped[i + 1]))
                doubled.append(warped[-1])
                warped = doubled

            # Subsample back down to exactly the requested length.
            indices = np.linspace(0, len(warped) - 1, target_len).astype(int)
            return [warped[i] for i in indices]

        else:
            # Speed up: subsample
            indices = np.linspace(0, len(demo)-1,
                                  int(len(demo) * warp_factor)).astype(int)
            return [demo[i] for i in indices]

    def add_noise_to_actions(self, demo, noise_std=0.01):
        """Add small Gaussian noise to every action (returns a deep copy)."""
        augmented = copy.deepcopy(demo)

        for step in augmented:
            action = np.array(step['action'])
            noise = np.random.randn(*action.shape) * noise_std
            step['action'] = action + noise

        return augmented

    def spatial_transform(self, demo, translation=None, rotation=None):
        """Apply a rigid 2D transform to the whole demo.

        Useful for 2D navigation or tabletop manipulation. Random small
        transform is drawn when translation/rotation are not given.
        """
        augmented = copy.deepcopy(demo)

        if translation is None:
            translation = np.random.randn(2) * 0.05  # 5cm random shift

        if rotation is None:
            rotation = np.random.randn() * 0.1  # Small rotation

        # Transform all poses in the demo
        for step in augmented:
            # Transform state
            if 'ee_pos' in step['observation']:
                pos = step['observation']['ee_pos'][:2]  # x, y
                transformed_pos = self.rotate_2d(pos, rotation) + translation
                step['observation']['ee_pos'][:2] = transformed_pos

            # Rotate actions; translation is deliberately not applied here.
            # NOTE(review): this assumes actions are position deltas
            # (translation-invariant) — confirm.
            if len(step['action']) >= 2:
                action_pos = step['action'][:2]
                transformed_action = self.rotate_2d(action_pos, rotation)
                step['action'][:2] = transformed_action

        return augmented

    def rotate_2d(self, point, angle):
        """Rotate a 2D point counter-clockwise by `angle` radians."""
        c, s = np.cos(angle), np.sin(angle)
        R = np.array([[c, -s], [s, c]])
        return R @ point

Active Learning for Demonstrations

Intelligently select which demonstrations to collect next:

class ActiveDemonstrationSelector:
    """Select most informative demonstrations to collect."""
    def __init__(self, policy, uncertainty_estimator='ensemble'):
        self.policy = policy
        self.uncertainty_estimator = uncertainty_estimator

    def suggest_demo_scenarios(self, candidate_scenarios, n_select=10):
        """Suggest which scenarios to collect demos for."""
        uncertainties = np.array(
            [self.estimate_uncertainty(s) for s in candidate_scenarios]
        )

        # Keep the n_select scenarios the policy is least certain about.
        top_indices = np.argsort(uncertainties)[-n_select:]
        selected_scenarios = [candidate_scenarios[i] for i in top_indices]

        print(f"Selected {n_select} high-uncertainty scenarios:")
        for i, (idx, scenario) in enumerate(zip(top_indices, selected_scenarios)):
            print(f"{i+1}. Scenario {idx}: uncertainty={uncertainties[idx]:.3f}")
            print(f"   {scenario['description']}")

        return selected_scenarios

    def estimate_uncertainty(self, scenario):
        """Estimate policy uncertainty for scenario."""
        # Dispatch on estimator name; anything unrecognized falls back to
        # disagreement-based uncertainty, matching the original if/elif/else.
        dispatch = {
            'ensemble': self.ensemble_uncertainty,
            'dropout': self.mc_dropout_uncertainty,
        }
        handler = dispatch.get(self.uncertainty_estimator,
                               self.disagreement_uncertainty)
        return handler(scenario)

    def ensemble_uncertainty(self, scenario, n_rollouts=10):
        """Estimate uncertainty using ensemble disagreement."""
        # Assume policy is an ensemble: query one random member per rollout.
        obs = scenario['initial_state']
        member_actions = np.array([
            self.policy.predict(obs, use_random_member=True)
            for _ in range(n_rollouts)
        ])

        # Disagreement = mean per-dimension variance across members.
        return member_actions.var(axis=0).mean()

Best Practices Summary

DO:

✓ Validate demonstrations immediately after collection
✓ Collect diverse initial conditions and execution strategies
✓ Include failure recovery demonstrations
✓ Record multi-modal data (vision, force, audio)
✓ Provide immediate feedback to operators
✓ Track quality metrics throughout collection
✓ Use active learning to focus on uncertain regions

DON'T:

✗ Accept low-quality demonstrations just to hit quantity targets
✗ Collect all demos from identical initial conditions
✗ Ignore failed demonstrations (they contain valuable info)
✗ Rush operators - quality > speed
✗ Forget to label and annotate language descriptions
✗ Skip validation steps

Next Steps