Python Examples
This page provides Python examples using the requests library for all major API operations.
Setup
Install the required library:
Copy
pip install requests
Copy
import os
import time
import requests
# The API key comes from the environment; a missing TICKSUPPLY_API_KEY raises KeyError here.
API_KEY = os.environ["TICKSUPPLY_API_KEY"]
# Root URL that each request path is appended to.
BASE_URL = "https://api.ticksupply.com"
def api_request(method, path, **kwargs):
    """Make an authenticated API request, retrying while rate limited.

    Args:
        method: HTTP method name, e.g. "GET" or "POST".
        path: Path appended to BASE_URL, e.g. "/v1/exchanges".
        **kwargs: Passed through to ``requests.request`` (``params``,
            ``json``, ``headers``, ...).

    Returns:
        The decoded JSON body, or None for a 204 response.

    Raises:
        requests.HTTPError: For a 4xx/5xx response other than 429.
    """
    # Copy the headers so the caller's dict never receives the API key.
    headers = dict(kwargs.pop("headers", None) or {})
    headers["X-Api-Key"] = API_KEY
    url = f"{BASE_URL}{path}"

    # Loop rather than recurse so a long run of 429 responses cannot hit the
    # interpreter's recursion limit.
    while True:
        response = requests.request(method, url, headers=headers, **kwargs)
        if response.status_code != 429:
            break
        try:
            retry_after = max(0, int(response.headers.get("Retry-After", 30)))
        except ValueError:
            # Retry-After may also be an HTTP-date; fall back to the default.
            retry_after = 30
        print(f"Rate limited. Waiting {retry_after}s...")
        time.sleep(retry_after)

    response.raise_for_status()
    if response.status_code == 204:
        return None
    return response.json()
Catalog operations
List exchanges
Copy
def list_exchanges():
    """Fetch the exchanges returned by the /v1/exchanges endpoint."""
    exchanges = api_request("GET", "/v1/exchanges")
    return exchanges
# Example: print every exchange as "code: display name".
exchanges = list_exchanges()
for exchange in exchanges:
    print(f"{exchange['code']}: {exchange['display_name']}")
List instruments
Copy
def list_instruments(exchange, search=None, limit=None):
    """Fetch the instruments listed on one exchange.

    ``search`` and ``limit`` are sent as query parameters only when truthy.
    """
    optional = {"search": search, "limit": limit}
    query = {name: value for name, value in optional.items() if value}
    endpoint = f"/v1/exchanges/{exchange}/instruments"
    return api_request("GET", endpoint, params=query)
# Example: up to ten Binance instruments matching "BTC".
result = list_instruments("binance", search="BTC", limit=10)
for pair in result["items"]:
    print(f"{pair['symbol']}: {pair['base']}/{pair['quote']}")
List datastreams
Copy
def get_datastreams(exchange, instrument, stream_type):
    """Query /v1/datastreams filtered by exchange, instrument and stream type."""
    query = dict(
        exchange=exchange,
        instrument=instrument,
        stream_type=stream_type,
    )
    return api_request("GET", "/v1/datastreams", params=query)
# Example: show the wire format of each BTCUSDT trade datastream on Binance.
result = get_datastreams("binance", "BTCUSDT", "trade")
for ds in result["items"]:
    print(f"Datastream {ds['datastream_id']}: {ds['wire_format']}")
Subscription operations
Create a subscription
Copy
import uuid
def create_subscription(datastream_id, idempotency_key=None):
    """Subscribe to a datastream.

    A truthy ``idempotency_key`` is forwarded in the ``Idempotency-Key`` header.
    """
    request_headers = {"Content-Type": "application/json"}
    if idempotency_key:
        request_headers["Idempotency-Key"] = idempotency_key
    return api_request(
        "POST",
        "/v1/subscriptions",
        json={"datastream_id": datastream_id},
        headers=request_headers,
    )
