Human Demonstrations for Robot Learning¶
High-quality demonstrations are the foundation of successful imitation learning and data-driven robot policies.
Why Demonstrations Matter¶
Quality demonstrations directly impact:
- Policy Performance: Better demos → better learned policies
- Sample Efficiency: High-quality demos require fewer samples
- Generalization: Diverse demos improve out-of-distribution performance
- Safety: Safe demonstrations teach safe behaviors
Demonstration Quality Criteria¶
1. Task Success¶
Every demonstration should successfully complete the task:
class DemonstrationValidator:
    """Validate demonstration quality.

    A demonstration is a sequence of step dicts with keys
    'action' (array-like) and 'observation' (a dict containing 'state').
    """

    def __init__(self, success_checker):
        # success_checker(demo) -> bool: task-specific success predicate.
        self.success_checker = success_checker
        # Running metrics across all logged demos.
        # NOTE(review): 'diversity' is declared but never appended to here.
        self.metrics = {
            'success_rate': [],
            'trajectory_length': [],
            'smoothness': [],
            'diversity': []
        }

    def validate_demo(self, demo):
        """Check if demonstration meets quality criteria.

        Returns:
            (bool, str): validity flag and a human-readable reason.
        """
        # Length checks come FIRST: the smoothness/coverage statistics
        # below are undefined for tiny demos (np.diff on an empty action
        # array raises, and the mean of zero differences is NaN).
        if len(demo) < 10:
            return False, "Demonstration too short"
        if len(demo) > 1000:
            return False, "Demonstration too long (likely inefficient)"
        # Task success
        success = self.success_checker(demo)
        if not success:
            return False, "Task not completed successfully"
        # Trajectory smoothness: mean norm of first differences of actions.
        actions = np.array([step['action'] for step in demo])
        action_diff = np.diff(actions, axis=0)
        jerk = np.linalg.norm(action_diff, axis=1).mean()
        if jerk > 0.5:  # Threshold for jerkiness
            return False, f"Trajectory too jerky: {jerk:.3f}"
        # State coverage: demos that barely move are uninformative.
        states = np.array([step['observation']['state'] for step in demo])
        state_variance = states.var(axis=0).mean()
        if state_variance < 0.01:
            return False, "Insufficient state coverage"
        return True, "Valid demonstration"

    def log_metrics(self, demo, success):
        """Track demonstration metrics for later reporting."""
        self.metrics['success_rate'].append(int(success))
        self.metrics['trajectory_length'].append(len(demo))
        actions = np.array([step['action'] for step in demo])
        smoothness = self._compute_smoothness(actions)
        self.metrics['smoothness'].append(smoothness)

    def _compute_smoothness(self, actions):
        """Compute trajectory smoothness (lower is better)."""
        # Second derivative (acceleration) of the action sequence.
        accel = np.diff(actions, n=2, axis=0)
        return np.linalg.norm(accel, axis=1).mean()

    def report(self):
        """Generate quality report over all logged demos."""
        print(f"Success Rate: {np.mean(self.metrics['success_rate']) * 100:.1f}%")
        print(f"Avg Length: {np.mean(self.metrics['trajectory_length']):.1f} steps")
        print(f"Avg Smoothness: {np.mean(self.metrics['smoothness']):.4f}")
2. Consistency¶
Multiple demonstrations of the same task should show consistent strategies:
def measure_demonstration_consistency(demos):
    """Measure strategy consistency across demonstrations.

    Returns the mean pairwise similarity 1 / (1 + DTW distance) between
    the waypoint sequences of all demo pairs, a value in (0, 1].

    With fewer than two demos there are no pairs to compare, so the set
    is trivially consistent and 1.0 is returned (previously np.mean of an
    empty list produced NaN with a RuntimeWarning).
    """
    if len(demos) < 2:
        return 1.0
    # Extract key waypoints from each demo.
    waypoints_list = [extract_waypoints(demo) for demo in demos]
    # Compare all pairs via Dynamic Time Warping distance.
    consistency_scores = []
    for i in range(len(waypoints_list)):
        for j in range(i + 1, len(waypoints_list)):
            distance = dtw_distance(waypoints_list[i], waypoints_list[j])
            consistency_scores.append(1.0 / (1.0 + distance))
    return np.mean(consistency_scores)
