Skip to Content
Guides › Python examples

Python examples

Complete, runnable Python examples using the requests library. No official Python SDK is required — the REST API is directly usable from any HTTP client.

# Install the third-party `requests` HTTP client used by every example on this page.
pip install requests

Setup

import os
import time
import hmac
import hashlib

import requests

# Root of the v1 REST API; the examples build endpoint URLs from it.
BASE_URL = "https://api.voxburst.io/v1"

# Read the key from the environment rather than hard-coding it.
# A missing VOXBURST_API_KEY raises KeyError here. Keys look like vb_live_…
API_KEY = os.environ["VOXBURST_API_KEY"]

# Default headers for authenticated JSON requests.
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}

Authentication — health check

# Reachability probe. Note that no Authorization header is sent with this request.
# The timeout keeps an unreachable host from hanging the script indefinitely.
r = requests.get(f"{BASE_URL}/health", timeout=10)
if r.status_code != 200:
    # Raise explicitly: an `assert` would be stripped when Python runs with -O.
    raise RuntimeError(f"API unreachable: {r.status_code}")
print("API is up")

List connected accounts

# Ask for accounts filtered by status=ACTIVE.
response = requests.get(
    f"{BASE_URL}/accounts",
    headers=HEADERS,
    params={"status": "ACTIVE"},
)
response.raise_for_status()  # turn 4xx/5xx into requests.HTTPError

# The account list is read from the "data" key of the JSON body.
accounts = response.json()["data"]
print(f"Connected accounts: {len(accounts)}")

# One line per account; "—" stands in when no username is present.
for acct in accounts:
    print(f" {acct['id']} {acct['platform']} @{acct.get('username', '—')}")

Upload media and wait for READY

import mimetypes
import pathlib
from typing import Optional


def upload_media(
    filepath: str,
    content_type: Optional[str] = None,
    *,
    poll_interval: float = 2.0,
    max_polls: int = 30,
) -> str:
    """Upload a local file to VoxBurst and return its mediaId.

    Args:
        filepath: Path of the local file to upload.
        content_type: MIME type to declare for the file. When omitted it is
            guessed from the file extension (e.g. ``image/jpeg``,
            ``video/mp4``), falling back to ``application/octet-stream``.
        poll_interval: Seconds to wait between status checks.
        max_polls: Maximum number of status checks before giving up.

    Returns:
        The ``mediaId`` once the media reports status ``READY``.

    Raises:
        requests.HTTPError: If any HTTP call returns a 4xx/5xx status.
        RuntimeError: If the media reports status ``FAILED``.
        TimeoutError: If the media is not ``READY`` after ``max_polls`` checks.
    """
    path = pathlib.Path(filepath)
    if content_type is None:
        # Derive the type from the extension instead of assuming JPEG for
        # every file.
        guessed, _ = mimetypes.guess_type(path.name)
        content_type = guessed or "application/octet-stream"
    size_bytes = path.stat().st_size

    # Step 1 — request a presigned upload URL
    r = requests.post(
        f"{BASE_URL}/media/upload",
        headers=HEADERS,
        json={
            "filename": path.name,
            "contentType": content_type,
            "sizeBytes": size_bytes,
        },
        timeout=30,
    )
    r.raise_for_status()
    resp = r.json()
    media_id = resp["mediaId"]
    upload_url = resp["uploadUrl"]
    # `or {}` also covers an explicit null, which would break the merge below.
    upload_hdrs = resp.get("uploadHeaders") or {}

    # Step 2 — PUT file bytes directly to S3 (no auth header here)
    with path.open("rb") as fh:
        put = requests.put(
            upload_url,
            data=fh,  # streamed from disk rather than read into memory
            headers={**upload_hdrs, "Content-Type": content_type},
            timeout=(10, 300),  # (connect, read) — generous read for big files
        )
    put.raise_for_status()

    # Step 3 — poll until READY
    for _ in range(max_polls):
        poll = requests.get(
            f"{BASE_URL}/media/{media_id}", headers=HEADERS, timeout=30
        )
        # Fail with a clear HTTP error instead of a KeyError on an error body.
        poll.raise_for_status()
        m = poll.json()
        if m["status"] == "READY":
            print(f"Media ready: {media_id}")
            return media_id
        if m["status"] == "FAILED":
            raise RuntimeError(
                f"Media processing failed: {m.get('validationResults')}"
            )
        time.sleep(poll_interval)

    raise TimeoutError(
        f"Media {media_id} did not become READY in "
        f"{max_polls * poll_interval:g} s"
    )

Validate content before posting

# Submit the content to the /posts/validate endpoint before creating a post.
r = requests.post(
    f"{BASE_URL}/posts/validate",
    headers=HEADERS,
    json={
        "content": "Announcing our new feature 🚀",
        "platforms": ["TWITTER", "INSTAGRAM"],
    },
    timeout=30,
)
r.raise_for_status()
result = r.json()

