Skip to main content

Rust Examples

These examples use the official Ticksupply Rust client. For raw HTTP examples, see the cURL page. The crate is async-first — every API call returns a Future, so a Tokio (or compatible) runtime is required.

Setup

Add the client to your Cargo.toml:
[dependencies]
ticksupply = "0.1"
tokio      = { version = "1", features = ["macros", "rt-multi-thread"] }
chrono     = { version = "0.4", features = ["clock"] }
futures    = "0.3"
uuid       = { version = "1", features = ["v4"] }
Create a client:
use ticksupply::Client;

#[tokio::main]
async fn main() -> ticksupply::Result<()> {
    // Reads TICKSUPPLY_API_KEY from the environment.
    let client = Client::new()?;

    // Or pass the key explicitly:
    // let client = Client::with_api_key("your_api_key")?;

    Ok(())
}

Catalog operations

List exchanges

let exchanges = client.exchanges().list().await?;
for ex in &exchanges {
    println!("{}: {}", ex.code, ex.display_name);
}

List instruments

// Filtered list, single page
let page = client.exchanges()
    .list_instruments("binance")
    .search("BTC")
    .limit(10)
    .send().await?;

println!("Total matching: {:?}", page.total);
for inst in &page.items {
    println!(
        "  {}: {}/{} ({})",
        inst.symbol,
        inst.base.as_deref().unwrap_or("?"),
        inst.quote.as_deref().unwrap_or("?"),
        inst.instrument_type.as_deref().unwrap_or("?"),
    );
}

Paginate through instruments

Use .stream() to auto-paginate; the stream yields each row across every page.
use futures::StreamExt;

let stream = client.exchanges()
    .list_instruments("binance")
    .limit(100)
    .stream();
tokio::pin!(stream);

let mut all = Vec::new();
while let Some(item) = stream.next().await {
    all.push(item?);
}
println!("Fetched {} instruments", all.len());

List datastreams

// Datastreams on one instrument
let page = client.exchanges()
    .list_datastreams("binance", "BTCUSDT")
    .send().await?;
for ds in &page.items {
    println!("  id={} {}/{}", ds.datastream_id, ds.stream_type, ds.wire_format);
}

// Or use the top-level resource with filters across exchanges/instruments
let page = client.datastreams()
    .list()
    .exchange("binance")
    .instrument("BTCUSDT")
    .stream_type("trades")
    .send().await?;
let datastream_id = page.items[0].datastream_id;

Subscription operations

Create a subscription

let sub = client.subscriptions()
    .create(123)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;
println!("Created: {}, status: {:?}", sub.id, sub.status);
println!("Stream: {}/{}", sub.datastream.exchange, sub.datastream.instrument);

List subscriptions

use futures::TryStreamExt;

// Single page
let page = client.subscriptions().list().limit(20).send().await?;
println!("Total subscriptions: {:?}", page.total);
for sub in &page.items {
    println!("  {}: {:?}", sub.id, sub.status);
}

// Auto-paginate through all subscriptions
let stream = client.subscriptions().list().stream();
tokio::pin!(stream);
while let Some(sub) = stream.try_next().await? {
    println!("  {}: {:?}", sub.id, sub.status);
}

Manage a subscription

let sub_id = "sub_550e8400e29b41d4a716446655440000";

// Get details (returns the flat row shape)
let sub = client.subscriptions().get(sub_id).await?;
println!("Status: {:?}", sub.status);

// Pause collection
client.subscriptions().pause(sub_id)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

// Resume collection
client.subscriptions().resume(sub_id)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

// View activity spans
let spans = client.subscriptions().list_spans(sub_id).await?;
for span in &spans {
    println!("  {} -> {:?}", span.started_at, span.ended_at);
}

// Soft-delete
client.subscriptions().delete(sub_id)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

Export operations

Create an export

use chrono::{Duration, Utc};

// Export the last hour
let end = Utc::now();
let start = end - Duration::hours(1);

