Skip to content

Octo: Open-Source Generalist Robot Policy

Octo is a versatile, open-source transformer-based robot policy trained on diverse datasets for cross-embodiment transfer.

Paper: "Octo: An Open-Source Generalist Robot Policy" (Team et al., 2024)

GitHub: https://github.com/octo-models/octo

Models: https://huggingface.co/rail-berkeley/octo

Overview

Octo addresses the challenge of building generalist robot policies that work across different robots and tasks:

  • Open-source: Fully available weights, code, and training data
  • Cross-embodiment: Trained on 800K+ trajectories from 9+ robot types
  • Flexible: Supports goal images, language, both, or neither
  • Efficient: 93M parameters (much smaller than RT-2/OpenVLA)
  • Fast fine-tuning: Adapts with 50-200 demonstrations

Architecture

Octo uses a transformer-based architecture optimized for robotics:

graph TD
    A[Images<br/>224x224] --> B[ResNet-18<br/>Vision Encoder]
    C[Language/Goal Image] --> D[T5<br/>Language Encoder]
    B --> E[Transformer<br/>8 layers]
    D --> E
    F[Proprio State] --> E
    E --> G[Diffusion Head<br/>Action Chunking]
    G --> H[Action Chunk<br/>4 steps]

Key Design Choices

1. Lightweight Vision Encoder

Unlike VLA models that use massive vision encoders (DINOv2, SigLIP), Octo uses ResNet-18:

class OctoVisionEncoder(nn.Module):
    """Lightweight vision encoder for Octo"""
    def __init__(self):
        super().__init__()

        # ResNet-18 (much smaller than ViT-L)
        self.resnet = torchvision.models.resnet18(pretrained=True)

        # Remove classification head
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])

        # Project to transformer dimension
        self.projection = nn.Linear(512, 512)

    def forward(self, images):
        """
        Args:
            images: (B, 3, 224, 224) RGB images
        Returns:
            features: (B, 512) visual features
        """
        features = self.resnet(images)
        features = features.flatten(1)
        features = self.projection(features)

        return features

Why ResNet-18? - Much faster inference (important for real-time control) - Sufficient for manipulation tasks (not as complex as general vision) - Enables deployment on edge devices

2. Flexible Task Specification

Octo supports multiple ways to specify tasks:

class OctoTaskEncoder(nn.Module):
    """Encode tasks from language, goal images, or both"""
    def __init__(self):
        super().__init__()

        # Language encoder (T5)
        self.language_encoder = T5EncoderModel.from_pretrained('t5-base')

        # Goal image encoder (same as observation encoder)
        self.goal_encoder = OctoVisionEncoder()

        # Fusion
        self.fusion = nn.Linear(512 + 768, 512)

    def forward(self, language=None, goal_image=None):
        """
        Flexible task encoding

        Args:
            language: Optional language instruction
            goal_image: Optional goal image
        Returns:
            task_embedding: (B, 512)
        """
        embeddings = []

        if language is not None:
            lang_embed = self.language_encoder(language).last_hidden_state.mean(dim=1)
            embeddings.append(lang_embed)

        if goal_image is not None:
            goal_embed = self.goal_encoder(goal_image)
            embeddings.append(goal_embed)

        if len(embeddings) == 0:
            # No task specification - use learned null embedding
            return self.null_embedding.expand(batch_size, -1)

        elif len(embeddings) == 1:
            return embeddings[0]

        else:
            # Both language and goal image
            combined = torch.cat(embeddings, dim=1)
            return self.fusion(combined)

3. Transformer Backbone

Octo uses a standard transformer to process temporal observations:

