Building reliable, performant integrations with the Catalio API requires more than just understanding the endpoints. This guide covers production-ready patterns, error handling strategies, optimization techniques, and architectural decisions that separate robust integrations from fragile ones.
Table of Contents
- Core API Design Principles
- Error Handling Strategies
- Retry Logic and Exponential Backoff
- Caching Strategies
- Rate Limiting Best Practices
- Pagination Patterns
- Bulk Operations vs Individual Requests
- Optimistic Concurrency Control
- Idempotency
- API Versioning
- Testing API Integrations
- Monitoring and Observability
- Performance Optimization
- Multi-tenant Considerations
- Common Anti-patterns to Avoid
- Production Deployment Checklist
- SDK and Client Library Patterns
Core API Design Principles
RESTful Resource Design
The Catalio API follows RESTful principles with these core concepts:
Resources as Entities: Every API endpoint represents a business entity (Requirement, Organization, User, etc.). Resources have:
- Unique identifiers: IDs uniquely identify instances within their organization
- State representations: JSON payloads represent current resource state
- Relationships: Resources link to related entities via foreign key references
- Operations: Standard HTTP methods (GET, POST, PUT, DELETE) perform operations
# List all requirements for an organization
GET /api/organizations/:org_id/requirements
# Get a specific requirement
GET /api/organizations/:org_id/requirements/:id
# Create a new requirement
POST /api/organizations/:org_id/requirements
# Update a requirement
PUT /api/organizations/:org_id/requirements/:id
# Delete a requirement
DELETE /api/organizations/:org_id/requirements/:id
HTTP Status Codes
Always return appropriate HTTP status codes:
| Code | Meaning | When to Use |
|---|---|---|
| 200 | OK | Successful GET, PUT, DELETE |
| 201 | Created | Successful POST creating a resource |
| 204 | No Content | Successful DELETE or empty response |
| 400 | Bad Request | Invalid input parameters or validation error |
| 401 | Unauthorized | Missing or invalid authentication |
| 403 | Forbidden | Authenticated but not authorized for resource |
| 404 | Not Found | Resource doesn’t exist or wrong path |
| 409 | Conflict | Concurrency conflict (version mismatch) |
| 422 | Unprocessable Entity | Semantic validation failures |
| 429 | Too Many Requests | Rate limit exceeded |
| 500 | Server Error | Unexpected server-side error |
| 503 | Service Unavailable | Temporary server issue (retry eligible) |
Request/Response Design
Consistent JSON Structure: All responses follow a consistent envelope:
{
"data": {
"id": "123",
"type": "requirement",
"attributes": {
"title": "User authentication",
"status": "active",
"created_at": "2025-03-05T10:30:00Z"
},
"relationships": {
"organization": {
"data": { "id": "org-1", "type": "organization" }
}
}
}
}
List Responses with Pagination:
{
"data": [
{ "id": "1", "type": "requirement", "attributes": { "title": "Feature A" } },
{ "id": "2", "type": "requirement", "attributes": { "title": "Feature B" } }
],
"meta": {
"page": 1,
"page_size": 50,
"total": 150,
"has_more": true
}
}
Timestamp Format: Always use ISO 8601 UTC timestamps:
{
"created_at": "2025-03-05T10:30:00Z",
"updated_at": "2025-03-05T11:45:30Z"
}
Error Handling Strategies
Comprehensive Error Responses
When errors occur, return detailed error information to help debugging:
{
"errors": [
{
"status": 400,
"code": "VALIDATION_ERROR",
"title": "Invalid request parameters",
"detail": "The 'title' field is required",
"source": {
"pointer": "/data/attributes/title"
}
}
]
}
Error Classifications
Client Errors (4xx): Originate from client requests
{
"errors": [
{
"status": 400,
"code": "INVALID_PARAMETER",
"title": "Bad Request",
"detail": "Invalid format for organization_id: must be UUID",
"source": { "parameter": "organization_id" }
}
]
}
Server Errors (5xx): Originate from server issues
{
"errors": [
{
"status": 500,
"code": "INTERNAL_ERROR",
"title": "Internal Server Error",
"detail": "An unexpected error occurred processing your request",
"request_id": "req-abc123def456"
}
]
}
Validation Error Handling
Validate requests early and return comprehensive feedback:
{
"errors": [
{
"status": 422,
"code": "VALIDATION_ERROR",
"title": "Validation failed",
"detail": "Title must be between 3 and 255 characters",
"source": { "pointer": "/data/attributes/title" }
},
{
"status": 422,
"code": "VALIDATION_ERROR",
"title": "Validation failed",
"detail": "Status must be one of: active, archived, draft",
"source": { "pointer": "/data/attributes/status" }
}
]
}
Client Implementation Pattern
Handle errors gracefully in your integration code:
import requests
from typing import Optional, Dict, Any
class CatalioClient:
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.api_key = api_key
# Get base_url from environment or use provided value
self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
if not self.base_url:
raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def handle_error(self, response: requests.Response) -> Dict[str, Any]:
"""Extract and categorize API errors"""
try:
error_data = response.json()
errors = error_data.get("errors", [])
if response.status_code == 400:
return {
"type": "validation_error",
"message": errors[0]["detail"] if errors else "Invalid request",
"details": errors
}
elif response.status_code == 401:
return {
"type": "auth_error",
"message": "Invalid or missing authentication",
"details": errors
}
elif response.status_code == 403:
return {
"type": "permission_error",
"message": "Not authorized to access this resource",
"details": errors
}
elif response.status_code == 404:
return {
"type": "not_found",
"message": "Resource not found",
"details": errors
}
elif response.status_code == 409:
return {
"type": "conflict",
"message": "Resource conflict (possibly due to concurrent modification)",
"details": errors
}
elif response.status_code == 429:
return {
"type": "rate_limit",
"message": "Rate limit exceeded",
"retry_after": response.headers.get("Retry-After", "60")
}
elif response.status_code >= 500:
return {
"type": "server_error",
"message": "Server error (retryable)",
"request_id": errors[0].get("request_id") if errors else None,
"details": errors
}
except:
pass
return {
"type": "unknown_error",
"status_code": response.status_code,
"body": response.text
}
def get_requirement(self, org_id: str, req_id: str) -> Optional[Dict]:
"""Get a requirement with proper error handling"""
url = f"{self.base_url}/api/organizations/{org_id}/requirements/{req_id}"
try:
response = requests.get(url, headers=self.headers, timeout=10)
if response.status_code == 200:
return response.json()["data"]
else:
error = self.handle_error(response)
# Log error, alert monitoring, etc.
raise APIError(error)
except requests.RequestException as e:
# Network error - likely retryable
raise NetworkError(f"Request failed: {str(e)}")
Retry Logic and Exponential Backoff
When to Retry
Retryable Errors:
- 408 Request Timeout
- 429 Too Many Requests
- 500 Internal Server Error
- 502 Bad Gateway
- 503 Service Unavailable
- 504 Gateway Timeout
- Network timeouts
- Connection refused
Non-Retryable Errors:
- 400 Bad Request (validation error)
- 401 Unauthorized
- 403 Forbidden
- 404 Not Found
Exponential Backoff Implementation
Implement exponential backoff to prevent overwhelming the server during transient failures:
import time
import random
from typing import Callable, TypeVar, Optional
T = TypeVar('T')
class RetryConfig:
"""Configuration for retry behavior"""
def __init__(
self,
max_attempts: int = 3,
initial_delay: float = 1.0,
max_delay: float = 32.0,
exponential_base: float = 2.0,
jitter: bool = True
):
self.max_attempts = max_attempts
self.initial_delay = initial_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
def get_delay(self, attempt: int) -> float:
"""Calculate delay for given attempt number (0-indexed)"""
# Exponential backoff: delay = initial_delay * (base ^ attempt)
delay = self.initial_delay * (self.exponential_base ** attempt)
# Cap at max_delay
delay = min(delay, self.max_delay)
# Add jitter: random value between delay * 0.5 and delay
if self.jitter:
delay = delay * (0.5 + random.random())
return delay
def retry_with_backoff(
func: Callable[..., T],
config: Optional[RetryConfig] = None,
**kwargs
) -> T:
"""
Execute function with exponential backoff retry logic
Example:
result = retry_with_backoff(
client.get_requirement,
org_id="org-123",
req_id="req-456"
)
"""
config = config or RetryConfig()
last_error = None
for attempt in range(config.max_attempts):
try:
return func(**kwargs)
except (NetworkError, TimeoutError, ServerError) as e:
last_error = e
if attempt < config.max_attempts - 1:
delay = config.get_delay(attempt)
print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s: {e}")
time.sleep(delay)
else:
print(f"All {config.max_attempts} attempts failed")
raise last_error
# Usage
try:
requirement = retry_with_backoff(
client.get_requirement,
config=RetryConfig(max_attempts=4),
org_id="org-123",
req_id="req-456"
)
except Exception as e:
print(f"Failed to fetch requirement after retries: {e}")
Handling Retry-After Headers
When receiving a 429 (Too Many Requests) response, respect the Retry-After header:
from datetime import datetime, timedelta, timezone
def handle_rate_limit(response: requests.Response) -> int:
"""Extract retry-after delay from response headers"""
retry_after = response.headers.get("Retry-After")
if retry_after:
# Retry-After can be seconds (integer) or HTTP-date
try:
return int(retry_after)
except ValueError:
# Parse as HTTP-date
from email.utils import parsedate_to_datetime
retry_time = parsedate_to_datetime(retry_after)
# Use timezone-aware now to match parsed datetime
now = datetime.now(timezone.utc)
# Clamp negative delays to zero
seconds = max(0, int((retry_time - now).total_seconds()))
return seconds
# Default fallback
return 60
def request_with_rate_limit_handling(
func: Callable,
**kwargs
) -> Any:
"""Make request with automatic rate limit handling"""
max_retries = 3
for attempt in range(max_retries):
try:
response = func(**kwargs)
if response.status_code == 429:
if attempt < max_retries - 1:
delay = handle_rate_limit(response)
print(f"Rate limited, waiting {delay}s before retry")
time.sleep(delay)
continue
response.raise_for_status()
return response
except requests.RequestException as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
raise
raise Exception("Max retries exceeded")
Caching Strategies
Cache-Control Headers
The Catalio API returns Cache-Control headers to indicate cacheability:
Cache-Control: public, max-age=3600
Cache Types:
public: Can be cached by any cache (CDNs, proxies, browsers)private: Only user-specific caches can storemax-age=3600: Cache valid for 3600 seconds (1 hour)must-revalidate: Revalidate with server if staleno-cache: Validate with server before usingno-store: Never cache this response
Client-Side Caching Implementation
Implement intelligent caching to reduce API calls:
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
class CacheEntry:
def __init__(self, data: Any, max_age: int):
self.data = data
self.created_at = datetime.now()
self.max_age = max_age
def is_expired(self) -> bool:
age = (datetime.now() - self.created_at).total_seconds()
return age > self.max_age
def is_stale(self) -> bool:
# Stale if older than max_age but still usable
age = (datetime.now() - self.created_at).total_seconds()
return age > self.max_age * 0.8
class ResponseCache:
"""In-memory cache for API responses"""
def __init__(self):
self.cache: Dict[str, CacheEntry] = {}
def get_cache_key(self, method: str, url: str, params: Dict = None) -> str:
"""Generate cache key from request details"""
key_data = f"{method}:{url}:{json.dumps(params or {})}"
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, key: str) -> Optional[Any]:
"""Get cached value if fresh"""
if key in self.cache:
entry = self.cache[key]
if not entry.is_expired():
return entry.data
else:
del self.cache[key]
return None
def get_stale(self, key: str) -> Optional[Any]:
"""Get stale cached value (for stale-while-revalidate)"""
if key in self.cache:
return self.cache[key].data
return None
def set(self, key: str, data: Any, max_age: int = 3600):
"""Store response in cache"""
self.cache[key] = CacheEntry(data, max_age)
def clear(self):
"""Clear all cached responses"""
self.cache.clear()
class CachingAPIClient:
def __init__(self, api_key: str):
self.api_key = api_key
self.cache = ResponseCache()
def get_requirement(
self,
org_id: str,
req_id: str,
use_cache: bool = True,
force_refresh: bool = False
) -> Dict:
"""Get requirement with intelligent caching"""
url = f"/api/organizations/{org_id}/requirements/{req_id}"
cache_key = self.cache.get_cache_key("GET", url)
# Check cache first
if use_cache and not force_refresh:
cached = self.cache.get(cache_key)
if cached is not None:
cached["_cached"] = True
return cached
# Fetch from API
response = requests.get(
url,
headers={"Authorization": f"Bearer {self.api_key}"}
)
response.raise_for_status()
data = response.json()["data"]
# Store in cache based on Cache-Control header
cache_control = response.headers.get("Cache-Control", "max-age=3600")
max_age = self.parse_max_age(cache_control)
self.cache.set(cache_key, data, max_age)
return data
@staticmethod
def parse_max_age(cache_control: str) -> int:
"""Extract max-age from Cache-Control header"""
parts = cache_control.split(",")
for part in parts:
part = part.strip()
if part.startswith("max-age="):
return int(part.split("=")[1])
return 3600 # Default 1 hour
# Usage
client = CachingAPIClient(api_key="your-api-key")
# First call fetches from API
req = client.get_requirement("org-123", "req-456")
# Second call returns cached value
req = client.get_requirement("org-123", "req-456") # Returns cached copy
# Force refresh
req = client.get_requirement("org-123", "req-456", force_refresh=True)
Cache Invalidation Strategies
Implement cache invalidation when resources change:
class CacheInvalidationClient(CachingAPIClient):
"""Cache client with invalidation support"""
def create_requirement(
self,
org_id: str,
data: Dict
) -> Dict:
"""Create requirement and invalidate org requirements list cache"""
result = self._post(
f"organizations/{org_id}/requirements",
data
)
# Invalidate related caches
self._invalidate_org_cache(org_id)
return result
def update_requirement(
self,
org_id: str,
req_id: str,
data: Dict
) -> Dict:
"""Update requirement and invalidate caches"""
result = self._put(
f"organizations/{org_id}/requirements/{req_id}",
data
)
# Invalidate specific requirement and list caches
self._invalidate_requirement_cache(org_id, req_id)
self._invalidate_org_cache(org_id)
return result
def _invalidate_requirement_cache(self, org_id: str, req_id: str):
"""Invalidate specific requirement cache"""
url = f"/api/organizations/{org_id}/requirements/{req_id}"
cache_key = self.cache.get_cache_key("GET", url)
if cache_key in self.cache.cache:
del self.cache.cache[cache_key]
def _invalidate_org_cache(self, org_id: str):
"""Invalidate organization requirements list cache"""
url = f"/api/organizations/{org_id}/requirements"
# Invalidate all pagination variants
for page in range(1, 10): # Invalidate first 10 pages
cache_key = self.cache.get_cache_key("GET", url, {"page": page})
if cache_key in self.cache.cache:
del self.cache.cache[cache_key]
Rate Limiting Best Practices
Understanding Rate Limits
The Catalio API enforces rate limits to ensure fair resource usage:
| Category | Limit | Window |
|---|---|---|
| Authentication | 10 requests | 1 minute |
| Standard API | 1000 requests | 1 hour |
| Bulk Operations | 100 requests | 1 hour |
| WebSocket | 100 messages | 1 minute |
Rate Limit Headers:
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 842
X-RateLimit-Reset: 1699564200
Rate Limit Awareness
Monitor rate limit headers in every response:
class RateLimitManager:
"""Track and respect API rate limits"""
def __init__(self):
self.limit = None
self.remaining = None
self.reset_time = None
def update_from_response(self, response: requests.Response):
"""Update rate limit info from response headers"""
self.limit = int(response.headers.get("X-RateLimit-Limit", 0))
self.remaining = int(response.headers.get("X-RateLimit-Remaining", 0))
reset_timestamp = int(response.headers.get("X-RateLimit-Reset", 0))
self.reset_time = datetime.fromtimestamp(reset_timestamp)
def should_throttle(self) -> bool:
"""Check if we should throttle requests"""
if self.remaining is None:
return False
# Throttle if less than 10% of limit remains
threshold = self.limit * 0.1
return self.remaining < threshold
def get_wait_time(self) -> float:
"""Get seconds to wait before next request"""
if not self.should_throttle() or not self.reset_time:
return 0
wait = (self.reset_time - datetime.now()).total_seconds()
return max(0, wait)
def log_status(self):
"""Log current rate limit status"""
if self.limit:
usage_pct = ((self.limit - self.remaining) / self.limit) * 100
print(f"Rate Limit: {self.remaining}/{self.limit} remaining ({usage_pct:.1f}% used)")
if self.reset_time:
print(f"Resets at: {self.reset_time}")
class RateLimitAwareClient:
"""API client that respects rate limits"""
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.api_key = api_key
# Get base_url from environment or use provided value
self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
if not self.base_url:
raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
self.rate_limit = RateLimitManager()
def request(
self,
method: str,
path: str,
data: Optional[Dict] = None
) -> Dict:
"""Make request with rate limit awareness"""
# Check if we need to wait
wait_time = self.rate_limit.get_wait_time()
if wait_time > 0:
print(f"Rate limit approaching, waiting {wait_time:.1f}s")
time.sleep(wait_time)
# Make request
url = f"{self.base_url}{path}"
response = requests.request(
method,
url,
headers={"Authorization": f"Bearer {self.api_key}"},
json=data
)
# Update rate limit info
self.rate_limit.update_from_response(response)
self.rate_limit.log_status()
response.raise_for_status()
return response.json()
Batch Requests to Reduce API Calls
Use batch endpoints when available to reduce rate limit usage:
def batch_create_requirements(
client: APIClient,
org_id: str,
requirements: List[Dict]
) -> List[Dict]:
"""Create multiple requirements with single API call"""
# Use batch endpoint instead of creating each individually
response = client.request(
"POST",
f"/api/organizations/{org_id}/requirements/batch",
data={"requirements": requirements}
)
return response["data"]
# Usage
requirements = [
{"title": "User Auth", "priority": "high"},
{"title": "Payment Processing", "priority": "high"},
{"title": "Email Notifications", "priority": "medium"},
]
# Single API call instead of 3
created = batch_create_requirements(client, "org-123", requirements)
# Track usage
print(f"Created {len(created)} requirements with 1 API call")
print(f"Rate limit remaining: {client.rate_limit.remaining}")
Pagination Patterns
Cursor-Based Pagination
For large datasets, use cursor-based pagination for reliable results:
{
"data": [
{ "id": "req-1", "title": "Feature A" },
{ "id": "req-2", "title": "Feature B" }
],
"meta": {
"next_cursor": "eyJpZCI6IjUwZjc2YWY2ZjI2MjkifQ==",
"prev_cursor": "eyJpZCI6IjRkZjhhZGY2ZjI2MjkifQ==",
"has_more": true
}
}
Advantages of Cursor-Based Pagination:
- Handles insertions/deletions without gaps
- Consistent results regardless of concurrent modifications
- More efficient database queries
- Supports bidirectional navigation
class PaginationIterator:
"""Iterate through paginated API results"""
def __init__(
self,
client: APIClient,
path: str,
page_size: int = 50
):
self.client = client
self.path = path
self.page_size = page_size
self.next_cursor = None
def __iter__(self):
return self
def __next__(self) -> List[Dict]:
"""Fetch next page of results"""
params = {"limit": self.page_size}
if self.next_cursor:
params["cursor"] = self.next_cursor
response = self.client.request("GET", self.path, params=params)
data = response["data"]
if not data:
raise StopIteration
# Update cursor for next iteration
meta = response.get("meta", {})
self.next_cursor = meta.get("next_cursor")
return data
# Usage: Iterate through all requirements
def process_all_requirements(client: APIClient, org_id: str):
"""Process all requirements in an organization"""
iterator = PaginationIterator(
client,
f"/api/organizations/{org_id}/requirements",
page_size=100
)
requirement_count = 0
for page in iterator:
for requirement in page:
# Process requirement
print(f"Processing: {requirement['title']}")
requirement_count += 1
print(f"Processed {requirement_count} total requirements")
# Usage
process_all_requirements(client, "org-123")
Offset-Based Pagination (When Necessary)
If cursor-based pagination isn’t available, use offset-based with caution:
class OffsetPagination:
"""Offset-based pagination (less efficient but sometimes necessary)"""
def __init__(
self,
client: APIClient,
path: str,
page_size: int = 50
):
self.client = client
self.path = path
self.page_size = page_size
self.current_page = 1
def get_page(self, page: int) -> Dict:
"""Fetch specific page"""
response = self.client.request(
"GET",
self.path,
params={"page": page, "page_size": self.page_size}
)
return response
def __iter__(self):
self.current_page = 1
return self
def __next__(self) -> List[Dict]:
"""Fetch next page"""
response = self.get_page(self.current_page)
data = response["data"]
if not data:
raise StopIteration
self.current_page += 1
return data
# ⚠️ WARNING: This approach can miss/duplicate items if data changes
# Only use if cursor-based pagination is unavailable
Bulk Operations vs Individual Requests
When to Use Bulk Operations
Bulk operations are more efficient for:
- Creating/updating/deleting many resources
- Reducing rate limit usage
- Atomic multi-resource operations
- Transactional consistency
Individual requests are better for:
- Single resource changes
- Complex business logic per request
- Real-time user interactions
- Immediate feedback requirements
Bulk Creation Example
def bulk_create_requirements(
client: APIClient,
org_id: str,
requirements: List[Dict]
) -> BulkOperationResult:
"""
Create multiple requirements efficiently
Instead of 100 individual POST requests, use bulk endpoint
"""
response = client.request(
"POST",
f"/api/organizations/{org_id}/requirements/bulk",
data={
"requirements": requirements
}
)
return BulkOperationResult(response)
class BulkOperationResult:
"""Handle results from bulk operations"""
def __init__(self, response: Dict):
self.response = response
self.succeeded = response.get("succeeded", [])
self.failed = response.get("failed", [])
def success_count(self) -> int:
return len(self.succeeded)
def failure_count(self) -> int:
return len(self.failed)
def success_rate(self) -> float:
total = len(self.succeeded) + len(self.failed)
return len(self.succeeded) / total if total > 0 else 0
def has_failures(self) -> bool:
return len(self.failed) > 0
def get_error_summary(self) -> str:
"""Get human-readable error summary"""
if not self.failed:
return "All operations succeeded"
errors_by_code = {}
for failure in self.failed:
code = failure.get("code", "unknown")
if code not in errors_by_code:
errors_by_code[code] = 0
errors_by_code[code] += 1
summary = f"{self.failure_count()} operations failed:\n"
for code, count in errors_by_code.items():
summary += f" - {code}: {count}\n"
return summary
# Usage
requirements = [
{"title": "Feature A", "priority": "high"},
{"title": "Feature B", "priority": "medium"},
{"title": "Feature C", "priority": "low"},
]
result = bulk_create_requirements(client, "org-123", requirements)
print(f"Success rate: {result.success_rate():.1%}")
if result.has_failures():
print(result.get_error_summary())
Handling Partial Failures
Bulk operations may partially succeed. Handle this gracefully:
def bulk_update_with_retry(
client: APIClient,
org_id: str,
updates: List[Dict]
) -> None:
"""
Bulk update with automatic retry of failures
Attempts bulk update, retries individual items that fail
"""
result = client.request(
"POST",
f"/api/organizations/{org_id}/requirements/bulk-update",
data={"updates": updates}
)
# Retry individual failures
if result.get("failed"):
failed_updates = [
next(u for u in updates if u["id"] == f["id"])
for f in result["failed"]
]
retried_count = 0
for update in failed_updates:
try:
client.request(
"PUT",
f"/api/organizations/{org_id}/requirements/{update['id']}",
data=update
)
retried_count += 1
except Exception as e:
print(f"Individual retry failed for {update['id']}: {e}")
print(f"Retried {retried_count} failed operations")
Optimistic Concurrency Control
Using ETags and Version Fields
Prevent lost updates when multiple clients modify the same resource:
{
"data": {
"id": "req-123",
"title": "User Authentication",
"status": "active",
"version": 3,
"_etag": "\"abc123def456\""
}
}
Updating with Version Check
def update_requirement_with_version(
client: APIClient,
org_id: str,
req_id: str,
current_version: int,
updates: Dict
) -> Dict:
"""
Update requirement with optimistic locking
Fails if current_version doesn't match server version
"""
try:
response = client.request(
"PUT",
f"/api/organizations/{org_id}/requirements/{req_id}",
data={
"requirement": updates,
"version": current_version
}
)
return response["data"]
except HTTPError as e:
if e.status_code == 409:
# Conflict: resource was modified
error_data = e.response.json()
raise ConflictError(
f"Resource modified by another client. "
f"Expected version {current_version}, "
f"got {error_data['data']['version']}"
) from e
raise
# Usage
requirement = client.get_requirement("org-123", "req-456")
try:
updated = update_requirement_with_version(
client,
"org-123",
"req-456",
current_version=requirement["version"],
updates={"title": "New Title"}
)
except ConflictError as e:
# Handle conflict: refresh and retry
print(f"Conflict detected: {e}")
requirement = client.get_requirement("org-123", "req-456")
# Notify user or implement merge logic
Handling Conflicts
Implement smart conflict resolution:
class OptimisticLockingClient:
def __init__(self, client: APIClient):
self.client = client
def update_with_conflict_handling(
self,
org_id: str,
req_id: str,
updates: Dict,
max_retries: int = 3
) -> Dict:
"""Update with automatic retry on conflict"""
for attempt in range(max_retries):
# Fetch current state
current = self.client.get_requirement(org_id, req_id)
version = current["version"]
try:
# Attempt update with current version
return update_requirement_with_version(
self.client,
org_id,
req_id,
version,
updates
)
except ConflictError:
if attempt < max_retries - 1:
print(f"Conflict on attempt {attempt + 1}, retrying...")
time.sleep(0.1 * (2 ** attempt)) # Exponential backoff
else:
raise
Idempotency
Idempotency Keys
Use idempotency keys to make requests safe to retry:
import uuid
class IdempotentAPIClient:
"""API client with idempotency support"""
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.api_key = api_key
# Get base_url from environment or use provided value
self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
if not self.base_url:
raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
def create_requirement(
self,
org_id: str,
data: Dict,
idempotency_key: Optional[str] = None
) -> Dict:
"""
Create requirement with idempotency guarantee
If request is retried with same idempotency_key,
returns cached result instead of creating duplicate
"""
# Generate idempotency key if not provided
if not idempotency_key:
idempotency_key = str(uuid.uuid4())
headers = {
"Authorization": f"Bearer {self.api_key}",
"Idempotency-Key": idempotency_key
}
response = requests.post(
f"{self.base_url}/api/organizations/{org_id}/requirements",
headers=headers,
json=data
)
# Store idempotency key for audit trail
result = response.json()["data"]
result["_idempotency_key"] = idempotency_key
return result
# Usage
client = IdempotentAPIClient(api_key="...")
# Generate stable key based on source data
idempotency_key = hashlib.md5(
json.dumps({"title": "Feature A"}).encode()
).hexdigest()
# Can retry safely without creating duplicate
requirement = client.create_requirement(
"org-123",
{"title": "Feature A"},
idempotency_key=idempotency_key
)
# Retry with same key - returns same result
requirement_retry = client.create_requirement(
"org-123",
{"title": "Feature A"},
idempotency_key=idempotency_key
)
assert requirement["id"] == requirement_retry["id"] # Same requirement
Idempotent Operations
Design operations to be naturally idempotent:
def set_requirement_status(
client: APIClient,
org_id: str,
req_id: str,
status: str
) -> Dict:
"""
Set requirement status (idempotent)
Safe to call multiple times with same status
"""
return client.request(
"PATCH",
f"/api/organizations/{org_id}/requirements/{req_id}",
data={"status": status}
)
# All of these are safe and produce same result
set_requirement_status(client, "org-123", "req-456", "active")
set_requirement_status(client, "org-123", "req-456", "active") # Idempotent
set_requirement_status(client, "org-123", "req-456", "active") # Idempotent
def increment_counter(
client: APIClient,
org_id: str,
counter_id: str
) -> int:
"""
Increment counter (NOT idempotent - use idempotency key)
Each call increments by 1, so retries would double increment
"""
# ⚠️ REQUIRES idempotency_key to prevent duplicate increments
return client.request(
"POST",
f"/api/organizations/{org_id}/counters/{counter_id}/increment"
)
API Versioning
Version Strategy
The Catalio API uses header-based versioning:
# Request specific API version
GET /api/organizations/org-123/requirements \
-H "Accept: application/vnd.catalio.v2+json"
# Defaults to latest stable version
GET /api/organizations/org-123/requirements
Version Negotiation
class VersionedAPIClient:
"""API client with version negotiation"""
def __init__(
self,
api_key: str,
api_version: str = "v2",
base_url: Optional[str] = None
):
self.api_key = api_key
self.api_version = api_version
# Get base_url from environment or use provided value
self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
if not self.base_url:
raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
self.supported_versions = {"v1", "v2", "v3"}
def request(
self,
method: str,
path: str,
data: Optional[Dict] = None,
version: Optional[str] = None
) -> Dict:
"""Make request with version header"""
version = version or self.api_version
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": f"application/vnd.catalio.{version}+json"
}
response = requests.request(
method,
f"{self.base_url}{path}",
headers=headers,
json=data
)
# Check for version deprecation warning
if "Deprecation" in response.headers:
print(f"⚠️ API version {version} is deprecated")
print(f" Sunset date: {response.headers.get('Sunset')}")
response.raise_for_status()
return response.json()
# Usage
client = VersionedAPIClient(api_key="...", api_version="v2")
# Use default version
requirements = client.request("GET", "/api/organizations/org-123/requirements")
# Try newer version
try:
requirements = client.request(
"GET",
"/api/organizations/org-123/requirements",
version="v3"
)
except requests.HTTPError as e:
if e.response.status_code == 406:
print("API version v3 not available, falling back to v2")
Migration Strategy
When APIs change, implement gradual migration:
class MigrationManager:
"""Manage migration between API versions"""
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.v2_client = VersionedAPIClient(api_key=api_key, api_version="v2", base_url=base_url)
self.v3_client = VersionedAPIClient(api_key=api_key, api_version="v3", base_url=base_url)
self.migration_percentage = 0 # Start with 0% on v3
def get_requirement(self, org_id: str, req_id: str):
"""Gradually migrate to new API version"""
import random
# Gradually increase percentage of traffic on new version
if random.random() < self.migration_percentage / 100:
try:
return self.v3_client.request(
"GET",
f"/api/organizations/{org_id}/requirements/{req_id}"
)
except Exception as e:
# Fall back to v2 on errors
print(f"v3 request failed: {e}, falling back to v2")
# Use v2 for rest of traffic
return self.v2_client.request(
"GET",
f"/api/organizations/{org_id}/requirements/{req_id}"
)
def increase_migration_percentage(self, percentage: int):
"""Gradually increase traffic to new API version"""
self.migration_percentage = min(percentage, 100)
print(f"Migrating {self.migration_percentage}% traffic to v3")
Testing API Integrations
Unit Testing with Mocks
Test your integration code without hitting real API:
import pytest
from unittest.mock import Mock, patch, MagicMock
@pytest.fixture
def mock_api_client():
"""Create mocked API client for testing"""
return Mock()
def test_create_requirement_success(mock_api_client):
"""Test successful requirement creation"""
# Mock API response
mock_api_client.create_requirement.return_value = {
"id": "req-123",
"title": "User Auth",
"status": "active"
}
# Call function under test
result = create_requirement(
mock_api_client,
"org-123",
{"title": "User Auth"}
)
# Assertions
assert result["id"] == "req-123"
assert result["status"] == "active"
mock_api_client.create_requirement.assert_called_once()
def test_create_requirement_with_retry(mock_api_client):
"""Test retry logic on transient failures"""
# Mock API to fail twice then succeed
mock_api_client.create_requirement.side_effect = [
requests.ConnectionError("Network error"),
requests.ConnectionError("Network error"),
{"id": "req-123", "title": "User Auth"}
]
# Should succeed after retries
result = create_requirement_with_retry(
mock_api_client,
"org-123",
{"title": "User Auth"},
max_retries=3
)
assert result["id"] == "req-123"
assert mock_api_client.create_requirement.call_count == 3
def test_rate_limit_handling(mock_api_client):
"""Test rate limit handling"""
response = Mock()
response.status_code = 429
response.headers = {"Retry-After": "60"}
mock_api_client.request.return_value = response
# Should wait and retry
with patch("time.sleep") as mock_sleep:
handle_rate_limit(mock_api_client, "org-123")
mock_sleep.assert_called_once()
# Verify wait time is respected
assert mock_sleep.call_args[0][0] >= 60
Integration Testing
Test with real API:
import os
import pytest
@pytest.fixture(scope="session")
def api_client():
"""Create real API client for testing"""
api_key = os.getenv("CATALIO_API_KEY")
if not api_key:
pytest.skip("CATALIO_API_KEY not set")
return CatalioClient(api_key=api_key)
@pytest.mark.integration
def test_create_and_retrieve_requirement(api_client):
"""Test creating and retrieving a requirement"""
# Create
created = api_client.create_requirement(
"org-123",
{"title": f"Test {uuid.uuid4()}"}
)
# Retrieve
retrieved = api_client.get_requirement(
"org-123",
created["id"]
)
# Verify
assert retrieved["id"] == created["id"]
assert retrieved["title"] == created["title"]
# Cleanup
api_client.delete_requirement("org-123", created["id"])
@pytest.mark.integration
def test_pagination(api_client):
"""Test pagination through large result sets"""
# Create multiple requirements
for i in range(150):
api_client.create_requirement(
"org-123",
{"title": f"Paginated Test {i}"}
)
# Paginate through all
all_reqs = []
iterator = api_client.paginate("organizations/org-123/requirements")
for page in iterator:
all_reqs.extend(page)
assert len(all_reqs) >= 150
Monitoring and Observability
Structured Logging
Log API interactions for debugging and monitoring:
import logging
import json
from datetime import datetime
class StructuredLogger:
"""Structured logging for API requests"""
def __init__(self, logger: logging.Logger):
self.logger = logger
def log_request(
self,
method: str,
path: str,
status_code: int,
duration_ms: float,
error: Optional[str] = None
):
"""Log API request with structured data"""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"type": "api_request",
"method": method,
"path": path,
"status_code": status_code,
"duration_ms": duration_ms,
"error": error
}
self.logger.info(json.dumps(log_data))
class MonitoredAPIClient:
"""API client with integrated monitoring"""
def __init__(self, api_key: str, base_url: Optional[str] = None):
self.api_key = api_key
# Get base_url from environment or use provided value
self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
if not self.base_url:
raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
self.logger = StructuredLogger(logging.getLogger("catalio.api"))
def request(
self,
method: str,
path: str,
data: Optional[Dict] = None
) -> Dict:
"""Make request with monitoring"""
import time
start_time = time.time()
status_code = None
error = None
try:
response = requests.request(
method,
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {self.api_key}"},
json=data
)
status_code = response.status_code
response.raise_for_status()
return response.json()
except Exception as e:
error = str(e)
status_code = getattr(e.response, "status_code", 0) if hasattr(e, "response") else 0
raise
finally:
duration_ms = (time.time() - start_time) * 1000
self.logger.log_request(method, path, status_code, duration_ms, error)
Metrics Collection
Track API performance metrics:
from dataclasses import dataclass
from statistics import mean, stdev
@dataclass
class APIMetrics:
"""Metrics for API performance"""
total_requests: int = 0
total_errors: int = 0
total_duration_ms: float = 0.0
request_durations: list = None
def __post_init__(self):
if self.request_durations is None:
self.request_durations = []
def record_request(self, duration_ms: float, success: bool):
"""Record metrics for a request"""
self.total_requests += 1
self.total_duration_ms += duration_ms
self.request_durations.append(duration_ms)
if not success:
self.total_errors += 1
def error_rate(self) -> float:
"""Calculate error rate percentage"""
if self.total_requests == 0:
return 0
return (self.total_errors / self.total_requests) * 100
def average_duration_ms(self) -> float:
"""Calculate average request duration"""
if not self.request_durations:
return 0
return mean(self.request_durations)
def p95_duration_ms(self) -> float:
"""Calculate 95th percentile duration"""
if not self.request_durations or len(self.request_durations) < 20:
return 0
sorted_durations = sorted(self.request_durations)
index = int(len(sorted_durations) * 0.95)
return sorted_durations[index]
def summarize(self) -> str:
"""Get human-readable metrics summary"""
return f"""
API Metrics:
Total Requests: {self.total_requests}
Error Rate: {self.error_rate():.1f}%
Avg Duration: {self.average_duration_ms():.1f}ms
P95 Duration: {self.p95_duration_ms():.1f}ms
"""
class MetricsTrackingClient:
"""API client with metrics collection"""
def __init__(self, api_key: str):
self.api_key = api_key
self.metrics = APIMetrics()
def request(self, method: str, path: str) -> Dict:
"""Make request and track metrics"""
import time
start = time.time()
success = False
try:
response = requests.request(...)
response.raise_for_status()
success = True
return response.json()
finally:
duration_ms = (time.time() - start) * 1000
self.metrics.record_request(duration_ms, success)
def get_metrics_summary(self) -> str:
"""Get current metrics"""
return self.metrics.summarize()
Performance Optimization
Connection Pooling
Reuse connections to improve performance:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session() -> requests.Session:
"""Create HTTP session with connection pooling and retry strategy"""
session = requests.Session()
# Configure connection pooling
adapter = HTTPAdapter(
pool_connections=10,
pool_maxsize=10,
max_retries=Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504]
)
)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
class OptimizedAPIClient:
def __init__(self, api_key: str):
self.api_key = api_key
self.session = create_session()
def request(self, method: str, path: str) -> Dict:
"""Make request using pooled connection"""
response = self.session.request(
method,
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {self.api_key}"}
)
response.raise_for_status()
return response.json()
def close(self):
"""Close session and connections"""
self.session.close()
# Usage
client = OptimizedAPIClient(api_key="...")
try:
# Multiple requests reuse connections
req1 = client.request("GET", "/api/organizations/org-123/requirements/1")
req2 = client.request("GET", "/api/organizations/org-123/requirements/2")
req3 = client.request("GET", "/api/organizations/org-123/requirements/3")
finally:
client.close()
Parallel Requests
Execute multiple independent requests in parallel:
import concurrent.futures
from typing import List
def fetch_requirements_parallel(
client: APIClient,
org_id: str,
requirement_ids: List[str],
max_workers: int = 5
) -> List[Dict]:
"""Fetch multiple requirements in parallel"""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(client.get_requirement, org_id, req_id)
for req_id in requirement_ids
]
results = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
print(f"Failed to fetch requirement: {e}")
return results
# Usage
requirement_ids = [f"req-{i}" for i in range(100)]
# Fetch 100 requirements in parallel with 5 concurrent requests
requirements = fetch_requirements_parallel(client, "org-123", requirement_ids)
print(f"Fetched {len(requirements)} requirements")
Batch Requests for Efficiency
Combine multiple operations into single request:
def get_requirements_batch(
client: APIClient,
org_id: str,
requirement_ids: List[str]
) -> Dict[str, Dict]:
"""Get multiple requirements with single API call"""
response = client.request(
"POST",
f"/api/organizations/{org_id}/requirements/batch-get",
data={"ids": requirement_ids}
)
# Index results by ID for easy lookup
results = {}
for req in response["data"]:
results[req["id"]] = req
return results
# Usage
# Much more efficient than 100 individual GET requests
requirements = get_requirements_batch(
client,
"org-123",
[f"req-{i}" for i in range(100)]
)
Multi-tenant Considerations
Organization-Scoped Requests
Always include organization context:
class MultiTenantAPIClient:
"""API client with multi-tenant awareness"""
def __init__(self, api_key: str, org_id: str):
self.api_key = api_key
self.org_id = org_id
def get_requirement(self, req_id: str) -> Dict:
"""Get requirement for current organization"""
# Always scope to organization
response = requests.get(
f"/api/organizations/{self.org_id}/requirements/{req_id}",
headers={"Authorization": f"Bearer {self.api_key}"}
)
response.raise_for_status()
return response.json()["data"]
def create_requirement(self, data: Dict) -> Dict:
"""Create requirement for current organization"""
response = requests.post(
f"/api/organizations/{self.org_id}/requirements",
headers={"Authorization": f"Bearer {self.api_key}"},
json=data
)
response.raise_for_status()
return response.json()["data"]
Preventing Cross-Tenant Data Leaks
Validate organization ownership:
def verify_organization_access(
client: APIClient,
api_key: str,
org_id: str
) -> bool:
"""Verify API key has access to organization"""
try:
response = requests.get(
f"/api/organizations/{org_id}",
headers={"Authorization": f"Bearer {api_key}"}
)
return response.status_code == 200
except:
return False
class SecureMultiTenantClient:
def __init__(self, api_key: str, org_id: str):
self.api_key = api_key
self.org_id = org_id
# Verify access on initialization
if not verify_organization_access(self, api_key, org_id):
raise AuthenticationError(f"No access to organization {org_id}")
def request(self, method: str, path: str) -> Dict:
"""Make request with organization validation"""
# Ensure path is scoped to organization
if not path.startswith(f"/api/organizations/{self.org_id}"):
raise SecurityError("Requests must be scoped to initialized organization")
return requests.request(
method,
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {self.api_key}"}
).json()
Common Anti-patterns to Avoid
❌ Anti-pattern: Ignoring Rate Limits
# BAD: Makes requests without respecting rate limits
for i in range(1000):
requirement = client.get_requirement("org-123", f"req-{i}")
✅ Pattern: Respecting Rate Limits
# GOOD: Batches requests to avoid rate limit
requirement_ids = [f"req-{i}" for i in range(1000)]
for batch in batches(requirement_ids, 100):
requirements = client.batch_get_requirements("org-123", batch)
❌ Anti-pattern: No Retry Logic
# BAD: Single attempt, no retry
response = requests.get(url) # Fails if network is flaky
✅ Pattern: Automatic Retries
# GOOD: Automatic retry with backoff
result = retry_with_backoff(lambda: requests.get(url))
❌ Anti-pattern: Hardcoded Timeouts
# BAD: Request might hang indefinitely
response = requests.get(url)
✅ Pattern: Appropriate Timeouts
# GOOD: Request times out after 10 seconds
response = requests.get(url, timeout=10)
❌ Anti-pattern: No Error Handling
# BAD: Crashes on validation error
user = create_user(client, {"email": "invalid"})
✅ Pattern: Proper Error Handling
# GOOD: Handles validation errors gracefully
try:
user = create_user(client, {"email": "invalid"})
except ValidationError as e:
print(f"Validation failed: {e.details}")
except APIError as e:
print(f"API error: {e}")
Production Deployment Checklist
Before deploying API integrations to production, verify:
-
Authentication
- API keys securely stored (environment variables, secrets manager)
- API keys rotated regularly
- No hardcoded credentials in code or config files
-
Error Handling
- All error types handled appropriately
- User-friendly error messages displayed
- Errors logged for debugging
- Request IDs captured for API support
-
Resilience
- Retry logic implemented with exponential backoff
- Timeout values set appropriately
- Connection pooling configured
- Circuit breaker pattern for graceful degradation
-
Performance
- Caching implemented for appropriate endpoints
- Batch operations used instead of individual requests
- Pagination implemented for large datasets
- Load testing completed
-
Monitoring
- Request/response logging enabled
- Metrics collection (error rates, latency, throughput)
- Alerts configured for failures
- Dashboard created for visibility
-
Security
- TLS/SSL verification enabled
- Input validation implemented
- Multi-tenant data isolation verified
- Rate limiting respected
-
Testing
- Unit tests with mocks
- Integration tests completed
- Error scenario testing
- Load testing completed
-
Documentation
- Integration documented
- Error handling procedures documented
- Troubleshooting guide created
- Runbook for common issues
SDK and Client Library Patterns
Building a Reusable SDK
class CatalioSDK:
"""Production-ready SDK for Catalio API"""
def __init__(
self,
api_key: str,
org_id: str,
base_url: Optional[str] = None
):
self.api_key = api_key
self.org_id = org_id
# Get base_url from environment or use provided value
self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
if not self.base_url:
raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
self.session = self._create_session()
self.cache = ResponseCache()
def _create_session(self) -> requests.Session:
"""Create HTTP session with retry strategy"""
session = requests.Session()
adapter = HTTPAdapter(
pool_connections=10,
pool_maxsize=10,
max_retries=Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504]
)
)
session.mount("https://", adapter)
return session
def _request(
self,
method: str,
path: str,
data: Optional[Dict] = None
) -> Dict:
"""Internal request method with error handling"""
url = f"{self.base_url}{path}"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/vnd.catalio.v2+json"
}
response = self.session.request(
method,
url,
headers=headers,
json=data,
timeout=10
)
if response.status_code == 429:
raise RateLimitError(
message="Rate limit exceeded",
retry_after=int(response.headers.get("Retry-After", 60))
)
elif response.status_code >= 400:
raise APIError(response.json())
return response.json()
# Resource Methods
def get_requirement(self, requirement_id: str) -> Dict:
"""Get a requirement"""
return self._request(
"GET",
f"/api/organizations/{self.org_id}/requirements/{requirement_id}"
)["data"]
def list_requirements(
self,
page: int = 1,
page_size: int = 50,
status: Optional[str] = None
) -> Dict:
"""List requirements with optional filtering"""
params = {"page": page, "page_size": page_size}
if status:
params["status"] = status
return self._request(
"GET",
f"/api/organizations/{self.org_id}/requirements",
params=params
)
def create_requirement(self, data: Dict) -> Dict:
"""Create a requirement"""
return self._request(
"POST",
f"/api/organizations/{self.org_id}/requirements",
data=data
)["data"]
def update_requirement(self, requirement_id: str, data: Dict) -> Dict:
"""Update a requirement"""
return self._request(
"PUT",
f"/api/organizations/{self.org_id}/requirements/{requirement_id}",
data=data
)["data"]
def delete_requirement(self, requirement_id: str) -> None:
"""Delete a requirement"""
self._request(
"DELETE",
f"/api/organizations/{self.org_id}/requirements/{requirement_id}"
)
def close(self):
"""Close session and cleanup resources"""
self.session.close()
# Usage
sdk = CatalioSDK(api_key="your-api-key", org_id="org-123")
try:
# List requirements
reqs = sdk.list_requirements(status="active")
# Create requirement
new_req = sdk.create_requirement({"title": "New Feature"})
# Update requirement
updated = sdk.update_requirement(new_req["id"], {"status": "archived"})
# Delete requirement
sdk.delete_requirement(new_req["id"])
finally:
sdk.close()
SDK Best Practices
- Consistent Method Naming: Use predictable patterns (get, list, create, update, delete)
- Type Hints: Provide clear input/output types
- Documentation: Document each method with examples
- Error Handling: Raise specific exception types
- Resource Management: Provide close/cleanup methods
- Version Support: Support multiple API versions gracefully
- Testing: Include comprehensive test suite
- Publishing: Publish to package repositories (PyPI, npm, etc.)
Learn More
- API Reference - Complete API documentation
- Authentication Guide - Auth patterns and tokens
- Common Integration Scenarios - Real-world examples