def extract_waypoints(demo, threshold=0.1):
    """Extract key waypoints based on action magnitude changes.

    A state is a waypoint when the action changed by more than
    ``threshold`` (Euclidean norm) relative to the previous step; the
    first and last states are always included.
    """
    actions = np.array([step['action'] for step in demo])
    states = np.array([step['observation']['state'] for step in demo])
    # Magnitude of the change between consecutive actions.
    deltas = np.linalg.norm(np.diff(actions, axis=0), axis=1)
    keypoints = [states[0]]  # Start
    keypoints.extend(states[k + 1] for k, d in enumerate(deltas) if d > threshold)
    keypoints.append(states[-1])  # End
    return np.array(keypoints)
3. Diversity¶
Demonstrations should cover variations in initial conditions and execution:
class DiversityTracker:
    """Track and encourage demonstration diversity."""

    def __init__(self, state_dim):
        self.state_dim = state_dim
        self.demo_states = []  # one (T, state_dim) array per demo

    def add_demonstration(self, demo):
        """Store the state trajectory of one demo for diversity metrics."""
        self.demo_states.append(
            np.array([step['observation']['state'] for step in demo])
        )

    def compute_coverage(self, bins=10):
        """Compute state-space coverage as the mean fraction of occupied
        histogram bins, averaged over dimensions."""
        all_states = np.vstack(self.demo_states)
        occupied_fractions = []
        for dim in range(self.state_dim):
            counts, _ = np.histogram(all_states[:, dim], bins=bins)
            occupied_fractions.append(np.count_nonzero(counts) / bins)
        return np.mean(occupied_fractions)

    def compute_diversity_score(self):
        """Measure diversity via the log-determinant of the state covariance
        (log-volume of the covariance ellipsoid)."""
        cov = np.cov(np.vstack(self.demo_states).T)
        sign, logdet = np.linalg.slogdet(cov)
        # -inf when the covariance is singular (states lie in a subspace).
        return logdet if sign > 0 else -np.inf

    def suggest_initial_conditions(self, n_suggestions=5):
        """Suggest diverse initial conditions for the next demos."""
        from sklearn.cluster import KMeans
        initial_states = np.array([states[0] for states in self.demo_states])
        # Cluster existing starting states to locate underexplored regions.
        kmeans = KMeans(n_clusters=n_suggestions * 2)
        kmeans.fit(initial_states)
        scored = []
        for center in kmeans.cluster_centers_:
            # Distance from this center to the nearest recorded start.
            nearest = np.linalg.norm(initial_states - center, axis=1).min()
            # Perturb the center so suggested starts are not exact repeats.
            jitter = np.random.randn(self.state_dim) * 0.1
            scored.append((center + jitter, nearest))
        # Prioritize centers far from anything already demonstrated.
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [point for point, _ in scored[:n_suggestions]]
Collection Strategies¶
Strategy 1: Expert Demonstrations¶
When to use: Initial policy training, high-stakes tasks
Characteristics: optimal or near-optimal behavior; consistent execution; fast task completion; higher cost per demonstration.
class ExpertDemoCollector:
    """Collect high-quality expert demonstrations with online validation."""

    def __init__(self, robot, expert_id):
        self.robot = robot
        self.expert_id = expert_id
        # Reuse the shared validator with this collector's success check.
        self.validator = DemonstrationValidator(self.check_success)

    def collect_session(self, task, num_demos=50, max_attempts=100):
        """Collect expert demonstrations with validation.

        Records demos until ``num_demos`` valid ones are gathered or
        ``max_attempts`` is exhausted. Returns the list of valid demos.
        """
        successful_demos = []
        attempts = 0
        print(f"Expert {self.expert_id} collecting {num_demos} demos for: {task}")
        while len(successful_demos) < num_demos and attempts < max_attempts:
            attempts += 1
            # Reset to varied initial condition
            self.robot.reset(randomize=True)
            print(f"\nAttempt {attempts}: ", end='')
            # Record demonstration
            demo = self.record_demonstration()
            # Validate
            is_valid, message = self.validator.validate_demo(demo)
            if is_valid:
                successful_demos.append(demo)
                print(f"✓ Valid ({len(successful_demos)}/{num_demos})")
                # NOTE(review): save_demo is not defined in this class —
                # assumed to be provided elsewhere.
                self.save_demo(demo, len(successful_demos))
            else:
                print(f"✗ Invalid: {message}")
        # Final report. Guard against num_demos <= 0: the loop never runs,
        # attempts stays 0, and the original divided by zero here.
        success_rate = len(successful_demos) / attempts * 100 if attempts else 0.0
        print(f"\n{'='*60}")
        print(f"Collection Complete!")
        print(f"Success Rate: {success_rate:.1f}%")
        print(f"Valid Demos: {len(successful_demos)}")
        print(f"Total Attempts: {attempts}")
        return successful_demos

    def record_demonstration(self):
        """Record a single demonstration at ~30 Hz until the robot is done."""
        demo = []
        input("Press Enter to start demonstration...")
        while not self.robot.is_done():
            step_data = {
                'observation': self.robot.get_observation(),
                'action': self.robot.get_current_action(),
                'timestamp': time.time()
            }
            demo.append(step_data)
            time.sleep(0.03)  # 30 Hz
        return demo

    def check_success(self, demo):
        """Task-specific success check on the final observation."""
        # Example: Check if object was grasped and placed
        final_obs = demo[-1]['observation']
        return self.robot.check_task_success(final_obs)
