Skip to main content

Rust Examples

These examples use the official Ticksupply Rust client. For raw HTTP examples, see the cURL page. The crate is async-first — every API call returns a Future, so a Tokio (or compatible) runtime is required.

Setup

Add the client to your Cargo.toml:
[dependencies]
ticksupply = "0.2"
tokio      = { version = "1", features = ["macros", "rt-multi-thread"] }
chrono     = { version = "0.4", features = ["clock"] }
futures    = "0.3"
uuid       = { version = "1", features = ["v4"] }
Create a client:
use ticksupply::Client;

#[tokio::main]
async fn main() -> ticksupply::Result<()> {
    // Reads TICKSUPPLY_API_KEY from the environment.
    let client = Client::new()?;

    // Or pass the key explicitly:
    // let client = Client::with_api_key("your_api_key")?;

    Ok(())
}

Catalog operations

List exchanges

let exchanges = client.exchanges().list().await?;
for ex in &exchanges {
    println!("{}: {}", ex.code, ex.display_name);
}

List instruments

// Filtered list, single page
let page = client.exchanges()
    .list_instruments("binance")
    .search("BTC")
    .limit(10)
    .send().await?;

println!("Total matching: {:?}", page.total);
for inst in &page.items {
    println!(
        "  {}: {}/{} ({})",
        inst.symbol,
        inst.base.as_deref().unwrap_or("?"),
        inst.quote.as_deref().unwrap_or("?"),
        inst.instrument_type.as_deref().unwrap_or("?"),
    );
}

Paginate through instruments

Use .stream() to auto-paginate; the stream yields each row across every page.
use futures::StreamExt;

let stream = client.exchanges()
    .list_instruments("binance")
    .limit(100)
    .stream();
tokio::pin!(stream);

let mut all = Vec::new();
while let Some(item) = stream.next().await {
    all.push(item?);
}
println!("Fetched {} instruments", all.len());

List datastreams

// Datastreams on one instrument
let page = client.exchanges()
    .list_datastreams("binance", "BTCUSDT")
    .send().await?;
for ds in &page.items {
    println!("  id={} {}/{}", ds.datastream_id, ds.stream_type, ds.wire_format);
}

// Or use the top-level resource with filters across exchanges/instruments
let page = client.datastreams()
    .list()
    .exchange("binance")
    .instrument("BTCUSDT")
    .stream_type("trades")
    .send().await?;
let datastream_id = page.items[0].datastream_id;

Subscription operations

Create a subscription

let sub = client.subscriptions()
    .create(123)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;
println!("Created: {}, status: {:?}", sub.id, sub.status);
println!("Stream: {}/{}", sub.datastream.exchange, sub.datastream.instrument);

List subscriptions

use futures::TryStreamExt;

// Single page
let page = client.subscriptions().list().limit(20).send().await?;
println!("Total subscriptions: {:?}", page.total);
for sub in &page.items {
    println!("  {}: {:?}", sub.id, sub.status);
}

// Auto-paginate through all subscriptions
let stream = client.subscriptions().list().stream();
tokio::pin!(stream);
while let Some(sub) = stream.try_next().await? {
    println!("  {}: {:?}", sub.id, sub.status);
}

Manage a subscription

let sub_id = "sub_550e8400e29b41d4a716446655440000";

// Get details (returns the flat row shape)
let sub = client.subscriptions().get(sub_id).await?;
println!("Status: {:?}", sub.status);

// Pause collection
client.subscriptions().pause(sub_id)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

// Resume collection
client.subscriptions().resume(sub_id)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

// View activity spans
let spans = client.subscriptions().list_spans(sub_id).await?;
for span in &spans {
    println!("  {} -> {:?}", span.started_at, span.ended_at);
}

// Soft-delete
client.subscriptions().delete(sub_id)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

Export operations

Create an export

use chrono::{Duration, Utc};

// Export the last hour
let end = Utc::now();
let start = end - Duration::hours(1);

let job = client.exports()
    .create(123, start, end)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;
println!("Export created: {}, status: {:?}", job.id, job.status);
create(...) accepts anything that implements IntoTimestampchrono::DateTime<Utc> (with the default chrono feature), time::OffsetDateTime (with the time feature), or raw i64 nanoseconds.

Poll for completion and download

use std::time::Duration as StdDuration;
use ticksupply::resources::exports::ExportStatus;

let job_id = job.id.clone();
let final_job = loop {
    let cur = client.exports().get(&job_id).await?;
    println!("Status: {:?}", cur.status);
    match cur.status {
        ExportStatus::Succeeded
        | ExportStatus::Failed
        | ExportStatus::Canceled => break cur,
        _ => tokio::time::sleep(StdDuration::from_secs(5)).await,
    }
};

