```mermaid
flowchart LR
    Client["Client / App"] -->|"stream_ingest() or REST"| Ingest["Ingest Service"]
    Ingest -->|"transformation_fn"| PG["Postgres Online Store"]
    Ingest -->|"async historization"| SF["Snowflake Offline Table"]
    PG -->|"read_feature_view(store_type='online')"| Serve["Inference Service"]
    SF -->|"generate_training_set()"| Train["Training Pipeline"]
    SF -->|"Drift / monitoring"| Monitor["Model Monitoring"]
```
9 Real-Time and Online Serving
Postgres online backend — stream ingest, request-time features, and batch sync
The managed Postgres online backend and all capabilities in this chapter are PubPr (snowflake-ml-python >= 1.36). The Hybrid Table backend in Chapter 8 remains GA. See Abbreviations (GA, PubPr, PrivPr).
9.1 Overview
Chapter 8 defines the online feature model and documents the GA Hybrid Table backend — a single encapsulated Snowflake OFT per Feature View. This chapter documents the Postgres online backend, which requires a shared online service (create_online_service()) and uses a different physical serving plane even when the Snowflake catalog still shows CREATE ONLINE FEATURE TABLE objects for batch sync.
Online serving has two backends. Both support batch sync; Postgres adds stream ingest, request-time compute, and Feature Groups on top of it:
| Backend | Batch offline → online sync | Stream ingest (SFV) | Request-time compute (RTFV) | Feature Groups |
|---|---|---|---|---|
| Hybrid Table (Ch 8) | ✅ | ❌ | ❌ | ❌ |
| Postgres (this chapter) | ✅ | ✅ | ✅ | ✅ |
All Postgres paths share the same online service, Producer/Consumer RBAC, PAT authentication, and REST query/ingest endpoints. They differ in how features are written:
- Batch + Postgres: identical to Chapter 8 batch sync (Stream on offline FV + Task on target_lag); set store_type=OnlineStoreType.POSTGRES
- Stream Feature View: events via stream_ingest() / REST; online-first with async offline historization
- Real-Time Feature View: no offline table; compute_fn runs at read time (and at training time via generate_training_set())
See the canonical matrix in Chapter 8: Online backends and write paths.
Official references: Create and serve online features (Python), Stream ingest.
9.2 Learning Objectives
After completing this chapter, you will be able to:
- Distinguish Hybrid vs Postgres backends and which write paths each supports
- Explain the two-tier Postgres model (online service + per-FV objects) vs encapsulated Hybrid OFTs
- Provision the Postgres-backed online service and configure Producer/Consumer RBAC
- Enable Postgres online serving on batch Feature Views
- Register StreamSource schemas and create Stream Feature Views with transformation_fn and backfill
- Create Real-Time Feature Views with RealtimeConfig, RequestSource, and compute_fn
- Register FeatureGroup objects that combine multiple Feature Views under one online table
- Apply shared Python compute-function constraints for transformation_fn and compute_fn
- Ingest events via the Python SDK and REST API; query features via Python and REST
- Explain RTFV training/serving parity via generate_training_set() and when to use label vs ASOF-matched timestamps
- Explain the dual-write architecture, offline historization, and when online is source of truth vs offline
9.3 Prerequisites
Your account must be enrolled in the Postgres online store PubPr. You also need a Programmatic Access Token (PAT) to authenticate to the online service.
This chapter uses the same clickstream sample context as the rest of the guide: database FEATURE_STORE_DEMO, source schema CLICKSTREAM_DATA, Feature Store schema FEATURE_STORE, warehouse FS_DEV_WH.
9.4 RBAC Setup
Access to the Postgres online service is governed separately from the Snowflake role hierarchy you use for warehouse access. Snowflake provisions two application roles on the online service:
- Producer: services and users that call stream_ingest() or the REST Ingest API
- Consumer: inference services and notebooks that call read_feature_view(..., store_type="online"), read_feature_group(), or the REST Query API
Splitting ingest from read follows least-privilege: your event pipeline should not need query access, and your model server should not need ingest access. Pass these role names to create_online_service(producer_role, consumer_role); grant them to the Snowflake roles your applications use.
```sql
CREATE ROLE IF NOT EXISTS FS_PRODUCER_ROLE;
CREATE ROLE IF NOT EXISTS FS_CONSUMER_ROLE;
GRANT ROLE FS_PRODUCER_ROLE TO ROLE SYSADMIN;
GRANT ROLE FS_CONSUMER_ROLE TO ROLE SYSADMIN;
```

Grant the producer role to services or users that push events, and the consumer role to inference services that read features.
9.5 Provisioning the Online Service
Postgres online serving is two-tier: a shared online service plus per-object online tables. This is fundamentally different from Hybrid OFTs, where a single register_feature_view(..., online_config=...) call creates one self-contained Snowflake-managed OFT.
9.5.1 Postgres vs Hybrid managed object model
| Aspect | Hybrid Table OFT (Ch 8) | Postgres online (this chapter) |
|---|---|---|
| Prerequisite | Feature Store + warehouse | create_online_service() once per Feature Store (Postgres instance, ingest/query endpoints, Producer/Consumer roles) |
| Per-FV registration | register_feature_view with online_config → one OFT; Snowflake creates Hybrid Table + Stream + Task | register_feature_view with store_type=POSTGRES → CREATE ONLINE FEATURE TABLE ... FROM SPECIFICATION; Snowflake still orchestrates batch sync, but serving values live in managed Postgres, not a Hybrid Table |
| Physical storage | Hybrid Table in your Snowflake account | Managed Postgres behind the online service |
| Reads | Warehouse SQL / SDK against Hybrid Table | Online service — PAT auth, query endpoint (SDK or REST); not a warehouse point lookup |
| Stream FV / RTFV / Feature Group | Not supported | Additional object types materialized into the same Postgres service |
| Operational surface | One Snowflake OFT object to manage | Online service lifecycle plus each registered online Feature View / Feature Group |
Batch + Postgres reuses the same offline→online concept as Hybrid (Stream on offline FV, Task on target_lag), but the serving plane is externalized to Postgres and the online service must exist first. Stream and Real-Time Feature Views add new write paths (stream_ingest, compute_fn) that never touch Hybrid Tables at all.
Plan for several minutes on first create_online_service(). Individual Feature View registration is faster once the service is RUNNING, but every Postgres-backed object depends on that shared infrastructure.
```python
from snowflake.ml.feature_store import FeatureStore, Entity, CreationMode

fs = FeatureStore(
    session=session,
    database="FEATURE_STORE_DEMO",
    name="FEATURE_STORE",
    default_warehouse="FS_DEV_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

create_result = fs.create_online_service("FS_PRODUCER_ROLE", "FS_CONSUMER_ROLE")
print(create_result)
```

Provisioning takes several minutes on first creation. Poll until the status reaches RUNNING:
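```python
import time

# Poll every 30 s, but fail loudly instead of looping forever if provisioning stalls.
# The 30-minute cap is an arbitrary example value, not a documented limit.
deadline = time.monotonic() + 30 * 60
status = fs.get_online_service_status()
while status.status != "RUNNING":
    if time.monotonic() > deadline:
        raise TimeoutError(f"Online service not RUNNING yet (status: {status.status})")
    time.sleep(30)
    status = fs.get_online_service_status()

print(f"Status: {status.status}")
print(f"Endpoints: {status.endpoints}")
```

Once running, retrieve the REST endpoint URLs for later use: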
```python
from snowflake.ml.feature_store import online_service

query_url = online_service.endpoint_url(status, "query")
ingest_url = online_service.endpoint_url(status, "ingest")
```

9.6 Batch Feature Views with Postgres Online Store
This is the same batch → online write path as Chapter 8: offline Dynamic Table or View, Snowflake Stream on the offline object, Task driven by target_lag, latest values per entity key. The only registration change is the online backend:
```python
online_config=OnlineConfig(
    enable=True,
    target_lag="10s",
    store_type=OnlineStoreType.POSTGRES,  # default is HYBRID_TABLE (Ch 8)
)
```

Batch online concepts are covered in Chapter 8: target_lag vs refresh_freq, latency vs freshness, multi-FV read patterns, warm-up, benchmarking, and training-serving skew all apply unchanged to batch + Postgres and are not repeated here.
What differs with Postgres:
| Topic | Hybrid (Ch 8) | Postgres (here) |
|---|---|---|
| Provisioning | Per-FV register_feature_view creates one encapsulated OFT (Hybrid Table + Stream + Task) | create_online_service() first; then per-FV registration materializes into shared Postgres |
| Snowflake catalog object | CREATE ONLINE FEATURE TABLE ... FROM <offline_table> | CREATE ONLINE FEATURE TABLE ... FROM SPECIFICATION (Postgres-backed) |
| Read path | Warehouse read_feature_view → Hybrid Table | Online service (PAT + query endpoint) → Postgres |
| Typical read latency | 10–100 ms warm | ~10 ms p50 (preview targets) |
| Tiled Aggregations API FVs | Needs online_fv_from_tiled() workaround | Query-time tile reassembly at read |
| Stream FV / RTFV / Feature Group upstream | Not supported | Required for those object types |
A batch Feature View with Postgres online serving is the usual upstream for a Real-Time Feature View; register it with a Postgres online_config before registering the RTFV.
Using a SQL query for feature_df:

```python
from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType

user_entity = Entity(name="USER", join_keys=["USER_ID"])
fs.register_entity(user_entity)

feature_df = session.sql("""
    SELECT
        USER_ID,
        SUM(TOTAL_AMT) AS ORDER_TOTAL_AMT_SUM,
        COUNT(ORDER_ID) AS ORDER_CNT,
        AVG(TOTAL_AMT) AS ORDER_TOTAL_AMT_AVG,
        MAX(ORDER_TS) AS LAST_ORDER_TS
    FROM FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS
    GROUP BY USER_ID
""")

batch_fv = FeatureView(
    name="USER_ORDER_FEATURES_PG",
    entities=[user_entity],
    feature_df=feature_df,
    timestamp_col="LAST_ORDER_TS",
    refresh_freq="1 hour",
    online_config=OnlineConfig(
        enable=True,
        target_lag="10s",
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="User order features with Postgres online serving",
)
registered_fv = fs.register_feature_view(batch_fv, version="V01")
```

Or, equivalently, with the Snowpark DataFrame API:

```python
import snowflake.snowpark.functions as F
from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType

user_entity = Entity(name="USER", join_keys=["USER_ID"])
fs.register_entity(user_entity)

orders = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS")
feature_df = orders.group_by("USER_ID").agg(
    F.sum("TOTAL_AMT").alias("ORDER_TOTAL_AMT_SUM"),
    F.count("ORDER_ID").alias("ORDER_CNT"),
    F.avg("TOTAL_AMT").alias("ORDER_TOTAL_AMT_AVG"),
    F.max("ORDER_TS").alias("LAST_ORDER_TS"),
)

batch_fv = FeatureView(
    name="USER_ORDER_FEATURES_PG",
    entities=[user_entity],
    feature_df=feature_df,
    timestamp_col="LAST_ORDER_TS",
    refresh_freq="1 hour",
    online_config=OnlineConfig(
        enable=True,
        target_lag="10s",
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="User order features with Postgres online serving",
)
registered_fv = fs.register_feature_view(batch_fv, version="V01")
```

9.7 Stream Feature Views
Stream Feature Views use a write-first model: events arrive via stream_ingest() or REST, optionally pass through a transformation_fn, and land in Postgres. Offline Snowflake tables are populated asynchronously for training and monitoring.
Components:
- A StreamSource that defines the event schema
- A StreamConfig linking the source to a transformation function and optional backfill data
- A FeatureView registered with stream_config instead of feature_df
9.7.1 Register a Stream Source
A StreamSource is the contract between your event producers and the Feature Store. Register it once per event type; multiple Stream Feature Views can consume the same source.
Column names and Snowpark types must exactly match the payloads you send via stream_ingest() or REST. Mismatches fail at ingest time (use dry_run: true during integration testing to catch schema errors early). The source schema is not inferred from sample data; you declare it explicitly, much like an OpenAPI schema.
Design tips:
- Include USER_ID (or your entity join keys) on every event so features can be keyed correctly in Postgres
- Use TimestampType(TimestampTimeZone.NTZ) for event times unless you have a strong reason for TZ-aware types
- Keep the schema stable; adding columns requires registering a new stream source or versioning your ingestion contract
```python
from snowflake.ml.feature_store import StreamSource
from snowflake.snowpark.types import (
    StructType, StructField, StringType, DoubleType,
    TimestampType, TimestampTimeZone,
)

clickstream_events = StreamSource(
    name="CLICKSTREAM_EVENTS",
    schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("EVENT_TS", TimestampType(TimestampTimeZone.NTZ)),
        StructField("EVENT_TYPE", StringType()),
        StructField("PRODUCT_ID", StringType()),
        StructField("AMOUNT", DoubleType()),
    ]),
    desc="Real-time clickstream events for feature computation",
)
fs.register_stream_source(clickstream_events)
```

Manage stream sources with fs.list_stream_sources() and fs.delete_stream_source(name).
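Because schema mismatches only surface at ingest time, a producer can run a cheap local pre-check before calling stream_ingest(). The sketch below is a hypothetical helper, not part of snowflake-ml-python: it mirrors the CLICKSTREAM_EVENTS schema as a plain dict of accepted Python types (an assumption about how the Snowpark types map to JSON/Python values) and reports missing, mistyped, or unexpected columns. A REST call with dry_run: true remains the authoritative check.

```python
from datetime import datetime

# Hypothetical local mirror of the CLICKSTREAM_EVENTS schema: column -> accepted Python types.
# This is NOT the Feature Store's validator; it only approximates it for producer-side checks.
CLICKSTREAM_SCHEMA = {
    "USER_ID": (str,),
    "EVENT_TS": (str, datetime),  # ISO-8601 string or naive datetime (assumed acceptable)
    "EVENT_TYPE": (str,),
    "PRODUCT_ID": (str,),
    "AMOUNT": (int, float),
}


def schema_errors(event: dict, schema: dict) -> list[str]:
    """Return human-readable problems for one event; an empty list means it looks valid."""
    errors = []
    for col, types in schema.items():
        if col not in event:
            errors.append(f"missing column {col}")
        # bool is a subclass of int in Python, so reject it explicitly for numeric columns.
        elif not isinstance(event[col], types) or isinstance(event[col], bool):
            expected = "/".join(t.__name__ for t in types)
            errors.append(f"{col}: expected {expected}, got {type(event[col]).__name__}")
    # Column names must match exactly, so extra keys are reported too.
    for col in sorted(event.keys() - schema.keys()):
        errors.append(f"unexpected column {col}")
    return errors


event = {
    "USER_ID": "user_001",
    "EVENT_TS": "2026-04-10 10:00:00",
    "EVENT_TYPE": "purchase",
    "PRODUCT_ID": "SKU_42",
    "AMOUNT": "149.99",  # a string where a number is expected
}
print(schema_errors(event, CLICKSTREAM_SCHEMA))
```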
9.7.2 Define the Transformation Function
The transformation_fn runs on the hot path — every ingested micro-batch passes through it before landing in Postgres. It is a plain pandas function: input DataFrame in, output DataFrame out, same row count (unless you explicitly filter rows, which is rarely desirable on the ingest path).
Use it for per-event derivations: flags, type casts, simple arithmetic, string parsing. Do not use it for cross-event aggregation (use Feature + CONTINUOUS instead) or joins to other tables. The function is validated at registration; keep it deterministic and side-effect free.
```python
import pandas as pd


def process_clickstream(df: pd.DataFrame) -> pd.DataFrame:
    """Per-event feature derivation on the hot path."""
    out = df.copy()  # work on a copy so the input micro-batch is not mutated
    out["IS_PURCHASE"] = (out["EVENT_TYPE"] == "purchase").astype(int)
    out["IS_HIGH_VALUE"] = (out["AMOUNT"] > 100.0).astype(int)
    return out
```

See Python Compute Functions (9.11) for the full constraint list shared with Real-Time Feature Views.
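Because transformation_fn is plain pandas, it can be unit-tested locally before registration. A minimal sketch, assuming only pandas: the function is repeated (with a defensive copy) so the snippet runs on its own, and the sample micro-batch is made-up data shaped like CLICKSTREAM_EVENTS.

```python
import pandas as pd


# Same logic as process_clickstream in 9.7.2, repeated so this check runs standalone.
def process_clickstream(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["IS_PURCHASE"] = (out["EVENT_TYPE"] == "purchase").astype(int)
    out["IS_HIGH_VALUE"] = (out["AMOUNT"] > 100.0).astype(int)
    return out


# Hand-built micro-batch with illustrative values.
sample = pd.DataFrame({
    "USER_ID": ["user_001", "user_002", "user_003"],
    "EVENT_TS": pd.to_datetime(["2026-04-10 10:00:00", "2026-04-10 10:00:05", "2026-04-10 10:00:09"]),
    "EVENT_TYPE": ["purchase", "view", "purchase"],
    "PRODUCT_ID": ["SKU_42", "SKU_7", "SKU_9"],
    "AMOUNT": [149.99, 0.0, 20.0],
})

result = process_clickstream(sample)

# Row-preserving: same row count, exactly two new columns.
assert len(result) == len(sample)
assert set(result.columns) - set(sample.columns) == {"IS_PURCHASE", "IS_HIGH_VALUE"}
# Deterministic: a second run on the same input yields an identical frame.
pd.testing.assert_frame_equal(result, process_clickstream(sample))
print(result[["USER_ID", "IS_PURCHASE", "IS_HIGH_VALUE"]])
```

The row-count and determinism checks map directly to the constraints stated above (same row count, deterministic, side-effect free).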
9.7.3 Create a Stream Feature View
Wire the stream source, optional transformation, and optional backfill into a FeatureView registered with stream_config. Registration creates the Postgres online table and starts the async offline historization pipeline.
backfill_df is strongly recommended for production cutover: it pre-populates the online store from historical Snowflake data so features are queryable immediately after registration, rather than waiting for live events to accumulate.
```python
from snowflake.ml.feature_store import StreamConfig, FeatureView, OnlineConfig, OnlineStoreType

backfill_df = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.EVENTS")

stream_cfg = StreamConfig(
    stream_source=clickstream_events,
    transformation_fn=process_clickstream,
    backfill_df=backfill_df,
)

stream_fv = FeatureView(
    name="USER_REALTIME_EVENTS",
    entities=[user_entity],
    stream_config=stream_cfg,
    timestamp_col="EVENT_TS",
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="Real-time clickstream features with per-event transforms",
)
registered_stream_fv = fs.register_feature_view(stream_fv, version="V01")
```

Key differences from a batch Feature View:
- stream_config replaces feature_df; you cannot pass both
- No refresh_freq for non-tiled stream FVs; the online store updates as events arrive
- backfill_df pre-populates the online store with historical data at registration time
- stream_ingest() writes to the online path only; there is no direct Snowflake write from the SDK
9.8 Real-Time Feature Views
Real-Time Feature Views (RTFVs) compute features at inference time. There is no offline Dynamic Table — the compute_fn runs when a consumer calls read_feature_view(..., store_type="online") or the REST Query API.
Use RTFVs when you need to combine:
- Request context (amount, session ID, device type) from the live inference request, via RequestSource
- Pre-materialized online features from one or more upstream Postgres-backed Feature Views
9.8.1 RequestSource and RealtimeConfig
An RTFV is defined by RealtimeConfig, which bundles:
- compute_fn: the shared training and serving function (see RTFV training parity)
- sources: an optional RequestSource at position 0, then one or more registered Postgres-backed Feature Views or slices
- output_schema: the Snowpark schema for the columns compute_fn returns
RequestSource declares which request-time fields inference callers must supply (and which columns the training spine must carry). These are not stored in Postgres — they arrive with each read request.
```python
import pandas as pd
from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType
from snowflake.ml.feature_store.realtime_config import RealtimeConfig
from snowflake.ml.feature_store.request_source import RequestSource
from snowflake.snowpark.types import (
    StructType, StructField, StringType, DoubleType,
)

request_source = RequestSource(
    schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("amount", DoubleType()),
    ]),
)


def risk_score(req: pd.DataFrame, txn_features: pd.DataFrame) -> pd.DataFrame:
    """Combine live request amount with pre-computed user aggregates."""
    return pd.DataFrame({
        "risk_score": req["amount"] / (txn_features["ORDER_TOTAL_AMT_AVG"] + 1.0),
        "risk_bucket": ["high" if a > 500 else "low" for a in req["amount"]],
    })


rt_config = RealtimeConfig(
    compute_fn=risk_score,
    sources=[request_source, registered_fv],  # RequestSource must be sources[0]
    output_schema=StructType([
        StructField("risk_score", DoubleType()),
        StructField("risk_bucket", StringType()),
    ]),
)

rtfv = FeatureView(
    name="USER_RISK_SCORE",
    entities=[user_entity],
    realtime_config=rt_config,
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="Request-time risk score from live amount + order history",
)
registered_rtfv = fs.register_feature_view(rtfv, version="V01")
```

Upstream requirements for RTFVs:
- Each upstream Feature View must already be registered with online_config and OnlineStoreType.POSTGRES
- Upstream entity join keys must be a subset of the RTFV's declared entities
- FeatureGroup is not a valid RTFV source; register individual Feature Views or slices
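compute_fn can be exercised locally in the same way. This sketch repeats risk_score and feeds it hand-built frames in RealtimeConfig.sources order (request frame first, then the upstream Feature View frame). The upstream frame is assumed to arrive row-aligned with the request frame, as described in Python Compute Functions; all values are illustrative.

```python
import pandas as pd


# risk_score as defined in 9.8.1, repeated so this runs standalone.
def risk_score(req: pd.DataFrame, txn_features: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "risk_score": req["amount"] / (txn_features["ORDER_TOTAL_AMT_AVG"] + 1.0),
        "risk_bucket": ["high" if a > 500 else "low" for a in req["amount"]],
    })


# Request frame (RequestSource fields) and a hypothetical upstream frame, one row per request.
req = pd.DataFrame({"USER_ID": ["user_001", "user_002"], "amount": [600.0, 42.0]})
txn = pd.DataFrame({"USER_ID": ["user_001", "user_002"], "ORDER_TOTAL_AMT_AVG": [99.0, 20.0]})

out = risk_score(req, txn)
print(out)  # 600 / (99 + 1) and 42 / (20 + 1)

# A user with no order history, assuming the missing upstream value arrives as NaN.
cold = risk_score(
    pd.DataFrame({"USER_ID": ["user_003"], "amount": [10.0]}),
    pd.DataFrame({"USER_ID": ["user_003"], "ORDER_TOTAL_AMT_AVG": [float("nan")]}),
)
print(cold)
```

Because the division aligns on the pandas index, the function silently relies on that row alignment, and a missing upstream value produces a NaN score rather than an error; you may want to handle that case explicitly inside compute_fn.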
9.8.2 Inference-time reads
For RTFVs with a RequestSource, each keys entry is a dict of entity join keys plus request-context fields — not a simple list of entity key values. The online service passes request fields to compute_fn and fetches upstream features from Postgres by entity key.
Batch and stream Feature Views without request context still use list-form keys: keys=[["user_001"], ["user_002"]].
```python
online_df = fs.read_feature_view(
    registered_rtfv,
    keys=[
        # One dict per lookup: entity join keys plus RequestSource fields.
        {"USER_ID": "user_001", "amount": 149.99},
        {"USER_ID": "user_002", "amount": 42.00},
    ],
    store_type="online",
)
online_df.show()
```

9.9 RTFV training and serving parity
Unlike Stream Feature Views, RTFVs do not historize outputs to an offline Snowflake table. Training/serving parity is achieved by re-executing the same compute_fn in both paths:
| Path | Mechanism |
|---|---|
| Training | generate_training_set() / generate_dataset() joins upstream FVs from offline tables (with ASOF when spine_timestamp_col is set), pulls RequestSource columns from the spine, and evaluates compute_fn in the warehouse via map_in_pandas |
| Serving | read_feature_view(..., store_type="online") or REST Query evaluates the same compute_fn against live request context + upstream Postgres rows |
The Feature Store persists and validates one compute_fn source at registration; both paths rehydrate from that source. You define the RTFV once — you do not materialize RTFV outputs offline and you do not convert a separate offline batch FV into an RTFV by copying feature values.
9.9.1 Training with generate_training_set()
Include registered RTFVs in the features list alongside batch or stream Feature Views. The spine must carry:
- Every entity join key declared on the RTFV
- Every column in the RTFV’s RequestSource schema
- A spine timestamp (spine_timestamp_col) when upstream sources need point-in-time joins
```python
labels_with_context = session.sql("""
    SELECT
        USER_ID,
        DATE_OF_BIRTH,
        EVENT_TS AS REFERENCE_TS,  -- label / event time (training)
        EVENT_TS,                  -- spine_timestamp_col for ASOF joins
        IS_CONVERTED AS LABEL
    FROM FEATURE_STORE_DEMO.CLICKSTREAM_DATA.LABELS
""")

# Every RequestSource column of every RTFV in `features` must be on the spine.
# DATE_OF_BIRTH and REFERENCE_TS serve the CUSTOMER_AGE RTFV (9.9.2); USER_RISK_SCORE
# also declares "amount", so that column must be added to the SELECT as well.
training_df = fs.generate_training_set(
    spine_df=labels_with_context,
    features=[registered_fv, registered_rtfv],
    spine_timestamp_col="EVENT_TS",
)
```

At serving time, pass the same RequestSource field names with inference-appropriate values (for example, REFERENCE_TS = request time).
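The spine requirements listed above can be checked before calling generate_training_set(). The helper below is hypothetical (not an SDK function): given the spine's column names, the RTFV's entity join keys, its RequestSource fields, and the spine timestamp column, it returns whatever is missing. Applied to the columns of labels_with_context, it shows that USER_RISK_SCORE would still need an amount column while the CUSTOMER_AGE inputs are present.

```python
def missing_spine_columns(spine_columns, join_keys, request_fields, spine_timestamp_col=None):
    """Return required spine columns that are absent, in first-required order, without duplicates."""
    required = list(join_keys) + list(request_fields)
    if spine_timestamp_col is not None:
        required.append(spine_timestamp_col)
    present = set(spine_columns)
    missing = []
    for col in required:
        # Join keys often also appear in RequestSource (e.g. USER_ID); report each name once.
        if col not in present and col not in missing:
            missing.append(col)
    return missing


spine_cols = ["USER_ID", "DATE_OF_BIRTH", "REFERENCE_TS", "EVENT_TS", "LABEL"]

# USER_RISK_SCORE: RequestSource declares USER_ID and amount.
print(missing_spine_columns(spine_cols, ["USER_ID"], ["USER_ID", "amount"], "EVENT_TS"))
# CUSTOMER_AGE: RequestSource declares USER_ID, DATE_OF_BIRTH, REFERENCE_TS.
print(missing_spine_columns(spine_cols, ["USER_ID"], ["USER_ID", "DATE_OF_BIRTH", "REFERENCE_TS"], "EVENT_TS"))
```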
9.9.2 Example: CUSTOMER_AGE from date of birth
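Do not train on a batch Feature View that uses CURRENT_DATE or DATEDIFF(..., CURRENT_DATE): the value reflects the date the view was refreshed rather than each training example's timestamp, which breaks point-in-time correctness (Chapter 6). Instead, define one RTFV whose compute_fn derives age from explicit inputs: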
```python
import pandas as pd
from snowflake.ml.feature_store.realtime_config import RealtimeConfig
from snowflake.ml.feature_store.request_source import RequestSource
from snowflake.snowpark.types import (
    StructType, StructField, StringType, DoubleType,
    TimestampType, TimestampTimeZone,
)


def customer_age(request_df: pd.DataFrame, profile_df: pd.DataFrame) -> pd.DataFrame:
    # profile_df is the upstream FV frame (sources[1]); the arity rule requires the
    # parameter even though age is derived from request fields only.
    ref = pd.to_datetime(request_df["REFERENCE_TS"])
    dob = pd.to_datetime(request_df["DATE_OF_BIRTH"])
    age_years = (ref - dob).dt.days / 365.25
    return pd.DataFrame({"CUSTOMER_AGE": age_years})


request_source = RequestSource(
    schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("DATE_OF_BIRTH", StringType()),
        StructField("REFERENCE_TS", TimestampType(TimestampTimeZone.NTZ)),
    ]),
)

age_rt_config = RealtimeConfig(
    compute_fn=customer_age,
    sources=[request_source, registered_fv],
    output_schema=StructType([StructField("CUSTOMER_AGE", DoubleType())]),
)
```

- Training: the spine sets REFERENCE_TS to the label/event timestamp (EVENT_TS), so age is correct as of each training example.
- Serving: pass REFERENCE_TS as the inference request time (or the business event time you score on).
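To see why the reference time must be an explicit input, run customer_age for the same customer with a training-time and a serving-time REFERENCE_TS. A minimal pandas sketch with made-up dates; the function is repeated so the snippet runs standalone.

```python
import pandas as pd


# customer_age as defined above, repeated so this runs standalone.
def customer_age(request_df: pd.DataFrame, profile_df: pd.DataFrame) -> pd.DataFrame:
    ref = pd.to_datetime(request_df["REFERENCE_TS"])
    dob = pd.to_datetime(request_df["DATE_OF_BIRTH"])
    return pd.DataFrame({"CUSTOMER_AGE": (ref - dob).dt.days / 365.25})


profile = pd.DataFrame({"USER_ID": ["user_001"]})  # upstream frame; unused by this compute_fn

# Training row: REFERENCE_TS is the label time carried on the spine.
train = customer_age(
    pd.DataFrame({"USER_ID": ["user_001"], "DATE_OF_BIRTH": ["1990-01-01"], "REFERENCE_TS": ["2020-01-01"]}),
    profile,
)
# Serving row: same customer, REFERENCE_TS is the later request time.
serve = customer_age(
    pd.DataFrame({"USER_ID": ["user_001"], "DATE_OF_BIRTH": ["1990-01-01"], "REFERENCE_TS": ["2026-04-10"]}),
    profile,
)
print(round(train["CUSTOMER_AGE"][0], 2), round(serve["CUSTOMER_AGE"][0], 2))
```

Same function, same inputs except REFERENCE_TS, and two different ages: a CURRENT_DATE-based batch view would instead have assigned every historical row the age as of its refresh date.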
9.9.3 Label timestamp vs ASOF-matched feature timestamp
Tiled Feature Views and ASOF lag introduce a design nuance that applies beyond RTFVs.
spine_timestamp_col drives two related but distinct roles:
- ASOF cutoff: for each upstream batch/stream Feature View, the join selects the latest feature row with timestamp_col <= spine_timestamp. For tiled Feature Views, tile reassembly also uses the spine timestamp as the window end boundary (Chapter 7).
- Request context for RTFV compute_fn: only columns on the spine or RequestSource are visible to compute_fn. The platform does not automatically inject the ASOF-matched upstream timestamp into compute_fn.
When the label timestamp is the right reference (for example CUSTOMER_AGE as of conversion time): put it on RequestSource as REFERENCE_TS and use it explicitly in compute_fn. This matches what the model should have “known” at the label event.
When the ASOF-matched feature timestamp is required (for example “days since last order at prediction time” where the meaningful anchor is when the order feature was last refreshed, not the raw label clock): use include_feature_view_timestamp_col=True in generate_training_set() / generate_dataset() for batch/stream features (Chapter 6). That returns each Feature View’s matched timestamp_col alongside the spine time so callers can derive gaps like DATEDIFF(spine_ts, fv_ts).
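The ASOF semantics above (per entity, the latest feature row with timestamp_col <= spine timestamp) can be reproduced locally with pandas.merge_asof using direction="backward". This is an illustration only, not how the Feature Store executes the join; the spine rows, ORDER_CNT values, and dates are made up. Keeping the matched LAST_ORDER_TS column plays the role of include_feature_view_timestamp_col=True and makes a DATEDIFF(spine_ts, fv_ts)-style gap easy to derive.

```python
import pandas as pd

# Spine: one row per label; EVENT_TS is the ASOF cutoff.
spine = pd.DataFrame({
    "USER_ID": ["user_001", "user_002", "user_001"],
    "EVENT_TS": pd.to_datetime(["2026-04-05", "2026-04-09", "2026-04-10"]),
})
# Upstream feature rows stamped with the FV's timestamp_col (LAST_ORDER_TS).
orders_fv = pd.DataFrame({
    "USER_ID": ["user_001", "user_001", "user_002"],
    "LAST_ORDER_TS": pd.to_datetime(["2026-04-01", "2026-04-08", "2026-04-11"]),
    "ORDER_CNT": [3, 4, 1],
})

# Backward ASOF join per user: latest row with LAST_ORDER_TS <= EVENT_TS (both sides must be sorted).
joined = pd.merge_asof(
    spine.sort_values("EVENT_TS"),
    orders_fv.sort_values("LAST_ORDER_TS"),
    left_on="EVENT_TS",
    right_on="LAST_ORDER_TS",
    by="USER_ID",
    direction="backward",
)
# The matched feature timestamp lets us derive a freshness gap in days.
joined["DAYS_SINCE_LAST_ORDER"] = (joined["EVENT_TS"] - joined["LAST_ORDER_TS"]).dt.days
print(joined)
```

user_002's only order falls after its spine timestamp, so the row gets no match (NaN) instead of leaking a future value into training.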
For RTFV upstream inputs, if compute_fn must see an upstream’s ASOF-matched timestamp, include that column in the upstream Feature View slice passed to RealtimeConfig.sources — the offline join in the training path selects it with the same ASOF semantics as other upstream features.
| Question | Use label / spine timestamp | Use ASOF-matched FV timestamp |
|---|---|---|
| Age as of conversion event | REFERENCE_TS = EVENT_TS on RequestSource | Not needed |
| Days since last order at label time | EVENT_TS for window end | include_feature_view_timestamp_col=True or upstream slice with LAST_ORDER_TS |
| Tiled 7d spend ending at label time | spine_timestamp_col drives tile reassembly | Optional: expose TILE_START via include_feature_view_timestamp_col for freshness QA |
Tiled Feature Views exist precisely because feature values must be recomputed relative to each spine row’s ASOF cutoff — not because the label timestamp alone is always the right input to every derived feature. Choose the reference time based on feature semantics; use include_feature_view_timestamp_col or upstream timestamp columns when the derivation depends on feature freshness lag, not just the label clock.
9.9.4 Monitoring and audit
Recomputing compute_fn at training time gives logical parity with serving. It does not log what was actually returned in production. For drift detection on as-served RTFV values, add inference-time logging (Chapter 8). Stream Feature Views provide async offline historization for that use case natively.
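A minimal sketch of such logging, assuming a JSON-lines sink: each served row is written with the Feature View name, version, serving timestamp, and the request context that produced it, so the records can later be loaded into a Snowflake table and compared against the training distribution. The helper name, record layout, and sink are hypothetical; see Chapter 8 for the guide's logging guidance.

```python
import io
import json
from datetime import datetime, timezone


def log_served_features(sink, feature_view, version, rows, served_at=None):
    """Append one JSON line per served row to a file-like sink (hypothetical helper)."""
    served_at = served_at or datetime.now(timezone.utc)
    for row in rows:
        # Request context and returned features travel together, so drift can be analyzed per input.
        record = {"feature_view": feature_view, "version": version, "served_at": served_at.isoformat(), **row}
        sink.write(json.dumps(record, sort_keys=True) + "\n")


buffer = io.StringIO()  # stand-in for a file, queue, or log shipper
log_served_features(
    buffer,
    "USER_RISK_SCORE",
    "V01",
    [{"USER_ID": "user_001", "amount": 149.99, "risk_score": 1.2, "risk_bucket": "low"}],
    served_at=datetime(2026, 4, 10, 10, 0, tzinfo=timezone.utc),
)
print(buffer.getvalue())
```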
See Chapter 11: Real-Time Feature Views in training datasets for a full generate_training_set() walkthrough.
9.10 Feature Groups
A FeatureGroup bundles multiple Feature Views (or slices) under a single Postgres-backed online table. This simplifies multi-FV online reads without pre-joining offline tables.
```python
from snowflake.ml.feature_store import FeatureGroup

fg = FeatureGroup(
    name="fraud_features",
    features=[
        registered_fv.slice(["ORDER_TOTAL_AMT_SUM", "ORDER_CNT"]),
        registered_stream_fv.with_name("events"),
    ],
    desc="Combined fraud detection features",
    auto_prefix=True,
)
fs.register_feature_group(fg, version="V01")
```

Query a Feature Group via the REST Query API with object_type: "feature_group". Feature Groups are a serving convenience; offline training still uses individual Feature Views.
9.10.1 Entity keys and grains
Feature Groups do not require every source Feature View to share the same single entity or an identical join-key set. The online table’s primary key is the ordered union of all source Feature Views’ join keys (first-seen order). Sources at coarser grains broadcast over the wider compound key.
| Source A | Source B | Feature Group PK | Serving behaviour |
|---|---|---|---|
| USER_ID | USER_ID | USER_ID | Standard user-level bundle |
| USER_ID | USER_ID, ORDER_ID | USER_ID, ORDER_ID | User features broadcast per order row |
| USER_ID, ORDER_ID | ORDER_ID, LINE_ID | USER_ID, ORDER_ID, LINE_ID | Finer-grain sources drive the key |
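The primary-key rule is an ordered union in first-seen order, which this short sketch reproduces for the three rows of the table. It models the behaviour as described here and is not the SDK's implementation.

```python
def feature_group_primary_key(source_join_keys):
    """Ordered union of each source's join keys, keeping first-seen order."""
    pk = []
    for keys in source_join_keys:
        for key in keys:
            if key not in pk:
                pk.append(key)
    return pk


print(feature_group_primary_key([["USER_ID"], ["USER_ID"]]))  # ['USER_ID']
print(feature_group_primary_key([["USER_ID"], ["USER_ID", "ORDER_ID"]]))  # ['USER_ID', 'ORDER_ID']
print(feature_group_primary_key([["USER_ID", "ORDER_ID"], ["ORDER_ID", "LINE_ID"]]))  # ['USER_ID', 'ORDER_ID', 'LINE_ID']
```

Under this rule, source order matters: listing the (ORDER_ID, LINE_ID) source first would yield ORDER_ID, LINE_ID, USER_ID, and read keys would have to follow that order.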
Requirements:
- Every source must be registered with Postgres online serving (OnlineStoreType.POSTGRES)
- Join-key datatypes must agree across sources that share a key name
- RTFV sources included in a Feature Group must satisfy the same RequestSource / entity contracts as standalone RTFVs
- FeatureGroup is a serving object: it is not a valid RTFV upstream source; register individual Feature Views or slices in RealtimeConfig.sources
9.10.2 Reading a Feature Group
Use fs.read_feature_group() (or REST Query with object_type: "feature_group"). Each row in keys must supply every join key in the group’s primary key, in the same order as the ordered union of source join keys. Omitting any key raises a validation error before the query is sent.
For example, if the group PK is (USER_ID, ORDER_ID) because one source is user-grain and another is order-grain, a read that supplies only USER_ID fails in the Python SDK:
```python
# Feature Group PK = ['USER_ID', 'ORDER_ID'] after registration

# ❌ Fails: only one value for a two-column PK
fs.read_feature_group(
    fg, version="V01",
    keys=[["user_001"]],
    store_type="online",
)
# ValueError: Each keys row must have 2 values (join keys ['USER_ID', 'ORDER_ID']), got 1

# ✅ Correct: full compound key per row
online_fg_df = fs.read_feature_group(
    fg, version="V01",
    keys=[
        ["user_001", "order_42"],
        ["user_002", "order_99"],
    ],
    store_type="online",
)
```

User-level features in the group are broadcast to each order-level key row, but each lookup row must still include ORDER_ID because it is part of the materialized online table's primary key.
If the Feature Group includes Real-Time Feature View sources with a RequestSource, also pass request_context as a pandas DataFrame with one row per keys entry and columns matching the union of RequestSource fields.
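When requests arrive as dicts (for example from an API handler), a small helper can split them into list-form keys in primary-key order and a request_context DataFrame with one aligned row per key. The helper is hypothetical and only reproduces the rules stated above; primary_key and request_fields must match your registered group.

```python
import pandas as pd


def build_group_read_args(requests, primary_key, request_fields):
    """Return (keys, request_context) for a Feature Group read (hypothetical helper).

    primary_key: the group's ordered join-key list.
    request_fields: union of RequestSource fields across the group's RTFV sources.
    """
    keys = []
    for i, req in enumerate(requests):
        missing = [k for k in primary_key if k not in req]
        if missing:
            # Fail before calling the service, mirroring the SDK's pre-query validation.
            raise ValueError(f"request {i} is missing join keys {missing}")
        keys.append([req[k] for k in primary_key])
    # One context row per keys row, in the same order.
    context = pd.DataFrame(
        [{f: req.get(f) for f in request_fields} for req in requests],
        columns=list(request_fields),
    )
    return keys, context


keys, ctx = build_group_read_args(
    [
        {"USER_ID": "user_001", "ORDER_ID": "order_42", "amount": 149.99},
        {"USER_ID": "user_002", "ORDER_ID": "order_99", "amount": 42.0},
    ],
    primary_key=["USER_ID", "ORDER_ID"],
    request_fields=["USER_ID", "amount"],
)
print(keys)
print(ctx)
# Then, for example:
# fs.read_feature_group(fg, version="V01", keys=keys, request_context=ctx, store_type="online")
```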
9.11 Python Compute Functions
transformation_fn (Stream FVs) and compute_fn (RTFVs) share the same validation policy in _compute_fn_validation.py.
| Rule | Detail |
|---|---|
| Function shape | Named top-level def only — no lambdas, nested defs, or closures |
| Allowed imports | numpy, pandas, re, copy, dataclasses only |
| Operations | Row-level pandas transforms on micro-batches (stream) or per-request DataFrames (RTFV) |
| Not supported | Arbitrary SQL joins, sklearn, global state, UDTFs, nested functions |
| RTFV arity | One positional DataFrame argument per entry in RealtimeConfig.sources, in order |
Stream FV: think of transformation_fn as a map over ingested events. For rolling aggregates across events, use Feature + FeatureAggregationMethod.CONTINUOUS (below).
RTFV: compute_fn receives the request DataFrame first (if RequestSource is present), then one DataFrame per upstream online Feature View, entity-aligned on join keys.
9.12 Streaming with Time-Windowed Aggregation
Per-event transformation_fn derivations are not enough for rolling metrics (“total spend in last 48 hours”). Combine stream ingestion with the Feature class from Chapter 7 to define windowed aggregates on the stream.
Two aggregation modes matter:
| Mode | Where aggregates update | Freshness |
|---|---|---|
| Default (no CONTINUOUS) | Offline tile DT on refresh_freq; online reads reassemble from offline path | Minutes (refresh schedule) |
| FeatureAggregationMethod.CONTINUOUS | Postgres online store at ingest time | Seconds |
Use CONTINUOUS when model quality depends on up-to-the-second windows (fraud, session scoring). Use batch tiling when minute-level staleness is acceptable and you want lower ingest-side compute.
This example adds refresh_freq and feature_granularity because tiled stream FVs also maintain an offline Dynamic Table for PIT-correct training — the online path updates continuously while the cold path historizes on schedule.
```python
from snowflake.ml.feature_store import Feature, FeatureView, StreamConfig, OnlineConfig, OnlineStoreType
from snowflake.ml.feature_store.spec.enums import FeatureAggregationMethod

features = [
    Feature.sum("AMOUNT", "48h").alias("TOTAL_SPEND_48H"),
    Feature.count("AMOUNT", "48h").alias("TXN_COUNT_48H"),
    Feature.avg("AMOUNT", "24h").alias("AVG_SPEND_24H"),
]

stream_agg_fv = FeatureView(
    name="USER_REALTIME_AGG_FEATURES",
    entities=[user_entity],
    stream_config=stream_cfg,
    timestamp_col="EVENT_TS",
    refresh_freq="1 minute",
    feature_granularity="1 minute",
    features=features,
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    feature_aggregation_method=FeatureAggregationMethod.CONTINUOUS,
    desc="Real-time user transaction aggregations with continuous processing",
)
registered_agg_fv = fs.register_feature_view(stream_agg_fv, version="V01")
```

Without CONTINUOUS, aggregation tiles refresh on refresh_freq from the offline historization path. With CONTINUOUS, the online service updates running aggregates as events arrive, so freshness is measured in seconds rather than minutes.
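To make the window semantics concrete, the sketch below keeps a running sum and count over a trailing 48-hour window for a single entity, evicting events as the window slides. It only illustrates what TOTAL_SPEND_48H and TXN_COUNT_48H mean; it is not how the online service implements CONTINUOUS aggregation, and both the half-open (now - 48h, now] boundary and in-order event arrival are assumptions.

```python
from collections import deque
from datetime import datetime, timedelta


class RollingWindow:
    """Running sum/count over a trailing time window for one entity (illustrative only)."""

    def __init__(self, window: timedelta):
        self.window = window
        self.events = deque()  # (timestamp, amount), assumed to arrive in timestamp order
        self.total = 0.0

    def add(self, ts: datetime, amount: float) -> None:
        self.events.append((ts, amount))
        self.total += amount
        self._evict(ts)

    def _evict(self, now: datetime) -> None:
        # Drop events at or before now - window, keeping only (now - window, now].
        while self.events and self.events[0][0] <= now - self.window:
            _, old_amount = self.events.popleft()
            self.total -= old_amount

    def snapshot(self, now: datetime) -> dict:
        self._evict(now)
        return {"sum": self.total, "count": len(self.events)}


t0 = datetime(2026, 4, 10, 10, 0)
w = RollingWindow(timedelta(hours=48))
w.add(t0, 100.0)
w.add(t0 + timedelta(hours=24), 50.0)
print(w.snapshot(t0 + timedelta(hours=24)))  # both events inside the window
w.add(t0 + timedelta(hours=49), 25.0)
print(w.snapshot(t0 + timedelta(hours=49)))  # the t0 event has slid out
```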
9.13 Ingesting Events
Ingestion is the write path for Stream Feature Views. Events enter through fs.stream_ingest() (Python) or POST /ingest (any language). The ingest service:
- Validates each record against the registered StreamSource schema
- Fans the batch out to every Stream Feature View that consumes that source
- Runs each FV's transformation_fn (if defined) on the micro-batch
- Updates Postgres online rows and, for CONTINUOUS FVs, running window aggregates
- Asynchronously historizes to Snowflake (cold path, not blocking the ingest response)
Important: stream_ingest() does not write to Snowflake offline tables directly. Offline training data appears after async historization completes (typically seconds later). For integration testing, start with dry_run: true on the REST API to validate payloads without persisting.
Producer role / PAT: ingest callers need the Producer role (or equivalent PAT scope) and network access to the ingest endpoint.
9.13.1 Python SDK
The Python SDK is the simplest path when your event pipeline already runs in Snowflake Notebooks, Snowpark, or a Python service. Pass a list of dicts — one dict per event — with keys matching the StreamSource schema exactly.
The return value is the count of accepted records. Compare against len(new_events) to detect partial rejection; use REST include_diagnostics: true for per-record failure reasons.
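```python
# Each dict is one event; keys must match the StreamSource schema exactly.
new_events = [
    {
        "USER_ID": "user_001",
        "EVENT_TS": "2026-04-10 10:00:00",
        "EVENT_TYPE": "purchase",
        "PRODUCT_ID": "SKU_42",
        "AMOUNT": 149.99,
    },
]

# clickstream_events is the registered StreamSource; the return value is the accepted count.
accepted = fs.stream_ingest(clickstream_events, new_events)
print(f"Ingested {accepted} / {len(new_events)} records")
```

Usage considerations: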
- Batch events in reasonable micro-batch sizes (hundreds to low thousands per call) to balance latency and overhead
- Ensure EVENT_TS values are monotonic per entity when downstream ordering matters
- Idempotency is not guaranteed at the ingest API; design producers to tolerate at-least-once delivery if your pipeline may retry
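These considerations can be folded into a small producer-side helper. This is a sketch, assuming ingest_fn is any callable with the same contract as fs.stream_ingest(source, events), that is, it returns the number of accepted records. The helper names, the default batch size of 500, and fake_ingest are illustrative, not SDK features.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator


def chunks(items: Iterable[dict], size: int) -> Iterator[list]:
    """Yield successive micro-batches of at most `size` events."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def check_monotonic(events: list, key: str = "USER_ID",
                    ts_col: str = "EVENT_TS") -> list:
    """Return entity keys whose EVENT_TS goes backwards within the list."""
    last_seen: dict = {}
    out_of_order: list = []
    for event in events:
        entity, ts = event[key], event[ts_col]
        # Timestamps in one fixed "YYYY-MM-DD HH:MM:SS" format sort correctly as text.
        if entity in last_seen and ts < last_seen[entity]:
            out_of_order.append(entity)
        last_seen[entity] = ts
    return out_of_order


def ingest_in_batches(ingest_fn: Callable[[list], int],
                      events: list, batch_size: int = 500) -> list:
    """Send events in micro-batches; return indexes of partially rejected batches."""
    partial: list = []
    for i, batch in enumerate(chunks(events, batch_size)):
        accepted = ingest_fn(batch)  # same contract as stream_ingest's return value
        if accepted < len(batch):
            # Inspect this batch with REST dry_run + include_diagnostics for
            # per-record reasons rather than blindly re-sending it.
            partial.append(i)
    return partial


def fake_ingest(batch: list) -> int:
    """Stand-in for fs.stream_ingest: rejects any event with AMOUNT == 4.0."""
    return sum(1 for e in batch if e["AMOUNT"] != 4.0)


events = [{"USER_ID": f"user_{i % 3:03d}", "EVENT_TS": f"2026-04-10 10:00:{i:02d}",
           "AMOUNT": float(i)} for i in range(7)]
print(check_monotonic(events))                               # []
print(ingest_in_batches(fake_ingest, events, batch_size=3))  # [1]
```

Against a real store you would pass something like lambda batch: fs.stream_ingest(clickstream_events, batch) as ingest_fn. The helper deliberately does not retry: with at-least-once delivery, an automatic retry of a partially accepted batch could duplicate the records that did land.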
9.13.2 REST Ingest API
Use the REST Ingest API when producers are not Python-based: Flink jobs, Kafka consumers, mobile clients behind an API gateway, or partner webhooks. Retrieve ingest_url from get_online_service_status() after provisioning.
All requests require PAT authentication via the Authorization: Snowflake Token="..." header. The records map keys must be registered StreamSource names; each value is an array of event objects.
Endpoint: POST <ingest_url>/ingest
```bash
curl -X POST "${INGEST_URL}/ingest" \
  -H "Authorization: Snowflake Token=\"${SNOWFLAKE_PAT}\"" \
  -H "Content-Type: application/json" \
  -d '{
    "dry_run": false,
    "include_diagnostics": true,
    "records": {
      "CLICKSTREAM_EVENTS": [
        {
          "USER_ID": "user_001",
          "EVENT_TS": "2026-04-10T10:00:00Z",
          "EVENT_TYPE": "purchase",
          "PRODUCT_ID": "SKU_42",
          "AMOUNT": 149.99
        }
      ]
    }
  }'
```

| Field | Required | Description |
|---|---|---|
| records | Yes | Map of stream source name to event records matching the registered StreamSource schema |
| dry_run | No | Validate without persisting (default: false) |
| include_diagnostics | No | Per-record diagnostics in response (default: false) |
Set "dry_run": true when wiring a new producer. The service validates types and required columns without writing to Postgres — catch AMOUNT sent as a string before production traffic arrives.
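For Python producers running outside Snowflake (for example, a Kafka consumer that cannot install the SDK), the same request can be assembled with the standard library. A minimal sketch, assuming ingest_url comes from get_online_service_status() and the PAT is supplied by the caller, as in the curl example. build_ingest_request and send_ingest are hypothetical helper names, and send_ingest assumes the service responds with JSON.

```python
import json
import urllib.request


def build_ingest_request(ingest_url: str, pat: str, records: dict,
                         dry_run: bool = True,
                         include_diagnostics: bool = True) -> urllib.request.Request:
    """Build POST <ingest_url>/ingest; dry_run defaults to True for new producers."""
    body = {
        "dry_run": dry_run,
        "include_diagnostics": include_diagnostics,
        "records": records,  # keys must be registered StreamSource names
    }
    return urllib.request.Request(
        url=f"{ingest_url.rstrip('/')}/ingest",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f'Snowflake Token="{pat}"',  # PAT authentication
            "Content-Type": "application/json",
        },
        method="POST",
    )


def send_ingest(req: urllib.request.Request, timeout: float = 10.0) -> dict:
    """Send the request (network call) and parse the JSON response body."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Typical use: build with dry_run=True and the CLICKSTREAM_EVENTS payload from the curl example, confirm the diagnostics are clean via send_ingest, then flip dry_run to False for production traffic.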
9.14 Querying Online Features
Postgres online reads use the online service (PAT + query endpoint), not warehouse SQL against a Hybrid Table. The logical read API is the same as Chapter 8 — read_feature_view(..., store_type="online") — but authentication, networking, and some key shapes differ.
Shared with Chapter 8: batch keys as keys=[["user_001"], ...]; batch many keys per call; warm-up before benchmarking; latency budgeting. See Chapter 8 for warehouse-oriented Hybrid read tuning — it does not apply directly to Postgres PAT reads.
Postgres-specific: PAT required; Consumer role; network policy on the online service; RTFV dict keys; Feature Group compound keys; REST Query API for non-Python inference services.
Object types:
| Object | Python API | REST object_type |
|---|---|---|
| Feature View | read_feature_view(..., store_type="online") | "feature_view" |
| Feature Group | read_feature_group(..., store_type="online") | "feature_group" |
| Real-Time FV | read_feature_view with dict keys + request fields | "feature_view" (with request context in payload) |
9.14.1 Python SDK
Pass store_type="online" to route reads to Postgres instead of the offline Snowflake table. The keys argument is a list of lookup rows:
- Batch / stream FV: each row is a list of join-key values, e.g. [["user_001"], ["user_002"]]
- RTFV with RequestSource: each row is a dict of join keys and request-context fields
- Feature Group: each row must include the full compound primary key (see Reading a Feature Group)
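The three key shapes differ only in how each lookup row is packaged. A small sketch, assuming you start from entity dicts (for example, decoded from an inference request). The helpers to_fv_keys, to_rtfv_keys, and to_group_keys are hypothetical, as are the MERCHANT_ID and CART_VALUE names; to_group_keys also assumes the list-of-values shape used for Feature Views, so check Reading a Feature Group for the exact shape your SDK version expects.

```python
def to_fv_keys(rows: list, join_keys: list) -> list:
    """Batch/stream FV: one list of join-key values per row, in join-key order."""
    return [[row[k] for k in join_keys] for row in rows]


def to_rtfv_keys(rows: list, join_keys: list, request_fields: list) -> list:
    """RTFV with RequestSource: one dict of join keys plus request-context fields."""
    wanted = join_keys + request_fields
    return [{k: row[k] for k in wanted} for row in rows]


def to_group_keys(rows: list, primary_key: list) -> list:
    """Feature Group: fail fast unless every row carries the full compound key."""
    for i, row in enumerate(rows):
        missing = [k for k in primary_key if k not in row]
        if missing:
            raise ValueError(f"row {i} is missing key columns {missing}")
    return to_fv_keys(rows, primary_key)  # assumed list-of-values shape


rows = [{"USER_ID": "user_001", "MERCHANT_ID": "m_9", "CART_VALUE": 42.0}]
print(to_fv_keys(rows, ["USER_ID"]))                     # [['user_001']]
print(to_rtfv_keys(rows, ["USER_ID"], ["CART_VALUE"]))   # [{'USER_ID': 'user_001', 'CART_VALUE': 42.0}]
print(to_group_keys(rows, ["USER_ID", "MERCHANT_ID"]))   # [['user_001', 'm_9']]
```

The first result is what you would pass as keys= to fs.read_feature_view(registered_stream_fv, ..., store_type="online"). Validating compound keys client-side turns a missing column into an immediate error rather than an empty or failed online read.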
Request only the feature columns you need when the API supports column projection — smaller payloads reduce serialization cost on the inference path.
```python
# Online read routed to Postgres; one lookup row per entity.
online_df = fs.read_feature_view(
    registered_stream_fv,
    keys=[["user_001"], ["user_002"]],
    store_type="online",
)
online_df.show()
```

Latency practices (see also Chapter 8):
- Batch many keys in one call — avoid per-entity loops
- Warm up the online service before benchmarking (first reads after idle periods are slower)
- Size inference workers for concurrent Consumer connections when QPS is high
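Batching and worker sizing can be combined in one helper that splits a large key list into a few read calls and runs them on a bounded thread pool. A sketch, assuming read_fn wraps a call such as fs.read_feature_view(registered_stream_fv, keys=batch, store_type="online") and converts the result to a list with one row per key in input order. batched_online_read is a hypothetical name, and the batch size and worker count are placeholders to tune, not recommended values.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence


def batched_online_read(read_fn: Callable[[list], list], keys: Sequence,
                        batch_size: int = 200, max_workers: int = 4) -> list:
    """Split keys into batches, read them concurrently, and keep input order."""
    batches = [list(keys[i:i + batch_size]) for i in range(0, len(keys), batch_size)]
    results: list = []
    # max_workers bounds the number of concurrent Consumer connections.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for batch_rows in pool.map(read_fn, batches):  # map yields in batch order
            results.extend(batch_rows)
    return results


calls: list = []


def fake_read(batch: list) -> list:
    """Stand-in for an online read: echoes one row per key."""
    calls.append(len(batch))
    return [{"USER_ID": k[0]} for k in batch]


keys = [[f"user_{i:03d}"] for i in range(450)]
rows = batched_online_read(fake_read, keys, batch_size=200)
print(len(rows), sorted(calls))  # 450 [50, 200, 200]
```

Three calls replace 450 per-entity reads, and the pool size caps how many of those calls hit the online service at once.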
9.14.2 REST Query API
The REST Query API mirrors the Python read contract for polyglot inference stacks. Use query_url from get_online_service_status(). Each request_rows entry contains an entity map of join-key names to values; optional features limits the response columns.
Endpoint: POST <query_url>/query
```json
{
  "name": "USER_REALTIME_AGG_FEATURES",
  "version": "V01",
  "object_type": "feature_view",
  "request_rows": [
    { "entity": { "USER_ID": "user_001" } }
  ],
  "features": ["TOTAL_SPEND_48H", "TXN_COUNT_48H"]
}
```

| Field | Required | Description |
|---|---|---|
| name | Yes | Feature View or Feature Group name |
| version | Yes | Version string (e.g. "V01") |
| object_type | Yes | "feature_view" or "feature_group" |
| request_rows | Yes | Array of {"entity": {join_key: value, ...}} objects; all PK columns are required for Feature Groups |
| features | No | Subset of columns to return; omit for all |
| include_diagnostics | No | Per-row diagnostics when true |
For Feature Groups with compound keys, every entity map must include the full primary key — the same rule as the Python SDK. For RTFV reads over REST, include RequestSource fields in the entity map (or the request-context payload shape your SDK version documents).
Online reads require a valid SNOWFLAKE_PAT and your IP must be allowed by the online service network policy. Offline reads (store_type="offline") use Snowflake warehouse SQL and are unrelated to this Postgres path.
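A Python inference service that cannot install the SDK can call the Query API directly. A minimal sketch, assuming query_url comes from get_online_service_status() and that the query endpoint accepts the same Authorization: Snowflake Token="..." header shown for ingest (the chapter states only that a PAT is required, so the exact header is an assumption here). build_query_payload and build_query_request are hypothetical helpers; the first enforces the full-key rule before any network call.

```python
import json
import urllib.request
from typing import Optional


def build_query_payload(name: str, version: str, object_type: str,
                        entities: list, primary_key: list,
                        features: Optional[list] = None) -> dict:
    """Build the POST /query body described in the field table."""
    if object_type not in ("feature_view", "feature_group"):
        raise ValueError(f"unsupported object_type: {object_type}")
    for i, entity in enumerate(entities):
        missing = [k for k in primary_key if k not in entity]
        if missing:
            # Feature Groups need every PK column in every entity map.
            raise ValueError(f"request_rows[{i}] is missing {missing}")
    payload = {
        "name": name,
        "version": version,
        "object_type": object_type,
        "request_rows": [{"entity": e} for e in entities],
    }
    if features:
        payload["features"] = features  # omit to return all columns
    return payload


def build_query_request(query_url: str, pat: str, payload: dict) -> urllib.request.Request:
    """Wrap the payload in an authenticated POST <query_url>/query request."""
    return urllib.request.Request(
        url=f"{query_url.rstrip('/')}/query",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f'Snowflake Token="{pat}"',  # assumed header
                 "Content-Type": "application/json"},
        method="POST",
    )


payload = build_query_payload(
    "USER_REALTIME_AGG_FEATURES", "V01", "feature_view",
    entities=[{"USER_ID": "user_001"}], primary_key=["USER_ID"],
    features=["TOTAL_SPEND_48H", "TXN_COUNT_48H"],
)
```

Sending the request with urllib.request.urlopen(...) still has to originate from an IP allowed by the online service network policy.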
9.15 Architecture and Offline Historization
Stream Feature Views implement a dual-write kappa architecture: the online Postgres store is the serving source of truth; Snowflake offline tables are the durable mirror for training, monitoring, and batch analytics. Batch + Postgres and RTFVs follow different patterns (offline-first sync and request-time compute, respectively).
Understanding which path owns truth matters for incident response: if offline historization lags, Stream FV online serving continues; training data freshness may trail by the async interval. RTFVs have no offline output table — training recomputes compute_fn via generate_training_set().
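Because training recomputes compute_fn, the same row-level function runs on both paths. This sketch uses plain dicts instead of pandas to stay dependency-free; the real compute_fn works on pandas data and is limited to row-level logic. spend_ratio_fn, the CART_VALUE request field, and the AVG_SPEND_30D feature are hypothetical, and training_rows assumes the point-in-time join has already been done (the role generate_training_set() plays).

```python
def spend_ratio_fn(row: dict) -> dict:
    """Hypothetical row-level compute_fn: a request field vs a stored feature."""
    avg = row["AVG_SPEND_30D"]
    ratio = row["CART_VALUE"] / avg if avg else None  # guard divide-by-zero
    return {"CART_TO_AVG_RATIO": ratio}


def serve(online_row: dict, request: dict) -> dict:
    """Serving time: merge the online feature row with live request fields."""
    return spend_ratio_fn({**online_row, **request})


def training_rows(spine: list, pit_features: list) -> list:
    """Training time: re-run the same function over aligned historical rows."""
    return [spend_ratio_fn({**f, **s}) for s, f in zip(spine, pit_features)]


print(serve({"USER_ID": "user_001", "AVG_SPEND_30D": 50.0}, {"CART_VALUE": 150.0}))
# {'CART_TO_AVG_RATIO': 3.0}
print(training_rows([{"CART_VALUE": 150.0}], [{"AVG_SPEND_30D": 50.0}]))
# [{'CART_TO_AVG_RATIO': 3.0}]
```

With one function on both paths, any serving/training skew comes from the inputs (live values versus point-in-time values), which is why logging serving outputs matters for RTFVs.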
Hot path (stream write):
- Client sends events via fs.stream_ingest() or POST /ingest
- Optional transformation_fn applies per-event derivations
- Features land in Postgres; CONTINUOUS aggregates update in place
Cold path (offline historization):
- Transformed events are asynchronously written to a Snowflake table (naming pattern <FV>$<VERSION>$UDF_TRANSFORMED) via Snowpipe Streaming
- Non-tiled stream FVs: offline object is a VIEW over that table
- Tiled stream FVs: an additional Dynamic Table materializes tiles on refresh_freq for PIT-correct training
- RTFVs have no offline historization path; they exist only in Postgres at serving time
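The hot/cold split can be simulated with a queue to show why offline data trails online data. A conceptual sketch only: the dict stands in for Postgres, the list for the Snowflake table written via Snowpipe Streaming, and the background thread for async historization, with a threading.Event used to make the lag deterministic. None of these names are Feature Store APIs; offline_table_name just applies the naming pattern listed above.

```python
import queue
import threading


def offline_table_name(fv_name: str, version: str) -> str:
    """Apply the <FV>$<VERSION>$UDF_TRANSFORMED naming pattern."""
    return f"{fv_name}${version}$UDF_TRANSFORMED"


online_store: dict = {}     # stands in for the Postgres online store
offline_table: list = []    # stands in for the Snowflake offline table
pending = queue.Queue()     # events awaiting historization
release = threading.Event() # lets the demo control historization lag


def hot_path(event: dict) -> None:
    """Synchronous write: the online row is queryable as soon as this returns."""
    online_store[event["USER_ID"]] = event
    pending.put(event)      # hand off; never wait on the cold path


def historizer() -> None:
    """Background cold path: mirrors events into the offline table."""
    while True:
        event = pending.get()
        release.wait()      # simulated historization delay
        offline_table.append(event)
        pending.task_done()


threading.Thread(target=historizer, daemon=True).start()
hot_path({"USER_ID": "user_001", "AMOUNT": 149.99})
print(len(online_store), len(offline_table))  # 1 0  (online first)
release.set()                                 # let historization finish
pending.join()
print(len(online_store), len(offline_table))  # 1 1  (mirror caught up)
```

If the historizer stalls, hot_path keeps succeeding; only the offline table falls behind, which matches the incident-response point about Stream FV serving continuing while training freshness trails.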
Source of truth by write path (Postgres backend):
| Write path | Source of truth | Offline role |
|---|---|---|
| Batch + Postgres | Offline DT | Synced mirror in Postgres (same as Hybrid batch model) |
| Stream FV | Postgres online store | Async durable mirror for training |
| RTFV | Postgres (computed on read) | Training re-runs compute_fn; no materialized history — log serving outputs for audit |
Hybrid batch sync source-of-truth semantics are in Chapter 8.
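Because RTFVs keep no materialized history, an as-served audit trail has to be captured by the caller. A minimal sketch of a logging wrapper, assuming JSON Lines written to any writable text stream. logged_serve, the record fields, and the model tag are hypothetical; serve_fn stands for whatever call returns the RTFV outputs in your inference service.

```python
import io
import json
from datetime import datetime, timezone
from typing import Callable, TextIO


def logged_serve(serve_fn: Callable[[dict], dict], request: dict,
                 sink: TextIO, model_version: str) -> dict:
    """Run serve_fn, then append one JSON line recording what was served."""
    features = serve_fn(request)
    record = {
        "served_at": datetime.now(timezone.utc).isoformat(),
        "model_version": model_version,
        "request": request,    # live request fields sent to compute_fn
        "features": features,  # as-served outputs (no offline copy exists)
    }
    sink.write(json.dumps(record, default=str) + "\n")
    return features


log = io.StringIO()
out = logged_serve(lambda r: {"CART_TO_AVG_RATIO": r["CART_VALUE"] / 50.0},
                   {"USER_ID": "user_001", "CART_VALUE": 150.0}, log,
                   model_version="fraud_model_v3")  # hypothetical model tag
print(log.getvalue().strip())
```

Comparing these logs with generate_training_set() output for the same entities and timestamps is one way to detect serving/training skew.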
9.16 Performance Characteristics
Latency and freshness depend on online service sizing, event throughput, transformation_fn / compute_fn complexity, and network distance to the online endpoints. PubPr targets below are planning guides — benchmark in your account before committing SLAs.
Benchmarking tips: warm up with steady-state traffic before measuring p50/p95; batch keys in reads; test from the same network zone your inference service will use in production.
| Metric | Target / observed |
|---|---|
| End-to-end freshness (ingest → queryable) | < 2 seconds |
| Online read latency (p50) | ~10 ms |
| Stream FV status | STATIC (no DT refresh cycle) |
| Backfill from backfill_df | Applied at registration time |
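The benchmarking tips translate into a short harness: discard warm-up calls, then report percentiles from steady-state samples. A sketch, assuming read_fn performs one batched online read (for example, a wrapped fs.read_feature_view(..., store_type="online") call). The function name benchmark and the warm-up and sample counts are placeholders.

```python
import statistics
import time
from typing import Callable


def benchmark(read_fn: Callable[[], object], warmup: int = 20,
              samples: int = 200) -> dict:
    """Time read_fn after warm-up; return p50/p95 latency in milliseconds."""
    for _ in range(warmup):
        read_fn()  # first reads after idle periods are slower; discard them
    latencies_ms = []
    for _ in range(samples):
        start = time.perf_counter()
        read_fn()
        latencies_ms.append((time.perf_counter() - start) * 1000)
    # quantiles(n=100) returns the 99 cut points for the 1st..99th percentiles.
    cuts = statistics.quantiles(latencies_ms, n=100)
    return {"p50_ms": cuts[49], "p95_ms": cuts[94]}


# Stand-in workload; replace with a batched online read from the inference network zone.
print(benchmark(lambda: sum(range(10_000)), warmup=5, samples=50))
```

Compare the resulting p50_ms with the ~10 ms planning figure above before committing to an SLA.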
9.17 Choosing a backend and write path
Start with Chapter 8’s matrix, then narrow by scenario:
| Scenario | Backend | Write path |
|---|---|---|
| GA production; no preview enrollment | Hybrid | Batch sync |
| Lower read latency; same batch pipelines | Postgres | Batch sync |
| Tiled Aggregations API FVs for online | Postgres | Batch sync (tile reassembly at read) |
| Feature freshness in seconds; event-driven | Postgres | Stream FV |
| Live request fields + online features at inference | Postgres | RTFV |
| Multi-FV online read without pre-joining offline | Postgres | Feature Group |
| Complex multi-table SQL transformations | Either backend | Batch sync only (full SQL / Snowpark offline) |
| No Postgres preview | Hybrid | Batch sync; external store only after ruling out Postgres (Ch 8 — self-managed ops, no Snowflake roadmap upside) |
Many production systems combine paths: batch Feature Views for slow-changing demographics and 7d/30d aggregates; Stream FVs for recent transactions; RTFVs for request-time scoring that blends both.
9.18 Known Limitations
- PubPr enrollment may be required depending on account and region
- Network policy on the online service controls external read/ingest access
- Compute function scope — row-level pandas only; no arbitrary joins or sklearn
- Stream FV status shows STATIC rather than ACTIVE (no DT refresh cycle)
- RTFVs have no offline historization table; training uses generate_training_set() to re-run compute_fn; use serving-time logging for as-served audit trails
- REST ingest is the HTTP path for streaming; the Python SDK does not write directly to Snowflake offline tables
9.19 Deployment and CI/CD
Feature View definitions for Postgres online, stream, and real-time paths should follow the same version-controlled deployment practices as batch Feature Views. This chapter focuses on serving architecture and APIs — not pipeline automation.
For declarative deployment, reconciliation scripts, repository layout, and emerging SnowCLI plan/apply workflows, see Chapter 13: CI/CD for Feature Store. Chapter 12: Operations covers monitoring and production runbooks.
9.20 Summary
| Concept | Description |
|---|---|
| Postgres backend | Managed online service; required for all paths in this chapter |
| Batch + Postgres | Same batch sync write path as Ch 8 Hybrid; different store_type and read stack |
| Stream FV | stream_config + stream_ingest(); online-first with async offline mirror |
| Real-Time FV | RealtimeConfig + compute_fn; training re-runs compute_fn via generate_training_set() |
| FeatureGroup | Multi-FV bundle under one online table |
| Compute functions | Shared pandas constraints for transformation_fn and compute_fn |
| CONTINUOUS aggregation | Running window metrics updated at ingest time |
9.21 Next Steps
Continue to Chapter 10: Preprocessing for model-dependent transformations, or review Chapter 13 for CI/CD and multi-environment deployment.