Strategy 2: Non-Expert Demonstrations¶
When to use: Large-scale data collection, diverse behaviors
Characteristics: more variable quality; natural variation in strategies; lower cost, higher volume; requires filtering.
class CrowdsourcedDemoCollector:
    """Collect demonstrations from multiple non-expert operators."""

    def __init__(self, robot):
        self.robot = robot
        self.all_demos = []       # accepted demos with metadata
        self.operator_stats = {}  # operator_id -> {'count', 'avg_quality'}

    def collect_with_feedback(self, operator_id, task):
        """Collect one demo with immediate quality feedback for the operator."""
        print(f"\nOperator {operator_id}: {task}")
        print("Tips: Smooth motions, complete the task successfully")
        # NOTE(review): record_with_guidance is not defined in this class —
        # assumed to be provided elsewhere.
        demo = self.record_with_guidance()
        # Immediate feedback
        quality_score = self.assess_quality(demo)
        if quality_score > 0.7:
            print(f"✓ Great demo! Quality: {quality_score:.2f}")
        elif quality_score > 0.4:
            print(f"○ Acceptable demo. Quality: {quality_score:.2f}")
            print("Try to be smoother and more efficient next time!")
        else:
            print(f"✗ Demo needs improvement. Quality: {quality_score:.2f}")
            print("Issues detected:")
            self.provide_feedback(demo)
        # Keep acceptable-or-better demos (this append was duplicated in
        # two branches before).
        if quality_score > 0.4:
            self.all_demos.append({
                'demo': demo,
                'operator_id': operator_id,
                'quality': quality_score
            })
        # Update the operator's running average quality.
        stats = self.operator_stats.setdefault(
            operator_id, {'count': 0, 'avg_quality': 0}
        )
        stats['count'] += 1
        stats['avg_quality'] = (
            (stats['avg_quality'] * (stats['count'] - 1) + quality_score) /
            stats['count']
        )

    def assess_quality(self, demo):
        """Compute a quality score in [0, 1] from success, smoothness and
        efficiency (weighted 0.5 / 0.3 / 0.2)."""
        scores = []
        # Task completion (most important)
        # NOTE(review): check_task_completion is not defined in this class.
        success = self.check_task_completion(demo)
        scores.append(1.0 if success else 0.0)
        # Trajectory smoothness
        actions = np.array([s['action'] for s in demo])
        smoothness = self.compute_smoothness(actions)
        smoothness_score = np.exp(-smoothness)  # Lower raw value is better
        scores.append(smoothness_score)
        # Efficiency (trajectory length)
        efficiency = min(1.0, 100 / len(demo))  # Prefer ~100 steps
        scores.append(efficiency)
        # Weighted average
        weights = [0.5, 0.3, 0.2]  # Success is most important
        return np.average(scores, weights=weights)

    def provide_feedback(self, demo):
        """Give specific feedback on a failed demo."""
        actions = np.array([s['action'] for s in demo])
        # Check for excessive jerking via the second difference of actions.
        jerk = np.diff(actions, n=2, axis=0)
        jerk_norm = np.linalg.norm(jerk, axis=1)
        if jerk_norm.mean() > 0.3:
            print(" - Movements were too jerky. Try smoother motions.")
        # Check if task was completed
        if not self.check_task_completion(demo):
            print(" - Task not completed successfully. Ensure object reaches target.")
        # Check for efficiency
        if len(demo) > 500:
            print(" - Demonstration was too long. Try a more direct approach.")

    def filter_best_demos(self, top_k=None, quality_threshold=0.6):
        """Filter to the best demonstrations by quality score.

        Returns the demo payloads (not the metadata wrappers), best first.
        """
        # Sort by quality, best first.
        sorted_demos = sorted(
            self.all_demos,
            key=lambda x: x['quality'],
            reverse=True
        )
        # Filter by threshold
        filtered = [d for d in sorted_demos if d['quality'] >= quality_threshold]
        # Take top k if specified
        if top_k is not None:
            filtered = filtered[:top_k]
        print(f"\nFiltered {len(filtered)} / {len(self.all_demos)} demonstrations")
        # The quality range is undefined when nothing passed the threshold;
        # the original raised IndexError on filtered[-1] here.
        if filtered:
            print(f"Quality range: {filtered[-1]['quality']:.2f} - {filtered[0]['quality']:.2f}")
        return [d['demo'] for d in filtered]
