Client
ReactantServerClient is the inference client for talking to a ReactantServer worker or the gateway over KServe V2 gRPC. It depends only on ReactantServerCore and the gRPC layer, so it carries no Reactant dependency and installs on a plain client machine. See Client Usage for a worked example.
ReactantServerClient.AbstractInferenceIO — Type
AbstractInferenceIO
Interface for streaming a dataset through batched inference with infer_async / infer_sync. A concrete subtype implements length(io), item_input_bytes(io), infer_encode_chunk!(io, range, slot) (stage a chunk's inputs into the pool slot and return the input descriptors), and infer_decode_chunk!(io, range, response) (consume the response). The pool owns the staging bytes; the IO must not retain slot references past infer_encode_chunk!.
A subtype may optionally implement output_specs to read its outputs back through shared memory. The default is empty, which keeps every output inline (the response carries raw_output_contents as before). Declaring outputs is transparent to infer_decode_chunk!: the driver reads the shared-memory results back into the response before handing it over, so InferOutput works the same on either transport.
ReactantServerClient.KServeModel — Type
KServeModel(host, port, model_name; secure=false, max_batch_size=1, deadline=10.0, ...)
KServeModel(url, model_name; max_batch_size=1, deadline=10.0, ...)
A handle to one model served by a KServe V2 gRPC endpoint (a ReactantServer worker or the gateway). The second form parses a url of the form grpc://host:port (or host:port); grpcs/https selects a secure channel. max_batch_size caps how many items the batched infer_async / infer_sync drivers coalesce per request; deadline is the per-request timeout in seconds. max_send_message_length / max_receive_message_length bound a single gRPC message (default 256 MiB, matching the gateway's limits).
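The URL forms accepted by the second constructor can be illustrated with a small stand-alone parser. This is only a sketch of the rules described above, not the package's own parsing code: parse_endpoint and the shape of its return value are hypothetical names introduced here, and the handling of malformed input is an assumption.

```julia
# Hypothetical helper: mirrors the documented URL rules, not the package's parser.
function parse_endpoint(url::AbstractString)
    m = match(r"^(?:([A-Za-z][A-Za-z0-9+.-]*)://)?([^:/]+):(\d+)$", url)
    m === nothing && throw(ArgumentError("expected [scheme://]host:port, got $url"))
    # A missing scheme is treated like grpc:// (the bare host:port form).
    scheme = m.captures[1] === nothing ? "grpc" : lowercase(m.captures[1])
    # Per the description above, grpcs and https select a secure channel.
    secure = scheme in ("grpcs", "https")
    return (host = String(m.captures[2]), port = parse(Int, m.captures[3]), secure = secure)
end

parse_endpoint("grpc://localhost:8001")        # plain channel
parse_endpoint("localhost:8001")               # scheme omitted, plain channel
parse_endpoint("grpcs://gateway.example:443")  # secure channel
```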
ReactantServerClient.OutputSpec — Type
OutputSpec(name, dtype, per_item_dims)
Declares an expected network output so the driver can read it back through shared memory. name is the output tensor name, dtype its element type, and per_item_dims its Julia column-major shape for a single item, excluding the leading batch dimension. For each chunk the driver reserves sizeof(dtype) * prod(per_item_dims) * items_in_chunk bytes in the staging slot and asks the server to write that output there. See output_specs.
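The reservation formula is easy to check by hand. The sketch below evaluates it in plain Julia for two made-up outputs; reserved_output_bytes is a hypothetical helper name used only for illustration and is not part of the package.

```julia
# Bytes reserved for one declared output in one chunk, following
# sizeof(dtype) * prod(per_item_dims) * items_in_chunk.
# `reserved_output_bytes` is a hypothetical name, not a package function.
function reserved_output_bytes(dtype::Type, per_item_dims::Tuple, items_in_chunk::Integer)
    return sizeof(dtype) * prod(per_item_dims) * items_in_chunk
end

reserved_output_bytes(Float32, (10,), 8)           # 4 * 10 * 8 = 320
reserved_output_bytes(Float32, (224, 224, 3), 16)  # 4 * 150528 * 16 = 9633792
```

A quick calculation like this helps judge whether a chunk's declared outputs fit in a staging slot alongside its inputs.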
ReactantServerClient.InferInput — Method
InferInput(name, array) -> ModelInferRequest.InferInputTensor
Build a wire input tensor named name from a Julia array, shipping the bytes inline. You pass the array in its natural Julia column-major shape (W, H, …, N); the client reverses it to the network's row-major (N, …, H, W) internally (the bytes are unchanged). Pass a vector of these to the one-shot infer_sync(model, inputs). Variants taking an explicit Julia column-major shape and a typed contents vector are also provided.
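Why reversing the shape leaves the bytes unchanged can be seen in plain Julia, without the client. The sketch below reads a column-major array's memory with row-major indexing over the reversed shape; row_major_get is a hypothetical helper written for this illustration.

```julia
# Column-major Julia array with shape (W, H, N) = (2, 3, 4).
A = reshape(Float32.(1:24), 2, 3, 4)
wire_shape = reverse(size(A))   # (N, H, W) = (4, 3, 2)
flat = vec(A)                   # same memory order as A, no permutation

# Read element `idx` (zero-based, one index per axis of `shape`) the way a
# row-major consumer would. Hypothetical helper, for illustration only.
function row_major_get(flat, shape, idx)
    offset = 0
    for (extent, i) in zip(shape, idx)
        offset = offset * extent + i
    end
    return flat[offset + 1]
end

# Row-major element (n, h, w) = (3, 1, 0) is Julia's A[w + 1, h + 1, n + 1].
row_major_get(flat, wire_shape, (3, 1, 0)) == A[1, 2, 4]   # true
```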
ReactantServerClient.InferOutput — Method
InferOutput(name, response, dtype) -> Array
InferOutput(name, response) -> Array
Extract the output tensor named name from a ModelInferResponse as a Julia array. The wire row-major shape (N, …, H, W) is reshaped (no copy) to Julia column-major (W, …, N). Pass the element type as dtype for a type-stable result; the two-argument form reads the dtype from the response metadata.
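The no-copy reshape can be mimicked with Base functions alone. The sketch below assumes the output arrives as a flat byte buffer and uses reinterpret plus reshape; whether the client uses exactly these calls internally is an assumption, and the buffer contents are made up.

```julia
# Raw bytes as they might arrive for a Float32 output with wire shape (N, W) = (3, 2).
raw = collect(reinterpret(UInt8, Float32[1, 2, 3, 4, 5, 6]))
wire_shape = (3, 2)

# Reinterpret, then reshape with the dims reversed: Julia shape (W, N) = (2, 3).
out = reshape(reinterpret(Float32, raw), reverse(wire_shape)...)

size(out)   # (2, 3)
out[:, 1]   # Float32[1.0, 2.0], the first item

# Neither step copies: a write to the byte buffer is visible through `out`.
raw[1:4] .= reinterpret(UInt8, Float32[9])
out[1, 1]   # 9.0f0
```

Because the result aliases the response bytes, code that needs to keep a slice beyond the lifetime of the response should copy it, as the collect call in the infer_decode_chunk! example on this page does.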
ReactantServerClient.infer_async — Method
infer_async(model, io::AbstractInferenceIO)
Run inference over every item in io, staging inputs through the shared-memory BufferPool and dispatching chunks concurrently (bounded by the pool's slot count). Each chunk acquires a disjoint slot, so this is safe to call from multiple threads against one model. Results are delivered through io's infer_decode_chunk!. Use infer_sync for serial dispatch.
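The idea of slot-bounded dispatch can be sketched with a Channel acting as a pool of free slot ids. This is not the package's driver: dispatch_chunks and the process callback are hypothetical, and it uses cooperative @async tasks rather than whatever scheduling the client actually employs.

```julia
# Hypothetical sketch of slot-bounded chunk dispatch; not the package's driver.
function dispatch_chunks(process, n_items::Integer; max_batch_size::Integer, n_slots::Integer)
    free_slots = Channel{Int}(n_slots)
    for s in 1:n_slots
        put!(free_slots, s)
    end
    @sync for r in Iterators.partition(1:n_items, max_batch_size)
        slot = take!(free_slots)      # blocks while every slot is in flight
        @async try
            process(r, slot)          # stands in for encode, send, decode of items `r`
        finally
            put!(free_slots, slot)    # hand the slot to the next chunk
        end
    end
    return nothing
end

# Usage: 10 items, chunks of at most 4, at most 2 chunks in flight.
chunks_seen = Vector{UnitRange{Int}}()
dispatch_chunks(10; max_batch_size = 4, n_slots = 2) do r, slot
    push!(chunks_seen, r)
end
```

Each chunk holds its slot for the whole round trip, which is why no two in-flight chunks ever share staging bytes.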
ReactantServerClient.infer_sync — Method
infer_sync(model, io::AbstractInferenceIO)
infer_sync(model, network_inputs) -> ModelInferResponse
Synchronous inference. The AbstractInferenceIO form drives io one chunk at a time (no concurrency). The second form is a one-shot call: network_inputs is a vector of wire tensors built with InferInput, sent inline in a single ModelInferRequest, and the decoded ModelInferResponse is returned for reading with InferOutput.
ReactantServerClient.kserve_init — Method
kserve_init(; pool_bytes=DEFAULT_POOL_BYTES, n_slots=DEFAULT_POOL_SLOTS)
Initialize the gRPC subsystem and (re)set the staging-pool parameters. n_slots is the number of fixed-size slots each pool is divided into, which bounds how many chunks can be in flight concurrently against a single pool.
ReactantServerClient.kserve_shutdown — Method
kserve_shutdown()
Tear down the client: unregister and unlink the shared-memory pool from every server it was registered with, drop the cached pools and per-URL routes, and shut the gRPC subsystem down. Pair with kserve_init.
ReactantServerClient.manifest_io_spec — Method
manifest_io_spec(path) -> ModelIOSpec
Load a model's input/output spec from a manifest YAML at path, with no running server. Reuses the server's own wire encoding (encode_model_metadata), so the result matches model_io_spec for the same model. Suitable for an offline precompile or build-time check. The reported shapes are Julia column-major (batch axis last), so they read in the same axis order as the manifest's einsum letters.
ReactantServerClient.model_io_spec — Method
model_io_spec(model::KServeModel) -> ModelIOSpec
Fetch a model's input/output spec from a running server over the ModelMetadata RPC. Throws if the server is unreachable or does not implement ModelMetadata; the call is explicit, so failing loudly is intended. Pair with validate_io or use it to introspect a model's I/O directly. The reported shapes are Julia column-major (batch axis last), matching the order you build arrays in; the row-major KServe wire shape is reversed away by the client.
ReactantServerClient.output_specs — Method
output_specs(io) -> Vector{OutputSpec}
The outputs an AbstractInferenceIO declares for shared-memory read-back. Defaults to empty, which keeps every output inline (raw_output_contents) exactly as before: no output declaration means no shared memory, the safe fallback. Returning a non-empty vector opts the IO into shared-memory outputs and explicit-output mode: the request asks the server for exactly these outputs in this order, so every output the IO consumes must be declared. Outputs with data-dependent (dynamic) shapes cannot be sized ahead of time and must stay inline. Because a non-empty result has to list every consumed output, an IO that consumes such an output must return empty here.
ReactantServerClient.validate_io — Method
validate_io(spec::ModelIOSpec, io::AbstractInferenceIO; items=1)
validate_io(model::KServeModel, io::AbstractInferenceIO; items=1)
Dry-run io against a model's true I/O spec without sending an inference request. Runs the user's infer_encode_chunk! and infer_decode_chunk! (see AbstractInferenceIO) once for the first items items against a synthetic, spec-shaped request and response, checking input/output names, dtypes, and shapes and surfacing indexing or shape errors in the user's own code as exceptions.
spec comes from manifest_io_spec (offline) or model_io_spec (online); the KServeModel form fetches it online. The harness runs the user's real methods, so it has side effects (it may write into the io's buffers at positions 1:items); call it on a representative or dummy io. Zeroed synthetic data does not exercise data-dependent branches.
The dry run runs the user methods inside with_bounds_checks, so indexing written with @infer_inbounds is bounds-checked here even though it elides in normal use. Bare @inbounds is not affected by that context; to catch out-of-bounds in bare-@inbounds code, start Julia with --check-bounds=yes (which Pkg.test does).
ReactantServerClient.with_bounds_checks — Method
with_bounds_checks(f)
Run f with forced bounds checking enabled for any @infer_inbounds blocks reached during the call (task-local, restored on exit). validate_io uses this so the dry run honors bounds checks even where the IO would normally elide them.
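One way a task-local switch of this kind can work is sketched below using Base's task_local_storage. It is an illustration of the pattern only, not the package's implementation: sketch_with_bounds_checks, @sketch_infer_inbounds, checks_forced, and the storage key are all hypothetical names.

```julia
const FORCE_CHECKS_KEY = :sketch_force_bounds_checks

# Run `f` with the flag set in task-local storage; the prior state is restored on exit.
sketch_with_bounds_checks(f) = task_local_storage(f, FORCE_CHECKS_KEY, true)

# True only inside a sketch_with_bounds_checks call on the current task.
checks_forced() = get(task_local_storage(), FORCE_CHECKS_KEY, false)

macro sketch_infer_inbounds(expr)
    quote
        if checks_forced()
            $(esc(expr))              # checked path
        else
            @inbounds $(esc(expr))    # elided path
        end
    end
end

# The branch is taken once for the whole loop, not once per element.
function total(xs, n)
    acc = zero(eltype(xs))
    @sketch_infer_inbounds for i in 1:n
        acc += xs[i]
    end
    return acc
end

total([1, 2, 3], 3)                                    # 6, checks elided
sketch_with_bounds_checks(() -> total([1, 2, 3], 3))   # 6, checks enforced
```

Inside the checked context, a call such as total([1, 2, 3], 4) throws a BoundsError instead of reading past the end of the array.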
ReactantServerCore.scratch — Method
scratch(slot, name, dims, T) -> PoolInferInput
scratch(slot, ["name1" => (dims1, T1), "name2" => (dims2, T2), ...]) -> Vector{PoolInferInput}
Carve one input buffer per named spec from the chunk's slot, advancing its cursor so the buffers occupy disjoint, contiguous byte ranges, and return the wire descriptors ready to hand back from infer_encode_chunk!. dims is the Julia column-major shape (per-item dims then the batch axis), or a bare integer for a vector. Get the writable views with pool_view: one descriptor returns one view, and splatting the vector returns all of them to destructure at once.
The scalar form returns one PoolInferInput; the vector form returns a Vector{PoolInferInput}. infer_encode_chunk! accepts either as its return value, so a single-input IO can return the scalar form directly without wrapping it in a vector.
function infer_encode_chunk!(io, r, slot)
    n = length(r)
    inputs = scratch(slot, ["INPUT__0" => ((4, n), Float32), "MASK" => ((n,), Int32)])
    feats, mask = pool_view(inputs...)
    for (j, i) in enumerate(r)
        feats[:, j] .= io.feats[i]
        mask[j] = io.labels[i]
    end
    return inputs
end
The returned vector is homogeneous (Vector{PoolInferInput}), so it never triggers element promotion the way a literal of differently-typed arrays would. item_input_bytes(io) must equal the per-item sum over these buffers, since the driver sizes the slot from it before infer_encode_chunk! runs. The lower-level path (carve a subslot, pool_view it, build InferInput by hand) is still supported and may be mixed into the returned vector.
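For the two buffers in the example above, the per-item sum can be worked out in plain Julia. The helper per_item_bytes below is hypothetical and exists only to show the arithmetic; the specs list the per-item dims with the batch axis n dropped.

```julia
# Per-item dims and element type for each input of the example above.
# The batch axis n is omitted because it is not part of a single item.
per_item_specs = ["INPUT__0" => ((4,), Float32), "MASK" => ((), Int32)]

# Hypothetical helper: bytes one item occupies across all input buffers.
per_item_bytes(specs) =
    sum(sizeof(T) * reduce(*, dims; init = 1) for (_, (dims, T)) in specs)

per_item_bytes(per_item_specs)   # 4 * 4 + 4 * 1 = 20
```

An IO that stages exactly these two inputs would therefore return 20 from item_input_bytes(io).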
ReactantServerClient.@infer_inbounds — Macro
@infer_inbounds expr
Like @inbounds, but the elision is conditional: outside a with_bounds_checks context the wrapped expr runs with bounds checks elided (as @inbounds); inside one it runs with bounds checks. Use it instead of @inbounds in infer_encode_chunk! / infer_decode_chunk! so a validate_io dry run still catches out-of-bounds indexing. Wrap a whole loop or block so the runtime branch is taken once, not per element.
function ReactantServerClient.infer_decode_chunk!(io::MyIO, r, response)
    out = InferOutput("OUTPUT__0", response, Float32)
    @infer_inbounds for (j, i) in enumerate(r)
        io.results[i] = collect(out[:, j])
    end
    return nothing
end