flowchart TB
subgraph INT ["INTERNAL"]
direction LR
ST1[Source Tables] --> FV1[Dynamic Table Feature View]
end
subgraph TEM ["TEMPORAL API"]
direction LR
ST2[Source Tables] --> FV2[Tiled FV + aggregations]
end
subgraph EXT ["EXTERNAL"]
direction LR
ST3[Source Tables] --> DBT[dbt] --> TV[Tables / Views] --> FV3[View-based FV]
end
INT ~~~ TEM ~~~ EXT
5 Feature Pipelines
External orchestration, Dynamic Tables, and Temporal Aggregation API
snowflake, feature store, ml, machine learning, mlops
5.1 Overview
Feature pipelines are the workflows that transform raw data into features ready for ML consumption. They are the “plumbing” that connects your source data to your Feature Store. Snowflake supports multiple pipeline architectures, each with distinct trade-offs for freshness, cost, complexity, and flexibility.
This chapter covers four pipeline patterns and a hierarchical extension:
- External Orchestration – dbt, Airflow, or other tools managing transformations
- Internal Management – Dynamic Table-based pipelines fully managed by Snowflake
- Chained Feature Views – Using Feature Views as sources for downstream Feature Views (multi-stage DT → DT or DT → View pipelines)
- Temporal Aggregation API – The Feature class for time-windowed features with tiling (summary; full reference in Chapter 7)
- Rollup Feature Views – Hierarchical aggregation from fine- to coarse-grained entities (summary; full reference in Chapter 7)
5.2 Learning Objectives
After completing this chapter, you will be able to:
- Choose the right pipeline architecture for your requirements
- Implement dbt-managed feature pipelines with View-based Feature Views
- Build internally-managed pipelines using Dynamic Tables (SQL and Snowpark DataFrame API)
- Chain Feature Views into multi-stage pipelines (DT → DT, DT → View) and understand refresh cascading
- Use the Temporal Aggregation API and rollup Feature Views (summary; full reference in Chapter 7)
- Configure refresh strategies for optimal freshness vs. cost
📂 Chapter code: Browse companion scripts on GitHub
5.3 Pipeline Architecture Patterns
5.3.1 The Feature Pipeline Landscape
- INTERNAL: Snowflake-managed automatic refresh and incremental compute.
- TEMPORAL API: Snowflake-managed optimized temporal aggregation (windows + tiling).
- EXTERNAL: Orchestrated by dbt Projects on Snowflake, Airflow, Dagster, or similar.
5.3.2 Architecture Comparison
| Aspect | Internal (Dynamic Tables) | Temporal API | Rollup | External (dbt / Airflow) |
|---|---|---|---|---|
| Orchestration | Snowflake managed | Snowflake managed | Snowflake managed | dbt Projects on Snowflake, or external tool |
| Feature View Type | DT-based | DT-based (tiled) | DT-based (from tiled source) | View-based |
| Refresh Trigger | refresh_freq | refresh_freq | Follows source FV | Scheduled jobs / Tasks |
| Typical Freshness | Seconds to minutes | Configurable | Follows source FV | Minutes to hours |
| Operational Complexity | Lower | Lowest | Low (once source exists) | Higher |
| Cost Model | Continuous compute | Continuous compute | Incremental from tiles | Job-based compute |
| Best For | New implementations | Time-windowed features | Entity hierarchy aggs | Existing dbt investments |
5.4 Internal Management (Dynamic Tables)
5.4.1 When to Use Dynamic Tables
Dynamic Tables are ideal when you:
- Want Snowflake to manage the entire pipeline
- Need low-latency feature freshness (seconds to minutes)
- Prefer declarative over imperative pipeline definition
- Are building new feature pipelines from scratch
5.4.2 Dynamic Table Implementation
📁 Full code: _code/dynamic_table_pattern.py
The feature_df can be defined using either session.sql() or the Snowpark DataFrame API – both produce the same Dynamic Table. See Chapter 4: SQL vs Snowpark DataFrame API for guidance on choosing.
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity

user_purchase_df = session.sql("""
    SELECT
        USER_ID,
        COUNT(DISTINCT ORDER_ID) AS ORDER_CNT,
        SUM(TOTAL_AMT) AS SPEND_SUM,
        AVG(TOTAL_AMT) AS AVG_ORDER_AMT,
        MAX(ORDER_TS) AS LAST_ORDER_TS
    FROM FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS
    GROUP BY USER_ID
""")

# Equivalent definition using the Snowpark DataFrame API
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity
import snowflake.snowpark.functions as F

orders = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS")
user_purchase_df = orders.group_by("USER_ID").agg(
    F.count_distinct("ORDER_ID").alias("ORDER_CNT"),
    F.sum("TOTAL_AMT").alias("SPEND_SUM"),
    F.avg("TOTAL_AMT").alias("AVG_ORDER_AMT"),
    F.max("ORDER_TS").alias("LAST_ORDER_TS"),
)

Both approaches produce a lazy Snowpark DataFrame – no data moves until the Feature View is registered and the Dynamic Table is materialized.
The DataFrame API emits SQL that is sometimes dense or deeply nested. For review, debugging, or side-by-side comparison with hand-written SQL, use a dialect-aware formatter (for example SQLGlot with pretty=True, optionally hoisting derived tables to CTEs). Trade-offs, alternatives (SQLFluff, sqlfmt, IDE tools), and an R dbplyr note are covered in Appendix C: Formatting machine-generated SQL.
# Create Dynamic Table-backed Feature View
user_purchase_fv = FeatureView(
    name="USER_PURCHASE_STATS",
    entities=[user_entity],
    feature_df=user_purchase_df,
    timestamp_col="LAST_ORDER_TS",
    refresh_freq="15 minutes",  # Dynamic Table with TARGET_LAG
    desc="User purchase statistics - auto-refreshed"
)

# Register and wait for initial materialization
registered_fv = fs.register_feature_view(
    feature_view=user_purchase_fv,
    version="V01",
    block=True,
)

5.4.3 How refresh_freq Selects the Backing Object
refresh_freq on the Feature View determines whether Snowflake builds a view or a Dynamic Table, and how refresh is scheduled:
| Mode | refresh_freq | Behavior |
|---|---|---|
| View-based Feature View | None or omitted | No Dynamic Table. The feature_df is registered as a view; you refresh upstream tables (dbt, Airflow, manual loads). |
| Dynamic Table (lag) | Time period (e.g. "1 hour", "15 minutes") | Snowflake creates a Dynamic Table with TARGET_LAG matching that interval. |
| Dynamic Table (scheduled) | CRON expression (e.g. "0 8 * * *") | Dynamic Table refresh driven by a Task (or equivalent schedule) for fixed clock-time runs. |
For tiled temporal features, you still pass timestamp_col, feature_granularity, and features; refresh_freq follows the same idea (lag vs schedule) for how often tiles are updated.
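The mapping in the table above can be sketched as a small classifier. This is an illustration only: `backing_object_for` is a hypothetical helper, not part of the Feature Store API, and the rule "five whitespace-separated fields means CRON" is an assumption made for this example. The `DOWNSTREAM` value is the one discussed in the chaining section later in this chapter.

```python
from typing import Optional


def backing_object_for(refresh_freq: Optional[str]) -> str:
    """Classify a refresh_freq value (hypothetical helper, not a library call)."""
    if refresh_freq is None:
        return "view"  # no Dynamic Table; upstream tools handle refresh
    value = refresh_freq.strip()
    if value.upper() == "DOWNSTREAM":
        return "dynamic_table_downstream"  # refreshed on behalf of downstream DTs
    # Assumption: a CRON expression has exactly five whitespace-separated fields.
    if len(value.split()) == 5:
        return "dynamic_table_scheduled"
    return "dynamic_table_lag"  # e.g. "15 minutes", "1 hour"


for freq in (None, "15 minutes", "0 8 * * *", "DOWNSTREAM"):
    print(repr(freq), "->", backing_object_for(freq))
```

A check like this can be useful in code review tooling to flag Feature Views whose backing object differs from what the team expects.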
5.4.4 Refresh Configuration Options (lag-based Dynamic Tables)
When using a time period for refresh_freq, typical choices balance freshness and cost:
| Refresh Freq | Use Case | Cost Impact |
|---|---|---|
| 1 minute | Near real-time, critical features | High |
| 15 minutes | Standard operational | Medium |
| 1 hour | Balanced freshness/cost | Low |
| 1 day | Slowly-changing features | Minimal |
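One rough way to compare the rows above is to convert each lag into an upper bound on refreshes per day. The sketch below assumes at most one refresh per lag interval; real refresh counts and cost depend on the scheduler, how much data changed, and warehouse size. `refreshes_per_day` is a hypothetical helper.

```python
# Minutes per unit for the lag strings used in this chapter.
UNIT_MINUTES = {"minute": 1, "hour": 60, "day": 1440}


def refreshes_per_day(refresh_freq: str) -> float:
    """Upper bound on daily refreshes for a lag such as '15 minutes'."""
    count, unit = refresh_freq.split()
    minutes = int(count) * UNIT_MINUTES[unit.rstrip("s")]  # accept 'minute' or 'minutes'
    return 1440 / minutes


for freq in ("1 minute", "15 minutes", "1 hour", "1 day"):
    print(f"{freq:>10}: up to {refreshes_per_day(freq):.0f} refreshes/day")
```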
5.4.5 Monitoring Dynamic Table Refresh
📁 Full code: _code/monitoring.sql
-- Check refresh history
SELECT
    NAME,
    STATE,
    REFRESH_START_TIME,
    REFRESH_END_TIME,
    STATISTICS:numInsertedRows::INT AS ROWS_INSERTED
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'FEATURE_STORE_DEMO.FEATURE_STORE.USER_PURCHASE_STATS$V01'
))
ORDER BY REFRESH_START_TIME DESC
LIMIT 10;

5.4.6 Python Transforms in Feature Views
Not all feature logic can be expressed in pure SQL. Python UDFs, UDTFs, and UDAFs let you embed custom Python (including Pandas, NumPy, or scikit-learn) directly in a Feature View’s feature_df, with the transform running inside the Dynamic Table refresh.
Feature Views should contain model-generic transforms — logic that is useful across multiple models and consumers (e.g. text normalisation, geo-hashing, z-scoring within a partition). Model-specific preprocessing (scaling, encoding, target transformations) belongs in the model pipeline so that each model can apply its own scheme. See Chapter 9: Preprocessing for model-dependent patterns using sklearn and Snowflake ML pipelines.
Options at a glance:
| Approach | Granularity | Incremental-safe | Best for |
|---|---|---|---|
| Snowpark DataFrame API | Row/column | Yes | Joins, filters, window functions expressible as SQL |
| SQL UDF | One row in, one value out | Yes | Encapsulating repeated SQL patterns for consistency |
| Scalar Python UDF | One row in, one value out | Yes (if IMMUTABLE) | Row-level transforms requiring Python libraries |
| Vectorized Python UDF | Pandas Series batch | Yes (if IMMUTABLE) | Batch math, NumPy operations |
| UDTF (end_partition) | Partition in, rows out | Yes (non-SQL UDTFs) | Per-entity statistics, sessionisation, sequence features |
| UDAF (accumulate/merge/finish) | Group in, one value out | Check docs | Custom aggregate logic without ARRAY_AGG workarounds |
If the transform can be expressed using built-in Snowflake SQL functions, always prefer that — native functions run in the optimised SQL engine and are significantly faster than Python UDFs, which incur serialisation overhead and a Python runtime per row or batch. For example, SHA2 hashing is available as SHA2(LOWER(TRIM(email)), 256) in SQL. If you want to encapsulate a repeated SQL pattern for consistency across Feature Views, use a SQL UDF instead of a Python UDF. Reserve Python UDFs for logic that genuinely requires Python libraries (e.g. NLP tokenisation, custom statistical models, third-party packages).
Example: Scalar Python UDF in a Feature View. A hash-based anonymiser applied during feature materialisation. This example is for illustration purposes only — in practice, use the native SHA2() function or a SQL UDF (shown below) for better performance:
from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import StringType

# hashlib is part of the Python standard library, so no `packages` entry is needed
@udf(name="FEATURE_STORE.HASH_EMAIL", input_types=[StringType()],
     return_type=StringType(), immutable=True, replace=True)
def hash_email(email: str) -> str:
    import hashlib
    if email is None:  # SQL NULL arrives as None
        return None
    return hashlib.sha256(email.lower().strip().encode()).hexdigest()

feature_df = (
    session.table("CLICKSTREAM_DATA.USERS")
    .select(
        col("USER_ID"),
        hash_email(col("EMAIL")).alias("EMAIL_HASH"),
        col("SIGNUP_TS"),
    )
)

The preferred approach for this specific transform would be a SQL UDF or inline SQL:
-- SQL UDF for reusable pattern encapsulation
CREATE OR REPLACE FUNCTION FEATURE_STORE.HASH_EMAIL_SQL(email VARCHAR)
RETURNS VARCHAR
IMMUTABLE
AS $$
    SHA2(LOWER(TRIM(email)), 256)
$$;

# Or simply use the built-in function inline via Snowpark
from snowflake.snowpark.functions import sha2, lower, trim, col

feature_df = (
    session.table("CLICKSTREAM_DATA.USERS")
    .select(
        col("USER_ID"),
        sha2(lower(trim(col("EMAIL"))), 256).alias("EMAIL_HASH"),  # num_bits is a plain int
        col("SIGNUP_TS"),
    )
)

Register as a Feature View in the usual way; the UDF or expression is embedded in the generated DT SQL. Because scalar UDFs declared IMMUTABLE and the native SQL functions used here are deterministic, the DT can use incremental refresh.
Example: UDTF partition — z-score within each user’s transactions.
from snowflake.snowpark.functions import col, table_function
from snowflake.snowpark.types import StructType, StructField, FloatType, StringType

class ZScorePartition:
    def __init__(self):
        self._rows = []

    def process(self, user_id, amount):
        # Buffer every row of the partition; output is produced in end_partition()
        self._rows.append((user_id, amount))

    def end_partition(self):
        import statistics
        amounts = [r[1] for r in self._rows]
        mu = statistics.mean(amounts)
        sigma = statistics.stdev(amounts) if len(amounts) > 1 else 0.0
        sigma = sigma or 1.0  # avoid division by zero when all amounts are equal
        for uid, amt in self._rows:
            yield (uid, amt, (amt - mu) / sigma)

zscore_udtf = session.udtf.register(
    ZScorePartition,
    output_schema=StructType([
        StructField("USER_ID", StringType()),
        StructField("AMOUNT", FloatType()),
        StructField("AMOUNT_ZSCORE", FloatType()),
    ]),
    input_types=[StringType(), FloatType()],
    name="FEATURE_STORE.ZSCORE_PARTITION",
    replace=True,
)

feature_df = (
    session.table("CLICKSTREAM_DATA.ORDERS")
    .join_table_function(
        zscore_udtf(col("USER_ID"), col("TOTAL_AMT"))
        .over(partition_by=col("USER_ID"))
    )
)

Python UDAFs use a handler class with accumulate(), merge(), and finish() methods (plus an aggregate_state property), which suits custom aggregation logic (e.g. weighted median, HyperLogLog) that cannot be expressed with built-in SQL aggregates. They can replace the common workaround of ARRAY_AGG + scalar UDF. See the Snowflake UDAF documentation for the handler interface and examples.
Scalar and vectorized Python UDFs marked IMMUTABLE are eligible for incremental DT refresh; VOLATILE Python UDFs are not. Python UDTFs (including those with end_partition()) also support incremental refresh — but SQL-based UDTFs do not, and SELECT blocks reading from UDTFs must explicitly specify columns (no *). Python UDAFs are not explicitly documented for DT incremental refresh — check the supported queries reference for the latest status. For high-frequency Feature Views, prefer native SQL functions or Snowpark DataFrame expressions where possible. See Appendix C: Python UDFs and UDTFs in Dynamic Tables for additional constraints and immutability requirements.
Note: the SELECT * restriction above applies specifically to UDTF sources. For DTs that use SELECT * FROM a regular base table, Snowflake has updated behaviour so that new columns added to the base table are now picked up incrementally on the next refresh rather than failing. See Chapter 4: Incremental Column Pickup with SELECT * for the semantics, backfill behaviour, and current limitations.
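Because a UDTF handler is a plain Python class, its logic can be exercised locally before it is registered. The sketch below repeats a version of the z-score handler from this section (with a guard for zero standard deviation) and drives it for a single partition. `run_partition` is a hypothetical test helper, and the sample rows are invented.

```python
import statistics


class ZScorePartition:
    """Buffers the rows of one partition and emits z-scores in end_partition()."""

    def __init__(self):
        self._rows = []

    def process(self, user_id, amount):
        self._rows.append((user_id, amount))

    def end_partition(self):
        amounts = [amt for _, amt in self._rows]
        mu = statistics.mean(amounts)
        sigma = statistics.stdev(amounts) if len(amounts) > 1 else 0.0
        sigma = sigma or 1.0  # avoid division by zero when all amounts are equal
        for uid, amt in self._rows:
            yield (uid, amt, (amt - mu) / sigma)


def run_partition(rows):
    """Feed one partition through the handler, mimicking a partitioned call."""
    handler = ZScorePartition()
    for row in rows:
        handler.process(*row)
    return list(handler.end_partition())


varied = run_partition([("u1", 10.0), ("u1", 20.0), ("u1", 30.0)])
constant = run_partition([("u2", 5.0), ("u2", 5.0)])
print(varied)
print(constant)
```

Testing the handler this way catches edge cases such as single-row or constant-value partitions without spending warehouse time on a Dynamic Table refresh.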
5.5 Chained Feature View Pipelines
Feature Views can reference other Feature Views (or their underlying Dynamic Tables / Views) as their feature_df source, creating multi-stage pipelines within the Feature Store. This is a natural way to layer transformations: base Feature Views handle raw-to-feature computation, and downstream Feature Views combine or enrich those outputs.
By default, Snowflake does not allow a Dynamic Table to read from a View that queries another Dynamic Table (DT → View → DT), because it cannot coordinate the refresh graph across the view boundary.
There are two solutions:
- Remove the view – If you control the intermediate stage, make it a DT instead of a View. Intermediate DTs use refresh_freq="DOWNSTREAM" so they only refresh when a downstream consumer needs fresh data.
- Wrap the view in DYNAMIC_TABLE_REFRESH_BOUNDARY() – When you cannot or do not want to remove the view (e.g., you are reading through a third-party or shared view), wrap the reference:

CREATE OR REPLACE DYNAMIC TABLE MY_DOWNSTREAM_DT
    TARGET_LAG = '15 minutes'
    WAREHOUSE = FS_DEV_WH
AS
SELECT s.*, p.category
FROM MY_UPSTREAM_DT s
JOIN DYNAMIC_TABLE_REFRESH_BOUNDARY(v_product_lookup) p
    ON s.product_id = p.product_id;

The boundary decouples the two pipelines: MY_UPSTREAM_DT refreshes in a coordinated group with MY_DOWNSTREAM_DT, while v_product_lookup is read at whatever state it is in at refresh time (no snapshot isolation on that edge). A failure in the upstream of v_product_lookup does not block MY_DOWNSTREAM_DT from refreshing.
Trade-off: Inputs wrapped in a refresh boundary lose snapshot isolation – joined inputs may reflect different points in time. Use the boundary only on inputs where temporal consistency with the rest of the pipeline is not required (e.g., slowly changing lookup tables, shared reference data).
In Feature Store terms: if a downstream Feature View is DT-backed (refresh_freq set), all upstream Feature Views it directly references must also be DT-backed, or you must wrap the view using the SQL pattern above (outside the Python API). A View-based Feature View can only be a terminal consumer in an unmodified chain.
See Data consistency and pipeline boundaries for the full reference.
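The Feature Store rule above (a DT-backed Feature View may only read directly from DT-backed Feature Views) can be checked before registration with a few lines of plain Python. This is a sketch under stated assumptions: the `chain` dictionary, the `invalid_edges` helper, and the `USER_SCORES_FV` entry are hypothetical and exist only for this example.

```python
from typing import Dict, List, Optional, Tuple

# name -> (refresh_freq, direct upstream Feature View names); illustrative data only
Chain = Dict[str, Tuple[Optional[str], List[str]]]


def invalid_edges(chain: Chain) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs where a DT-backed FV reads a View-based FV."""
    problems = []
    for name, (refresh_freq, upstreams) in chain.items():
        if refresh_freq is None:
            continue  # the rule only constrains DT-backed consumers
        for upstream in upstreams:
            if chain[upstream][0] is None:
                problems.append((upstream, name))
    return problems


chain: Chain = {
    "USER_ORDER_BASE_FV": ("1 hour", []),
    "USER_EVENT_BASE_FV": ("1 hour", []),
    "USER_COMBINED_FV": (None, ["USER_ORDER_BASE_FV", "USER_EVENT_BASE_FV"]),
    # Hypothetical DT-backed consumer reading through the View-based FV:
    "USER_SCORES_FV": ("1 hour", ["USER_COMBINED_FV"]),
}
print(invalid_edges(chain))
```

Each reported edge is one that would need either an upstream DT or the refresh-boundary wrapper described above.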
5.5.1 How Chaining Works
flowchart TB
O[ORDERS] --> B1[USER_ORDER_BASE_FV DT]
E[EVENTS] --> B2[USER_EVENT_BASE_FV DT]
B1 & B2 --> C[USER_COMBINED_FV]
5.5.2 Example: Base + Derived Feature Views
Base layer – order aggregates per user (DT-backed):
import snowflake.snowpark.functions as F

orders = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS")
order_base_df = (
    orders
    .with_column("ORDER_DATE", F.date_trunc("day", F.col("ORDER_TS")).cast("TIMESTAMP_NTZ"))
    .group_by("USER_ID", "ORDER_DATE")
    .agg(
        F.sum("TOTAL_AMT").alias("ORDER_TOTAL_AMT_SUM"),
        F.count("ORDER_ID").alias("ORDER_CNT"),
    )
)

Base layer – event aggregates per user (DT-backed):

events = session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.EVENTS")
event_base_df = (
    events
    .with_column("ACTIVITY_DATE", F.date_trunc("day", F.col("EVENT_TS")).cast("TIMESTAMP_NTZ"))
    .group_by("USER_ID", "ACTIVITY_DATE")
    .agg(
        F.count("EVENT_ID").alias("EVENT_CNT"),
        F.count_distinct("PRODUCT_ID").alias("PRODUCT_DISTINCT_CNT"),
    )
)

user_order_base = FeatureView(
    name="USER_ORDER_BASE_FV", entities=[user_entity],
    feature_df=order_base_df, timestamp_col="ORDER_DATE",
    refresh_freq="1 hour", desc="Base: daily order aggregates per user",
)
fs.register_feature_view(user_order_base, version="V01", block=True)

user_event_base = FeatureView(
    name="USER_EVENT_BASE_FV", entities=[user_entity],
    feature_df=event_base_df, timestamp_col="ACTIVITY_DATE",
    refresh_freq="1 hour", desc="Base: daily event aggregates per user",
)
fs.register_feature_view(user_event_base, version="V01", block=True)

Derived layer – join both base FVs into a combined view:

order_base = session.table('FEATURE_STORE."USER_ORDER_BASE_FV$V01"')
event_base = session.table('FEATURE_STORE."USER_EVENT_BASE_FV$V01"')
combined_df = order_base.join(
    event_base,
    (order_base["USER_ID"] == event_base["USER_ID"])
    & (order_base["ORDER_DATE"] == event_base["ACTIVITY_DATE"]),
).select(
    order_base["USER_ID"], order_base["ORDER_DATE"],
    order_base["ORDER_TOTAL_AMT_SUM"], order_base["ORDER_CNT"],
    event_base["EVENT_CNT"], event_base["PRODUCT_DISTINCT_CNT"],
)

user_combined_fv = FeatureView(
    name="USER_COMBINED_FV",
    entities=[user_entity],
    feature_df=combined_df,
    timestamp_col="ORDER_DATE",
    refresh_freq=None,  # View: always reflects latest base DT state
    desc="Derived: combines order + event base Feature Views",
)
fs.register_feature_view(user_combined_fv, version="V01")

5.5.3 DT-to-DT Chaining (Refresh Cascading)
When a downstream Feature View is also DT-backed (refresh_freq set to a period), Snowflake manages the refresh dependency graph automatically. The downstream DT waits for its upstream DTs to refresh before it starts its own refresh. This gives you:
- Guaranteed ordering – the downstream DT always reads consistent, refreshed data from upstream
- Incremental compute – only changed rows from upstream flow through to the downstream DT
- Simpler individual stages – breaking a complex single-DT query into multiple stages reduces the SQL complexity at each level, which can improve incremental refresh efficiency (the engine has smaller change sets to track per stage)
There are two approaches to building the upstream stages:
- Plain DTs outside the Feature Store – create raw Dynamic Tables via SQL, then have a final downstream Feature View (DT or View) consume them. This is simpler when the intermediate stages are not useful as standalone features.
- All stages as Feature Views – each stage is registered with entity keys, a timestamp_col, and feature columns. This makes intermediate stages discoverable via list_feature_views() and reusable by other Feature Views or training pipelines. The overhead is that each stage must conform to the Feature Store schema (entity keys, timestamp, feature columns).
If both layers specify independent lag values, latency can stack: a base DT with 1-hour lag and a derived DT with 1-hour lag yields up to ~2 hours of end-to-end staleness. To avoid this, set the upstream DT’s target lag to DOWNSTREAM. This tells the upstream DT to infer its refresh schedule from the most demanding downstream consumer:
# Base layer: refreshes on demand when downstream DTs need it
user_order_base = FeatureView(
    name="USER_ORDER_BASE_FV",
    entities=[user_entity],
    feature_df=order_df,
    refresh_freq="DOWNSTREAM",  # Refresh driven by downstream consumers
    desc="Base: order aggregates; refresh cascaded from downstream",
)

# Derived layer: controls the refresh cadence for the entire chain
user_enriched = FeatureView(
    name="USER_ENRICHED_FV",
    entities=[user_entity],
    feature_df=enriched_df,
    refresh_freq="1 hour",  # This drives the upstream refresh too
    desc="Derived: enriched features; sets refresh pace for chain",
)

With this pattern, the scheduler refreshes the base DT just before the derived DT needs it, keeping end-to-end latency within a single lag target rather than the sum of two.
A DT set to DOWNSTREAM with no downstream consumers will never refresh. Always ensure at least one downstream DT has a concrete lag value.
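The latency-stacking arithmetic can be made explicit with a simplified model: every stage with a concrete lag adds its lag to the worst case, and a stage set to DOWNSTREAM adds none of its own. This is an approximation for reasoning about chains, not a description of the scheduler. `worst_case_staleness_minutes` is a hypothetical helper.

```python
from typing import List, Union

DOWNSTREAM = "DOWNSTREAM"


def worst_case_staleness_minutes(stage_lags: List[Union[int, str]]) -> int:
    """Sum the concrete lags (in minutes) along a chain, upstream to downstream."""
    concrete = [lag for lag in stage_lags if lag != DOWNSTREAM]
    if not concrete:
        # Mirrors the warning above: a chain of only DOWNSTREAM stages never refreshes.
        raise ValueError("at least one stage needs a concrete lag")
    return sum(concrete)


stacked = worst_case_staleness_minutes([60, 60])           # base 1 hour, derived 1 hour
cascaded = worst_case_staleness_minutes([DOWNSTREAM, 60])  # base DOWNSTREAM, derived 1 hour
print(f"independent lags: up to ~{stacked} min; DOWNSTREAM base: up to ~{cascaded} min")
```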
5.5.4 DT-to-View Chaining
When the downstream Feature View is View-based (refresh_freq=None), it reads from the upstream DTs at query time. The View always reflects the latest state of the upstream DTs. This avoids latency stacking but means the combined query runs on the consumer’s warehouse at read time.
Beyond avoiding latency stacking, DT-to-View chaining enables transformations that are not compatible with incremental DT refresh. Some SQL operations prevent a DT from using incremental mode (e.g., non-deterministic functions, certain window functions, or self-referential patterns). By placing these in a View layer on top of incrementally-refreshed base DTs, you get the best of both worlds: efficient incremental refresh for the heavy data processing, with flexible transforms applied at read time.
If the upstream full-refresh DT produces a system-derived unique key – for example, its query uses GROUP BY or QUALIFY ROW_NUMBER() = 1 – a downstream DT can now opt into REFRESH_MODE = INCREMENTAL and process only the actual row-level changes between upstream full refreshes, rather than falling back to a View layer. This is a SQL-level capability (ALTER DYNAMIC TABLE ... SET REFRESH_MODE = INCREMENTAL); the Feature Store Python API does not yet expose it, so apply the same plain-DT or post-registration ALTER pattern described elsewhere in this chapter. Verify with SHOW UNIQUE KEYS IN <upstream_dt>. See Understanding primary keys in dynamic tables.
This also allows the inclusion of context or timestamp functions (CURRENT_TIMESTAMP(), DATEDIFF(... , CURRENT_DATE()), etc.) in feature derivations. These are non-deterministic and would force a DT into full-refresh mode, but in a View they evaluate fresh on every query – useful for real-time serving features like “days since last order” or “time since account creation.”
Features derived from CURRENT_TIMESTAMP() or CURRENT_DATE() reflect the query execution time, not a historical point-in-time. This is correct for online serving (the prediction is happening now), but breaks point-in-time correctness for training data generation via ASOF joins – the join retrieves a feature value relative to the spine timestamp, yet the DATEDIFF was computed against “today,” not the historical event date. If you need these features in training, compute them in the spine or training pipeline relative to the spine timestamp rather than relying on a View-based derivation. See Chapter 6: Temporal Features for details on ASOF semantics.
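A small worked example shows the size of the discrepancy. The dates are invented, and `query_date` stands in for what CURRENT_DATE() would return on the day the training set is generated.

```python
from datetime import date

last_order = date(2024, 1, 10)  # feature value retrieved by the ASOF join
spine_date = date(2024, 1, 15)  # historical event the training row describes
query_date = date(2024, 2, 9)   # day the training set is built (stand-in for CURRENT_DATE())


def days_since(reference: date, last: date) -> int:
    return (reference - last).days


view_value = days_since(query_date, last_order)  # what a CURRENT_DATE()-based View returns
pit_value = days_since(spine_date, last_order)   # what the model would have seen at the time

print(f"View-based derivation: {view_value} days")
print(f"Point-in-time correct: {pit_value} days")
```

Computing `pit_value` in the training pipeline, relative to the spine timestamp, keeps training and serving semantics aligned.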
5.5.5 Trade-offs
| | DT → DT (both materialized) | DT → View (derived is virtual) |
|---|---|---|
| Read performance | Pre-computed; fast reads | Query-time join; depends on complexity |
| Latency | Stacks unless upstream uses DOWNSTREAM lag | Single upstream lag only |
| Storage | Both layers stored | Only base layer stored |
| Compute | Separate refresh compute for each layer | No derived-layer refresh compute |
| Best for | High-read-volume derived features | Low-read-volume or simple joins |
5.5.6 Refresh and Dependency Considerations
- Version coupling: The derived Feature View’s SQL references specific base FV versions (e.g., USER_ORDER_BASE_FV$V01). When you create a new base version ($V02), update the derived FV to reference the new version and register a new derived version.
- Lineage tracking: Use the Feature Store API’s .lineage() method or SQL OBJECT_DEPENDENCIES to identify which downstream Feature Views depend on a base version before deprecating it (see Chapter 4: Feature Views).
- Depth limits: Snowflake supports up to 10 levels of Dynamic Table dependency depth. For most Feature Store pipelines, 2-3 levels is typical. Deep chains increase end-to-end latency and complicate debugging.
5.6 Temporal Aggregation Pipelines
📁 Full code: _code/temporal_api.py
The Temporal Aggregation API (Feature class) provides a fourth pipeline pattern: declarative time-windowed aggregations with automatic tiling for efficient incremental computation. Instead of recomputing an entire window (e.g., 7-day sum) on each refresh, the engine pre-computes fixed time buckets (tiles) and combines them at query time – only new tiles are refreshed.
The primary motivation for the Temporal Aggregation API is correctness, not just efficiency. When you pre-compute a time-windowed aggregate (e.g., “7-day spend sum”) in a standard DT using SQL window functions, the window is anchored to the row’s own timestamp. At retrieval time, the ASOF join returns the most recent row with a timestamp <= the spine timestamp. If the source data is sparse (no activity for several days), that row’s window covers the wrong time range relative to the spine’s point in time.
Tiling solves this by storing partial aggregates per time grain (tiles) in the DT. At retrieval time, the Feature Store reassembles the correct window by combining tiles backwards from the spine’s ASOF timestamp. This produces the correct windowed value regardless of source data sparsity. Without tiling, the only alternatives are to densify the data first (see Gap-Filling Pipelines below) or to accept that window boundaries may be misaligned.
For a summary of this problem, see Chapter 1: Core Concepts. For the full Feature class API reference, see Chapter 7: Aggregations API.
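The misalignment described above can be reproduced in plain Python. The sketch compares a row-anchored 7-day sum retrieved by an ASOF lookup with a sum reassembled from daily tiles at the spine date. The event data, the daily tile size, and the window convention (the 7 calendar days ending on the spine date) are assumptions for illustration; the Feature Store's exact boundary semantics are covered in Chapter 7.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import date, timedelta

# (order_date, amount) for one user, sorted by date; note the gap after Jan 3
events = [
    (date(2024, 1, 1), 100.0),
    (date(2024, 1, 3), 50.0),
    (date(2024, 1, 20), 30.0),
]
WINDOW_DAYS = 7


def window_sum(as_of: date) -> float:
    """Sum of amounts in the 7 days ending on as_of (inclusive)."""
    start = as_of - timedelta(days=WINDOW_DAYS)
    return sum(amount for day, amount in events if start < day <= as_of)


# Approach 1: pre-compute the window per source row, then ASOF-join to the spine.
row_anchored = [(day, window_sum(day)) for day, _ in events]


def asof_row_anchored(spine: date) -> float:
    idx = bisect_right([day for day, _ in row_anchored], spine) - 1
    return row_anchored[idx][1] if idx >= 0 else 0.0


# Approach 2: store one partial aggregate per day (a tile), combine at retrieval.
tiles = defaultdict(float)
for day, amount in events:
    tiles[day] += amount


def asof_tiled(spine: date) -> float:
    return sum(tiles.get(spine - timedelta(days=i), 0.0) for i in range(WINDOW_DAYS))


spine = date(2024, 1, 15)
print("row-anchored:", asof_row_anchored(spine))  # window ended Jan 3, reused for Jan 15
print("tiled:       ", asof_tiled(spine))         # window ends on the spine date
```

The row-anchored lookup returns the Jan 3 row's total even though no orders fall in the 7 days before Jan 15, while the tiled version rebuilds the window relative to the spine date.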
When to use the Temporal API:
- Time-windowed aggregations (7-day sum, 30-day count, etc.)
- Sparse, high-cardinality event data (clickstream, transactions)
- Multiple window sizes for the same base metric
- Comparative features via offset (e.g., week-over-week trends)
Quick example:
features = [
    Feature.sum("TOTAL_AMT", "7d").alias("SPEND_SUM_7D"),
    Feature.count("ORDER_ID", "30d").alias("ORDER_CNT_30D"),
    Feature.sum("TOTAL_AMT", "7d", offset="7d").alias("SPEND_PREV_7D"),
]

user_temporal_fv = FeatureView(
    name="USER_TEMPORAL_AGGREGATES",
    entities=[user_entity],
    feature_df=session.table("FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS"),
    timestamp_col="ORDER_TS",
    refresh_freq="1 hour",
    feature_granularity="1 hour",
    features=features,
    desc="User purchase aggregations with multiple time windows",
)

user_temporal_fv = fs.register_feature_view(
    feature_view=user_temporal_fv, version="V01", block=True, overwrite=True,
)
print(f"Registered: {user_temporal_fv.name}/V01 (tiled={user_temporal_fv.is_tiled})")
print("Features: SPEND_SUM_7D, ORDER_CNT_30D, SPEND_PREV_7D")

The feature_df can be an arbitrarily complex query (joins, filters, CASE expressions) – the tiling engine wraps it as a CTE.
Rollup Feature Views extend tiled pipelines with hierarchical aggregation – creating coarser-grained features from finer-grained tiled Feature Views without recomputing from raw data (e.g., visitor → subscriber). Use RollupConfig(source=registered_tiled_fv, mapping_df=entity_mapping_df).
For the complete Feature class API – all 14 aggregation functions, the "lifetime" window, offset for comparative features, .alias() and case_sensitive for column naming, tile size recommendations, column prefixing for disambiguation, and the full RollupConfig example – see Chapter 7: Aggregations API.
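The idea behind a rollup can be illustrated without Snowflake: partial aggregates stored per fine-grained entity and tile are re-keyed through an entity mapping and merged. This sketch covers only additive aggregates such as sums and counts, uses invented data, and does not describe how RollupConfig is implemented internally.

```python
from collections import defaultdict

# (visitor_id, tile_day) -> partial spend sum; illustrative values
visitor_tiles = {
    ("v1", "2024-01-01"): 10.0,
    ("v2", "2024-01-01"): 5.0,
    ("v2", "2024-01-02"): 7.0,
    ("v3", "2024-01-01"): 1.0,
}

# Entity mapping from the fine-grained to the coarse-grained entity
visitor_to_subscriber = {"v1": "s1", "v2": "s1", "v3": "s2"}

# Merge visitor tiles into subscriber tiles without touching raw events
subscriber_tiles = defaultdict(float)
for (visitor, day), partial in visitor_tiles.items():
    subscriber_tiles[(visitor_to_subscriber[visitor], day)] += partial

for key in sorted(subscriber_tiles):
    print(key, subscriber_tiles[key])
```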
5.7 External Orchestration (dbt / Airflow)
5.7.1 When to Use External Orchestration
External orchestration is appropriate when you:
- Have an existing dbt or Airflow investment and want to incorporate Feature Store into those workflows
- Need complex multi-step transformations that span systems beyond Snowflake
- Require transformations that mix SQL and Python across multiple engines
Snowflake now supports running dbt Core projects natively via dbt Projects on Snowflake. You can create, deploy, schedule, and monitor dbt projects entirely within Snowflake using the CREATE DBT PROJECT and EXECUTE DBT PROJECT commands together with Snowflake Tasks – no external dbt Cloud or self-hosted infrastructure required. If your team uses dbt for feature transformations, evaluate this option before setting up an external deployment.
5.7.2 dbt Implementation Pattern
📁 Full code: _code/dbt_pattern.py
5.7.2.1 Step 1: Create dbt Feature Models
-- models/features/user_purchase_features.sql
--
-- dbt output table name = <database>.<schema>.<model_filename>
-- database: from dbt_project.yml / profiles.yml (e.g. FEATURE_STORE_DEMO)
-- schema: from config() below → FEATURE_STORE
-- model: from filename → USER_PURCHASE_FEATURES
-- Result: FEATURE_STORE_DEMO.FEATURE_STORE.USER_PURCHASE_FEATURES
-- Source: FEATURE_STORE_DEMO.CLICKSTREAM_DATA.ORDERS (ORDER_TS, TOTAL_AMT, USER_ID, ORDER_ID)
{{ config(
    materialized='table',
    schema='FEATURE_STORE',
    tags=['feature_store', 'user_features']
) }}

WITH order_stats AS (
    SELECT
        USER_ID,
        COUNT(DISTINCT ORDER_ID) AS ORDER_CNT,
        SUM(TOTAL_AMT) AS SPEND_SUM,
        AVG(TOTAL_AMT) AS AVG_ORDER_AMT,
        MAX(ORDER_TS) AS LAST_ORDER_TS
    FROM {{ ref('stg_orders') }}
    GROUP BY USER_ID
)
SELECT *,
    CURRENT_TIMESTAMP() AS _DBT_UPDATED_TS
FROM order_stats

5.7.2.2 Step 2: Deploy and Schedule via dbt Projects on Snowflake
-- Deploy the dbt project as a Snowflake object
CREATE OR ALTER DBT PROJECT FEATURE_STORE_DEMO.FEATURE_STORE.USER_FEATURES_DBT
    FROM @my_stage/dbt_project;

-- Execute the dbt project (builds the models)
EXECUTE DBT PROJECT FEATURE_STORE_DEMO.FEATURE_STORE.USER_FEATURES_DBT
    ARGS = 'run --select user_purchase_features'
    USING WAREHOUSE = FS_PROD_WH;

-- Schedule with a Snowflake Task for recurring runs
CREATE OR REPLACE TASK FEATURE_STORE_DEMO.FEATURE_STORE.DBT_USER_FEATURES_TASK
    WAREHOUSE = FS_PROD_WH
    SCHEDULE = 'USING CRON 0 8 * * * UTC'
AS
    EXECUTE DBT PROJECT FEATURE_STORE_DEMO.FEATURE_STORE.USER_FEATURES_DBT
    ARGS = 'run --select user_purchase_features';

ALTER TASK FEATURE_STORE_DEMO.FEATURE_STORE.DBT_USER_FEATURES_TASK RESUME;

5.7.2.3 Step 3: Register as View-Based Feature View
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity
# Reference the dbt-created table as a View-based Feature View
user_features_df = session.table(
    "FEATURE_STORE_DEMO.FEATURE_STORE.USER_PURCHASE_FEATURES"
)

user_purchase_fv = FeatureView(
    name="USER_PURCHASE_FEATURES",
    entities=[user_entity],
    feature_df=user_features_df,
    timestamp_col="_DBT_UPDATED_TS",
    # refresh_freq omitted / None → View-based Feature View (dbt handles refresh)
    desc="User purchase features - managed by dbt"
)
fs.register_feature_view(feature_view=user_purchase_fv, version="V01")

5.7.3 Hybrid Pattern: dbt Upstream + Aggregation API
For teams that prefer dbt for source-layer transformations but need PIT-correct temporal aggregations, combine dbt-managed tables with the Feature Store Aggregation API:
flowchart LR
RAW[Raw source] -->|dbt model| DBT[dbt-managed table]
DBT -->|View FV| VFV[View-based FV]
DBT -->|Aggregation API| AFV[Tiled Agg FV]
VFV --> GTS["generate_training_set()"]
AFV --> GTS
- dbt model cleans and joins raw data into a dbt-managed table (e.g. USER_ORDERS_CLEANED)
- View-based Feature View wraps the dbt table for simple columns (latest-state features)
- Aggregation API Feature View reads from the same dbt table to produce tiled temporal aggregations (e.g. 7-day spend, 30-day order count); these get PIT-correct retrieval via generate_training_set
from snowflake.ml.feature_store import FeatureView, Entity
from snowflake.ml.feature_store.feature_view import FeatureViewSliceConfig
# The dbt table is already materialized
dbt_source = session.table("FEATURE_STORE_DEMO.DBT_FEATURES.USER_ORDERS_CLEANED")
# View FV for simple columns
simple_fv = FeatureView(
name="USER_ORDER_BASICS",
entities=[user_entity],
feature_df=dbt_source.select("USER_ID", "LIFETIME_ORDER_CNT", "LAST_ORDER_TS"),
timestamp_col="LAST_ORDER_TS",
desc="Basic order stats - dbt managed",
)
fs.register_feature_view(simple_fv, version="V01")
# Aggregation API FV for temporal features (tiled, PIT-correct)
from snowflake.ml.feature_store import Aggregation
agg_fv = fs.create_feature_view_from_aggregation(
name="USER_ORDER_AGGS",
entities=[user_entity],
source=dbt_source,
aggregations=[
Aggregation(column="TOTAL_AMT", func="SUM", windows=["7d", "30d"]),
Aggregation(column="ORDER_ID", func="COUNT", windows=["7d", "30d"]),
],
timestamp_col="ORDER_TS",
refresh_freq="1 hour",
desc="Tiled temporal aggregations on dbt-cleaned orders",
)
fs.register_feature_view(agg_fv, version="V01")
Both Feature Views can then be combined in generate_training_set – the View FV returns latest-state values while the Aggregation FV provides PIT-correct windowed aggregates.
5.7.4 dbt vs Dynamic Table: When to Use Which
| | dbt-only | Hybrid (dbt + Agg API) | DT-only |
|---|---|---|---|
| Orchestration | dbt scheduler / dbt Cloud | dbt + FS DT refresh | FS DT auto-refresh |
| Incremental | dbt incremental models | dbt incremental + DT incremental | DT incremental |
| PIT retrieval | Latest state only (View FV) | PIT-correct for tiled aggs | PIT-correct for tiled aggs |
| Team fit | SQL-first / analytics engineering | Mixed dbt + ML platform | ML platform / Snowpark |
| When to choose | Features from existing dbt models; no temporal aggregation needed | dbt for data quality/lineage; FS for temporal features and PIT | Greenfield or teams comfortable with FS-managed refresh |
A View-based Feature View backed by a dbt table returns latest state only — there is no historical timeline for generate_training_set to ASOF-join against. If your training pipeline needs point-in-time correct feature values, either use the hybrid pattern above (Aggregation API for temporal features) or maintain a slowly-changing-dimension (SCD) history in the dbt model and register it with an appropriate timestamp_col.
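To make the latest-state limitation concrete, the sketch below contrasts a latest-state lookup with an as-of lookup over an SCD-style history. It is plain Python rather than Feature Store code, and the `history` rows and the `latest_value` / `asof_value` helpers are hypothetical names used only for illustration.

```python
from bisect import bisect_right
from datetime import date

# Hypothetical SCD-style history for one user: (valid_from, lifetime_order_cnt)
history = [
    (date(2024, 1, 1), 1),
    (date(2024, 2, 1), 4),
    (date(2024, 3, 1), 9),
]

def latest_value(rows):
    """What a latest-state table exposes: only the most recent row."""
    return rows[-1][1]

def asof_value(rows, label_ts):
    """Most recent value with valid_from <= label_ts (as-of semantics)."""
    idx = bisect_right([ts for ts, _ in rows], label_ts)
    return rows[idx - 1][1] if idx else None

label_ts = date(2024, 2, 15)              # timestamp of a training label
leaky = latest_value(history)             # value written after the label
correct = asof_value(history, label_ts)   # value known at label time
```

With only the latest row available, the February label is paired with the March value; keeping the history lets the lookup return the value that was actually in effect on the label date.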
5.8 Pipeline Selection Framework
5.8.1 Decision Tree
flowchart TD
R{Primary requirement}
R --> T{Time-windowed aggregations?}
T -->|YES| TA[Temporal Aggregation API]
T -->|NO| RT{Sub-minute freshness?}
RT -->|YES| OFT[DT + Online Feature Tables]
RT -->|NO| DBT{Existing dbt or Airflow?}
DBT -->|YES| EO[External orchestration or migrate hot paths to DT]
DBT -->|NO| IM[Internal Management DT]
5.8.2 Selection Matrix
| Requirement | Recommended Approach |
|---|---|
| Minimal operational overhead | Internal (Dynamic Tables) |
| Time-windowed aggregations | Temporal Aggregation API |
| Sub-minute freshness | Dynamic Tables + Online Feature Tables |
| Existing dbt investment | External Orchestration (dbt Projects on Snowflake) |
| Complex multi-step transforms | External Orchestration |
| New Feature Store implementation | Internal (Dynamic Tables) |
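The decision tree in 5.8.1 can also be read as a short function. The sketch below is only a restatement of that tree in Python; the function name and its boolean parameters are hypothetical, and real selection usually weighs more factors than three flags.

```python
def recommend_pipeline(time_windowed_aggs: bool,
                       sub_minute_freshness: bool,
                       existing_dbt_or_airflow: bool) -> str:
    """Walk the decision tree top to bottom and return the first match."""
    if time_windowed_aggs:
        return "Temporal Aggregation API"
    if sub_minute_freshness:
        return "DT + Online Feature Tables"
    if existing_dbt_or_airflow:
        return "External orchestration or migrate hot paths to DT"
    return "Internal Management DT"

# Example: a greenfield team with no windowed aggregations or dbt estate
greenfield_choice = recommend_pipeline(False, False, False)
```

The order of the checks matters: time-windowed aggregations are tested first, so a team with an existing dbt estate that also needs windowed features is still pointed at the Temporal Aggregation API (typically via the hybrid pattern).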
5.9 Gap-Filling and Interpolation Pipelines
When source data is sparse – not a record for every time period – ML models that expect regular time-series input (e.g., daily lab values, hourly sensor readings) need a dense representation. This section covers techniques for filling gaps before (or as part of) a Feature View pipeline.
5.9.1 When to Gap-Fill vs. Use the Aggregation API
If your goal is time-windowed aggregations (7-day sum, 30-day average), the Temporal Aggregation API handles sparsity natively via tiling – you do not need to densify first. Gap-filling is for use cases that need a dense time series at a regular grain as input (e.g., forecasting models, trend/slope features, interpolated metrics).
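As a rough intuition for why tiling tolerates sparse data, the sketch below pre-aggregates events into per-day partial sums and then combines only the tiles that fall inside a window. It is a simplified illustration in plain Python, not the Aggregation API's implementation; `build_daily_tiles`, `window_sum`, and the sample events are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta

# Sparse events for one entity: (event_date, amount). Most days have no rows.
events = [
    (date(2024, 1, 1), 10.0),
    (date(2024, 1, 1), 5.0),
    (date(2024, 1, 6), 20.0),
    (date(2024, 1, 20), 7.0),
]

def build_daily_tiles(rows):
    """Pre-aggregate events into one partial sum per day that has data."""
    tiles = defaultdict(float)
    for day, amount in rows:
        tiles[day] += amount
    return dict(tiles)

def window_sum(tiles, as_of, days):
    """Sum the tiles in the half-open window (as_of - days, as_of]."""
    start = as_of - timedelta(days=days)
    return sum(v for day, v in tiles.items() if start < day <= as_of)

tiles = build_daily_tiles(events)
spend_7d = window_sum(tiles, date(2024, 1, 7), 7)
spend_30d = window_sum(tiles, date(2024, 1, 25), 30)
```

Days without events simply have no tile, so they contribute nothing to the window and never need to be materialised as rows.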
5.9.2 Approach 1: RESAMPLE + INTERPOLATE (Preview)
Snowflake provides time-series-specific SQL constructs for gap-filling. RESAMPLE upsamples to a regular interval (inserting rows for missing periods), and INTERPOLATE_FFILL / INTERPOLATE_BFILL / INTERPOLATE_LINEAR fill the resulting NULLs:
SELECT
entity_id,
event_timestamp,
metric_value,
INTERPOLATE_FFILL(metric_value)
OVER (PARTITION BY entity_id ORDER BY event_timestamp) AS metric_ffill,
INTERPOLATE_LINEAR(metric_value)
OVER (PARTITION BY entity_id ORDER BY event_timestamp) AS metric_linear
FROM source_table
RESAMPLE (USING event_timestamp INCREMENT BY INTERVAL '1 day' PARTITION BY entity_id)
ORDER BY entity_id, event_timestamp
See the Snowflake time-series documentation for full syntax and limitations.
RESAMPLE and the INTERPOLATE_* functions are in Preview as of March 2026. RESAMPLE only fills gaps within the observed range of timestamps per entity – it does not extend beyond the first or last observed row. To extend through the current date, use the generator approach below.
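The range limitation is easy to see in a small model. The sketch below forward-fills a sparse daily series in plain Python: with no `end` argument the grid stops at the last observation (the within-range behaviour described above), while passing `end` extends it. The function `densify_ffill` and the sample values are hypothetical and do not reproduce Snowflake's implementation.

```python
from datetime import date, timedelta

# Sparse daily observations for one entity (hypothetical values).
observed = {date(2024, 1, 1): 10.0, date(2024, 1, 4): 16.0}

def densify_ffill(obs, end=None):
    """Return one (day, value) row per day with forward-filled values.

    With end=None the grid stops at the last observed day; passing end
    extends the grid past the last observation.
    """
    first, last = min(obs), max(obs)
    stop = last if end is None else max(last, end)
    rows, current, day = [], None, first
    while day <= stop:
        current = obs.get(day, current)   # carry the last seen value forward
        rows.append((day, current))
        day += timedelta(days=1)
    return rows

within_range = densify_ffill(observed)
extended = densify_ffill(observed, end=date(2024, 1, 6))
```

Here `within_range` covers 1 to 4 January only, whereas `extended` carries the last value through 6 January, which is the effect the date-spine approach achieves in SQL.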
5.9.3 Approach 2: Generated date spine + JOIN
For full control over the date range (e.g., extending through CURRENT_DATE even if the last observation was weeks ago), generate a dense calendar and join the sparse data onto it:
-- Dense calendar: one row per day from a fixed start date through today.
-- Adjust the start date and ROWCOUNT so the spine covers your earliest observation.
WITH date_spine AS (
SELECT DATEADD('day', ROW_NUMBER() OVER (ORDER BY SEQ4()) - 1, '2024-01-01'::DATE) AS cal_date
FROM TABLE(GENERATOR(ROWCOUNT => 1000))
QUALIFY cal_date <= CURRENT_DATE
),
-- Cross with entities
entity_dates AS (
SELECT e.entity_id, d.cal_date
FROM (SELECT DISTINCT entity_id FROM source_table) e
CROSS JOIN date_spine d
),
-- Left join sparse data and fill forward
filled AS (
SELECT
ed.entity_id,
ed.cal_date AS event_timestamp,
s.metric_value,
LAST_VALUE(s.metric_value IGNORE NULLS)
OVER (PARTITION BY ed.entity_id ORDER BY ed.cal_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS metric_ffill
FROM entity_dates ed
LEFT JOIN source_table s
ON ed.entity_id = s.entity_id AND ed.cal_date = s.event_timestamp::DATE
)
SELECT * FROM filled
5.9.4 Feature View integration
Wrap the densified query as a View-based Feature View with timestamp_col pointing to the calendar date column. This avoids materialising the full dense grid as a DT (which can be expensive and may force full refresh due to GENERATOR or CURRENT_DATE in the SQL):
dense_metric_df = session.sql("SELECT ... FROM filled_view") # the dense query above
dense_fv = FeatureView(
name="ENTITY_DAILY_METRIC_FV",
entities=[entity],
feature_df=dense_metric_df,
timestamp_col="EVENT_TIMESTAMP",
refresh_freq=None, # View-based: dense computation runs at query time
desc="Gap-filled daily metrics with forward-fill interpolation",
)
If materialization is needed (high read volume), consider splitting the pipeline: a pre-built table or DT for the date spine (refreshed periodically), joined to the sparse data in a downstream DT. Be aware that GENERATOR(...) and CURRENT_DATE in DT SQL can prevent incremental refresh – see Chapter 6: Lookup Tables for Dynamic History for a workaround using reference tables instead of hardcoded dates.
5.10 Best Practices
1. Match Refresh to Consumer Freshness Needs
# GOOD: Source updates hourly, consumers need hourly freshness
hourly_source_fv = FeatureView(
name="HOURLY_FEATURES",
refresh_freq="1 hour",
# ...
)
# ALSO OK: Source updates daily, refresh_freq shorter than source cadence
# The DT scheduler detects no upstream changes and skips without compute
daily_fv = FeatureView(
name="DAILY_FEATURES",
refresh_freq="1 hour", # checks hourly, but only refreshes when data lands
# ...
)
A shorter refresh_freq than the source update rate incurs near-zero cost – the DT scheduler checks for upstream changes and skips the refresh cycle if nothing has changed. See Chapter 4: Feature Views for details.
2. Match Tile Size to Data Frequency
When using the Temporal Aggregation API, choose feature_granularity to match your data arrival rate. See Chapter 7: Aggregations API – Tile Size Recommendations for detailed guidance.
3. Pipeline Ownership & SLAs
Define clear ownership for each pipeline. The example below shows a team-maintained manifest file – this is a documentation and CI/CD convention, not something consumed by the Feature Store API. Teams can use YAML, JSON, or any format that fits their toolchain:
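As one possible shape for such a manifest, the sketch below keeps the entries as structured data and runs a small check that a CI job could call. Everything in it is a hypothetical convention: the field names (`owner`, `freshness_sla_minutes`, and so on), the example addresses, and the `validate_manifest` helper are assumptions, not part of the Feature Store API.

```python
# Hypothetical manifest entries; the field names are illustrative assumptions,
# not a schema defined by the Feature Store.
MANIFEST = [
    {
        "feature_view": "USER_PURCHASE_FEATURES",
        "version": "V01",
        "owner": "team-a@example.com",
        "freshness_sla_minutes": 60,
    },
    {
        "feature_view": "USER_ORDER_AGGS",
        "version": "V01",
        "owner": "",                      # missing owner: should fail the check
        "freshness_sla_minutes": 60,
    },
]

REQUIRED_FIELDS = ("feature_view", "version", "owner", "freshness_sla_minutes")

def validate_manifest(entries):
    """Return a list of human-readable problems; empty means the manifest passes."""
    problems = []
    for entry in entries:
        name = entry.get("feature_view", "<unnamed>")
        for field in REQUIRED_FIELDS:
            if not entry.get(field):
                problems.append(f"{name}: missing {field}")
    return problems

problems = validate_manifest(MANIFEST)
```

The same entries could equally live in YAML or JSON; the point is that ownership and SLA are recorded next to the pipeline definition and checked automatically.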
4. Pipeline Health Monitoring
Snowflake’s built-in Alerts let you define condition-based monitoring directly in SQL – no external monitoring stack required. An Alert evaluates a SQL condition on a schedule and fires an action (email, webhook, or procedure call) when the condition is true. Combined with Dynamic Table metadata from INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY() and INFORMATION_SCHEMA.DYNAMIC_TABLES(), this gives you pipeline-native observability without leaving Snowflake:
-- Alert when any Feature Store Dynamic Table falls behind by more than 60 minutes
CREATE ALERT feature_staleness_alert
WAREHOUSE = FS_DEV_WH
SCHEDULE = 'USING CRON 0 * * * * UTC'
IF (EXISTS (
SELECT *
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE SCHEMA_NAME = 'FEATURE_STORE'
AND TIMESTAMPDIFF('minute', LATEST_DATA_TIMESTAMP, CURRENT_TIMESTAMP()) > 60
))
THEN
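-- The first argument is the name of an email notification integration (placeholder shown)
CALL SYSTEM$SEND_EMAIL('<email_integration_name>', 'ml-alerts@company.com', 'Stale Features', 'Alert');
See Chapter 11: Operations for comprehensive monitoring patterns including refresh history dashboards, data quality checks, and Streamlit-based observability.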
5.11 Common Pitfalls
5.11.1 ❌ Pitfall 1: Over-Engineering Simple Features
Problem: Using a complex pipeline for simple lookups.
Solution: Use a View-based Feature View directly on the source for static data.
5.11.2 ❌ Pitfall 2: Ignoring Incremental Refresh Compatibility
Problem: Using SQL patterns that prevent incremental refresh (e.g., RANDOM(), self-referential queries, or aggregations on FLOAT columns with JOINs).
Solution: Use deterministic SQL compatible with DT incremental refresh. Check REFRESH_MODE in INFORMATION_SCHEMA.DYNAMIC_TABLES() after registration – if it shows FULL when you expected INCREMENTAL, review your feature_df SQL.
You can also inspect refresh mode and history:
fv = fs.get_feature_view("MY_FV", "v1")
# Check the refresh mode assigned by the DT engine (AUTO, FULL, or INCREMENTAL)
print(f"Refresh mode: {fv.refresh_mode}")
print(f"Reason: {fv.refresh_mode_reason}")
# Review recent refresh history -- REFRESH_ACTION shows whether each refresh was INCREMENTAL or FULL
fs.get_refresh_history(fv).show()
-- Check current refresh mode
SELECT NAME, REFRESH_MODE, REFRESH_MODE_REASON, SCHEDULING_STATE
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE NAME = 'MY_FV$V1'
AND SCHEMA_NAME = 'FEATURE_STORE';
-- Review recent refresh history
SELECT NAME, STATE, REFRESH_ACTION, REFRESH_START_TIME, REFRESH_END_TIME,
STATISTICS:numInsertedRows::INT AS ROWS_INSERTED
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
NAME => 'FEATURE_STORE_DEMO.FEATURE_STORE.MY_FV$V1'
))
ORDER BY REFRESH_START_TIME DESC
LIMIT 10;
When the DT engine assigns AUTO refresh mode, it uses internal heuristics to decide between INCREMENTAL and FULL on each refresh cycle. These heuristics can be overly conservative (forcing FULL when INCREMENTAL would work), inconsistent across engine versions, and opaque to monitoring. For production Feature Views, explicitly set refresh_mode="INCREMENTAL" on the FeatureView constructor and verify the DT confirms that mode after registration:
fv = FeatureView(
name="USER_ORDER_FV",
entities=[user_entity],
feature_df=user_order_df,
refresh_freq="15 minutes",
refresh_mode="INCREMENTAL",
desc="User order features — explicit incremental refresh",
)
If the engine overrides this to FULL (check fv.refresh_mode after registration), the underlying SQL contains a pattern incompatible with incremental tracking – fix the SQL rather than falling back to AUTO.
For an existing Feature View, you can also change the refresh mode via SQL:
ALTER DYNAMIC TABLE FEATURE_STORE_DEMO.FEATURE_STORE."USER_ORDER_FV$V01"
SET REFRESH_MODE = INCREMENTAL;
See the Snowflake documentation on Dynamic Table refresh modes for the full list of SQL patterns that support incremental refresh.
A common trigger is aggregating FLOAT-typed columns (SUM, AVG, MIN, MAX, VAR, STD) in the same query block as a JOIN. The DT engine cannot prove incremental correctness for floating-point aggregate diffs across joins, so it forces full refresh. Workaround: cast to fixed-point before aggregating (CAST(float_col AS NUMBER(38, 6))), or split the query into a base DT (aggregation, no join) and a View layer that joins. See Chapter 4: Feature Views for the Base + Presentation pattern.
Rather than casting in every Feature View query, consider altering the source column type to NUMBER/DECIMAL. This is a one-time change that benefits all downstream DTs and eliminates the risk of forgetting a cast in a new Feature View. The ML precision impact is negligible – see Chapter 7: Aggregations API for details.
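One way to build intuition for the FLOAT restriction is that floating-point addition depends on the order in which values are combined, whereas fixed-point addition does not. The sketch below shows this in plain Python, using `Decimal` as a stand-in for NUMBER(38, 6); it illustrates the numeric property only and is not a description of how the DT engine decides refresh modes.

```python
from decimal import Decimal

values = [0.1, 0.2, 0.3]

# Floating point: the grouping of additions changes the result.
left_to_right = (values[0] + values[1]) + values[2]
right_to_left = values[0] + (values[1] + values[2])
float_sums_agree = left_to_right == right_to_left

# Fixed point (analogous to NUMBER(38, 6)): the sum is exact for any grouping.
scale = Decimal("0.000001")
fixed = [Decimal(str(v)).quantize(scale) for v in values]
fixed_left = (fixed[0] + fixed[1]) + fixed[2]
fixed_right = fixed[0] + (fixed[1] + fixed[2])
fixed_sums_agree = fixed_left == fixed_right
```

An incremental refresh applies inserts and deletes to an existing aggregate in whatever order changes arrive, so an aggregate whose value depends on evaluation order cannot be guaranteed to match a full recomputation.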
5.12 Rescuing incremental processing with Dynamic Table primary keys
Even when a DT is stuck in full-refresh mode, downstream DTs can now process incrementally if the full-refresh DT has a system-derived unique key. This applies to two common Feature Store scenarios:
Scenario 1 — INSERT OVERWRITE / CTAS source tables. Many production pipelines fully replace a base table on each load cycle (e.g., INSERT OVERWRITE, CREATE TABLE ... AS SELECT, or save_as_table(..., mode="overwrite")). Historically this broke change tracking and forced every downstream DT into full refresh. You can now declare a RELY primary key on the base table so Snowflake uses it for row-level change tracking across rewrites:
-- One-time: add a RELY primary key to the source table
ALTER TABLE MY_SOURCE_TABLE ADD PRIMARY KEY (ENTITY_ID) RELY;
-- Or if the constraint already exists:
ALTER TABLE MY_SOURCE_TABLE ALTER CONSTRAINT my_pk RELY;
Downstream DTs reading from this table can now refresh incrementally – Snowflake compares primary key values between the old and new versions of the table rather than relying on internal change-tracking columns.
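Conceptually, a key-based comparison of two snapshots yields the same insert, delete, and update sets that row-level change tracking would have produced. The sketch below models that idea in plain Python; `diff_by_primary_key` and the sample snapshots are hypothetical, and Snowflake's internal mechanism is not claimed to work this way.

```python
def diff_by_primary_key(old_rows, new_rows):
    """Compare two full snapshots keyed by primary key.

    Both arguments map primary-key value -> row payload.
    Returns (inserted, deleted, updated) as sorted lists of keys.
    """
    old_keys, new_keys = set(old_rows), set(new_rows)
    inserted = sorted(new_keys - old_keys)
    deleted = sorted(old_keys - new_keys)
    updated = sorted(k for k in old_keys & new_keys if old_rows[k] != new_rows[k])
    return inserted, deleted, updated

# Snapshot before and after an overwrite-style load (hypothetical data).
before = {1: {"spend": 10}, 2: {"spend": 20}, 3: {"spend": 30}}
after = {1: {"spend": 10}, 2: {"spend": 25}, 4: {"spend": 40}}

inserted, deleted, updated = diff_by_primary_key(before, after)
```

Only keys 2, 3, and 4 changed, so a downstream consumer needs to reprocess three rows rather than the whole table. This only holds if the declared key really is unique, which is why RELY is a promise you make to the engine.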
Scenario 2 — Full-refresh DTs with derived unique keys. When a DT’s query uses GROUP BY or QUALIFY ROW_NUMBER() = 1, Snowflake automatically derives a unique key from the query structure. A downstream DT can then opt into incremental refresh:
-- Downstream DT explicitly opts into incremental refresh
ALTER DYNAMIC TABLE FEATURE_STORE."MY_DOWNSTREAM_FV$V01"
SET REFRESH_MODE = INCREMENTAL;
This is particularly valuable for the Base + Presentation pattern (Chapter 4): if the base DT performs a GROUP BY aggregation that forces full refresh (e.g., the FLOAT+JOIN case above), any downstream DT reading from it can still be incremental thanks to the derived key.
Verification:
-- Check whether the upstream DT has a derived unique key
SHOW UNIQUE KEYS IN FEATURE_STORE."MY_BASE_FV$V01";
Important caveats:
- REFRESH_MODE = AUTO does not resolve to incremental in this scenario – you must set INCREMENTAL explicitly.
- This is a SQL-level capability; the Feature Store Python API does not yet expose RELY primary keys or per-DT refresh mode overrides. Use ALTER DYNAMIC TABLE after API registration, or create the upstream stage as a plain DT outside the Feature Store (the same pattern recommended in Section 3 and the DT-to-DT Chaining section above for cases where the API lags behind SQL features).
- See Understanding primary keys in dynamic tables for the full list of supported patterns.
5.12.1 ❌ Pitfall 3: Partial Leading-Edge Interval in Streaming/Continuous Sources
Problem: When a DT Feature View computes window functions over time-bucketed data (e.g., 15-minute or hourly aggregations of continuous transactions), the current time bucket at refresh time is typically incomplete. A 15-minute interval queried at minute 7 contains roughly half the expected events. Window functions using CURRENT ROW include this partial bucket, causing training/serving skew: the model trains on complete historical intervals but at inference time receives features computed over a partial current interval. This affects every feature that touches CURRENT ROW — rolling means, LAGs, DIFFs, rate-of-change — and produces systematically biased predictions.
Solution: Three complementary strategies, each appropriate for different layers of the pipeline:
Strategy 1 — Exclude CURRENT ROW in window functions (DT-safe):
Use 1 PRECEDING as the upper window bound inside the DT definition. This is deterministic SQL and preserves incremental DT refresh:
-- ✅ DT-safe: excludes potentially partial current row
AVG(QTY) OVER (PARTITION BY ITEM_CODE ORDER BY INTERVAL_ID
ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING)
-- ❌ Includes partial current row — biased at the leading edge
AVG(QTY) OVER (PARTITION BY ITEM_CODE ORDER BY INTERVAL_ID
ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)
Strategy 2 – Use offset in tiled Feature Views (DT-safe):
The Feature class offset parameter shifts the aggregation window back, inherently skipping the potentially partial current interval:
# Window covers the 4 intervals ending one interval ago — excludes current
Feature.avg("QTY", "1h", offset="15m").alias("QTY_ROLL_4_MEAN")
Strategy 3 – Gate the source table externally (outside the DT):
Manage the source table so it only ever contains complete intervals. Use a scheduled Task or Stream + Task pipeline that aggregates raw transactions and writes complete intervals to the source table:
-- Task runs every 15 minutes: write only complete intervals to source
INSERT INTO SOURCE_ORDERS_15MIN
SELECT TIME_SLICE(ORDER_TS, 15, 'MINUTE') AS INTERVAL_ID,
ITEM_CODE, BRANCH_CODE, COUNT(*) AS QTY
FROM RAW_ORDERS
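WHERE TIME_SLICE(ORDER_TS, 15, 'MINUTE')
< TIME_SLICE(CURRENT_TIMESTAMP(), 15, 'MINUTE', 'START')
-- Skip intervals already written by earlier runs so the Task does not insert duplicates
AND TIME_SLICE(ORDER_TS, 15, 'MINUTE')
> (SELECT COALESCE(MAX(INTERVAL_ID), '1970-01-01'::TIMESTAMP) FROM SOURCE_ORDERS_15MIN)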
GROUP BY 1, 2, 3;
CURRENT_TIMESTAMP() and other non-deterministic functions force a DT into FULL refresh. Apply time-based source gating in a Task, Stream, or View-based query layer outside the DT – never in the DT definition SQL.
Strategies 1 and 2 are the most practical for Feature Views because they work entirely within the DT. Strategy 3 is appropriate when you control the source ingestion pipeline and want to fix the problem once for all downstream consumers.
See also: Chapter 6: Data Leakage from Partial Intervals, Chapter 7: Window Offset as Safeguard.
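The size of the leading-edge bias is easy to reproduce outside SQL. The sketch below mirrors the two window frames from Strategy 1 in plain Python; `window_mean` and the sample quantities are hypothetical, with the last bucket deliberately left partly filled.

```python
def window_mean(values, idx, preceding_start, preceding_end):
    """Mean over rows idx-preceding_start .. idx-preceding_end (inclusive).

    preceding_end=0 corresponds to CURRENT ROW; 1 corresponds to 1 PRECEDING.
    """
    lo = max(0, idx - preceding_start)
    hi = max(0, idx - preceding_end + 1)
    window = values[lo:hi]
    return sum(window) / len(window) if window else None

# Quantity per 15-minute interval; the last bucket is only partly filled
# because the refresh ran mid-interval (hypothetical numbers).
qty = [100, 100, 100, 100, 40]
leading_edge = len(qty) - 1

# ROWS BETWEEN 3 PRECEDING AND CURRENT ROW: includes the partial bucket
biased = window_mean(qty, leading_edge, 3, 0)
# ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING: complete buckets only
unbiased = window_mean(qty, leading_edge, 3, 1)
```

On historical rows both frames see complete buckets, so the gap only appears at the leading edge, which is exactly where online inference reads its features.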
5.12.2 ❌ Pitfall 4: Mismatched Freshness Expectations
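Problem: The pipeline's refresh frequency doesn't match the business SLA, so features are either staler than consumers can tolerate or refreshed more often than anyone needs.
Solution: Derive refresh_freq from the freshness each consumer actually requires, and revisit it when the SLA changes.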
5.12.3 ❌ Pitfall 5: No Monitoring or Alerting
Problem: Stale features go undetected.
Solution: Implement monitoring and alerting for pipeline health (see Pipeline Health Monitoring under Best Practices above).
5.12.4 ⚠️ Pitfall 6: Unexpected column additions from SELECT * DTs
Problem: A DT defined with SELECT * FROM base_table now picks up new columns added to the base table on the next refresh. If downstream consumers — training pipelines, serving jobs, or downstream DTs with explicit column lists — are not expecting the new column, this can cause schema mismatch errors or unintentional model input changes.
Solution: Use SELECT * DTs for pass-through Feature Views only when column additions are coordinated across source and consumer teams, and consumers are written to tolerate dynamic schemas. If you need an explicit gate before new columns reach consumers:
- Use an explicit column list in the DT query (SELECT USER_ID, COL_A, COL_B FROM base_table) – new base-table columns will not appear until you update the DT definition.
- Or use the OBJECT-bundled approach (Chapter 12: #sec-select-star-vs-object), where the upstream VIEW definition controls exactly which features enter the bundle.
Pre-existing rows in the DT will be fully recomputed from source for the new column (not NULL-filled), as the INCREMENTAL refresh action triggers a full row-level delete + re-insert. The compute cost is equivalent to a full refresh. Use IMMUTABLE WHERE if you need to protect historical rows from this recompute, in which case those protected rows will receive NULL for the new column. See Chapter 4: Incremental Column Pickup with SELECT * for full details.
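Consumers that cannot tolerate dynamic schemas can also guard themselves. The sketch below is a hypothetical consumer-side check that compares the columns a job was built against with the columns it actually receives (for example, from a DataFrame's column list); the `schema_drift` helper and the column names are assumptions for illustration.

```python
def schema_drift(expected_columns, actual_columns):
    """Return (added, removed) column names relative to the expected contract."""
    expected, actual = set(expected_columns), set(actual_columns)
    return sorted(actual - expected), sorted(expected - actual)

# Columns the consumer was built against vs. what the DT exposes after a refresh.
expected = ["USER_ID", "COL_A", "COL_B"]
actual = ["USER_ID", "COL_A", "COL_B", "COL_C"]

added, removed = schema_drift(expected, actual)
```

A training job might fail fast when `added` or `removed` is non-empty, or log the drift and select only the expected columns, depending on how strict the contract needs to be.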
5.12.5 ❌ Pitfall 7: All-DOWNSTREAM Pipeline Never Refreshes
Problem: In a chained Feature View pipeline, every Feature View has refresh_freq="DOWNSTREAM" (or the equivalent SQL TARGET_LAG = DOWNSTREAM). The pipeline produces no data and raises no error – refreshes simply never trigger.
Why it happens: DOWNSTREAM means “only refresh when a downstream consumer needs fresh data.” If the terminal Feature View (the one consumed by training or serving) is also set to DOWNSTREAM, no DT in the chain has a time-based target lag, so nothing ever initiates a refresh.
Solution: At least one Feature View in the chain – the terminal (leaf) consumer that drives the pipeline – must have an explicit time-based refresh_freq:
# ✅ Intermediate stages: refresh only when needed
base_fv = FeatureView(
name="USER_ORDER_BASE",
entities=[user_entity],
feature_df=base_df,
refresh_freq="DOWNSTREAM", # only refreshes when downstream asks
)
# ✅ Terminal stage: owns the clock for the entire pipeline
combined_fv = FeatureView(
name="USER_COMBINED_FEATURES",
entities=[user_entity],
feature_df=combined_df,
refresh_freq="30 minutes", # drives the whole chain
)
Verify by querying INFORMATION_SCHEMA.DYNAMIC_TABLES() after registration – any DT with TARGET_LAG_TYPE = 'USER_DEFINED' is clock-driven; DTs showing only DOWNSTREAM are waiting for a downstream trigger. If every DT in your Feature Store schema shows DOWNSTREAM, the pipeline will never refresh.
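The same check can be automated over a whole DT graph. The sketch below is a hypothetical lint in plain Python: given each table's lag setting and the edges of the pipeline, it lists DOWNSTREAM tables that have no time-based table anywhere below them. It models the rule as described in this section, not Snowflake's scheduler, and `find_unclocked` and its inputs are illustrative names.

```python
def find_unclocked(target_lags, downstream_of):
    """Return DOWNSTREAM tables with no time-based table anywhere below them.

    target_lags: table name -> "DOWNSTREAM" or a time-based lag string.
    downstream_of: table name -> list of tables that read from it.
    """
    def has_clock_below(name, seen):
        for child in downstream_of.get(name, []):
            if child in seen:
                continue
            seen.add(child)
            if target_lags[child] != "DOWNSTREAM" or has_clock_below(child, seen):
                return True
        return False

    return sorted(
        name for name, lag in target_lags.items()
        if lag == "DOWNSTREAM" and not has_clock_below(name, set())
    )

edges = {"USER_ORDER_BASE": ["USER_COMBINED_FEATURES"]}

healthy = {"USER_ORDER_BASE": "DOWNSTREAM", "USER_COMBINED_FEATURES": "30 minutes"}
broken = {"USER_ORDER_BASE": "DOWNSTREAM", "USER_COMBINED_FEATURES": "DOWNSTREAM"}

healthy_result = find_unclocked(healthy, edges)
broken_result = find_unclocked(broken, edges)
```

For the healthy configuration the result is empty; for the all-DOWNSTREAM configuration both tables are reported, which matches the silent no-refresh behaviour described above.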
5.13 Summary
| Pipeline Pattern | Orchestration | Freshness | Best For |
|---|---|---|---|
| External (dbt) | External tool | Minutes-hours | Existing dbt investments |
| Hybrid (dbt + Agg API) | dbt + Snowflake | Mixed | dbt for source quality; FS for temporal/PIT features |
| Internal (DT) | Snowflake | Seconds-minutes | New implementations |
| Python transforms | UDF/UDTF/UDAF in DT | Follows DT | Custom Python logic (hashing, parsing, partition stats) |
| Chained FVs | Snowflake (DT graph) | Configurable (DOWNSTREAM minimises stacking) | Multi-stage pipelines, base + derived features |
| Temporal API | Snowflake | Configurable | Time-windowed aggs with tiling; see Ch07 |
| Rollup | Snowflake | Follows source | Hierarchical entity aggregation; see Ch07 |
5.14 Next Steps
Continue to Chapter 6: Temporal Features to learn about point-in-time correctness, ASOF joins, and preventing data leakage.