Strategy 3: Intervention-Based Collection¶
Collect demos where expert intervenes only when needed:
class InterventionCollector:
    """Collect demos with expert interventions.

    A base policy (or random sampling) proposes actions and the expert
    overrides only when needed, concentrating expert effort on states
    where the policy is weakest.
    """

    def __init__(self, robot, base_policy=None):
        self.robot = robot
        self.base_policy = base_policy  # Can be random or partially trained

    def collect_with_interventions(self, num_episodes=100):
        """Collect demos with interventions.

        Returns:
            (all_data, intervention_stats): a flat list of step dicts and
            the per-episode intervention rates.
        """
        all_data = []
        intervention_stats = []
        for ep in range(num_episodes):
            episode_data = []
            interventions = 0
            autonomous_steps = 0
            obs = self.robot.reset()
            done = False
            while not done:
                # Policy proposes action
                if self.base_policy:
                    proposed_action = self.base_policy.predict(obs)
                else:
                    proposed_action = self.robot.sample_random_action()
                # Show proposed action to expert; expert can accept or override.
                action, intervened = self.get_expert_action(
                    obs, proposed_action
                )
                if intervened:
                    interventions += 1
                else:
                    autonomous_steps += 1
                # Execute action
                next_obs, reward, done, info = self.robot.step(action)
                episode_data.append({
                    'observation': obs,
                    'action': action,
                    'intervened': intervened,
                    'proposed_action': proposed_action
                })
                obs = next_obs
            # Guard: an episode that terminates immediately has zero steps;
            # report a 0.0 rate instead of dividing by zero.
            intervention_rate = (interventions / len(episode_data)
                                 if episode_data else 0.0)
            intervention_stats.append(intervention_rate)
            all_data.extend(episode_data)
            print(f"Episode {ep+1}: {len(episode_data)} steps, "
                  f"{intervention_rate*100:.1f}% interventions")
        # Analyze intervention patterns
        print(f"\nAverage intervention rate: {np.mean(intervention_stats)*100:.1f}%")
        return all_data, intervention_stats

    def get_expert_action(self, obs, proposed_action):
        """Get expert action (either accept or override).

        Returns:
            (action, intervened): the action to execute and whether the
            expert overrode the proposal.
        """
        # Display proposed action
        self.robot.visualize_proposed_action(proposed_action)
        # Expert input with timeout.
        # NOTE(review): check_for_expert_override is not defined in this
        # class — assumed to be provided by a subclass or mixin.
        override = self.check_for_expert_override(timeout=0.1)
        if override is not None:
            return override, True  # Intervened
        else:
            return proposed_action, False  # Accepted
