Skip to main content

Python Examples

This page provides Python examples using the requests library for all major API operations.

Setup

Install the required library:
pip install requests
Set up your API client:
import os
import time
import requests

API_KEY = os.environ["TICKSUPPLY_API_KEY"]
BASE_URL = "https://api.ticksupply.com"

def api_request(method, path, *, max_retries=5, **kwargs):
    """Make an authenticated API request and return the decoded JSON body.

    Args:
        method: HTTP method name, e.g. "GET" or "POST".
        path: URL path appended to BASE_URL, e.g. "/v1/exchanges".
        max_retries: Maximum number of retries after an HTTP 429 response.
        **kwargs: Passed through to ``requests.request`` (params, json,
            headers, timeout, ...).

    Returns:
        The parsed JSON response, or None for a 204 response.

    Raises:
        requests.HTTPError: For any 4xx/5xx response, including a 429 that
            is still returned once the retries are used up.
    """
    # Copy so the caller's headers dict is never mutated.
    headers = dict(kwargs.pop("headers", None) or {})
    headers["X-Api-Key"] = API_KEY
    # requests has no default timeout; without one a stalled connection
    # would block forever.
    kwargs.setdefault("timeout", 30)

    retries_left = max_retries
    while True:
        response = requests.request(
            method,
            f"{BASE_URL}{path}",
            headers=headers,
            **kwargs
        )
        if response.status_code != 429 or retries_left <= 0:
            break
        retries_left -= 1

        try:
            retry_after = max(0, int(response.headers.get("Retry-After", 30)))
        except ValueError:
            # Retry-After may also be an HTTP-date; use the default delay then.
            retry_after = 30
        print(f"Rate limited. Waiting {retry_after}s...")
        time.sleep(retry_after)

    response.raise_for_status()

    if response.status_code == 204:
        return None
    return response.json()

Catalog operations

List exchanges

def list_exchanges():
    """Fetch the exchange catalog via ``GET /v1/exchanges``."""
    exchanges = api_request("GET", "/v1/exchanges")
    return exchanges

# Usage: print one "code: display_name" line per exchange
exchanges = list_exchanges()
for entry in exchanges:
    code, display_name = entry["code"], entry["display_name"]
    print(f"{code}: {display_name}")

List instruments

def list_instruments(exchange, search=None, limit=None):
    """List the instruments of one exchange.

    ``search`` and ``limit`` are sent as query parameters only when they are
    truthy; otherwise they are left out of the request.
    """
    optional = {"search": search, "limit": limit}
    query = {name: value for name, value in optional.items() if value}
    endpoint = f"/v1/exchanges/{exchange}/instruments"
    return api_request("GET", endpoint, params=query)

# Usage: search Binance for "BTC", capped at 10 results
result = list_instruments("binance", search="BTC", limit=10)
matches = result["items"]
for instrument in matches:
    symbol, base, quote = instrument["symbol"], instrument["base"], instrument["quote"]
    print(f"{symbol}: {base}/{quote}")

List datastreams

def get_datastreams(exchange, instrument, stream_type):
    """Query ``GET /v1/datastreams`` filtered by exchange, instrument and stream type."""
    filters = dict(exchange=exchange, instrument=instrument, stream_type=stream_type)
    return api_request("GET", "/v1/datastreams", params=filters)

# Usage: look up the "trade" datastreams for BTCUSDT on Binance
result = get_datastreams("binance", "BTCUSDT", "trade")
for stream in result["items"]:
    stream_id, wire_format = stream["datastream_id"], stream["wire_format"]
    print(f"Datastream {stream_id}: {wire_format}")

Subscription operations

Create a subscription

import uuid

def create_subscription(datastream_id, idempotency_key=None):
    """Create a subscription for ``datastream_id`` via ``POST /v1/subscriptions``.

    An ``Idempotency-Key`` header is attached only when ``idempotency_key``
    is truthy. Returns whatever ``api_request`` returns for the call.
    """
    request_headers = {"Content-Type": "application/json"}
    if idempotency_key:
        request_headers["Idempotency-Key"] = idempotency_key

    body = {"datastream_id": datastream_id}
    return api_request("POST", "/v1/subscriptions", json=body, headers=request_headers)

