Ray is an open-source unified framework for scaling AI and Python applications.
Key Features:
Simple: Scale Python code with minimal changes
Universal: Works on laptop, cluster, or cloud
Flexible: Supports any ML framework or library
Production-Ready: Battle-tested at companies like Uber, Netflix, Spotify
Core Components:
Ray Core: Distributed tasks, actors, and objects
Ray Data: Scalable data loading and preprocessing
Ray Train: Distributed model training
Ray Tune: Hyperparameter tuning
Ray Serve: Scalable model serving
Ray RLlib: Reinforcement learning
Basic Installation:
# Install Ray
pip install ray
# Install with specific libraries
pip install "ray[data]" # Ray Data
pip install "ray[train]" # Ray Train
pip install "ray[tune]" # Ray Tune
pip install "ray[serve]" # Ray Serve
pip install "ray[rllib]" # Ray RLlib (RL)
# Install all components
pip install "ray[all]"
Initialize Ray:
import ray
# Start Ray locally
ray.init()
# Connect to existing cluster
ray.init(address='ray://localhost:10001')
# Configure resources
ray.init(num_cpus=8, num_gpus=2)
Always call ray.init() before using Ray
Components:
Driver: Main program that submits tasks
Workers: Processes that execute tasks
Raylet: Scheduling and resource management
Object Store: Shared memory for data
GCS: Global Control Store for metadata
Architecture:
┌──────────────────────────────────────┐
│            Driver Process            │
│        (Your Python Program)         │
└──────────────────┬───────────────────┘
                   │
        ┌──────────┴──────────┐
        │      Head Node      │
        │  ┌───────────────┐  │
        │  │      GCS      │  │  (Metadata)
        │  └───────────────┘  │
        └──────────┬──────────┘
                   │
         ┌─────────┴─────────┐
         │                   │
    ┌────┴────┐         ┌────┴────┐
    │Worker 1 │         │Worker 2 │
    │ Raylet  │         │ Raylet  │
    │ Object  │         │ Object  │
    │ Store   │         │ Store   │
    └─────────┘         └─────────┘
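Once a cluster is up, its topology can be inspected from Python; a quick sketch using ray.nodes():
import ray
ray.init()
# One dict per node: ID, resources, and liveness
for node in ray.nodes():
    print(node["NodeID"], node["Resources"], node["Alive"])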
Tasks are stateless functions that run asynchronously
import ray
# Define a remote task
@ray.remote
def process_data(x):
return x * 2
# Execute task (returns future)
future = process_data.remote(5)
# Get result (blocks until complete)
result = ray.get(future) # 10
# Parallel execution
futures = [process_data.remote(i) for i in range(10)]
results = ray.get(futures) # Wait for all
# Non-blocking check
ready, not_ready = ray.wait([future], timeout=1.0)
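ray.wait also supports a process-as-completed loop; a small sketch reusing process_data from above:
pending = [process_data.remote(i) for i in range(10)]
while pending:
    # Handle each result as soon as it is ready
    done, pending = ray.wait(pending, num_returns=1)
    print(ray.get(done[0]))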
Task Options:
@ray.remote(num_cpus=2, num_gpus=1, memory=1000*1024*1024)
def gpu_task():
pass
# Set at runtime
gpu_task.options(num_cpus=4).remote()
Use tasks for stateless parallel computation
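ObjectRefs can also be passed directly between tasks, so Ray chains work without pulling intermediate results back to the driver; a minimal sketch (add is a hypothetical helper):
@ray.remote
def add(a, b):
    return a + b
# Ray resolves both ObjectRefs before add runs; no ray.get needed here
x = process_data.remote(2)
y = process_data.remote(3)
total = ray.get(add.remote(x, y))  # 10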
Actors are stateful distributed services
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
def get_value(self):
return self.value
# Create actor instance
counter = Counter.remote()
# Call actor methods
counter.increment.remote()
counter.increment.remote()
value = ray.get(counter.get_value.remote()) # 2
# Multiple actors
counters = [Counter.remote() for _ in range(4)]
Actor Patterns:
Parameter Server: Shared state across workers
Worker Pool: Pool of stateful workers (see the sketch below)
Tree Reduction: Hierarchical aggregation
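A minimal worker-pool sketch using ray.util.ActorPool (SquareWorker is a hypothetical actor):
from ray.util import ActorPool

@ray.remote
class SquareWorker:
    def square(self, x):
        return x * x

# map dispatches each value to whichever actor is currently free
pool = ActorPool([SquareWorker.remote() for _ in range(4)])
results = list(pool.map(lambda actor, v: actor.square.remote(v), range(8)))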
Use actors for stateful services
Shared memory for efficient data sharing
# Put object in store
obj_ref = ray.put([1, 2, 3, 4, 5])
# Get object from store
data = ray.get(obj_ref)
# Pass object references
@ray.remote
def process(obj_ref):
data = ray.get(obj_ref)
return sum(data)
result = process.remote(obj_ref)
# Zero-copy reads (same node)
import numpy as np
large_array = np.zeros(1000000)
ref = ray.put(large_array) # Stored once
result1 = task1.remote(ref) # No copy!
result2 = task2.remote(ref) # No copy!
Best Practices:
Large Objects: Use ray.put() once to avoid re-serializing on every task call
Reuse: Pass refs instead of values
Cleanup: Objects auto-deleted when unreferenced
import ray
# From Python lists
ds = ray.data.range(1000)
ds = ray.data.from_items([{"id": i} for i in range(100)])
# From files
ds = ray.data.read_parquet("s3://bucket/data.parquet")
ds = ray.data.read_csv("data.csv")
ds = ray.data.read_json("data.json")
ds = ray.data.read_images("s3://bucket/images/")
# From numpy
import numpy as np
ds = ray.data.from_numpy(np.zeros((100, 10)))
# From pandas
import pandas as pd
df = pd.DataFrame({"a": [1, 2, 3]})
ds = ray.data.from_pandas(df)
Ray Data is lazy - operations don't execute until needed
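Because of this laziness, nothing runs until the dataset is consumed; a short sketch of what triggers execution:
ds = ray.data.range(1000).map(lambda row: {"id": row["id"] * 2})  # builds a plan only
ds.show(3)             # executes just enough to display 3 rows
ds = ds.materialize()  # executes the full plan and caches the result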
# Map operations
ds = ray.data.range(100)
ds = ds.map(lambda x: {"value": x["id"] * 2})
# Filter
ds = ds.filter(lambda x: x["value"] > 50)
# Batch operations (more efficient)
def batch_process(batch):
batch["doubled"] = batch["value"] * 2
return batch
ds = ds.map_batches(batch_process, batch_format="pandas")
# Flat map
ds = ds.flat_map(lambda x: [{"id": x["id"]}, {"id": x["id"] + 1}])
# GroupBy and aggregate
ds = ds.groupby("category").count()
ds = ds.groupby("user_id").sum("amount")
Common Patterns:
map: Transform each row
map_batches: Transform batches (faster)
filter: Keep rows matching condition
flat_map: One row → multiple rows
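Transformed datasets are typically consumed in batches or written back out; a hedged sketch (the output path is illustrative):
for batch in ds.iter_batches(batch_size=128, batch_format="pandas"):
    print(batch.head())  # each batch arrives as a pandas DataFrame
ds.write_parquet("/tmp/output")  # illustrative local path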
import ray
from ray.data.preprocessors import StandardScaler, Chain
# Load data
ds = ray.data.read_parquet("s3://bucket/train.parquet")
# Define preprocessing pipeline
preprocessor = Chain(
StandardScaler(columns=["feature1", "feature2"]),
# Add more preprocessors
)
# Fit and transform
preprocessor.fit(ds)
transformed_ds = preprocessor.transform(ds)
# Image augmentation
def augment_image(batch):
import torchvision.transforms as T
transform = T.Compose([
T.RandomHorizontalFlip(),
T.RandomRotation(10),
T.Resize((224, 224))
])
batch["image"] = [transform(img) for img in batch["image"]]
return batch
ds = ds.map_batches(augment_image)
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
def train_func(config):
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from ray.train import get_context
# Your model
model = nn.Linear(10, 1)
model = ray.train.torch.prepare_model(model)
# Your data
train_dataset = ...
train_loader = DataLoader(train_dataset, batch_size=32)
train_loader = ray.train.torch.prepare_data_loader(train_loader)
# Training loop
optimizer = torch.optim.Adam(model.parameters())
for epoch in range(config["num_epochs"]):
for batch in train_loader:
loss = ...
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Report metrics
ray.train.report({"loss": loss.item()})
# Configure distributed training
trainer = TorchTrainer(
train_func,
train_loop_config={"num_epochs": 10},
scaling_config=ScalingConfig(
num_workers=4,
use_gpu=True
)
)
result = trainer.fit()
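trainer.fit() returns a Result object; a quick sketch of inspecting it:
print(result.metrics)     # last metrics dict reported via ray.train.report
print(result.checkpoint)  # latest reported checkpoint, or None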
import os
import tempfile
import torch
from ray import train
from ray.train import Checkpoint
def train_func(config):
    model = ...
    for epoch in range(10):
        # Training...
        # Save checkpoint (Ray 2.x checkpoints are directories)
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save({
                "epoch": epoch,
                "model_state": model.state_dict(),
                "optimizer_state": optimizer.state_dict()
            }, os.path.join(tmpdir, "checkpoint.pt"))
            train.report(
                {"loss": loss},
                checkpoint=Checkpoint.from_directory(tmpdir)
            )
# Resume from checkpoint
trainer = TorchTrainer(
train_func,
scaling_config=ScalingConfig(num_workers=4),
resume_from_checkpoint=result.checkpoint
)
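To restore weights outside of training, a checkpoint can be opened as a directory; a sketch assuming the checkpoint.pt layout saved above:
import os
import torch
with result.checkpoint.as_directory() as ckpt_dir:
    state = torch.load(os.path.join(ckpt_dir, "checkpoint.pt"))
model.load_state_dict(state["model_state"])  # model: your nn.Module instance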
from ray import tune
def objective(config):
score = config["a"] ** 2 + config["b"]
return {"score": score}
# Define search space
search_space = {
"a": tune.uniform(0, 1),
"b": tune.choice([1, 2, 3])
}
# Run tuning
tuner = tune.Tuner(
objective,
param_space=search_space,
tune_config=tune.TuneConfig(
num_samples=100,
metric="score",
mode="min"
)
)
results = tuner.fit()
best_result = results.get_best_result()
print(f"Best config: {best_result.config}")
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.bayesopt import BayesOptSearch
from ray.tune.search.hyperopt import HyperOptSearch
# Bayesian Optimization
search_alg = BayesOptSearch(metric="score", mode="min")
# Optuna (TPE)
search_alg = OptunaSearch(metric="score", mode="min")
# HyperOpt
search_alg = HyperOptSearch(metric="score", mode="min")
tuner = tune.Tuner(
objective,
param_space=search_space,
tune_config=tune.TuneConfig(search_alg=search_alg)
)
Schedulers:
ASHA: Async Successive Halving (early stopping; see the sketch below)
PopulationBasedTraining: Dynamic hyperparameter updates
HyperBand: Adaptive resource allocation
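A sketch wiring ASHA into the Tuner (values are illustrative; ASHA pays off when trials report metrics over time):
from ray.tune.schedulers import ASHAScheduler

scheduler = ASHAScheduler(max_t=100, grace_period=10)  # stop weak trials early
tuner = tune.Tuner(
    objective,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="score",
        mode="min",
        scheduler=scheduler,
        num_samples=50,
    ),
)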
from ray import serve
import requests
@serve.deployment
class MyModel:
def __init__(self, model_path):
self.model = load_model(model_path)
def __call__(self, request):
data = request.query_params["data"]
prediction = self.model.predict(data)
return {"prediction": prediction}
# Deploy the model (serve.run starts Serve if needed)
serve.run(MyModel.bind(model_path="/path/to/model"), route_prefix="/MyModel")
# Send requests
response = requests.get("http://localhost:8000/MyModel?data=...")
print(response.json())
Deployment Options:
@serve.deployment(
num_replicas=4,
ray_actor_options={"num_gpus": 1},
max_concurrent_queries=100
)
class MyModel:
...
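Replicas can also autoscale instead of being fixed; a minimal sketch using autoscaling_config (bounds are illustrative):
@serve.deployment(autoscaling_config={"min_replicas": 1, "max_replicas": 8})
class AutoscaledModel:
    ...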
from ray import serve
from ray.serve.handle import DeploymentHandle
@serve.deployment
class BatchedModel:
def __init__(self):
self.model = load_model()
@serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
async def handle_batch(self, inputs: list):
# Process batch together (more efficient)
return self.model.predict_batch(inputs)
async def __call__(self, request):
input_data = await request.json()
result = await self.handle_batch(input_data)
return {"prediction": result}
# Serve automatically batches incoming requests
serve.run(BatchedModel.bind())
Use batching for throughput optimization
Avoid Anti-Patterns:
# ❌ BAD: Too many small tasks
futures = [tiny_task.remote(i) for i in range(1000000)]
# ✅ GOOD: Batch work
@ray.remote
def process_batch(items):
    return [process(i) for i in items]
batches = np.array_split(items, 100)
futures = [process_batch.remote(batch) for batch in batches]
# ❌ BAD: Passing large data as arguments
ray.get([task.remote(large_data) for _ in range(100)])
# ✅ GOOD: Use object store
data_ref = ray.put(large_data)
ray.get([task.remote(data_ref) for _ in range(100)])
Key Principles:
Batch: Process multiple items per task
Minimize: Reduce data transfer between tasks
Reuse: Use actors for stateful operations
Profile: Use Ray dashboard to identify bottlenecks
Ray Dashboard:
Access at http://localhost:8265
- View task timeline and resource usage
- Monitor actors and their states
- Track object references and memory
- View logs and errors
Logging:
import logging
@ray.remote
def my_task():
logger = logging.getLogger(__name__)
logger.info("Task started")
# Work...
logger.error("Something went wrong")
# View logs in dashboard or terminal
ray logs --follow
Profiling:
# Enable profiling
ray.init(_system_config={"enable_timeline": True})
# View timeline in dashboard
# Shows task execution, dependencies, and bottlenecks
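A timeline can also be exported programmatically; a one-line sketch (open the file in chrome://tracing):
ray.timeline(filename="/tmp/timeline.json")  # illustrative output path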
# Specify resource requirements
@ray.remote(num_cpus=2, num_gpus=1, memory=2*1024**3)
def gpu_task():
pass
# Custom resources
ray.init(resources={"special_hardware": 4})
@ray.remote(resources={"special_hardware": 1})
def special_task():
pass
# Actor placement
@ray.remote(num_cpus=4, scheduling_strategy="SPREAD")
class Worker:
pass
# Check available resources
print(ray.cluster_resources())
print(ray.available_resources())
Scheduling Strategies:
DEFAULT: Hybrid policy that packs tasks onto fewer nodes, then spreads as nodes fill up
SPREAD: Distribute across nodes
Node affinity: Place on specific nodes (see the sketch below)
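A node-affinity sketch using NodeAffinitySchedulingStrategy, pinning to the current node for illustration:
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

strategy = NodeAffinitySchedulingStrategy(
    node_id=ray.get_runtime_context().get_node_id(),  # pin to this node
    soft=True,  # allow fallback to other nodes if unavailable
)

@ray.remote(scheduling_strategy=strategy)
def pinned_task():
    return "runs on the chosen node"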