9  Real-Time and Online Serving

Postgres online backend — stream ingest, request-time features, and batch sync

Keywords

snowflake, feature store, ml, machine learning, mlops

PubPr (Public Preview)

The managed Postgres online backend and all capabilities in this chapter are PubPr (snowflake-ml-python >= 1.36). The Hybrid Table backend in Chapter 8 remains GA. See Abbreviations (GA, PubPr, PrivPr).

9.1 Overview

Chapter 8 defines the online feature model and documents the GA Hybrid Table backend — a single encapsulated Snowflake OFT per Feature View. This chapter documents the Postgres online backend, which requires a shared online service (create_online_service()) and uses a different physical serving plane even when the Snowflake catalog still shows CREATE ONLINE FEATURE TABLE objects for batch sync.

Online serving has two backends; Postgres adds three write paths on top of batch sync:

Backend | Batch offline → online sync | Stream ingest (SFV) | Request-time compute (RTFV) | Feature Groups
Hybrid Table (Ch 8) | Yes | No | No | No
Postgres (this chapter) | Yes | Yes | Yes | Yes

All Postgres paths share the same online service, Producer/Consumer RBAC, PAT authentication, and REST query/ingest endpoints. They differ in how features are written:

  1. Batch + Postgres — identical to Chapter 8 batch sync (Stream on offline FV + Task on target_lag); set store_type=OnlineStoreType.POSTGRES
  2. Stream Feature View — events via stream_ingest() / REST; online-first with async offline historization
  3. Real-Time Feature View — no offline table; compute_fn at read (and at training via generate_training_set())

See the canonical matrix in Chapter 8: Online backends and write paths.

Official references: Create and serve online features (Python), Stream ingest.

9.2 Learning Objectives

After completing this chapter, you will be able to:

  • Distinguish Hybrid vs Postgres backends and which write paths each supports
  • Explain the two-tier Postgres model (online service + per-FV objects) vs encapsulated Hybrid OFTs
  • Provision the Postgres-backed online service and configure Producer/Consumer RBAC
  • Enable Postgres online serving on batch Feature Views
  • Register StreamSource schemas and create Stream Feature Views with transformation_fn and backfill
  • Create Real-Time Feature Views with RealtimeConfig, RequestSource, and compute_fn
  • Register FeatureGroup objects that combine multiple Feature Views under one online table
  • Apply shared Python compute-function constraints for transformation_fn and compute_fn
  • Ingest events via the Python SDK and REST API; query features via Python and REST
  • Explain RTFV training/serving parity via generate_training_set() and when to use label vs ASOF-matched timestamps
  • Explain the dual-write architecture, offline historization, and when online is source of truth vs offline

9.3 Prerequisites

pip install "snowflake-ml-python>=1.36"

Your account must be enrolled in the Postgres online store PubPr. You also need a Programmatic Access Token (PAT) for online service authentication:

export SNOWFLAKE_PAT="<your_pat_token>"

This chapter uses the same clickstream sample context as the rest of the guide: database FEATURE_STORE_DEMO, source schema CLICKSTREAM_DATA, Feature Store schema FEATURE_STORE, warehouse FS_DEV_WH.


9.4 RBAC Setup

The Postgres online service sits outside your normal Snowflake role hierarchy for warehouse access. Snowflake provisions two application roles on the online service:

  • Producer — services and users that call stream_ingest() or the REST Ingest API
  • Consumer — inference services and notebooks that call read_feature_view(..., store_type="online"), read_feature_group(), or the REST Query API

Splitting ingest from read follows least-privilege: your event pipeline should not need query access, and your model server should not need ingest access. Pass these role names to create_online_service(producer_role, consumer_role); grant them to the Snowflake roles your applications use.

CREATE ROLE IF NOT EXISTS FS_PRODUCER_ROLE;
CREATE ROLE IF NOT EXISTS FS_CONSUMER_ROLE;
GRANT ROLE FS_PRODUCER_ROLE TO ROLE SYSADMIN;
GRANT ROLE FS_CONSUMER_ROLE TO ROLE SYSADMIN;

Grant the producer role to services or users that push events. Grant the consumer role to inference services that read features.


9.5 Provisioning the Online Service

Postgres online serving is two-tier: a shared online service plus per-object online tables. This is fundamentally different from Hybrid OFTs, where a single register_feature_view(..., online_config=...) call creates one self-contained Snowflake-managed OFT.

9.5.1 Postgres vs Hybrid managed object model

Aspect | Hybrid Table OFT (Ch 8) | Postgres online (this chapter)
Prerequisite | Feature Store + warehouse | create_online_service() once per Feature Store (Postgres instance, ingest/query endpoints, Producer/Consumer roles)
Per-FV registration | register_feature_view with online_config → one OFT; Snowflake creates Hybrid Table + Stream + Task | register_feature_view with store_type=POSTGRES → CREATE ONLINE FEATURE TABLE ... FROM SPECIFICATION; Snowflake still orchestrates batch sync, but serving values live in managed Postgres, not a Hybrid Table
Physical storage | Hybrid Table in your Snowflake account | Managed Postgres behind the online service
Reads | Warehouse SQL / SDK against Hybrid Table | Online service with PAT auth and a query endpoint (SDK or REST); not a warehouse point lookup
Stream FV / RTFV / Feature Group | Not supported | Additional object types materialized into the same Postgres service
Operational surface | One Snowflake OFT object to manage | Online service lifecycle plus each registered online Feature View / Feature Group

Batch + Postgres reuses the same offline→online concept as Hybrid (Stream on offline FV, Task on target_lag), but the serving plane is externalized to Postgres and the online service must exist first. Stream and Real-Time Feature Views add new write paths (stream_ingest, compute_fn) that never touch Hybrid Tables at all.

Plan for several minutes on first create_online_service(). Individual Feature View registration is faster once the service is RUNNING, but every Postgres-backed object depends on that shared infrastructure.

from snowflake.ml.feature_store import FeatureStore, Entity, CreationMode

fs = FeatureStore(
    session=session,
    database="FEATURE_STORE_DEMO",
    name="FEATURE_STORE",
    default_warehouse="FS_DEV_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

create_result = fs.create_online_service("FS_PRODUCER_ROLE", "FS_CONSUMER_ROLE")
print(create_result)

Provisioning takes several minutes on first creation. Poll until the status reaches RUNNING:

import time

status = fs.get_online_service_status()
while status.status != "RUNNING":
    time.sleep(30)
    status = fs.get_online_service_status()
    print(f"Status: {status.status}")

print(f"Endpoints: {status.endpoints}")
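
The loop above polls indefinitely if the service never reaches RUNNING. A minimal sketch of a bounded variant follows; wait_until_running, its default timeout, and the injected sleep/clock parameters are hypothetical helpers for illustration, not part of snowflake-ml-python. It only assumes that the status callable returns an object with a .status attribute, as get_online_service_status() does in the example above.

```python
import time
from typing import Callable


def wait_until_running(
    get_status: Callable[[], object],
    timeout_s: float = 900.0,      # assumed upper bound; tune for your account
    interval_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
):
    """Poll get_status() until .status == "RUNNING" or the timeout elapses."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status.status == "RUNNING":
            return status
        if clock() >= deadline:
            raise TimeoutError(
                f"online service still {status.status!r} after {timeout_s}s"
            )
        sleep(interval_s)
```

Usage would look like status = wait_until_running(fs.get_online_service_status). Injecting sleep and clock keeps the helper testable without real waiting.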

Once running, retrieve the REST endpoint URLs for later use:

from snowflake.ml.feature_store import online_service

query_url = online_service.endpoint_url(status, "query")
ingest_url = online_service.endpoint_url(status, "ingest")

9.6 Batch Feature Views with Postgres Online Store

This is the same batch → online write path as Chapter 8: offline Dynamic Table or View, Snowflake Stream on the offline object, Task driven by target_lag, latest values per entity key. The only registration change is the online backend:

online_config=OnlineConfig(
    enable=True,
    target_lag="10s",
    store_type=OnlineStoreType.POSTGRES,  # default is HYBRID_TABLE (Ch 8)
)

Batch online concepts are covered in Chapter 8. target_lag vs refresh_freq, latency vs freshness, multi-FV read patterns, warm-up, benchmarking, and training-serving skew apply unchanged to batch + Postgres and are not repeated here.

What differs with Postgres:

Topic | Hybrid (Ch 8) | Postgres (here)
Provisioning | Per-FV register_feature_view creates one encapsulated OFT (Hybrid Table + Stream + Task) | create_online_service() first; then per-FV registration materializes into shared Postgres
Snowflake catalog object | CREATE ONLINE FEATURE TABLE ... FROM <offline_table> | CREATE ONLINE FEATURE TABLE ... FROM SPECIFICATION (Postgres-backed)
Read path | Warehouse read_feature_view → Hybrid Table | Online service (PAT + query endpoint) → Postgres
Typical read latency | 10–100 ms warm | ~10 ms p50 (preview targets)
Tiled Aggregations API FVs | Needs online_fv_from_tiled() workaround | Query-time tile reassembly at read
Stream FV / RTFV / Feature Group upstream | Not supported | Required for those object types

Batch + Postgres is the usual upstream for Real-Time Feature Views (registered with Postgres online_config before RTFV registration).

from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType

user_entity = Entity(name="USER", join_keys=["USER_ID"])
fs.register_entity(user_entity)

feature_df = session.sql("""
    SELECT
        USER_ID,
        SUM(TOTAL_AMT)   AS ORDER_TOTAL_AMT_SUM,
        COUNT(ORDER_ID)   AS ORDER_CNT,
        AVG(TOTAL_AMT)    AS ORDER_TOTAL_AMT_AVG,
        MAX(ORDER_TS)     AS LAST_ORDER_TS
    FROM FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS
    GROUP BY USER_ID
""")

batch_fv = FeatureView(
    name="USER_ORDER_FEATURES_PG",
    entities=[user_entity],
    feature_df=feature_df,
    timestamp_col="LAST_ORDER_TS",
    refresh_freq="1 hour",
    online_config=OnlineConfig(
        enable=True,
        target_lag="10s",
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="User order features with Postgres online serving",
)

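registered_fv = fs.register_feature_view(batch_fv, version="V01")

The same Feature View can also be defined with the Snowpark DataFrame API instead of SQL. Use one variant or the other: both register USER_ORDER_FEATURES_PG as version V01.

import snowflake.snowpark.functions as F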
from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType

user_entity = Entity(name="USER", join_keys=["USER_ID"])
fs.register_entity(user_entity)

orders = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS")
feature_df = orders.group_by("USER_ID").agg(
    F.sum("TOTAL_AMT").alias("ORDER_TOTAL_AMT_SUM"),
    F.count("ORDER_ID").alias("ORDER_CNT"),
    F.avg("TOTAL_AMT").alias("ORDER_TOTAL_AMT_AVG"),
    F.max("ORDER_TS").alias("LAST_ORDER_TS"),
)

batch_fv = FeatureView(
    name="USER_ORDER_FEATURES_PG",
    entities=[user_entity],
    feature_df=feature_df,
    timestamp_col="LAST_ORDER_TS",
    refresh_freq="1 hour",
    online_config=OnlineConfig(
        enable=True,
        target_lag="10s",
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="User order features with Postgres online serving",
)

registered_fv = fs.register_feature_view(batch_fv, version="V01")

9.7 Stream Feature Views

Stream Feature Views use a write-first model: events arrive via stream_ingest() or REST, optionally pass through a transformation_fn, and land in Postgres. Offline Snowflake tables are populated asynchronously for training and monitoring.

Components:

  1. A StreamSource that defines the event schema
  2. A StreamConfig linking the source to a transformation function and optional backfill data
  3. A FeatureView registered with stream_config instead of feature_df

9.7.1 Register a Stream Source

A StreamSource is the contract between your event producers and the Feature Store. Register it once per event type; multiple Stream Feature Views can consume the same source.

Column names and Snowpark types must exactly match the payloads you send via stream_ingest() or REST. Mismatches fail at ingest time (use dry_run: true during integration testing to catch schema errors early). The source schema is not inferred from sample data — you declare it explicitly, similar to an API OpenAPI schema.

Design tips:

  • Include USER_ID (or your entity join keys) on every event so features can be keyed correctly in Postgres
  • Use TimestampType(TimestampTimeZone.NTZ) for event times unless you have a strong reason for TZ-aware types
  • Keep the schema stable; adding columns requires registering a new stream source or versioning your ingestion contract

from snowflake.ml.feature_store import StreamSource
from snowflake.snowpark.types import (
    StructType, StructField, StringType, DoubleType,
    TimestampType, TimestampTimeZone,
)

clickstream_events = StreamSource(
    name="CLICKSTREAM_EVENTS",
    schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("EVENT_TS", TimestampType(TimestampTimeZone.NTZ)),
        StructField("EVENT_TYPE", StringType()),
        StructField("PRODUCT_ID", StringType()),
        StructField("AMOUNT", DoubleType()),
    ]),
    desc="Real-time clickstream events for feature computation",
)

fs.register_stream_source(clickstream_events)

Manage stream sources with fs.list_stream_sources() and fs.delete_stream_source(name).
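
Because payloads must match the declared schema exactly, a producer can pre-check events locally before sending them. The sketch below is a hypothetical client-side check, not the validation the ingest service performs: EXPECTED_COLUMNS, TS_FORMAT, and check_event are illustrative names, the column-to-type mapping mirrors CLICKSTREAM_EVENTS by hand, the timestamp string format is an assumption taken from the sample events in this chapter, and null values are not handled.

```python
from datetime import datetime
from typing import List

# Hand-maintained mirror of the CLICKSTREAM_EVENTS schema (assumption:
# timestamps are sent as strings, numeric amounts as int or float).
EXPECTED_COLUMNS = {
    "USER_ID": str,
    "EVENT_TS": str,
    "EVENT_TYPE": str,
    "PRODUCT_ID": str,
    "AMOUNT": float,
}
TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed wire format for EVENT_TS


def check_event(event: dict) -> List[str]:
    """Return a list of human-readable problems; empty means the event looks OK."""
    problems = [
        f"missing column: {c}"
        for c in sorted(EXPECTED_COLUMNS.keys() - event.keys())
    ]
    problems += [
        f"unexpected column: {c}"
        for c in sorted(event.keys() - EXPECTED_COLUMNS.keys())
    ]
    for column, expected in EXPECTED_COLUMNS.items():
        if column not in event:
            continue
        value = event[column]
        if expected is float:
            # Accept ints for float columns, but not booleans.
            ok = isinstance(value, (int, float)) and not isinstance(value, bool)
        else:
            ok = isinstance(value, expected)
        if not ok:
            problems.append(
                f"{column}: expected {expected.__name__}, got {type(value).__name__}"
            )
    ts = event.get("EVENT_TS")
    if isinstance(ts, str):
        try:
            datetime.strptime(ts, TS_FORMAT)
        except ValueError:
            problems.append(f"EVENT_TS: not in {TS_FORMAT} format")
    return problems
```

A check like this catches case mismatches such as amount vs AMOUNT before the request leaves the producer; the service-side dry run remains the authoritative test.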

9.7.2 Define the Transformation Function

The transformation_fn runs on the hot path — every ingested micro-batch passes through it before landing in Postgres. It is a plain pandas function: input DataFrame in, output DataFrame out, same row count (unless you explicitly filter rows, which is rarely desirable on the ingest path).

Use it for per-event derivations: flags, type casts, simple arithmetic, string parsing. Do not use it for cross-event aggregation (use Feature + CONTINUOUS instead) or joins to other tables. The function is validated at registration; keep it deterministic and side-effect free.

import pandas as pd

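def process_clickstream(df: pd.DataFrame) -> pd.DataFrame:
    """Per-event feature derivation on the hot path."""
    df = df.copy()  # work on a copy so the caller's batch is not mutated
    # 1 when the event is a purchase, else 0
    df["IS_PURCHASE"] = (df["EVENT_TYPE"] == "purchase").astype(int)
    # 1 when AMOUNT exceeds 100.0, else 0 (a missing AMOUNT compares as False)
    df["IS_HIGH_VALUE"] = (df["AMOUNT"] > 100.0).astype(int)
    return df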

See Python compute functions for the full constraint list shared with Real-Time Feature Views.
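
Since a transformation_fn is a plain pandas function, it can be exercised locally before registration. The sketch below restates process_clickstream with a defensive copy and runs it on a hand-made three-event batch; the sample values are illustrative only, and running the function locally does not replace the validation performed at registration.

```python
import pandas as pd


def process_clickstream(df: pd.DataFrame) -> pd.DataFrame:
    """Per-event derivation: one output row per input row."""
    out = df.copy()  # leave the input batch untouched
    out["IS_PURCHASE"] = (out["EVENT_TYPE"] == "purchase").astype(int)
    out["IS_HIGH_VALUE"] = (out["AMOUNT"] > 100.0).astype(int)
    return out


# Illustrative micro-batch with the columns the function reads.
sample = pd.DataFrame(
    {
        "USER_ID": ["user_001", "user_001", "user_002"],
        "EVENT_TYPE": ["purchase", "view", "purchase"],
        "AMOUNT": [149.99, 0.0, 20.0],
    }
)

result = process_clickstream(sample)

# Row count is preserved and the input frame gains no columns.
assert len(result) == len(sample)
assert "IS_PURCHASE" not in sample.columns
```

Checks like the two assertions at the end are cheap to keep in a unit test next to the function definition.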

9.7.3 Create a Stream Feature View

Wire the stream source, optional transformation, and optional backfill into a FeatureView registered with stream_config. Registration creates the Postgres online table and starts the async offline historization pipeline.

backfill_df is strongly recommended for production cutover: it pre-populates the online store from historical Snowflake data so features are queryable immediately after registration, rather than waiting for live events to accumulate.

from snowflake.ml.feature_store import StreamConfig, FeatureView, OnlineConfig, OnlineStoreType

backfill_df = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.EVENTS")

stream_cfg = StreamConfig(
    stream_source=clickstream_events,
    transformation_fn=process_clickstream,
    backfill_df=backfill_df,
)

stream_fv = FeatureView(
    name="USER_REALTIME_EVENTS",
    entities=[user_entity],
    stream_config=stream_cfg,
    timestamp_col="EVENT_TS",
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="Real-time clickstream features with per-event transforms",
)

registered_stream_fv = fs.register_feature_view(stream_fv, version="V01")

Key differences from a batch Feature View:

  • stream_config replaces feature_df — you cannot pass both
  • No refresh_freq for non-tiled stream FVs — the online store updates as events arrive
  • backfill_df pre-populates the online store with historical data at registration time
  • stream_ingest() writes to the online path only; there is no direct Snowflake write from the SDK

9.8 Real-Time Feature Views

Real-Time Feature Views (RTFVs) compute features at inference time. There is no offline Dynamic Table — the compute_fn runs when a consumer calls read_feature_view(..., store_type="online") or the REST Query API.

Use RTFVs when you need to combine:

  • Request context (amount, session ID, device type) from the live inference request, via RequestSource
  • Pre-materialized online features from one or more upstream Postgres-backed Feature Views

9.8.1 RequestSource and RealtimeConfig

An RTFV is defined by RealtimeConfig, which bundles:

  1. compute_fn — the shared training and serving function (see RTFV training parity)
  2. sources — optional RequestSource at position 0, then one or more registered Postgres-backed Feature Views or slices
  3. output_schema — Snowpark schema for columns compute_fn returns

RequestSource declares which request-time fields inference callers must supply (and which columns the training spine must carry). These are not stored in Postgres — they arrive with each read request.

import pandas as pd
from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType
from snowflake.ml.feature_store.realtime_config import RealtimeConfig
from snowflake.ml.feature_store.request_source import RequestSource
from snowflake.snowpark.types import (
    StructType, StructField, StringType, DoubleType,
)

request_source = RequestSource(
    schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("amount", DoubleType()),
    ]),
)

def risk_score(req: pd.DataFrame, txn_features: pd.DataFrame) -> pd.DataFrame:
    """Combine live request amount with pre-computed user aggregates."""
    return pd.DataFrame({
        "risk_score": req["amount"] / (txn_features["ORDER_TOTAL_AMT_AVG"] + 1.0),
        "risk_bucket": ["high" if a > 500 else "low" for a in req["amount"]],
    })

rt_config = RealtimeConfig(
    compute_fn=risk_score,
    sources=[request_source, registered_fv],  # RequestSource must be sources[0]
    output_schema=StructType([
        StructField("risk_score", DoubleType()),
        StructField("risk_bucket", StringType()),
    ]),
)

rtfv = FeatureView(
    name="USER_RISK_SCORE",
    entities=[user_entity],
    realtime_config=rt_config,
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="Request-time risk score from live amount + order history",
)

registered_rtfv = fs.register_feature_view(rtfv, version="V01")

Upstream requirements for RTFVs:

  • Each upstream Feature View must already be registered with online_config and OnlineStoreType.POSTGRES
  • Upstream entity join keys must be a subset of the RTFV’s declared entities
  • FeatureGroup is not a valid RTFV source — register individual Feature Views or slices

9.8.2 Inference-time reads

For RTFVs with a RequestSource, each keys entry is a dict of entity join keys plus request-context fields — not a simple list of entity key values. The online service passes request fields to compute_fn and fetches upstream features from Postgres by entity key.

Batch and stream Feature Views without request context still use list-form keys: keys=[["user_001"], ["user_002"]].

online_df = fs.read_feature_view(
    registered_rtfv,
    keys=[
        [{"USER_ID": "user_001", "amount": 149.99}],
        [{"USER_ID": "user_002", "amount": 42.00}],
    ],
    store_type="online",
)
online_df.show()

9.9 RTFV training and serving parity

Unlike Stream Feature Views, RTFVs do not historize outputs to an offline Snowflake table. Training/serving parity is achieved by re-executing the same compute_fn in both paths:

Path | Mechanism
Training | generate_training_set() / generate_dataset() joins upstream FVs from offline tables (with ASOF when spine_timestamp_col is set), pulls RequestSource columns from the spine, and evaluates compute_fn in the warehouse via map_in_pandas
Serving | read_feature_view(..., store_type="online") or REST Query evaluates the same compute_fn against live request context + upstream Postgres rows

The Feature Store persists and validates one compute_fn source at registration; both paths rehydrate from that source. You define the RTFV once — you do not materialize RTFV outputs offline and you do not convert a separate offline batch FV into an RTFV by copying feature values.

9.9.1 Training with generate_training_set()

Include registered RTFVs in the features list alongside batch or stream Feature Views. The spine must carry:

  1. Every entity join key declared on the RTFV
  2. Every column in the RTFV’s RequestSource schema
  3. A spine timestamp (spine_timestamp_col) when upstream sources need point-in-time joins

labels_with_context = session.sql("""
    SELECT
        USER_ID,
        DATE_OF_BIRTH,
        EVENT_TS AS REFERENCE_TS,   -- label / event time (training)
        EVENT_TS,                   -- spine_timestamp_col for ASOF joins
        IS_CONVERTED AS LABEL
    FROM FEATURE_STORE_DEMO.CLICKSTREAM_DATA.LABELS
""")

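# Every RTFV in `features` needs all of its RequestSource columns on the spine.
# The spine above carries DATE_OF_BIRTH and REFERENCE_TS (the CUSTOMER_AGE RTFV
# in 9.9.2); USER_RISK_SCORE from 9.8.1 would also need an amount column.
training_df = fs.generate_training_set(
    spine_df=labels_with_context,
    features=[registered_fv, registered_rtfv],
    spine_timestamp_col="EVENT_TS",
)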

At serving time, pass the same RequestSource field names with inference-appropriate values (for example REFERENCE_TS = request time).

9.9.2 Example: CUSTOMER_AGE from date of birth

Do not train on a batch Feature View that uses CURRENT_DATE or DATEDIFF(..., CURRENT_DATE) — that breaks point-in-time correctness (Chapter 6). Define one RTFV whose compute_fn derives age from explicit inputs:

import pandas as pd
from snowflake.snowpark.types import TimestampType, TimestampTimeZone

def customer_age(request_df: pd.DataFrame, profile_df: pd.DataFrame) -> pd.DataFrame:
    ref = pd.to_datetime(request_df["REFERENCE_TS"])
    dob = pd.to_datetime(request_df["DATE_OF_BIRTH"])
    age_years = (ref - dob).dt.days / 365.25
    return pd.DataFrame({"CUSTOMER_AGE": age_years})

request_source = RequestSource(
    schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("DATE_OF_BIRTH", StringType()),
        StructField("REFERENCE_TS", TimestampType(TimestampTimeZone.NTZ)),
    ]),
)

age_rt_config = RealtimeConfig(
    compute_fn=customer_age,
    sources=[request_source, registered_fv],
    output_schema=StructType([StructField("CUSTOMER_AGE", DoubleType())]),
)

  • Training: spine sets REFERENCE_TS to the label/event timestamp (EVENT_TS) so age is correct as of each training example.
  • Serving: pass REFERENCE_TS as the inference request time (or the business event time you score on).
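
The arithmetic behind the two bullets can be checked with the standard library alone. This is a sketch of the same formula used in customer_age, (reference - date of birth) in days divided by 365.25; the dates are made up for illustration, and age_years is a hypothetical helper name.

```python
from datetime import datetime


def age_years(date_of_birth: datetime, reference_ts: datetime) -> float:
    """Age in fractional years as of reference_ts, never as of 'now'."""
    return (reference_ts - date_of_birth).days / 365.25


dob = datetime(1990, 1, 1)

# Training: the reference is the label/event timestamp of the example.
age_at_label = age_years(dob, datetime(2020, 1, 1))

# Serving: the reference is the inference request time.
age_at_request = age_years(dob, datetime(2026, 4, 10, 10, 0, 0))
```

The function has no hidden dependency on the current date, which is what keeps the training and serving values consistent for a given reference time.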

9.9.3 Label timestamp vs ASOF-matched feature timestamp

Tiled Feature Views and ASOF lag introduce a design nuance that applies beyond RTFVs.

spine_timestamp_col drives two related but distinct roles:

  1. ASOF cutoff — for each upstream batch/stream Feature View, the join selects the latest feature row with timestamp_col <= spine_timestamp. For tiled Feature Views, tile reassembly also uses the spine timestamp as the window end boundary (Chapter 7).
  2. Request context for RTFV compute_fn — only columns on the spine or RequestSource are visible to compute_fn. The platform does not automatically inject the ASOF-matched upstream timestamp into compute_fn.
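
The ASOF cutoff in item 1 can be illustrated without Snowflake. The sketch below picks the latest feature row whose timestamp is at or before the spine timestamp, then derives a "days since" gap from the matched timestamp; asof_match and the sample history are hypothetical, and the real join runs in the warehouse rather than in Python.

```python
from bisect import bisect_right
from datetime import datetime


def asof_match(feature_rows, spine_ts):
    """Return the latest (timestamp, value) row with timestamp <= spine_ts.

    feature_rows must be sorted by timestamp ascending. Returns None when
    no row is old enough to be visible at spine_ts.
    """
    timestamps = [ts for ts, _ in feature_rows]
    idx = bisect_right(timestamps, spine_ts)
    return feature_rows[idx - 1] if idx > 0 else None


# Hypothetical feature history for one user: (LAST_ORDER_TS, ORDER_CNT).
history = [
    (datetime(2026, 4, 1), 3),
    (datetime(2026, 4, 5), 4),
    (datetime(2026, 4, 9), 5),
]

spine_ts = datetime(2026, 4, 6, 12, 0)
matched = asof_match(history, spine_ts)

# Gap between the label clock and the matched feature timestamp.
days_since = (spine_ts - matched[0]).days
```

The row dated 9 April is invisible to this spine row even though it exists in the table, which is the leakage protection the ASOF join provides.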

When the label timestamp is the right reference (for example CUSTOMER_AGE as of conversion time): put it on RequestSource as REFERENCE_TS and use it explicitly in compute_fn. This matches what the model should have “known” at the label event.

When the ASOF-matched feature timestamp is required (for example “days since last order at prediction time”, where the meaningful anchor is when the order feature was last refreshed, not the raw label clock): use include_feature_view_timestamp_col=True in generate_training_set() / generate_dataset() for batch/stream features (Chapter 6). That returns each Feature View’s matched timestamp_col alongside the spine time so callers can derive gaps such as DATEDIFF('day', fv_ts, spine_ts).

For RTFV upstream inputs, if compute_fn must see an upstream’s ASOF-matched timestamp, include that column in the upstream Feature View slice passed to RealtimeConfig.sources — the offline join in the training path selects it with the same ASOF semantics as other upstream features.

Question | Use label / spine timestamp | Use ASOF-matched FV timestamp
Age as of conversion event | REFERENCE_TS = EVENT_TS on RequestSource | (none)
Days since last order at label time | EVENT_TS for window end | include_feature_view_timestamp_col=True or upstream slice with LAST_ORDER_TS
Tiled 7d spend ending at label time | spine_timestamp_col drives tile reassembly | Optional: expose TILE_START via include_feature_view_timestamp_col for freshness QA

Tiled Feature Views exist precisely because feature values must be recomputed relative to each spine row’s ASOF cutoff — not because the label timestamp alone is always the right input to every derived feature. Choose the reference time based on feature semantics; use include_feature_view_timestamp_col or upstream timestamp columns when the derivation depends on feature freshness lag, not just the label clock.

9.9.4 Monitoring and audit

Recomputing compute_fn at training time gives logical parity with serving. It does not log what was actually returned in production. For drift detection on as-served RTFV values, add inference-time logging (Chapter 8). Stream Feature Views provide async offline historization for that use case natively.

See Chapter 11: Real-Time Feature Views in training datasets for a full generate_training_set() walkthrough.


9.10 Feature Groups

A FeatureGroup bundles multiple Feature Views (or slices) under a single Postgres-backed online table. This simplifies multi-FV online reads without pre-joining offline tables.

from snowflake.ml.feature_store import FeatureGroup

fg = FeatureGroup(
    name="fraud_features",
    features=[
        registered_fv.slice(["ORDER_TOTAL_AMT_SUM", "ORDER_CNT"]),
        registered_stream_fv.with_name("events"),
    ],
    desc="Combined fraud detection features",
    auto_prefix=True,
)

fs.register_feature_group(fg, version="V01")

Query a Feature Group via the REST Query API with object_type: "feature_group". Feature Groups are a serving convenience — offline training still uses individual Feature Views.

9.10.1 Entity keys and grains

Feature Groups do not require every source Feature View to share the same single entity or an identical join-key set. The online table’s primary key is the ordered union of all source Feature Views’ join keys (first-seen order). Sources at coarser grains broadcast over the wider compound key.

Source A | Source B | Feature Group PK | Serving behaviour
USER_ID | USER_ID | USER_ID | Standard user-level bundle
USER_ID | USER_ID, ORDER_ID | USER_ID, ORDER_ID | User features broadcast per order row
USER_ID, ORDER_ID | ORDER_ID, LINE_ID | USER_ID, ORDER_ID, LINE_ID | Finer-grain sources drive the key
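
The "ordered union, first-seen order" rule in the table can be written out in a few lines. This is a sketch of the rule as described here, with feature_group_pk as a hypothetical helper name; it is not the SDK's implementation.

```python
from typing import List, Sequence


def feature_group_pk(source_join_keys: Sequence[Sequence[str]]) -> List[str]:
    """Ordered union of join keys across sources, keeping first-seen order."""
    pk: List[str] = []
    for keys in source_join_keys:
        for key in keys:
            if key not in pk:
                pk.append(key)
    return pk


# The three rows of the table above, as (Source A keys, Source B keys).
user_only = feature_group_pk([["USER_ID"], ["USER_ID"]])
user_and_order = feature_group_pk([["USER_ID"], ["USER_ID", "ORDER_ID"]])
three_level = feature_group_pk([["USER_ID", "ORDER_ID"], ["ORDER_ID", "LINE_ID"]])
```

Because order matters, listing the sources in a different sequence can change the column order of the primary key, and therefore the order in which key values must be supplied at read time.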

Requirements:

  • Every source must be registered with Postgres online serving (OnlineStoreType.POSTGRES)
  • Join-key datatypes must agree across sources that share a key name
  • RTFV sources included in a Feature Group must satisfy the same RequestSource / entity contracts as standalone RTFVs
  • FeatureGroup is a serving object — it is not a valid RTFV upstream source; register individual Feature Views or slices in RealtimeConfig.sources

9.10.2 Reading a Feature Group

Use fs.read_feature_group() (or REST Query with object_type: "feature_group"). Each row in keys must supply every join key in the group’s primary key, in the same order as the ordered union of source join keys. Omitting any key raises a validation error before the query is sent.

For example, if the group PK is (USER_ID, ORDER_ID) because one source is user-grain and another is order-grain, a read that supplies only USER_ID fails in the Python SDK:

# Feature Group PK = ['USER_ID', 'ORDER_ID'] after registration

# ❌ Fails — only one value for a two-column PK
fs.read_feature_group(
    fg, version="V01",
    keys=[["user_001"]],
    store_type="online",
)
# ValueError: Each keys row must have 2 values (join keys ['USER_ID', 'ORDER_ID']), got 1

# ✅ Correct — full compound key per row
online_fg_df = fs.read_feature_group(
    fg, version="V01",
    keys=[
        ["user_001", "order_42"],
        ["user_002", "order_99"],
    ],
    store_type="online",
)

User-level features in the group are broadcast to each order-level key row, but each lookup row must still include ORDER_ID because it is part of the materialized online table’s primary key.
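
Since key order and completeness both matter, it can help to build the keys rows from named records rather than by hand. The helper below is a hypothetical convenience (key_rows is not an SDK function): it orders each record's values by the group primary key and fails early when a join key is missing.

```python
from typing import Dict, List, Sequence


def key_rows(records: Sequence[Dict[str, object]], pk: Sequence[str]) -> List[list]:
    """Turn dict records into list-form key rows ordered like the group PK."""
    rows = []
    for i, record in enumerate(records):
        missing = [k for k in pk if k not in record]
        if missing:
            raise ValueError(f"record {i} is missing join keys {missing}")
        rows.append([record[k] for k in pk])
    return rows


pk = ["USER_ID", "ORDER_ID"]
rows = key_rows(
    [
        {"ORDER_ID": "order_42", "USER_ID": "user_001"},  # dict order is irrelevant
        {"USER_ID": "user_002", "ORDER_ID": "order_99"},
    ],
    pk,
)
```

The resulting rows can then be passed as the keys argument shown above.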

If the Feature Group includes Real-Time Feature View sources with a RequestSource, also pass request_context as a pandas DataFrame with one row per keys entry and columns matching the union of RequestSource fields.


9.11 Python Compute Functions

transformation_fn (Stream FVs) and compute_fn (RTFVs) share the same validation policy in _compute_fn_validation.py.

Rule | Detail
Function shape | Named top-level def only; no lambdas, nested defs, or closures
Allowed imports | numpy, pandas, re, copy, dataclasses only
Operations | Row-level pandas transforms on micro-batches (stream) or per-request DataFrames (RTFV)
Not supported | Arbitrary SQL joins, sklearn, global state, UDTFs, nested functions
RTFV arity | One positional DataFrame argument per entry in RealtimeConfig.sources, in order

Stream FV: think of transformation_fn as a map over ingested events. For rolling aggregates across events, use Feature + FeatureAggregationMethod.CONTINUOUS (below).

RTFV: compute_fn receives the request DataFrame first (if RequestSource is present), then one DataFrame per upstream online Feature View, entity-aligned on join keys.
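
Some of these rules are easy to check locally before registration. The sketch below is a rough pre-flight lint built on the standard ast module; it is not the Feature Store's validator and covers only three rules from the table (import allowlist, a named top-level def, and RTFV arity). lint_compute_fn and its messages are hypothetical.

```python
import ast
from typing import List, Optional

# Allowlist copied from the rules table above.
ALLOWED_IMPORTS = {"numpy", "pandas", "re", "copy", "dataclasses"}


def lint_compute_fn(
    source: str, fn_name: str, n_sources: Optional[int] = None
) -> List[str]:
    """Return a list of rule violations found in the given module source."""
    tree = ast.parse(source)  # parses only; nothing is imported or executed
    problems = []

    # Rule: only allowlisted modules may be imported, anywhere in the source.
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            modules = [node.module or "<relative>"]
        else:
            continue
        for module in modules:
            if module.split(".")[0] not in ALLOWED_IMPORTS:
                problems.append(f"import not allowed: {module}")

    # Rule: the function must be a named def at module top level.
    top_level = [
        n for n in tree.body
        if isinstance(n, ast.FunctionDef) and n.name == fn_name
    ]
    if not top_level:
        problems.append(f"no top-level def named {fn_name}")
    elif n_sources is not None:
        # Rule: one positional argument per entry in RealtimeConfig.sources.
        args = top_level[0].args
        n_args = len(args.posonlyargs) + len(args.args)
        if n_args != n_sources:
            problems.append(
                f"{fn_name} takes {n_args} positional arguments, expected {n_sources}"
            )
    return problems
```

For the risk_score example, lint_compute_fn(source_text, "risk_score", n_sources=2) would be the matching call, since its RealtimeConfig lists two sources.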


9.12 Streaming with Time-Windowed Aggregation

Per-event transformation_fn derivations are not enough for rolling metrics (“total spend in last 48 hours”). Combine stream ingestion with the Feature class from Chapter 7 to define windowed aggregates on the stream.

Two aggregation modes matter:

Mode | Where aggregates update | Freshness
Default (no CONTINUOUS) | Offline tile DT on refresh_freq; online reads reassemble from offline path | Minutes (refresh schedule)
FeatureAggregationMethod.CONTINUOUS | Postgres online store at ingest time | Seconds

Use CONTINUOUS when model quality depends on up-to-the-second windows (fraud, session scoring). Use batch tiling when minute-level staleness is acceptable and you want lower ingest-side compute.

This example adds refresh_freq and feature_granularity because tiled stream FVs also maintain an offline Dynamic Table for PIT-correct training — the online path updates continuously while the cold path historizes on schedule.

from snowflake.ml.feature_store import Feature, FeatureView, StreamConfig, OnlineConfig, OnlineStoreType
from snowflake.ml.feature_store.spec.enums import FeatureAggregationMethod

features = [
    Feature.sum("AMOUNT", "48h").alias("TOTAL_SPEND_48H"),
    Feature.count("AMOUNT", "48h").alias("TXN_COUNT_48H"),
    Feature.avg("AMOUNT", "24h").alias("AVG_SPEND_24H"),
]

stream_agg_fv = FeatureView(
    name="USER_REALTIME_AGG_FEATURES",
    entities=[user_entity],
    stream_config=stream_cfg,
    timestamp_col="EVENT_TS",
    refresh_freq="1 minute",
    feature_granularity="1 minute",
    features=features,
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    feature_aggregation_method=FeatureAggregationMethod.CONTINUOUS,
    desc="Real-time user transaction aggregations with continuous processing",
)

registered_agg_fv = fs.register_feature_view(stream_agg_fv, version="V01")

Batch vs Continuous Aggregation

Without CONTINUOUS, aggregation tiles refresh on refresh_freq from the offline historization path. With CONTINUOUS, the online service updates running aggregates as events arrive — freshness in seconds rather than minutes.
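
To make "running aggregates updated as events arrive" concrete, here is a conceptual sketch of an ingest-time sliding-window sum. It illustrates the idea only and makes no claim about how the online service implements CONTINUOUS; RollingSum is a hypothetical class, timestamps are plain seconds, and events are assumed to arrive in timestamp order per key.

```python
from collections import defaultdict, deque


class RollingSum:
    """Per-key sum over a trailing time window, updated on every event."""

    def __init__(self, window_s: float):
        self.window_s = window_s
        self._events = defaultdict(deque)   # key -> deque of (ts, amount)
        self._totals = defaultdict(float)   # key -> running sum

    def ingest(self, key: str, ts: float, amount: float) -> None:
        # Write path: O(1) update, no rescan of history.
        self._events[key].append((ts, amount))
        self._totals[key] += amount

    def read(self, key: str, now: float) -> float:
        # Read path: drop events that have aged out of the window, then
        # return the maintained total.
        events = self._events[key]
        while events and events[0][0] <= now - self.window_s:
            _, old_amount = events.popleft()
            self._totals[key] -= old_amount
        return self._totals[key]


agg = RollingSum(window_s=48 * 3600)   # 48-hour window, as in TOTAL_SPEND_48H
agg.ingest("user_001", ts=0, amount=100.0)
agg.ingest("user_001", ts=100_000, amount=50.0)
```

The contrast with the default mode is that nothing here waits for a scheduled refresh: the total reflects an event as soon as ingest returns.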


9.13 Ingesting Events

Ingestion is the write path for Stream Feature Views. Events enter through fs.stream_ingest() (Python) or POST /ingest (any language). The ingest service:

  1. Validates each record against the registered StreamSource schema
  2. Fans the batch out to every Stream Feature View that consumes that source
  3. Runs each FV’s transformation_fn (if defined) on the micro-batch
  4. Updates Postgres online rows and, for CONTINUOUS FVs, running window aggregates
  5. Asynchronously historizes to Snowflake (cold path — not blocking the ingest response)

Important: stream_ingest() does not write to Snowflake offline tables directly. Offline training data appears after async historization completes (typically seconds later). For integration testing, start with dry_run: true on the REST API to validate payloads without persisting.

Producer role / PAT: ingest callers need the Producer role (or equivalent PAT scope) and network access to the ingest endpoint.

9.13.1 Python SDK

The Python SDK is the simplest path when your event pipeline already runs in Snowflake Notebooks, Snowpark, or a Python service. Pass a list of dicts — one dict per event — with keys matching the StreamSource schema exactly.

The return value is the count of accepted records. Compare against len(new_events) to detect partial rejection; use REST include_diagnostics: true for per-record failure reasons.

# One dict per event; keys must match the StreamSource schema exactly.
new_events = [
    {
        "USER_ID": "user_001",
        "EVENT_TS": "2026-04-10 10:00:00",
        "EVENT_TYPE": "purchase",
        "PRODUCT_ID": "SKU_42",
        "AMOUNT": 149.99,
    },
]

# stream_ingest() returns the number of accepted records.
accepted = fs.stream_ingest(clickstream_events, new_events)
print(f"Ingested {accepted} / {len(new_events)} records")

Usage considerations:

  • Batch events in reasonable micro-batch sizes (hundreds to low thousands per call) to balance latency and overhead
  • Ensure EVENT_TS values are monotonic per entity when downstream ordering matters
  • Idempotency is not guaranteed at the ingest API — design producers to tolerate at-least-once delivery if your pipeline may retry
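
A minimal sketch of producer-side micro-batching with a partial-rejection check follows. The helper names micro_batches and ingest_in_batches are hypothetical, and fake_ingest is a stand-in for the real ingest call so the example runs without a Snowflake session; the batch size of 500 is an arbitrary value in the range suggested above.

```python
from typing import Callable, Iterator


def micro_batches(events: list, size: int) -> Iterator[list]:
    # Yield consecutive slices of at most `size` events.
    for start in range(0, len(events), size):
        yield events[start:start + size]


def ingest_in_batches(events: list, ingest_fn: Callable[[list], int],
                      size: int = 500) -> list:
    """Send events in micro-batches and return (batch_index, sent, accepted)
    for every batch where fewer records were accepted than sent."""
    shortfalls = []
    for i, batch in enumerate(micro_batches(events, size)):
        accepted = ingest_fn(batch)
        if accepted != len(batch):
            shortfalls.append((i, len(batch), accepted))
    return shortfalls


def fake_ingest(batch: list) -> int:
    # Stand-in for the real ingest call: rejects events without an AMOUNT.
    return sum(1 for e in batch if "AMOUNT" in e)


events = [{"USER_ID": f"user_{n:03d}", "AMOUNT": 1.0} for n in range(1200)]
events[700].pop("AMOUNT")  # one malformed event, which lands in the second batch
print(ingest_in_batches(events, fake_ingest, size=500))
```

With the SDK, ingest_fn could be something like lambda batch: fs.stream_ingest(clickstream_events, batch), since stream_ingest() returns the accepted count. Any retry of a short batch should account for the at-least-once caveat above.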

9.13.2 REST Ingest API

Use the REST Ingest API when producers are not Python-based: Flink jobs, Kafka consumers, mobile clients behind an API gateway, or partner webhooks. Retrieve ingest_url from get_online_service_status() after provisioning.

All requests require PAT authentication via the Authorization: Snowflake Token="..." header. The records map keys must be registered StreamSource names; each value is an array of event objects.

Endpoint: POST <ingest_url>/ingest

curl -X POST "${INGEST_URL}/ingest" \
  -H "Authorization: Snowflake Token=\"${SNOWFLAKE_PAT}\"" \
  -H "Content-Type: application/json" \
  -d '{
    "dry_run": false,
    "include_diagnostics": true,
    "records": {
      "CLICKSTREAM_EVENTS": [
        {
          "USER_ID": "user_001",
          "EVENT_TS": "2026-04-10T10:00:00Z",
          "EVENT_TYPE": "purchase",
          "PRODUCT_ID": "SKU_42",
          "AMOUNT": 149.99
        }
      ]
    }
  }'

Field | Required | Description
records | Yes | Map of stream source name to event records matching the registered StreamSource schema
dry_run | No | Validate without persisting (default: false)
include_diagnostics | No | Per-record diagnostics in response (default: false)

Schema validation with dry_run

Set "dry_run": true when wiring a new producer. The service validates types and required columns without writing to Postgres — catch AMOUNT sent as a string before production traffic arrives.
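
For Python producers that call the REST endpoint directly, a dry-run request can be assembled with the standard library alone. This is a sketch under assumptions: build_ingest_request is a hypothetical helper, the URL and token are placeholders, and the response body is not parsed because its shape is not specified here.

```python
import json
import urllib.request


def build_ingest_request(ingest_url: str, pat: str, source: str,
                         events: list, dry_run: bool = True) -> urllib.request.Request:
    # Same body shape as the curl example: records keyed by StreamSource name.
    body = {
        "dry_run": dry_run,
        "include_diagnostics": True,
        "records": {source: events},
    }
    return urllib.request.Request(
        url=f"{ingest_url.rstrip('/')}/ingest",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f'Snowflake Token="{pat}"',
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_ingest_request(
    "https://ingest.example.invalid",  # placeholder; use ingest_url from get_online_service_status()
    "placeholder-pat",                 # placeholder; read the real PAT from a secret store
    "CLICKSTREAM_EVENTS",
    [{"USER_ID": "user_001", "EVENT_TS": "2026-04-10T10:00:00Z", "AMOUNT": 149.99}],
)
print(req.get_method(), req.full_url)
# Sending is left to the caller, for example: urllib.request.urlopen(req, timeout=10)
```

Keeping dry_run=True as the default in the helper means a new producer has to opt in explicitly before anything is persisted.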


9.14 Querying Online Features

Postgres online reads go through the online service (PAT + query endpoint), not warehouse SQL against a Hybrid Table. The logical read API is the same as in Chapter 8, read_feature_view(..., store_type="online"), but authentication, networking, and some key shapes differ.

Shared with Chapter 8: batch keys as keys=[["user_001"], ...]; batch many keys per call; warm-up before benchmarking; latency budgeting. See Chapter 8 for warehouse-oriented Hybrid read tuning — it does not apply directly to Postgres PAT reads.

Postgres-specific: PAT required; Consumer role; network policy on the online service; RTFV dict keys; Feature Group compound keys; REST Query API for non-Python inference services.

Object types:

Object | Python API | REST object_type
Feature View | read_feature_view(..., store_type="online") | "feature_view"
Feature Group | read_feature_group(..., store_type="online") | "feature_group"
Real-Time FV | read_feature_view with dict keys + request fields | "feature_view" (with request context in payload)

9.14.1 Python SDK

Pass store_type="online" to route reads to Postgres instead of the offline Snowflake table. The keys argument is a list of lookup rows:

  • Batch / stream FV: each row is a list of join-key values, e.g. [["user_001"], ["user_002"]]
  • RTFV with RequestSource: each row is a dict of join keys and request-context fields
  • Feature Group: each row must include the full compound primary key (see Reading a Feature Group)

Request only the feature columns you need when the API supports column projection — smaller payloads reduce serialization cost on the inference path.

online_df = fs.read_feature_view(
    registered_stream_fv,
    keys=[["user_001"], ["user_002"]],
    store_type="online",
)
online_df.show()

Latency practices (see also Chapter 8):

  • Batch many keys in one call — avoid per-entity loops
  • Warm up the online service before benchmarking (first reads after idle periods are slower)
  • Size inference workers for concurrent Consumer connections when QPS is high

9.14.2 REST Query API

The REST Query API mirrors the Python read contract for polyglot inference stacks. Use query_url from get_online_service_status(). Each request_rows entry contains an entity map of join-key names to values; optional features limits the response columns.

Endpoint: POST <query_url>/query

{
  "name": "USER_REALTIME_AGG_FEATURES",
  "version": "V01",
  "object_type": "feature_view",
  "request_rows": [
    { "entity": { "USER_ID": "user_001" } }
  ],
  "features": ["TOTAL_SPEND_48H", "TXN_COUNT_48H"]
}

Field | Required | Description
name | Yes | Feature View or Feature Group name
version | Yes | Version string (e.g. "V01")
object_type | Yes | "feature_view" or "feature_group"
request_rows | Yes | Array of {"entity": {join_key: value, ...}} objects; all PK columns required for Feature Groups
features | No | Subset of columns to return; omit for all
include_diagnostics | No | Per-row diagnostics when true

For Feature Groups with compound keys, every entity map must include the full primary key — the same rule as the Python SDK. For RTFV reads over REST, include RequestSource fields in the entity map (or the request-context payload shape your SDK version documents).
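
The compound-key rule can be enforced client-side before a request leaves the inference service. The sketch below builds the documented request body and rejects any entity row that omits a key column; build_query_payload and its primary_key argument are hypothetical conveniences, not part of the REST contract.

```python
import json


def build_query_payload(name, version, object_type, key_rows, primary_key, features=None):
    """Build a query request body; key_rows is a list of {join_key: value} dicts."""
    if object_type not in ("feature_view", "feature_group"):
        raise ValueError(f"unsupported object_type: {object_type!r}")
    for row in key_rows:
        # Every row must carry the full primary key, as required for Feature Groups.
        missing = [col for col in primary_key if col not in row]
        if missing:
            raise ValueError(f"entity {row} is missing key columns {missing}")
    payload = {
        "name": name,
        "version": version,
        "object_type": object_type,
        "request_rows": [{"entity": dict(row)} for row in key_rows],
    }
    if features:
        payload["features"] = list(features)  # omit the field to return all columns
    return payload


payload = build_query_payload(
    "USER_REALTIME_AGG_FEATURES", "V01", "feature_view",
    key_rows=[{"USER_ID": "user_001"}],
    primary_key=["USER_ID"],
    features=["TOTAL_SPEND_48H", "TXN_COUNT_48H"],
)
print(json.dumps(payload, indent=2))
```

For a Feature Group keyed on, say, USER_ID and PRODUCT_ID (hypothetical columns), passing primary_key=["USER_ID", "PRODUCT_ID"] makes a row with only USER_ID fail locally instead of at the service.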

Note

Online reads require a valid SNOWFLAKE_PAT and your IP must be allowed by the online service network policy. Offline reads (store_type="offline") use Snowflake warehouse SQL and are unrelated to this Postgres path.


9.15 Architecture and Offline Historization

Stream Feature Views implement a dual-write kappa architecture: the online Postgres store is the serving source of truth; Snowflake offline tables are the durable mirror for training, monitoring, and batch analytics. Batch + Postgres and RTFVs follow different patterns (offline-first sync and request-time compute, respectively).

Understanding which path owns truth matters for incident response: if offline historization lags, Stream FV online serving continues; training data freshness may trail by the async interval. RTFVs have no offline output table — training recomputes compute_fn via generate_training_set().

flowchart LR
    Client["Client / App"] -->|"stream_ingest() or REST"| Ingest["Ingest Service"]
    Ingest -->|"transformation_fn"| PG["Postgres Online Store"]
    Ingest -->|"async historization"| SF["Snowflake Offline Table"]
    PG -->|"read_feature_view(store_type='online')"| Serve["Inference Service"]
    SF -->|"generate_training_set()"| Train["Training Pipeline"]
    SF -->|"Drift / monitoring"| Monitor["Model Monitoring"]

Stream Feature View: dual-write kappa architecture

Hot path (stream write):

  1. Client sends events via fs.stream_ingest() or POST /ingest
  2. Optional transformation_fn applies per-event derivations
  3. Features land in Postgres; CONTINUOUS aggregates update in place

Cold path (offline historization):

  1. Transformed events are asynchronously written to a Snowflake table (naming pattern <FV>$<VERSION>$UDF_TRANSFORMED) via Snowpipe Streaming
  2. Non-tiled stream FVs: offline object is a VIEW over that table
  3. Tiled stream FVs: an additional Dynamic Table materializes tiles on refresh_freq for PIT-correct training
  4. RTFVs have no offline historization path — they exist only in Postgres at serving time

Source of truth by write path (Postgres backend):

Write path | Source of truth | Offline role
Batch + Postgres | Offline DT | Synced mirror in Postgres (same as Hybrid batch model)
Stream FV | Postgres online store | Async durable mirror for training
RTFV | Postgres (computed on read) | Training re-runs compute_fn; no materialized history; log serving outputs for audit

Hybrid batch sync source-of-truth semantics are in Chapter 8.


9.16 Performance Characteristics

Latency and freshness depend on online service sizing, event throughput, transformation_fn / compute_fn complexity, and network distance to the online endpoints. PubPr targets below are planning guides — benchmark in your account before committing SLAs.

Benchmarking tips: warm up with steady-state traffic before measuring p50/p95; batch keys in reads; test from the same network zone your inference service will use in production.
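
A small sketch of how read latencies might be summarized once collected, discarding warm-up samples before computing percentiles. The latency_summary helper is hypothetical and the sample values are synthetic, chosen only to make the arithmetic easy to follow; they are not measurements of the online service.

```python
import statistics


def latency_summary(samples_ms: list, warmup: int = 0) -> dict:
    """Return p50/p95 over the samples that follow the warm-up period."""
    steady = samples_ms[warmup:]
    # quantiles(n=100) returns the 99 cut points p1..p99.
    cuts = statistics.quantiles(steady, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "n": len(steady)}


# Two slow warm-up reads followed by 101 synthetic steady-state samples (1..101 ms).
samples = [250.0, 120.0] + [float(v) for v in range(1, 102)]
summary = latency_summary(samples, warmup=2)
print(summary)
```

Including the two warm-up reads would pull the tail percentiles upward, which is why the tips above recommend measuring only after steady-state traffic is established.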

Metric | Target / observed
End-to-end freshness (ingest → queryable) | < 2 seconds
Online read latency (p50) | ~10 ms
Stream FV status | STATIC (no DT refresh cycle)
Backfill from backfill_df | Applied at registration time

9.17 Choosing a backend and write path

Start with Chapter 8’s matrix, then narrow by scenario:

Scenario | Backend | Write path
GA production; no preview enrollment | Hybrid | Batch sync
Lower read latency; same batch pipelines | Postgres | Batch sync
Tiled Aggregations API FVs for online | Postgres | Batch sync (tile reassembly at read)
Feature freshness in seconds; event-driven | Postgres | Stream FV
Live request fields + online features at inference | Postgres | RTFV
Multi-FV online read without pre-joining offline | Postgres | Feature Group
Complex multi-table SQL transformations | Either backend | Batch sync only (full SQL / Snowpark offline)
No Postgres preview | Hybrid | Batch sync; external store only after ruling out Postgres (Ch 8: self-managed ops, no Snowflake roadmap upside)

Tip

Many production systems combine paths: batch Feature Views for slow-changing demographics and 7d/30d aggregates; Stream FVs for recent transactions; RTFVs for request-time scoring that blends both.


9.18 Known Limitations

  • PubPr enrollment may be required depending on account and region
  • Network policy on the online service controls external read/ingest access
  • Compute function scope — row-level pandas only; no arbitrary joins or sklearn
  • Stream FV status shows STATIC rather than ACTIVE (no DT refresh cycle)
  • RTFVs have no offline historization table — training uses generate_training_set() to re-run compute_fn; use serving-time logging for as-served audit trails
  • REST ingest is the HTTP path for streaming; the Python SDK does not write directly to Snowflake offline tables

9.19 Deployment and CI/CD

Feature View definitions for Postgres online, stream, and real-time paths should follow the same version-controlled deployment practices as batch Feature Views. This chapter focuses on serving architecture and APIs — not pipeline automation.

For declarative deployment, reconciliation scripts, repository layout, and emerging SnowCLI plan/apply workflows, see Chapter 13: CI/CD for Feature Store. Chapter 12: Operations covers monitoring and production runbooks.


9.20 Summary

Concept | Description
Postgres backend | Managed online service; required for all paths in this chapter
Batch + Postgres | Same batch sync write path as Ch 8 Hybrid; different store_type and read stack
Stream FV | stream_config + stream_ingest(); online-first with async offline mirror
Real-Time FV | RealtimeConfig + compute_fn; training re-runs compute_fn via generate_training_set()
FeatureGroup | Multi-FV bundle under one online table
Compute functions | Shared pandas constraints for transformation_fn and compute_fn
CONTINUOUS aggregation | Running window metrics updated at ingest time

9.21 Next Steps

Continue to Chapter 10: Preprocessing for model-dependent transformations, or review Chapter 13: Advanced Patterns for CI/CD and multi-environment deployment.