# Usage: subscribe to datastream 123 with a freshly generated idempotency key
request_key = str(uuid.uuid4())
subscription = create_subscription(datastream_id=123, idempotency_key=request_key)
print(f"Created subscription: {subscription['id']}")
print(f"Status: {subscription['status']}")

List subscriptions with pagination

def list_subscriptions(limit=50):
    """Collect every subscription by following ``next_page_token`` links.

    Requests pages of ``limit`` items from ``GET /v1/subscriptions``, prints a
    progress line after each page, and stops at the first page whose
    ``next_page_token`` is missing or empty. Returns the combined item list.
    """
    collected = []
    query = {"limit": limit}

    while True:
        page = api_request("GET", "/v1/subscriptions", params=query)
        collected += page["items"]

        print(f"Fetched {len(collected)}/{page['total']}")

        token = page.get("next_page_token")
        if not token:
            return collected
        # Build a fresh query for the next page, carrying the token forward.
        query = {"limit": limit, "page_token": token}

# Usage: print "id: status" for every subscription
subscriptions = list_subscriptions()
for item in subscriptions:
    sub_ref, state = item["id"], item["status"]
    print(f"{sub_ref}: {state}")

Subscription lifecycle management

def get_subscription(subscription_id):
    """Fetch one subscription via ``GET /v1/subscriptions/{subscription_id}``."""
    endpoint = f"/v1/subscriptions/{subscription_id}"
    return api_request("GET", endpoint)

def pause_subscription(subscription_id):
    """POST to the subscription's ``/pause`` endpoint, then print a confirmation."""
    endpoint = f"/v1/subscriptions/{subscription_id}/pause"
    api_request("POST", endpoint)
    print(f"Paused subscription: {subscription_id}")

def resume_subscription(subscription_id):
    """POST to the subscription's ``/resume`` endpoint, then print a confirmation."""
    endpoint = f"/v1/subscriptions/{subscription_id}/resume"
    api_request("POST", endpoint)
    print(f"Resumed subscription: {subscription_id}")

def delete_subscription(subscription_id, idempotency_key=None):
    """Send ``DELETE /v1/subscriptions/{subscription_id}`` and print a confirmation.

    An ``Idempotency-Key`` header is sent only when ``idempotency_key`` is
    truthy; otherwise an empty headers dict is passed.
    """
    extra_headers = {"Idempotency-Key": idempotency_key} if idempotency_key else {}
    endpoint = f"/v1/subscriptions/{subscription_id}"

    api_request("DELETE", endpoint, headers=extra_headers)
    print(f"Deleted subscription: {subscription_id}")

def get_subscription_spans(subscription_id):
    """Fetch ``GET /v1/subscriptions/{subscription_id}/spans`` and return the response."""
    endpoint = f"/v1/subscriptions/{subscription_id}/spans"
    return api_request("GET", endpoint)

# Usage: inspect, pause, list spans, then resume a single subscription
sub_id = "sub_550e8400e29b41d4a716446655440000"

# Get details
sub = get_subscription(sub_id)
current_status = sub["status"]
print(f"Status: {current_status}")

# Pause
pause_subscription(sub_id)

# Check spans
spans = get_subscription_spans(sub_id)
for span in spans:
    started = span["started_at"]
    # A falsy ended_at is shown as "ongoing".
    ended = span["ended_at"] or "ongoing"
    print(f"Span: {started} - {ended}")

# Resume
resume_subscription(sub_id)

Export operations

Create an export

def create_export(datastream_id, start_time_ns, end_time_ns, idempotency_key=None):
    """Submit an export job via ``POST /v1/exports``.

    Both time bounds are converted with ``str()`` before being sent. An
    ``Idempotency-Key`` header is attached only when ``idempotency_key`` is
    truthy. Returns whatever ``api_request`` returns for the call.
    """
    request_headers = {"Content-Type": "application/json"}
    if idempotency_key:
        request_headers["Idempotency-Key"] = idempotency_key

    body = dict(
        datastream_id=datastream_id,
        start_time=str(start_time_ns),
        end_time=str(end_time_ns),
    )
    return api_request("POST", "/v1/exports", json=body, headers=request_headers)