if final_job.status != ExportStatus::Succeeded {
    panic!("Export ended in {:?}: {:?}", final_job.status, final_job.reason);
}

let download = client.exports().get_download(&job_id).await?;
println!("{} file(s), {} bytes total", download.count, download.total_bytes);

// Stream each artifact straight to disk via reqwest
let http = reqwest::Client::new();
for artifact in &download.artifacts {
    println!("Downloading {}...", artifact.filename);
    let bytes = http.get(&artifact.url).send().await
        .expect("download")
        .bytes().await
        .expect("read body");
    tokio::fs::write(&artifact.filename, &bytes).await
        .expect("write file");
}
Artifact URLs are short-lived pre-signed URLs — typically valid for a few minutes. Re-call get_download if a URL has expired.

List exports

let page = client.exports().list().limit(10).send().await?;
for job in &page.items {
    println!("  {}: {:?} ({})", job.id, job.status, job.format);
}

Export schemas

Schema content is built from typed values in ticksupply::resources::export_schemasSchemaContent, SchemaColumn, MetaExtraction, JsonExtraction, ExchangeExtractor, plus enums for StreamCategory, MetaValue, TimestampFormat, and DataType. Build a SchemaContent once with the fluent builder, then pass it to any mutation (create, update, update_draft, or inline_content).

List schemas

let schemas = client.export_schemas().list().await?;
for s in &schemas {
    println!("  {} ({:?}) - v{}", s.name, s.stream_category, s.version);
}

Get a schema with column definitions

let schema = client.export_schemas().get("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4").await?;
for col in &schema.columns {
    println!("  {}", col.output_column);
}

Build schema content

use ticksupply::resources::export_schemas::{
    DataType, ExchangeExtractor, MetaExtraction, MetaValue,
    SchemaColumn, SchemaContent, TimestampFormat,
};

let content = SchemaContent::builder()
    .column(SchemaColumn::meta(
        "timestamp_ns",
        MetaExtraction::new(MetaValue::CollectionTimestampNs).format(TimestampFormat::Ns),
    ))
    .column(
        SchemaColumn::data("price")
            .exchange("binance",      ExchangeExtractor::json("data.p", DataType::Decimal(18)))
            .exchange("bybit_linear", ExchangeExtractor::json("data.p", DataType::Decimal(18))),
    )
    .column(
        SchemaColumn::data("quantity")
            .exchange("binance",      ExchangeExtractor::json("data.q", DataType::Decimal(18)))
            .exchange("bybit_linear", ExchangeExtractor::json("data.v", DataType::Decimal(18))),
    )
    .build();

Create a saved schema

use ticksupply::resources::export_schemas::StreamCategory;

let schema = client.export_schemas()
    .create("my_trades", StreamCategory::Trade, content)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;
println!("Created: {} v{}", schema.id, schema.version);

Iterate via the draft workflow

for_id returns a SchemaHandle so you don’t repeat the schema id on each call. The handle hosts every schema-relative operation.
let h = client.export_schemas().for_id(&schema.id);

// Branch from the latest published version. Pass `.content(new_content)` to
// supply explicit content instead.
h.create_draft()
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

// Replace the draft body
h.update_draft(updated_content).send().await?;

// Promote the draft to the next published version
let v2 = h.publish_draft()
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;
println!("Published v{}", v2.version);
To skip the draft and publish atomically, call update directly:
let v3 = client.export_schemas()
    .update(&schema.id, even_newer_content)
    .send().await?;

Use a stored schema in an export

Pass either a built-in name ("raw", "normalized") or a stored schema id ("sch_…"):
let job = client.exports()
    .create(123, start, end)
    .schema("normalized")
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

Use an inline schema

For one-off custom mappings without saving a schema, pass a SchemaContent to .inline_content(...):
let job = client.exports()
    .create(123, start, end)
    .inline_content(content)
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;
For ad-hoc shapes that don’t fit the typed builder, .inline_schema(serde_json::Value) accepts raw JSON.

Delete a schema

client.export_schemas()
    .delete("sch_0194a1b2c3d4e5f6a7b8c9d0e1f2a3b4")
    .idempotency_key(uuid::Uuid::new_v4().to_string())
    .send().await?;

Availability

let avail = client.availability().get(123).await?;
println!(
    "Stream: {}/{}",
    avail.datastream.exchange, avail.datastream.instrument
);
println!("Updated: {}", avail.updated_at);
for r in &avail.ranges {
    println!(
        "  {} - {}: ~{} rows",
        r.from_ns.as_i64(), r.to_ns.as_i64(), r.rows_estimate
    );
}