class OctoTransformer(nn.Module):
    """Transformer backbone for Octo"""
    def __init__(self, dim=512, depth=8, heads=8, context_length=32):
        super().__init__()
        self.dim = dim
        self.context_length = context_length

        # Positional encoding
        self.pos_embed = nn.Parameter(torch.randn(1, context_length, dim))

        # Transformer layers
        self.layers = nn.ModuleList([
            TransformerBlock(dim=dim, heads=heads)
            for _ in range(depth)
        ])

        self.norm = nn.LayerNorm(dim)

    def forward(self, tokens):
        """
        Args:
            tokens: (B, T, dim) - sequence of observation/task tokens
        Returns:
            output: (B, T, dim) - processed tokens
        """
        B, T, _ = tokens.shape

        # Add positional encoding
        tokens = tokens + self.pos_embed[:, :T, :]

        # Transformer layers
        for layer in self.layers:
            tokens = layer(tokens)

        return self.norm(tokens)

class TransformerBlock(nn.Module):
    """Standard transformer block"""
    def __init__(self, dim, heads):
        super().__init__()
        self.attention = nn.MultiheadAttention(dim, heads)
        self.feed_forward = nn.Sequential(
            nn.Linear(dim, dim * 4),
            nn.GELU(),
            nn.Linear(dim * 4, dim)
        )
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)

    def forward(self, x):
        # Self-attention
        x_norm = self.norm1(x)
        attn_out, _ = self.attention(x_norm, x_norm, x_norm)
        x = x + attn_out

        # Feed-forward
        x = x + self.feed_forward(self.norm2(x))

        return x

4. Diffusion Action Head

Octo predicts actions using a diffusion model (inspired by Diffusion Policy):

class OctoDiffusionHead(nn.Module):
    """Diffusion-based action prediction with chunking"""
    def __init__(self, obs_dim, action_dim, action_horizon=4, diffusion_steps=10):
        super().__init__()
        self.action_dim = action_dim
        self.action_horizon = action_horizon
        self.diffusion_steps = diffusion_steps

        # Conditioning network
        self.obs_encoder = nn.Sequential(
            nn.Linear(obs_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256)
        )

        # Noise prediction network (U-Net style)
        self.noise_pred_net = ConditionalUNet1D(
            input_dim=action_dim,
            global_cond_dim=256,
            diffusion_step_embed_dim=64,
            down_dims=[128, 256],
            n_groups=8
        )

        # Noise schedule
        self.betas = self.make_beta_schedule()
        self.alphas = 1 - self.betas
        self.alpha_bars = torch.cumprod(self.alphas, dim=0)

    def forward(self, obs_features, actions=None):
        """
        Training: denoise ground-truth actions
        Inference: generate actions from noise

        Args:
            obs_features: (B, obs_dim) - observation encoding
            actions: (B, action_horizon, action_dim) - ground truth (training only)
        Returns:
            If training: loss
            If inference: predicted actions
        """
        if actions is not None:
            # Training: predict noise
            return self.compute_loss(obs_features, actions)
        else:
            # Inference: sample actions
            return self.sample_actions(obs_features)

    def compute_loss(self, obs_features, actions):
        """Diffusion training loss"""
        B, T, D = actions.shape

        # Random timestep for each sample
        timesteps = torch.randint(
            0, self.diffusion_steps, (B,),
            device=actions.device
        )

        # Add noise to actions
        noise = torch.randn_like(actions)
        alpha_bar = self.alpha_bars[timesteps].view(-1, 1, 1)
        noisy_actions = (
            torch.sqrt(alpha_bar) * actions +
            torch.sqrt(1 - alpha_bar) * noise
        )

        # Encode observations
        obs_cond = self.obs_encoder(obs_features)

        # Predict noise
        predicted_noise = self.noise_pred_net(
            noisy_actions,
            timesteps,
            global_cond=obs_cond
        )

        # MSE loss
        loss = F.mse_loss(predicted_noise, noise)

        return loss

    @torch.no_grad()
    def sample_actions(self, obs_features, num_samples=1):
        """Sample action chunk using DDPM"""
        B = obs_features.shape[0]

        # Start from random noise
        actions = torch.randn(
            B, self.action_horizon, self.action_dim,
            device=obs_features.device
        )

        # Encode observations
        obs_cond = self.obs_encoder(obs_features)

        # Iterative denoising
        for t in reversed(range(self.diffusion_steps)):
            timesteps = torch.full((B,), t, device=actions.device)

            # Predict noise
            predicted_noise = self.noise_pred_net(
                actions,
                timesteps,
                global_cond=obs_cond
            )

            # Denoise
            alpha = self.alphas[t]
            alpha_bar = self.alpha_bars[t]

            if t > 0:
                noise = torch.randn_like(actions)
            else:
                noise = 0

            actions = (1 / torch.sqrt(alpha)) * (
                actions - ((1 - alpha) / torch.sqrt(1 - alpha_bar)) * predicted_noise
            ) + torch.sqrt(self.betas[t]) * noise

        return actions

    def make_beta_schedule(self):
        """Linear beta schedule"""
        return torch.linspace(1e-4, 0.02, self.diffusion_steps)