# Usage - export last 24 hours
# time.time_ns() gives an exact integer; multiplying the float from
# time.time() by 1e9 cannot represent individual nanoseconds.
now_ns = time.time_ns()
day_ago_ns = now_ns - 86400 * 10**9

export = create_export(
    datastream_id=123,
    start_time_ns=day_ago_ns,
    end_time_ns=now_ns,
    idempotency_key=str(uuid.uuid4())
)
print(f"Created export: {export['id']}")

Wait for export completion

def wait_for_export(export_id, poll_interval=5, timeout=300):
    """Poll an export until it succeeds, fails, or the timeout elapses.

    Args:
        export_id: ID of the export to poll.
        poll_interval: Seconds to wait between polls.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The export response whose status is "succeeded".

    Raises:
        RuntimeError: If the export reports status "failed".
        TimeoutError: If no terminal status is seen within ``timeout`` seconds.
    """
    # Use the monotonic clock so system clock adjustments cannot stretch or
    # shorten the timeout.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        result = api_request("GET", f"/v1/exports/{export_id}")
        status = result["status"]

        print(f"Status: {status}")

        if status == "succeeded":
            return result
        if status == "failed":
            raise RuntimeError(f"Export failed: {result.get('reason', 'Unknown error')}")

        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        time.sleep(max(0, min(poll_interval, remaining)))

    raise TimeoutError(f"Export did not complete within {timeout} seconds")

# Usage: block until the export created earlier has finished
completed_export = wait_for_export(export["id"])
finished_at = completed_export["finished_at"]
print(f"Export completed at: {finished_at}")

Download export

def download_export(export_id, output_dir="."):
    """Download all artifacts from an export.

    Args:
        export_id: ID of the export whose artifacts should be fetched.
        output_dir: Directory to write the files into; created if missing.

    Returns:
        List of the file paths that were written.

    Raises:
        requests.HTTPError: If the API call or any artifact download fails.
    """
    # Get presigned URLs for all artifacts
    result = api_request("GET", f"/v1/exports/{export_id}/download")

    print(f"Downloading {result['count']} file(s), {result['total_bytes']:,} bytes total...")

    # open() does not create missing directories, so make sure the target exists.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    downloaded_files = []
    for artifact in result["artifacts"]:
        output_path = f"{output_dir}/{artifact['filename']}"
        print(f"  Downloading {artifact['filename']} ({artifact['bytes']:,} bytes)...")

        # The context manager releases the streamed connection even if the
        # status check or a write raises.
        with requests.get(artifact["url"], stream=True, timeout=60) as response:
            response.raise_for_status()

            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        downloaded_files.append(output_path)

    print(f"Downloaded {len(downloaded_files)} file(s)")
    return downloaded_files

# Usage: fetch the export's artifacts with ./exports as the output directory
export_id = export["id"]
files = download_export(export_id, output_dir="./exports")

Availability

Check data availability

def get_availability(datastream_id):
    """Query ``GET /v1/availability`` for the given ``datastream_id``."""
    query = {"datastream_id": datastream_id}
    return api_request("GET", "/v1/availability", params=query)

# Usage: summarize the availability ranges for datastream 123
availability = get_availability(123)
stream_info = availability["datastream"]
print(f"Datastream ID: {stream_info['datastream_id']}")
print(f"Last updated: {availability['updated_at']}")
for covered in availability["ranges"]:
    start, end, rows = covered["from"], covered["to"], covered["rows_estimate"]
    print(f"  {start} - {end}: ~{rows:,} rows")

Complete workflow example

#!/usr/bin/env python3
"""Complete example: subscribe, wait, export, download."""

import os
import time
import uuid
import requests

API_KEY = os.environ["TICKSUPPLY_API_KEY"]
BASE_URL = "https://api.ticksupply.com"

