Data Quality for Robot Learning¶
Data quality is the single most important factor determining the success of data-driven robot learning.
Why Data Quality Matters¶
Impact on Performance: - 100 high-quality demos often outperform 1000 low-quality demos - Quality issues compound through training - Garbage in = garbage out (especially for imitation learning)
Common Quality Issues: - Incomplete task executions - Sensor noise and artifacts - Synchronization errors between modalities - Annotation errors - Distribution shift - Duplicate or near-duplicate data
Quality Metrics¶
1. Task Success Rate¶
Most fundamental metric - did the demonstration complete the task?
class TaskSuccessChecker:
    """Check whether demonstrations successfully complete their task.

    ``task_spec`` is a dict with at least a ``'task'`` key
    ('pick_and_place', 'drawer_opening' or 'wiping') plus task-specific
    parameters such as 'target_pos', 'target_radius', 'open_threshold',
    'target_area' and 'coverage_threshold'.
    """

    def __init__(self, task_spec):
        self.task_spec = task_spec

    def check_success(self, demo):
        """Dispatch to the task-specific success check.

        ``demo`` is a sequence of steps; each step is a dict holding an
        'observation' dict. Returns a dict with at least a 'success' key.
        Raises NotImplementedError for unknown task names.
        """
        final_state = demo[-1]['observation']
        task = self.task_spec['task']
        if task == 'pick_and_place':
            return self.check_pick_and_place(demo, final_state)
        elif task == 'drawer_opening':
            return self.check_drawer_opening(demo, final_state)
        elif task == 'wiping':
            return self.check_wiping(demo, final_state)
        else:
            raise NotImplementedError(f"Task {self.task_spec['task']} not implemented")

    def check_pick_and_place(self, demo, final_state):
        """Check pick-and-place success: grasped, placed in target, released."""
        # 1. Object grasped at some point during the demo
        grasp_detected = any(
            step['observation'].get('gripper_force', 0) > 0.5
            for step in demo
        )
        # 2. Object inside the target region at the end.  bool() cast keeps
        # the result a plain Python bool rather than numpy.bool_.
        if 'object_pos' in final_state and 'target_pos' in self.task_spec:
            obj_pos = np.array(final_state['object_pos'])
            target_pos = np.array(self.task_spec['target_pos'])
            distance = np.linalg.norm(obj_pos - target_pos)
            in_target = bool(distance < self.task_spec.get('target_radius', 0.05))
        else:
            in_target = False
        # 3. Gripper open at the end (object released)
        gripper_open = final_state.get('gripper_state', 0) < 0.1
        success = grasp_detected and in_target and gripper_open
        return {
            'success': success,
            'grasp_detected': grasp_detected,
            'in_target': in_target,
            'gripper_open': gripper_open
        }

    def check_drawer_opening(self, demo, final_state):
        """Check drawer opening success: displacement exceeds threshold."""
        initial_state = demo[0]['observation']
        initial_drawer_pos = initial_state.get('drawer_position', 0)
        final_drawer_pos = final_state.get('drawer_position', 0)
        # Drawer opened by at least the configured threshold
        opened = (final_drawer_pos - initial_drawer_pos) > self.task_spec.get('open_threshold', 0.1)
        return {
            'success': opened,
            'drawer_displacement': final_drawer_pos - initial_drawer_pos
        }

    def check_wiping(self, demo, final_state):
        """Check wiping/cleaning success via area coverage of the trajectory.

        BUG FIX: the original tested ``'ee_trajectory' in demo``, but ``demo``
        is a list of step dicts, so the membership test could never succeed
        and wiping always reported failure. Build the end-effector trajectory
        from the per-step observations instead.
        """
        if demo and 'ee_pos' in demo[0]['observation']:
            trajectory = np.array([
                step['observation']['ee_pos'][:2]  # x, y only
                for step in demo
            ])
            coverage = float(self.compute_area_coverage(
                trajectory,
                self.task_spec['target_area']
            ))
            success = bool(coverage > self.task_spec.get('coverage_threshold', 0.8))
            return {
                'success': success,
                'coverage': coverage
            }
        return {'success': False, 'coverage': 0.0}

    def compute_area_coverage(self, trajectory, target_area):
        """Fraction of the target area's grid cells visited by the trajectory.

        ``target_area`` is (x_min, x_max, y_min, y_max); the area is
        discretized into a grid_size x grid_size occupancy grid.
        """
        grid_size = 50
        x_min, x_max, y_min, y_max = target_area
        # BUG FIX: grid_size + 1 edges give exactly grid_size cells per axis;
        # the original used grid_size edges, so the last row/column of the
        # occupancy grid was effectively unreachable and coverage could never
        # reach 1.0.
        x_edges = np.linspace(x_min, x_max, grid_size + 1)
        y_edges = np.linspace(y_min, y_max, grid_size + 1)
        covered = np.zeros((grid_size, grid_size), dtype=bool)
        # Mark cells along the trajectory; points outside the area are ignored
        for point in trajectory:
            x_idx = np.digitize(point[0], x_edges) - 1
            y_idx = np.digitize(point[1], y_edges) - 1
            if 0 <= x_idx < grid_size and 0 <= y_idx < grid_size:
                covered[x_idx, y_idx] = True
        return covered.mean()