if not result["valid"]:
    # Report every error for every platform, then stop the script.
    for platform, pr in result["platforms"].items():
        for err in pr.get("errors", []):
            # Separate code and message so they do not run together.
            print(f"{platform}: {err['code']} — {err['message']}")
    raise SystemExit("Validation failed")

print("Content valid on all platforms")

Create a scheduled post

import uuid

# Generate the key once and keep it in a variable: resending this request
# must reuse the same key, which an inline uuid4() call would not allow.
idempotency_key = str(uuid.uuid4())

r = requests.post(
    f"{BASE_URL}/posts",
    headers={**HEADERS, "Idempotency-Key": idempotency_key},
    json={
        "content": "Announcing our new feature 🚀",
        "accountIds": ["acc_twitter_abc", "acc_instagram_abc"],
        "contentType": "IMAGE",
        "media": ["media_abc1234567890abc12345678"],
        "scheduledFor": "2026-07-01T14:00:00Z",
        # Per-platform replacement for the shared content above.
        "platformOverrides": {
            "TWITTER": {"content": "New feature just shipped 🚀 #voxburst"},
        },
        "firstComment": "#launch #buildinpublic",
        "firstCommentDelay": 60,
    },
    timeout=30,
)
r.raise_for_status()
post = r.json()
# Separate id and timestamp so they do not run together in the output.
print(f"Scheduled: {post['id']} — {post['scheduledFor']}")

Always include an Idempotency-Key when creating posts. If the request is retried after a network timeout, the cached response is returned instead of creating a duplicate post.


Publish immediately

# Call the publish endpoint for an existing post.
post_id = "post_abc123"
publish_url = f"{BASE_URL}/posts/{post_id}/publish"

response = requests.post(publish_url, headers=HEADERS)
response.raise_for_status()  # 4xx/5xx become requests.HTTPError
print("Publish queued")

Poll post status until complete