def api_request(method, path, *, max_retries=5, **kwargs):
    """Make an authenticated API request and return the decoded JSON body.

    Retries up to ``max_retries`` times on HTTP 429, sleeping for the
    ``Retry-After`` delay (30 seconds when the header is absent or not an
    integer). Returns None for a 204 response and raises
    ``requests.HTTPError`` for any other 4xx/5xx response.
    """
    # Copy so the caller's headers dict is never mutated.
    headers = dict(kwargs.pop("headers", None) or {})
    headers["X-Api-Key"] = API_KEY
    # requests has no default timeout; without one a stalled connection
    # would block forever.
    kwargs.setdefault("timeout", 30)

    retries_left = max_retries
    while True:
        response = requests.request(method, f"{BASE_URL}{path}", headers=headers, **kwargs)
        if response.status_code != 429 or retries_left <= 0:
            break
        retries_left -= 1

        try:
            retry_after = max(0, int(response.headers.get("Retry-After", 30)))
        except ValueError:
            # Retry-After may also be an HTTP-date; use the default delay then.
            retry_after = 30
        time.sleep(retry_after)

    response.raise_for_status()
    return response.json() if response.status_code != 204 else None

def main():
    """Run the end-to-end flow: find a datastream, subscribe, export, download.

    Raises:
        RuntimeError: If no matching datastream is found or the export fails.
        TimeoutError: If the export has not finished after 300 seconds.
        requests.HTTPError: If an API call or artifact download fails.
    """
    # 1. Find BTCUSDT trade stream
    print("Finding BTCUSDT trade stream...")
    result = api_request("GET", "/v1/datastreams", params={
        "exchange": "binance",
        "instrument": "BTCUSDT",
        "stream_type": "trade"
    })
    if not result["items"]:
        # Fail with a clear message instead of a bare IndexError.
        raise RuntimeError("No datastream found for binance BTCUSDT trade")
    datastream_id = result["items"][0]["datastream_id"]
    print(f"Found datastream ID: {datastream_id}")

    # 2. Create subscription
    print("Creating subscription...")
    subscription = api_request("POST", "/v1/subscriptions",
        json={"datastream_id": datastream_id},
        headers={
            "Content-Type": "application/json",
            "Idempotency-Key": str(uuid.uuid4())
        }
    )
    print(f"Subscription created: {subscription['id']}")

    # 3. Wait for data collection
    print("Waiting 60 seconds for data collection...")
    time.sleep(60)

    # 4. Create export
    # time.time_ns() is an exact integer; time.time() * 1e9 goes through a
    # float and cannot represent individual nanoseconds.
    now_ns = time.time_ns()
    hour_ago_ns = now_ns - 3600 * 10**9

    print("Creating export...")
    export = api_request("POST", "/v1/exports",
        json={
            "datastream_id": datastream_id,
            "start_time": str(hour_ago_ns),
            "end_time": str(now_ns)
        },
        headers={
            "Content-Type": "application/json",
            "Idempotency-Key": str(uuid.uuid4())
        }
    )
    print(f"Export created: {export['id']}")

    # 5. Wait for completion (bounded, so a stuck export cannot hang the script)
    print("Waiting for export to complete...")
    export_timeout = 300
    deadline = time.monotonic() + export_timeout
    while True:
        status = api_request("GET", f"/v1/exports/{export['id']}")["status"]
        print(f"Status: {status}")

        if status == "succeeded":
            break
        if status == "failed":
            raise RuntimeError("Export failed!")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Export did not complete within {export_timeout} seconds")

        time.sleep(5)

    # 6. Download all artifacts
    print("Downloading...")
    download_result = api_request("GET", f"/v1/exports/{export['id']}/download")

    for artifact in download_result["artifacts"]:
        print(f"  Downloading {artifact['filename']}...")
        # Stream to disk so large artifacts are not held in memory.
        with requests.get(artifact["url"], stream=True, timeout=60) as response:
            # Without this check an error response body would be saved as data.
            response.raise_for_status()
            with open(artifact["filename"], "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

    print(f"Downloaded {download_result['count']} file(s)")

if __name__ == "__main__":
    main()