2. Trajectory Smoothness¶
Smooth trajectories indicate confident, controlled execution:
def compute_trajectory_metrics(demo):
    """Compute trajectory quality metrics for one demonstration.

    ``demo``: sequence of steps, each a dict with an 'action' vector and an
    'observation' dict. Returns a dict of scalar quality metrics.
    """
    actions = np.array([step['action'] for step in demo])
    metrics = {}

    def _derivative_stats(name, order):
        """Record avg/max magnitude of the order-th finite difference."""
        # BUG FIX: np.diff with n >= len(actions) yields an empty array and
        # the original then crashed on .max() of an empty reduction for
        # short demos; report 0.0 instead.
        deriv = np.diff(actions, n=order, axis=0)
        if deriv.shape[0] > 0:
            mags = np.linalg.norm(deriv, axis=1)
            metrics[f'avg_{name}'] = mags.mean()
            metrics[f'max_{name}'] = mags.max()
        else:
            metrics[f'avg_{name}'] = 0.0
            metrics[f'max_{name}'] = 0.0

    # First/second/third derivatives: velocity, acceleration and jerk
    # (jerk is the classic smoothness indicator).
    _derivative_stats('velocity', 1)
    _derivative_stats('acceleration', 2)
    _derivative_stats('jerk', 3)

    # Frequency-domain smoothness
    metrics['spectral_arc_length'] = compute_spectral_arc_length(actions)

    # Path efficiency (straight-line vs travelled distance), when the
    # end-effector position is logged
    if 'ee_pos' in demo[0]['observation']:
        ee_trajectory = np.array([
            step['observation']['ee_pos']
            for step in demo
        ])
        metrics['path_efficiency'] = compute_path_efficiency(ee_trajectory)
    return metrics
def compute_spectral_arc_length(trajectory):
    """Spectral arc length smoothness score (lower = smoother).

    Sums, over consecutive bins of the trajectory's FFT magnitude spectrum,
    the mean arc-length element sqrt(1 + delta^2).
    """
    # Magnitude spectrum along the time axis
    spectrum = np.abs(np.fft.fft(trajectory, axis=0))
    # Per-bin differences of the magnitude spectrum
    deltas = np.diff(spectrum, axis=0)
    # Accumulate the mean arc-length element for each consecutive bin pair
    return sum(np.sqrt(1 + delta ** 2).mean() for delta in deltas)
def compute_path_efficiency(trajectory):
    """Ratio of straight-line displacement to travelled path length.

    1.0 means the trajectory is a straight line; the small epsilon in the
    denominator guards against zero-length paths.
    """
    # Straight-line displacement from first to last waypoint
    displacement = np.linalg.norm(trajectory[-1] - trajectory[0])
    # Travelled distance: sum of consecutive segment lengths
    segment_lengths = [np.linalg.norm(segment) for segment in np.diff(trajectory, axis=0)]
    travelled = np.sum(segment_lengths)
    return displacement / (travelled + 1e-6)