Multi-Modal Demonstrations¶
Modern robot learning benefits from multi-modal data:
class MultiModalDemoRecorder:
    """Record demonstrations with multiple modalities (vision, force, audio)."""

    def __init__(self, robot, cameras, audio=False, force_torque=False):
        self.robot = robot
        self.cameras = cameras            # cam_id -> camera object
        self.record_audio = audio         # capture audio for language labels
        self.record_force = force_torque  # capture force/torque readings

    def record_demo(self):
        """Record a multi-modal demonstration at ~30 Hz until stop is signaled."""
        demo = {
            'observations': [],
            'actions': [],
            'images': {cam_id: [] for cam_id in self.cameras},
            'metadata': {
                'start_time': time.time(),
                'operator_id': input("Operator ID: "),
                'task_description': input("Task description: ")
            }
        }
        if self.record_audio:
            demo['audio'] = []
            self.start_audio_recording()
        if self.record_force:
            demo['force_torque'] = []
        # Three-second countdown before capture begins.
        print("Starting recording in 3...")
        for label in ("2...", "1..."):
            time.sleep(1)
            print(label)
        time.sleep(1)
        print("Recording!")
        frame_idx = 0
        while True:
            frame_start = time.time()
            # Robot state and current commanded action.
            demo['observations'].append(self.robot.get_state())
            demo['actions'].append(self.robot.get_current_action())
            # One frame per camera.
            for cam_id, camera in self.cameras.items():
                demo['images'][cam_id].append(camera.get_image())
            # Optional force/torque readings.
            if self.record_force:
                demo['force_torque'].append(self.robot.get_force_torque())
            # Optional audio (for language instructions).
            if self.record_audio:
                demo['audio'].append(self.get_audio_chunk())
            # Stop when the operator signals the end of the demo.
            if self.check_stop_condition():
                break
            frame_idx += 1
            # Sleep out the remainder of the 1/30 s frame budget.
            remaining = 1 / 30 - (time.time() - frame_start)
            if remaining > 0:
                time.sleep(remaining)
        meta = demo['metadata']
        meta['end_time'] = time.time()
        meta['duration'] = meta['end_time'] - meta['start_time']
        meta['num_frames'] = frame_idx
        print(f"Recording complete: {frame_idx} frames, {meta['duration']:.1f}s")
        if self.record_audio:
            self.stop_audio_recording()
            # Attach a text annotation derived from the spoken instruction.
            demo['language_annotation'] = self.transcribe_audio(demo['audio'])
        return demo

    def transcribe_audio(self, audio_data):
        """Transcribe audio to text using Whisper or similar."""
        # Placeholder - integrate with Whisper/speech-to-text
        print("Transcribing audio...")
        # transcription = whisper.transcribe(audio_data)
        # return transcription
        return "Manual annotation required"