# Example: subscribe to datastream 123 with a fresh idempotency key.
subscription = create_subscription(
    datastream_id=123,
    idempotency_key=str(uuid.uuid4()),
)
print(f"Created subscription: {subscription['id']}")
print(f"Status: {subscription['status']}")
List subscriptions with pagination
Copy
def list_subscriptions(limit=50):
    """Collect every subscription, following ``next_page_token`` until none is returned.

    ``limit`` is sent as the page size on each request. Progress is printed
    after every page.
    """
    all_subscriptions = []
    page_token = None
    more_pages = True
    while more_pages:
        # A fresh query dict per page; the token is included only once we have one.
        params = {"limit": limit}
        if page_token:
            params["page_token"] = page_token

        result = api_request("GET", "/v1/subscriptions", params=params)
        all_subscriptions += result["items"]
        print(f"Fetched {len(all_subscriptions)}/{result['total']}")

        page_token = result.get("next_page_token")
        more_pages = bool(page_token)
    return all_subscriptions
# Example: print the id and status of every subscription.
subscriptions = list_subscriptions()
for sub in subscriptions:
    print(f"{sub['id']}: {sub['status']}")
Subscription lifecycle management
Copy
def get_subscription(subscription_id):
    """Fetch a single subscription by its id."""
    endpoint = f"/v1/subscriptions/{subscription_id}"
    return api_request("GET", endpoint)
def pause_subscription(subscription_id):
    """POST to the subscription's pause endpoint and print a confirmation."""
    endpoint = f"/v1/subscriptions/{subscription_id}/pause"
    api_request("POST", endpoint)
    print(f"Paused subscription: {subscription_id}")
def resume_subscription(subscription_id):
    """POST to the subscription's resume endpoint and print a confirmation."""
    endpoint = f"/v1/subscriptions/{subscription_id}/resume"
    api_request("POST", endpoint)
    print(f"Resumed subscription: {subscription_id}")
def delete_subscription(subscription_id, idempotency_key=None):
    """Delete a subscription and print a confirmation.

    A truthy ``idempotency_key`` is forwarded in the ``Idempotency-Key`` header.
    """
    headers = {"Idempotency-Key": idempotency_key} if idempotency_key else {}
    endpoint = f"/v1/subscriptions/{subscription_id}"
    api_request("DELETE", endpoint, headers=headers)
    print(f"Deleted subscription: {subscription_id}")
def get_subscription_spans(subscription_id):
    """Fetch the spans recorded for a subscription."""
    endpoint = f"/v1/subscriptions/{subscription_id}/spans"
    return api_request("GET", endpoint)
# Example: inspect, pause, review spans for, then resume one subscription.
sub_id = "sub_550e8400e29b41d4a716446655440000"

# Look up the current status
sub = get_subscription(sub_id)
print(f"Status: {sub['status']}")

# Stop collection
pause_subscription(sub_id)

# Review spans; a falsy ended_at is shown as "ongoing"
spans = get_subscription_spans(sub_id)
for span in spans:
    print(f"Span: {span['started_at']} - {span['ended_at'] or 'ongoing'}")

# Start collection again
resume_subscription(sub_id)
Export operations
Create an export
Copy
def create_export(datastream_id, start_time_ns, end_time_ns, idempotency_key=None):
    """Submit an export job for a datastream over a time window.

    The start and end times are sent as strings. A truthy ``idempotency_key``
    is forwarded in the ``Idempotency-Key`` header.
    """
    request_headers = {"Content-Type": "application/json"}
    if idempotency_key:
        request_headers["Idempotency-Key"] = idempotency_key

    body = dict(
        datastream_id=datastream_id,
        start_time=str(start_time_ns),
        end_time=str(end_time_ns),
    )
    return api_request("POST", "/v1/exports", json=body, headers=request_headers)
