Trajectory data¶
The SFI.trajectory submodule is the data handling layer of SFI. It
provides a single user-facing container,
SFI.trajectory.TrajectoryCollection, which represents trajectories
together with masks and extras in a JAX-friendly form.
This guide shows how to construct, save, load and combine
TrajectoryCollection objects, and how they integrate into the
simulation and inference workflow.
Why a dedicated container, not a DataFrame?¶
A tracking table (one row per detection) is the natural interchange
format, and SFI ingests it directly (from_dataframe(), below).
But inference does not run on rows — it runs on streams derived from
the trajectory: positions X, increments dX, finite-difference
velocities, and basis functions evaluated along the path. The
collection exists to serve those streams, which a row-oriented DataFrame
cannot:
Increment / stream views.
dX, secant velocities, and the skipped errors-in-variables instrument are built lazily from a sliding time window with the correct per-stream alignment — not stored as columns.Masking. Missing frames and particles entering or leaving are tracked per
(time, particle)so each stream only ever yields rows where its whole finite-difference window is valid (see Working with masking).Normalisation and weighting. Pooled experiments of different length and quality are reweighted into one estimator (per-dataset weights, effective observation time).
JAX, above all. The data are held as dense, rectangular
(T, N, d)arrays so the whole pipeline is JIT-compiled,vmap-ed and GPU-ready; a dynamically-shaped table of Python objects would defeat that.
So: bring your data as a DataFrame or a tracking file, and let the collection turn it into the JAX-native, mask-aware streams the estimators consume.
Basic construction from arrays¶
The main constructor when you already have tensors in memory is
TrajectoryCollection.from_arrays():
import jax.numpy as jnp
from SFI.trajectory import TrajectoryCollection
# Toy data: T time steps, N particles, d dimensions
T, N, d = 1000, 5, 2
X = jnp.zeros((T, N, d)) # your tracked positions
# Uniform timestep
dt = 0.1
# Optional boolean mask (T, N): False = missing sample
mask = jnp.ones((T, N), dtype=bool)
coll = TrajectoryCollection.from_arrays(
X=X,
dt=dt,
mask=mask,
extras_global={"temperature": 300.0},
extras_local={"radius": jnp.ones((N,))},
)
Key points:
Xhas shape(T, N, d)or(T, d); in the latter case, a single particle is assumed.You may specify either:
a scalar or per-step
dt(shape(T,)), oran absolute time vector
t(shape(T,)), orboth
tanddt=None; the dataset will derive step sizes fromtwhen needed.
If
maskis omitted, all samples are treated as valid.
The resulting coll is a single-dataset collection; you can pass it
directly to inference routines.
Note
The middle axis N is the particle axis (see
particles). At pdepth=0 a single law is applied
independently to each particle; interacting models (pdepth=1) see
all particles at once. The particle count may change over time — the
mask records who is present at each frame — so N is the maximum
concurrent count. See Particle systems.
Constructing from tracking tables¶
Many experimental pipelines produce per-detection tables with columns such as:
particle identifier,
frame or time index,
coordinates (x, y, z, …),
optional per-row metadata.
There are two ways to turn such tables into a collection.
From a saved tracking file¶
If your data are already in a CSV / parquet / HDF5 file that follows the SFI trajectory format — specified in Trajectory file formats — you can load it directly:
from SFI.trajectory import TrajectoryCollection
coll = TrajectoryCollection.load("tracks.parquet")
This will:
parse the state vectors and (particle, time) indices,
reconstruct masked
(T, N, d)arrays,read extras and metadata from the file header and columns,
return a single-dataset collection.
If the file contains raw tracking output in a different layout, you can:
use pandas (or your preferred tool) to reshape it into the standard column layout, then
feed the columns to
TrajectoryCollection.from_columns().
From a pandas DataFrame¶
For raw tracking tables (trackpy, TrackMate, custom pipelines), the
most direct route is TrajectoryCollection.from_dataframe() —
columns are addressed by name, in any order, and junk columns are
dropped:
import pandas as pd
from SFI.trajectory import TrajectoryCollection
tracks = pd.read_csv("raw_tracks.csv") # e.g. track_id, frame, x, y, quality
coll = TrajectoryCollection.from_dataframe(
tracks,
particle="track_id",
time="frame",
coords=("x", "y"),
dt=0.05,
)
Common particle/time column names (particle_id, particle,
track_id, frame, time, …) are auto-detected when the
keywords are omitted; an ambiguous table raises with the candidates
listed. A float-valued time column is factorized into frame indices
and its unique values become the absolute time axis. Columns with the
extras prefixes (G_/TG_/P_/TP_, see
Trajectory file formats) are parsed into extras exactly as
when loading files.
From in-memory columns¶
For custom pipelines that already expose columns, use
TrajectoryCollection.from_columns() (the lower-level form that
from_dataframe() delegates to):
import numpy as np
from SFI.trajectory import TrajectoryCollection
# Example columns
particle_idx = np.array([0, 0, 1, 1, 0, 0, 1, 1]) # (L,)
time_idx = np.array([0, 1, 0, 1, 2, 3, 2, 3]) # (L,)
state_vecs = np.random.randn(8, 2) # (L, d)
coll = TrajectoryCollection.from_columns(
particle_idx=particle_idx,
time_idx=time_idx,
state_vectors=state_vecs,
dt=0.1, # or t=..., or use extras_global["t"]
)
The constructor:
reconstructs the dense
(T, N, d)state array,infers the number of particles and time steps,
builds the internal mask (dropping any missing rows),
attaches any provided extras or metadata.
Working with masking¶
The mask is a boolean array of shape (T, N) attached to each
dataset. It encodes which positions are available at each time and
for each particle.
How missing data is handled. Dropped detections, blinking labels,
tracking gaps, and particles that enter or leave the field of view are
all represented the same way — the corresponding mask entries are set to
False. Nothing is removed from the array and nothing is
interpolated: a masked frame simply does not contribute the increments
that would have used it. You can therefore keep one clean rectangular
(T, N, d) array even when each particle is observed over a different,
gappy subset of frames. When mask is None the dataset is treated
as fully observed.
Internally, when an inference program requests streams such as "X"
and "dX", the dataset:
computes the smallest time window needed to provide all requested streams (e.g.
tandt+1for"dX"),defines the set of valid indices where this window is in-bounds and all required positions are unmasked,
only ever produces rows for those valid indices.
This means:
you can freely mask out arbitrary times and particles without worrying about the exact finite-difference scheme used later,
the same collection can be reused for different inference programs that request different combinations of streams.
You typically do not interact with valid indices directly; the integration runtime requests them via internal streaming methods on the collection.
Using extras¶
Extras are user-defined fields attached to a dataset, accessible during
integration through the "extras" stream. They are divided into:
extras_global: shared by all particles in the dataset,extras_local: per-particle quantities.
Example: per-experiment and per-particle parameters:
import jax.numpy as jnp
from SFI.trajectory import TrajectoryCollection
X = jnp.zeros((T, N, d))
coll = TrajectoryCollection.from_arrays(
X=X,
dt=0.1,
extras_global={
"temperature": 300.0,
"salt_concentration": 0.2,
},
extras_local={
"radius": jnp.linspace(0.5, 1.0, N),
},
)
Semantics:
Global extras are visible to all particles; local extras are aligned with the particle index axis.
If the same key appears in both, the local version wins.
Time-dependent extras are first-class: wrap an array with a leading
time axis as a TimeSeriesExtra
(time_series_extra(values), shape (T, ...) for globals or
(T, N, ...) per particle), and the integration runtime slices it to
the value at each frame. TG_ / TP_ file columns load this way
automatically (Trajectory file formats).
During integration, when a program asks for the "extras" stream, it
receives a dictionary, for each time index, built by:
slicing any recognised time-series extras at that index,
evaluating any callable extras at that index, passing along an optional context string supplied by the integrator,
merging global and local dictionaries.
This makes it natural to fit force fields that depend not only on the
state, but also on experiment conditions, drive protocols, or particle
labels. The compositional entry point is
extra_scalar(), which turns an extras key into a basis
symbol:
from SFI.bases import X, extra_scalar
# trap + protocol-modulated trap: F = θ₁·x + θ₂·k(t)·x
B = X(dim=1) & (extra_scalar("k_drive", dim=1) * X(dim=1))
inf.infer_force_linear(B)
See Time-dependent forcing — protocols as extras for the full round trip (simulate with a protocol, infer it back), and Simulation for the simulation-side conventions. For multi-experiment collections (see below), you can attach different extras to each dataset and use them as input features in inference.
Combining multiple experiments¶
A TrajectoryCollection can hold several independent datasets.
They may differ in:
number of particles and duration,
masks and time axes,
extras and metadata.
You can build them separately and then concatenate:
coll1 = TrajectoryCollection.from_arrays(X=X1, dt=0.1,
extras_global={"temperature": 280.0})
coll2 = TrajectoryCollection.from_arrays(X=X2, dt=0.1,
extras_global={"temperature": 300.0})
coll = coll1 & coll2 # quick merge; default "pool" policy
# …or make each experiment count equally regardless of length:
coll = coll1.concat([coll2], weights="per_dataset")
The resulting collection:
has two datasets, each with its own extras and mask,
carries per-dataset weights — an unnormalised multiplier that the runtime applies to every estimator (force, diffusion, parametric).
The weights policy sets how much each dataset contributes to the
pooled inference:
"pool"(default): multiplier1for every dataset — pool all the increments on equal footing. Combined with each estimator’s intrinsic within-dataset weighting (the force is per-\(\Delta t\), the diffusion per-point), each dataset then contributes in proportion to its effective observation time (force) or its number of points (diffusion)."per_dataset": each experiment contributes equally regardless of length (multiplier \(\overline{T_\mathrm{eff}}/T_{\mathrm{eff},d}\)); exact for the force, and for the diffusion whendtis uniform.an explicit array: manual per-dataset multipliers.
A single inference on the concatenated collection then fits one shared model to all experiments at once — see Multi-experiment ABP inference for a worked example.
Shared vs. experiment-specific parameters. Every produced row
carries the reserved extra dataset_index (the dataset’s position in
the collection — injected at integration time, never stored on disk),
which lets part of the model be experiment-specific while the rest is
shared:
from SFI.bases import X, named_scalar, per_dataset_scalar, unit_vector_basis
k = named_scalar("k", default=1.0) # shared stiffness
a = per_dataset_scalar("a", n_datasets=2) # one drift per experiment
F = a * unit_vector_basis(1) - k * X(dim=1)
inf.infer_force(F) # parametric (L-BFGS path)
For the linear estimators, use one-hot indicator features instead:
dataset_indicator(n) * X(dim) gives an independent linear
coefficient per experiment (block-diagonal Gram, PASTIS-prunable).
Per-particle inferred parameters in interacting models use the same
idea with the reserved particle_index extra inside
make_interactor() kernels (declare it via
particle_extras=...).
Reproducing one experiment. dataset_index is purely an
inference-time coordinate — it has no meaning in a simulation. To
reproduce a single experiment from a pooled fit, collapse the model to
that condition with
specialize(): per-dataset parameters are
folded at the chosen index and dataset_index drops out, leaving a
standalone single-condition model. simulate_bootstrapped_trajectory()
does this for you when given dataset=k (k is the experiment’s
position in the pooled collection):
coll_k, proc_k = inf.simulate_bootstrapped_trajectory(key, dataset=k)
# proc_k is experiment k's own model; coll_k is a plain single
# trajectory (no dataset_index), so re-inference uses a plain basis.
Do not set dataset_index yourself as a user extra — it is
framework-reserved and will be rejected.
This structure is useful when:
pooling multiple repeats of the same experiment,
combining trajectories recorded at different conditions and letting the state expression depend symbolically on those conditions (e.g. learning a force field
F(x, T)from datasets at different temperatures).
Saving and loading collections¶
Use TrajectoryCollection.save() and
TrajectoryCollection.load() for persistence and interchange.
The on-disk formats (CSV / Parquet / HDF5 table layout, extras-column
prefixes, metadata header) are specified in
Trajectory file formats.
Single-dataset case¶
For a single dataset, you can save and reload from a single file:
coll = TrajectoryCollection.from_arrays(X=X, dt=0.1)
# Save to a parquet file (recommended)
coll.save("traj.parquet")
# Later…
coll2 = TrajectoryCollection.load("traj.parquet")
Rules:
The collection must contain exactly one dataset when saving to a single file.
Masked samples are dropped; the file only contains valid rows.
At load time, particle IDs are optionally relabelled to
0..N-1and time indices shifted to start at 0 (configurable viarelabel).
Multi-dataset case¶
For multiple datasets, save to a directory:
# coll contains several datasets
coll.save("my_experiments/")
# Directory "my_experiments" will contain:
# ds_000.parquet, ds_001.parquet, ...
# manifest.yaml
coll2 = TrajectoryCollection.load("my_experiments/")
Each dataset is stored in its own file, and a manifest.yaml records
their names and filenames. Loading the directory reconstructs the full
collection with one dataset per file.
Round-trip guarantees¶
A collection that is saved and then reloaded:
preserves the state arrays, masks, time axis and extras, up to the loss of masked samples (which are not written to disk),
restores per-dataset metadata and I/O-level time information,
recomputes reasonable default weights (by default equal weights for multi-dataset collections, or the chosen policy).
You can rely on this for reproducible pipelines where simulation, storage, and inference may happen at different times or on different machines.
Train/test splitting¶
For held-out validation on data-abundant problems,
TrajectoryCollection.split_time() cuts every dataset along time:
train, test = coll.split_time(0.8) # optional: gap=, reweight=
Masks, time axes (kept absolute), and extras follow the split
(time-series extras are sliced; static extras are shared). See the
held-out note in Running inference for when this is — and
is not — the right tool: SFI’s default validation route
(force_predicted_MSE + diagnostics) costs no data.
Optional: degrading synthetic data¶
The TrajectoryCollection.degrade() method is a convenience wrapper
for generating synthetic “experimental-like” trajectories from clean
simulations:
degraded = coll.degrade(
downsample=4, # keep every 4th frame (effective dt ×4)
motion_blur=3, # average over motion_blur+1 frames before
# downsampling (exposure smear);
# 0 <= motion_blur < downsample
data_loss_fraction=0.1, # randomly mask out 10% of surviving points
noise=0.01, # add Gaussian localisation noise, std 0.01
reweight="pool", # recompute pooled weights after degrading
)
Two further options not shown above: ROI= drops points outside a
region of interest (a radial cutoff, an axis-aligned box, or a
predicate), and seed= fixes the RNG for the added noise and the
drop-out so a degradation is reproducible.
Typical use cases:
testing robustness of inference algorithms to motion blur and missing data,
benchmarking against known synthetic ground truth.
Real experimental data are usually ingested directly from tracking
tables via TrajectoryCollection.load() or
TrajectoryCollection.from_columns(); degradation is not required
for those workflows.
Quick plotting¶
The SFI.utils.plotting module provides simple helpers that work
directly with collections. For example:
import matplotlib.pyplot as plt
from SFI.utils import plotting
# Time series of all coordinates for the first dataset
plotting.timeseries(coll, dataset=0)
plt.show()
# 2D phase plot for coordinates 0 and 1
plotting.phase2d(coll, dataset=0, dims=(0, 1))
plt.gca().set_aspect("equal")
plt.show()
These utilities internally extract (t, X, mask) from the chosen
dataset, respect the mask when drawing trajectories, and can be used as
building blocks for more elaborate visualisation routines.
Sanity-checking your data¶
A quick look before inference rarely hurts: plot the trajectory with
plotting.timeseries / plotting.phase2d (above) and watch for
tracking artefacts (jumps, stuck particles, coordinate resets) or a
mismatched dt (displacements spanning a large fraction of the
explored range); ds.mask — None when fully observed — reports the
coverage. None of this is essential, though: the rigorous, fit-aware
checks happen after inference, through the diagnostics suite
(Diagnostics).
Further reading¶
Trajectory API for a curated reference of the public methods on
TrajectoryCollection.The autogenerated
SFI.trajectoryAPI page for full signatures and parameter documentation.