Failure Recovery Demonstrations¶
Teach the policy how to recover from failures:
class FailureRecoveryCollector:
    """Collect demonstrations that include failures and their recoveries."""

    def __init__(self, robot):
        self.robot = robot

    def collect_with_failures(self, num_demos=50):
        """Collect demos that include and recover from failures.

        The operator marks failure and recovery moments during teleop;
        matched failure→recovery subsequences are extracted per demo.
        """
        demos = []
        for i in range(num_demos):
            print(f"\nDemo {i+1}/{num_demos}")
            print("Include intentional failures and recoveries")
            demo = {
                'trajectory': [],
                'failure_points': [],
                'recovery_points': []
            }
            obs = self.robot.reset()
            step = 0
            while True:
                action = self.get_operator_action()
                # Operator-flagged failure moment.
                if self.check_failure_marker():
                    demo['failure_points'].append(step)
                    print(f" [Step {step}] Failure marked")
                # Operator-flagged recovery moment.
                if self.check_recovery_marker():
                    demo['recovery_points'].append(step)
                    print(f" [Step {step}] Recovery marked")
                next_obs, _, done, _ = self.robot.step(action)
                demo['trajectory'].append({
                    'obs': obs,
                    'action': action,
                    'step': step
                })
                obs = next_obs
                step += 1
                if done or self.check_stop():
                    break
            # Summarize and extract failure-recovery pairs if any occurred.
            if len(demo['failure_points']) > 0:
                print(f" Recorded {len(demo['failure_points'])} failures")
                print(f" Recorded {len(demo['recovery_points'])} recoveries")
                demo['recovery_examples'] = self.extract_recovery_sequences(demo)
            demos.append(demo)
        return demos

    def extract_recovery_sequences(self, demo):
        """Extract subsequences showing failure → recovery."""
        pairs = []
        for fail_step in demo['failure_points']:
            # First recovery marker after this failure, if any.
            later = [r for r in demo['recovery_points'] if r > fail_step]
            if not later:
                continue
            recover_step = later[0]
            pairs.append({
                'failure_step': fail_step,
                'recovery_step': recover_step,
                'sequence': demo['trajectory'][fail_step:recover_step + 1],
                'duration': recover_step - fail_step
            })
        return pairs
Data Augmentation for Demonstrations¶
Increase demo diversity through augmentation:
class DemonstrationAugmenter:
    """Augment demonstrations to increase diversity."""

    def __init__(self):
        pass

    def augment_vision(self, demo, augmentations=None):
        """Apply visual augmentations to every frame's image."""
        if augmentations is None:
            augmentations = ['color_jitter', 'brightness', 'blur']
        # Map augmentation names to the method names that implement them;
        # unknown names are silently skipped, matching the original chain.
        method_names = {
            'color_jitter': 'color_jitter',
            'brightness': 'brightness',
            'blur': 'gaussian_blur',
            'noise': 'add_noise',
        }
        result = copy.deepcopy(demo)
        for step in result:
            frame = step['observation']['image']
            for aug in augmentations:
                impl = method_names.get(aug)
                if impl is not None:
                    frame = getattr(self, impl)(frame)
            step['observation']['image'] = frame
        return result

    def time_warp(self, demo, warp_factor=1.2):
        """Time warp: slow down (>1.0, interpolate) or speed up (<=1.0, subsample)."""
        if warp_factor > 1.0:
            # Slow down: insert one interpolated step between each pair.
            slowed = []
            for idx in range(len(demo) - 1):
                slowed.append(demo[idx])
                slowed.append(self.interpolate_steps(demo[idx], demo[idx + 1]))
            slowed.append(demo[-1])
            return slowed
        # Speed up: evenly subsample to warp_factor of the original length.
        picks = np.linspace(0, len(demo) - 1,
                            int(len(demo) * warp_factor)).astype(int)
        return [demo[idx] for idx in picks]

    def add_noise_to_actions(self, demo, noise_std=0.01):
        """Return a copy of the demo with Gaussian noise added to actions."""
        noisy = copy.deepcopy(demo)
        for step in noisy:
            original = np.array(step['action'])
            step['action'] = original + np.random.randn(*original.shape) * noise_std
        return noisy

    def spatial_transform(self, demo, translation=None, rotation=None):
        """Apply a random-or-given planar transform to the whole demo.

        Useful for 2D navigation or tabletop manipulation.
        """
        transformed = copy.deepcopy(demo)
        if translation is None:
            translation = np.random.randn(2) * 0.05  # 5cm random shift
        if rotation is None:
            rotation = np.random.randn() * 0.1  # Small rotation
        for step in transformed:
            # Rotate + translate the end-effector x, y position.
            if 'ee_pos' in step['observation']:
                xy = step['observation']['ee_pos'][:2]
                step['observation']['ee_pos'][:2] = (
                    self.rotate_2d(xy, rotation) + translation
                )
            # Rotate the planar action components the same way.
            if len(step['action']) >= 2:
                step['action'][:2] = self.rotate_2d(step['action'][:2], rotation)
        return transformed

    def rotate_2d(self, point, angle):
        """Rotate a 2D point counter-clockwise by ``angle`` radians."""
        cos_a, sin_a = np.cos(angle), np.sin(angle)
        rotation_matrix = np.array([[cos_a, -sin_a], [sin_a, cos_a]])
        return rotation_matrix @ point