# Usage - export last 24 hours
# time.time_ns() returns an exact integer. time.time() * 1e9 goes through a
# float, which cannot represent current epoch nanoseconds exactly.
now_ns = time.time_ns()
day_ago_ns = now_ns - 86_400 * 10**9
export = create_export(
    datastream_id=123,
    start_time_ns=day_ago_ns,
    end_time_ns=now_ns,
    idempotency_key=str(uuid.uuid4()),
)
print(f"Created export: {export['id']}")
Wait for export completion
Copy
def wait_for_export(export_id, poll_interval=5, timeout=300):
    """Poll an export until it succeeds, fails, or the timeout elapses.

    Args:
        export_id: Id of the export to poll.
        poll_interval: Seconds to sleep between polls.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The export payload from the poll that reported "succeeded".

    Raises:
        RuntimeError: If the export reports status "failed".
        TimeoutError: If no terminal status is seen within ``timeout`` seconds.
    """
    # Use the monotonic clock so wall-clock adjustments cannot stretch or
    # shorten the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = api_request("GET", f"/v1/exports/{export_id}")
        status = result["status"]
        print(f"Status: {status}")
        if status == "succeeded":
            return result
        if status == "failed":
            # RuntimeError is still caught by callers that handle Exception.
            raise RuntimeError(f"Export failed: {result.get('reason', 'Unknown error')}")
        time.sleep(poll_interval)
    raise TimeoutError(f"Export did not complete within {timeout} seconds")
# Example: block until the export created above finishes.
completed_export = wait_for_export(export["id"])
print(f"Export completed at: {completed_export['finished_at']}")
Download export
Copy
def download_export(export_id, output_dir="."):
    """Download all artifacts from an export.

    Args:
        export_id: Id of the export whose artifacts are fetched.
        output_dir: Directory to write the files into; created if missing.

    Returns:
        The list of paths written, in the order the artifacts were listed.

    Raises:
        requests.HTTPError: If the listing or any artifact download returns
            a 4xx/5xx response.
    """
    # Get presigned URLs for all artifacts
    result = api_request("GET", f"/v1/exports/{export_id}/download")
    print(f"Downloading {result['count']} file(s), {result['total_bytes']:,} bytes total...")

    # open() does not create parent directories, so make sure the target exists.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    downloaded_files = []
    for artifact in result["artifacts"]:
        output_path = f"{output_dir}/{artifact['filename']}"
        print(f" Downloading {artifact['filename']} ({artifact['bytes']:,} bytes)...")
        # The context manager releases the streamed connection even if the
        # download or the file write fails part-way.
        with requests.get(artifact["url"], stream=True) as response:
            response.raise_for_status()
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        downloaded_files.append(output_path)
    print(f"Downloaded {len(downloaded_files)} file(s)")
    return downloaded_files
# Example: save the export's artifacts under ./exports.
files = download_export(export["id"], output_dir="./exports")
Availability
Check data availability
Copy
def get_availability(datastream_id):
    """Query /v1/availability for one datastream."""
    query = {"datastream_id": datastream_id}
    return api_request("GET", "/v1/availability", params=query)
# Example: summarize the available ranges for datastream 123.
availability = get_availability(123)
print(f"Datastream ID: {availability['datastream']['datastream_id']}")
print(f"Last updated: {availability['updated_at']}")
for range_info in availability["ranges"]:
    print(f" {range_info['from']} - {range_info['to']}: ~{range_info['rows_estimate']:,} rows")
