Skip to main content

Python Examples

These examples use the official Ticksupply Python client. For raw HTTP examples, see the cURL page.

Setup

Install the client library:
pip install ticksupply
Create a client instance:
from ticksupply import Client

# Reads TICKSUPPLY_API_KEY from environment
client = Client()

# Or pass the key explicitly
client = Client(api_key="your_api_key")

Catalog operations

List exchanges

exchanges = client.exchanges.list()
for exchange in exchanges:
    print(f"{exchange.code}: {exchange.display_name}")

List instruments

# List all instruments for an exchange
result = client.exchanges.list_instruments("binance")
print(f"Total instruments: {result.total}")
for inst in result.items:
    print(f"  {inst.symbol}: {inst.base}/{inst.quote} ({inst.instrument_type})")

# Search for BTC pairs
result = client.exchanges.list_instruments("binance", search="BTC", limit=10)
for inst in result.items:
    print(f"  {inst.symbol}")

Paginate through instruments

# Manual pagination
page = client.exchanges.list_instruments("binance", limit=100)
all_instruments = list(page.items)

while page.has_next:
    page = client.exchanges.list_instruments(
        "binance", limit=100, page_token=page.next_page_token
    )
    all_instruments.extend(page.items)

print(f"Fetched {len(all_instruments)} instruments")

List datastreams

# Find datastreams for a specific instrument
result = client.exchanges.list_datastreams("binance", "BTCUSDT")
for ds in result.items:
    print(f"  id={ds.datastream_id} {ds.stream_type}/{ds.wire_format}")

# Or use the top-level datastreams resource with filters
result = client.datastreams.list(
    exchange="binance",
    instrument="BTCUSDT",
    stream_type="trades"
)
datastream_id = result.items[0].datastream_id

Subscription operations

Create a subscription

sub = client.subscriptions.create(datastream_id=123)
print(f"Created: {sub.id}, status: {sub.status}")
print(f"Stream: {sub.datastream.exchange}/{sub.datastream.instrument}")

List subscriptions

# Get a single page
page = client.subscriptions.list(limit=20)
print(f"Total subscriptions: {page.total}")
for sub in page.items:
    print(f"  {sub.id}: {sub.status}")

# Auto-paginate through all subscriptions
for sub in client.subscriptions.list_all():
    print(f"  {sub.id}: {sub.status}")

Manage a subscription

sub_id = "sub_550e8400e29b41d4a716446655440000"

# Get details
sub = client.subscriptions.get(sub_id)
print(f"Status: {sub.status}")

# Pause collection
client.subscriptions.pause(sub_id)

# Resume collection
client.subscriptions.resume(sub_id)

# View activity spans
spans = client.subscriptions.list_spans(sub_id)
for span in spans:
    print(f"  {span.started_at} - {span.ended_at or 'ongoing'}")

# Delete subscription
client.subscriptions.delete(sub_id)

Export operations

Create an export

from datetime import datetime, timedelta, timezone

# Export last 24 hours
end = datetime.now(timezone.utc)
start = end - timedelta(hours=24)

job = client.exports.create(
    datastream_id=123,
    start_time=start,
    end_time=end,
)
print(f"Export created: {job.id}, status: {job.status}")

Poll for completion and download

import time

export_id = job.id

# Poll until complete
while True:
    job = client.exports.get(export_id)
    print(f"Status: {job.status}")

    if job.status.value == "succeeded":
        break
    elif job.status.value == "failed":
        raise Exception(f"Export failed: {job.reason}")

    time.sleep(5)

# Download artifacts
download = client.exports.get_download(export_id)
print(f"{download.count} file(s), {download.total_bytes:,} bytes total")

import requests

for artifact in download.artifacts:
    print(f"Downloading {artifact.filename}...")
    resp = requests.get(artifact.url)
    with open(artifact.filename, "wb") as f:
        f.write(resp.content)

List exports

page = client.exports.list(limit=10)
for job in page.items:
    print(f"  {job.id}: {job.status} ({job.format})")

Export schema operations

Create a custom schema

schema = client.export_schemas.create(
    name="my_trades",
    stream_category="trade",
    unfold={
        "bybit_linear": {"path": "data"}
    },
    columns=[
        {"output_column": "timestamp_ns", "meta": {"value": "collection_timestamp_ns", "format": "ns"}},
        {"output_column": "price", "data": {
            "binance": {"json": {"path": "data.p", "type": "decimal(18)"}},
            "bybit_linear": {"json": {"path": "data.p", "type": "decimal(18)"}}
        }},
        {"output_column": "side", "data": {
            "binance": {"json": {"path": "data.m", "type": "bool"}, "transform": "CASE WHEN {v} = false THEN 'buy' ELSE 'sell' END"},
            "bybit_linear": {"json": {"path": "data.S", "type": "string"}}
        }}
    ]
)
print(f"Created schema: {schema.id}")

List schemas

schemas = client.export_schemas.list()
for s in schemas:
    print(f"  {s.name} ({s.stream_category}) - {s.id}")

Use a schema in an export

You can reference a schema by name, by ID, or define it inline:
# By name — your custom schema or a built-in (e.g. "normalized", "raw")
job = client.exports.create(123, start, end, schema="my_trades")

# By ID — pins to a specific schema, unaffected by later renames
job = client.exports.create(
    123, start, end,
    schema="sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4",
)