Training Data

Octo is trained on the Open X-Embodiment dataset:

  • Size: 800K+ trajectories
  • Robots: Franka, UR5, xArm, Google Robot, etc.
  • Tasks: Manipulation, navigation, mobile manipulation
  • Environments: Labs, kitchens, outdoor
class OpenXDataset(torch.utils.data.Dataset):
    """Open X-Embodiment dataset loader"""
    def __init__(self, data_dir, robots=None, transform=None):
        self.data_dir = Path(data_dir)
        self.transform = transform

        # Load trajectories
        self.trajectories = []

        for traj_file in self.data_dir.glob("**/*.pkl"):
            with open(traj_file, 'rb') as f:
                traj = pickle.load(f)

                # Filter by robot if specified
                if robots is None or traj['robot'] in robots:
                    self.trajectories.append(traj)

        print(f"Loaded {len(self.trajectories)} trajectories")

    def __len__(self):
        return len(self.trajectories)

    def __getitem__(self, idx):
        traj = self.trajectories[idx]

        # Sample random window
        T = len(traj['observations'])
        start_idx = np.random.randint(0, max(1, T - 32))
        end_idx = min(start_idx + 32, T)

        # Extract window
        observations = traj['observations'][start_idx:end_idx]
        actions = traj['actions'][start_idx:end_idx]

        # Apply transforms
        if self.transform:
            observations = self.transform(observations)

        return {
            'observations': observations,
            'actions': actions,
            'language': traj.get('language', ''),
            'robot': traj['robot']
        }

Training Procedure

def train_octo(config):
    """Train Octo model"""
    # Initialize model
    model = OctoModel(config)
    model = model.cuda()

    # Load dataset
    dataset = OpenXDataset(config.data_dir)
    dataloader = DataLoader(
        dataset,
        batch_size=config.batch_size,
        num_workers=config.num_workers,
        shuffle=True,
        pin_memory=True
    )

    # Optimizer
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config.learning_rate,
        weight_decay=config.weight_decay
    )

    # Learning rate scheduler
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        num_warmup_steps=config.warmup_steps,
        num_training_steps=config.max_steps
    )

    # Training loop
    global_step = 0
    for epoch in range(config.num_epochs):
        for batch in dataloader:
            # Move to GPU
            batch = {k: v.cuda() if isinstance(v, torch.Tensor) else v
                    for k, v in batch.items()}

            # Forward pass
            loss = model.compute_loss(batch)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), config.grad_clip)
            optimizer.step()
            scheduler.step()

            global_step += 1

            # Logging
            if global_step % 100 == 0:
                print(f"Step {global_step}: Loss = {loss.item():.4f}, "
                      f"LR = {scheduler.get_last_lr()[0]:.2e}")

            # Checkpointing
            if global_step % 10000 == 0:
                save_checkpoint(model, optimizer, global_step, config.checkpoint_dir)

    return model

Fine-Tuning for Your Robot

Octo excels at few-shot adaptation:

from octo.model import OctoModel

