Python examples
Complete, runnable Python examples using the requests library. No official Python SDK is required — the REST API is directly usable from any HTTP client.
pip install requestsSetup
import os
import time
import hmac
import hashlib
import requests
BASE_URL = "https://api.voxburst.io/v1"
API_KEY = os.environ["VOXBURST_API_KEY"] # vb_live_…
HEADERS = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}Authentication — health check
r = requests.get(f"{BASE_URL}/health")
assert r.status_code == 200, f"API unreachable: {r.status_code}"
print("API is up")List connected accounts
r = requests.get(f"{BASE_URL}/accounts", headers=HEADERS, params={"status": "ACTIVE"})
r.raise_for_status()
accounts = r.json()["data"]
print(f"Connected accounts: {len(accounts)}")
for acct in accounts:
print(f" {acct['id']} {acct['platform']} @{acct.get('username', '—')}")Upload media and wait for READY
import pathlib
def upload_media(filepath: str) -> str:
"""Upload a local file to VoxBurst and return its mediaId."""
path = pathlib.Path(filepath)
content_type = "image/jpeg" # adjust for video/mp4 etc.
size_bytes = path.stat().st_size
# Step 1 — request a presigned upload URL
r = requests.post(
f"{BASE_URL}/media/upload",
headers=HEADERS,
json={"filename": path.name, "contentType": content_type, "sizeBytes": size_bytes},
)
r.raise_for_status()
resp = r.json()
media_id = resp["mediaId"]
upload_url = resp["uploadUrl"]
upload_hdrs = resp.get("uploadHeaders", {})
# Step 2 — PUT file bytes directly to S3 (no auth header here)
with path.open("rb") as fh:
put = requests.put(upload_url, data=fh, headers={**upload_hdrs, "Content-Type": content_type})
put.raise_for_status()
# Step 3 — poll until READY
for _ in range(30):
m = requests.get(f"{BASE_URL}/media/{media_id}", headers=HEADERS).json()
if m["status"] == "READY":
print(f"Media ready: {media_id}")
return media_id
if m["status"] == "FAILED":
raise RuntimeError(f"Media processing failed: {m.get('validationResults')}")
time.sleep(2)
raise TimeoutError(f"Media {media_id} did not become READY in 60 s")Validate content before posting
r = requests.post(
f"{BASE_URL}/posts/validate",
headers=HEADERS,
json={
"content": "Announcing our new feature 🚀",
"platforms": ["TWITTER", "INSTAGRAM"],
},
)
r.raise_for_status()
result = r.json()
if not result["valid"]:
for platform, pr in result["platforms"].items():
for err in pr.get("errors", []):
print(f"{platform}: {err['code']} — {err['message']}")
raise SystemExit("Validation failed")
print("Content valid on all platforms")Create a scheduled post
import uuid
r = requests.post(
f"{BASE_URL}/posts",
headers={**HEADERS, "Idempotency-Key": str(uuid.uuid4())},
json={
"content": "Announcing our new feature 🚀",
"accountIds": ["acc_twitter_abc", "acc_instagram_abc"],
"contentType": "IMAGE",
"media": ["media_abc1234567890abc12345678"],
"scheduledFor": "2026-07-01T14:00:00Z",
"platformOverrides": {
"TWITTER": {"content": "New feature just shipped 🚀 #voxburst"},
},
"firstComment": "#launch #buildinpublic",
"firstCommentDelay": 60,
},
)
r.raise_for_status()
post = r.json()
print(f"Scheduled: {post['id']} → {post['scheduledFor']}")Always include an Idempotency-Key when creating posts. If the request is retried after a network timeout, the cached response is returned instead of creating a duplicate post.
Publish immediately
post_id = "post_abc123"
r = requests.post(f"{BASE_URL}/posts/{post_id}/publish", headers=HEADERS)
r.raise_for_status()
print("Publish queued")Poll post status until complete
def poll_post(post_id: str, interval: int = 5, timeout: int = 300) -> dict:
"""Poll until status leaves 'publishing'. Returns the final post object."""
deadline = time.time() + timeout
while time.time() < deadline:
r = requests.get(f"{BASE_URL}/posts/{post_id}", headers=HEADERS)
r.raise_for_status()
post = r.json()
if post["status"] != "publishing":
return post
time.sleep(interval)
raise TimeoutError(f"Post {post_id} still publishing after {timeout}s")
post = poll_post("post_abc123")
print(f"Final status: {post['status']}")
for p in post.get("platforms", []):
url = p.get("platformPostUrl") or "—"
print(f" {p['platform']}: {p['status']} {url}")
# Handle partial failure
if post["status"] in ("partial", "failed"):
r = requests.post(
f"{BASE_URL}/posts/{post['id']}/retry",
headers={**HEADERS, "Idempotency-Key": f"retry-{post['id']}-1"},
)
r.raise_for_status()
print("Retry queued")Webhook signature verification
def verify_webhook(payload_bytes: bytes, signature_header: str, secret: str) -> bool:
"""Verify X-VoxBurst-Signature. Returns True if valid."""
parts = dict(p.split("=", 1) for p in signature_header.split(","))
timestamp = parts.get("t")
sig = parts.get("v2")
if not timestamp or not sig:
return False
# Reject replays older than 5 minutes
if abs(time.time() - int(timestamp)) > 300:
return False
signing_string = f"{timestamp}.{payload_bytes.decode('utf-8')}".encode()
expected = hmac.new(secret.encode(), signing_string, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, sig)
# In your Flask/FastAPI/Django handler:
#
# @app.post("/webhooks/voxburst")
# def handle_webhook(request):
# sig = request.headers["X-VoxBurst-Signature"]
# if not verify_webhook(request.body, sig, os.environ["WEBHOOK_SECRET"]):
# return Response(status=401)
#
# event = request.json()
# if event["type"] == "post.published":
# status = event["data"]["status"] # "PUBLISHED" or "PARTIAL"
# post_id = event["data"]["id"]
# print(f"Post {post_id} → {status}")
# return Response(status=200)Error handling with retry
def api_post(url: str, body: dict, idempotency_key: str, max_attempts: int = 3) -> dict:
"""POST with idempotency key and automatic retry on 429/5xx."""
for attempt in range(1, max_attempts + 1):
r = requests.post(
url,
headers={**HEADERS, "Idempotency-Key": idempotency_key},
json=body,
)
if r.status_code == 429:
retry_after = int(r.headers.get("Retry-After", 60))
print(f"Rate limited — waiting {retry_after}s (attempt {attempt})")
time.sleep(retry_after)
continue
if r.status_code >= 500 and attempt < max_attempts:
wait = 2 ** attempt
print(f"Server error {r.status_code} — retrying in {wait}s")
time.sleep(wait)
continue
r.raise_for_status()
return r.json()
raise RuntimeError(f"Failed after {max_attempts} attempts")Common error codes
r = requests.post(f"{BASE_URL}/posts", headers=HEADERS, json={...})
if r.status_code == 400:
err = r.json()["error"]
print(f"Bad request: {err['code']} — {err['message']}")
# err['details'] contains field-level info for VALIDATION_ERROR
elif r.status_code == 401:
print("Invalid API key — check VOXBURST_API_KEY")
elif r.status_code == 402:
print("Plan limit reached — upgrade or free up capacity")
elif r.status_code == 409:
err = r.json()["error"]
if err["code"] == "IDEMPOTENCY_CONFLICT":
print("Idempotency key already used with a different body — use a new key")
elif err["code"] == "CONFLICT":
print("Post status conflict — check current status before retrying")
elif r.status_code == 429:
retry_after = r.headers.get("Retry-After", "60")
print(f"Rate limited — retry after {retry_after}s")
elif r.status_code >= 500:
print(f"Server error — retry with backoff: {r.status_code}")Paginate through all posts
def iter_posts(status: str = "published"):
"""Yield every post page-by-page using cursor pagination."""
cursor = None
while True:
params = {"status": status, "limit": 50}
if cursor:
params["cursor"] = cursor
r = requests.get(f"{BASE_URL}/posts", headers=HEADERS, params=params)
r.raise_for_status()
data = r.json()
yield from data["data"]
pagination = data.get("pagination", {})
if not pagination.get("has_more"):
break
cursor = pagination["next_cursor"]
for post in iter_posts("published"):
print(f"{post['id']}: {post['status']} — {post.get('publishedAt', '—')}")Last updated on