Skip to main content

Python Examples

These examples use the official Ticksupply Python client. For raw HTTP examples, see the cURL page.

Setup

Install the client library:
pip install ticksupply
Create a client instance:
from ticksupply import Client

# Reads TICKSUPPLY_API_KEY from environment
client = Client()

# Or pass the key explicitly
client = Client(api_key="your_api_key")

Catalog operations

List exchanges

exchanges = client.exchanges.list()
for exchange in exchanges:
    print(f"{exchange.code}: {exchange.display_name}")

List instruments

# List all instruments for an exchange
result = client.exchanges.list_instruments("binance")
print(f"Total instruments: {result.total}")
for inst in result.items:
    print(f"  {inst.symbol}: {inst.base}/{inst.quote} ({inst.instrument_type})")

# Search for BTC pairs
result = client.exchanges.list_instruments("binance", search="BTC", limit=10)
for inst in result.items:
    print(f"  {inst.symbol}")

Paginate through instruments

# Manual pagination
page = client.exchanges.list_instruments("binance", limit=100)
all_instruments = list(page.items)

while page.has_next:
    page = client.exchanges.list_instruments(
        "binance", limit=100, page_token=page.next_page_token
    )
    all_instruments.extend(page.items)

print(f"Fetched {len(all_instruments)} instruments")

List datastreams

# Find datastreams for a specific instrument
result = client.exchanges.list_datastreams("binance", "BTCUSDT")
for ds in result.items:
    print(f"  id={ds.datastream_id} {ds.stream_type}/{ds.wire_format}")

# Or use the top-level datastreams resource with filters
result = client.datastreams.list(
    exchange="binance",
    instrument="BTCUSDT",
    stream_type="trades"
)
datastream_id = result.items[0].datastream_id

Subscription operations

Create a subscription

sub = client.subscriptions.create(datastream_id=123)
print(f"Created: {sub.id}, status: {sub.status}")
print(f"Stream: {sub.datastream.exchange}/{sub.datastream.instrument}")

List subscriptions

# Get a single page
page = client.subscriptions.list(limit=20)
print(f"Total subscriptions: {page.total}")
for sub in page.items:
    print(f"  {sub.id}: {sub.status}")

# Auto-paginate through all subscriptions
for sub in client.subscriptions.list_all():
    print(f"  {sub.id}: {sub.status}")

Manage a subscription

sub_id = "sub_550e8400e29b41d4a716446655440000"

# Get details
sub = client.subscriptions.get(sub_id)
print(f"Status: {sub.status}")

# Pause collection
client.subscriptions.pause(sub_id)

# Resume collection
client.subscriptions.resume(sub_id)

# View activity spans
spans = client.subscriptions.list_spans(sub_id)
for span in spans:
    print(f"  {span.started_at} - {span.ended_at or 'ongoing'}")

# Delete subscription
client.subscriptions.delete(sub_id)

Export operations

Create an export

from datetime import datetime, timedelta, timezone

# Export last 24 hours
end = datetime.now(timezone.utc)
start = end - timedelta(hours=24)

job = client.exports.create(
    datastream_id=123,
    start_time=start,
    end_time=end,
)
print(f"Export created: {job.id}, status: {job.status}")

Poll for completion and download

import time

export_id = job.id

# Poll until complete
while True:
    job = client.exports.get(export_id)
    print(f"Status: {job.status}")

    if job.status.value == "succeeded":
        break
    elif job.status.value == "failed":
        raise Exception(f"Export failed: {job.reason}")

    time.sleep(5)

# Download artifacts
download = client.exports.get_download(export_id)
print(f"{download.count} file(s), {download.total_bytes:,} bytes total")

import requests

for artifact in download.artifacts:
    print(f"Downloading {artifact.filename}...")
    resp = requests.get(artifact.url)
    with open(artifact.filename, "wb") as f:
        f.write(resp.content)

List exports

page = client.exports.list(limit=10)
for job in page.items:
    print(f"  {job.id}: {job.status} ({job.format})")

Export schema operations

Create a custom schema

