API Best Practices & Patterns

Building reliable, performant integrations with the Catalio API requires more than just understanding the endpoints. This guide covers production-ready patterns, error handling strategies, optimization techniques, and architectural decisions that separate robust integrations from fragile ones.

Table of Contents

  1. Core API Design Principles
  2. Error Handling Strategies
  3. Retry Logic and Exponential Backoff
  4. Caching Strategies
  5. Rate Limiting Best Practices
  6. Pagination Patterns
  7. Bulk Operations vs Individual Requests
  8. Optimistic Concurrency Control
  9. Idempotency
  10. API Versioning
  11. Testing API Integrations
  12. Monitoring and Observability
  13. Performance Optimization
  14. Multi-tenant Considerations
  15. Common Anti-patterns to Avoid
  16. Production Deployment Checklist
  17. SDK and Client Library Patterns

Core API Design Principles

RESTful Resource Design

The Catalio API follows RESTful principles with these core concepts:

Resources as Entities: Every API endpoint represents a business entity (Requirement, Organization, User, etc.). Resources have:

  • Unique identifiers: IDs uniquely identify instances within their organization
  • State representations: JSON payloads represent current resource state
  • Relationships: Resources link to related entities via foreign key references
  • Operations: Standard HTTP methods (GET, POST, PUT, DELETE) perform operations
# List all requirements for an organization
GET /api/organizations/:org_id/requirements
# Get a specific requirement
GET /api/organizations/:org_id/requirements/:id
# Create a new requirement
POST /api/organizations/:org_id/requirements
# Update a requirement
PUT /api/organizations/:org_id/requirements/:id
# Delete a requirement
DELETE /api/organizations/:org_id/requirements/:id

HTTP Status Codes

Always return appropriate HTTP status codes:

Code  Meaning                 When to Use
200   OK                      Successful GET, PUT, DELETE
201   Created                 Successful POST creating a resource
204   No Content              Successful DELETE or empty response
400   Bad Request             Invalid input parameters or validation error
401   Unauthorized            Missing or invalid authentication
403   Forbidden               Authenticated but not authorized for the resource
404   Not Found               Resource doesn’t exist or wrong path
409   Conflict                Concurrency conflict (version mismatch)
422   Unprocessable Entity    Semantic validation failures
429   Too Many Requests       Rate limit exceeded
500   Internal Server Error   Unexpected server-side error
503   Service Unavailable     Temporary server issue (retry eligible)
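
As a quick orientation for the retry and error-handling sections below, a small helper (illustrative, not part of the API) can map a status code onto the coarse categories this guide uses:

def classify(status: int) -> str:
    """Map an HTTP status code onto this guide's coarse categories."""
    if 200 <= status < 300:
        return "success"
    if status in (408, 429) or status >= 500:
        return "retryable_error"  # safe to retry with backoff
    if 400 <= status < 500:
        return "client_error"     # fix the request; retrying won't help
    return "unknown"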

Request/Response Design

Consistent JSON Structure: All responses follow a consistent envelope:

{
  "data": {
    "id": "123",
    "type": "requirement",
    "attributes": {
      "title": "User authentication",
      "status": "active",
      "created_at": "2025-03-05T10:30:00Z"
    },
    "relationships": {
      "organization": {
        "data": { "id": "org-1", "type": "organization" }
      }
    }
  }
}

List Responses with Pagination:

{
  "data": [
    { "id": "1", "type": "requirement", "attributes": { "title": "Feature A" } },
    { "id": "2", "type": "requirement", "attributes": { "title": "Feature B" } }
  ],
  "meta": {
    "page": 1,
    "page_size": 50,
    "total": 150,
    "has_more": true
  }
}

Timestamp Format: Always use ISO 8601 UTC timestamps:

{
  "created_at": "2025-03-05T10:30:00Z",
  "updated_at": "2025-03-05T11:45:30Z"
}
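
In Python, these parse with the standard library. Note that datetime.fromisoformat only accepts a trailing "Z" on Python 3.11+, so the substitution below keeps older versions working:

from datetime import datetime

raw = "2025-03-05T10:30:00Z"
# Python 3.11+ parses the "Z" suffix directly; replace it for older versions
created_at = datetime.fromisoformat(raw.replace("Z", "+00:00"))
print(created_at.tzinfo)  # UTC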

Error Handling Strategies

Comprehensive Error Responses

When errors occur, return detailed error information to help debugging:

{
  "errors": [
    {
      "status": 400,
      "code": "VALIDATION_ERROR",
      "title": "Invalid request parameters",
      "detail": "The 'title' field is required",
      "source": {
        "pointer": "/data/attributes/title"
      }
    }
  ]
}

Error Classifications

Client Errors (4xx): Originate from client requests

{
  "errors": [
    {
      "status": 400,
      "code": "INVALID_PARAMETER",
      "title": "Bad Request",
      "detail": "Invalid format for organization_id: must be UUID",
      "source": { "parameter": "organization_id" }
    }
  ]
}

Server Errors (5xx): Originate from server issues

{
  "errors": [
    {
      "status": 500,
      "code": "INTERNAL_ERROR",
      "title": "Internal Server Error",
      "detail": "An unexpected error occurred processing your request",
      "request_id": "req-abc123def456"
    }
  ]
}

Validation Error Handling

Validate requests early and return comprehensive feedback:

{
  "errors": [
    {
      "status": 422,
      "code": "VALIDATION_ERROR",
      "title": "Validation failed",
      "detail": "Title must be between 3 and 255 characters",
      "source": { "pointer": "/data/attributes/title" }
    },
    {
      "status": 422,
      "code": "VALIDATION_ERROR",
      "title": "Validation failed",
      "detail": "Status must be one of: active, archived, draft",
      "source": { "pointer": "/data/attributes/status" }
    }
  ]
}

Client Implementation Pattern

Handle errors gracefully in your integration code:

import os
import requests
from typing import Optional, Dict, Any

class CatalioClient:
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.api_key = api_key
        # Get base_url from the environment or use the provided value
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        if not self.base_url:
            raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def handle_error(self, response: requests.Response) -> Dict[str, Any]:
        """Extract and categorize API errors"""
        try:
            error_data = response.json()
            errors = error_data.get("errors", [])
            if response.status_code == 400:
                return {
                    "type": "validation_error",
                    "message": errors[0]["detail"] if errors else "Invalid request",
                    "details": errors
                }
            elif response.status_code == 401:
                return {
                    "type": "auth_error",
                    "message": "Invalid or missing authentication",
                    "details": errors
                }
            elif response.status_code == 403:
                return {
                    "type": "permission_error",
                    "message": "Not authorized to access this resource",
                    "details": errors
                }
            elif response.status_code == 404:
                return {
                    "type": "not_found",
                    "message": "Resource not found",
                    "details": errors
                }
            elif response.status_code == 409:
                return {
                    "type": "conflict",
                    "message": "Resource conflict (possibly due to concurrent modification)",
                    "details": errors
                }
            elif response.status_code == 429:
                return {
                    "type": "rate_limit",
                    "message": "Rate limit exceeded",
                    "retry_after": response.headers.get("Retry-After", "60")
                }
            elif response.status_code >= 500:
                return {
                    "type": "server_error",
                    "message": "Server error (retryable)",
                    "request_id": errors[0].get("request_id") if errors else None,
                    "details": errors
                }
        except (ValueError, KeyError, IndexError):
            pass
        return {
            "type": "unknown_error",
            "status_code": response.status_code,
            "body": response.text
        }

    def get_requirement(self, org_id: str, req_id: str) -> Optional[Dict]:
        """Get a requirement with proper error handling"""
        url = f"{self.base_url}/api/organizations/{org_id}/requirements/{req_id}"
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.json()["data"]
            else:
                error = self.handle_error(response)
                # Log error, alert monitoring, etc.
                raise APIError(error)
        except requests.RequestException as e:
            # Network error - likely retryable
            raise NetworkError(f"Request failed: {str(e)}")
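
This snippet, and several later ones, raise exception types — APIError, NetworkError, ServerError, ConflictError, ValidationError, RateLimitError, AuthenticationError, SecurityError — without defining them. They are not part of any published Catalio SDK; a minimal sketch that makes the examples in this guide runnable:

class APIError(Exception):
    """Base class for API-level failures."""
    def __init__(self, details):
        super().__init__(str(details))
        self.details = details

class NetworkError(APIError):
    """Connection-level failure; usually retryable."""

class ServerError(APIError):
    """5xx response; usually retryable."""

class ConflictError(APIError):
    """409 response from an optimistic concurrency check."""

class ValidationError(APIError):
    """400/422 response carrying validation details."""

class AuthenticationError(APIError):
    """401/403 response."""

class SecurityError(APIError):
    """Client-side guard against cross-tenant requests."""

class RateLimitError(APIError):
    """429 response; carries the server-suggested wait."""
    def __init__(self, message: str, retry_after: int = 60):
        super().__init__(message)
        self.retry_after = retry_after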

Retry Logic and Exponential Backoff

When to Retry

Retryable Errors:

  • 408 Request Timeout
  • 429 Too Many Requests
  • 500 Internal Server Error
  • 502 Bad Gateway
  • 503 Service Unavailable
  • 504 Gateway Timeout
  • Network timeouts
  • Connection refused

Non-Retryable Errors:

  • 400 Bad Request (validation error)
  • 401 Unauthorized
  • 403 Forbidden
  • 404 Not Found
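
These two lists translate directly into a predicate. A small sketch (the function name and the None convention for requests that never completed are illustrative):

from typing import Optional

RETRYABLE_STATUSES = {408, 429, 500, 502, 503, 504}

def is_retryable(status_code: Optional[int]) -> bool:
    """Return True when a failed request is worth retrying.

    A None status code means the request never completed (network
    timeout, connection refused), which is also retryable.
    """
    if status_code is None:
        return True
    return status_code in RETRYABLE_STATUSES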

Exponential Backoff Implementation

Implement exponential backoff to prevent overwhelming the server during transient failures:

import time
import random
from typing import Callable, TypeVar, Optional

T = TypeVar('T')

class RetryConfig:
    """Configuration for retry behavior"""
    def __init__(
        self,
        max_attempts: int = 3,
        initial_delay: float = 1.0,
        max_delay: float = 32.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Calculate delay for given attempt number (0-indexed)"""
        # Exponential backoff: delay = initial_delay * (base ^ attempt)
        delay = self.initial_delay * (self.exponential_base ** attempt)
        # Cap at max_delay
        delay = min(delay, self.max_delay)
        # Add jitter: random value between 0.5x and 1.5x delay
        if self.jitter:
            delay = delay * (0.5 + random.random())
        return delay

def retry_with_backoff(
    func: Callable[..., T],
    config: Optional[RetryConfig] = None,
    **kwargs
) -> T:
    """
    Execute function with exponential backoff retry logic

    Example:
        result = retry_with_backoff(
            client.get_requirement,
            org_id="org-123",
            req_id="req-456"
        )
    """
    config = config or RetryConfig()
    last_error = None
    for attempt in range(config.max_attempts):
        try:
            return func(**kwargs)
        except (NetworkError, TimeoutError, ServerError) as e:
            last_error = e
            if attempt < config.max_attempts - 1:
                delay = config.get_delay(attempt)
                print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s: {e}")
                time.sleep(delay)
            else:
                print(f"All {config.max_attempts} attempts failed")
    raise last_error

# Usage
try:
    requirement = retry_with_backoff(
        client.get_requirement,
        config=RetryConfig(max_attempts=4),
        org_id="org-123",
        req_id="req-456"
    )
except Exception as e:
    print(f"Failed to fetch requirement after retries: {e}")

Handling Retry-After Headers

When receiving a 429 (Too Many Requests) response, respect the Retry-After header:

import time
import requests
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Any, Callable

def handle_rate_limit(response: requests.Response) -> int:
    """Extract retry-after delay from response headers"""
    retry_after = response.headers.get("Retry-After")
    if retry_after:
        # Retry-After can be seconds (integer) or an HTTP-date
        try:
            return int(retry_after)
        except ValueError:
            # Parse as HTTP-date
            retry_time = parsedate_to_datetime(retry_after)
            # Use timezone-aware now to match the parsed datetime
            now = datetime.now(timezone.utc)
            # Clamp negative delays to zero
            return max(0, int((retry_time - now).total_seconds()))
    # Default fallback
    return 60

def request_with_rate_limit_handling(
    func: Callable,
    **kwargs
) -> Any:
    """Make request with automatic rate limit handling"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = func(**kwargs)
            if response.status_code == 429:
                if attempt < max_retries - 1:
                    delay = handle_rate_limit(response)
                    print(f"Rate limited, waiting {delay}s before retry")
                    time.sleep(delay)
                    continue
            response.raise_for_status()
            return response
        except requests.RequestException:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise
    raise Exception("Max retries exceeded")

Caching Strategies

Cache-Control Headers

The Catalio API returns Cache-Control headers to indicate cacheability:

Cache-Control: public, max-age=3600

Cache Types:

  • public: Can be cached by any cache (CDNs, proxies, browsers)
  • private: Only user-specific caches can store
  • max-age=3600: Cache valid for 3600 seconds (1 hour)
  • must-revalidate: Revalidate with server if stale
  • no-cache: Validate with server before using
  • no-store: Never cache this response
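
Before writing a response into a client-side cache, honor no-store at minimum. A minimal sketch; a complete implementation would also handle the private and no-cache semantics per RFC 9111:

def should_store(cache_control: str) -> bool:
    """Return False when the server forbids storing the response."""
    directives = {d.strip() for d in cache_control.lower().split(",")}
    return "no-store" not in directives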

Client-Side Caching Implementation

Implement intelligent caching to reduce API calls:

import hashlib
import json
import os
import requests
from datetime import datetime
from typing import Optional, Dict, Any

class CacheEntry:
    def __init__(self, data: Any, max_age: int):
        self.data = data
        self.created_at = datetime.now()
        self.max_age = max_age

    def is_expired(self) -> bool:
        age = (datetime.now() - self.created_at).total_seconds()
        return age > self.max_age

    def is_stale(self) -> bool:
        # Stale once past 80% of max_age, but still usable
        age = (datetime.now() - self.created_at).total_seconds()
        return age > self.max_age * 0.8

class ResponseCache:
    """In-memory cache for API responses"""
    def __init__(self):
        self.cache: Dict[str, CacheEntry] = {}

    def get_cache_key(self, method: str, url: str, params: Optional[Dict] = None) -> str:
        """Generate cache key from request details"""
        key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, key: str) -> Optional[Any]:
        """Get cached value if fresh"""
        if key in self.cache:
            entry = self.cache[key]
            if not entry.is_expired():
                return entry.data
            else:
                del self.cache[key]
        return None

    def get_stale(self, key: str) -> Optional[Any]:
        """Get stale cached value (for stale-while-revalidate)"""
        if key in self.cache:
            return self.cache[key].data
        return None

    def set(self, key: str, data: Any, max_age: int = 3600):
        """Store response in cache"""
        self.cache[key] = CacheEntry(data, max_age)

    def clear(self):
        """Clear all cached responses"""
        self.cache.clear()

class CachingAPIClient:
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.api_key = api_key
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        self.cache = ResponseCache()

    def get_requirement(
        self,
        org_id: str,
        req_id: str,
        use_cache: bool = True,
        force_refresh: bool = False
    ) -> Dict:
        """Get requirement with intelligent caching"""
        path = f"/api/organizations/{org_id}/requirements/{req_id}"
        cache_key = self.cache.get_cache_key("GET", path)
        # Check cache first
        if use_cache and not force_refresh:
            cached = self.cache.get(cache_key)
            if cached is not None:
                cached["_cached"] = True
                return cached
        # Fetch from API
        response = requests.get(
            f"{self.base_url}{path}",
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=10
        )
        response.raise_for_status()
        data = response.json()["data"]
        # Store in cache based on the Cache-Control header
        cache_control = response.headers.get("Cache-Control", "max-age=3600")
        max_age = self.parse_max_age(cache_control)
        self.cache.set(cache_key, data, max_age)
        return data

    @staticmethod
    def parse_max_age(cache_control: str) -> int:
        """Extract max-age from Cache-Control header"""
        for part in cache_control.split(","):
            part = part.strip()
            if part.startswith("max-age="):
                return int(part.split("=")[1])
        return 3600  # Default 1 hour

# Usage
client = CachingAPIClient(api_key="your-api-key")
# First call fetches from API
req = client.get_requirement("org-123", "req-456")
# Second call returns cached value
req = client.get_requirement("org-123", "req-456")  # Returns cached copy
# Force refresh
req = client.get_requirement("org-123", "req-456", force_refresh=True)

Cache Invalidation Strategies

Implement cache invalidation when resources change:

class CacheInvalidationClient(CachingAPIClient):
    """Cache client with invalidation support"""
    # _post and _put are thin HTTP helpers assumed to exist on the client

    def create_requirement(self, org_id: str, data: Dict) -> Dict:
        """Create requirement and invalidate the org requirements list cache"""
        result = self._post(
            f"/api/organizations/{org_id}/requirements",
            data
        )
        # Invalidate related caches
        self._invalidate_org_cache(org_id)
        return result

    def update_requirement(self, org_id: str, req_id: str, data: Dict) -> Dict:
        """Update requirement and invalidate caches"""
        result = self._put(
            f"/api/organizations/{org_id}/requirements/{req_id}",
            data
        )
        # Invalidate the specific requirement and list caches
        self._invalidate_requirement_cache(org_id, req_id)
        self._invalidate_org_cache(org_id)
        return result

    def _invalidate_requirement_cache(self, org_id: str, req_id: str):
        """Invalidate a specific requirement cache entry"""
        path = f"/api/organizations/{org_id}/requirements/{req_id}"
        cache_key = self.cache.get_cache_key("GET", path)
        self.cache.cache.pop(cache_key, None)

    def _invalidate_org_cache(self, org_id: str):
        """Invalidate the organization requirements list cache"""
        path = f"/api/organizations/{org_id}/requirements"
        # Invalidate the first 10 pagination variants
        for page in range(1, 11):
            cache_key = self.cache.get_cache_key("GET", path, {"page": page})
            self.cache.cache.pop(cache_key, None)

Rate Limiting Best Practices

Understanding Rate Limits

The Catalio API enforces rate limits to ensure fair resource usage:

Category         Limit          Window
Authentication   10 requests    1 minute
Standard API     1000 requests  1 hour
Bulk Operations  100 requests   1 hour
WebSocket        100 messages   1 minute

Rate Limit Headers:

X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 842
X-RateLimit-Reset: 1699564200

Rate Limit Awareness

Monitor rate limit headers in every response:

import os
import time
import requests
from datetime import datetime
from typing import Optional, Dict

class RateLimitManager:
    """Track and respect API rate limits"""
    def __init__(self):
        self.limit = None
        self.remaining = None
        self.reset_time = None

    def update_from_response(self, response: requests.Response):
        """Update rate limit info from response headers"""
        self.limit = int(response.headers.get("X-RateLimit-Limit", 0))
        self.remaining = int(response.headers.get("X-RateLimit-Remaining", 0))
        reset_timestamp = int(response.headers.get("X-RateLimit-Reset", 0))
        self.reset_time = datetime.fromtimestamp(reset_timestamp)

    def should_throttle(self) -> bool:
        """Check if we should throttle requests"""
        if self.remaining is None:
            return False
        # Throttle if less than 10% of the limit remains
        threshold = self.limit * 0.1
        return self.remaining < threshold

    def get_wait_time(self) -> float:
        """Get seconds to wait before the next request"""
        if not self.should_throttle() or not self.reset_time:
            return 0
        wait = (self.reset_time - datetime.now()).total_seconds()
        return max(0, wait)

    def log_status(self):
        """Log current rate limit status"""
        if self.limit:
            usage_pct = ((self.limit - self.remaining) / self.limit) * 100
            print(f"Rate Limit: {self.remaining}/{self.limit} remaining ({usage_pct:.1f}% used)")
        if self.reset_time:
            print(f"Resets at: {self.reset_time}")

class RateLimitAwareClient:
    """API client that respects rate limits"""
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.api_key = api_key
        # Get base_url from the environment or use the provided value
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        self.rate_limit = RateLimitManager()

    def request(
        self,
        method: str,
        path: str,
        data: Optional[Dict] = None
    ) -> Dict:
        """Make request with rate limit awareness"""
        # Check if we need to wait
        wait_time = self.rate_limit.get_wait_time()
        if wait_time > 0:
            print(f"Rate limit approaching, waiting {wait_time:.1f}s")
            time.sleep(wait_time)
        # Make request
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=data
        )
        # Update rate limit info
        self.rate_limit.update_from_response(response)
        self.rate_limit.log_status()
        response.raise_for_status()
        return response.json()

Batch Requests to Reduce API Calls

Use batch endpoints when available to reduce rate limit usage:

from typing import Dict, List

def batch_create_requirements(
    client: APIClient,
    org_id: str,
    requirements: List[Dict]
) -> List[Dict]:
    """Create multiple requirements with a single API call"""
    # Use the batch endpoint instead of creating each individually
    response = client.request(
        "POST",
        f"/api/organizations/{org_id}/requirements/batch",
        data={"requirements": requirements}
    )
    return response["data"]

# Usage
requirements = [
    {"title": "User Auth", "priority": "high"},
    {"title": "Payment Processing", "priority": "high"},
    {"title": "Email Notifications", "priority": "medium"},
]
# Single API call instead of 3
created = batch_create_requirements(client, "org-123", requirements)
# Track usage
print(f"Created {len(created)} requirements with 1 API call")
print(f"Rate limit remaining: {client.rate_limit.remaining}")

Pagination Patterns

Cursor-Based Pagination

For large datasets, use cursor-based pagination for reliable results:

{
  "data": [
    { "id": "req-1", "title": "Feature A" },
    { "id": "req-2", "title": "Feature B" }
  ],
  "meta": {
    "next_cursor": "eyJpZCI6IjUwZjc2YWY2ZjI2MjkifQ==",
    "prev_cursor": "eyJpZCI6IjRkZjhhZGY2ZjI2MjkifQ==",
    "has_more": true
  }
}

Advantages of Cursor-Based Pagination:

  • Handles insertions/deletions without gaps
  • Consistent results regardless of concurrent modifications
  • More efficient database queries
  • Supports bidirectional navigation

class PaginationIterator:
    """Iterate through paginated API results"""
    def __init__(
        self,
        client: APIClient,
        path: str,
        page_size: int = 50
    ):
        self.client = client
        self.path = path
        self.page_size = page_size
        self.next_cursor = None
        self._exhausted = False

    def __iter__(self):
        return self

    def __next__(self) -> List[Dict]:
        """Fetch the next page of results"""
        if self._exhausted:
            raise StopIteration
        params = {"limit": self.page_size}
        if self.next_cursor:
            params["cursor"] = self.next_cursor
        response = self.client.request("GET", self.path, params=params)
        data = response["data"]
        if not data:
            raise StopIteration
        # Update the cursor for the next iteration; stop once the
        # server reports no further pages
        meta = response.get("meta", {})
        self.next_cursor = meta.get("next_cursor")
        if not self.next_cursor:
            self._exhausted = True
        return data

# Usage: Iterate through all requirements
def process_all_requirements(client: APIClient, org_id: str):
    """Process all requirements in an organization"""
    iterator = PaginationIterator(
        client,
        f"/api/organizations/{org_id}/requirements",
        page_size=100
    )
    requirement_count = 0
    for page in iterator:
        for requirement in page:
            # Process requirement
            print(f"Processing: {requirement['title']}")
            requirement_count += 1
    print(f"Processed {requirement_count} total requirements")

# Usage
process_all_requirements(client, "org-123")

Offset-Based Pagination (When Necessary)

If cursor-based pagination isn’t available, use offset-based with caution:

class OffsetPagination:
    """Offset-based pagination (less efficient but sometimes necessary)"""
    def __init__(
        self,
        client: APIClient,
        path: str,
        page_size: int = 50
    ):
        self.client = client
        self.path = path
        self.page_size = page_size
        self.current_page = 1

    def get_page(self, page: int) -> Dict:
        """Fetch a specific page"""
        response = self.client.request(
            "GET",
            self.path,
            params={"page": page, "page_size": self.page_size}
        )
        return response

    def __iter__(self):
        self.current_page = 1
        return self

    def __next__(self) -> List[Dict]:
        """Fetch the next page"""
        response = self.get_page(self.current_page)
        data = response["data"]
        if not data:
            raise StopIteration
        self.current_page += 1
        return data

# ⚠️ WARNING: This approach can miss/duplicate items if data changes
# Only use if cursor-based pagination is unavailable

Bulk Operations vs Individual Requests

When to Use Bulk Operations

Bulk operations are more efficient for:

  • Creating/updating/deleting many resources
  • Reducing rate limit usage
  • Atomic multi-resource operations
  • Transactional consistency

Individual requests are better for:

  • Single resource changes
  • Complex business logic per request
  • Real-time user interactions
  • Immediate feedback requirements

Bulk Creation Example

class BulkOperationResult:
    """Handle results from bulk operations"""
    def __init__(self, response: Dict):
        self.response = response
        self.succeeded = response.get("succeeded", [])
        self.failed = response.get("failed", [])

    def success_count(self) -> int:
        return len(self.succeeded)

    def failure_count(self) -> int:
        return len(self.failed)

    def success_rate(self) -> float:
        total = len(self.succeeded) + len(self.failed)
        return len(self.succeeded) / total if total > 0 else 0

    def has_failures(self) -> bool:
        return len(self.failed) > 0

    def get_error_summary(self) -> str:
        """Get a human-readable error summary"""
        if not self.failed:
            return "All operations succeeded"
        errors_by_code = {}
        for failure in self.failed:
            code = failure.get("code", "unknown")
            errors_by_code[code] = errors_by_code.get(code, 0) + 1
        summary = f"{self.failure_count()} operations failed:\n"
        for code, count in errors_by_code.items():
            summary += f"  - {code}: {count}\n"
        return summary

def bulk_create_requirements(
    client: APIClient,
    org_id: str,
    requirements: List[Dict]
) -> BulkOperationResult:
    """
    Create multiple requirements efficiently

    Instead of 100 individual POST requests, use the bulk endpoint
    """
    response = client.request(
        "POST",
        f"/api/organizations/{org_id}/requirements/bulk",
        data={
            "requirements": requirements
        }
    )
    return BulkOperationResult(response)

# Usage
requirements = [
    {"title": "Feature A", "priority": "high"},
    {"title": "Feature B", "priority": "medium"},
    {"title": "Feature C", "priority": "low"},
]
result = bulk_create_requirements(client, "org-123", requirements)
print(f"Success rate: {result.success_rate():.1%}")
if result.has_failures():
    print(result.get_error_summary())

Handling Partial Failures

Bulk operations may partially succeed. Handle this gracefully:

def bulk_update_with_retry(
    client: APIClient,
    org_id: str,
    updates: List[Dict]
) -> None:
    """
    Bulk update with automatic retry of failures

    Attempts a bulk update, then retries individual items that failed
    """
    result = client.request(
        "POST",
        f"/api/organizations/{org_id}/requirements/bulk-update",
        data={"updates": updates}
    )
    # Retry individual failures
    if result.get("failed"):
        failed_ids = {f["id"] for f in result["failed"]}
        failed_updates = [u for u in updates if u["id"] in failed_ids]
        retried_count = 0
        for update in failed_updates:
            try:
                client.request(
                    "PUT",
                    f"/api/organizations/{org_id}/requirements/{update['id']}",
                    data=update
                )
                retried_count += 1
            except Exception as e:
                print(f"Individual retry failed for {update['id']}: {e}")
        print(f"Retried {retried_count} failed operations")

Optimistic Concurrency Control

Using ETags and Version Fields

Prevent lost updates when multiple clients modify the same resource:

{
  "data": {
    "id": "req-123",
    "title": "User Authentication",
    "status": "active",
    "version": 3,
    "_etag": "\"abc123def456\""
  }
}
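
The _etag field pairs naturally with the standard If-Match header; whether the Catalio API honors If-Match is an assumption here, so treat this as a hypothetical sketch. The documented pattern in the rest of this section uses the version field instead:

import requests

def update_with_etag(url: str, api_key: str, payload: dict, etag: str) -> requests.Response:
    """Conditional update that only applies if the stored ETag still matches."""
    headers = {
        "Authorization": f"Bearer {api_key}",
        "If-Match": etag,  # the _etag value from the previous GET
    }
    response = requests.put(url, headers=headers, json=payload, timeout=10)
    if response.status_code == 412:
        # Precondition Failed: the resource changed since we read it.
        # Refetch, re-apply the change, and retry.
        pass
    return response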

Updating with Version Check

from requests import HTTPError

def update_requirement_with_version(
    client: APIClient,
    org_id: str,
    req_id: str,
    current_version: int,
    updates: Dict
) -> Dict:
    """
    Update requirement with optimistic locking

    Fails if current_version doesn't match the server version
    """
    try:
        response = client.request(
            "PUT",
            f"/api/organizations/{org_id}/requirements/{req_id}",
            data={
                "requirement": updates,
                "version": current_version
            }
        )
        return response["data"]
    except HTTPError as e:
        if e.response is not None and e.response.status_code == 409:
            # Conflict: resource was modified
            error_data = e.response.json()
            raise ConflictError(
                f"Resource modified by another client. "
                f"Expected version {current_version}, "
                f"got {error_data['data']['version']}"
            ) from e
        raise

# Usage
requirement = client.get_requirement("org-123", "req-456")
try:
    updated = update_requirement_with_version(
        client,
        "org-123",
        "req-456",
        current_version=requirement["version"],
        updates={"title": "New Title"}
    )
except ConflictError as e:
    # Handle conflict: refresh and retry
    print(f"Conflict detected: {e}")
    requirement = client.get_requirement("org-123", "req-456")
    # Notify user or implement merge logic

Handling Conflicts

Implement smart conflict resolution:

class OptimisticLockingClient:
    def __init__(self, client: APIClient):
        self.client = client

    def update_with_conflict_handling(
        self,
        org_id: str,
        req_id: str,
        updates: Dict,
        max_retries: int = 3
    ) -> Dict:
        """Update with automatic retry on conflict"""
        for attempt in range(max_retries):
            # Fetch the current state
            current = self.client.get_requirement(org_id, req_id)
            version = current["version"]
            try:
                # Attempt the update with the current version
                return update_requirement_with_version(
                    self.client,
                    org_id,
                    req_id,
                    version,
                    updates
                )
            except ConflictError:
                if attempt < max_retries - 1:
                    print(f"Conflict on attempt {attempt + 1}, retrying...")
                    time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                else:
                    raise

Idempotency

Idempotency Keys

Use idempotency keys to make requests safe to retry:

import hashlib
import json
import os
import uuid
import requests
from typing import Optional, Dict

class IdempotentAPIClient:
    """API client with idempotency support"""
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.api_key = api_key
        # Get base_url from the environment or use the provided value
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        if not self.base_url:
            raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")

    def create_requirement(
        self,
        org_id: str,
        data: Dict,
        idempotency_key: Optional[str] = None
    ) -> Dict:
        """
        Create requirement with an idempotency guarantee

        If the request is retried with the same idempotency_key,
        the server returns the cached result instead of creating a duplicate
        """
        # Generate an idempotency key if not provided
        if not idempotency_key:
            idempotency_key = str(uuid.uuid4())
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Idempotency-Key": idempotency_key
        }
        response = requests.post(
            f"{self.base_url}/api/organizations/{org_id}/requirements",
            headers=headers,
            json=data
        )
        # Store the idempotency key for the audit trail
        result = response.json()["data"]
        result["_idempotency_key"] = idempotency_key
        return result

# Usage
client = IdempotentAPIClient(api_key="...")
# Generate a stable key based on the source data
idempotency_key = hashlib.md5(
    json.dumps({"title": "Feature A"}).encode()
).hexdigest()
# Can retry safely without creating a duplicate
requirement = client.create_requirement(
    "org-123",
    {"title": "Feature A"},
    idempotency_key=idempotency_key
)
# Retry with the same key - returns the same result
requirement_retry = client.create_requirement(
    "org-123",
    {"title": "Feature A"},
    idempotency_key=idempotency_key
)
assert requirement["id"] == requirement_retry["id"]  # Same requirement

Idempotent Operations

Design operations to be naturally idempotent:

def set_requirement_status(
    client: APIClient,
    org_id: str,
    req_id: str,
    status: str
) -> Dict:
    """
    Set requirement status (idempotent)

    Safe to call multiple times with the same status
    """
    return client.request(
        "PATCH",
        f"/api/organizations/{org_id}/requirements/{req_id}",
        data={"status": status}
    )

# All of these are safe and produce the same result
set_requirement_status(client, "org-123", "req-456", "active")
set_requirement_status(client, "org-123", "req-456", "active")  # Idempotent
set_requirement_status(client, "org-123", "req-456", "active")  # Idempotent

def increment_counter(
    client: APIClient,
    org_id: str,
    counter_id: str
) -> int:
    """
    Increment counter (NOT idempotent - use an idempotency key)

    Each call increments by 1, so retries would double increment
    """
    # ⚠️ REQUIRES an Idempotency-Key header to prevent duplicate increments
    return client.request(
        "POST",
        f"/api/organizations/{org_id}/counters/{counter_id}/increment"
    )

API Versioning

Version Strategy

The Catalio API uses header-based versioning:

# Request a specific API version
GET /api/organizations/org-123/requirements
Accept: application/vnd.catalio.v2+json

# Defaults to the latest stable version
GET /api/organizations/org-123/requirements

Version Negotiation

import os
import requests
from typing import Optional, Dict

class VersionedAPIClient:
    """API client with version negotiation"""
    def __init__(
        self,
        api_key: str,
        api_version: str = "v2",
        base_url: Optional[str] = None
    ):
        self.api_key = api_key
        self.api_version = api_version
        # Get base_url from the environment or use the provided value
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        if not self.base_url:
            raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
        self.supported_versions = {"v1", "v2", "v3"}

    def request(
        self,
        method: str,
        path: str,
        data: Optional[Dict] = None,
        version: Optional[str] = None
    ) -> Dict:
        """Make request with a version header"""
        version = version or self.api_version
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Accept": f"application/vnd.catalio.{version}+json"
        }
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers=headers,
            json=data
        )
        # Check for a version deprecation warning
        if "Deprecation" in response.headers:
            print(f"⚠️ API version {version} is deprecated")
            print(f"   Sunset date: {response.headers.get('Sunset')}")
        response.raise_for_status()
        return response.json()

# Usage
client = VersionedAPIClient(api_key="...", api_version="v2")
# Use the default version
requirements = client.request("GET", "/api/organizations/org-123/requirements")
# Try a newer version
try:
    requirements = client.request(
        "GET",
        "/api/organizations/org-123/requirements",
        version="v3"
    )
except requests.HTTPError as e:
    if e.response.status_code == 406:
        print("API version v3 not available, falling back to v2")

Migration Strategy

When APIs change, implement gradual migration:

import random

class MigrationManager:
    """Manage migration between API versions"""
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.v2_client = VersionedAPIClient(api_key=api_key, api_version="v2", base_url=base_url)
        self.v3_client = VersionedAPIClient(api_key=api_key, api_version="v3", base_url=base_url)
        self.migration_percentage = 0  # Start with 0% on v3

    def get_requirement(self, org_id: str, req_id: str):
        """Gradually migrate to the new API version"""
        # Route an increasing share of traffic to the new version
        if random.random() < self.migration_percentage / 100:
            try:
                return self.v3_client.request(
                    "GET",
                    f"/api/organizations/{org_id}/requirements/{req_id}"
                )
            except Exception as e:
                # Fall back to v2 on errors
                print(f"v3 request failed: {e}, falling back to v2")
        # Use v2 for the rest of the traffic
        return self.v2_client.request(
            "GET",
            f"/api/organizations/{org_id}/requirements/{req_id}"
        )

    def increase_migration_percentage(self, percentage: int):
        """Gradually increase traffic to the new API version"""
        self.migration_percentage = min(percentage, 100)
        print(f"Migrating {self.migration_percentage}% traffic to v3")

Testing API Integrations

Unit Testing with Mocks

Test your integration code without hitting real API:

import pytest
import requests
from unittest.mock import Mock, patch

@pytest.fixture
def mock_api_client():
    """Create a mocked API client for testing"""
    return Mock()

def test_create_requirement_success(mock_api_client):
    """Test successful requirement creation"""
    # Mock the API response
    mock_api_client.create_requirement.return_value = {
        "id": "req-123",
        "title": "User Auth",
        "status": "active"
    }
    # Call the function under test
    result = create_requirement(
        mock_api_client,
        "org-123",
        {"title": "User Auth"}
    )
    # Assertions
    assert result["id"] == "req-123"
    assert result["status"] == "active"
    mock_api_client.create_requirement.assert_called_once()

def test_create_requirement_with_retry(mock_api_client):
    """Test retry logic on transient failures"""
    # Mock the API to fail twice then succeed
    mock_api_client.create_requirement.side_effect = [
        requests.ConnectionError("Network error"),
        requests.ConnectionError("Network error"),
        {"id": "req-123", "title": "User Auth"}
    ]
    # Should succeed after retries
    result = create_requirement_with_retry(
        mock_api_client,
        "org-123",
        {"title": "User Auth"},
        max_retries=3
    )
    assert result["id"] == "req-123"
    assert mock_api_client.create_requirement.call_count == 3

def test_rate_limit_handling(mock_api_client):
    """Test rate limit handling"""
    response = Mock()
    response.status_code = 429
    response.headers = {"Retry-After": "60"}
    mock_api_client.request.return_value = response
    # Should wait and retry
    with patch("time.sleep") as mock_sleep:
        handle_rate_limit(mock_api_client, "org-123")
        mock_sleep.assert_called_once()
        # Verify the wait time is respected
        assert mock_sleep.call_args[0][0] >= 60

Integration Testing

Test with real API:

import os
import uuid
import pytest

@pytest.fixture(scope="session")
def api_client():
    """Create a real API client for testing"""
    api_key = os.getenv("CATALIO_API_KEY")
    if not api_key:
        pytest.skip("CATALIO_API_KEY not set")
    return CatalioClient(api_key=api_key)

@pytest.mark.integration
def test_create_and_retrieve_requirement(api_client):
    """Test creating and retrieving a requirement"""
    # Create
    created = api_client.create_requirement(
        "org-123",
        {"title": f"Test {uuid.uuid4()}"}
    )
    # Retrieve
    retrieved = api_client.get_requirement(
        "org-123",
        created["id"]
    )
    # Verify
    assert retrieved["id"] == created["id"]
    assert retrieved["title"] == created["title"]
    # Cleanup
    api_client.delete_requirement("org-123", created["id"])

@pytest.mark.integration
def test_pagination(api_client):
    """Test pagination through large result sets"""
    # Create multiple requirements
    for i in range(150):
        api_client.create_requirement(
            "org-123",
            {"title": f"Paginated Test {i}"}
        )
    # Paginate through all of them
    all_reqs = []
    iterator = api_client.paginate("organizations/org-123/requirements")
    for page in iterator:
        all_reqs.extend(page)
    assert len(all_reqs) >= 150

Monitoring and Observability

Structured Logging

Log API interactions for debugging and monitoring:

import json
import logging
import os
import time
import requests
from datetime import datetime
from typing import Optional, Dict

class StructuredLogger:
    """Structured logging for API requests"""
    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def log_request(
        self,
        method: str,
        path: str,
        status_code: int,
        duration_ms: float,
        error: Optional[str] = None
    ):
        """Log an API request with structured data"""
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "type": "api_request",
            "method": method,
            "path": path,
            "status_code": status_code,
            "duration_ms": duration_ms,
            "error": error
        }
        self.logger.info(json.dumps(log_data))

class MonitoredAPIClient:
    """API client with integrated monitoring"""
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.api_key = api_key
        # Get base_url from the environment or use the provided value
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        if not self.base_url:
            raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
        self.logger = StructuredLogger(logging.getLogger("catalio.api"))

    def request(
        self,
        method: str,
        path: str,
        data: Optional[Dict] = None
    ) -> Dict:
        """Make request with monitoring"""
        start_time = time.time()
        status_code = None
        error = None
        try:
            response = requests.request(
                method,
                f"{self.base_url}{path}",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json=data
            )
            status_code = response.status_code
            response.raise_for_status()
            return response.json()
        except Exception as e:
            error = str(e)
            if status_code is None:
                status_code = getattr(getattr(e, "response", None), "status_code", 0)
            raise
        finally:
            duration_ms = (time.time() - start_time) * 1000
            self.logger.log_request(method, path, status_code, duration_ms, error)

Metrics Collection

Track API performance metrics:

import time
import requests
from dataclasses import dataclass, field
from statistics import mean
from typing import Dict, List

@dataclass
class APIMetrics:
    """Metrics for API performance"""
    total_requests: int = 0
    total_errors: int = 0
    total_duration_ms: float = 0.0
    request_durations: List[float] = field(default_factory=list)

    def record_request(self, duration_ms: float, success: bool):
        """Record metrics for a request"""
        self.total_requests += 1
        self.total_duration_ms += duration_ms
        self.request_durations.append(duration_ms)
        if not success:
            self.total_errors += 1

    def error_rate(self) -> float:
        """Calculate error rate percentage"""
        if self.total_requests == 0:
            return 0
        return (self.total_errors / self.total_requests) * 100

    def average_duration_ms(self) -> float:
        """Calculate average request duration"""
        if not self.request_durations:
            return 0
        return mean(self.request_durations)

    def p95_duration_ms(self) -> float:
        """Calculate 95th percentile duration"""
        if not self.request_durations or len(self.request_durations) < 20:
            return 0
        sorted_durations = sorted(self.request_durations)
        index = int(len(sorted_durations) * 0.95)
        return sorted_durations[index]

    def summarize(self) -> str:
        """Get a human-readable metrics summary"""
        return f"""
API Metrics:
  Total Requests: {self.total_requests}
  Error Rate: {self.error_rate():.1f}%
  Avg Duration: {self.average_duration_ms():.1f}ms
  P95 Duration: {self.p95_duration_ms():.1f}ms
"""

class MetricsTrackingClient:
    """API client with metrics collection"""
    def __init__(self, api_key: str, base_url: str = "http://localhost:4000"):
        self.api_key = api_key
        self.base_url = base_url
        self.metrics = APIMetrics()

    def request(self, method: str, path: str) -> Dict:
        """Make request and track metrics"""
        start = time.time()
        success = False
        try:
            response = requests.request(
                method,
                f"{self.base_url}{path}",
                headers={"Authorization": f"Bearer {self.api_key}"},
                timeout=10
            )
            response.raise_for_status()
            success = True
            return response.json()
        finally:
            duration_ms = (time.time() - start) * 1000
            self.metrics.record_request(duration_ms, success)

    def get_metrics_summary(self) -> str:
        """Get current metrics"""
        return self.metrics.summarize()

Performance Optimization

Connection Pooling

Reuse connections to improve performance:

import os
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Dict, Optional

def create_session() -> requests.Session:
    """Create an HTTP session with connection pooling and a retry strategy"""
    session = requests.Session()
    # Configure connection pooling
    adapter = HTTPAdapter(
        pool_connections=10,
        pool_maxsize=10,
        max_retries=Retry(
            total=3,
            backoff_factor=0.3,
            status_forcelist=[500, 502, 503, 504]
        )
    )
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session

class OptimizedAPIClient:
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.api_key = api_key
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        self.session = create_session()

    def request(self, method: str, path: str) -> Dict:
        """Make request using a pooled connection"""
        response = self.session.request(
            method,
            f"{self.base_url}{path}",
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        response.raise_for_status()
        return response.json()

    def close(self):
        """Close session and connections"""
        self.session.close()

# Usage
client = OptimizedAPIClient(api_key="...")
try:
    # Multiple requests reuse connections
    req1 = client.request("GET", "/api/organizations/org-123/requirements/1")
    req2 = client.request("GET", "/api/organizations/org-123/requirements/2")
    req3 = client.request("GET", "/api/organizations/org-123/requirements/3")
finally:
    client.close()

Parallel Requests

Execute multiple independent requests in parallel:

import concurrent.futures
from typing import Dict, List

def fetch_requirements_parallel(
    client: APIClient,
    org_id: str,
    requirement_ids: List[str],
    max_workers: int = 5
) -> List[Dict]:
    """Fetch multiple requirements in parallel"""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(client.get_requirement, org_id, req_id)
            for req_id in requirement_ids
        ]
        results = []
        for future in concurrent.futures.as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Failed to fetch requirement: {e}")
    return results

# Usage
requirement_ids = [f"req-{i}" for i in range(100)]
# Fetch 100 requirements with 5 concurrent workers
requirements = fetch_requirements_parallel(client, "org-123", requirement_ids)
print(f"Fetched {len(requirements)} requirements")

Batch Requests for Efficiency

Combine multiple operations into single request:

from typing import Dict, List

def get_requirements_batch(
    client: APIClient,
    org_id: str,
    requirement_ids: List[str]
) -> Dict[str, Dict]:
    """Get multiple requirements with a single API call"""
    response = client.request(
        "POST",
        f"/api/organizations/{org_id}/requirements/batch-get",
        data={"ids": requirement_ids}
    )
    # Index results by ID for easy lookup
    return {req["id"]: req for req in response["data"]}

# Usage
# Much more efficient than 100 individual GET requests
requirements = get_requirements_batch(
    client,
    "org-123",
    [f"req-{i}" for i in range(100)]
)

Multi-tenant Considerations

Organization-Scoped Requests

Always include organization context:

import os
import requests
from typing import Dict, Optional

class MultiTenantAPIClient:
    """API client with multi-tenant awareness"""
    def __init__(self, api_key: str, org_id: str, base_url: Optional[str] = None):
        self.api_key = api_key
        self.org_id = org_id
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")

    def get_requirement(self, req_id: str) -> Dict:
        """Get a requirement for the current organization"""
        # Always scope to the organization
        response = requests.get(
            f"{self.base_url}/api/organizations/{self.org_id}/requirements/{req_id}",
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        response.raise_for_status()
        return response.json()["data"]

    def create_requirement(self, data: Dict) -> Dict:
        """Create a requirement for the current organization"""
        response = requests.post(
            f"{self.base_url}/api/organizations/{self.org_id}/requirements",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=data
        )
        response.raise_for_status()
        return response.json()["data"]

Preventing Cross-Tenant Data Leaks

Validate organization ownership:

def verify_organization_access(api_key: str, org_id: str, base_url: str) -> bool:
    """Verify the API key has access to the organization"""
    try:
        response = requests.get(
            f"{base_url}/api/organizations/{org_id}",
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10
        )
        return response.status_code == 200
    except requests.RequestException:
        return False

class SecureMultiTenantClient:
    def __init__(self, api_key: str, org_id: str, base_url: Optional[str] = None):
        self.api_key = api_key
        self.org_id = org_id
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        # Verify access on initialization
        if not verify_organization_access(api_key, org_id, self.base_url):
            raise AuthenticationError(f"No access to organization {org_id}")

    def request(self, method: str, path: str) -> Dict:
        """Make request with organization validation"""
        # Ensure the path is scoped to the initialized organization
        if not path.startswith(f"/api/organizations/{self.org_id}"):
            raise SecurityError("Requests must be scoped to initialized organization")
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        response.raise_for_status()
        return response.json()

Common Anti-patterns to Avoid

❌ Anti-pattern: Ignoring Rate Limits

# BAD: Makes requests without respecting rate limits
for i in range(1000):
    requirement = client.get_requirement("org-123", f"req-{i}")

✅ Pattern: Respecting Rate Limits

# GOOD: Batches requests to avoid the rate limit
requirement_ids = [f"req-{i}" for i in range(1000)]
for batch in batches(requirement_ids, 100):
    requirements = client.batch_get_requirements("org-123", batch)
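
The batches helper used above isn't defined elsewhere in this guide; a minimal implementation:

from typing import Iterator, List, Sequence

def batches(items: Sequence, size: int) -> Iterator[List]:
    """Yield successive chunks of at most `size` items."""
    for start in range(0, len(items), size):
        yield list(items[start:start + size])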

❌ Anti-pattern: No Retry Logic

# BAD: Single attempt, no retry
response = requests.get(url) # Fails if network is flaky

✅ Pattern: Automatic Retries

# GOOD: Automatic retry with backoff
result = retry_with_backoff(lambda: requests.get(url))

❌ Anti-pattern: Missing Timeouts

# BAD: Request might hang indefinitely
response = requests.get(url)

✅ Pattern: Appropriate Timeouts

# GOOD: Request times out after 10 seconds
response = requests.get(url, timeout=10)

❌ Anti-pattern: No Error Handling

# BAD: Crashes on validation error
user = create_user(client, {"email": "invalid"})

✅ Pattern: Proper Error Handling

# GOOD: Handles validation errors gracefully
try:
    user = create_user(client, {"email": "invalid"})
except ValidationError as e:
    print(f"Validation failed: {e.details}")
except APIError as e:
    print(f"API error: {e}")

Production Deployment Checklist

Before deploying API integrations to production, verify:

  • Authentication

    • API keys securely stored (environment variables, secrets manager)
    • API keys rotated regularly
    • No hardcoded credentials in code or config files
  • Error Handling

    • All error types handled appropriately
    • User-friendly error messages displayed
    • Errors logged for debugging
    • Request IDs captured for API support
  • Resilience

    • Retry logic implemented with exponential backoff
    • Timeout values set appropriately
    • Connection pooling configured
    • Circuit breaker pattern for graceful degradation (see the sketch after this checklist)
  • Performance

    • Caching implemented for appropriate endpoints
    • Batch operations used instead of individual requests
    • Pagination implemented for large datasets
    • Load testing completed
  • Monitoring

    • Request/response logging enabled
    • Metrics collection (error rates, latency, throughput)
    • Alerts configured for failures
    • Dashboard created for visibility
  • Security

    • TLS/SSL verification enabled
    • Input validation implemented
    • Multi-tenant data isolation verified
    • Rate limiting respected
  • Testing

    • Unit tests with mocks
    • Integration tests completed
    • Error scenario testing
    • Load testing completed
  • Documentation

    • Integration documented
    • Error handling procedures documented
    • Troubleshooting guide created
    • Runbook for common issues
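
The circuit breaker item above is the one resilience entry without an example elsewhere in this guide, so here is a minimal, illustrative sketch (the class name and thresholds are assumptions, not a Catalio component): after a run of consecutive failures the breaker opens and fails fast, then admits a single trial request once a cooldown passes.

import time

class CircuitBreaker:
    """Fail fast after repeated errors; allow a retry after a cooldown."""
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout:
                raise RuntimeError("Circuit open: failing fast")
            # Cooldown elapsed: allow one trial request (half-open)
            self.opened_at = None
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            raise
        self.failures = 0
        return result

# Usage (hypothetical): wrap any client call
# breaker = CircuitBreaker()
# requirement = breaker.call(client.get_requirement, "org-123", "req-456")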

SDK and Client Library Patterns

Building a Reusable SDK

import os
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Dict

class CatalioSDK:
    """Production-ready SDK for Catalio API"""
    def __init__(
        self,
        api_key: str,
        org_id: str,
        base_url: Optional[str] = None
    ):
        self.api_key = api_key
        self.org_id = org_id
        # Get base_url from the environment or use the provided value
        self.base_url = base_url or os.getenv("CATALIO_API_BASE_URL", "http://localhost:4000")
        if not self.base_url:
            raise ValueError("base_url must be provided or set CATALIO_API_BASE_URL environment variable")
        self.session = self._create_session()
        self.cache = ResponseCache()

    def _create_session(self) -> requests.Session:
        """Create an HTTP session with a retry strategy"""
        session = requests.Session()
        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=10,
            max_retries=Retry(
                total=3,
                backoff_factor=0.3,
                status_forcelist=[500, 502, 503, 504]
            )
        )
        session.mount("https://", adapter)
        return session

    def _request(
        self,
        method: str,
        path: str,
        data: Optional[Dict] = None,
        params: Optional[Dict] = None
    ) -> Dict:
        """Internal request method with error handling"""
        url = f"{self.base_url}{path}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/vnd.catalio.v2+json"
        }
        response = self.session.request(
            method,
            url,
            headers=headers,
            json=data,
            params=params,
            timeout=10
        )
        if response.status_code == 429:
            raise RateLimitError(
                message="Rate limit exceeded",
                retry_after=int(response.headers.get("Retry-After", 60))
            )
        elif response.status_code >= 400:
            raise APIError(response.json())
        # DELETE typically returns 204 with an empty body
        if response.status_code == 204 or not response.content:
            return {}
        return response.json()

    # Resource methods
    def get_requirement(self, requirement_id: str) -> Dict:
        """Get a requirement"""
        return self._request(
            "GET",
            f"/api/organizations/{self.org_id}/requirements/{requirement_id}"
        )["data"]

    def list_requirements(
        self,
        page: int = 1,
        page_size: int = 50,
        status: Optional[str] = None
    ) -> Dict:
        """List requirements with optional filtering"""
        params = {"page": page, "page_size": page_size}
        if status:
            params["status"] = status
        return self._request(
            "GET",
            f"/api/organizations/{self.org_id}/requirements",
            params=params
        )

    def create_requirement(self, data: Dict) -> Dict:
        """Create a requirement"""
        return self._request(
            "POST",
            f"/api/organizations/{self.org_id}/requirements",
            data=data
        )["data"]

    def update_requirement(self, requirement_id: str, data: Dict) -> Dict:
        """Update a requirement"""
        return self._request(
            "PUT",
            f"/api/organizations/{self.org_id}/requirements/{requirement_id}",
            data=data
        )["data"]

    def delete_requirement(self, requirement_id: str) -> None:
        """Delete a requirement"""
        self._request(
            "DELETE",
            f"/api/organizations/{self.org_id}/requirements/{requirement_id}"
        )

    def close(self):
        """Close the session and clean up resources"""
        self.session.close()

# Usage
sdk = CatalioSDK(api_key="your-api-key", org_id="org-123")
try:
    # List requirements
    reqs = sdk.list_requirements(status="active")
    # Create a requirement
    new_req = sdk.create_requirement({"title": "New Feature"})
    # Update it
    updated = sdk.update_requirement(new_req["id"], {"status": "archived"})
    # Delete it
    sdk.delete_requirement(new_req["id"])
finally:
    sdk.close()

SDK Best Practices

  1. Consistent Method Naming: Use predictable patterns (get, list, create, update, delete)
  2. Type Hints: Provide clear input/output types
  3. Documentation: Document each method with examples
  4. Error Handling: Raise specific exception types
  5. Resource Management: Provide close/cleanup methods
  6. Version Support: Support multiple API versions gracefully
  7. Testing: Include comprehensive test suite
  8. Publishing: Publish to package repositories (PyPI, npm, etc.)
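
One way to strengthen item 5 is context-manager support, so callers cannot forget cleanup. A small sketch extending the SDK above (the subclass name is illustrative):

class ManagedCatalioSDK(CatalioSDK):
    """Sketch: context-manager support so sessions always close."""
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

# Usage
with ManagedCatalioSDK(api_key="your-api-key", org_id="org-123") as sdk:
    reqs = sdk.list_requirements(status="active")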

Learn More