# Load pre-trained Octo
model = OctoModel.load_pretrained('octo-base')

# Prepare your robot data (50-200 demos)
your_dataset = YourRobotDataset(num_demos=100)

# Fine-tune with LoRA for efficiency
from peft import LoraConfig, get_peft_model

lora_config = LoraConfig(
    r=16,  # Rank
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],  # Transformer attention
    lora_dropout=0.05
)

model = get_peft_model(model, lora_config)

# Fine-tune
finetune_octo(model, your_dataset, num_epochs=20, lr=1e-4)

Fine-Tuning Results

Demos Success Rate Training Time
0 (zero-shot) 35% 0
50 68% ~30 min
100 79% ~1 hour
200 87% ~2 hours

Deployment

Inference

import torch
from octo.model import OctoModel

# Load model
model = OctoModel.load_pretrained('octo-base')
model.eval()
model = model.cuda()

@torch.no_grad()
def predict_action(model, observation, language_instruction):
    """
    Predict action from current observation

    Args:
        observation: dict with 'image' and 'state'
        language_instruction: str
    Returns:
        action: (action_dim,) numpy array
    """
    # Preprocess
    image = preprocess_image(observation['image'])
    state = torch.tensor(observation['state']).float()
    language = model.tokenizer(language_instruction)

    # Move to GPU
    image = image.cuda()
    state = state.cuda()

    # Predict action chunk
    action_chunk = model.predict(
        image=image.unsqueeze(0),
        state=state.unsqueeze(0),
        language=language
    )  # (1, action_horizon, action_dim)

    # Return first action
    action = action_chunk[0, 0].cpu().numpy()

    return action

# Control loop
obs = env.reset()
for t in range(100):
    action = predict_action(model, obs, "pick up the red block")
    obs, reward, done, info = env.step(action)

    if done:
        break

Optimization for Real-Time Control

# 1. Compile model (PyTorch 2.0+)
model = torch.compile(model, mode='reduce-overhead')

# 2. Use half precision
model = model.half()

# 3. Reduce diffusion steps
model.diffusion_head.diffusion_steps = 5  # Faster, slight accuracy drop

# 4. Batch inference if controlling multiple robots
actions = model.predict(
    image=images,  # (num_robots, 3, 224, 224)
    state=states,
    language=["pick red block"] * num_robots
)

Comparison with Other Models

Model Params Open Multi-Robot Action Pred Inference
RT-1 35M Categorical ~3 Hz
RT-2 5B-562B Categorical ~1 Hz
OpenVLA 7B Categorical ~8 Hz
Octo 93M Diffusion ~10 Hz

Key Advantages

Compared to OpenVLA/RT-2: - ✓ Faster inference (93M vs 7B parameters) - ✓ Better action precision (diffusion vs categorical) - ✓ Easier to deploy (smaller model)

Compared to RT-1: - ✓ Cross-embodiment (trained on 9+ robots) - ✓ Flexible task specification (language + goal images) - ✓ Better sample efficiency (transfer learning)

Limitations

  • Weaker language understanding (vs RT-2/OpenVLA with large LLMs)
  • Lower zero-shot performance (vs RT-2-PaLM-E)
  • Requires more demos for new tasks (vs OpenVLA)

Best Use Cases

Use Octo when: - Real-time control is critical (need >5 Hz) - Have 50-200 demonstrations available - Deploying on edge devices with limited compute - Task specification via goal images is sufficient - Need precise, multi-modal action distributions

Use OpenVLA/RT-2 when: - Zero-shot generalization is critical - Complex language understanding needed - Can tolerate slower inference - Have access to GPUs for deployment

Resources

  • Code: https://github.com/octo-models/octo
  • Models: https://huggingface.co/rail-berkeley/octo
  • Paper: https://octo-models.github.io
  • Colab Tutorial: https://colab.research.google.com/github/octo-models/octo/blob/main/examples/octo_example.ipynb

Next Steps