Active Learning for Demonstrations¶
Intelligently select which demonstrations to collect next:
class ActiveDemonstrationSelector:
    """Select the most informative demonstrations to collect next."""

    def __init__(self, policy, uncertainty_estimator='ensemble'):
        self.policy = policy
        # One of: 'ensemble', 'dropout', or anything else -> disagreement.
        self.uncertainty_estimator = uncertainty_estimator

    def suggest_demo_scenarios(self, candidate_scenarios, n_select=10):
        """Suggest which scenarios to collect demos for, ranked by uncertainty."""
        uncertainties = np.array(
            [self.estimate_uncertainty(s) for s in candidate_scenarios]
        )
        # The highest-uncertainty scenarios are the most informative.
        top_indices = np.argsort(uncertainties)[-n_select:]
        selected_scenarios = [candidate_scenarios[i] for i in top_indices]
        print(f"Selected {n_select} high-uncertainty scenarios:")
        for i, (idx, scenario) in enumerate(zip(top_indices, selected_scenarios)):
            print(f"{i+1}. Scenario {idx}: uncertainty={uncertainties[idx]:.3f}")
            print(f" {scenario['description']}")
        return selected_scenarios

    def estimate_uncertainty(self, scenario):
        """Dispatch to the configured uncertainty estimator for a scenario."""
        if self.uncertainty_estimator == 'ensemble':
            return self.ensemble_uncertainty(scenario)
        if self.uncertainty_estimator == 'dropout':
            # NOTE(review): mc_dropout_uncertainty / disagreement_uncertainty
            # are not defined in this class — assumed provided elsewhere.
            return self.mc_dropout_uncertainty(scenario)
        return self.disagreement_uncertainty(scenario)

    def ensemble_uncertainty(self, scenario, n_rollouts=10):
        """Estimate uncertainty as prediction variance across ensemble members."""
        obs = scenario['initial_state']
        # One action sample per rollout, each from a random ensemble member.
        sampled = np.array([
            self.policy.predict(obs, use_random_member=True)
            for _ in range(n_rollouts)
        ])
        # Mean per-dimension variance across the sampled predictions.
        return sampled.var(axis=0).mean()
Best Practices Summary¶
DO:¶
✓ Validate demonstrations immediately after collection ✓ Collect diverse initial conditions and execution strategies ✓ Include failure recovery demonstrations ✓ Record multi-modal data (vision, force, audio) ✓ Provide immediate feedback to operators ✓ Track quality metrics throughout collection ✓ Use active learning to focus on uncertain regions
DON'T:¶
✗ Accept low-quality demonstrations just to hit quantity targets ✗ Collect all demos from identical initial conditions ✗ Ignore failed demonstrations (they contain valuable info) ✗ Rush operators — quality > speed ✗ Forget to label and annotate language descriptions ✗ Skip validation steps
Next Steps¶
- Data Quality - Validate and improve dataset quality
- LeRobot Format - Structure your demonstrations
- Imitation Learning - Train policies from demonstrations