Billing

let summary = client.billing().summary().await?;
println!("access_status: {:?}", summary.access_status);
println!("plan_code:     {:?}", summary.plan_code);
println!(
    "usage: extra_stream_minutes={}, export_gb_total={}, export_gb_extra={}",
    summary.usage.extra_stream_minutes_total,
    summary.usage.export_gb_total,
    summary.usage.export_gb_extra,
);

Idempotency

Every mutating builder — create, delete, pause, resume, cancel — accepts .idempotency_key(...). Pass any UUID format (v1/v4/v7, up to 128 chars). If the response is lost (crash, timeout) and you retry with the same key, the server returns the original result instead of executing the operation twice.
let key = uuid::Uuid::new_v4().to_string();

// Retrying with the same key returns the original subscription —
// no duplicate is created.
let sub = client.subscriptions()
    .create(123)
    .idempotency_key(&key)
    .send().await?;
The client already retries transient network and 5xx errors internally. Use idempotency_key for retries from your own code — e.g. after a process crash or when resuming a job queue.

Error handling

use ticksupply::Error;

match client.subscriptions().get("sub_00000000000000000000000000000000").await {
    Ok(sub) => println!("{:?}", sub),
    Err(Error::NotFound { message, request_id }) => {
        eprintln!("not found: {message} (request_id={request_id:?})");
    }
    Err(Error::RateLimited { retry_after, .. }) => {
        eprintln!("rate limited, retry after {retry_after:?}s");
    }
    Err(Error::Authentication { .. }) => {
        eprintln!("invalid API key");
    }
    Err(e) => {
        // Every API error variant carries a request_id when the server
        // returned an X-Request-Id header. Include it in support tickets.
        eprintln!("API error: {e}");
    }
}
The client automatically retries on transient errors (5xx, timeouts) with exponential backoff. You only need to handle business-logic errors like NotFound and AlreadyExists.
Every error variant carries a request_id field sourced from the X-Request-Id response header. Include it when reporting issues to support — it uniquely identifies the failed request in our server logs.

Complete workflow

Subscribe to a datastream, wait for data, then export and download:
//! Subscribe to BTCUSDT trades, export, and download.

use std::time::Duration as StdDuration;

use chrono::{Duration, Utc};
use ticksupply::resources::exports::ExportStatus;
use ticksupply::Client;

#[tokio::main]
async fn main() -> ticksupply::Result<()> {
    let client = Client::new()?;
    let http = reqwest::Client::new();

    // 1. Find the datastream
    let page = client.datastreams()
        .list()
        .exchange("binance")
        .instrument("BTCUSDT")
        .stream_type("trades")
        .send().await?;
    let ds = page.items.first().expect("no datastream found");
    println!("Found datastream: {}", ds.datastream_id);

    // 2. Subscribe
    let sub = client.subscriptions()
        .create(ds.datastream_id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send().await?;
    println!("Subscription: {} ({:?})", sub.id, sub.status);

    // 3. Wait for data collection
    println!("Collecting data for 60 seconds...");
    tokio::time::sleep(StdDuration::from_secs(60)).await;

    // 4. Create export (skip the most recent minute so the window is closed)
    let end = Utc::now() - Duration::seconds(60);
    let start = end - Duration::minutes(5);
    let job = client.exports()
        .create(ds.datastream_id, start, end)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send().await?;
    println!("Export: {} ({:?})", job.id, job.status);

    // 5. Wait for export
    let final_job = loop {
        let cur = client.exports().get(&job.id).await?;
        match cur.status {
            ExportStatus::Succeeded
            | ExportStatus::Failed
            | ExportStatus::Canceled => break cur,
            _ => {
                println!("  {:?}...", cur.status);
                tokio::time::sleep(StdDuration::from_secs(5)).await;
            }
        }
    };
    if final_job.status != ExportStatus::Succeeded {
        panic!("Export ended in {:?}: {:?}", final_job.status, final_job.reason);
    }

    // 6. Download
    let download = client.exports().get_download(&job.id).await?;
    for artifact in &download.artifacts {
        println!("Downloading {}...", artifact.filename);
        let bytes = http.get(&artifact.url).send().await
            .expect("download")
            .bytes().await
            .expect("read body");
        tokio::fs::write(&artifact.filename, &bytes).await
            .expect("write file");
    }
    println!("Done! Downloaded {} file(s)", download.count);

    // 7. Cleanup
    client.subscriptions()
        .delete(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send().await?;

    Ok(())
}
Last modified on April 27, 2026