3. Multi-Modal Synchronization¶
Critical for vision-language-action models:
class SynchronizationChecker:
    """Check and repair synchronization across demo modalities.

    Timestamps are expected in milliseconds, nominally captured at
    ``expected_hz`` (default 30 Hz — the original hard-coded this inline).
    """

    def __init__(self, tolerance_ms=50, expected_hz=30):
        # Maximum allowed offset between a modality sample and the nearest
        # reference ('state') sample
        self.tolerance_ms = tolerance_ms
        # Nominal capture rate, used for gap detection and resampling
        self.expected_hz = expected_hz

    def check_demo_sync(self, demo):
        """Verify all modalities are properly synchronized.

        Returns {'synchronized': bool, 'issues': [str, ...]}. A demo without
        a 'timestamps' entry is reported as synchronized (nothing to check).
        """
        issues = []
        if 'timestamps' in demo:
            timestamps = demo['timestamps']
            # Check for missing timestamps
            for modality in ['image', 'state', 'action']:
                if modality not in timestamps:
                    issues.append(f"Missing timestamps for {modality}")
            # Check timestamp alignment (same sample count per modality)
            if len(set(len(ts) for ts in timestamps.values())) > 1:
                issues.append("Modalities have different numbers of timestamps")
            # Check for large gaps relative to the nominal period
            expected_gap = 1000 / self.expected_hz  # nominal period in ms
            for modality, ts in timestamps.items():
                gaps = np.diff(ts)
                large_gaps = gaps > expected_gap * 2
                if np.any(large_gaps):
                    gap_indices = np.where(large_gaps)[0]
                    issues.append(
                        f"{modality}: Found {len(gap_indices)} large timestamp gaps "
                        f"(max: {gaps.max():.1f}ms)"
                    )
            # Check relative synchronization against the 'state' reference
            sync_errors = self.compute_sync_errors(timestamps)
            if np.any(sync_errors > self.tolerance_ms):
                issues.append(
                    f"Synchronization errors up to {sync_errors.max():.1f}ms detected"
                )
        return {
            'synchronized': len(issues) == 0,
            'issues': issues
        }

    def compute_sync_errors(self, timestamps):
        """Offsets (ms) from each non-reference sample to the nearest
        'state' sample.

        BUG FIX: returns an empty array when 'state' is absent (the original
        raised KeyError right after having merely logged the missing
        modality as an issue), and coerces timestamps to an ndarray (plain
        lists made ``ref_timestamps - t`` a TypeError).
        """
        if 'state' not in timestamps:
            return np.array([])
        ref_timestamps = np.asarray(timestamps['state'], dtype=float)
        errors = []
        for modality, ts in timestamps.items():
            if modality == 'state':
                continue
            # Nearest reference timestamp for each modality timestamp
            for t in ts:
                nearest_ref = ref_timestamps[np.argmin(np.abs(ref_timestamps - t))]
                errors.append(abs(t - nearest_ref))
        return np.array(errors)

    def resync_demo(self, demo):
        """Resynchronize a demonstration by interpolating every modality
        onto a common timeline at the nominal capture rate."""
        # Collect the union of all timestamps to bound the common timeline
        all_timestamps = []
        for ts in demo['timestamps'].values():
            all_timestamps.extend(ts)
        start_time = min(all_timestamps)
        end_time = max(all_timestamps)
        common_timeline = np.arange(start_time, end_time, 1000 / self.expected_hz)
        # Interpolate each modality to the common timeline
        resynced_demo = {'timeline': common_timeline}
        for modality in ['image', 'state', 'action']:
            resynced_demo[modality] = self.interpolate_modality(
                demo['timestamps'][modality],
                demo[modality],
                common_timeline
            )
        return resynced_demo

    def interpolate_modality(self, original_ts, original_data, target_ts):
        """Interpolate one modality's samples onto ``target_ts``.

        Numeric arrays are linearly interpolated per dimension; anything
        else (e.g. image frames) is resampled by nearest timestamp.
        """
        from scipy.interpolate import interp1d
        # BUG FIX: list timestamps broke the array arithmetic below
        original_ts = np.asarray(original_ts, dtype=float)
        if isinstance(original_data[0], np.ndarray):
            # Numerical data (states, actions): linear interp per dimension
            data_array = np.array(original_data)
            interpolated = []
            for dim in range(data_array.shape[1]):
                interp_func = interp1d(
                    original_ts,
                    data_array[:, dim],
                    kind='linear',
                    fill_value='extrapolate'
                )
                interpolated.append(interp_func(target_ts))
            return np.column_stack(interpolated)
        # Non-numeric payloads: nearest-neighbour resampling
        indices = [
            np.argmin(np.abs(original_ts - t))
            for t in target_ts
        ]
        return [original_data[i] for i in indices]