let job = client.exports()
    .create(123, start, end)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;
println!("Export created: {}, status: {:?}", job.id, job.status);
create(...) accepts anything that implements IntoTimestampchrono::DateTime<Utc> (with the default chrono feature), time::OffsetDateTime (with the time feature), or raw i64 nanoseconds.

Poll for completion and download

use std::time::Duration as StdDuration;
use ticksupply::resources::exports::ExportStatus;

let job_id = job.id.clone();
let final_job = loop {
    let cur = client.exports().get(&job_id).await?;
    println!("Status: {:?}", cur.status);
    match cur.status {
        ExportStatus::Succeeded
        | ExportStatus::Failed
        | ExportStatus::Canceled => break cur,
        _ => tokio::time::sleep(StdDuration::from_secs(5)).await,
    }
};

if final_job.status != ExportStatus::Succeeded {
    panic!("Export ended in {:?}: {:?}", final_job.status, final_job.reason);
}

let download = client.exports().get_download(&job_id).await?;
println!("{} file(s), {} bytes total", download.count, download.total_bytes);

// Stream each artifact straight to disk via reqwest
let http = reqwest::Client::new();
for artifact in &download.artifacts {
    println!("Downloading {}...", artifact.filename);
    let bytes = http.get(&artifact.url).send().await
        .expect("download")
        .bytes().await
        .expect("read body");
    tokio::fs::write(&artifact.filename, &bytes).await
        .expect("write file");
}
Artifact URLs are short-lived pre-signed URLs — typically valid for a few minutes. Re-call get_download if a URL has expired.

List exports

let page = client.exports().list().limit(10).send().await?;
for job in &page.items {
    println!("  {}: {:?} ({})", job.id, job.status, job.format);
}

Export schemas

The Rust client currently exposes the read/delete slice of the export-schemas API. Saving a reusable schema (the draft → publish flow) is not yet in the Rust SDK — for now, use inline_schema on a single export, or fall back to direct HTTP / the Python client if you need to persist a custom schema.

List schemas

let schemas = client.export_schemas().list().await?;
for s in &schemas {
    println!("  {} ({}) - v{}", s.name, s.stream_category, s.version);
}

Get a schema with column definitions

let schema = client.export_schemas().get("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4").await?;
println!("Columns: {}", schema.columns);

Use a stored schema in an export

Pass either a built-in name ("raw", "normalized") or a stored schema id ("sch_…"):
let job = client.exports()
    .create(123, start, end)
    .schema("normalized")
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

Use an inline schema

For one-off custom mappings, pass a schema object inline:
use serde_json::json;

let job = client.exports()
    .create(123, start, end)
    .inline_schema(json!({
        "columns": [
            {"output_column": "ts",
             "meta": {"value": "collection_timestamp_ns", "format": "ns"}},
            {"output_column": "price", "data": {
                "binance": {"json": {"path": "data.p", "type": "decimal(18)"}}
            }}
        ]
    }))
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

Delete a schema

client.export_schemas()
    .delete("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4")
    .await?;

Availability

let avail = client.availability().get(123).await?;
println!(
    "Stream: {}/{}",
    avail.datastream.exchange, avail.datastream.instrument
);
println!("Updated: {}", avail.updated_at);
for r in &avail.ranges {
    println!(
        "  {} - {}: ~{} rows",
        r.from_ns.as_i64(), r.to_ns.as_i64(), r.rows_estimate
    );
}

Billing

let summary = client.billing().summary().await?;
println!("access_status: {:?}", summary.access_status);
println!("plan_code:     {:?}", summary.plan_code);
println!(
    "usage: extra_stream_minutes={}, export_gb_total={}, export_gb_extra={}",
    summary.usage.extra_stream_minutes_total,
    summary.usage.export_gb_total,
    summary.usage.export_gb_extra,
);

Idempotency