schema = client.export_schemas.create(
    name="my_trades",
    stream_type="trade",
    unfold={
        "bybit_linear": {"path": "data"}
    },
    columns=[
        {"output_column": "timestamp_ns", "meta": {"value": "collection_timestamp_ns", "format": "ns"}},
        {"output_column": "price", "data": {
            "binance": {"json": {"path": "p", "type": "decimal(18)"}},
            "bybit_linear": {"json": {"path": "p", "type": "decimal(18)"}}
        }},
        {"output_column": "side", "data": {
            "binance": {"json": {"path": "m", "type": "bool"}, "transform": "CASE WHEN {v} THEN 'sell' ELSE 'buy' END"},
            "bybit_linear": {"json": {"path": "S", "type": "string"}}
        }}
    ]
)
print(f"Created schema: {schema.id}")

List schemas

schemas = client.export_schemas.list()
for s in schemas:
    print(f"  {s.name} ({s.stream_type}) - {s.id}")

Use a schema in an export

job = client.exports.create(
    datastream_id=123,
    start_time=start,
    end_time=end,
    schema="my_trades",
)

Delete a schema

client.export_schemas.delete("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4")

Availability

Check data availability

avail = client.availability.get(datastream_id=123)
print(f"Stream: {avail.datastream.exchange}/{avail.datastream.instrument}")
print(f"Updated: {avail.updated_at}")
for r in avail.ranges:
    print(f"  {r.from_ns} - {r.to_ns}: ~{r.rows_estimate:,} rows")

Error handling

from ticksupply.exceptions import (
    NotFoundError,
    RateLimitError,
    AuthenticationError,
    ConflictError,
    TicksupplyError,
)

try:
    sub = client.subscriptions.get("sub_00000000000000000000000000000000")
except NotFoundError as e:
    print(f"Not found: {e.message} (code={e.code})")
except RateLimitError as e:
    print(f"Rate limited, retry after {e.retry_after}s")
except AuthenticationError:
    print("Invalid API key")
except TicksupplyError as e:
    print(f"API error: {e.code} - {e.message}")
The client automatically retries on transient errors (5xx, timeouts) with exponential backoff. You only need to handle business logic errors like NotFoundError and ConflictError.

Async usage

The AsyncClient provides the same interface for use with asyncio:
import asyncio
from ticksupply import AsyncClient

async def main():
    async with AsyncClient() as client:
        # List exchanges
        exchanges = await client.exchanges.list()
        for ex in exchanges:
            print(f"{ex.code}: {ex.display_name}")

        # Auto-paginate subscriptions
        async for sub in client.subscriptions.list_all():
            print(f"{sub.id}: {sub.status}")

asyncio.run(main())

Complete workflow

Subscribe to a data stream, wait for data, then export and download:
#!/usr/bin/env python3
"""Subscribe to BTCUSDT trades, export, and download."""

import time
from datetime import datetime, timedelta, timezone

import requests as http_requests
from ticksupply import Client

client = Client()

# 1. Find the datastream
result = client.datastreams.list(
    exchange="binance", instrument="BTCUSDT", stream_type="trades"
)
ds = result.items[0]
print(f"Found datastream: {ds.datastream_id}")

# 2. Subscribe
sub = client.subscriptions.create(ds.datastream_id)
print(f"Subscription: {sub.id} ({sub.status})")

# 3. Wait for data collection
print("Collecting data for 60 seconds...")
time.sleep(60)

# 4. Create export
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)
job = client.exports.create(ds.datastream_id, start, end)
print(f"Export: {job.id} ({job.status})")

# 5. Wait for export
while True:
    job = client.exports.get(job.id)
    if job.status.value == "succeeded":
        break
    elif job.status.value == "failed":
        raise Exception(f"Export failed: {job.reason}")
    print(f"  {job.status}...")
    time.sleep(5)

# 6. Download
download = client.exports.get_download(job.id)
for artifact in download.artifacts:
    print(f"Downloading {artifact.filename}...")
    resp = http_requests.get(artifact.url)
    with open(artifact.filename, "wb") as f:
        f.write(resp.content)

print(f"Done! Downloaded {download.count} file(s)")

# 7. Cleanup
client.subscriptions.delete(sub.id)
client.close()
Last modified on April 5, 2026