4. Sensor Data Quality¶
Detect and handle sensor artifacts:
class SensorQualityChecker:
    """Check for sensor data quality issues in image and state streams."""

    def __init__(self):
        # Names of the checks performed, for reference/reporting
        self.image_checks = ['motion_blur', 'overexposure', 'occlusion']
        self.state_checks = ['outliers', 'dropouts', 'noise']

    def check_image_quality(self, images):
        """Run per-frame quality checks on a sequence of RGB images.

        Returns per-frame issue strings plus aggregate blur ratio and
        average brightness.
        """
        issues = []
        # Compute blur flags once; the original re-ran the (expensive)
        # Laplacian pass a second time inside compute_blur_ratio.
        blur_flags = [self.detect_motion_blur(img) for img in images]
        for i, img in enumerate(images):
            if blur_flags[i]:
                issues.append(f"Frame {i}: Motion blur detected")
            # Exposure issues
            if self.detect_overexposure(img):
                issues.append(f"Frame {i}: Overexposed")
            if self.detect_underexposure(img):
                issues.append(f"Frame {i}: Underexposed")
            # Occlusion detection
            if self.detect_occlusion(img):
                issues.append(f"Frame {i}: Possible occlusion")
        return {
            'quality_ok': len(issues) == 0,
            'issues': issues,
            'blur_ratio': (sum(blur_flags) / len(images)) if len(images) else 0.0,
            'avg_brightness': self.compute_avg_brightness(images)
        }

    def detect_motion_blur(self, image):
        """Detect motion blur via Laplacian variance (low variance = blurry)."""
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        laplacian = cv2.Laplacian(gray, cv2.CV_64F)
        variance = laplacian.var()
        # Threshold: typical sharp images have variance > 100
        return variance < 50

    def detect_overexposure(self, image):
        """Detect overexposure: >10% of pixels near the maximum value."""
        overexposed_pixels = (image > 250).sum()
        total_pixels = image.size
        return (overexposed_pixels / total_pixels) > 0.1

    def detect_underexposure(self, image):
        """Detect underexposure: >10% of pixels near the minimum value."""
        underexposed_pixels = (image < 10).sum()
        total_pixels = image.size
        return (underexposed_pixels / total_pixels) > 0.1

    def detect_occlusion(self, image, template=None):
        """Heuristic occlusion check: large dark region in the lower image.

        Assumes the robot/gripper is darker than the environment and tends
        to appear in the bottom ~30% of the frame.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        bottom_region = gray[int(gray.shape[0] * 0.7):, :]
        dark_pixels = (bottom_region < 50).sum()
        return (dark_pixels / bottom_region.size) > 0.3

    def compute_blur_ratio(self, images):
        """Fraction of blurry frames (0.0 for an empty sequence).

        BUG FIX: the original divided by len(images) unconditionally and
        raised ZeroDivisionError for empty input.
        """
        if len(images) == 0:
            return 0.0
        blurry_count = sum(self.detect_motion_blur(img) for img in images)
        return blurry_count / len(images)

    def compute_avg_brightness(self, images):
        """Average pixel brightness across the sequence (0.0 when empty)."""
        if len(images) == 0:
            return 0.0
        brightnesses = [img.mean() for img in images]
        return np.mean(brightnesses)

    def check_state_quality(self, states):
        """Check proprioceptive state quality: outliers, dropouts, noise."""
        states_array = np.array(states)
        issues = []
        # Outliers (per-dimension z-score test)
        outliers = self.detect_outliers(states_array)
        if len(outliers) > 0:
            issues.append(f"Found {len(outliers)} outlier states at indices {outliers[:10]}")
        # Dropouts (runs of repeated identical readings)
        dropouts = self.detect_dropouts(states_array)
        if len(dropouts) > 0:
            issues.append(f"Found {len(dropouts)} potential sensor dropouts")
        # Noise level (std of the high-pass component)
        noise_level = self.estimate_noise_level(states_array)
        if noise_level > 0.05:
            issues.append(f"High noise level: {noise_level:.4f}")
        return {
            'quality_ok': len(issues) == 0,
            'issues': issues,
            'outlier_count': len(outliers),
            'dropout_count': len(dropouts),
            'noise_level': noise_level
        }

    def detect_outliers(self, states, threshold=3.0):
        """Indices of states where any dimension's |z-score| exceeds threshold."""
        mean = states.mean(axis=0)
        std = states.std(axis=0)
        # Epsilon avoids division by zero for constant dimensions
        z_scores = np.abs((states - mean) / (std + 1e-6))
        outlier_indices = np.where(np.any(z_scores > threshold, axis=1))[0]
        return outlier_indices

    def detect_dropouts(self, states, threshold=5):
        """Start indices of runs of ``threshold`` identical consecutive states.

        BUG FIX: the original iterated ``range(len(states) - threshold)``,
        which skipped the window ending at the final sample.
        """
        dropout_indices = []
        for i in range(len(states) - threshold + 1):
            window = states[i:i + threshold]
            if np.all(window == window[0]):
                dropout_indices.append(i)
        return dropout_indices

    def estimate_noise_level(self, states):
        """Estimate sensor noise as the mean per-dimension std of the
        high-pass filtered signal."""
        from scipy.signal import butter, filtfilt
        b, a = butter(4, 0.1, btype='high')
        # filtfilt needs more samples than its default padding length
        # (3 * max(len(a), len(b))); short sequences would raise ValueError,
        # so report zero noise instead.
        if len(states) <= 3 * max(len(a), len(b)):
            return 0.0
        noise_levels = []
        for dim in range(states.shape[1]):
            filtered = filtfilt(b, a, states[:, dim])
            noise_levels.append(np.std(filtered))
        return np.mean(noise_levels)
