Caching and Performance
TL;DR
GraphQL caching is harder than REST caching because every request goes to a single endpoint with a varying query body. Key strategies include response caching (full query results), normalized caching (entity-level), persisted queries (pre-registered queries), and CDN caching. Automatic Persisted Queries (APQ) reduce request size, while cache control directives enable fine-grained TTLs per field.
Caching Challenges
Why GraphQL Caching is Different
┌─────────────────────────────────────────────────────────────────┐
│ REST vs GraphQL Caching │
│ │
│ REST GraphQL │
│ ──── ─────── │
│ GET /users/123 POST /graphql │
│ GET /users/123/posts { query: "..." } │
│ │
│ • Different URLs per resource • Single endpoint │
│ • HTTP caching works • HTTP caching doesn't work │
│ • CDN-friendly by default • POST = not cacheable │
│ • Cache key = URL • Cache key = query hash? │
│ │
│ GraphQL Challenges: │
│ 1. POST requests aren't cached by HTTP │
│ 2. Same query can return different data (variables) │
│ 3. Different queries return overlapping data │
│ 4. Field-level cache control needed │
│ 5. Query complexity varies wildly │
└─────────────────────────────────────────────────────────────────┘
Response Caching
Full Response Cache
python
import hashlib
import json
from functools import wraps
class ResponseCache:
"""Cache entire GraphQL responses by query hash"""
def __init__(self, redis_client, default_ttl=300):
self.redis = redis_client
self.default_ttl = default_ttl
def cache_key(self, query: str, variables: dict, user_id: str = None) -> str:
"""Generate cache key from query and variables"""
# Include user_id for personalized queries
key_data = {
"query": query,
"variables": variables,
"user": user_id
}
key_hash = hashlib.sha256(
json.dumps(key_data, sort_keys=True).encode()
).hexdigest()
return f"graphql:response:{key_hash}"
async def get(self, query: str, variables: dict, user_id: str = None):
key = self.cache_key(query, variables, user_id)
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
return None
async def set(
self,
query: str,
variables: dict,
response: dict,
ttl: int = None,
user_id: str = None
):
key = self.cache_key(query, variables, user_id)
await self.redis.setex(
key,
ttl or self.default_ttl,
json.dumps(response)
)
# Middleware for response caching (apply at the root level only; running this
# per resolver would cache the full response once per field)
async def caching_middleware(resolve, obj, info, **kwargs):
    cache = info.context["response_cache"]
    # Only cache queries, never mutations or subscriptions
    if info.operation.operation.value != "query":
        return await resolve(obj, info, **kwargs)
# Check cache
query_str = info.context["query_string"]
variables = info.context["variables"]
user_id = info.context.get("user", {}).get("id")
cached = await cache.get(query_str, variables, user_id)
if cached:
info.context["cache_hit"] = True
return cached
# Execute and cache
result = await resolve(obj, info, **kwargs)
    # Derive TTL from the @cacheControl hints collected during execution
    # (see CacheHintCollector in the next subsection)
    ttl = info.context["cache_hints"].get_overall_policy()["maxAge"]
await cache.set(query_str, variables, result, ttl, user_id)
    return result
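How the pieces connect depends on the server library. Below is a minimal wiring sketch, assuming Ariadne's `graphql` entry point and `redis.asyncio`; the `handle_post` helper and the Redis URL are illustrative, not prescribed.
python
import redis.asyncio as redis
from ariadne import graphql

# Assumption: a single shared Redis connection and cache instance
redis_client = redis.from_url("redis://localhost:6379")
response_cache = ResponseCache(redis_client, default_ttl=300)

async def handle_post(request, schema):
    body = await request.json()
    context = {
        "response_cache": response_cache,
        "cache_hints": CacheHintCollector(),  # defined in the next subsection
        "query_string": body.get("query", ""),
        "variables": body.get("variables", {}),
        "user": getattr(request, "user", None) or {},
    }
    # The middleware list applies caching_middleware during execution
    success, result = await graphql(
        schema,
        body,
        context_value=context,
        middleware=[caching_middleware],
    )
    return result
Cache Control Directives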
graphql
# Schema with cache hints (Apollo-style directive, declared explicitly)
enum CacheControlScope {
  PUBLIC
  PRIVATE
}

directive @cacheControl(
  maxAge: Int
  scope: CacheControlScope
) on FIELD_DEFINITION | OBJECT | INTERFACE | UNION
type Query {
# Public data, cache for 1 hour
posts: [Post!]! @cacheControl(maxAge: 3600)
# User-specific, shorter cache
me: User @cacheControl(maxAge: 60, scope: PRIVATE)
# Real-time data, no caching
notifications: [Notification!]! @cacheControl(maxAge: 0)
}
type Post {
id: ID!
title: String!
content: String!
# Author rarely changes
author: User! @cacheControl(maxAge: 3600)
# Comments change frequently
comments: [Comment!]! @cacheControl(maxAge: 60)
# View count changes constantly
viewCount: Int! @cacheControl(maxAge: 0)
}
type User {
id: ID! @cacheControl(maxAge: 3600)
name: String! @cacheControl(maxAge: 3600)
# Email is private
email: String! @cacheControl(scope: PRIVATE)
}
Implementation
python
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class CacheScope(Enum):
PUBLIC = "PUBLIC"
PRIVATE = "PRIVATE"
@dataclass
class CacheHint:
max_age: int
scope: CacheScope = CacheScope.PUBLIC
# Store hints during resolution
class CacheHintCollector:
def __init__(self):
self.hints = []
def add_hint(self, path: list, hint: CacheHint):
self.hints.append({"path": path, "hint": hint})
def get_overall_policy(self) -> dict:
"""Calculate overall cache policy from all hints"""
if not self.hints:
return {"maxAge": 0, "scope": "PUBLIC"}
min_age = min(h["hint"].max_age for h in self.hints)
scope = "PRIVATE" if any(
h["hint"].scope == CacheScope.PRIVATE for h in self.hints
) else "PUBLIC"
return {"maxAge": min_age, "scope": scope}
# Directive implementation (field arguments pass through via **kwargs)
def cache_control_directive(resolver, obj, info, max_age=0, scope="PUBLIC", **kwargs):
    """Process @cacheControl directive"""
    # Record the hint at this field's path
    hint = CacheHint(max_age=max_age, scope=CacheScope[scope])
    info.context["cache_hints"].add_hint(info.path.as_list(), hint)
    return resolver(obj, info, **kwargs)
# Add cache headers to response
def add_cache_headers(response, cache_hints: CacheHintCollector):
policy = cache_hints.get_overall_policy()
if policy["maxAge"] > 0:
scope = "private" if policy["scope"] == "PRIVATE" else "public"
response.headers["Cache-Control"] = f"{scope}, max-age={policy['maxAge']}"
else:
response.headers["Cache-Control"] = "no-store"
    return response
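To put the collector and the header helper together, a request handler can create one collector per request and stamp the headers on the way out. A minimal sketch, assuming a Starlette-style `JSONResponse` and a three-argument `execute_graphql(query, variables, context)` entry point that threads the context through to resolvers (that signature is an assumption here):
python
from starlette.responses import JSONResponse

async def handle_cached_request(request):
    body = await request.json()
    hints = CacheHintCollector()
    context = {"cache_hints": hints}
    # Assumption: execute_graphql passes this context dict to resolvers
    result = await execute_graphql(body.get("query"), body.get("variables", {}), context)
    response = JSONResponse(result)
    # Weakest hint wins: min(maxAge) across fields, PRIVATE if any field is private
    return add_cache_headers(response, hints)
Persisted Queries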
How Persisted Queries Work
┌─────────────────────────────────────────────────────────────────┐
│ Persisted Queries Flow │
│ │
│ BUILD TIME (Ahead of time registration) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Extract queries from client code │ │
│ │ ↓ │ │
│ │ Generate hash for each query │ │
│ │ ↓ │ │
│ │ Register hash → query mapping on server │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ RUNTIME (Query by hash) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Client sends: │ │
│ │ { │ │
│ │ "extensions": { │ │
│ │ "persistedQuery": { │ │
│ │ "sha256Hash": "abc123..." │ │
│ │ } │ │
│ │ }, │ │
│ │ "variables": { "id": "123" } │ │
│ │ } │ │
│ │ │ │
│ │ Server looks up query by hash and executes │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Benefits: │
│ • Smaller request payloads (hash vs full query) │
│ • Server can whitelist allowed queries │
│ • Enables CDN caching (GET with hash as param) │
│ • Security: prevent arbitrary query execution │
└─────────────────────────────────────────────────────────────────┘
Server Implementation
python
import hashlib
from typing import Dict, Optional
class PersistedQueryStore:
"""Store for persisted queries"""
def __init__(self, redis_client=None):
self.redis = redis_client
self.local_cache: Dict[str, str] = {}
def hash_query(self, query: str) -> str:
"""Generate SHA256 hash of query"""
return hashlib.sha256(query.encode()).hexdigest()
async def register(self, query: str) -> str:
"""Register a query and return its hash"""
query_hash = self.hash_query(query)
# Store in Redis for distributed access
if self.redis:
await self.redis.set(f"pq:{query_hash}", query)
# Also cache locally
self.local_cache[query_hash] = query
return query_hash
async def get(self, query_hash: str) -> Optional[str]:
"""Get query by hash"""
# Check local cache first
if query_hash in self.local_cache:
return self.local_cache[query_hash]
# Check Redis
if self.redis:
query = await self.redis.get(f"pq:{query_hash}")
if query:
self.local_cache[query_hash] = query
return query
return None
# Request handling with persisted queries
# (assumes a module-level store, e.g. persisted_store = PersistedQueryStore(redis_client))
async def handle_graphql_request(request):
body = await request.json()
query = body.get("query")
variables = body.get("variables", {})
extensions = body.get("extensions", {})
# Check for persisted query
persisted = extensions.get("persistedQuery", {})
query_hash = persisted.get("sha256Hash")
if query_hash and not query:
# Lookup query by hash
query = await persisted_store.get(query_hash)
if not query:
return {
"errors": [{
"message": "PersistedQueryNotFound",
"extensions": {"code": "PERSISTED_QUERY_NOT_FOUND"}
}]
}
elif query and query_hash:
# Verify hash matches (APQ registration)
expected_hash = persisted_store.hash_query(query)
if query_hash != expected_hash:
return {
"errors": [{
"message": "Hash mismatch",
"extensions": {"code": "INVALID_HASH"}
}]
}
# Register the query
await persisted_store.register(query)
# Execute query
    return await execute_graphql(query, variables)
Automatic Persisted Queries (APQ)
┌─────────────────────────────────────────────────────────────────┐
│ APQ Flow │
│ │
│ First request (query not registered): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Client: { hash: "abc123" } │ │
│ │ Server: "PersistedQueryNotFound" │ │
│ │ Client: { hash: "abc123", query: "{ user {...} }" } │ │
│ │ Server: Registers query, returns data │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Subsequent requests: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Client: { hash: "abc123" } │ │
│ │ Server: Looks up query, returns data │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Benefits: │
│ • No build step required │
│ • Automatic registration on first use │
│ • Only one extra round-trip per unique query │
│ • Works with dynamically generated queries │
└─────────────────────────────────────────────────────────────────┘
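For completeness, here is what the client side of the APQ handshake can look like. This is a hedged sketch using `httpx`; real clients (Apollo Link, urql exchanges) implement the same hash-first, retry-with-query logic.
python
import hashlib

import httpx

async def apq_request(url: str, query: str, variables: dict) -> dict:
    """Send hash first; fall back to the full query if the server doesn't know it."""
    query_hash = hashlib.sha256(query.encode()).hexdigest()
    extensions = {"persistedQuery": {"version": 1, "sha256Hash": query_hash}}
    async with httpx.AsyncClient() as client:
        # Optimistic attempt: hash only, no query text
        resp = await client.post(url, json={
            "variables": variables,
            "extensions": extensions,
        })
        result = resp.json()
        errors = result.get("errors") or []
        if any(e.get("message") == "PersistedQueryNotFound" for e in errors):
            # One-time fallback: include the query so the server can register it
            resp = await client.post(url, json={
                "query": query,
                "variables": variables,
                "extensions": extensions,
            })
            result = resp.json()
        return result
CDN Caching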
Making GraphQL CDN-Friendly
python
# Convert POST to GET for cacheable queries
# (imports assumed: json, plus your framework's Response type,
#  e.g. starlette.responses.Response)
async def handle_get_request(request):
"""Handle GET requests with persisted queries for CDN caching"""
# Query parameters
query_hash = request.query_params.get("hash")
variables = json.loads(request.query_params.get("variables", "{}"))
# Look up query
query = await persisted_store.get(query_hash)
if not query:
return Response(status_code=404)
# Check if cacheable (no mutations, no private data)
if not is_cacheable_query(query):
return Response(status_code=400)
# Execute
result = await execute_graphql(query, variables)
# Add cache headers
cache_policy = get_cache_policy(result)
headers = {
"Cache-Control": f"public, max-age={cache_policy['maxAge']}",
"Vary": "Accept-Encoding",
}
return Response(json.dumps(result), headers=headers)
# CDN configuration (Cloudflare/Fastly)
"""
Cache rules:
- Cache GET /graphql?hash=* requests
- Vary by: Accept-Encoding, Authorization (if needed)
- Respect Cache-Control headers
- Purge by tag when data updates
"""Cache Tags for Invalidation
python
class CacheTagCollector:
"""Collect cache tags during resolution for CDN invalidation"""
def __init__(self):
self.tags = set()
def add_tag(self, tag: str):
self.tags.add(tag)
def add_entity(self, type_name: str, id: str):
self.tags.add(f"{type_name}:{id}")
# Resolver adds tags
@query.field("post")
async def resolve_post(_, info, id):
post = await info.context["db"].posts.find_one({"_id": id})
# Add cache tag for this entity
info.context["cache_tags"].add_entity("Post", id)
info.context["cache_tags"].add_entity("User", post["author_id"])
return post
# Add tags to response headers
def add_cache_tags(response, tags: CacheTagCollector):
if tags.tags:
response.headers["Cache-Tag"] = ",".join(tags.tags)
# Cloudflare: Surrogate-Key
response.headers["Surrogate-Key"] = " ".join(tags.tags)
return response
# Invalidation on mutation
@mutation.field("updatePost")
async def resolve_update_post(_, info, id, input):
result = await info.context["db"].posts.update_one(
{"_id": id},
{"$set": input}
)
# Purge CDN cache for this entity
await purge_cdn_cache(tags=[f"Post:{id}"])
    return await info.context["post_loader"].load(id)
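`purge_cdn_cache` is CDN-specific. Against Cloudflare's purge-by-tag endpoint (an Enterprise feature) it might look like the sketch below; `CLOUDFLARE_ZONE_ID` and `CLOUDFLARE_API_TOKEN` are placeholders you would load from configuration.
python
import httpx

CLOUDFLARE_ZONE_ID = "your-zone-id"      # placeholder
CLOUDFLARE_API_TOKEN = "your-api-token"  # placeholder

async def purge_cdn_cache(tags: list[str]) -> None:
    """Purge CDN objects carrying any of the given cache tags."""
    url = f"https://api.cloudflare.com/client/v4/zones/{CLOUDFLARE_ZONE_ID}/purge_cache"
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            url,
            headers={"Authorization": f"Bearer {CLOUDFLARE_API_TOKEN}"},
            json={"tags": tags},
        )
        resp.raise_for_status()
Client-Side Caching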
Normalized Cache (Apollo Client)
┌─────────────────────────────────────────────────────────────────┐
│ Normalized Client Cache │
│ │
│ Query Result: │
│ { │
│ "post": { │
│ "id": "1", │
│ "title": "Hello", │
│ "author": { │
│ "id": "100", │
│ "name": "Alice" │
│ } │
│ } │
│ } │
│ │
│ Normalized Cache: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ "Post:1": { │ │
│ │ "id": "1", │ │
│ │ "title": "Hello", │ │
│ │ "author": { "__ref": "User:100" } ◄─── Reference │ │
│ │ } │ │
│ │ │ │
│ │ "User:100": { │ │
│ │ "id": "100", │ │
│ │ "name": "Alice" │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Benefits: │
│ • Updates to User:100 reflected everywhere │
│ • Deduplication of data │
│ • Automatic cache updates on mutations │
└─────────────────────────────────────────────────────────────────┘
Apollo Client Configuration
javascript
import { ApolloClient, InMemoryCache } from '@apollo/client';
const cache = new InMemoryCache({
typePolicies: {
// Configure cache behavior per type
User: {
// Use 'id' field as cache key
keyFields: ['id'],
fields: {
// Merge paginated results
posts: {
keyArgs: ['filter'],
merge(existing = [], incoming, { args }) {
if (!args?.after) {
return incoming;
}
return [...existing, ...incoming];
},
},
},
},
Post: {
keyFields: ['id'],
fields: {
// Cache comments separately with TTL
comments: {
read(existing, { readField, cache }) {
// Check if cache is stale
const cachedAt = readField('__cachedAt');
if (cachedAt && Date.now() - cachedAt > 60000) {
return undefined; // Trigger refetch
}
return existing;
},
},
},
},
Query: {
fields: {
// Configure root query caching
post(_, { args, toReference }) {
return toReference({
__typename: 'Post',
id: args.id,
});
},
},
},
},
});
const client = new ApolloClient({
cache,
uri: '/graphql',
});
Cache Updates After Mutations
javascript
// Automatic update: Apollo normalizes returned entities by ID, so fields on
// existing entities update everywhere. Newly created items are NOT added to
// cached list queries automatically (use update() or refetchQueries for that).
const [createPost] = useMutation(CREATE_POST);
// Manual cache update
const [deletePost] = useMutation(DELETE_POST, {
update(cache, { data: { deletePost } }) {
// Remove from cache
cache.evict({ id: `Post:${deletePost.id}` });
cache.gc();
},
});
// Refetch queries after mutation
const [updatePost] = useMutation(UPDATE_POST, {
refetchQueries: [
{ query: GET_POSTS },
'GetUserPosts', // By operation name
],
});
// Optimistic update
const [likePost] = useMutation(LIKE_POST, {
optimisticResponse: {
likePost: {
__typename: 'Post',
id: postId,
likeCount: currentLikeCount + 1,
likedByMe: true,
},
},
});
Performance Optimization
Query Cost Analysis
python
from graphql import FieldNode, GraphQLError

def calculate_query_cost(info, max_cost=10000):
    """Calculate query cost for rate limiting"""
    def calculate_field_cost(field, parent_multiplier=1):
        cost = 1  # Base cost per field
        multiplier = 1
        # List arguments multiply the cost of child selections
        for arg in field.arguments:
            if arg.name.value in ("first", "last", "limit"):
                try:
                    multiplier = min(int(arg.value.value), 100)  # Cap at 100
                except (AttributeError, TypeError, ValueError):
                    multiplier = 100  # Variable or non-integer value: assume the cap
        field_cost = cost * parent_multiplier
        # Recurse into selections with the cumulative multiplier
        # (fragment spreads are skipped here for brevity)
        if field.selection_set:
            for selection in field.selection_set.selections:
                if isinstance(selection, FieldNode):
                    field_cost += calculate_field_cost(
                        selection,
                        parent_multiplier * multiplier,
                    )
        return field_cost

    total_cost = sum(calculate_field_cost(field) for field in info.field_nodes)
    if total_cost > max_cost:
        raise GraphQLError(
            f"Query cost {total_cost} exceeds maximum {max_cost}",
            extensions={"code": "QUERY_TOO_EXPENSIVE", "cost": total_cost},
        )
    return total_cost
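One way to enforce the limit is a middleware that runs the calculation exactly once per operation, at a root field. A sketch, assuming graphql-core-style resolver info (where `info.path.prev` is None only at the root):
python
async def cost_limit_middleware(resolve, obj, info, **kwargs):
    # Run the cost check once per operation, not once per resolver
    if info.path.prev is None:
        calculate_query_cost(info, max_cost=10000)
    return await resolve(obj, info, **kwargs)
Request Batching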
python
import asyncio

# Server-side batch handling
async def handle_batch_request(request):
"""Handle batched GraphQL requests"""
body = await request.json()
# Check if batch request
if isinstance(body, list):
if len(body) > 10: # Limit batch size
return {"error": "Batch size exceeds maximum of 10"}
# Execute all queries in parallel
results = await asyncio.gather(*[
execute_single_request(req) for req in body
])
return results
return await execute_single_request(body)
javascript
// Client-side batching (Apollo)
import { BatchHttpLink } from '@apollo/client/link/batch-http';
const link = new BatchHttpLink({
uri: '/graphql',
batchMax: 10, // Max queries per batch
batchInterval: 20, // Wait time in ms
});
Deferred Execution (@defer)
graphql
# Query with deferred fields
query GetPost($id: ID!) {
post(id: $id) {
id
title
content
# Defer expensive fields
... @defer {
comments {
id
text
author {
name
}
}
relatedPosts {
id
title
}
}
}
}
# Server streams the response in parts (the exact envelope varies by @defer spec draft):
# 1. Initial: { "data": { "post": { "id": "1", "title": "...", "content": "..." } }, "hasNext": true }
# 2. Patch:   { "data": { "comments": [...], "relatedPosts": [...] }, "path": ["post"], "hasNext": false }
Monitoring and Observability
Performance Metrics
python
import logging
import time

from prometheus_client import Counter, Histogram

logger = logging.getLogger(__name__)
# Metrics
query_duration = Histogram(
'graphql_query_duration_seconds',
'GraphQL query duration',
['operation_name', 'operation_type']
)
resolver_duration = Histogram(
'graphql_resolver_duration_seconds',
'GraphQL resolver duration',
['type_name', 'field_name']
)
cache_hits = Counter(
'graphql_cache_hits_total',
'GraphQL cache hits',
['cache_type']
)
# Instrumentation middleware
class PerformanceMiddleware:
async def resolve(self, next, obj, info, **kwargs):
start = time.time()
try:
return await next(obj, info, **kwargs)
finally:
duration = time.time() - start
resolver_duration.labels(
type_name=info.parent_type.name,
field_name=info.field_name
).observe(duration)
# Log slow resolvers
if duration > 0.1: # 100ms
logger.warning(
f"Slow resolver: {info.parent_type.name}.{info.field_name} "
f"took {duration*1000:.2f}ms"
            )
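The `query_duration` histogram defined above also needs to be observed somewhere. A sketch wrapping the execution entry point used elsewhere in this section; in practice the operation type would be derived from the parsed document rather than hardcoded:
python
async def timed_execute(query: str, variables: dict, operation_name: str = None):
    """Time the whole operation, complementing the per-resolver histogram."""
    start = time.time()
    try:
        return await execute_graphql(query, variables)
    finally:
        query_duration.labels(
            operation_name=operation_name or "anonymous",
            operation_type="query",  # assumption: parse the document to get this
        ).observe(time.time() - start)
Query Logging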
python
import hashlib
from datetime import datetime, timezone

async def log_query(info, duration_ms, error=None):
"""Log query for analysis"""
query_str = info.context.get("query_string", "")
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"query_hash": hashlib.sha256(query_str.encode()).hexdigest()[:12],
"operation_name": info.operation.name.value if info.operation.name else None,
"operation_type": info.operation.operation.value,
"duration_ms": duration_ms,
"complexity": info.context.get("query_complexity"),
"cache_hit": info.context.get("cache_hit", False),
"user_id": info.context.get("user", {}).get("id"),
"error": str(error) if error else None,
}
# Send to logging/analytics
logger.info("graphql_query", extra=log_entry)
# Store for slow query analysis
if duration_ms > 1000:
        await store_slow_query(query_str, log_entry)
Best Practices
Response Caching:
□ Use cache control directives for field-level TTLs
□ Calculate overall cache policy from field hints
□ Separate public vs private cached data
□ Invalidate cache on mutations
Persisted Queries:
□ Use APQ for automatic registration
□ Consider static extraction for production
□ Enable GET requests for CDN caching
□ Whitelist queries in high-security environments
Client Caching:
□ Configure normalized cache with proper key fields
□ Define type policies for pagination merging
□ Use optimistic updates for better UX
□ Clean up cache on logout/user switch
Performance:
□ Implement query complexity/cost analysis
□ Set reasonable limits (depth, complexity, batch size)
□ Use @defer for expensive fields
□ Monitor resolver performance
□ Log and analyze slow queries