ServingPipeline¶
The ServingPipeline is a specialized Numaflow resource designed to expose a standard Numaflow data processing Pipeline as an interactive HTTP service. The major design idea behind ServingPipeline is to bridge traditional request/response API patterns with the power of Numaflow's stream processing capabilities, allowing external clients to directly inject data, trigger processing, and retrieve results via familiar REST or Server-Sent Events (SSE) mechanisms.
A ServingPipeline consists of two main parts:
- An HTTP Serving Layer: Defined in spec.serving, this configures the external interface, including service exposure, request identification (via headers), timeouts, authentication, and result storage options.
- A Standard Numaflow Pipeline: Defined in spec.pipeline, this contains the familiar vertices and edges for data processing. However, it must use a specific serving source as its entry point and conclude with a User Defined Sink (UDSink) capable of handling serving responses (using ResponseServe).
This structure enables features like synchronous (/sync), asynchronous (/async), and streaming (/sse) API endpoints, along with request tracking via unique IDs. Unlike a standard Pipeline focused purely on stream processing, ServingPipeline adds this interactive HTTP request/response lifecycle management layer on top.
The major benefits of ServingPipeline are as follows:
- API Exposure: Easily expose complex stream processing logic or ML models within Numaflow pipelines via standard RESTful APIs.
- Interactive Workflows: Supports common application patterns requiring synchronous request/response or traceable asynchronous tasks initiated via HTTP.
- Flexible Interaction: Offers synchronous, asynchronous, and streaming (Server-Sent Events) options for clients to receive results based on their needs.
- Traceability: Built-in request ID mechanism allows tracking individual requests and retrieving their specific results or status.
Use Cases of ServingPipeline¶
ServingPipeline is ideal for scenarios where external systems or users need to interact directly with a Numaflow pipeline via HTTP:
- ML Model Serving: Deploying a machine learning model within a Numaflow pipeline (e.g., for pre/post-processing) and exposing it as a real-time inference API endpoint.
- Interactive Data Services: Building services for data validation, enrichment, or transformation where clients submit data via an API call and receive the processed result.
- API Gateway Pattern: Using ServingPipeline as a front-end to trigger event-driven backends processed by Numaflow, potentially returning a final status or result synchronously or asynchronously.
- Traceable Asynchronous Jobs: Kicking off complex, multi-step processing within Numaflow via an API call and allowing the client to poll for status or results later using a unique request ID.
Anatomy of ServingPipeline¶
A ServingPipeline resource defines both the serving layer configuration (spec.serving) and the underlying processing pipeline (spec.pipeline).
apiVersion: numaflow.numaproj.io/v1alpha1
kind: ServingPipeline
metadata:
  name: serving-pipeline-custom-store # Example name
spec:
  # Configures the HTTP serving aspects
  serving:
    ... serving configuration options ...
  # Defines the underlying Numaflow pipeline for processing
  pipeline:
    ... pipeline ...
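For orientation, here is a minimal, more concrete sketch of how the two halves fit together: a serving source feeding a UDSink that returns results. The exact shape of the serving source (shown here as an empty serving block on the source vertex) and the sink image name are illustrative assumptions; consult the ServingPipeline spec reference for the authoritative field names.

apiVersion: numaflow.numaproj.io/v1alpha1
kind: ServingPipeline
metadata:
  name: serving-pipeline-minimal # illustrative name
spec:
  serving:
    msgIDHeaderKey: "X-Numaflow-Id" # header clients use to tag requests
  pipeline:
    vertices:
      - name: in
        source:
          serving: {} # assumed: the dedicated serving source entry point
      - name: serve-sink
        sink:
          udsink:
            container:
              image: my-serving-sink:latest # hypothetical image whose sink calls ResponseServe
    edges:
      - from: in
        to: serve-sink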
Configuration (spec.serving)¶
This section configures the HTTP interface (a combined sketch follows the list):
- service (boolean): If true, automatically creates a Kubernetes Service.
- msgIDHeaderKey (string): The HTTP header key for the unique Request ID.
- requestTimeoutSeconds (integer): Timeout for /sync and /sse requests (default: 120).
- auth: Optional configuration for token-based authentication using a Kubernetes Secret.
- store: Optional configuration for a custom result storage backend. If omitted, internal storage (e.g., JetStream) is used. See Custom Results Store below.
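Below is a sketch of how these options might look together. The top-level field names match the list above; the nesting under auth (a Secret name and key) follows Numaflow's usual Secret references but is an assumption here, so verify it against the spec reference.

serving:
  service: true                    # expose a Kubernetes Service
  msgIDHeaderKey: "X-Numaflow-Id"  # header carrying the unique Request ID
  requestTimeoutSeconds: 120       # applies to /sync and /sse requests
  auth:
    token:                         # assumed shape: reference to a Kubernetes Secret key
      name: serving-auth-secret    # hypothetical Secret name
      key: token                   # hypothetical key within the Secret
  store:                           # optional custom results store
    container:
      image: my-serving-store:latest # hypothetical store image implementing Put/Get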
Custom Results Store (spec.serving.store)¶
You can provide a custom storage backend by specifying a container image in spec.serving.store.container. This container must implement a specific gRPC interface (defined in the Numaflow SDKs) for storing (Put) and retrieving (Get) results associated with request IDs. This allows using preferred databases or caches.
Refer to the Numaflow SDK documentation for your language for the exact interface. The Go interface requires methods like:
type ServingStorer interface {
	Put(ctx context.Context, put PutDatum)
	Get(ctx context.Context, get GetDatum) StoredResult
}
// PutDatum, GetDatum, StoredResult provide necessary details
A Golang example can be found here.
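As a rough illustration of the shape such a store takes, here is a minimal in-memory sketch written against the interface above. The PutDatum/GetDatum/StoredResult stand-ins and their accessor names (ID(), Payloads()) are assumptions for readability; the real definitions come from the Numaflow SDK, and a production store would write to a database or cache rather than a map.

package store

import (
	"context"
	"sync"
)

// Illustrative stand-ins for the SDK types named in the interface above.
// The real definitions (and accessor names) live in the Numaflow SDK.
type PutDatum interface {
	ID() string         // assumed: the request ID
	Payloads() [][]byte // assumed: result payloads to persist
}

type GetDatum interface {
	ID() string // assumed: the request ID to look up
}

type StoredResult struct {
	ID       string
	Payloads [][]byte
}

// InMemoryStore keeps results in a map keyed by request ID.
type InMemoryStore struct {
	mu      sync.RWMutex
	results map[string][][]byte
}

func NewInMemoryStore() *InMemoryStore {
	return &InMemoryStore{results: make(map[string][][]byte)}
}

// Put appends the payloads produced for a request ID.
func (s *InMemoryStore) Put(ctx context.Context, put PutDatum) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.results[put.ID()] = append(s.results[put.ID()], put.Payloads()...)
}

// Get returns whatever has been stored so far for a request ID.
func (s *InMemoryStore) Get(ctx context.Context, get GetDatum) StoredResult {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return StoredResult{ID: get.ID(), Payloads: s.results[get.ID()]}
}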
User-Defined Sink Implementation for Serving¶
The User Defined Sink (UDSink) in a ServingPipeline's pipeline (spec.pipeline.vertices[].sink.udsink) must signal the final response payload for the original HTTP request. Use the ResponseServe(requestID, resultBytes) function (or SDK equivalent) in your sink code.
Example (Go SDK)¶
package main

import (
	"context"

	sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
)

type serveSink struct{}

func (l *serveSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
	result := sinksdk.ResponsesBuilder()
	for d := range datumStreamCh {
		id := d.ID()     // Original Request ID
		val := d.Value() // Final payload from pipeline
		// Use ResponseServe to mark 'val' as the result for 'id'
		result = result.Append(sinksdk.ResponseServe(id, val))
	}
	return result
}
Using ResponseServe ensures the result is correctly stored and available via the API endpoints.
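To run the sink above as a UDSink container, it has to be started with the SDK's sink server. A minimal main for the same file might look like the sketch below; the NewServer/Start pattern mirrors the numaflow-go sink examples, but check your SDK version for the exact API.

// main starts the sink server so Numaflow can deliver data to serveSink.
func main() {
	if err := sinksdk.NewServer(&serveSink{}).Start(context.Background()); err != nil {
		panic(err)
	}
}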
API Endpoints¶
The ServingPipeline exposes the following endpoints:
- POST /v1/process/sync: Submit data, wait for the result in the response body.
- POST /v1/process/async: Submit data, return immediately; fetch the result later.
- GET /v1/process/fetch?id=<request_id>: Retrieve status/result(s) for a request ID.
- GET /v1/process/sse?id=<request_id>: Stream results using Server-Sent Events.
- GET /v1/process/message?id=<request_id>: Get message path info.
Interaction Example¶
- Submit data synchronously:

    curl -k -XPOST \
      --header 'X-Numaflow-Id: job-456' \
      --header 'content-type: application/json' \
      --header 'Authorization: Bearer <your-token-if-auth-enabled>' \
      --data '{"value":123}' \
      --url https://<serving-pipeline-service-address>:8443/v1/process/sync

- Fetch results later:

    curl -k \
      --header 'Authorization: Bearer <your-token-if-auth-enabled>' \
      --url https://<serving-pipeline-service-address>:8443/v1/process/fetch?id=job-456

- Stream results using SSE:

    curl -k -N \
      --header 'X-Numaflow-Id: stream-789' \
      --header 'Authorization: Bearer <your-token-if-auth-enabled>' \
      --url https://<serving-pipeline-service-address>:8443/v1/process/sse?id=stream-789
    # Client receives events as results become available
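For clients that prefer code over curl, here is a rough Go sketch of the asynchronous flow (submit via /async, then poll /fetch), using the same header and paths as the examples above. The service address, the omission of the Authorization header, and the TLS verification skip (mirroring curl -k) are illustrative choices, not requirements.

package main

import (
	"crypto/tls"
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// Skip TLS verification to mirror `curl -k`; do not do this in production.
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}
	base := "https://serving-pipeline-service-address:8443" // replace with your Service address
	id := "job-456"

	// Submit asynchronously; the call returns immediately.
	req, err := http.NewRequest("POST", base+"/v1/process/async", strings.NewReader(`{"value":123}`))
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Numaflow-Id", id) // must match spec.serving.msgIDHeaderKey
	req.Header.Set("Content-Type", "application/json")
	// Add an "Authorization: Bearer <token>" header here if auth is enabled.
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// Later, poll for the stored result by request ID.
	res, err := client.Get(base + "/v1/process/fetch?id=" + id)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	body, _ := io.ReadAll(res.Body)
	fmt.Println(string(body))
}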
To query ServingPipeline objects with kubectl:
kubectl get servingpipeline