🎯 Core Concepts
Fundamental Ray concepts including tasks, actors, objects, and the distributed runtime that powers scalable Python applications.
What is Ray?

Ray is an open-source unified framework for scaling AI and Python applications.

Key Features:

Simple: Scale Python code with minimal changes

Universal: Works on laptop, cluster, or cloud

Flexible: Supports any ML framework or library

Production-Ready: Battle-tested at companies like Uber, Netflix, Spotify

Core Components:

Ray Core: distributed runtime
Ray Data: distributed data processing
Ray Train: distributed training
Ray Tune: hyperparameter tuning
Ray Serve: model serving
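
Each library lives in its own subpackage; a quick orientation (illustrative imports, assuming the matching extras are installed):

import ray                          # Ray Core: ray.remote, ray.get, ray.put
import ray.data                     # Ray Data: datasets and preprocessing
from ray import train, tune, serve  # Ray Train / Tune / Serve entry points
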
Installation & Setup

Basic Installation:

# Install Ray
pip install ray

# Install with specific libraries
pip install "ray[data]"      # Ray Data
pip install "ray[train]"     # Ray Train
pip install "ray[tune]"      # Ray Tune
pip install "ray[serve]"     # Ray Serve
pip install "ray[rllib]"     # Ray RLlib (RL)

# Install all components
pip install "ray[all]"

Initialize Ray:

import ray

# Start Ray locally
ray.init()

# Connect to existing cluster
ray.init(address='ray://localhost:10001')

# Configure resources
ray.init(num_cpus=8, num_gpus=2)
Call ray.init() before using Ray; if you skip it, recent Ray versions initialize themselves with default settings on the first .remote() call
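
In notebooks or tests where cells are re-run, it can help to check for an existing session and shut it down first (both calls are core Ray APIs):

import ray

if ray.is_initialized():
    ray.shutdown()   # tear down the previous session
ray.init(num_cpus=4)
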
Ray Architecture

Components:

Driver: Main program that submits tasks

Workers: Processes that execute tasks

Raylet: Scheduling and resource management

Object Store: Shared memory for data

GCS: Global Control Store for metadata

Architecture:
┌─────────────────────────────────────┐
│          Driver Process             │
│      (Your Python Program)          │
└──────────────┬──────────────────────┘
               │
    ┌──────────┴──────────┐
    │    Head Node        │
    │  ┌──────────────┐   │
    │  │     GCS      │   │ (Metadata)
    │  └──────────────┘   │
    └─────────────────────┘
               │
    ┌──────────┴──────────┐
    │                     │
┌───┴────┐          ┌─────┴────┐
│Worker 1│          │Worker 2  │
│Raylet  │          │Raylet    │
│Object  │          │Object    │
│Store   │          │Store     │
└────────┘          └──────────┘
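
From the driver you can inspect what the cluster (via the GCS) reports about nodes and resources; a small sketch using core inspection APIs:

import ray

ray.init()
print(ray.cluster_resources())     # total CPUs/GPUs/memory in the cluster
print(ray.available_resources())   # what is currently free
for node in ray.nodes():           # one entry per node/raylet
    print(node["NodeManagerAddress"], node["Alive"])
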
🔧 Ray Core - Tasks & Actors
Learn how to parallelize Python functions with tasks and create stateful distributed services with actors.
Tasks (@ray.remote)

Tasks are stateless functions that run asynchronously

import ray

# Define a remote task
@ray.remote
def process_data(x):
    return x * 2

# Execute task (returns future)
future = process_data.remote(5)

# Get result (blocks until complete)
result = ray.get(future)  # 10

# Parallel execution
futures = [process_data.remote(i) for i in range(10)]
results = ray.get(futures)  # Wait for all

# Non-blocking check
ready, not_ready = ray.wait([future], timeout=1.0)

Task Options:

@ray.remote(num_cpus=2, num_gpus=1, memory=1000*1024*1024)
def gpu_task():
    pass

# Override options at call time
gpu_task.options(num_cpus=4).remote()
Use tasks for stateless parallel computation
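
Tasks also compose: an ObjectRef returned by one task can be passed directly to another without calling ray.get(), and Ray resolves it before the downstream task runs. A short sketch reusing process_data from above:

@ray.remote
def add_ten(x):
    return x + 10

intermediate = process_data.remote(5)   # ObjectRef holding 10
final = add_ten.remote(intermediate)    # receives 10, returns 20
print(ray.get(final))                   # 20
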
Actors

Actors are stateful distributed services

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_value(self):
        return self.value

# Create actor instance
counter = Counter.remote()

# Call actor methods
counter.increment.remote()
counter.increment.remote()
value = ray.get(counter.get_value.remote())  # 2

# Multiple actors
counters = [Counter.remote() for _ in range(4)]

Actor Patterns:

Parameter Server: Shared state across workers

Worker Pool: Pool of stateful workers

Tree Reduction: Hierarchical aggregation

Use actors for stateful services
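
A minimal sketch of the parameter-server pattern listed above (the ParameterServer class and the "gradient" computation are illustrative):

import numpy as np
import ray

@ray.remote
class ParameterServer:
    def __init__(self, dim):
        self.weights = np.zeros(dim)

    def apply_gradient(self, grad):
        self.weights -= 0.01 * grad   # simple SGD-style update
        return self.weights

    def get_weights(self):
        return self.weights

@ray.remote
def worker_step(ps, data):
    weights = ray.get(ps.get_weights.remote())
    grad = data - weights             # placeholder gradient
    return ray.get(ps.apply_gradient.remote(grad))

ps = ParameterServer.remote(dim=4)
updates = ray.get([worker_step.remote(ps, np.ones(4)) for _ in range(3)])
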
Object Store

Shared memory for efficient data sharing

# Put object in store
obj_ref = ray.put([1, 2, 3, 4, 5])

# Get object from store
data = ray.get(obj_ref)

# Pass object references
@ray.remote
def process(obj_ref):
    data = ray.get(obj_ref)
    return sum(data)

result = process.remote(obj_ref)

# Zero-copy reads (same node)
import numpy as np
large_array = np.zeros(1_000_000)
ref = ray.put(large_array)     # Stored once in shared memory
result1 = process.remote(ref)  # No copy!
result2 = process.remote(ref)  # No copy!

Best Practices:

Large Objects: Put them in the object store once with ray.put() instead of re-serializing them for every task

Reuse: Pass refs instead of values

Cleanup: Objects auto-deleted when unreferenced

📊 Ray Data
Distributed data processing for ML workloads with support for large datasets, streaming, and preprocessing pipelines.
Creating Datasets
import ray

# From Python lists
ds = ray.data.range(1000)
ds = ray.data.from_items([{"id": i} for i in range(100)])

# From files
ds = ray.data.read_parquet("s3://bucket/data.parquet")
ds = ray.data.read_csv("data.csv")
ds = ray.data.read_json("data.json")
ds = ray.data.read_images("s3://bucket/images/")

# From numpy
import numpy as np
ds = ray.data.from_numpy(np.zeros((100, 10)))

# From pandas
import pandas as pd
df = pd.DataFrame({"a": [1, 2, 3]})
ds = ray.data.from_pandas(df)
Ray Data is lazy - operations don't execute until needed
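
A quick illustration of the lazy model: building the pipeline does nothing, and execution is triggered by consumption:

import ray

ds = ray.data.range(1000)
ds = ds.map(lambda row: {"id": row["id"], "squared": row["id"] ** 2})

print(ds.take(3))       # triggers execution of just enough blocks
ds = ds.materialize()   # fully executes and pins the result in the object store
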
Transformations
# Map operations
ds = ray.data.range(100)
ds = ds.map(lambda x: {"value": x["id"] * 2})

# Filter
ds = ds.filter(lambda x: x["value"] > 50)

# Batch operations (more efficient)
def batch_process(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(batch_process, batch_format="pandas")

# Flat map
ds = ds.flat_map(lambda x: [{"id": x["id"]}, {"id": x["id"] + 1}])

# GroupBy and aggregate
ds = ds.groupby("category").count()
ds = ds.groupby("user_id").sum("amount")

Common Patterns:

map: Transform each row

map_batches: Transform batches (faster)

filter: Keep rows matching condition

flat_map: One row β†’ multiple rows

ML Preprocessing
import ray
from ray.data.preprocessors import StandardScaler, Chain

# Load data
ds = ray.data.read_parquet("s3://bucket/train.parquet")

# Define preprocessing pipeline
preprocessor = Chain(
    StandardScaler(columns=["feature1", "feature2"]),
    # Add more preprocessors
)

# Fit and transform
preprocessor.fit(ds)
transformed_ds = preprocessor.transform(ds)

# Image augmentation
def augment_image(batch):
    import torchvision.transforms as T
    transform = T.Compose([
        T.RandomHorizontalFlip(),
        T.RandomRotation(10),
        T.Resize((224, 224))
    ])
    batch["image"] = [transform(img) for img in batch["image"]]
    return batch

ds = ds.map_batches(augment_image)
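
Preprocessed datasets are usually consumed in batches during training; a sketch using iter_torch_batches on the scaled dataset from above (assumes PyTorch is installed and "feature1" is numeric):

for batch in transformed_ds.iter_torch_batches(batch_size=32):
    features = batch["feature1"]   # torch.Tensor for this batch
    # forward/backward pass goes here
    break
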
🎓 Ray Train
Distributed deep learning training with native support for PyTorch, TensorFlow, XGBoost, and more.
PyTorch Distributed Training
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func(config):
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader
    import ray.train
    import ray.train.torch

    # Your model
    model = nn.Linear(10, 1)
    model = ray.train.torch.prepare_model(model)

    # Your data
    train_dataset = ...
    train_loader = DataLoader(train_dataset, batch_size=32)
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training loop
    optimizer = torch.optim.Adam(model.parameters())
    for epoch in range(config["num_epochs"]):
        for batch in train_loader:
            loss = ...
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Report metrics
        ray.train.report({"loss": loss.item()})

# Configure distributed training
trainer = TorchTrainer(
    train_func,
    train_loop_config={"num_epochs": 10},
    scaling_config=ScalingConfig(
        num_workers=4,
        use_gpu=True
    )
)

result = trainer.fit()
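
The returned Result object carries the final metrics and the latest checkpoint:

print(result.metrics)      # last reported metrics, e.g. {"loss": ...}
print(result.checkpoint)   # latest Checkpoint, if one was reported
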
Checkpointing
from ray import train
from ray.train import Checkpoint

def train_func(config):
    model = ...
    optimizer = ...

    for epoch in range(10):
        # Training...

        # Save checkpoint
        checkpoint = Checkpoint.from_dict({
            "epoch": epoch,
            "model_state": model.state_dict(),
            "optimizer_state": optimizer.state_dict()
        })

        train.report(
            {"loss": loss},
            checkpoint=checkpoint
        )

# Resume from checkpoint
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4),
    resume_from_checkpoint=result.checkpoint
)
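
Inside the training function, a previously saved checkpoint can be read back with train.get_checkpoint(); a hedged sketch (the dict-based access shown here mirrors Checkpoint.from_dict above, while newer Ray versions use directory-based checkpoints):

def train_func(config):
    start_epoch = 0
    ckpt = train.get_checkpoint()      # None on a fresh run
    if ckpt is not None:
        state = ckpt.to_dict()         # or ckpt.as_directory() on newer Ray
        start_epoch = state["epoch"] + 1
        # model.load_state_dict(state["model_state"])

    for epoch in range(start_epoch, 10):
        ...
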
πŸ” Ray Tune
Distributed hyperparameter tuning at any scale with support for popular optimization algorithms and schedulers.
Basic Hyperparameter Tuning
from ray import tune

def objective(config):
    score = config["a"] ** 2 + config["b"]
    return {"score": score}

# Define search space
search_space = {
    "a": tune.uniform(0, 1),
    "b": tune.choice([1, 2, 3])
}

# Run tuning
tuner = tune.Tuner(
    objective,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        num_samples=100,
        metric="score",
        mode="min"
    )
)

results = tuner.fit()
best_result = results.get_best_result()
print(f"Best config: {best_result.config}")
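
Per-trial resources can be requested by wrapping the trainable; a small sketch:

trainable = tune.with_resources(objective, {"cpu": 2, "gpu": 1})

tuner = tune.Tuner(
    trainable,
    param_space=search_space,
    tune_config=tune.TuneConfig(num_samples=20, metric="score", mode="min")
)
results = tuner.fit()
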
Search Algorithms
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.bayesopt import BayesOptSearch
from ray.tune.search.hyperopt import HyperOptSearch

# Bayesian Optimization
search_alg = BayesOptSearch(metric="score", mode="min")

# Optuna (TPE)
search_alg = OptunaSearch(metric="score", mode="min")

# HyperOpt
search_alg = HyperOptSearch(metric="score", mode="min")

tuner = tune.Tuner(
    objective,
    param_space=search_space,
    tune_config=tune.TuneConfig(search_alg=search_alg)
)

Schedulers:

ASHA: Async Successive Halving (early stopping)

PopulationBasedTraining: Dynamic hyperparameter updates

HyperBand: Adaptive resource allocation
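
An ASHA example (early stopping only pays off when the trainable reports intermediate results, e.g. once per epoch):

from ray.tune.schedulers import ASHAScheduler

scheduler = ASHAScheduler(max_t=100, grace_period=10, reduction_factor=2)

tuner = tune.Tuner(
    objective,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        num_samples=50,
        metric="score",
        mode="min",
        scheduler=scheduler
    )
)
results = tuner.fit()
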

🚀 Ray Serve
Scalable model serving framework for deploying ML models as production-ready APIs with dynamic batching and resource management.
Deploying Models
from ray import serve
import requests

@serve.deployment
class MyModel:
    def __init__(self, model_path):
        self.model = load_model(model_path)

    def __call__(self, request):
        data = request.query_params["data"]
        prediction = self.model.predict(data)
        return {"prediction": prediction}

# Deploy the model (Ray 2.x style: bind arguments, then run the app)
app = MyModel.bind(model_path="/path/to/model")
serve.run(app)  # starts Serve if needed; serves at http://localhost:8000/ by default

# Send requests
response = requests.get("http://localhost:8000/?data=...")
print(response.json())

Deployment Options:

@serve.deployment(
    num_replicas=4,
    ray_actor_options={"num_gpus": 1},
    max_concurrent_queries=100
)
class MyModel:
    ...
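
Replica counts can also autoscale instead of being fixed; a sketch (exact autoscaling keys vary across Ray versions):

@serve.deployment(
    autoscaling_config={"min_replicas": 1, "max_replicas": 8},
    ray_actor_options={"num_cpus": 2}
)
class AutoscaledModel:
    async def __call__(self, request):
        return {"ok": True}
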
Dynamic Batching
from ray import serve

@serve.deployment
class BatchedModel:
    def __init__(self):
        self.model = load_model()

    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
    async def handle_batch(self, inputs: list):
        # Process batch together (more efficient)
        return self.model.predict_batch(inputs)

    async def __call__(self, request):
        input_data = await request.json()
        result = await self.handle_batch(input_data)
        return {"prediction": result}

# Serve automatically groups concurrent requests into handle_batch calls
serve.run(BatchedModel.bind())
Use batching for throughput optimization
✅ Best Practices
Production tips, performance optimization, debugging strategies, and common patterns for building robust Ray applications.
Performance Tips

Avoid Anti-Patterns:

# ❌ BAD: Too many small tasks
futures = [tiny_task.remote(i) for i in range(1000000)]

# ✅ GOOD: Batch work
@ray.remote
def process_batch(items):
    return [process(i) for i in items]

batches = np.array_split(items, 100)
futures = [process_batch.remote(batch) for batch in batches]

# ❌ BAD: Passing large data as arguments
ray.get([task.remote(large_data) for _ in range(100)])

# ✅ GOOD: Use object store
data_ref = ray.put(large_data)
ray.get([task.remote(data_ref) for _ in range(100)])

Key Principles:

Batch: Process multiple items per task

Minimize: Reduce data transfer between tasks

Reuse: Use actors for stateful operations

Profile: Use Ray dashboard to identify bottlenecks
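
One more pattern: process results as they finish with ray.wait() instead of blocking on the whole batch (sketch with a placeholder task):

import ray

@ray.remote
def work(i):
    return i * i

futures = [work.remote(i) for i in range(100)]
while futures:
    done, futures = ray.wait(futures, num_returns=1)
    result = ray.get(done[0])   # handle each result as soon as it is ready
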

Debugging & Monitoring

Ray Dashboard:

Access at http://localhost:8265

  • View task timeline and resource usage
  • Monitor actors and their states
  • Track object references and memory
  • View logs and errors
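
The dashboard starts with ray.init() when the dashboard dependencies are installed (e.g. pip install "ray[default]"); host and port can be set explicitly:

import ray

ray.init(include_dashboard=True, dashboard_host="0.0.0.0", dashboard_port=8265)
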

Logging:

import logging

@ray.remote
def my_task():
    logger = logging.getLogger(__name__)
    logger.info("Task started")
    # Work...
    logger.error("Something went wrong")

# View logs in dashboard or terminal
ray logs --follow

Profiling:

# Enable profiling
ray.init(_system_config={"enable_timeline": True})

# View timeline in dashboard
# Shows task execution, dependencies, and bottlenecks
Resource Management
# Specify resource requirements
@ray.remote(num_cpus=2, num_gpus=1, memory=2*1024**3)
def gpu_task():
    pass

# Custom resources
ray.init(resources={"special_hardware": 4})

@ray.remote(resources={"special_hardware": 1})
def special_task():
    pass

# Actor placement
@ray.remote(num_cpus=4, scheduling_strategy="SPREAD")
class Worker:
    pass

# Check available resources
print(ray.cluster_resources())
print(ray.available_resources())

Scheduling Strategies:

DEFAULT: Hybrid policy that packs nodes up to a threshold, then spreads

SPREAD: Distribute across nodes

Node affinity: Place on specific nodes
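
Node affinity is expressed with a scheduling strategy object; a sketch that pins a task to the node the driver is running on:

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()
node_id = ray.get_runtime_context().get_node_id()

@ray.remote(scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id, soft=False))
def pinned_task():
    return "runs on the chosen node"

print(ray.get(pinned_task.remote()))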