def poll_post(post_id: str, interval: int = 5, timeout: int = 300) -> dict:
    """Poll until status leaves 'publishing'. Returns the final post object.

    Args:
        post_id: ID of the post to watch.
        interval: Seconds to wait between status checks.
        timeout: Overall time budget in seconds.

    Returns:
        The post object from the first response whose ``status`` is not
        ``"publishing"``.

    Raises:
        requests.HTTPError: If a status request returns 4xx/5xx.
        TimeoutError: If the post is still publishing when the budget runs out.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        # Check before looking at the clock, so the status is fetched at
        # least once even with a zero or negative timeout.
        r = requests.get(
            f"{BASE_URL}/posts/{post_id}", headers=HEADERS, timeout=30
        )
        r.raise_for_status()
        post = r.json()
        if post["status"] != "publishing":
            return post

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))

    raise TimeoutError(f"Post {post_id} still publishing after {timeout}s")


post = poll_post("post_abc123")
print(f"Final status: {post['status']}")
for p in post.get("platforms", []):
    url = p.get("platformPostUrl") or "—"  # placeholder when no URL is present
    print(f" {p['platform']}: {p['status']} {url}")

# Handle partial failure
if post["status"] in ("partial", "failed"):
    r = requests.post(
        f"{BASE_URL}/posts/{post['id']}/retry",
        headers={**HEADERS, "Idempotency-Key": f"retry-{post['id']}-1"},
        timeout=30,
    )
    r.raise_for_status()
    print("Retry queued")

Webhook signature verification

def verify_webhook(payload_bytes: bytes, signature_header: str, secret: str) -> bool:
    """Verify X-VoxBurst-Signature. Returns True if valid.

    The header is parsed as comma-separated ``key=value`` pairs; ``t`` carries
    a Unix timestamp and ``v2`` the hex HMAC-SHA256 of ``"<t>." + payload``
    keyed with ``secret``.

    Args:
        payload_bytes: Raw request body, exactly as received.
        signature_header: Value of the ``X-VoxBurst-Signature`` header.
        secret: Webhook signing secret.

    Returns:
        True only when the header is well-formed, the timestamp is within
        5 minutes of the local clock, and the signature matches. Any
        malformed input yields False rather than an exception, so a handler
        can always answer with a plain 401.
    """
    if not signature_header:
        return False

    # Tolerate junk: segments without "=" are skipped instead of raising.
    parts = {}
    for segment in signature_header.split(","):
        key, sep, value = segment.strip().partition("=")
        if sep:
            parts[key] = value

    timestamp = parts.get("t")
    sig = parts.get("v2")
    if not timestamp or not sig:
        return False

    # A non-numeric timestamp is an invalid signature, not a server error.
    try:
        sent_at = int(timestamp)
    except ValueError:
        return False

    # Reject replays older than 5 minutes
    if abs(time.time() - sent_at) > 300:
        return False

    # Sign the raw bytes directly; decoding first would raise on a body that
    # is not valid UTF-8.
    signing_bytes = f"{timestamp}.".encode() + payload_bytes
    expected = hmac.new(secret.encode(), signing_bytes, hashlib.sha256).hexdigest()

    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(expected.encode(), sig.encode())


# In your Flask/FastAPI/Django handler:
#
# @app.post("/webhooks/voxburst")
# def handle_webhook(request):
#     sig = request.headers["X-VoxBurst-Signature"]
#     if not verify_webhook(request.body, sig, os.environ["WEBHOOK_SECRET"]):
#         return Response(status=401)
#
#     event = request.json()
#     if event["type"] == "post.published":
#         status = event["data"]["status"]  # "PUBLISHED" or "PARTIAL"
#         post_id = event["data"]["id"]
#         print(f"Post {post_id} → {status}")
#     return Response(status=200)

Error handling with retry

def api_post(url: str, body: dict, idempotency_key: str, max_attempts: int = 3) -> dict:
    """POST with idempotency key and automatic retry on 429/5xx.

    Args:
        url: Full endpoint URL.
        body: JSON-serialisable request body.
        idempotency_key: Sent as ``Idempotency-Key`` on every attempt, so all
            retries carry the same key.
        max_attempts: Total number of attempts, including the first.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        requests.HTTPError: On a non-retryable 4xx, or a 5xx on the last attempt.
        RuntimeError: If every attempt was rate limited (429).
    """
    for attempt in range(1, max_attempts + 1):
        r = requests.post(
            url,
            headers={**HEADERS, "Idempotency-Key": idempotency_key},
            json=body,
            timeout=30,
        )
        attempts_left = attempt < max_attempts

        if r.status_code == 429:
            if not attempts_left:
                # Nothing follows the last attempt, so do not sleep for it.
                break
            # Retry-After may also be an HTTP-date; fall back to 60 s when it
            # is not a plain integer, and never pass a negative to sleep().
            try:
                retry_after = max(0, int(r.headers.get("Retry-After", 60)))
            except ValueError:
                retry_after = 60
            print(f"Rate limited — waiting {retry_after}s (attempt {attempt})")
            time.sleep(retry_after)
            continue

        if r.status_code >= 500 and attempts_left:
            wait = 2 ** attempt  # exponential backoff: 2 s, 4 s, 8 s, …
            print(f"Server error {r.status_code} — retrying in {wait}s")
            time.sleep(wait)
            continue

        r.raise_for_status()
        return r.json()

    raise RuntimeError(f"Failed after {max_attempts} attempts")

Common error codes

# Example body — replace with your real post payload. (A literal `{...}` is a
# set containing Ellipsis and cannot be serialised to JSON.)
payload = {
    "content": "Announcing our new feature 🚀",
    "accountIds": ["acc_twitter_abc"],
}
r = requests.post(f"{BASE_URL}/posts", headers=HEADERS, json=payload, timeout=30)

if r.status_code == 400:
    err = r.json()["error"]
    # Separate code and message so they do not run together.
    print(f"Bad request: {err['code']} — {err['message']}")
    # err['details'] contains field-level info for VALIDATION_ERROR
elif r.status_code == 401:
    print("Invalid API key — check VOXBURST_API_KEY")
elif r.status_code == 402:
    print("Plan limit reached — upgrade or free up capacity")
elif r.status_code == 409:
    err = r.json()["error"]
    if err["code"] == "IDEMPOTENCY_CONFLICT":
        print("Idempotency key already used with a different body — use a new key")
    elif err["code"] == "CONFLICT":
        print("Post status conflict — check current status before retrying")
elif r.status_code == 429:
    retry_after = r.headers.get("Retry-After", "60")
    print(f"Rate limited — retry after {retry_after}s")
elif r.status_code >= 500:
    print(f"Server error — retry with backoff: {r.status_code}")

Paginate through all posts

def iter_posts(status: str = "published"):
    """Yield every post page-by-page using cursor pagination.

    Args:
        status: Value sent as the ``status`` query parameter.

    Yields:
        Each item of every page's ``data`` list, in the order returned.

    Raises:
        requests.HTTPError: If a page request returns 4xx/5xx.
    """
    cursor = None
    while True:
        params = {"status": status, "limit": 50}
        if cursor:
            params["cursor"] = cursor

        r = requests.get(
            f"{BASE_URL}/posts", headers=HEADERS, params=params, timeout=30
        )
        r.raise_for_status()
        data = r.json()
        yield from data["data"]

        pagination = data.get("pagination") or {}
        cursor = pagination.get("next_cursor")
        # Also stop when has_more is set but no cursor came back; otherwise
        # the next request would omit the cursor and re-fetch the first page
        # forever.
        if not pagination.get("has_more") or not cursor:
            break


for post in iter_posts("published"):
    # Space before the timestamp so it does not run into the status.
    print(f"{post['id']}: {post['status']} {post.get('publishedAt', '—')}")
Last updated on