CREATE FLOW (Lakeflow Declarative Pipelines)
Use the CREATE FLOW statement to create flows or backfills for your Lakeflow Declarative Pipelines tables.
Syntax
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
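The grammar above can be read as one concrete statement. This minimal sketch uses the optional COMMENT clause, which the examples at the end of this page do not show. The table names `orders` and `raw_data.orders`, the flow name, and the comment text are hypothetical, and the statements are assumed to run inside a Lakeflow Declarative Pipelines source file.

```sql
-- Hypothetical target: a streaming table with no defining query of its own.
CREATE OR REFRESH STREAMING TABLE orders;

-- Hypothetical flow: appends new rows from a streaming read of raw_data.orders.
-- COMMENT takes a string literal that describes the flow.
CREATE FLOW orders_ingest_flow
COMMENT 'Append new rows from raw_data.orders'
AS INSERT INTO orders BY NAME
SELECT * FROM STREAM(raw_data.orders);
```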
Parameters
-
flow_name
The name of the flow to create.
-
COMMENT
An optional description for the flow.
-
AUTO CDC ... INTO
An AUTO CDC ... INTO statement that defines the flow with a create_auto_cdc_flow_spec. You must include either an AUTO CDC ... INTO statement or an INSERT INTO statement. Use AUTO CDC ... INTO when the source query uses change data semantics. For more information, see AUTO CDC INTO (Lakeflow Declarative Pipelines).
-
target_table
The table to update. This must be a streaming table.
-
INSERT INTO
Defines a query whose results are inserted into the target table. If the ONCE option is not supplied, the query must be a streaming query; use the STREAM keyword to read from the source with streaming semantics. If the streaming read encounters a change to or deletion of an existing record, an error is thrown, so it is safest to read from static or append-only sources. To ingest data that contains change commits, you can use Python with the skipChangeCommits option to skip those commits instead of failing. INSERT INTO is mutually exclusive with AUTO CDC ... INTO. Use AUTO CDC ... INTO when the source data contains change data capture (CDC) records, and use INSERT INTO when it does not. For more information on streaming data, see Transform data with pipelines.
-
ONCE
Optionally defines the flow as a one-time flow, such as a backfill. Using ONCE changes the flow in two ways:
- The source query or create_auto_cdc_flow_spec is a batch query rather than a streaming query.
- The flow runs one time by default. If the pipeline is updated with a complete refresh, the ONCE flow runs again to recreate the data.
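Because each flow is its own CREATE FLOW statement, several INSERT INTO flows can write to the same streaming table. This sketch combines two streaming flows with a ONCE backfill; all table and flow names (`all_events`, `raw_data.events_us`, `raw_data.events_eu`, `raw_data.events_archive`) are hypothetical.

```sql
-- Hypothetical target shared by all three flows.
CREATE OR REFRESH STREAMING TABLE all_events;

-- Streaming flow from the first hypothetical source (append-only).
CREATE FLOW events_us_flow AS
INSERT INTO all_events BY NAME
SELECT * FROM STREAM(raw_data.events_us);

-- Streaming flow from the second hypothetical source (append-only).
CREATE FLOW events_eu_flow AS
INSERT INTO all_events BY NAME
SELECT * FROM STREAM(raw_data.events_eu);

-- One-time backfill: a batch read (no STREAM) that runs once by default
-- and runs again only on a complete refresh of the pipeline.
CREATE FLOW events_history_backfill AS
INSERT INTO ONCE all_events BY NAME
SELECT * FROM raw_data.events_archive;
```

Since BY NAME matches columns by name rather than by position, the sketch assumes the three sources expose compatible column names.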
Examples
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users BY NAME
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users BY NAME
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
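Example 2 stores the target as SCD type 2, which keeps the history of changes for each key. If only the latest row per key is needed, the same flow can plausibly be declared with SCD type 1, which overwrites rows in place. This is a sketch; the target table `admins_current` and flow name `admin_current_flow` are hypothetical, and the source columns are reused from Example 2.

```sql
-- Hypothetical target holding only the current row for each userId.
CREATE OR REFRESH STREAMING TABLE admins_current;

CREATE FLOW admin_current_flow AS
AUTO CDC INTO admins_current
FROM stream(cdc_data.admins)
KEYS (userId)
-- Rows marked DELETE remove the matching key from the target.
APPLY AS DELETE WHEN
operation = "DELETE"
-- sequenceNum orders change events for the same key.
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
-- SCD type 1: updates overwrite the existing row; no history is kept.
STORED AS SCD TYPE 1;
```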