Dataset-Level Quality¶
Distribution Analysis¶
class DatasetQualityAnalyzer:
    """Analyze distributional quality of an entire dataset of demos.

    Each demo is a sequence of steps ({'observation': {'state': ...},
    'action': ...}); some methods also read dataset-level metadata
    ('success', 'metadata') via .get() — NOTE(review): the demo container
    is used both as a sequence and as a mapping here; confirm it supports
    both.
    """

    def __init__(self, dataset):
        self.dataset = dataset

    def collect_all_states(self):
        """Stack every per-step state in the dataset into one 2-D array.

        BUG FIX: referenced by analyze_distribution but missing from the
        original class (AttributeError at runtime).
        """
        return np.array([
            step['observation']['state']
            for demo in self.dataset
            for step in demo
        ])

    def collect_all_actions(self):
        """Stack every per-step action in the dataset into one 2-D array.

        BUG FIX: referenced by analyze_distribution but missing from the
        original class.
        """
        return np.array([
            step['action']
            for demo in self.dataset
            for step in demo
        ])

    def analyze_distribution(self):
        """Build a report dict covering coverage, actions, tasks, diversity
        and balance."""
        report = {}
        # State space coverage
        all_states = self.collect_all_states()
        report['state_coverage'] = self.compute_coverage(all_states)
        # Action distribution
        all_actions = self.collect_all_actions()
        report['action_distribution'] = self.compute_action_stats(all_actions)
        # Task distribution
        report['task_distribution'] = self.compute_task_distribution()
        # Diversity metrics
        report['diversity'] = self.compute_diversity()
        # Balance metrics
        report['balance'] = self.check_balance()
        return report

    def compute_coverage(self, states):
        """Per-dimension fraction of occupied histogram bins."""
        bins_per_dim = 20
        coverage = []
        for dim in range(states.shape[1]):
            hist, _ = np.histogram(states[:, dim], bins=bins_per_dim)
            occupied = (hist > 0).sum()
            coverage.append(occupied / bins_per_dim)
        return {
            'per_dimension': coverage,
            'average': np.mean(coverage),
            'min': np.min(coverage)
        }

    def compute_action_stats(self, actions):
        """Summary statistics of the action distribution."""
        return {
            'mean': actions.mean(axis=0).tolist(),
            'std': actions.std(axis=0).tolist(),
            'min': actions.min(axis=0).tolist(),
            'max': actions.max(axis=0).tolist(),
            'saturation_rate': self.compute_saturation_rate(actions)
        }

    def compute_saturation_rate(self, actions):
        """Fraction of steps with any action dimension near its limit.

        Assumes actions are normalized to [-1, 1] — TODO confirm.
        """
        saturated = (np.abs(actions) > 0.95).any(axis=1)
        return saturated.mean()

    def compute_diversity(self):
        """Measure initial-state and trajectory diversity."""
        # Initial state diversity: log-determinant of the covariance
        initial_states = np.array([
            demo[0]['observation']['state']
            for demo in self.dataset
        ])
        cov = np.cov(initial_states.T)
        sign, logdet = np.linalg.slogdet(cov)
        diversity_score = logdet if sign > 0 else -np.inf
        # Trajectory diversity: average pairwise distance over a random
        # sample of 10 demos (nondeterministic by design)
        if len(self.dataset) > 10:
            sample_indices = np.random.choice(len(self.dataset), 10, replace=False)
            sample_demos = [self.dataset[i] for i in sample_indices]
            pairwise_distances = []
            for i in range(len(sample_demos)):
                for j in range(i + 1, len(sample_demos)):
                    dist = self.compute_trajectory_distance(
                        sample_demos[i],
                        sample_demos[j]
                    )
                    pairwise_distances.append(dist)
            trajectory_diversity = np.mean(pairwise_distances)
        else:
            trajectory_diversity = None
        return {
            'initial_state_diversity': diversity_score,
            'trajectory_diversity': trajectory_diversity
        }

    def compute_trajectory_distance(self, demo1, demo2):
        """Distance between two demos as the complement of their similarity.

        BUG FIX: referenced by compute_diversity but missing from the
        original class.
        """
        return 1.0 - self.compute_demo_similarity(demo1, demo2)

    def check_balance(self):
        """Check task, success/failure and operator balance."""
        # Task balance
        task_counts = self.compute_task_distribution()
        # Success/failure balance (missing flag counts as success)
        success_count = sum(1 for demo in self.dataset if demo.get('success', True))
        failure_count = len(self.dataset) - success_count
        # Operator balance (if recorded in metadata)
        operator_counts = {}
        for demo in self.dataset:
            operator = demo.get('metadata', {}).get('operator_id', 'unknown')
            operator_counts[operator] = operator_counts.get(operator, 0) + 1
        return {
            'task_balance': task_counts,
            'success_rate': success_count / len(self.dataset),
            'operator_balance': operator_counts
        }

    def compute_task_distribution(self):
        """Count demonstrations per task name (from demo metadata)."""
        task_counts = {}
        for demo in self.dataset:
            task = demo.get('metadata', {}).get('task', 'unknown')
            task_counts[task] = task_counts.get(task, 0) + 1
        return task_counts

    def detect_duplicates(self, similarity_threshold=0.98):
        """Find near-duplicate demo pairs (O(n^2) pairwise comparison)."""
        duplicates = []
        for i in range(len(self.dataset)):
            for j in range(i + 1, len(self.dataset)):
                similarity = self.compute_demo_similarity(
                    self.dataset[i],
                    self.dataset[j]
                )
                if similarity > similarity_threshold:
                    duplicates.append((i, j, similarity))
        return duplicates

    def compute_demo_similarity(self, demo1, demo2):
        """Similarity in [~0, 1] between two demos' state trajectories."""
        states1 = np.array([s['observation']['state'] for s in demo1])
        states2 = np.array([s['observation']['state'] for s in demo2])
        # Resample both trajectories to a common length
        if len(states1) != len(states2):
            from scipy.interpolate import interp1d
            min_len = min(len(states1), len(states2))
            t1 = np.linspace(0, 1, len(states1))
            t2 = np.linspace(0, 1, len(states2))
            t_common = np.linspace(0, 1, min_len)
            states1_resampled = np.array([
                interp1d(t1, states1[:, dim])(t_common)
                for dim in range(states1.shape[1])
            ]).T
            states2_resampled = np.array([
                interp1d(t2, states2[:, dim])(t_common)
                for dim in range(states2.shape[1])
            ]).T
        else:
            states1_resampled = states1
            states2_resampled = states2
        # Normalized distance; epsilon avoids 0/0 -> nan when both
        # trajectories are identically zero (BUG FIX)
        distance = np.linalg.norm(states1_resampled - states2_resampled)
        max_distance = np.linalg.norm(states1_resampled) + np.linalg.norm(states2_resampled)
        similarity = 1.0 - (distance / (max_distance + 1e-12))
        return similarity
