Building custom integrations allows you to connect Catalio with proprietary systems, legacy applications, and unique business workflows that aren’t covered by pre-built connectors. This comprehensive guide walks you through designing, implementing, testing, and deploying production-ready integrations using Catalio’s REST API.
When to Build Custom Integrations
Before investing in custom integration development, evaluate whether a pre-built connector meets your needs:
Choose Pre-Built Connectors When
- Standard workflows: Your use case matches common integration patterns
- Supported platforms: Catalio offers a native connector for your target system
- Time-to-market priority: You need integration running quickly with minimal development
- Limited resources: You lack dedicated integration development capacity
- Ongoing maintenance burden: You prefer vendor-supported, auto-updating integrations
Build Custom Integrations When
- Proprietary systems: Connecting to internal tools or custom-built applications
- Unique workflows: Business processes that don’t fit standard integration patterns
- Legacy systems: Integrating with older systems without modern API connectors
- Complex data transformations: Requiring sophisticated mapping or business logic
- Compliance requirements: Needing full control over data handling and security
- High-volume operations: Optimizing for specific performance characteristics
- Multi-system orchestration: Coordinating data flows across multiple systems
Prerequisites
Before building your integration, ensure you have:
API Access
- Catalio Account: Active organization with API access enabled
- API Credentials: Either API keys or OAuth 2.0 client credentials
- Permissions: Appropriate roles for the resources you’ll access
- Rate Limits: Understanding of your organization’s API quotas
Development Environment
- Programming Language: Choose from Python, JavaScript/Node.js, Go, Ruby, or Java
- HTTP Client Library: For making REST API requests
- JSON Parser: For processing request/response payloads
- Testing Tools: Postman, curl, or language-specific testing frameworks
- Version Control: Git repository for tracking integration code
Documentation Access
- API Reference: Available at
https://api.catalio.io/docs - Webhook Guide: Event types and payload schemas
- SDK Documentation: If using official Catalio SDKs
- Support Channel: Access to technical support for troubleshooting
REST API Basics
Catalio’s REST API follows modern conventions with JSON payloads, standard HTTP methods, and predictable resource-oriented URLs.
API versioning ensures backward compatibility. Major version changes are announced at least 6 months in advance.
Authentication
Catalio supports two authentication methods:
API Key Authentication
Simplest method for server-to-server integrations:
curl -H "Authorization: Bearer YOUR_API_KEY" \
https://api.catalio.io/v1/requirements
Obtaining API Keys:
- Navigate to Organization Settings → API Access
- Click “Generate New API Key”
- Copy the key immediately (shown only once)
- Store securely in environment variables or secrets manager
Security Best Practices:
- Never commit API keys to version control
- Rotate keys every 90 days
- Use separate keys for development, staging, and production
- Revoke keys immediately if compromised
OAuth 2.0 Authentication
Recommended for user-facing integrations requiring delegated access:
Step 1: Authorization Request
Redirect users to Catalio’s authorization endpoint:
https://auth.catalio.io/oauth/authorize?
response_type=code&
client_id=YOUR_CLIENT_ID&
redirect_uri=https://yourapp.com/callback&
scope=requirements:read requirements:write&
state=RANDOM_STATE_TOKEN
Step 2: Exchange Authorization Code
After user grants permission, exchange the code for access token:
curl -X POST https://auth.catalio.io/oauth/token \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=authorization_code" \
-d "code=AUTH_CODE_FROM_CALLBACK" \
-d "client_id=YOUR_CLIENT_ID" \
-d "client_secret=YOUR_CLIENT_SECRET" \
-d "redirect_uri=https://yourapp.com/callback"
Response:
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 3600,
"refresh_token": "def50200a8b7c...",
"scope": "requirements:read requirements:write"
}
Step 3: Use Access Token
Include the access token in API requests:
curl -H "Authorization: Bearer ACCESS_TOKEN" \
https://api.catalio.io/v1/requirements
Step 4: Refresh Token
When access token expires, use refresh token:
curl -X POST https://auth.catalio.io/oauth/token \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=refresh_token" \
-d "refresh_token=REFRESH_TOKEN" \
-d "client_id=YOUR_CLIENT_ID" \
-d "client_secret=YOUR_CLIENT_SECRET"
Request and Response Formats
All API requests and responses use JSON with UTF-8 encoding.
Standard Request Headers
Content-Type: application/json
Authorization: Bearer YOUR_TOKEN
Accept: application/json
X-Idempotency-Key: UNIQUE_REQUEST_ID (for POST/PUT/PATCH)
Standard Response Structure
Success Response (200 OK):
{
"data": {
"id": "req_abc123",
"title": "User authentication system",
"description": "Implement secure user login...",
"status": "approved"
}
}
List Response (200 OK):
{
"data": [
{ "id": "req_abc123", "title": "Requirement 1" },
{ "id": "req_def456", "title": "Requirement 2" }
],
"pagination": {
"page": 1,
"per_page": 25,
"total_pages": 4,
"total_count": 87
}
}
Error Response (4xx/5xx):
{
"error": {
"code": "invalid_request",
"message": "Title cannot be blank",
"details": {
"field": "title",
"constraint": "required"
}
}
}
HTTP Methods and Semantics
- GET: Retrieve resources (idempotent, no side effects)
- POST: Create new resources (not idempotent without idempotency key)
- PUT: Replace entire resource (idempotent)
- PATCH: Update specific fields (idempotent with idempotency key)
- DELETE: Remove resource (idempotent)
Core API Endpoints
Requirements API
List Requirements:
GET /v1/requirements?status=approved&page=1&per_page=50
Get Single Requirement:
GET /v1/requirements/req_abc123
Create Requirement:
POST /v1/requirements
Content-Type: application/json
{
"title": "User authentication system",
"description": "Implement secure login with MFA support",
"status": "draft",
"priority": "high",
"category": "security",
"tags": ["authentication", "security", "user-management"]
}
Update Requirement:
PATCH /v1/requirements/req_abc123
Content-Type: application/json
X-Idempotency-Key: update_req_abc123_20250110
{
"status": "approved",
"priority": "critical"
}
Delete Requirement:
DELETE /v1/requirements/req_abc123
Search and Filtering
Advanced Search:
GET /v1/requirements/search?
q=authentication&
status=approved,in_review&
priority=high,critical&
created_after=2025-01-01&
sort=created_at:desc
Filter Parameters:
q: Full-text search across title and descriptionstatus: Filter by status (comma-separated for multiple)priority: Filter by priority levelcategory: Filter by categorytags: Filter by tags (comma-separated)created_after/created_before: Date range filtersupdated_after/updated_before: Last modified date rangesort: Sort field and direction (e.g.,created_at:desc)
Common Integration Patterns
Pattern 1: Synchronous Request-Response
Best for real-time operations with immediate feedback:
Use Cases:
- Creating requirements from external forms
- Real-time validation of data before submission
- User-initiated actions requiring immediate confirmation
Example: Creating Requirement from External Form
import requests
import json
def create_requirement_from_form(form_data):
"""Create Catalio requirement from external form submission."""
# Transform external form data to Catalio format
requirement_data = {
"title": form_data["requirement_title"],
"description": form_data["detailed_description"],
"priority": map_priority(form_data["urgency"]),
"category": map_category(form_data["department"]),
"tags": form_data["keywords"].split(",")
}
# Make synchronous API request
response = requests.post(
"https://api.catalio.io/v1/requirements",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"X-Idempotency-Key": f"form_{form_data['submission_id']}"
},
json=requirement_data,
timeout=10
)
if response.status_code == 201:
requirement = response.json()["data"]
return {
"success": True,
"requirement_id": requirement["id"],
"message": f"Requirement created: {requirement['title']}"
}
else:
error = response.json()["error"]
return {
"success": False,
"error_code": error["code"],
"error_message": error["message"]
}
def map_priority(urgency):
"""Map external urgency levels to Catalio priorities."""
priority_map = {
"low": "low",
"medium": "medium",
"high": "high",
"urgent": "critical"
}
return priority_map.get(urgency, "medium")
def map_category(department):
"""Map department to Catalio category."""
category_map = {
"engineering": "technical",
"sales": "business",
"support": "operational",
"security": "security"
}
return category_map.get(department, "general")
Pattern 2: Asynchronous Queue-Based Processing
Best for high-volume batch operations:
Use Cases:
- Nightly synchronization of large datasets
- Bulk imports from data warehouses
- Processing high-volume event streams
Example: Queue-Based Bulk Import
const axios = require('axios')
const Queue = require('bull')
// Create job queue for processing requirements
const requirementQueue = new Queue('catalio-import', {
redis: { host: 'localhost', port: 6379 },
})
// Producer: Add requirements to queue
async function queueBulkImport(requirements) {
console.log(`Queuing ${requirements.length} requirements for import`)
for (const req of requirements) {
await requirementQueue.add(
'import-requirement',
{
requirement: req,
retry_count: 0,
max_retries: 3,
},
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
}
)
}
}
// Consumer: Process queued requirements
requirementQueue.process('import-requirement', 5, async (job) => {
const { requirement, retry_count } = job.data
try {
const response = await axios.post(
'https://api.catalio.io/v1/requirements',
{
title: requirement.title,
description: requirement.description,
priority: requirement.priority,
tags: requirement.tags,
},
{
headers: {
Authorization: `Bearer ${process.env.CATALIO_API_KEY}`,
'Content-Type': 'application/json',
'X-Idempotency-Key': `import_${requirement.external_id}`,
},
timeout: 5000,
}
)
console.log(`Created requirement: ${response.data.data.id}`)
return { success: true, catalio_id: response.data.data.id }
} catch (error) {
if (error.response && error.response.status === 429) {
// Rate limit hit - retry with exponential backoff
throw new Error('Rate limit exceeded - will retry')
}
if (retry_count < 3) {
// Retry on transient errors
throw new Error(`Failed to create requirement: ${error.message}`)
}
// Log permanent failure after max retries
console.error(`Permanent failure for ${requirement.external_id}:`, error)
return { success: false, error: error.message }
}
})
// Monitor queue health
requirementQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed:`, result)
})
requirementQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message)
})
module.exports = { queueBulkImport }
Pattern 3: Event-Driven Architecture
Best for real-time reactive integrations:
Use Cases:
- Triggering workflows when requirements change status
- Notifying external systems of requirement updates
- Synchronizing changes bi-directionally
Example: Event-Driven Status Sync
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
)
// WebhookPayload represents incoming webhook from Catalio
type WebhookPayload struct {
Event string `json:"event"`
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{} `json:"data"`
}
// RequirementUpdated handles requirement.updated webhook events
func RequirementUpdated(w http.ResponseWriter, r *http.Request) {
// Verify webhook signature for security
if !verifyWebhookSignature(r) {
http.Error(w, "Invalid signature", http.StatusUnauthorized)
return
}
// Parse webhook payload
var payload WebhookPayload
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
http.Error(w, "Invalid payload", http.StatusBadRequest)
return
}
// Process based on event type
switch payload.Event {
case "requirement.updated":
handleRequirementUpdate(payload.Data)
case "requirement.status_changed":
handleStatusChange(payload.Data)
case "requirement.deleted":
handleRequirementDeleted(payload.Data)
default:
log.Printf("Unknown event type: %s", payload.Event)
}
// Acknowledge receipt immediately
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"status":"received"}`)
}
func handleRequirementUpdate(data map[string]interface{}) {
requirementID := data["id"].(string)
newStatus := data["status"].(string)
log.Printf("Requirement %s updated to status: %s", requirementID, newStatus)
// Sync to external system (e.g., Jira, Salesforce)
if err := syncToExternalSystem(requirementID, data); err != nil {
log.Printf("Failed to sync to external system: %v", err)
// Queue for retry or alert operations team
queueFailedSync(requirementID, data, err)
}
}
func handleStatusChange(data map[string]interface{}) {
requirementID := data["id"].(string)
oldStatus := data["old_status"].(string)
newStatus := data["new_status"].(string)
log.Printf("Status change: %s -> %s for %s", oldStatus, newStatus, requirementID)
// Trigger status-specific workflows
if newStatus == "approved" {
triggerImplementationWorkflow(requirementID, data)
} else if newStatus == "rejected" {
notifyStakeholders(requirementID, data, "rejected")
}
}
func syncToExternalSystem(requirementID string, data map[string]interface{}) error {
// Build payload for external system
externalPayload := map[string]interface{}{
"external_ref": requirementID,
"summary": data["title"],
"description": data["description"],
"priority": mapPriority(data["priority"].(string)),
"status": mapStatus(data["status"].(string)),
}
payloadBytes, _ := json.Marshal(externalPayload)
// Send to external API
req, _ := http.NewRequest(
"POST",
"https://external-system.com/api/v1/items",
bytes.NewBuffer(payloadBytes),
)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", getExternalAPIKey()))
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("external API returned status %d", resp.StatusCode)
}
return nil
}
func verifyWebhookSignature(r *http.Request) bool {
// Implement HMAC signature verification
// signature := r.Header.Get("X-Catalio-Signature")
// return crypto.VerifyHMAC(signature, webhookSecret, requestBody)
return true // Simplified for example
}
func mapPriority(catalio string) string {
priorityMap := map[string]string{
"low": "P4",
"medium": "P3",
"high": "P2",
"critical": "P1",
}
return priorityMap[catalio]
}
func mapStatus(catalio string) string {
statusMap := map[string]string{
"draft": "Open",
"in_review": "In Progress",
"approved": "Ready",
"rejected": "Closed",
"implemented": "Done",
}
return statusMap[catalio]
}
func main() {
http.HandleFunc("/webhooks/catalio", RequirementUpdated)
log.Println("Webhook listener starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
Webhook Configuration
Webhooks enable Catalio to push real-time events to your integration, eliminating the need for polling.
Setting Up Webhooks
Step 1: Configure Webhook Endpoint
- Navigate to Organization Settings → Integrations → Webhooks
- Click “Add Webhook Endpoint”
- Configure:
- URL: Your HTTPS endpoint (e.g.,
https://yourapp.com/webhooks/catalio) - Events: Select events to receive
- Secret: Generate signing secret for verification
- Description: Purpose of this webhook
- URL: Your HTTPS endpoint (e.g.,
Step 2: Verify Webhook Endpoint
Catalio sends a verification request:
{
"event": "webhook.verification",
"challenge": "random_verification_token"
}
Respond with the challenge value:
{
"challenge": "random_verification_token"
}
Available Webhook Events
Requirement Events:
requirement.created: New requirement createdrequirement.updated: Requirement fields modifiedrequirement.status_changed: Status transition occurredrequirement.deleted: Requirement removedrequirement.assigned: Requirement assigned to userrequirement.commented: New comment added
Use Case Events:
use_case.created: New use case createduse_case.updated: Use case modifieduse_case.linked: Use case linked to requirement
Persona Events:
persona.created: New persona createdpersona.updated: Persona modified
Webhook Payload Structure
Standard Webhook Payload:
{
"event": "requirement.status_changed",
"timestamp": "2025-01-10T14:30:00Z",
"webhook_id": "wh_abc123",
"data": {
"id": "req_abc123",
"title": "User authentication system",
"old_status": "in_review",
"new_status": "approved",
"changed_by": {
"id": "user_xyz789",
"name": "Jane Smith",
"email": "jane@example.com"
}
},
"organization_id": "org_def456"
}
Webhook Security
Signature Verification:
Every webhook includes an HMAC signature in the X-Catalio-Signature header:
import hmac
import hashlib
def verify_webhook_signature(payload, signature, secret):
"""Verify webhook signature for authenticity."""
expected_signature = hmac.new(
secret.encode('utf-8'),
payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, f"sha256={expected_signature}")
# Flask example
from flask import request, abort
@app.route('/webhooks/catalio', methods=['POST'])
def handle_webhook():
signature = request.headers.get('X-Catalio-Signature')
payload = request.get_data(as_text=True)
if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
abort(401, "Invalid signature")
# Process webhook...
return {"status": "received"}
Webhook Best Practices
- Respond Quickly: Acknowledge receipt within 5 seconds to avoid timeouts
- Process Asynchronously: Queue webhook processing for longer operations
- Handle Retries: Catalio retries failed webhooks up to 3 times with exponential backoff
- Idempotency: Process each webhook only once using
webhook_idtracking - Monitor Failures: Alert on repeated webhook failures
- Verify Signatures: Always verify HMAC signatures for security
Bi-directional Synchronization
Bi-directional sync keeps Catalio and external systems in sync, handling changes from both directions.
Sync Architecture Patterns
Pattern 1: Timestamp-Based Sync
Track last sync time and query for changes:
import requests
from datetime import datetime, timedelta
class BidirectionalSync:
def __init__(self, api_key, external_system):
self.api_key = api_key
self.external_system = external_system
self.last_sync = self.load_last_sync_time()
def sync_from_catalio(self):
"""Pull changes from Catalio since last sync."""
# Query Catalio for updated requirements
response = requests.get(
"https://api.catalio.io/v1/requirements",
params={
"updated_after": self.last_sync.isoformat(),
"per_page": 100
},
headers={"Authorization": f"Bearer {self.api_key}"}
)
catalio_requirements = response.json()["data"]
for req in catalio_requirements:
# Check if requirement exists in external system
external_item = self.external_system.find_by_catalio_id(req["id"])
if external_item:
# Update existing item
if req["updated_at"] > external_item["updated_at"]:
self.external_system.update(external_item["id"], {
"title": req["title"],
"description": req["description"],
"status": self.map_status_to_external(req["status"]),
"catalio_updated_at": req["updated_at"]
})
else:
# Create new item in external system
self.external_system.create({
"catalio_id": req["id"],
"title": req["title"],
"description": req["description"],
"status": self.map_status_to_external(req["status"]),
"catalio_created_at": req["created_at"]
})
self.update_last_sync_time(datetime.utcnow())
def sync_to_catalio(self):
"""Push changes from external system to Catalio."""
# Query external system for updated items
external_items = self.external_system.find_updated_since(self.last_sync)
for item in external_items:
catalio_id = item.get("catalio_id")
if catalio_id:
# Update existing Catalio requirement
response = requests.patch(
f"https://api.catalio.io/v1/requirements/{catalio_id}",
json={
"title": item["title"],
"description": item["description"],
"status": self.map_status_from_external(item["status"])
},
headers={
"Authorization": f"Bearer {self.api_key}",
"X-Idempotency-Key": f"sync_{item['id']}_{item['updated_at']}"
}
)
if response.status_code == 200:
print(f"Updated Catalio requirement: {catalio_id}")
else:
# Create new Catalio requirement
response = requests.post(
"https://api.catalio.io/v1/requirements",
json={
"title": item["title"],
"description": item["description"],
"status": self.map_status_from_external(item["status"]),
"tags": ["external-import"]
},
headers={
"Authorization": f"Bearer {self.api_key}",
"X-Idempotency-Key": f"create_{item['id']}"
}
)
if response.status_code == 201:
catalio_req = response.json()["data"]
# Store Catalio ID in external system
self.external_system.update(item["id"], {
"catalio_id": catalio_req["id"]
})
self.update_last_sync_time(datetime.utcnow())
def run_sync_cycle(self):
"""Execute complete bidirectional sync cycle."""
print(f"Starting sync cycle at {datetime.utcnow()}")
try:
self.sync_from_catalio()
self.sync_to_catalio()
print("Sync cycle completed successfully")
except Exception as e:
print(f"Sync cycle failed: {e}")
# Alert operations team
self.send_alert(f"Sync failed: {e}")
def map_status_to_external(self, catalio_status):
"""Map Catalio status to external system status."""
status_map = {
"draft": "open",
"in_review": "in_progress",
"approved": "ready",
"implemented": "done",
"rejected": "closed"
}
return status_map.get(catalio_status, "open")
def map_status_from_external(self, external_status):
"""Map external system status to Catalio status."""
status_map = {
"open": "draft",
"in_progress": "in_review",
"ready": "approved",
"done": "implemented",
"closed": "rejected"
}
return status_map.get(external_status, "draft")
Pattern 2: Event-Driven Sync
Use webhooks from both systems to trigger immediate synchronization:
const express = require('express')
const app = express()
// Parse JSON bodies
app.use(express.json())
// Webhook from Catalio triggers external system update
app.post('/webhooks/catalio', async (req, res) => {
const { event, data } = req.body
// Acknowledge immediately
res.status(200).json({ status: 'received' })
// Process asynchronously
if (event === 'requirement.updated') {
await syncToExternalSystem(data)
}
})
// Webhook from external system triggers Catalio update
app.post('/webhooks/external', async (req, res) => {
const { action, item } = req.body
// Acknowledge immediately
res.status(200).json({ status: 'received' })
// Process asynchronously
if (action === 'item_updated') {
await syncToCatalio(item)
}
})
async function syncToExternalSystem(catalioRequirement) {
const externalItem = await externalAPI.findByCatalioId(catalioRequirement.id)
if (externalItem) {
await externalAPI.update(externalItem.id, {
title: catalioRequirement.title,
description: catalioRequirement.description,
status: mapStatusToExternal(catalioRequirement.status),
})
}
}
async function syncToCatalio(externalItem) {
const catalioId = externalItem.catalio_id
if (catalioId) {
await catalioAPI.updateRequirement(catalioId, {
title: externalItem.title,
description: externalItem.description,
status: mapStatusFromExternal(externalItem.status),
})
}
}
Conflict Resolution Strategies
Strategy 1: Last-Write-Wins
Simplest approach - most recent change takes precedence:
def resolve_conflict_last_write_wins(catalio_req, external_item):
"""Resolve conflict by choosing most recent update."""
catalio_time = datetime.fromisoformat(catalio_req["updated_at"])
external_time = datetime.fromisoformat(external_item["updated_at"])
if catalio_time > external_time:
# Catalio is newer - update external system
return ("update_external", catalio_req)
else:
# External is newer - update Catalio
return ("update_catalio", external_item)
Strategy 2: Field-Level Merging
Merge changes at field level:
def resolve_conflict_field_merge(catalio_req, external_item, last_known_state):
"""Merge conflicting changes at field level."""
merged = {}
# For each field, determine which change to keep
for field in ["title", "description", "status", "priority"]:
catalio_value = catalio_req.get(field)
external_value = external_item.get(field)
last_value = last_known_state.get(field)
if catalio_value != last_value and external_value == last_value:
# Only Catalio changed - use Catalio value
merged[field] = catalio_value
elif external_value != last_value and catalio_value == last_value:
# Only external changed - use external value
merged[field] = external_value
elif catalio_value != last_value and external_value != last_value:
# Both changed - requires manual resolution
merged[field] = resolve_manual(field, catalio_value, external_value)
else:
# No changes or identical changes
merged[field] = catalio_value
return merged
Strategy 3: Manual Review
Flag conflicts for human review:
def resolve_conflict_manual_review(catalio_req, external_item):
"""Create conflict record for manual review."""
conflict = {
"type": "sync_conflict",
"catalio_requirement": catalio_req,
"external_item": external_item,
"detected_at": datetime.utcnow().isoformat(),
"status": "pending_review"
}
# Store in conflict resolution queue
save_conflict_for_review(conflict)
# Notify admin team
send_notification(
"Sync conflict detected",
f"Manual review required for requirement {catalio_req['id']}"
)
return ("manual_review", conflict)
Error Handling and Retry Logic
Robust error handling ensures your integration handles transient failures gracefully.
HTTP Error Status Codes
Client Errors (4xx):
400 Bad Request: Invalid request format or parameters401 Unauthorized: Invalid or missing authentication credentials403 Forbidden: Valid auth but insufficient permissions404 Not Found: Resource doesn’t exist409 Conflict: Request conflicts with current resource state422 Unprocessable Entity: Validation errors429 Too Many Requests: Rate limit exceeded
Server Errors (5xx):
500 Internal Server Error: Unexpected server error502 Bad Gateway: Gateway or proxy error503 Service Unavailable: Temporary service unavailable504 Gateway Timeout: Gateway timeout
Retry Strategy
Implement exponential backoff with jitter for retries:
import time
import random
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_retrying_session(
retries=3,
backoff_factor=1.0,
status_forcelist=(500, 502, 503, 504),
session=None
):
"""Create requests session with automatic retries."""
session = session or requests.Session()
retry_strategy = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
allowed_methods=["GET", "POST", "PUT", "PATCH", "DELETE"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# Usage
session = create_retrying_session(retries=3, backoff_factor=2)
response = session.post(
"https://api.catalio.io/v1/requirements",
json={"title": "New requirement"},
headers={"Authorization": f"Bearer {API_KEY}"},
timeout=10
)
Custom Retry with Exponential Backoff
import time
import random
def retry_with_backoff(
func,
max_retries=3,
base_delay=1,
max_delay=60,
exponential_base=2,
jitter=True
):
"""
Retry function with exponential backoff and optional jitter.
Args:
func: Function to retry
max_retries: Maximum number of retry attempts
base_delay: Initial delay in seconds
max_delay: Maximum delay in seconds
exponential_base: Base for exponential calculation
jitter: Add randomness to prevent thundering herd
"""
for attempt in range(max_retries + 1):
try:
return func()
except Exception as e:
if attempt == max_retries:
# Final attempt failed - raise exception
raise
# Calculate delay with exponential backoff
delay = min(base_delay * (exponential_base ** attempt), max_delay)
# Add jitter to prevent thundering herd problem
if jitter:
delay = delay * (0.5 + random.random())
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
time.sleep(delay)
# Usage
def create_requirement():
response = requests.post(
"https://api.catalio.io/v1/requirements",
json={"title": "New requirement"},
headers={"Authorization": f"Bearer {API_KEY}"},
timeout=10
)
response.raise_for_status()
return response.json()
result = retry_with_backoff(create_requirement, max_retries=3, base_delay=2)
Idempotency Keys
Use idempotency keys to safely retry requests without duplicate creation:
import uuid
import requests
def create_requirement_idempotent(requirement_data, external_id=None):
"""
Create requirement with idempotency key for safe retries.
Args:
requirement_data: Requirement attributes
external_id: External system ID for idempotency key
"""
# Generate idempotency key from external ID or UUID
idempotency_key = f"create_{external_id}" if external_id else str(uuid.uuid4())
response = requests.post(
"https://api.catalio.io/v1/requirements",
json=requirement_data,
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"X-Idempotency-Key": idempotency_key
},
timeout=10
)
if response.status_code == 201:
# Successfully created
return response.json()["data"]
elif response.status_code == 200:
# Idempotency key matched - returning existing resource
return response.json()["data"]
else:
response.raise_for_status()
Circuit Breaker Pattern
Prevent cascading failures with circuit breaker:
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing - reject requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
def __init__(
self,
failure_threshold=5,
timeout=60,
recovery_timeout=30
):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker protection."""
if self.state == CircuitState.OPEN:
# Check if recovery timeout has passed
if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN - rejecting request")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Handle successful request."""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failed request."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage
circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
def call_catalio_api():
return circuit_breaker.call(
lambda: requests.get(
"https://api.catalio.io/v1/requirements",
headers={"Authorization": f"Bearer {API_KEY}"},
timeout=10
)
)
Rate Limiting and Backoff
Catalio enforces rate limits to ensure fair usage and system stability.
Rate Limit Headers
API responses include rate limit information:
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 847
X-RateLimit-Reset: 1736524800
X-RateLimit-Limit: Maximum requests per windowX-RateLimit-Remaining: Requests remaining in current windowX-RateLimit-Reset: Unix timestamp when limit resets
Rate Limit Tiers
Standard Tier:
- 1,000 requests per hour
- 10,000 requests per day
Professional Tier:
- 5,000 requests per hour
- 50,000 requests per day
Enterprise Tier:
- Custom rate limits negotiated
- Dedicated infrastructure available
Handling Rate Limits
Respect Rate Limit Headers:
import time
import requests
def make_api_request_with_rate_limit(url, headers):
"""Make API request respecting rate limits."""
response = requests.get(url, headers=headers)
# Check rate limit headers
remaining = int(response.headers.get('X-RateLimit-Remaining', 1000))
reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
if response.status_code == 429:
# Rate limit exceeded - wait until reset
wait_time = reset_time - int(time.time())
print(f"Rate limit exceeded. Waiting {wait_time}s...")
time.sleep(max(wait_time, 0) + 1)
# Retry request
return make_api_request_with_rate_limit(url, headers)
# Proactive throttling when approaching limit
if remaining < 10:
print(f"Approaching rate limit ({remaining} remaining). Throttling...")
time.sleep(2)
return response
Adaptive Rate Limiting:
class AdaptiveRateLimiter:
def __init__(self, initial_delay=0.1):
self.delay = initial_delay
self.min_delay = 0.05
self.max_delay = 10.0
def wait(self):
"""Wait before next request."""
time.sleep(self.delay)
def on_success(self, response):
"""Adjust delay based on successful response."""
remaining = int(response.headers.get('X-RateLimit-Remaining', 1000))
limit = int(response.headers.get('X-RateLimit-Limit', 1000))
usage_ratio = 1 - (remaining / limit)
if usage_ratio > 0.9:
# Approaching limit - increase delay
self.delay = min(self.delay * 1.5, self.max_delay)
elif usage_ratio < 0.5:
# Well below limit - decrease delay
self.delay = max(self.delay * 0.8, self.min_delay)
def on_rate_limit(self, reset_time):
"""Handle rate limit error."""
wait_time = reset_time - int(time.time())
time.sleep(max(wait_time, 0) + 1)
self.delay = self.max_delay # Start slow after rate limit
# Usage
rate_limiter = AdaptiveRateLimiter()
for requirement_id in requirement_ids:
rate_limiter.wait()
response = requests.get(
f"https://api.catalio.io/v1/requirements/{requirement_id}",
headers={"Authorization": f"Bearer {API_KEY}"}
)
if response.status_code == 429:
reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
rate_limiter.on_rate_limit(reset_time)
else:
rate_limiter.on_success(response)
Security Best Practices
API Key Management
Storage:
- Use environment variables or secrets management services (AWS Secrets Manager, HashiCorp Vault)
- Never commit API keys to version control
- Use separate keys for development, staging, and production
- Rotate keys every 90 days
Example: Environment Variable Configuration
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
CATALIO_API_KEY = os.getenv('CATALIO_API_KEY')
CATALIO_API_BASE_URL = os.getenv('CATALIO_API_BASE_URL', 'https://api.catalio.io/v1')
if not CATALIO_API_KEY:
raise ValueError("CATALIO_API_KEY environment variable not set")
Example: AWS Secrets Manager
import boto3
import json
def get_catalio_api_key():
"""Retrieve API key from AWS Secrets Manager."""
client = boto3.client('secretsmanager', region_name='us-east-1')
try:
response = client.get_secret_value(SecretId='prod/catalio/api-key')
secret = json.loads(response['SecretString'])
return secret['api_key']
except Exception as e:
print(f"Failed to retrieve secret: {e}")
raise
CATALIO_API_KEY = get_catalio_api_key()
HTTPS and TLS
Always use HTTPS:
- All Catalio API endpoints enforce HTTPS
- Reject connections with invalid certificates
- Use TLS 1.2 or higher
import requests
# Enable SSL verification (default)
response = requests.get(
"https://api.catalio.io/v1/requirements",
headers={"Authorization": f"Bearer {API_KEY}"},
verify=True # Always verify SSL certificates
)
Input Validation and Sanitization
Validate all inputs before sending to API:
import re
def validate_requirement_data(data):
"""Validate requirement data before API submission."""
errors = []
# Title validation
if not data.get('title') or len(data['title'].strip()) == 0:
errors.append("Title is required")
elif len(data['title']) > 200:
errors.append("Title must be 200 characters or less")
# Priority validation
valid_priorities = ['low', 'medium', 'high', 'critical']
if data.get('priority') and data['priority'] not in valid_priorities:
errors.append(f"Priority must be one of: {', '.join(valid_priorities)}")
# Email validation (if present)
if data.get('contact_email'):
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, data['contact_email']):
errors.append("Invalid email format")
if errors:
raise ValueError(f"Validation errors: {', '.join(errors)}")
return True
Audit Logging
Log all API interactions for security auditing:
import logging
import json
from datetime import datetime
# Configure audit logger
audit_logger = logging.getLogger('catalio_audit')
audit_logger.setLevel(logging.INFO)
handler = logging.FileHandler('logs/catalio_audit.log')
handler.setFormatter(logging.Formatter('%(message)s'))
audit_logger.addHandler(handler)
def log_api_request(method, endpoint, request_data, response_status, user_id=None):
"""Log API request for audit trail."""
audit_entry = {
"timestamp": datetime.utcnow().isoformat(),
"method": method,
"endpoint": endpoint,
"user_id": user_id,
"status_code": response_status,
"request_summary": {
"keys": list(request_data.keys()) if request_data else []
}
}
audit_logger.info(json.dumps(audit_entry))
# Usage
def create_requirement(data, user_id):
log_api_request("POST", "/v1/requirements", data, None, user_id)
response = requests.post(
"https://api.catalio.io/v1/requirements",
json=data,
headers={"Authorization": f"Bearer {API_KEY}"}
)
log_api_request("POST", "/v1/requirements", data, response.status_code, user_id)
return response
Testing Strategies
Unit Testing API Interactions
Mock API responses for unit tests:
import unittest
from unittest.mock import patch, Mock
import requests
class TestCatalioIntegration(unittest.TestCase):
@patch('requests.post')
def test_create_requirement_success(self, mock_post):
"""Test successful requirement creation."""
# Mock API response
mock_response = Mock()
mock_response.status_code = 201
mock_response.json.return_value = {
"data": {
"id": "req_test123",
"title": "Test Requirement",
"status": "draft"
}
}
mock_post.return_value = mock_response
# Call function
result = create_requirement({
"title": "Test Requirement",
"description": "Test description"
})
# Assertions
self.assertEqual(result["id"], "req_test123")
mock_post.assert_called_once()
@patch('requests.post')
def test_create_requirement_validation_error(self, mock_post):
"""Test requirement creation with validation error."""
# Mock validation error response
mock_response = Mock()
mock_response.status_code = 422
mock_response.json.return_value = {
"error": {
"code": "validation_error",
"message": "Title cannot be blank"
}
}
mock_post.return_value = mock_response
# Expect exception
with self.assertRaises(ValidationError):
create_requirement({"description": "No title"})
Integration Testing with Sandbox
Use Catalio sandbox environment for integration tests:
import os
import pytest
# Sandbox configuration
SANDBOX_API_KEY = os.getenv('CATALIO_SANDBOX_API_KEY')
SANDBOX_BASE_URL = 'https://sandbox-api.catalio.io/v1'
@pytest.fixture
def sandbox_client():
"""Create sandbox API client for tests."""
return CatalioClient(
api_key=SANDBOX_API_KEY,
base_url=SANDBOX_BASE_URL
)
def test_create_and_retrieve_requirement(sandbox_client):
"""Test full requirement lifecycle in sandbox."""
# Create requirement
created = sandbox_client.create_requirement({
"title": "Integration Test Requirement",
"description": "Created by automated test",
"tags": ["test", "automation"]
})
assert created["id"]
assert created["title"] == "Integration Test Requirement"
# Retrieve requirement
retrieved = sandbox_client.get_requirement(created["id"])
assert retrieved["id"] == created["id"]
assert retrieved["title"] == created["title"]
# Update requirement
updated = sandbox_client.update_requirement(created["id"], {
"status": "in_review"
})
assert updated["status"] == "in_review"
# Delete requirement (cleanup)
sandbox_client.delete_requirement(created["id"])
Contract Testing
Verify API responses match expected schema:
import jsonschema
# Define expected schema for requirement object
REQUIREMENT_SCHEMA = {
"type": "object",
"properties": {
"id": {"type": "string", "pattern": "^req_"},
"title": {"type": "string"},
"description": {"type": ["string", "null"]},
"status": {"type": "string", "enum": ["draft", "in_review", "approved", "rejected"]},
"priority": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
"created_at": {"type": "string", "format": "date-time"},
"updated_at": {"type": "string", "format": "date-time"}
},
"required": ["id", "title", "status", "created_at"]
}
def test_requirement_schema_compliance():
"""Verify API response matches expected schema."""
response = requests.get(
f"{SANDBOX_BASE_URL}/requirements/req_test123",
headers={"Authorization": f"Bearer {SANDBOX_API_KEY}"}
)
requirement = response.json()["data"]
# Validate against schema
try:
jsonschema.validate(instance=requirement, schema=REQUIREMENT_SCHEMA)
except jsonschema.ValidationError as e:
pytest.fail(f"Schema validation failed: {e.message}")
Deployment Patterns
Serverless Deployment (AWS Lambda)
Lambda function for webhook processing:
import json
import os
import boto3
import requests
# Environment variables
CATALIO_API_KEY = os.environ['CATALIO_API_KEY']
EXTERNAL_SYSTEM_URL = os.environ['EXTERNAL_SYSTEM_URL']
def lambda_handler(event, context):
"""
AWS Lambda handler for Catalio webhook events.
Triggered by API Gateway when Catalio sends webhook.
"""
try:
# Parse webhook payload
body = json.loads(event['body'])
webhook_event = body['event']
data = body['data']
# Verify webhook signature
signature = event['headers'].get('X-Catalio-Signature')
if not verify_signature(event['body'], signature):
return {
'statusCode': 401,
'body': json.dumps({'error': 'Invalid signature'})
}
# Process event based on type
if webhook_event == 'requirement.created':
result = handle_requirement_created(data)
elif webhook_event == 'requirement.updated':
result = handle_requirement_updated(data)
elif webhook_event == 'requirement.status_changed':
result = handle_status_changed(data)
else:
result = {'status': 'ignored', 'reason': 'Unknown event type'}
return {
'statusCode': 200,
'body': json.dumps({'status': 'processed', 'result': result})
}
except Exception as e:
print(f"Error processing webhook: {e}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def handle_requirement_created(data):
"""Sync newly created requirement to external system."""
response = requests.post(
f"{EXTERNAL_SYSTEM_URL}/items",
json={
"catalio_id": data["id"],
"title": data["title"],
"description": data["description"]
}
)
return {"external_id": response.json()["id"]}
Serverless Framework configuration (serverless.yml):
service: catalio-integration
provider:
name: aws
runtime: python3.11
region: us-east-1
environment:
CATALIO_API_KEY: ${env:CATALIO_API_KEY}
EXTERNAL_SYSTEM_URL: ${env:EXTERNAL_SYSTEM_URL}
functions:
webhook:
handler: handler.lambda_handler
events:
- http:
path: webhooks/catalio
method: post
timeout: 30
sync:
handler: sync.sync_handler
events:
- schedule: rate(5 minutes)
timeout: 300
Containerized Deployment (Docker + Kubernetes)
Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Run application
CMD ["python", "main.py"]
Kubernetes Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
name: catalio-integration
spec:
replicas: 3
selector:
matchLabels:
app: catalio-integration
template:
metadata:
labels:
app: catalio-integration
spec:
containers:
- name: integration
image: your-registry/catalio-integration:latest
env:
- name: CATALIO_API_KEY
valueFrom:
secretKeyRef:
name: catalio-secrets
key: api-key
ports:
- containerPort: 8080
resources:
requests:
memory: '256Mi'
cpu: '250m'
limits:
memory: '512Mi'
cpu: '500m'
---
apiVersion: v1
kind: Service
metadata:
name: catalio-integration
spec:
selector:
app: catalio-integration
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: LoadBalancer
Complete Example: CRM Integration
Full example integrating Catalio with custom CRM system:
"""
Complete CRM Integration Example
Synchronizes requirements between Catalio and custom CRM system.
"""
import os
import time
import hmac
import hashlib
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from flask import Flask, request, jsonify
# Configuration
CATALIO_API_KEY = os.getenv('CATALIO_API_KEY')
CATALIO_BASE_URL = 'https://api.catalio.io/v1'
WEBHOOK_SECRET = os.getenv('CATALIO_WEBHOOK_SECRET')
CRM_API_KEY = os.getenv('CRM_API_KEY')
CRM_BASE_URL = 'https://crm.example.com/api/v1'
app = Flask(__name__)
class CatalioClient:
"""Client for Catalio API interactions."""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
self.session = self._create_session()
def _create_session(self):
"""Create requests session with retries."""
session = requests.Session()
session.headers.update({
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
})
return session
def get_requirement(self, requirement_id: str) -> Dict:
"""Get single requirement by ID."""
response = self.session.get(
f'{self.base_url}/requirements/{requirement_id}'
)
response.raise_for_status()
return response.json()['data']
def list_requirements(
self,
updated_after: Optional[str] = None,
per_page: int = 100
) -> List[Dict]:
"""List requirements with optional filters."""
params = {'per_page': per_page}
if updated_after:
params['updated_after'] = updated_after
response = self.session.get(
f'{self.base_url}/requirements',
params=params
)
response.raise_for_status()
return response.json()['data']
def create_requirement(self, data: Dict) -> Dict:
"""Create new requirement."""
response = self.session.post(
f'{self.base_url}/requirements',
json=data,
headers={'X-Idempotency-Key': data.get('idempotency_key', '')}
)
response.raise_for_status()
return response.json()['data']
def update_requirement(self, requirement_id: str, data: Dict) -> Dict:
"""Update existing requirement."""
response = self.session.patch(
f'{self.base_url}/requirements/{requirement_id}',
json=data
)
response.raise_for_status()
return response.json()['data']
class CRMClient:
"""Client for custom CRM API interactions."""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
self.session = self._create_session()
def _create_session(self):
"""Create requests session with auth."""
session = requests.Session()
session.headers.update({
'X-API-Key': self.api_key,
'Content-Type': 'application/json'
})
return session
def find_opportunity_by_catalio_id(self, catalio_id: str) -> Optional[Dict]:
"""Find CRM opportunity linked to Catalio requirement."""
response = self.session.get(
f'{self.base_url}/opportunities',
params={'custom_field_catalio_id': catalio_id}
)
if response.status_code == 404:
return None
response.raise_for_status()
opportunities = response.json()['data']
return opportunities[0] if opportunities else None
def create_opportunity(self, data: Dict) -> Dict:
"""Create new CRM opportunity."""
response = self.session.post(
f'{self.base_url}/opportunities',
json=data
)
response.raise_for_status()
return response.json()['data']
def update_opportunity(self, opportunity_id: str, data: Dict) -> Dict:
"""Update existing CRM opportunity."""
response = self.session.patch(
f'{self.base_url}/opportunities/{opportunity_id}',
json=data
)
response.raise_for_status()
return response.json()['data']
class IntegrationSync:
"""Bi-directional synchronization between Catalio and CRM."""
def __init__(self):
self.catalio = CatalioClient(CATALIO_API_KEY, CATALIO_BASE_URL)
self.crm = CRMClient(CRM_API_KEY, CRM_BASE_URL)
self.last_sync_time = self._load_last_sync_time()
def sync_catalio_to_crm(self, requirement: Dict):
"""Sync Catalio requirement to CRM opportunity."""
# Find existing CRM opportunity
opportunity = self.crm.find_opportunity_by_catalio_id(requirement['id'])
# Transform Catalio data to CRM format
crm_data = {
'name': requirement['title'],
'description': requirement['description'],
'stage': self._map_status_to_crm(requirement['status']),
'priority': requirement.get('priority', 'medium'),
'custom_field_catalio_id': requirement['id'],
'custom_field_catalio_url': f"https://app.catalio.io/requirements/{requirement['id']}"
}
if opportunity:
# Update existing
updated = self.crm.update_opportunity(opportunity['id'], crm_data)
print(f"Updated CRM opportunity {updated['id']} from requirement {requirement['id']}")
return updated
else:
# Create new
created = self.crm.create_opportunity(crm_data)
print(f"Created CRM opportunity {created['id']} from requirement {requirement['id']}")
return created
def sync_crm_to_catalio(self, opportunity: Dict):
"""Sync CRM opportunity to Catalio requirement."""
catalio_id = opportunity.get('custom_field_catalio_id')
# Transform CRM data to Catalio format
catalio_data = {
'title': opportunity['name'],
'description': opportunity['description'],
'status': self._map_stage_to_catalio(opportunity['stage']),
'priority': opportunity.get('priority', 'medium')
}
if catalio_id:
# Update existing requirement
updated = self.catalio.update_requirement(catalio_id, catalio_data)
print(f"Updated Catalio requirement {updated['id']} from opportunity {opportunity['id']}")
return updated
else:
# Create new requirement
catalio_data['tags'] = ['crm-import']
catalio_data['idempotency_key'] = f"crm_{opportunity['id']}"
created = self.catalio.create_requirement(catalio_data)
# Update CRM with Catalio ID
self.crm.update_opportunity(opportunity['id'], {
'custom_field_catalio_id': created['id']
})
print(f"Created Catalio requirement {created['id']} from opportunity {opportunity['id']}")
return created
def _map_status_to_crm(self, catalio_status: str) -> str:
"""Map Catalio status to CRM stage."""
status_map = {
'draft': 'prospecting',
'in_review': 'qualification',
'approved': 'proposal',
'implemented': 'closed_won',
'rejected': 'closed_lost'
}
return status_map.get(catalio_status, 'prospecting')
def _map_stage_to_catalio(self, crm_stage: str) -> str:
"""Map CRM stage to Catalio status."""
stage_map = {
'prospecting': 'draft',
'qualification': 'in_review',
'proposal': 'approved',
'closed_won': 'implemented',
'closed_lost': 'rejected'
}
return stage_map.get(crm_stage, 'draft')
def _load_last_sync_time(self) -> datetime:
"""Load last successful sync timestamp."""
# In production, load from database or cache
return datetime.utcnow() - timedelta(hours=1)
def _save_last_sync_time(self, sync_time: datetime):
"""Save last successful sync timestamp."""
# In production, save to database or cache
pass
# Initialize integration
integration = IntegrationSync()
# Webhook endpoint for Catalio events
@app.route('/webhooks/catalio', methods=['POST'])
def catalio_webhook():
"""Handle incoming Catalio webhook events."""
# Verify webhook signature
signature = request.headers.get('X-Catalio-Signature', '')
if not verify_webhook_signature(request.data, signature):
return jsonify({'error': 'Invalid signature'}), 401
# Parse webhook payload
payload = request.json
event_type = payload['event']
data = payload['data']
# Process event asynchronously (in production, use queue)
try:
if event_type in ['requirement.created', 'requirement.updated', 'requirement.status_changed']:
integration.sync_catalio_to_crm(data)
return jsonify({'status': 'received'}), 200
except Exception as e:
print(f"Error processing webhook: {e}")
return jsonify({'error': str(e)}), 500
def verify_webhook_signature(payload: bytes, signature: str) -> bool:
"""Verify webhook signature using HMAC."""
expected_signature = hmac.new(
WEBHOOK_SECRET.encode('utf-8'),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, f"sha256={expected_signature}")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
Troubleshooting Common Issues
Issue: 401 Unauthorized Error
Symptoms: API requests return 401 status code
Causes:
- Invalid or expired API key
- Missing Authorization header
- Incorrect header format
Solutions:
# Check API key format
print(f"API Key starts with: {CATALIO_API_KEY[:10]}...")
# Verify header format
headers = {
'Authorization': f'Bearer {CATALIO_API_KEY}', # Correct
# NOT: 'Authorization': CATALIO_API_KEY
}
# Test with simple GET request
response = requests.get(
'https://api.catalio.io/v1/requirements',
headers=headers
)
print(f"Status: {response.status_code}")
print(f"Response: {response.text}")
Issue: 429 Rate Limit Exceeded
Symptoms: API requests return 429 status code
Solutions:
- Implement exponential backoff retry logic
- Respect X-RateLimit-* headers
- Reduce request frequency
- Contact support for rate limit increase
Issue: Webhook Not Receiving Events
Symptoms: Webhook endpoint not being called
Checklist:
- Verify endpoint URL is publicly accessible
- Ensure HTTPS (HTTP not supported)
- Check firewall/security group rules
- Verify webhook is enabled in Catalio settings
- Test endpoint with curl:
curl -X POST https://your-domain.com/webhooks/catalio \
-H "Content-Type: application/json" \
-d '{"event":"test","data":{}}'
Issue: Duplicate Resources Created
Symptoms: Same resource created multiple times
Solutions:
- Always use idempotency keys for create operations
- Check for existing resources before creating
- Implement deduplication logic based on external IDs
Next Steps
After building your custom integration:
- Monitor and Optimize: Use application monitoring to track API usage, error rates, and performance
- Scale Infrastructure: Adjust resources based on integration load
- Automate Deployment: Set up CI/CD pipelines for integration updates
- Document Workflows: Create runbooks for operations and troubleshooting
- Plan for Growth: Design integration to handle increased data volumes
Additional Resources:
- API Reference Documentation
- Integration Examples Repository
- Developer Community Forum
- Support Portal
Happy integrating! If you have questions or need assistance, reach out to our developer support team at developers@catalio.io.