Every mutating builder — create, delete, pause, resume, cancel — accepts .idempotency_key(...). Pass any UUID format (v1/v4/v7, up to 128 chars). If the response is lost (crash, timeout) and you retry with the same key, the server returns the original result instead of executing the operation twice.
let key = uuid::Uuid::new_v4().to_string();

// Retrying with the same key returns the original subscription —
// no duplicate is created.
let sub = client.subscriptions()
    .create(123)
    .idempotency_key(&key)
    .send().await?;
The client already retries transient network and 5xx errors internally. Use idempotency_key for retries from your own code — e.g. after a process crash or when resuming a job queue.

Error handling

use ticksupply::Error;

match client.subscriptions().get("sub_00000000000000000000000000000000").await {
    Ok(sub) => println!("{:?}", sub),
    Err(Error::NotFound { message, request_id }) => {
        eprintln!("not found: {message} (request_id={request_id:?})");
    }
    Err(Error::RateLimited { retry_after, .. }) => {
        eprintln!("rate limited, retry after {retry_after:?}s");
    }
    Err(Error::Authentication { .. }) => {
        eprintln!("invalid API key");
    }
    Err(e) => {
        // Every API error variant carries a request_id when the server
        // returned an X-Request-Id header. Include it in support tickets.
        eprintln!("API error: {e}");
    }
}
The client automatically retries on transient errors (5xx, timeouts) with exponential backoff. You only need to handle business-logic errors like NotFound and AlreadyExists.
Every error variant carries a request_id field sourced from the X-Request-Id response header. Include it when reporting issues to support — it uniquely identifies the failed request in our server logs.

Complete workflow

Subscribe to a datastream, wait for data, then export and download:
//! Subscribe to BTCUSDT trades, export, and download.

use std::time::Duration as StdDuration;

use chrono::{Duration, Utc};
use ticksupply::resources::exports::ExportStatus;
use ticksupply::Client;

#[tokio::main]
async fn main() -> ticksupply::Result<()> {
    let client = Client::new()?;
    let http = reqwest::Client::new();

    // 1. Find the datastream
    let page = client.datastreams()
        .list()
        .exchange("binance")
        .instrument("BTCUSDT")
        .stream_type("trades")
        .send().await?;
    let ds = page.items.first().expect("no datastream found");
    println!("Found datastream: {}", ds.datastream_id);

    // 2. Subscribe
    let sub = client.subscriptions()
        .create(ds.datastream_id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send().await?;
    println!("Subscription: {} ({:?})", sub.id, sub.status);

    // 3. Wait for data collection
    println!("Collecting data for 60 seconds...");
    tokio::time::sleep(StdDuration::from_secs(60)).await;

    // 4. Create export (skip the most recent minute so the window is closed)
    let end = Utc::now() - Duration::seconds(60);
    let start = end - Duration::minutes(5);
    let job = client.exports()
        .create(ds.datastream_id, start, end)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send().await?;
    println!("Export: {} ({:?})", job.id, job.status);

    // 5. Wait for export
    let final_job = loop {
        let cur = client.exports().get(&job.id).await?;
        match cur.status {
            ExportStatus::Succeeded
            | ExportStatus::Failed
            | ExportStatus::Canceled => break cur,
            _ => {
                println!("  {:?}...", cur.status);
                tokio::time::sleep(StdDuration::from_secs(5)).await;
            }
        }
    };
    if final_job.status != ExportStatus::Succeeded {
        panic!("Export ended in {:?}: {:?}", final_job.status, final_job.reason);
    }

    // 6. Download
    let download = client.exports().get_download(&job.id).await?;
    for artifact in &download.artifacts {
        println!("Downloading {}...", artifact.filename);
        let bytes = http.get(&artifact.url).send().await
            .expect("download")
            .bytes().await
            .expect("read body");
        tokio::fs::write(&artifact.filename, &bytes).await
            .expect("write file");
    }
    println!("Done! Downloaded {} file(s)", download.count);

    // 7. Cleanup
    client.subscriptions()
        .delete(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send().await?;

    Ok(())
}
Last modified on April 26, 2026