Data Cleaning Pipeline¶
class DataCleaningPipeline:
    """Automated data cleaning pipeline for demonstration datasets."""

    def __init__(self, task_spec=None):
        # BUG FIX: the original called TaskSuccessChecker() with no
        # arguments, which raises TypeError (task_spec is required).
        # Accept an optional spec and only build the checker when given one.
        self.success_checker = TaskSuccessChecker(task_spec) if task_spec is not None else None
        self.sync_checker = SynchronizationChecker()
        self.sensor_checker = SensorQualityChecker()

    def clean_dataset(self, dataset, config):
        """Filter low-quality demos and apply automatic fixes to the rest.

        Returns the cleaned dataset; prints a summary of removals.
        """
        print(f"Cleaning dataset ({len(dataset)} demonstrations)...")
        cleaned_dataset = []
        removal_reasons = []
        for i, demo in enumerate(dataset):
            keep, reason = self.should_keep_demo(demo, config)
            if keep:
                demo_cleaned = self.apply_fixes(demo, config)
                cleaned_dataset.append(demo_cleaned)
            else:
                removal_reasons.append((i, reason))
        # Summary report
        print(f"\nCleaning complete:")
        print(f"  Kept: {len(cleaned_dataset)} / {len(dataset)}")
        print(f"  Removed: {len(removal_reasons)}")
        if removal_reasons:
            print("\nRemoval reasons:")
            reason_counts = {}
            for _, reason in removal_reasons:
                reason_counts[reason] = reason_counts.get(reason, 0) + 1
            for reason, count in sorted(reason_counts.items(), key=lambda x: -x[1]):
                print(f"  {reason}: {count}")
        return cleaned_dataset

    def should_keep_demo(self, demo, config):
        """Decide whether a demo is kept; returns (keep, reason_or_None).

        NOTE(review): demo is read both as a mapping ('success', 'images')
        and via len(); confirm the demo container supports both.
        """
        # Task success
        if config.get('require_success', True):
            if not demo.get('success', False):
                return False, "task_failure"
        # Trajectory length bounds
        min_length = config.get('min_length', 10)
        max_length = config.get('max_length', 1000)
        if len(demo) < min_length:
            return False, "too_short"
        if len(demo) > max_length:
            return False, "too_long"
        # Trajectory smoothness (jerk)
        if config.get('check_smoothness', True):
            metrics = compute_trajectory_metrics(demo)
            if metrics['avg_jerk'] > config.get('max_jerk', 1.0):
                return False, "too_jerky"
        # Multi-modal synchronization
        if config.get('check_sync', True):
            sync_result = self.sync_checker.check_demo_sync(demo)
            if not sync_result['synchronized']:
                if config.get('auto_resync', False):
                    # Will be fixed in apply_fixes
                    pass
                else:
                    return False, "sync_error"
        # Image sensor quality
        if 'images' in demo and config.get('check_image_quality', True):
            image_quality = self.sensor_checker.check_image_quality(demo['images'])
            if image_quality['blur_ratio'] > config.get('max_blur_ratio', 0.3):
                return False, "excessive_blur"
        return True, None

    def apply_fixes(self, demo, config):
        """Apply automatic fixes (resync / smoothing / outlier removal)
        to a copy of the demo."""
        demo_fixed = copy.deepcopy(demo)
        # Resynchronize if needed
        if config.get('auto_resync', False):
            sync_result = self.sync_checker.check_demo_sync(demo_fixed)
            if not sync_result['synchronized']:
                demo_fixed = self.sync_checker.resync_demo(demo_fixed)
        # Smooth action trajectories
        if config.get('smooth_trajectories', False):
            demo_fixed = self.smooth_trajectory(demo_fixed, config.get('smoothing_window', 3))
        # Remove outlier steps
        if config.get('remove_outliers', False):
            demo_fixed = self.remove_outlier_steps(demo_fixed)
        return demo_fixed

    def smooth_trajectory(self, demo, window_size=3):
        """Smooth per-step actions in place with a Savitzky-Golay filter.

        NOTE(review): savgol_filter may require an odd window_length
        depending on the SciPy version — confirm before passing even sizes.
        """
        from scipy.signal import savgol_filter
        actions = np.array([step['action'] for step in demo])
        # Skip demos too short for the filter window
        if len(actions) > window_size:
            smoothed = savgol_filter(actions, window_size, 2, axis=0)
            for i, step in enumerate(demo):
                step['action'] = smoothed[i]
        return demo

    def remove_outlier_steps(self, demo):
        """Drop steps flagged as state outliers by the sensor checker."""
        states = np.array([s['observation']['state'] for s in demo])
        outlier_indices = self.sensor_checker.detect_outliers(states)
        # Set lookup is O(1) per step (the original did `in` on an ndarray)
        outlier_set = set(outlier_indices.tolist())
        demo_cleaned = [
            step for i, step in enumerate(demo)
            if i not in outlier_set
        ]
        return demo_cleaned
