---
title: "Streaming Feature Views"
subtitle: "Real-time feature ingestion with Postgres-backed online serving"
---

::: {.callout-important title="Internal / NDA-Only Content"}
This chapter covers **Private Preview** functionality (`snowflake-ml-python >= 1.36`).
Share only with Snowflake internal teams (SE/FE) and customers under NDA.
Contact your Snowflake account team to enable Private Preview features.

This chapter is excluded from the public GitHub Pages site. To render locally, use:

```bash
cd Snowflake_FeatureStore_Implementation_Guide
quarto render --profile internal --to html
```
:::

## Overview

Streaming Feature Views introduce a **write-first** architecture for features that need near-real-time freshness. Instead of the traditional "batch offline table -> periodic online sync" model described in [Chapter 8](../08_online_features/index.qmd), streaming features flow through a **dual-write kappa path**:

1. Events arrive via the Python SDK (`fs.stream_ingest()`) or the REST Ingest API
2. An optional `transformation_fn` applies per-event transformations server-side
3. Features land directly in a **Postgres-backed online store** (<20ms p50 read latency)
4. Asynchronously, events are historized to the offline Snowflake table for training and monitoring

This flips the current model: **online is the source of truth**, with offline as the durable mirror.

| Capability | Batch FV (Ch8) | Streaming FV |
|---|---|---|
| **Online store** | Hybrid Table | Managed Postgres |
| **Freshness** | 10s+ (`target_lag` on sync Task) | 2-3s end-to-end (design target) |
| **Source of truth** | Offline (DT) | Online (Postgres) |
| **Transformation** | Full SQL / Snowpark DataFrame | Per-event pandas function |
| **Aggregation** | Tiled via `Feature` class + DT refresh | Continuous (`FeatureAggregationMethod.CONTINUOUS`) |
| **SDK version** | `snowflake-ml-python >= 1.7.0` | `snowflake-ml-python >= 1.36` |
| **Account enablement** | GA (Hybrid Table OFT) | Private Preview (account flag required) |

## Learning Objectives

After completing this chapter, you will be able to:

- Provision the Postgres-backed online service and configure Producer/Consumer RBAC
- Register a `StreamSource` with a typed event schema
- Create a Stream Feature View with `StreamConfig`, including a `transformation_fn` and historical backfill
- Combine streaming ingestion with time-windowed aggregation using `FeatureAggregationMethod.CONTINUOUS`
- Ingest events via the Python SDK and the REST Ingest API
- Query features from the Postgres online store via Python and the REST Query API
- Understand the dual-write architecture: online-first with async offline historization

---

## Prerequisites

```bash
pip install "snowflake-ml-python>=1.36"
```

Your account must be enabled for the Feature Store Private Preview. You also need a **Programmatic Access Token (PAT)** for online service authentication:

```bash
export SNOWFLAKE_PAT="<your_pat_token>"
```

This chapter uses the same **clickstream** sample context as the rest of the guide: database `FEATURE_STORE_DEMO`, source schema `CLICKSTREAM_DATA`, Feature Store schema `FEATURE_STORE`, warehouse `FS_DEV_WH`.

---

## RBAC Setup

The online service uses separate **Producer** and **Consumer** roles to control who can ingest data versus who can query features.

```sql
CREATE ROLE IF NOT EXISTS FS_PRODUCER_ROLE;
CREATE ROLE IF NOT EXISTS FS_CONSUMER_ROLE;
GRANT ROLE FS_PRODUCER_ROLE TO ROLE SYSADMIN;
GRANT ROLE FS_CONSUMER_ROLE TO ROLE SYSADMIN;
```

Grant the producer role to services or users that will push events. Grant the consumer role to inference services that read features.

---

## Provisioning the Online Service {#sec-online-service}

The online service is provisioned once per Feature Store. It creates the Postgres-backed serving layer that all online Feature Views (batch and streaming) use.

```python
from snowflake.ml.feature_store import FeatureStore, Entity, CreationMode

fs = FeatureStore(
    session=session,
    database="FEATURE_STORE_DEMO",
    name="FEATURE_STORE",
    default_warehouse="FS_DEV_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

create_result = fs.create_online_service("FS_PRODUCER_ROLE", "FS_CONSUMER_ROLE")
print(create_result)
```

Provisioning takes several minutes on first creation. Poll until the status reaches `RUNNING`:

```python
import time

status = fs.get_online_service_status()
while status.status != "RUNNING":
    time.sleep(30)
    status = fs.get_online_service_status()
    print(f"Status: {status.status}")

print(f"Endpoints: {status.endpoints}")
```
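
The loop above polls indefinitely if provisioning stalls. A minimal sketch of a bounded variant follows. The helper name `wait_until_running`, its parameters, and the 30-minute default are illustrative assumptions, not part of the SDK; the helper only needs a callable that returns the current status string.

```python
import time
from typing import Callable


def wait_until_running(
    get_status: Callable[[], str],
    timeout_s: float = 1800.0,      # hypothetical upper bound, tune for your account
    poll_interval_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll get_status() until it returns "RUNNING" and return the number of polls.

    Raises TimeoutError if the deadline passes before the service is running.
    """
    deadline = time.monotonic() + timeout_s
    polls = 0
    while True:
        polls += 1
        if get_status() == "RUNNING":
            return polls
        if time.monotonic() >= deadline:
            raise TimeoutError(f"online service not RUNNING after {timeout_s:.0f}s")
        sleep(poll_interval_s)
```

With the objects from this chapter, the call would look like `wait_until_running(lambda: fs.get_online_service_status().status)`. Injecting `sleep` keeps the helper testable without real waiting.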

Once running, retrieve the REST endpoint URLs for later use:

```python
from snowflake.ml.feature_store import online_service

query_url = online_service.endpoint_url(status, "query")
ingest_url = online_service.endpoint_url(status, "ingest")
```

---

## Batch Feature Views with Postgres Online Store

Before diving into streaming, note that the Postgres online store also works with standard batch Feature Views. This is a drop-in replacement for the Hybrid Table path in [Chapter 8](../08_online_features/index.qmd), offering lower read latency.

::: {.panel-tabset group="lang"}

## SQL

```python
from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType

user_entity = Entity(name="USER", join_keys=["USER_ID"])
fs.register_entity(user_entity)

feature_df = session.sql("""
    SELECT
        USER_ID,
        SUM(TOTAL_AMT)   AS ORDER_TOTAL_AMT_SUM,
        COUNT(ORDER_ID)   AS ORDER_CNT,
        AVG(TOTAL_AMT)    AS ORDER_TOTAL_AMT_AVG,
        MAX(ORDER_TS)     AS LAST_ORDER_TS
    FROM FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS
    GROUP BY USER_ID
""")

batch_fv = FeatureView(
    name="USER_ORDER_FEATURES_PG",
    entities=[user_entity],
    feature_df=feature_df,
    timestamp_col="LAST_ORDER_TS",
    refresh_freq="1 hour",
    online_config=OnlineConfig(
        enable=True,
        target_lag="10s",
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="User order features with Postgres online serving",
)

registered_fv = fs.register_feature_view(batch_fv, version="V01")
```

## Snowpark DataFrame

```python
import snowflake.snowpark.functions as F
from snowflake.ml.feature_store import FeatureView, OnlineConfig, OnlineStoreType

user_entity = Entity(name="USER", join_keys=["USER_ID"])
fs.register_entity(user_entity)

orders = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS")
feature_df = orders.group_by("USER_ID").agg(
    F.sum("TOTAL_AMT").alias("ORDER_TOTAL_AMT_SUM"),
    F.count("ORDER_ID").alias("ORDER_CNT"),
    F.avg("TOTAL_AMT").alias("ORDER_TOTAL_AMT_AVG"),
    F.max("ORDER_TS").alias("LAST_ORDER_TS"),
)

batch_fv = FeatureView(
    name="USER_ORDER_FEATURES_PG",
    entities=[user_entity],
    feature_df=feature_df,
    timestamp_col="LAST_ORDER_TS",
    refresh_freq="1 hour",
    online_config=OnlineConfig(
        enable=True,
        target_lag="10s",
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="User order features with Postgres online serving",
)

registered_fv = fs.register_feature_view(batch_fv, version="V01")
```

:::

The only difference from [Chapter 8](../08_online_features/index.qmd) is `store_type=OnlineStoreType.POSTGRES` in the `OnlineConfig`. Everything else -- `refresh_freq`, `target_lag`, the Feature Store API -- works identically.

---

## Stream Feature Views

Stream Feature Views are the core new capability. They consist of three components:

1. A **StreamSource** that defines the event schema
2. A **StreamConfig** that links the source to a transformation function and optional backfill data
3. A **FeatureView** registered with `stream_config` instead of `feature_df`

### Register a Stream Source {#sec-stream-source}

A `StreamSource` defines the schema for real-time events. Column names and types must exactly match what is sent through the SDK or REST API.

```python
from snowflake.ml.feature_store import StreamSource
from snowflake.snowpark.types import (
    StructType, StructField, StringType, DoubleType,
    TimestampType, TimestampTimeZone,
)

clickstream_events = StreamSource(
    name="CLICKSTREAM_EVENTS",
    schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("EVENT_TS", TimestampType(TimestampTimeZone.NTZ)),
        StructField("EVENT_TYPE", StringType()),
        StructField("PRODUCT_ID", StringType()),
        StructField("AMOUNT", DoubleType()),
    ]),
    desc="Real-time clickstream events for feature computation",
)

fs.register_stream_source(clickstream_events)
```

Manage stream sources with:

- `fs.list_stream_sources()` -- list all registered sources
- `fs.delete_stream_source(name)` -- remove a source

### Define the Transformation Function {#sec-transformation-fn}

The `transformation_fn` is a plain pandas function that receives and returns a `pd.DataFrame`. It runs server-side on each ingested micro-batch.

```python
import pandas as pd

def process_clickstream(df: pd.DataFrame) -> pd.DataFrame:
    """Per-event feature derivation on the hot path."""
    df["IS_PURCHASE"] = (df["EVENT_TYPE"] == "purchase").astype(int)
    df["IS_HIGH_VALUE"] = (df["AMOUNT"] > 100.0).astype(int)
    return df
```

::: {.callout-warning title="Hot-Path Transformation Constraints"}
The `transformation_fn` is intentionally constrained to **per-event (row-level) operations**:

- Column derivation, filtering, type casting, simple conditional logic
- **Not supported:** joins against external tables, multi-step ETL, UDTFs, or operations that require global state

Think of it as a "map" function, not a "reduce." For aggregation across events, use the time-windowed aggregation approach described [below](#sec-streaming-aggregation).
:::

### Create a Stream Feature View {#sec-create-stream-fv}

Combine the stream source, transformation, and optional backfill data into a Feature View:

```python
from snowflake.ml.feature_store import StreamConfig, FeatureView, OnlineConfig, OnlineStoreType

backfill_df = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.EVENTS")

stream_cfg = StreamConfig(
    stream_source=clickstream_events,
    transformation_fn=process_clickstream,
    backfill_df=backfill_df,
)

stream_fv = FeatureView(
    name="USER_REALTIME_EVENTS",
    entities=[user_entity],
    stream_config=stream_cfg,
    timestamp_col="EVENT_TS",
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    desc="Real-time clickstream features with per-event transforms",
)

registered_stream_fv = fs.register_feature_view(stream_fv, version="V01")
```

Key differences from a batch Feature View:

- `stream_config` replaces `feature_df` -- you cannot pass both
- No `refresh_freq` -- the online store updates as events arrive (2-3s E2E freshness)
- The `backfill_df` pre-populates the online store with historical data so features are available immediately after registration

---

## Streaming with Time-Windowed Aggregation {#sec-streaming-aggregation}

The most powerful pattern combines stream ingestion with the `Feature` class aggregations from [Chapter 7](../07_aggregations_api/index.qmd). This produces rolling-window metrics (e.g., "total spend in last 48 hours") that update continuously as new events arrive.

The key enabler is `FeatureAggregationMethod.CONTINUOUS`, which instructs the online service to maintain running aggregates at ingest time rather than waiting for periodic tile refreshes from the offline store.

```python
from snowflake.ml.feature_store import Feature, FeatureView, StreamConfig, OnlineConfig, OnlineStoreType
from snowflake.ml.feature_store.spec.enums import FeatureAggregationMethod

features = [
    Feature.sum("AMOUNT", "48h").alias("TOTAL_SPEND_48H"),
    Feature.count("AMOUNT", "48h").alias("TXN_COUNT_48H"),
    Feature.avg("AMOUNT", "24h").alias("AVG_SPEND_24H"),
]

stream_agg_fv = FeatureView(
    name="USER_REALTIME_AGG_FEATURES",
    entities=[user_entity],
    stream_config=stream_cfg,
    timestamp_col="EVENT_TS",
    refresh_freq="1 minute",
    feature_granularity="1 minute",
    features=features,
    online_config=OnlineConfig(
        enable=True,
        store_type=OnlineStoreType.POSTGRES,
    ),
    feature_aggregation_method=FeatureAggregationMethod.CONTINUOUS,
    desc="Real-time user transaction aggregations with continuous processing",
)

registered_agg_fv = fs.register_feature_view(stream_agg_fv, version="V01")
```

::: {.callout-tip title="Batch vs Continuous Aggregation"}
Without `FeatureAggregationMethod.CONTINUOUS`, aggregation tiles are refreshed on the `refresh_freq` schedule from the offline store. With `CONTINUOUS`, the online service maintains running aggregates as events arrive -- giving you feature freshness measured in seconds rather than minutes.

Use `CONTINUOUS` for features where staleness directly impacts model quality (fraud detection, real-time recommendations). Use batch tiling for features where minute-level freshness is acceptable (daily spend trends, weekly engagement scores).
:::
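
To make the semantics of a feature such as `TOTAL_SPEND_48H` concrete, here is a toy, pure-Python model of a rolling window maintained at ingest time. It is a conceptual sketch only and says nothing about how the online service is implemented. The class name `RollingWindow`, the in-order arrival assumption, and the boundary rule (an event exactly `window` old is evicted) are all assumptions made for illustration.

```python
from collections import deque
from datetime import datetime, timedelta


class RollingWindow:
    """Toy rolling sum/count for a single entity key over a fixed time window."""

    def __init__(self, window: timedelta) -> None:
        self.window = window
        self.events: deque[tuple[datetime, float]] = deque()
        self.total = 0.0

    def add(self, ts: datetime, amount: float) -> None:
        # Assumes events arrive in timestamp order; late events are not handled.
        self.events.append((ts, amount))
        self.total += amount

    def evict(self, now: datetime) -> None:
        # Drop events whose timestamp is at or before now - window.
        cutoff = now - self.window
        while self.events and self.events[0][0] <= cutoff:
            _, old_amount = self.events.popleft()
            self.total -= old_amount

    def snapshot(self, now: datetime) -> tuple[float, int]:
        """Return (sum, count) over the window ending at `now`."""
        self.evict(now)
        return self.total, len(self.events)
```

The point of the model is that the aggregate is updated when an event is added and trimmed when it is read, so a reader never waits for a scheduled refresh.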

---

## Ingesting Events {#sec-ingest}

### Python SDK

Push events through the Python SDK. Events are validated against the stream source schema and fanned out to all consuming Feature Views.

```python
new_events = [
    {
        "USER_ID": "user_001",
        "EVENT_TS": "2026-04-10 10:00:00",
        "EVENT_TYPE": "purchase",
        "PRODUCT_ID": "SKU_42",
        "AMOUNT": 149.99,
    },
    {
        "USER_ID": "user_002",
        "EVENT_TS": "2026-04-10 10:01:00",
        "EVENT_TYPE": "page_view",
        "PRODUCT_ID": "SKU_17",
        "AMOUNT": 0.0,
    },
]

accepted = fs.stream_ingest(clickstream_events, new_events)
print(f"Ingested {accepted} / {len(new_events)} records")
```
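
For larger backlogs, a producer may prefer to send events in bounded batches and track how many were accepted. The following is a minimal sketch under stated assumptions: `ingest_in_batches` and `chunked` are hypothetical helpers, the batch size of 500 is arbitrary (no service limit is implied), and the `ingest` callable is assumed to return the accepted count, as `fs.stream_ingest()` does in the example above.

```python
from typing import Any, Callable, Iterator, Sequence

Event = dict[str, Any]


def chunked(events: Sequence[Event], size: int) -> Iterator[Sequence[Event]]:
    """Yield consecutive slices of at most `size` events."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(events), size):
        yield events[start:start + size]


def ingest_in_batches(
    ingest: Callable[[Sequence[Event]], int],
    events: Sequence[Event],
    batch_size: int = 500,  # arbitrary illustrative value
) -> tuple[int, int]:
    """Send events batch by batch; return (accepted, sent) totals."""
    accepted = 0
    sent = 0
    for batch in chunked(events, batch_size):
        accepted += ingest(batch)
        sent += len(batch)
    return accepted, sent
```

With the objects from this chapter, the call would be `ingest_in_batches(lambda batch: fs.stream_ingest(clickstream_events, batch), new_events)`. A gap between the two totals signals rejected records worth inspecting.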

### REST Ingest API

For non-Python services, use the REST endpoint directly. All requests require a PAT-based `Snowflake Token` authorization header.

**Endpoint:** `POST <ingest_url>/ingest`

```bash
curl -X POST "${INGEST_URL}/ingest" \
  -H "Authorization: Snowflake Token=\"${SNOWFLAKE_PAT}\"" \
  -H "Content-Type: application/json" \
  -d '{
    "dry_run": false,
    "include_diagnostics": true,
    "records": {
      "CLICKSTREAM_EVENTS": [
        {
          "USER_ID": "user_001",
          "EVENT_TS": "2026-04-10T10:00:00Z",
          "EVENT_TYPE": "purchase",
          "PRODUCT_ID": "SKU_42",
          "AMOUNT": 149.99
        }
      ]
    }
  }'
```

| Field | Required | Description |
|---|---|---|
| `records` | Yes | Map of stream source name to array of event records. Schema must match the registered `StreamSource`. |
| `dry_run` | No | When `true`, validates payload without persisting. Useful for testing. Default: `false`. |
| `include_diagnostics` | No | When `true`, includes per-record diagnostics in the response. Default: `false`. |

::: {.callout-tip title="Schema Validation with dry_run"}
Use `dry_run: true` during integration testing to validate that your event payloads match the registered schema without actually writing data. This catches type mismatches and missing columns before they cause runtime errors.
:::
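
The same request can be assembled from Python using only the standard library. This is a sketch that mirrors the `curl` call above: the helper name `build_ingest_request` is hypothetical, the header format and `/ingest` path are copied from that example, and the response format is not shown because it is not described here. The helper defaults to `dry_run=True` so that a first call validates without writing.

```python
import json
import urllib.request
from typing import Any


def build_ingest_request(
    ingest_url: str,
    pat: str,
    source_name: str,
    events: list[dict[str, Any]],
    dry_run: bool = True,
) -> urllib.request.Request:
    """Build (but do not send) a POST request for the REST Ingest API."""
    body = {
        "dry_run": dry_run,
        "include_diagnostics": True,
        "records": {source_name: events},
    }
    base = ingest_url.rstrip("/")
    return urllib.request.Request(
        url=f"{base}/ingest",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f'Snowflake Token="{pat}"',
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

To send it, pass the result to `urllib.request.urlopen(...)` with `pat` read from `os.environ["SNOWFLAKE_PAT"]`. Separating construction from sending makes the payload easy to unit test.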

---

## Querying Online Features {#sec-query}

### Python SDK

```python
online_df = fs.read_feature_view(
    registered_stream_fv,
    keys=[["user_001"], ["user_002"], ["user_003"]],
    store_type="online",
)
online_df.show()
```

### REST Query API

**Endpoint:** `POST <query_url>/query`

```json
{
  "name": "USER_REALTIME_AGG_FEATURES",
  "version": "V01",
  "object_type": "feature_view",
  "request_rows": [
    { "entity": { "USER_ID": "user_001" } },
    { "entity": { "USER_ID": "user_002" } }
  ],
  "features": ["TOTAL_SPEND_48H", "TXN_COUNT_48H"]
}
```

| Field | Required | Description |
|---|---|---|
| `name` | Yes | Name of the Feature View. |
| `version` | Yes | Version string (e.g., `"V01"`). |
| `object_type` | Yes | Must be `"feature_view"`. |
| `request_rows` | Yes | Array of objects, each with an `entity` map of join key names to values. |
| `features` | No | List of feature columns to return. Omit for all features. |
| `include_diagnostics` | No | When `true`, includes diagnostics. Default: `false`. |

::: {.callout-note}
Online reads require a valid PAT in the `SNOWFLAKE_PAT` environment variable, and your IP address must be allowed by the online service's network policy.
:::
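
A small builder keeps query payloads consistent with the field table above. This is an illustrative sketch: `build_query_payload` is a hypothetical helper, and it only constructs the JSON body. Sending it to `<query_url>/query` with the same `Authorization` header used for ingestion is an assumption based on the ingest example.

```python
import json
from typing import Any, Optional, Sequence


def build_query_payload(
    name: str,
    version: str,
    entity_rows: Sequence[dict[str, Any]],
    features: Optional[Sequence[str]] = None,
) -> dict[str, Any]:
    """Build the JSON body for the REST Query API.

    Each item in entity_rows maps join key names to values.
    Omitting `features` requests all feature columns.
    """
    payload: dict[str, Any] = {
        "name": name,
        "version": version,
        "object_type": "feature_view",
        "request_rows": [{"entity": dict(row)} for row in entity_rows],
    }
    if features is not None:
        payload["features"] = list(features)
    return payload


example = build_query_payload(
    "USER_REALTIME_AGG_FEATURES",
    "V01",
    [{"USER_ID": "user_001"}, {"USER_ID": "user_002"}],
    features=["TOTAL_SPEND_48H", "TXN_COUNT_48H"],
)
print(json.dumps(example, indent=2))
```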

---

## Architecture: Dual-Write Kappa Path {#sec-architecture}

The streaming architecture uses a **dual-write** pattern inspired by kappa architecture:

```{mermaid}
%%| fig-cap: "Streaming Feature View: dual-write kappa architecture"
flowchart LR
    Client["Client / App"] -->|"stream_ingest() or REST"| Ingest["Ingest Service"]
    Ingest -->|"transformation_fn"| PG["Postgres Online Store"]
    Ingest -->|"async historization"| SF["Snowflake Offline Table"]
    PG -->|"read_feature_view(store_type='online')"| Serve["Inference Service"]
    SF -->|"generate_training_set()"| Train["Training Pipeline"]
    SF -->|"Drift / monitoring"| Monitor["Model Monitoring"]
```

**Hot path (write):**

1. Client sends events via `fs.stream_ingest()` or `POST /ingest`
2. The `transformation_fn` applies per-event derivations
3. Transformed features land in the Postgres online store
4. For `CONTINUOUS` aggregation FVs, running aggregates are updated in-place

**Cold path (historization):**

1. Events are asynchronously persisted to a Snowflake table in the Feature Store schema
2. For tiled aggregation FVs, tiles are materialized to the offline table on the `refresh_freq` schedule
3. The offline table supports `generate_training_set()` for PIT-correct training data

**Read paths:**

- **Online:** `read_feature_view(..., store_type="online")` or REST Query API -- <20ms p50
- **Offline:** Standard Feature Store APIs over the offline table

::: {.callout-important title="Online-First vs Offline-First"}
This is a fundamental shift. In the batch model ([Chapter 8](../08_online_features/index.qmd)), the offline Dynamic Table is the source of truth and the Hybrid Table is a lagging mirror. In the streaming model, the **online Postgres store is the source of truth** and the offline table is the durable copy. This means:

- Feature freshness is bounded by ingest latency (seconds), not DT refresh + sync lag (minutes)
- If the offline historization path fails, online serving continues unaffected
- Training data freshness may lag behind online serving by the async sync interval
:::
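
The consequences listed in the callout can be seen in a toy in-memory model of the dual-write pattern. This is purely a conceptual sketch: `DualWriteModel` and its methods are invented for illustration and do not correspond to any service component or SDK class.

```python
from collections import deque
from typing import Any, Optional


class DualWriteModel:
    """Toy model: synchronous online write, deferred offline historization."""

    def __init__(self) -> None:
        self.online: dict[str, dict[str, Any]] = {}   # latest features per key
        self.pending: deque[tuple[str, dict[str, Any]]] = deque()  # awaiting historization
        self.offline: list[tuple[str, dict[str, Any]]] = []        # append-only history

    def ingest(self, key: str, features: dict[str, Any]) -> None:
        # Hot path: the online store is updated immediately.
        self.online[key] = features
        self.pending.append((key, features))

    def historize(self, max_events: Optional[int] = None) -> int:
        # Cold path: drain pending events into the offline history.
        moved = 0
        while self.pending and (max_events is None or moved < max_events):
            self.offline.append(self.pending.popleft())
            moved += 1
        return moved
```

If `historize()` is never called, `online` still serves the newest values while `offline` stays behind, which is the lag between serving and training data described above.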

---

## Performance Characteristics

Based on early testing with the Private Preview:

| Metric | Observed Value |
|---|---|
| End-to-end freshness (ingest to queryable) | ~1.2s |
| Online read latency (p50) | <20ms |
| Stream FV status | `STATIC` (no DT refresh cycle) |
| Backfill from `backfill_df` | Applied at registration time |

The observed ~1.2s end-to-end freshness is faster than the 2-3s target from the original design. Actual performance will vary with instance size, event throughput, and transformation complexity.
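
To check read latency in your own environment, a simple client-side timing harness is often enough. The sketch below is an assumption-laden starting point: `measure_latency_ms` is a hypothetical helper, the p95 uses a simple nearest-rank index, and client-side timings include network round trips, so they are not directly comparable to the figures in the table.

```python
import statistics
import time
from typing import Callable


def measure_latency_ms(read_once: Callable[[], object], samples: int = 100) -> dict[str, float]:
    """Time `samples` calls to read_once() and return p50, p95, and max in milliseconds."""
    if samples <= 0:
        raise ValueError("samples must be positive")
    timings = []
    for _ in range(samples):
        start = time.perf_counter()
        read_once()
        timings.append((time.perf_counter() - start) * 1000.0)
    timings.sort()
    p95_index = min(len(timings) - 1, int(0.95 * len(timings)))
    return {
        "p50": statistics.median(timings),
        "p95": timings[p95_index],
        "max": timings[-1],
    }
```

For example, `read_once` could wrap a single `fs.read_feature_view(..., store_type="online")` call for a fixed set of keys. Run it from the same network location as the inference service to get representative numbers.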

---

## Known Limitations

- **Account enablement required:** Private Preview features require your account to be enabled by your Snowflake account team.
- **Network policy:** Online reads from external IPs require a network policy on the online service.
- **On-demand feature views:** `RequestSource` and `OnDemandConfig` are not yet available in Private Preview. Both are on the roadmap.
- **Transformation scope:** `transformation_fn` is limited to per-event (row-level) pandas operations. No joins, no global aggregation, no UDTFs.
- **Stream FV status is STATIC:** Stream Feature Views show `STATIC` status rather than `ACTIVE` since there is no Dynamic Table refresh cycle.

---

## When to Use Streaming vs Batch Feature Views

| Scenario | Recommendation |
|---|---|
| Feature freshness measured in **minutes to hours** | Batch FV with DT + OFT sync ([Chapter 8](../08_online_features/index.qmd)) |
| Feature freshness measured in **seconds** | Stream FV with Postgres online store |
| Complex multi-table transformations | Batch FV (full SQL / Snowpark DataFrame expressiveness) |
| Per-event derivations + rolling aggregates | Stream FV with `transformation_fn` + `CONTINUOUS` aggregation |
| Read latency <20ms required | Either, with Postgres online store (`OnlineStoreType.POSTGRES`) |
| No Private Preview access | Batch FV with Hybrid Table OFT (GA) or external store ([Chapter 8](../08_online_features/index.qmd#sec-external-serving)) |

::: {.callout-tip}
Many production systems use **both patterns together**: batch Feature Views for slow-changing features (demographics, 7d/30d aggregates) and streaming Feature Views for fast-changing features (recent transactions, real-time engagement). The inference service assembles both at prediction time.
:::