# Inline — one-off mapping, no stored schema needed
job = client.exports.create(
    123, start, end,
    schema={
        "columns": [
            {"output_column": "ts",
             "meta": {"value": "collection_timestamp_ns", "format": "ns"}},
            {"output_column": "price", "data": {
                "binance": {"json": {"path": "data.p", "type": "decimal(18)"}}
            }},
        ],
    },
)
Omit schema= to export raw JSON rows.

Iterate on a schema via draft

Use the draft workflow to evolve a schema without affecting existing exports. Exports snapshot the schema at creation time, so publishing a new version never touches exports that already ran.
schema_id = "sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4"

# Start a draft from the latest published version
client.export_schemas.create_draft(schema_id)

# Replace the draft's contents (repeat as many times as you like)
client.export_schemas.update_draft(
    schema_id,
    columns=[
        {"output_column": "ts",
         "meta": {"value": "collection_timestamp_ns", "format": "ns"}},
        {"output_column": "price", "data": {
            "binance": {"json": {"path": "data.p", "type": "decimal(18)"}}
        }},
    ],
)

# Promote the draft to the next published version
updated = client.export_schemas.publish_draft(schema_id)
print(f"Now at version {updated.version}")

# Or discard without publishing
# client.export_schemas.discard_draft(schema_id)

Delete a schema

client.export_schemas.delete("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4")

Availability

Check data availability

avail = client.availability.get(datastream_id=123)
print(f"Stream: {avail.datastream.exchange}/{avail.datastream.instrument}")
print(f"Updated: {avail.updated_at}")
for r in avail.ranges:
    print(f"  {r.from_ns} - {r.to_ns}: ~{r.rows_estimate:,} rows")

Idempotency

Most mutating operations — create, delete, pause, resume, cancel, publish-draft — accept an optional idempotency_key kwarg. Pass any UUID (v1/v4/v7, up to 128 chars). Retries with the same key return the original result instead of executing the operation twice.
import uuid

key = str(uuid.uuid4())

# If the response is lost (crash, timeout) and you retry with the same key,
# the server returns the original subscription — no duplicate is created.
sub = client.subscriptions.create(datastream_id=123, idempotency_key=key)
The client already retries transient network and 5xx errors internally. Use idempotency_key for retries from your own code — e.g. after a process crash or when resuming a job queue.

Error handling

from ticksupply.exceptions import (
    NotFoundError,
    RateLimitError,
    AuthenticationError,
    ConflictError,
    TicksupplyError,
)

try:
    sub = client.subscriptions.get("sub_00000000000000000000000000000000")
except NotFoundError as e:
    print(f"Not found: {e.message} (code={e.code})")
except RateLimitError as e:
    print(f"Rate limited, retry after {e.retry_after}s")
except AuthenticationError:
    print("Invalid API key")
except TicksupplyError as e:
    # Include e.request_id when reporting issues to support.
    print(f"API error: {e.code} - {e.message} (request_id={e.request_id})")
The client automatically retries on transient errors (5xx, timeouts) with exponential backoff. You only need to handle business logic errors like NotFoundError and ConflictError.
Every error carries a request_id sourced from the X-Request-Id response header. Include it when reporting issues to support — it uniquely identifies the failed request in our server logs.

Async usage

The AsyncClient provides the same interface for use with asyncio:
import asyncio
from ticksupply import AsyncClient

async def main():
    async with AsyncClient() as client:
        # List exchanges
        exchanges = await client.exchanges.list()
        for ex in exchanges:
            print(f"{ex.code}: {ex.display_name}")

        # Auto-paginate subscriptions
        async for sub in client.subscriptions.list_all():
            print(f"{sub.id}: {sub.status}")

asyncio.run(main())

Complete workflow

Subscribe to a data stream, wait for data, then export and download:
#!/usr/bin/env python3
"""Subscribe to BTCUSDT trades, export, and download."""

import time
from datetime import datetime, timedelta, timezone

import requests as http_requests
from ticksupply import Client

client = Client()

# 1. Find the datastream
result = client.datastreams.list(
    exchange="binance", instrument="BTCUSDT", stream_type="trades"
)
ds = result.items[0]
print(f"Found datastream: {ds.datastream_id}")

# 2. Subscribe
sub = client.subscriptions.create(ds.datastream_id)
print(f"Subscription: {sub.id} ({sub.status})")

# 3. Wait for data collection
print("Collecting data for 60 seconds...")
time.sleep(60)

# 4. Create export
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)
job = client.exports.create(ds.datastream_id, start, end)
print(f"Export: {job.id} ({job.status})")

# 5. Wait for export
while True:
    job = client.exports.get(job.id)
    if job.status.value == "succeeded":
        break
    elif job.status.value == "failed":
        raise Exception(f"Export failed: {job.reason}")
    print(f"  {job.status}...")
    time.sleep(5)

# 6. Download
download = client.exports.get_download(job.id)
for artifact in download.artifacts:
    print(f"Downloading {artifact.filename}...")
    resp = http_requests.get(artifact.url)
    with open(artifact.filename, "wb") as f:
        f.write(resp.content)

print(f"Done! Downloaded {download.count} file(s)")

# 7. Cleanup
client.subscriptions.delete(sub.id)
client.close()
Last modified on April 20, 2026