Quality Monitoring Dashboard¶
def generate_quality_report(dataset):
    """Print a comprehensive quality report for a dataset of demos.

    Purely side-effecting (prints to stdout); relies on
    DatasetQualityAnalyzer for the distribution analysis.
    """
    # BUG FIX: the original divided by len(dataset) and crashed on an
    # empty dataset.
    if not dataset:
        print("DATASET QUALITY REPORT: dataset is empty")
        return
    analyzer = DatasetQualityAnalyzer(dataset)
    print("=" * 60)
    print("DATASET QUALITY REPORT")
    print("=" * 60)
    # Basic stats
    print(f"\nDataset Size: {len(dataset)} demonstrations")
    # Success rate (missing flag counts as success)
    success_count = sum(1 for d in dataset if d.get('success', True))
    print(f"Success Rate: {success_count / len(dataset) * 100:.1f}%")
    # Distribution analysis
    dist_report = analyzer.analyze_distribution()
    print(f"\nState Space Coverage:")
    print(f"  Average: {dist_report['state_coverage']['average'] * 100:.1f}%")
    print(f"  Min dimension: {dist_report['state_coverage']['min'] * 100:.1f}%")
    print(f"\nAction Statistics:")
    action_stats = dist_report['action_distribution']
    print(f"  Mean: {np.array(action_stats['mean'])}")
    print(f"  Std: {np.array(action_stats['std'])}")
    print(f"  Saturation rate: {action_stats['saturation_rate'] * 100:.1f}%")
    print(f"\nDiversity Metrics:")
    diversity = dist_report['diversity']
    print(f"  Initial state diversity: {diversity['initial_state_diversity']:.2f}")
    # BUG FIX: `is not None` — a legitimate diversity of exactly 0.0 is
    # falsy and was silently skipped by the original truthiness test.
    if diversity['trajectory_diversity'] is not None:
        print(f"  Trajectory diversity: {diversity['trajectory_diversity']:.4f}")
    print(f"\nBalance:")
    balance = dist_report['balance']
    print(f"  Task distribution: {balance['task_balance']}")
    print(f"  Operator distribution: {balance['operator_balance']}")
    # Near-duplicate detection
    duplicates = analyzer.detect_duplicates()
    if duplicates:
        print(f"\n⚠ Found {len(duplicates)} near-duplicate pairs")
    print("=" * 60)
Best Practices¶
DO:¶
✓ Validate every demonstration immediately after collection ✓ Monitor quality metrics throughout collection ✓ Remove low-quality demonstrations early ✓ Check multi-modal synchronization ✓ Analyze dataset-level distributions ✓ Maintain high success rate (>90% for IL) ✓ Ensure trajectory smoothness ✓ Balance task and condition distributions
DON'T:¶
✗ Accept demonstrations with failed tasks (for IL) ✗ Ignore sensor artifacts and noise ✗ Skip synchronization checks for multi-modal data ✗ Keep near-duplicate demonstrations ✗ Overlook distribution imbalances ✗ Forget to monitor data quality metrics
Next Steps¶
- LeRobot Format - Structure your quality data
- Imitation Learning - Train from quality demonstrations
- Training Best Practices - Use your data effectively