Python Examples
These examples use the official Ticksupply Python client. For raw HTTP examples, see the cURL page.
Setup
Install the client library:
Create a client instance:
from ticksupply import Client
# Reads TICKSUPPLY_API_KEY from environment
client = Client()
# Or pass the key explicitly
client = Client(api_key="your_api_key")
Catalog operations
List exchanges
exchanges = client.exchanges.list()
for exchange in exchanges:
print(f"{exchange.code}: {exchange.display_name}")
List instruments
# List all instruments for an exchange
result = client.exchanges.list_instruments("binance")
print(f"Total instruments: {result.total}")
for inst in result.items:
print(f" {inst.symbol}: {inst.base}/{inst.quote} ({inst.instrument_type})")
# Search for BTC pairs
result = client.exchanges.list_instruments("binance", search="BTC", limit=10)
for inst in result.items:
print(f" {inst.symbol}")
Paginate through instruments
# Manual pagination
page = client.exchanges.list_instruments("binance", limit=100)
all_instruments = list(page.items)
while page.has_next:
page = client.exchanges.list_instruments(
"binance", limit=100, page_token=page.next_page_token
)
all_instruments.extend(page.items)
print(f"Fetched {len(all_instruments)} instruments")
List datastreams
# Find datastreams for a specific instrument
result = client.exchanges.list_datastreams("binance", "BTCUSDT")
for ds in result.items:
print(f" id={ds.datastream_id} {ds.stream_type}/{ds.wire_format}")
# Or use the top-level datastreams resource with filters
result = client.datastreams.list(
exchange="binance",
instrument="BTCUSDT",
stream_type="trades"
)
datastream_id = result.items[0].datastream_id
Subscription operations
Create a subscription
sub = client.subscriptions.create(datastream_id=123)
print(f"Created: {sub.id}, status: {sub.status}")
print(f"Stream: {sub.datastream.exchange}/{sub.datastream.instrument}")
List subscriptions
# Get a single page
page = client.subscriptions.list(limit=20)
print(f"Total subscriptions: {page.total}")
for sub in page.items:
print(f" {sub.id}: {sub.status}")
# Auto-paginate through all subscriptions
for sub in client.subscriptions.list_all():
print(f" {sub.id}: {sub.status}")
Manage a subscription
sub_id = "sub_550e8400e29b41d4a716446655440000"
# Get details
sub = client.subscriptions.get(sub_id)
print(f"Status: {sub.status}")
# Pause collection
client.subscriptions.pause(sub_id)
# Resume collection
client.subscriptions.resume(sub_id)
# View activity spans
spans = client.subscriptions.list_spans(sub_id)
for span in spans:
print(f" {span.started_at} - {span.ended_at or 'ongoing'}")
# Delete subscription
client.subscriptions.delete(sub_id)
Export operations
Create an export
from datetime import datetime, timedelta, timezone
# Export last 24 hours
end = datetime.now(timezone.utc)
start = end - timedelta(hours=24)
job = client.exports.create(
datastream_id=123,
start_time=start,
end_time=end,
)
print(f"Export created: {job.id}, status: {job.status}")
Poll for completion and download
import time
export_id = job.id
# Poll until complete
while True:
job = client.exports.get(export_id)
print(f"Status: {job.status}")
if job.status.value == "succeeded":
break
elif job.status.value == "failed":
raise Exception(f"Export failed: {job.reason}")
time.sleep(5)
# Download artifacts
download = client.exports.get_download(export_id)
print(f"{download.count} file(s), {download.total_bytes:,} bytes total")
import requests
for artifact in download.artifacts:
print(f"Downloading {artifact.filename}...")
resp = requests.get(artifact.url)
with open(artifact.filename, "wb") as f:
f.write(resp.content)
List exports
page = client.exports.list(limit=10)
for job in page.items:
print(f" {job.id}: {job.status} ({job.format})")
Export schema operations
Create a custom schema
schema = client.export_schemas.create(
name="my_trades",
stream_category="trade",
unfold={
"bybit_linear": {"path": "data"}
},
columns=[
{"output_column": "timestamp_ns", "meta": {"value": "collection_timestamp_ns", "format": "ns"}},
{"output_column": "price", "data": {
"binance": {"json": {"path": "data.p", "type": "decimal(18)"}},
"bybit_linear": {"json": {"path": "data.p", "type": "decimal(18)"}}
}},
{"output_column": "side", "data": {
"binance": {"json": {"path": "data.m", "type": "bool"}, "transform": "CASE WHEN {v} = false THEN 'buy' ELSE 'sell' END"},
"bybit_linear": {"json": {"path": "data.S", "type": "string"}}
}}
]
)
print(f"Created schema: {schema.id}")
List schemas
schemas = client.export_schemas.list()
for s in schemas:
print(f" {s.name} ({s.stream_category}) - {s.id}")
Use a schema in an export
You can reference a schema by name, by ID, or define it inline:
# By name — your custom schema or a built-in (e.g. "normalized", "raw")
job = client.exports.create(123, start, end, schema="my_trades")
# By ID — pins to a specific schema, unaffected by later renames
job = client.exports.create(
123, start, end,
schema="sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4",
)
# Inline — one-off mapping, no stored schema needed
job = client.exports.create(
123, start, end,
schema={
"columns": [
{"output_column": "ts",
"meta": {"value": "collection_timestamp_ns", "format": "ns"}},
{"output_column": "price", "data": {
"binance": {"json": {"path": "data.p", "type": "decimal(18)"}}
}},
],
},
)
Omit schema= to export raw JSON rows.
Iterate on a schema via draft
Use the draft workflow to evolve a schema without affecting existing exports. Exports snapshot the schema at creation time, so publishing a new version never touches exports that already ran.
schema_id = "sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4"
# Start a draft from the latest published version
client.export_schemas.create_draft(schema_id)
# Replace the draft's contents (repeat as many times as you like)
client.export_schemas.update_draft(
schema_id,
columns=[
{"output_column": "ts",
"meta": {"value": "collection_timestamp_ns", "format": "ns"}},
{"output_column": "price", "data": {
"binance": {"json": {"path": "data.p", "type": "decimal(18)"}}
}},
],
)
# Promote the draft to the next published version
updated = client.export_schemas.publish_draft(schema_id)
print(f"Now at version {updated.version}")
# Or discard without publishing
# client.export_schemas.discard_draft(schema_id)
Delete a schema
client.export_schemas.delete("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4")
Availability
Check data availability
avail = client.availability.get(datastream_id=123)
print(f"Stream: {avail.datastream.exchange}/{avail.datastream.instrument}")
print(f"Updated: {avail.updated_at}")
for r in avail.ranges:
print(f" {r.from_ns} - {r.to_ns}: ~{r.rows_estimate:,} rows")
Idempotency
Most mutating operations — create, delete, pause, resume, cancel, publish-draft — accept an optional idempotency_key kwarg. Pass any UUID (v1/v4/v7, up to 128 chars). Retries with the same key return the original result instead of executing the operation twice.
import uuid
key = str(uuid.uuid4())
# If the response is lost (crash, timeout) and you retry with the same key,
# the server returns the original subscription — no duplicate is created.
sub = client.subscriptions.create(datastream_id=123, idempotency_key=key)
The client already retries transient network and 5xx errors internally. Use idempotency_key for retries from your own code — e.g. after a process crash or when resuming a job queue.
Error handling
from ticksupply.exceptions import (
NotFoundError,
RateLimitError,
AuthenticationError,
ConflictError,
TicksupplyError,
)
try:
sub = client.subscriptions.get("sub_00000000000000000000000000000000")
except NotFoundError as e:
print(f"Not found: {e.message} (code={e.code})")
except RateLimitError as e:
print(f"Rate limited, retry after {e.retry_after}s")
except AuthenticationError:
print("Invalid API key")
except TicksupplyError as e:
# Include e.request_id when reporting issues to support.
print(f"API error: {e.code} - {e.message} (request_id={e.request_id})")
The client automatically retries on transient errors (5xx, timeouts) with exponential backoff. You only need to handle business logic errors like NotFoundError and ConflictError.
Every error carries a request_id sourced from the X-Request-Id response header. Include it when reporting issues to support — it uniquely identifies the failed request in our server logs.
Async usage
The AsyncClient provides the same interface for use with asyncio:
import asyncio
from ticksupply import AsyncClient
async def main():
async with AsyncClient() as client:
# List exchanges
exchanges = await client.exchanges.list()
for ex in exchanges:
print(f"{ex.code}: {ex.display_name}")
# Auto-paginate subscriptions
async for sub in client.subscriptions.list_all():
print(f"{sub.id}: {sub.status}")
asyncio.run(main())
Complete workflow
Subscribe to a data stream, wait for data, then export and download:
#!/usr/bin/env python3
"""Subscribe to BTCUSDT trades, export, and download."""
import time
from datetime import datetime, timedelta, timezone
import requests as http_requests
from ticksupply import Client
client = Client()
# 1. Find the datastream
result = client.datastreams.list(
exchange="binance", instrument="BTCUSDT", stream_type="trades"
)
ds = result.items[0]
print(f"Found datastream: {ds.datastream_id}")
# 2. Subscribe
sub = client.subscriptions.create(ds.datastream_id)
print(f"Subscription: {sub.id} ({sub.status})")
# 3. Wait for data collection
print("Collecting data for 60 seconds...")
time.sleep(60)
# 4. Create export
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)
job = client.exports.create(ds.datastream_id, start, end)
print(f"Export: {job.id} ({job.status})")
# 5. Wait for export
while True:
job = client.exports.get(job.id)
if job.status.value == "succeeded":
break
elif job.status.value == "failed":
raise Exception(f"Export failed: {job.reason}")
print(f" {job.status}...")
time.sleep(5)
# 6. Download
download = client.exports.get_download(job.id)
for artifact in download.artifacts:
print(f"Downloading {artifact.filename}...")
resp = http_requests.get(artifact.url)
with open(artifact.filename, "wb") as f:
f.write(resp.content)
print(f"Done! Downloaded {download.count} file(s)")
# 7. Cleanup
client.subscriptions.delete(sub.id)
client.close()