Python Examples
These examples use the official Ticksupply Python client. For raw HTTP examples, see the cURL page.Setup
Install the client library:pip install ticksupply
from ticksupply import Client
# Reads TICKSUPPLY_API_KEY from environment
client = Client()
# Or pass the key explicitly
client = Client(api_key="your_api_key")
Catalog operations
List exchanges
exchanges = client.exchanges.list()
for exchange in exchanges:
print(f"{exchange.code}: {exchange.display_name}")
List instruments
# List all instruments for an exchange
result = client.exchanges.list_instruments("binance")
print(f"Total instruments: {result.total}")
for inst in result.items:
print(f" {inst.symbol}: {inst.base}/{inst.quote} ({inst.instrument_type})")
# Search for BTC pairs
result = client.exchanges.list_instruments("binance", search="BTC", limit=10)
for inst in result.items:
print(f" {inst.symbol}")
Paginate through instruments
# Manual pagination
page = client.exchanges.list_instruments("binance", limit=100)
all_instruments = list(page.items)
while page.has_next:
page = client.exchanges.list_instruments(
"binance", limit=100, page_token=page.next_page_token
)
all_instruments.extend(page.items)
print(f"Fetched {len(all_instruments)} instruments")
List datastreams
# Find datastreams for a specific instrument
result = client.exchanges.list_datastreams("binance", "BTCUSDT")
for ds in result.items:
print(f" id={ds.datastream_id} {ds.stream_type}/{ds.wire_format}")
# Or use the top-level datastreams resource with filters
result = client.datastreams.list(
exchange="binance",
instrument="BTCUSDT",
stream_type="trades"
)
datastream_id = result.items[0].datastream_id
Subscription operations
Create a subscription
sub = client.subscriptions.create(datastream_id=123)
print(f"Created: {sub.id}, status: {sub.status}")
print(f"Stream: {sub.datastream.exchange}/{sub.datastream.instrument}")
List subscriptions
# Get a single page
page = client.subscriptions.list(limit=20)
print(f"Total subscriptions: {page.total}")
for sub in page.items:
print(f" {sub.id}: {sub.status}")
# Auto-paginate through all subscriptions
for sub in client.subscriptions.list_all():
print(f" {sub.id}: {sub.status}")
Manage a subscription
sub_id = "sub_550e8400e29b41d4a716446655440000"
# Get details
sub = client.subscriptions.get(sub_id)
print(f"Status: {sub.status}")
# Pause collection
client.subscriptions.pause(sub_id)
# Resume collection
client.subscriptions.resume(sub_id)
# View activity spans
spans = client.subscriptions.list_spans(sub_id)
for span in spans:
print(f" {span.started_at} - {span.ended_at or 'ongoing'}")
# Delete subscription
client.subscriptions.delete(sub_id)
Export operations
Create an export
from datetime import datetime, timedelta, timezone
# Export last 24 hours
end = datetime.now(timezone.utc)
start = end - timedelta(hours=24)
job = client.exports.create(
datastream_id=123,
start_time=start,
end_time=end,
)
print(f"Export created: {job.id}, status: {job.status}")
Poll for completion and download
import time
export_id = job.id
# Poll until complete
while True:
job = client.exports.get(export_id)
print(f"Status: {job.status}")
if job.status.value == "succeeded":
break
elif job.status.value == "failed":
raise Exception(f"Export failed: {job.reason}")
time.sleep(5)
# Download artifacts
download = client.exports.get_download(export_id)
print(f"{download.count} file(s), {download.total_bytes:,} bytes total")
import requests
for artifact in download.artifacts:
print(f"Downloading {artifact.filename}...")
resp = requests.get(artifact.url)
with open(artifact.filename, "wb") as f:
f.write(resp.content)
List exports
page = client.exports.list(limit=10)
for job in page.items:
print(f" {job.id}: {job.status} ({job.format})")
Export schema operations
Create a custom schema
schema = client.export_schemas.create(
name="my_trades",
stream_type="trade",
unfold={
"bybit_linear": {"path": "data"}
},
columns=[
{"output_column": "timestamp_ns", "meta": {"value": "collection_timestamp_ns", "format": "ns"}},
{"output_column": "price", "data": {
"binance": {"json": {"path": "p", "type": "decimal(18)"}},
"bybit_linear": {"json": {"path": "p", "type": "decimal(18)"}}
}},
{"output_column": "side", "data": {
"binance": {"json": {"path": "m", "type": "bool"}, "transform": "CASE WHEN {v} THEN 'sell' ELSE 'buy' END"},
"bybit_linear": {"json": {"path": "S", "type": "string"}}
}}
]
)
print(f"Created schema: {schema.id}")
List schemas
schemas = client.export_schemas.list()
for s in schemas:
print(f" {s.name} ({s.stream_type}) - {s.id}")
Use a schema in an export
job = client.exports.create(
datastream_id=123,
start_time=start,
end_time=end,
schema="my_trades",
)
Delete a schema
client.export_schemas.delete("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4")
Availability
Check data availability
avail = client.availability.get(datastream_id=123)
print(f"Stream: {avail.datastream.exchange}/{avail.datastream.instrument}")
print(f"Updated: {avail.updated_at}")
for r in avail.ranges:
print(f" {r.from_ns} - {r.to_ns}: ~{r.rows_estimate:,} rows")
Error handling
from ticksupply.exceptions import (
NotFoundError,
RateLimitError,
AuthenticationError,
ConflictError,
TicksupplyError,
)
try:
sub = client.subscriptions.get("sub_00000000000000000000000000000000")
except NotFoundError as e:
print(f"Not found: {e.message} (code={e.code})")
except RateLimitError as e:
print(f"Rate limited, retry after {e.retry_after}s")
except AuthenticationError:
print("Invalid API key")
except TicksupplyError as e:
print(f"API error: {e.code} - {e.message}")
The client automatically retries on transient errors (5xx, timeouts) with exponential backoff. You only need to handle business logic errors like
NotFoundError and ConflictError.Async usage
TheAsyncClient provides the same interface for use with asyncio:
import asyncio
from ticksupply import AsyncClient
async def main():
async with AsyncClient() as client:
# List exchanges
exchanges = await client.exchanges.list()
for ex in exchanges:
print(f"{ex.code}: {ex.display_name}")
# Auto-paginate subscriptions
async for sub in client.subscriptions.list_all():
print(f"{sub.id}: {sub.status}")
asyncio.run(main())
Complete workflow
Subscribe to a data stream, wait for data, then export and download:#!/usr/bin/env python3
"""Subscribe to BTCUSDT trades, export, and download."""
import time
from datetime import datetime, timedelta, timezone
import requests as http_requests
from ticksupply import Client
client = Client()
# 1. Find the datastream
result = client.datastreams.list(
exchange="binance", instrument="BTCUSDT", stream_type="trades"
)
ds = result.items[0]
print(f"Found datastream: {ds.datastream_id}")
# 2. Subscribe
sub = client.subscriptions.create(ds.datastream_id)
print(f"Subscription: {sub.id} ({sub.status})")
# 3. Wait for data collection
print("Collecting data for 60 seconds...")
time.sleep(60)
# 4. Create export
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)
job = client.exports.create(ds.datastream_id, start, end)
print(f"Export: {job.id} ({job.status})")
# 5. Wait for export
while True:
job = client.exports.get(job.id)
if job.status.value == "succeeded":
break
elif job.status.value == "failed":
raise Exception(f"Export failed: {job.reason}")
print(f" {job.status}...")
time.sleep(5)
# 6. Download
download = client.exports.get_download(job.id)
for artifact in download.artifacts:
print(f"Downloading {artifact.filename}...")
resp = http_requests.get(artifact.url)
with open(artifact.filename, "wb") as f:
f.write(resp.content)
print(f"Done! Downloaded {download.count} file(s)")
# 7. Cleanup
client.subscriptions.delete(sub.id)
client.close()