Rust Examples
These examples use the official Ticksupply Rust client. For raw HTTP examples, see the cURL page.
The crate is async-first — every API call returns a Future, so a Tokio (or compatible) runtime is required.
Setup
Add the client to your Cargo.toml:
[dependencies]
ticksupply = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
chrono = { version = "0.4", features = ["clock"] }
futures = "0.3"
uuid = { version = "1", features = ["v4"] }
reqwest = "0.12"
serde_json = "1"
Create a client:
use ticksupply::Client;
// Construction is fallible (hence the `?`) — e.g. when no API key is available.
#[tokio::main]
async fn main() -> ticksupply::Result<()> {
// Reads TICKSUPPLY_API_KEY from the environment.
let client = Client::new()?;
// Or pass the key explicitly:
// let client = Client::with_api_key("your_api_key")?;
Ok(())
}
Catalog operations
List exchanges
// list() resolves directly to the full set — no pagination builder here.
let exchanges = client.exchanges().list().await?;
for ex in &exchanges {
println!("{}: {}", ex.code, ex.display_name);
}
List instruments
// Filtered list, single page
// search() narrows by symbol text; limit() caps the page size.
let page = client.exchanges()
.list_instruments("binance")
.search("BTC")
.limit(10)
.send().await?;
println!("Total matching: {:?}", page.total);
// base/quote/type are optional fields; fall back to "?" for display.
for inst in &page.items {
println!(
" {}: {}/{} ({})",
inst.symbol,
inst.base.as_deref().unwrap_or("?"),
inst.quote.as_deref().unwrap_or("?"),
inst.instrument_type.as_deref().unwrap_or("?"),
);
}
Paginate through instruments
Use .stream() to auto-paginate; the stream yields each row across every page.
use futures::StreamExt;
// stream() yields one row at a time, fetching further pages as needed.
let stream = client.exchanges()
.list_instruments("binance")
.limit(100)
.stream();
// Pin the stream on the stack so it can be polled in the loop below.
tokio::pin!(stream);
let mut all = Vec::new();
while let Some(item) = stream.next().await {
// Each yielded item is a Result; `?` stops at the first page error.
all.push(item?);
}
println!("Fetched {} instruments", all.len());
List datastreams
// Datastreams on one instrument
let page = client.exchanges()
.list_datastreams("binance", "BTCUSDT")
.send().await?;
for ds in &page.items {
println!(" id={} {}/{}", ds.datastream_id, ds.stream_type, ds.wire_format);
}
// Or use the top-level resource with filters across exchanges/instruments
let page = client.datastreams()
.list()
.exchange("binance")
.instrument("BTCUSDT")
.stream_type("trades")
.send().await?;
// first() avoids the out-of-bounds panic `items[0]` would raise when the
// filter matches nothing, and fails with an explanatory message instead.
let datastream_id = page.items.first().expect("no datastream found").datastream_id;
Subscription operations
Create a subscription
// `123` is a datastream id (see "List datastreams" for how to find one).
let sub = client.subscriptions()
.create(123)
// A fresh idempotency key makes retrying this exact call safe.
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
println!("Created: {}, status: {:?}", sub.id, sub.status);
println!("Stream: {}/{}", sub.datastream.exchange, sub.datastream.instrument);
List subscriptions
use futures::TryStreamExt;
// Single page
let page = client.subscriptions().list().limit(20).send().await?;
println!("Total subscriptions: {:?}", page.total);
for sub in &page.items {
println!(" {}: {:?}", sub.id, sub.status);
}
// Auto-paginate through all subscriptions
// try_next() unwraps the per-item Result, so `?` works directly in the loop.
let stream = client.subscriptions().list().stream();
tokio::pin!(stream);
while let Some(sub) = stream.try_next().await? {
println!(" {}: {:?}", sub.id, sub.status);
}
Manage a subscription
let sub_id = "sub_550e8400e29b41d4a716446655440000";
// Get details (returns the flat row shape)
let sub = client.subscriptions().get(sub_id).await?;
println!("Status: {:?}", sub.status);
// Pause collection
client.subscriptions().pause(sub_id)
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
// Resume collection
client.subscriptions().resume(sub_id)
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
// View activity spans
// NOTE(review): `ended_at` prints with {:?} — presumably None while a
// span is still open; confirm against the API reference.
let spans = client.subscriptions().list_spans(sub_id).await?;
for span in &spans {
println!(" {} -> {:?}", span.started_at, span.ended_at);
}
// Soft-delete
client.subscriptions().delete(sub_id)
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
Export operations
Create an export
use chrono::{Duration, Utc};
// Export the last hour
let end = Utc::now();
let start = end - Duration::hours(1);
// create() takes the datastream id plus the start/end of the window.
let job = client.exports()
.create(123, start, end)
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
println!("Export created: {}, status: {:?}", job.id, job.status);
create(...) accepts anything that implements IntoTimestamp — chrono::DateTime<Utc> (with the default chrono feature), time::OffsetDateTime (with the time feature), or raw i64 nanoseconds.
Poll for completion and download
use std::time::Duration as StdDuration;
use ticksupply::resources::exports::ExportStatus;
let job_id = job.id.clone();
// Poll until the job reaches a terminal state.
let final_job = loop {
let cur = client.exports().get(&job_id).await?;
println!("Status: {:?}", cur.status);
match cur.status {
ExportStatus::Succeeded
| ExportStatus::Failed
| ExportStatus::Canceled => break cur,
// Any other status means the job is still in progress — wait and re-poll.
_ => tokio::time::sleep(StdDuration::from_secs(5)).await,
}
};
if final_job.status != ExportStatus::Succeeded {
panic!("Export ended in {:?}: {:?}", final_job.status, final_job.reason);
}
let download = client.exports().get_download(&job_id).await?;
println!("{} file(s), {} bytes total", download.count, download.total_bytes);
// Stream each artifact straight to disk via reqwest
let http = reqwest::Client::new();
for artifact in &download.artifacts {
println!("Downloading {}...", artifact.filename);
let bytes = http.get(&artifact.url).send().await
.expect("download")
.bytes().await
.expect("read body");
tokio::fs::write(&artifact.filename, &bytes).await
.expect("write file");
}
Artifact URLs are short-lived pre-signed URLs — typically valid for a few minutes. Re-call get_download if a URL has expired.
List exports
// A single page of export jobs.
let page = client.exports().list().limit(10).send().await?;
for job in &page.items {
println!(" {}: {:?} ({})", job.id, job.status, job.format);
}
Export schemas
The Rust client currently exposes the read/delete slice of the export-schemas API. Saving a reusable schema (the draft → publish flow) is not yet in the Rust SDK — for now, use inline_schema on a single export, or fall back to direct HTTP / the Python client if you need to persist a custom schema.
List schemas
// list() resolves directly to the full set — no pagination builder here.
let schemas = client.export_schemas().list().await?;
for s in &schemas {
println!(" {} ({}) - v{}", s.name, s.stream_category, s.version);
}
Get a schema with column definitions
let schema = client.export_schemas().get("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4").await?;
// `columns` is a collection of column definitions and has no Display impl,
// so "{}" cannot format it directly — print how many columns there are.
println!("Columns: {}", schema.columns.len());
Use a stored schema in an export
Pass either a built-in name ("raw", "normalized") or a stored schema id ("sch_…"):
let job = client.exports()
.create(123, start, end)
// Either a built-in name ("raw", "normalized") or a stored "sch_…" id.
.schema("normalized")
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
Use an inline schema
For one-off custom mappings, pass a schema object inline:
use serde_json::json;
let job = client.exports()
.create(123, start, end)
// The inline schema applies to this one export only; it is not stored.
.inline_schema(json!({
"columns": [
{"output_column": "ts",
"meta": {"value": "collection_timestamp_ns", "format": "ns"}},
{"output_column": "price", "data": {
"binance": {"json": {"path": "data.p", "type": "decimal(18)"}}
}}
]
}))
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
Delete a schema
// Remove a stored schema by id.
client.export_schemas()
.delete("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4")
.await?;
Availability
// Availability is looked up by datastream id.
let avail = client.availability().get(123).await?;
println!(
"Stream: {}/{}",
avail.datastream.exchange, avail.datastream.instrument
);
println!("Updated: {}", avail.updated_at);
// Each range carries raw nanosecond bounds plus an estimated row count.
for r in &avail.ranges {
println!(
" {} - {}: ~{} rows",
r.from_ns.as_i64(), r.to_ns.as_i64(), r.rows_estimate
);
}
Billing
// Account-level summary: access status, plan code, and usage counters.
let summary = client.billing().summary().await?;
println!("access_status: {:?}", summary.access_status);
println!("plan_code: {:?}", summary.plan_code);
println!(
"usage: extra_stream_minutes={}, export_gb_total={}, export_gb_extra={}",
summary.usage.extra_stream_minutes_total,
summary.usage.export_gb_total,
summary.usage.export_gb_extra,
);
Idempotency
Every mutating builder — create, delete, pause, resume, cancel — accepts .idempotency_key(...). The key may be any string of up to 128 characters; a freshly generated UUID (v1/v4/v7) is a natural choice. If the response is lost (crash, timeout) and you retry with the same key, the server returns the original result instead of executing the operation twice.
// Generate the key once and keep it for any retry of this operation.
let key = uuid::Uuid::new_v4().to_string();
// Retrying with the same key returns the original subscription —
// no duplicate is created.
let sub = client.subscriptions()
.create(123)
.idempotency_key(&key)
.send().await?;
The client already retries transient network and 5xx errors internally. Use idempotency_key for retries from your own code — e.g. after a process crash or when resuming a job queue.
Error handling
use ticksupply::Error;
// Match on specific variants to branch on business-logic failures.
match client.subscriptions().get("sub_00000000000000000000000000000000").await {
Ok(sub) => println!("{:?}", sub),
Err(Error::NotFound { message, request_id }) => {
eprintln!("not found: {message} (request_id={request_id:?})");
}
Err(Error::RateLimited { retry_after, .. }) => {
eprintln!("rate limited, retry after {retry_after:?}s");
}
Err(Error::Authentication { .. }) => {
eprintln!("invalid API key");
}
Err(e) => {
// Every API error variant carries a request_id when the server
// returned an X-Request-Id header. Include it in support tickets.
eprintln!("API error: {e}");
}
}
The client automatically retries on transient errors (5xx, timeouts) with exponential backoff. You only need to handle business-logic errors like NotFound and AlreadyExists.
Every error variant carries a request_id field sourced from the X-Request-Id response header. Include it when reporting issues to support — it uniquely identifies the failed request in our server logs.
Complete workflow
Subscribe to a datastream, wait for data, then export and download:
//! Subscribe to BTCUSDT trades, export, and download.
use std::time::Duration as StdDuration;
use chrono::{Duration, Utc};
use ticksupply::resources::exports::ExportStatus;
use ticksupply::Client;
#[tokio::main]
async fn main() -> ticksupply::Result<()> {
let client = Client::new()?;
// Plain HTTP client for fetching the pre-signed artifact URLs.
let http = reqwest::Client::new();
// 1. Find the datastream
let page = client.datastreams()
.list()
.exchange("binance")
.instrument("BTCUSDT")
.stream_type("trades")
.send().await?;
// first() avoids an out-of-bounds panic when the filter matches nothing.
let ds = page.items.first().expect("no datastream found");
println!("Found datastream: {}", ds.datastream_id);
// 2. Subscribe
let sub = client.subscriptions()
.create(ds.datastream_id)
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
println!("Subscription: {} ({:?})", sub.id, sub.status);
// 3. Wait for data collection
println!("Collecting data for 60 seconds...");
tokio::time::sleep(StdDuration::from_secs(60)).await;
// 4. Create export (skip the most recent minute so the window is closed)
let end = Utc::now() - Duration::seconds(60);
let start = end - Duration::minutes(5);
let job = client.exports()
.create(ds.datastream_id, start, end)
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
println!("Export: {} ({:?})", job.id, job.status);
// 5. Wait for export
// Poll every 5s until the job reaches a terminal state.
let final_job = loop {
let cur = client.exports().get(&job.id).await?;
match cur.status {
ExportStatus::Succeeded
| ExportStatus::Failed
| ExportStatus::Canceled => break cur,
_ => {
println!(" {:?}...", cur.status);
tokio::time::sleep(StdDuration::from_secs(5)).await;
}
}
};
if final_job.status != ExportStatus::Succeeded {
panic!("Export ended in {:?}: {:?}", final_job.status, final_job.reason);
}
// 6. Download
let download = client.exports().get_download(&job.id).await?;
for artifact in &download.artifacts {
println!("Downloading {}...", artifact.filename);
let bytes = http.get(&artifact.url).send().await
.expect("download")
.bytes().await
.expect("read body");
tokio::fs::write(&artifact.filename, &bytes).await
.expect("write file");
}
println!("Done! Downloaded {} file(s)", download.count);
// 7. Cleanup
// Soft-delete the subscription.
client.subscriptions()
.delete(&sub.id)
.idempotency_key(uuid::Uuid::new_v4().to_string())
.send().await?;
Ok(())
}