Complete workflow example
Copy
#!/usr/bin/env python3
"""Complete example: subscribe, wait, export, download."""
import os
import time
import uuid
import requests
# The API key comes from the environment; a missing TICKSUPPLY_API_KEY raises KeyError here.
API_KEY = os.environ["TICKSUPPLY_API_KEY"]
# Root URL that each request path is appended to.
BASE_URL = "https://api.ticksupply.com"
def api_request(method, path, **kwargs):
    """Make an authenticated API request, retrying while rate limited.

    Args:
        method: HTTP method name, e.g. "GET" or "POST".
        path: Path appended to BASE_URL.
        **kwargs: Passed through to ``requests.request``.

    Returns:
        The decoded JSON body, or None for a 204 response.

    Raises:
        requests.HTTPError: For a 4xx/5xx response other than 429.
    """
    # Copy the headers so the caller's dict never receives the API key.
    headers = dict(kwargs.pop("headers", None) or {})
    headers["X-Api-Key"] = API_KEY
    url = f"{BASE_URL}{path}"

    # Loop rather than recurse so a long run of 429 responses cannot hit the
    # interpreter's recursion limit.
    while True:
        response = requests.request(method, url, headers=headers, **kwargs)
        if response.status_code != 429:
            break
        try:
            retry_after = max(0, int(response.headers.get("Retry-After", 30)))
        except ValueError:
            # Retry-After may also be an HTTP-date; fall back to the default.
            retry_after = 30
        time.sleep(retry_after)

    response.raise_for_status()
    return response.json() if response.status_code != 204 else None
def main():
    """Run the end-to-end flow: find a datastream, subscribe, export, download.

    Raises:
        RuntimeError: If no BTCUSDT trade datastream is returned.
        Exception: If the export reports status "failed".
        requests.HTTPError: If an API call or artifact download returns a
            4xx/5xx response.
    """
    # 1. Find BTCUSDT trade stream
    print("Finding BTCUSDT trade stream...")
    result = api_request("GET", "/v1/datastreams", params={
        "exchange": "binance",
        "instrument": "BTCUSDT",
        "stream_type": "trade",
    })
    if not result["items"]:
        # Fail with a clear message instead of an IndexError on [0].
        raise RuntimeError("No BTCUSDT trade datastream found on binance")
    datastream_id = result["items"][0]["datastream_id"]
    print(f"Found datastream ID: {datastream_id}")

    # 2. Create subscription
    print("Creating subscription...")
    subscription = api_request(
        "POST",
        "/v1/subscriptions",
        json={"datastream_id": datastream_id},
        headers={
            "Content-Type": "application/json",
            "Idempotency-Key": str(uuid.uuid4()),
        },
    )
    print(f"Subscription created: {subscription['id']}")

    # 3. Wait for data collection
    print("Waiting 60 seconds for data collection...")
    time.sleep(60)

    # 4. Create export
    # time.time_ns() is an exact integer; time.time() * 1e9 goes through a
    # float that cannot represent current epoch nanoseconds exactly.
    now_ns = time.time_ns()
    hour_ago_ns = now_ns - 3_600 * 10**9
    print("Creating export...")
    export = api_request(
        "POST",
        "/v1/exports",
        json={
            "datastream_id": datastream_id,
            "start_time": str(hour_ago_ns),
            "end_time": str(now_ns),
        },
        headers={
            "Content-Type": "application/json",
            "Idempotency-Key": str(uuid.uuid4()),
        },
    )
    print(f"Export created: {export['id']}")

    # 5. Wait for completion
    print("Waiting for export to complete...")
    while True:
        status = api_request("GET", f"/v1/exports/{export['id']}")["status"]
        print(f"Status: {status}")
        if status == "succeeded":
            break
        elif status == "failed":
            raise Exception("Export failed!")
        time.sleep(5)

    # 6. Download all artifacts
    print("Downloading...")
    download_result = api_request("GET", f"/v1/exports/{export['id']}/download")
    for artifact in download_result["artifacts"]:
        print(f" Downloading {artifact['filename']}...")
        # Stream to disk in chunks, and check the status first so an error
        # response is never saved as if it were export data.
        with requests.get(artifact["url"], stream=True) as response:
            response.raise_for_status()
            with open(artifact["filename"], "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
    print(f"Downloaded {download_result['count']} file(s)")
# Run the workflow only when this file is executed as a script.
if __name__ == "__main__":
    main()