こんにちは、 @kz_morita です。
普段開発で dbt を用いて開発をしていますが、共通のDBにデータを入れるが、dbt の実行は別のタイミングで動かしたいという要件がありました。わかりやすい例で言うとマルチテナントのデータでテナントごとにデータの更新を行いたい場合などです。 このときに、dbt の incremental モデルを使うと、テナントごとにデータの更新を行うことができそうだったので調査しました。
このあとでは、データごとに client_id というテナントを管理する ID を持つデータを仮定し、dbt の incremental モデルを使って client_id ごとにデータの更新を行う方法について説明します。
結論
-
incremental model で unique_key を client_id, incremental_storategy を
delete+insert
にすれば、client_id ごとのモデルの実行ができた -
ただし、データモデルに以下の変更をする必要がある
-
materialized = incremetal
config( materialized='incremental', unique_key='client_id', incremental_strategy='delete+insert', )
-
is_incremental による client_id のフィルタをつける
{% if is_incremental() %} where client_id = {{ var('client_id') }} {% endif %}
-
-
dbt build 時などに client_id を変数として渡すこと対象のデータのみを更新できる
dbt build --vars '{"client_id": "client_1"}'
検証メモ
以下検証のメモです。
以下の様なサンプルデータを用意
- raw_data
CLIENT_ID ID NAME AGE 1 1 Alice 30 1 2 Bob 25 1 3 Charlie 35 1 4 David 28 1 5 Eve 22 1 6 Frank 40 1 7 Grace 29 1 8 Hannah 31 1 9 Ian 27 1 10 Jack 33 2 11 Kathy 26 2 12 Leo 32 2 13 Mia 24 2 14 Nina 36 2 15 Oscar 38 2 16 Paul 34 2 17 Quinn 23 2 18 Rita 37 2 19 Sam 39 2 20 Tina 30 3 21 Uma 25 3 22 Vera 35 3 23 Will 28 3 24 Xena 22 3 25 Yara 40 3 26 Zane 29 3 27 Alice 31 3 28 Bob 27 3 29 Charlie 33 3 30 David 26 4 31 Eve 32 4 32 Frank 24 4 33 Grace 36 4 34 Hannah 38 4 35 Ian 34 4 36 Jack 23 4 37 Kathy 37 4 38 Leo 39 4 39 Mia 30 4 40 Nina 25
以下の様に上記テーブルを読み込む staging model 相当のモデルを作成
{{
config(
materialized='table',
)
}}
with
source as (
select
*
from {{ ref('source') }}
)
select
*
, current_timestamp() as stg_load_time
from source
以下の様に timestamp が更新される。
- data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 1 1 Alice 30 2025-06-13 07:00:37.063 +0900 1 2 Bob 25 2025-06-13 07:00:37.063 +0900 1 3 Charlie 35 2025-06-13 07:00:37.063 +0900 1 4 David 28 2025-06-13 07:00:37.063 +0900 1 5 Eve 22 2025-06-13 07:00:37.063 +0900 1 6 Frank 40 2025-06-13 07:00:37.063 +0900 1 7 Grace 29 2025-06-13 07:00:37.063 +0900 1 8 Hannah 31 2025-06-13 07:00:37.063 +0900 1 9 Ian 27 2025-06-13 07:00:37.063 +0900 1 10 Jack 33 2025-06-13 07:00:37.063 +0900 2 11 Kathy 26 2025-06-13 07:00:37.063 +0900 2 12 Leo 32 2025-06-13 07:00:37.063 +0900 2 13 Mia 24 2025-06-13 07:00:37.063 +0900 2 14 Nina 36 2025-06-13 07:00:37.063 +0900 2 15 Oscar 38 2025-06-13 07:00:37.063 +0900 2 16 Paul 34 2025-06-13 07:00:37.063 +0900 2 17 Quinn 23 2025-06-13 07:00:37.063 +0900 2 18 Rita 37 2025-06-13 07:00:37.063 +0900 2 19 Sam 39 2025-06-13 07:00:37.063 +0900 2 20 Tina 30 2025-06-13 07:00:37.063 +0900 3 21 Uma 25 2025-06-13 07:00:37.063 +0900 3 22 Vera 35 2025-06-13 07:00:37.063 +0900 3 23 Will 28 2025-06-13 07:00:37.063 +0900 3 24 Xena 22 2025-06-13 07:00:37.063 +0900 3 25 Yara 40 2025-06-13 07:00:37.063 +0900 3 26 Zane 29 2025-06-13 07:00:37.063 +0900 3 27 Alice 31 2025-06-13 07:00:37.063 +0900 3 28 Bob 27 2025-06-13 07:00:37.063 +0900 3 29 Charlie 33 2025-06-13 07:00:37.063 +0900 3 30 David 26 2025-06-13 07:00:37.063 +0900 4 31 Eve 32 2025-06-13 07:00:37.063 +0900 4 32 Frank 24 2025-06-13 07:00:37.063 +0900 4 33 Grace 36 2025-06-13 07:00:37.063 +0900 4 34 Hannah 38 2025-06-13 07:00:37.063 +0900 4 35 Ian 34 2025-06-13 07:00:37.063 +0900 4 36 Jack 23 2025-06-13 07:00:37.063 +0900 4 37 Kathy 37 2025-06-13 07:00:37.063 +0900 4 38 Leo 39 2025-06-13 07:00:37.063 +0900 4 39 Mia 30 2025-06-13 07:00:37.063 +0900 4 40 Nina 25 2025-06-13 07:00:37.063 +0900
compile された SQL はこちら
create or replace transient table sample_db.sample_schema.poc_staging
as
(
with
source as (
select
*
from sample_db.sample_schema.poc_source
)
select
*
, current_timestamp() as stg_load_time
from source
);
次に、incremental model を試す。 まずは、materialization を incremental にしただけで試す。
{{
config(
materialized='incremental'
)
}}
with
source as (
select
*
from {{ ref('poc_source') }}
)
select
*
, current_timestamp() as stg_load_time
from source
以下の様な結果となった。データが重複した
- data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 1 1 Alice 30 2025-06-13 07:00:37.063 +0900 1 2 Bob 25 2025-06-13 07:00:37.063 +0900 1 3 Charlie 35 2025-06-13 07:00:37.063 +0900 1 4 David 28 2025-06-13 07:00:37.063 +0900 1 5 Eve 22 2025-06-13 07:00:37.063 +0900 1 6 Frank 40 2025-06-13 07:00:37.063 +0900 1 7 Grace 29 2025-06-13 07:00:37.063 +0900 1 8 Hannah 31 2025-06-13 07:00:37.063 +0900 1 9 Ian 27 2025-06-13 07:00:37.063 +0900 1 10 Jack 33 2025-06-13 07:00:37.063 +0900 2 11 Kathy 26 2025-06-13 07:00:37.063 +0900 2 12 Leo 32 2025-06-13 07:00:37.063 +0900 2 13 Mia 24 2025-06-13 07:00:37.063 +0900 2 14 Nina 36 2025-06-13 07:00:37.063 +0900 2 15 Oscar 38 2025-06-13 07:00:37.063 +0900 2 16 Paul 34 2025-06-13 07:00:37.063 +0900 2 17 Quinn 23 2025-06-13 07:00:37.063 +0900 2 18 Rita 37 2025-06-13 07:00:37.063 +0900 2 19 Sam 39 2025-06-13 07:00:37.063 +0900 2 20 Tina 30 2025-06-13 07:00:37.063 +0900 3 21 Uma 25 2025-06-13 07:00:37.063 +0900 3 22 Vera 35 2025-06-13 07:00:37.063 +0900 3 23 Will 28 2025-06-13 07:00:37.063 +0900 3 24 Xena 22 2025-06-13 07:00:37.063 +0900 3 25 Yara 40 2025-06-13 07:00:37.063 +0900 3 26 Zane 29 2025-06-13 07:00:37.063 +0900 3 27 Alice 31 2025-06-13 07:00:37.063 +0900 3 28 Bob 27 2025-06-13 07:00:37.063 +0900 3 29 Charlie 33 2025-06-13 07:00:37.063 +0900 3 30 David 26 2025-06-13 07:00:37.063 +0900 4 31 Eve 32 2025-06-13 07:00:37.063 +0900 4 32 Frank 24 2025-06-13 07:00:37.063 +0900 4 33 Grace 36 2025-06-13 07:00:37.063 +0900 4 34 Hannah 38 2025-06-13 07:00:37.063 +0900 4 35 Ian 34 2025-06-13 07:00:37.063 +0900 4 36 Jack 23 2025-06-13 07:00:37.063 +0900 4 37 Kathy 37 2025-06-13 07:00:37.063 +0900 4 38 Leo 39 2025-06-13 07:00:37.063 +0900 4 39 Mia 30 2025-06-13 07:00:37.063 +0900 4 40 Nina 25 2025-06-13 07:00:37.063 +0900 1 1 Alice 30 2025-06-13 07:05:15.745 +0900 1 2 Bob 25 2025-06-13 07:05:15.745 +0900 1 3 Charlie 35 2025-06-13 07:05:15.745 +0900 1 4 David 28 2025-06-13 07:05:15.745 +0900 1 5 Eve 22 2025-06-13 07:05:15.745 +0900 1 6 Frank 40 2025-06-13 07:05:15.745 +0900 1 7 Grace 29 2025-06-13 07:05:15.745 +0900 1 8 Hannah 31 2025-06-13 07:05:15.745 +0900 1 9 Ian 27 2025-06-13 07:05:15.745 +0900 1 10 Jack 33 2025-06-13 07:05:15.745 +0900 2 11 Kathy 26 2025-06-13 07:05:15.745 +0900 2 12 Leo 32 2025-06-13 07:05:15.745 +0900 2 13 Mia 24 2025-06-13 07:05:15.745 +0900 2 14 Nina 36 2025-06-13 07:05:15.745 +0900 2 15 Oscar 38 2025-06-13 07:05:15.745 +0900 2 16 Paul 34 2025-06-13 07:05:15.745 +0900 2 17 Quinn 23 2025-06-13 07:05:15.745 +0900 2 18 Rita 37 2025-06-13 07:05:15.745 +0900 2 19 Sam 39 2025-06-13 07:05:15.745 +0900 2 20 Tina 30 2025-06-13 07:05:15.745 +0900 3 21 Uma 25 2025-06-13 07:05:15.745 +0900 3 22 Vera 35 2025-06-13 07:05:15.745 +0900 3 23 Will 28 2025-06-13 07:05:15.745 +0900 3 24 Xena 22 2025-06-13 07:05:15.745 +0900 3 25 Yara 40 2025-06-13 07:05:15.745 +0900 3 26 Zane 29 2025-06-13 07:05:15.745 +0900 3 27 Alice 31 2025-06-13 07:05:15.745 +0900 3 28 Bob 27 2025-06-13 07:05:15.745 +0900 3 29 Charlie 33 2025-06-13 07:05:15.745 +0900 3 30 David 26 2025-06-13 07:05:15.745 +0900 4 31 Eve 32 2025-06-13 07:05:15.745 +0900 4 32 Frank 24 2025-06-13 07:05:15.745 +0900 4 33 Grace 36 2025-06-13 07:05:15.745 +0900 4 34 Hannah 38 2025-06-13 07:05:15.745 +0900 4 35 Ian 34 2025-06-13 07:05:15.745 +0900 4 36 Jack 23 2025-06-13 07:05:15.745 +0900 4 37 Kathy 37 2025-06-13 07:05:15.745 +0900 4 38 Leo 39 2025-06-13 07:05:15.745 +0900 4 39 Mia 30 2025-06-13 07:05:15.745 +0900 4 40 Nina 25 2025-06-13 07:05:15.745 +0900
unique_key とか指定してないので当然っぽい動き。
生成された SQL は以下
-- back compat for old kwarg name
begin;
insert into sample_db.sample_schema.poc_staging ("CLIENT_ID", "ID", "NAME", "AGE", "STG_LOAD_TIME")
(
select "CLIENT_ID", "ID", "NAME", "AGE", "STG_LOAD_TIME"
from sample_db.sample_schema.poc_staging__dbt_tmp
);
commit;
動きとしては、今回分の実行するデータの tmp table を作成してそこから INSERT してるっぽい。
それでは、unique_key に client_id を付与してみる。
{{
config(
materialized='incremental',
unique_key='client_id'
)
}}
with
source as (
select
*
from {{ ref('poc_source') }}
)
select
*
, current_timestamp() as stg_load_time
from source
poc_staging テーブルは一度クリアする。
再度実行(初回実行時の挙動)
- data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 1 1 Alice 30 2025-06-13 07:23:32.028 +0900 1 2 Bob 25 2025-06-13 07:23:32.028 +0900 1 3 Charlie 35 2025-06-13 07:23:32.028 +0900 1 4 David 28 2025-06-13 07:23:32.028 +0900 1 5 Eve 22 2025-06-13 07:23:32.028 +0900 1 6 Frank 40 2025-06-13 07:23:32.028 +0900 1 7 Grace 29 2025-06-13 07:23:32.028 +0900 1 8 Hannah 31 2025-06-13 07:23:32.028 +0900 1 9 Ian 27 2025-06-13 07:23:32.028 +0900 1 10 Jack 33 2025-06-13 07:23:32.028 +0900 2 11 Kathy 26 2025-06-13 07:23:32.028 +0900 2 12 Leo 32 2025-06-13 07:23:32.028 +0900 2 13 Mia 24 2025-06-13 07:23:32.028 +0900 2 14 Nina 36 2025-06-13 07:23:32.028 +0900 2 15 Oscar 38 2025-06-13 07:23:32.028 +0900 2 16 Paul 34 2025-06-13 07:23:32.028 +0900 2 17 Quinn 23 2025-06-13 07:23:32.028 +0900 2 18 Rita 37 2025-06-13 07:23:32.028 +0900 2 19 Sam 39 2025-06-13 07:23:32.028 +0900 2 20 Tina 30 2025-06-13 07:23:32.028 +0900 3 21 Uma 25 2025-06-13 07:23:32.028 +0900 3 22 Vera 35 2025-06-13 07:23:32.028 +0900 3 23 Will 28 2025-06-13 07:23:32.028 +0900 3 24 Xena 22 2025-06-13 07:23:32.028 +0900 3 25 Yara 40 2025-06-13 07:23:32.028 +0900 3 26 Zane 29 2025-06-13 07:23:32.028 +0900 3 27 Alice 31 2025-06-13 07:23:32.028 +0900 3 28 Bob 27 2025-06-13 07:23:32.028 +0900 3 29 Charlie 33 2025-06-13 07:23:32.028 +0900 3 30 David 26 2025-06-13 07:23:32.028 +0900 4 31 Eve 32 2025-06-13 07:23:32.028 +0900 4 32 Frank 24 2025-06-13 07:23:32.028 +0900 4 33 Grace 36 2025-06-13 07:23:32.028 +0900 4 34 Hannah 38 2025-06-13 07:23:32.028 +0900 4 35 Ian 34 2025-06-13 07:23:32.028 +0900 4 36 Jack 23 2025-06-13 07:23:32.028 +0900 4 37 Kathy 37 2025-06-13 07:23:32.028 +0900 4 38 Leo 39 2025-06-13 07:23:32.028 +0900 4 39 Mia 30 2025-06-13 07:23:32.028 +0900 4 40 Nina 25 2025-06-13 07:23:32.028 +0900
通常通り全件データが連携される。
この状態でもう一度実行してみる。
するとエラーになる
22:24:42
22:24:42 1 of 2 START sql table model sample_schema.poc_source ............................. [RUN]
22:24:43 1 of 2 OK created sql table model sample_schema.poc_source ........................ [SUCCESS 1 in 1.14s]
22:24:43 2 of 2 START sql incremental model sample_schema.poc_staging ...................... [RUN]
22:24:44 2 of 2 ERROR creating sql incremental model sample_schema.poc_staging ............. [ERROR in 1.62s]
22:24:44
22:24:44 Finished running 1 table model, 1 incremental model, 1 project hook in 0 hours 0 minutes and 3.92 seconds (3.92s).
22:24:45
22:24:45 Completed with 1 error and 0 warnings:
22:24:45
22:24:45 Database Error in model poc_staging (models/poc/poc_staging.sql)
100090 (42P18): Duplicate row detected during DML action
Row Values: [redacted]
compiled code at target/run/sample_db/models/poc/poc_staging.sql
22:24:45
22:24:45 Done. PASS=1 WARN=0 ERROR=1 SKIP=0 TOTAL=2
つまり default の merge mode ではちゃんと、unique になる key を指定しないとだめっぽい。
Compile されたコードはこちら
-- back compat for old kwarg name
begin;
merge into sample_db.sample_schema.poc_staging as DBT_INTERNAL_DEST
using sample_db.sample_schema.poc_staging__dbt_tmp as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.client_id = DBT_INTERNAL_DEST.client_id
)
when matched then update set
"CLIENT_ID" = DBT_INTERNAL_SOURCE."CLIENT_ID","ID" = DBT_INTERNAL_SOURCE."ID","NAME" = DBT_INTERNAL_SOURCE."NAME","AGE" = DBT_INTERNAL_SOURCE."AGE","STG_LOAD_TIME" = DBT_INTERNAL_SOURCE."STG_LOAD_TIME"
when not matched then insert
("CLIENT_ID", "ID", "NAME", "AGE", "STG_LOAD_TIME")
values
("CLIENT_ID", "ID", "NAME", "AGE", "STG_LOAD_TIME")
;
commit;
incremental_strategy を ‘delete+insert’ に変えてみる
{{
config(
materialized='incremental',
unique_key='client_id',
incremental_strategy='delete+insert',
)
}}
with
source as (
select
*
from {{ ref('poc_source') }}
)
select
*
, current_timestamp() as stg_load_time
from source
- 初回実行data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 1 1 Alice 30 2025-06-13 07:33:07.641 +0900 1 2 Bob 25 2025-06-13 07:33:07.641 +0900 1 3 Charlie 35 2025-06-13 07:33:07.641 +0900 1 4 David 28 2025-06-13 07:33:07.641 +0900 1 5 Eve 22 2025-06-13 07:33:07.641 +0900 1 6 Frank 40 2025-06-13 07:33:07.641 +0900 1 7 Grace 29 2025-06-13 07:33:07.641 +0900 1 8 Hannah 31 2025-06-13 07:33:07.641 +0900 1 9 Ian 27 2025-06-13 07:33:07.641 +0900 1 10 Jack 33 2025-06-13 07:33:07.641 +0900 2 11 Kathy 26 2025-06-13 07:33:07.641 +0900 2 12 Leo 32 2025-06-13 07:33:07.641 +0900 2 13 Mia 24 2025-06-13 07:33:07.641 +0900 2 14 Nina 36 2025-06-13 07:33:07.641 +0900 2 15 Oscar 38 2025-06-13 07:33:07.641 +0900 2 16 Paul 34 2025-06-13 07:33:07.641 +0900 2 17 Quinn 23 2025-06-13 07:33:07.641 +0900 2 18 Rita 37 2025-06-13 07:33:07.641 +0900 2 19 Sam 39 2025-06-13 07:33:07.641 +0900 2 20 Tina 30 2025-06-13 07:33:07.641 +0900 3 21 Uma 25 2025-06-13 07:33:07.641 +0900 3 22 Vera 35 2025-06-13 07:33:07.641 +0900 3 23 Will 28 2025-06-13 07:33:07.641 +0900 3 24 Xena 22 2025-06-13 07:33:07.641 +0900 3 25 Yara 40 2025-06-13 07:33:07.641 +0900 3 26 Zane 29 2025-06-13 07:33:07.641 +0900 3 27 Alice 31 2025-06-13 07:33:07.641 +0900 3 28 Bob 27 2025-06-13 07:33:07.641 +0900 3 29 Charlie 33 2025-06-13 07:33:07.641 +0900 3 30 David 26 2025-06-13 07:33:07.641 +0900 4 31 Eve 32 2025-06-13 07:33:07.641 +0900 4 32 Frank 24 2025-06-13 07:33:07.641 +0900 4 33 Grace 36 2025-06-13 07:33:07.641 +0900 4 34 Hannah 38 2025-06-13 07:33:07.641 +0900 4 35 Ian 34 2025-06-13 07:33:07.641 +0900 4 36 Jack 23 2025-06-13 07:33:07.641 +0900 4 37 Kathy 37 2025-06-13 07:33:07.641 +0900 4 38 Leo 39 2025-06-13 07:33:07.641 +0900 4 39 Mia 30 2025-06-13 07:33:07.641 +0900 4 40 Nina 25 2025-06-13 07:33:07.641 +0900 - 2回目実行data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 1 1 Alice 30 2025-06-13 07:34:20.662 +0900 1 2 Bob 25 2025-06-13 07:34:20.662 +0900 1 3 Charlie 35 2025-06-13 07:34:20.662 +0900 1 4 David 28 2025-06-13 07:34:20.662 +0900 1 5 Eve 22 2025-06-13 07:34:20.662 +0900 1 6 Frank 40 2025-06-13 07:34:20.662 +0900 1 7 Grace 29 2025-06-13 07:34:20.662 +0900 1 8 Hannah 31 2025-06-13 07:34:20.662 +0900 1 9 Ian 27 2025-06-13 07:34:20.662 +0900 1 10 Jack 33 2025-06-13 07:34:20.662 +0900 2 11 Kathy 26 2025-06-13 07:34:20.662 +0900 2 12 Leo 32 2025-06-13 07:34:20.662 +0900 2 13 Mia 24 2025-06-13 07:34:20.662 +0900 2 14 Nina 36 2025-06-13 07:34:20.662 +0900 2 15 Oscar 38 2025-06-13 07:34:20.662 +0900 2 16 Paul 34 2025-06-13 07:34:20.662 +0900 2 17 Quinn 23 2025-06-13 07:34:20.662 +0900 2 18 Rita 37 2025-06-13 07:34:20.662 +0900 2 19 Sam 39 2025-06-13 07:34:20.662 +0900 2 20 Tina 30 2025-06-13 07:34:20.662 +0900 3 21 Uma 25 2025-06-13 07:34:20.662 +0900 3 22 Vera 35 2025-06-13 07:34:20.662 +0900 3 23 Will 28 2025-06-13 07:34:20.662 +0900 3 24 Xena 22 2025-06-13 07:34:20.662 +0900 3 25 Yara 40 2025-06-13 07:34:20.662 +0900 3 26 Zane 29 2025-06-13 07:34:20.662 +0900 3 27 Alice 31 2025-06-13 07:34:20.662 +0900 3 28 Bob 27 2025-06-13 07:34:20.662 +0900 3 29 Charlie 33 2025-06-13 07:34:20.662 +0900 3 30 David 26 2025-06-13 07:34:20.662 +0900 4 31 Eve 32 2025-06-13 07:34:20.662 +0900 4 32 Frank 24 2025-06-13 07:34:20.662 +0900 4 33 Grace 36 2025-06-13 07:34:20.662 +0900 4 34 Hannah 38 2025-06-13 07:34:20.662 +0900 4 35 Ian 34 2025-06-13 07:34:20.662 +0900 4 36 Jack 23 2025-06-13 07:34:20.662 +0900 4 37 Kathy 37 2025-06-13 07:34:20.662 +0900 4 38 Leo 39 2025-06-13 07:34:20.662 +0900 4 39 Mia 30 2025-06-13 07:34:20.662 +0900 4 40 Nina 25 2025-06-13 07:34:20.662 +0900
日付だけ、更新されていることが確認できた。
生成された SQL は以下
begin;
delete from sample_db.sample_schema.poc_staging
where (
client_id) in (
select (client_id)
from sample_db.sample_schema.poc_staging__dbt_tmp
);
insert into sample_db.sample_schema.poc_staging ("CLIENT_ID", "ID", "NAME", "AGE", "STG_LOAD_TIME")
(
select "CLIENT_ID", "ID", "NAME", "AGE", "STG_LOAD_TIME"
from sample_db.sample_schema.poc_staging__dbt_tmp
);
commit;
is_incremental による差分更新を試す。
{{
config(
materialized='incremental',
unique_key='client_id',
incremental_strategy='delete+insert',
)
}}
with
source as (
select
*
from {{ ref('poc_source') }}
)
select
*
, current_timestamp() as stg_load_time
from source
{% if is_incremental() %}
where client_id = {{ var('poc_client_id') }}
{% endif %}
上記のコード実行してみる。(poc_client_id を指定しない状態)
当然エラーになる
:43:06 Concurrency: 4 threads (target='dev')
22:43:06
22:43:06 1 of 2 START sql table model sample_schema.poc_source ............................. [RUN]
22:43:08 1 of 2 OK created sql table model sample_schema.poc_source ........................ [SUCCESS 1 in 1.13s]
22:43:08 2 of 2 START sql incremental model sample_schema.poc_staging ...................... [RUN]
22:43:08 2 of 2 ERROR creating sql incremental model sample_schema.poc_staging ............. [ERROR in 0.04s]
22:43:08
22:43:08 Finished running 1 table model, 1 incremental model, 1 project hook in 0 hours 0 minutes and 2.43 seconds (2.43s).
22:43:08
22:43:08 Completed with 1 error and 0 warnings:
22:43:08
22:43:08 Compilation Error in model poc_staging (models/poc/poc_staging.sql)
Required var 'poc_client_id' not found in config:
Vars supplied to poc_staging = {
"client_id": 6
}
22:43:08
22:43:08 Done. PASS=1 WARN=0 ERROR=1 SKIP=0 TOTAL=2
CLI から var を渡す。
dbt run --vars '{"poc_client_id": 1}'
想定通り、client_id = 1 のデータのみ更新することができた。
- data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 2 11 Kathy 26 2025-06-13 07:34:20.662 +0900 2 12 Leo 32 2025-06-13 07:34:20.662 +0900 2 13 Mia 24 2025-06-13 07:34:20.662 +0900 2 14 Nina 36 2025-06-13 07:34:20.662 +0900 2 15 Oscar 38 2025-06-13 07:34:20.662 +0900 2 16 Paul 34 2025-06-13 07:34:20.662 +0900 2 17 Quinn 23 2025-06-13 07:34:20.662 +0900 2 18 Rita 37 2025-06-13 07:34:20.662 +0900 2 19 Sam 39 2025-06-13 07:34:20.662 +0900 2 20 Tina 30 2025-06-13 07:34:20.662 +0900 3 21 Uma 25 2025-06-13 07:34:20.662 +0900 3 22 Vera 35 2025-06-13 07:34:20.662 +0900 3 23 Will 28 2025-06-13 07:34:20.662 +0900 3 24 Xena 22 2025-06-13 07:34:20.662 +0900 3 25 Yara 40 2025-06-13 07:34:20.662 +0900 3 26 Zane 29 2025-06-13 07:34:20.662 +0900 3 27 Alice 31 2025-06-13 07:34:20.662 +0900 3 28 Bob 27 2025-06-13 07:34:20.662 +0900 3 29 Charlie 33 2025-06-13 07:34:20.662 +0900 3 30 David 26 2025-06-13 07:34:20.662 +0900 4 31 Eve 32 2025-06-13 07:34:20.662 +0900 4 32 Frank 24 2025-06-13 07:34:20.662 +0900 4 33 Grace 36 2025-06-13 07:34:20.662 +0900 4 34 Hannah 38 2025-06-13 07:34:20.662 +0900 4 35 Ian 34 2025-06-13 07:34:20.662 +0900 4 36 Jack 23 2025-06-13 07:34:20.662 +0900 4 37 Kathy 37 2025-06-13 07:34:20.662 +0900 4 38 Leo 39 2025-06-13 07:34:20.662 +0900 4 39 Mia 30 2025-06-13 07:34:20.662 +0900 4 40 Nina 25 2025-06-13 07:34:20.662 +0900 1 1 Alice 30 2025-06-13 07:44:42.332 +0900 1 2 Bob 25 2025-06-13 07:44:42.332 +0900 1 3 Charlie 35 2025-06-13 07:44:42.332 +0900 1 4 David 28 2025-06-13 07:44:42.332 +0900 1 5 Eve 22 2025-06-13 07:44:42.332 +0900 1 6 Frank 40 2025-06-13 07:44:42.332 +0900 1 7 Grace 29 2025-06-13 07:44:42.332 +0900 1 8 Hannah 31 2025-06-13 07:44:42.332 +0900 1 9 Ian 27 2025-06-13 07:44:42.332 +0900 1 10 Jack 33 2025-06-13 07:44:42.332 +0900
ためしに、3 も更新。問題なさそう。
dbt run --vars '{"poc_client_id": 3}'
- data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 2 11 Kathy 26 2025-06-13 07:34:20.662 +0900 2 12 Leo 32 2025-06-13 07:34:20.662 +0900 2 13 Mia 24 2025-06-13 07:34:20.662 +0900 2 14 Nina 36 2025-06-13 07:34:20.662 +0900 2 15 Oscar 38 2025-06-13 07:34:20.662 +0900 2 16 Paul 34 2025-06-13 07:34:20.662 +0900 2 17 Quinn 23 2025-06-13 07:34:20.662 +0900 2 18 Rita 37 2025-06-13 07:34:20.662 +0900 2 19 Sam 39 2025-06-13 07:34:20.662 +0900 2 20 Tina 30 2025-06-13 07:34:20.662 +0900 4 31 Eve 32 2025-06-13 07:34:20.662 +0900 4 32 Frank 24 2025-06-13 07:34:20.662 +0900 4 33 Grace 36 2025-06-13 07:34:20.662 +0900 4 34 Hannah 38 2025-06-13 07:34:20.662 +0900 4 35 Ian 34 2025-06-13 07:34:20.662 +0900 4 36 Jack 23 2025-06-13 07:34:20.662 +0900 4 37 Kathy 37 2025-06-13 07:34:20.662 +0900 4 38 Leo 39 2025-06-13 07:34:20.662 +0900 4 39 Mia 30 2025-06-13 07:34:20.662 +0900 4 40 Nina 25 2025-06-13 07:34:20.662 +0900 1 1 Alice 30 2025-06-13 07:44:42.332 +0900 1 2 Bob 25 2025-06-13 07:44:42.332 +0900 1 3 Charlie 35 2025-06-13 07:44:42.332 +0900 1 4 David 28 2025-06-13 07:44:42.332 +0900 1 5 Eve 22 2025-06-13 07:44:42.332 +0900 1 6 Frank 40 2025-06-13 07:44:42.332 +0900 1 7 Grace 29 2025-06-13 07:44:42.332 +0900 1 8 Hannah 31 2025-06-13 07:44:42.332 +0900 1 9 Ian 27 2025-06-13 07:44:42.332 +0900 1 10 Jack 33 2025-06-13 07:44:42.332 +0900 3 21 Uma 25 2025-06-13 08:14:13.864 +0900 3 22 Vera 35 2025-06-13 08:14:13.864 +0900 3 23 Will 28 2025-06-13 08:14:13.864 +0900 3 24 Xena 22 2025-06-13 08:14:13.864 +0900 3 25 Yara 40 2025-06-13 08:14:13.864 +0900 3 26 Zane 29 2025-06-13 08:14:13.864 +0900 3 27 Alice 31 2025-06-13 08:14:13.864 +0900 3 28 Bob 27 2025-06-13 08:14:13.864 +0900 3 29 Charlie 33 2025-06-13 08:14:13.864 +0900 3 30 David 26 2025-06-13 08:14:13.864 +0900
後続のモデルがどうなるかを調査する
以下の様なデータマートを考える
with
source as (
select
*
from {{ ref('poc_staging') }}
)
, aggregated as (
select
client_id
, avg(age) as age_avg
from source
group by 1
)
select
*
, current_timestamp() as mart_load_time
from aggregated
まずは materialization が view でどういう挙動になるのか?
client_id 1のみ実行してみる
dbt run --vars '{"poc_client_id": 1}'
以下の様に全データ追加された。
-
data
CLIENT_ID AGE_AVG MART_LOAD_TIME 1 30.000000 2025-06-13 08:27:19.016 +0900 2 31.900000 2025-06-13 08:27:19.016 +0900 3 29.600000 2025-06-13 08:27:19.016 +0900 4 31.800000 2025-06-13 08:27:19.016 +0900 -
再実行
CLIENT_ID AGE_AVG MART_LOAD_TIME 1 30.000000 2025-06-13 08:28:47.335 +0900 2 31.900000 2025-06-13 08:28:47.335 +0900 3 29.600000 2025-06-13 08:28:47.335 +0900 4 31.800000 2025-06-13 08:28:47.335 +0900
つまり後続のモデルは、普通に全件更新されるっぽい,
一応 table でもやってみる。一度 mart テーブル消して再実行
dbt run --vars '{"poc_client_id": 1}'
こちらも同様の挙動。
つまり後続も、同じ様に動かすのであれば materialization を incremental にして、delete+insert にする必要ありそう。
モデルを以下の様に修正して、
{{
config(
materialized='incremental',
unique_key='client_id',
incremental_strategy='delete+insert'
)
}}
with
source as (
select
*
from {{ ref('poc_staging') }}
)
, aggregated as (
select
client_id
, avg(age) as age_avg
from source
group by 1
)
select
*
, current_timestamp() as mart_load_time
from aggregated
再実行してみると、全件更新された。 (where 句書いてないので)
where 句を書いてみる
{{
config(
materialized='incremental',
unique_key='client_id',
incremental_strategy='delete+insert'
)
}}
with
source as (
select
*
from {{ ref('poc_staging') }}
)
, aggregated as (
select
client_id
, avg(age) as age_avg
from source
group by 1
)
select
*
, current_timestamp() as mart_load_time
from aggregated
{% if is_incremental() %}
where client_id = {{ var('poc_client_id') }}
{% endif %}
client_id 1 だけ更新できた。
- data
CLIENT_ID AGE_AVG MART_LOAD_TIME 2 31.900000 2025-06-13 08:33:27.667 +0900 3 29.600000 2025-06-13 08:33:27.667 +0900 4 31.800000 2025-06-13 08:33:27.667 +0900 1 30.000000 2025-06-13 08:34:41.730 +0900
最後ではなく、Import CTE に is_incremental のフィルタをつける。
{{
config(
materialized='incremental',
unique_key='client_id',
incremental_strategy='delete+insert'
)
}}
with
source as (
select
*
from {{ ref('poc_staging') }}
{% if is_incremental() %}
where client_id = {{ var('poc_client_id') }}
{% endif %}
)
, aggregated as (
select
client_id
, avg(age) as age_avg
from source
group by 1
)
select
*
, current_timestamp() as mart_load_time
from aggregated
同様に差分更新できる
- data (client_id: 4 を更新)
CLIENT_ID AGE_AVG MART_LOAD_TIME 2 31.900000 2025-06-13 08:33:27.667 +0900 3 29.600000 2025-06-13 08:33:27.667 +0900 1 30.000000 2025-06-13 08:34:41.730 +0900 4 31.800000 2025-06-13 08:35:50.674 +0900
delete+insert の挙動みるために SOURCE を消して試してみる。
id: 8 を消す。cilent_id 1 を再実行したところ問題なく差分更新できた
- data
CLIENT_ID ID NAME AGE STG_LOAD_TIME 2 11 Kathy 26 2025-06-13 07:34:20.662 +0900 2 12 Leo 32 2025-06-13 07:34:20.662 +0900 2 13 Mia 24 2025-06-13 07:34:20.662 +0900 2 14 Nina 36 2025-06-13 07:34:20.662 +0900 2 15 Oscar 38 2025-06-13 07:34:20.662 +0900 2 16 Paul 34 2025-06-13 07:34:20.662 +0900 2 17 Quinn 23 2025-06-13 07:34:20.662 +0900 2 18 Rita 37 2025-06-13 07:34:20.662 +0900 2 19 Sam 39 2025-06-13 07:34:20.662 +0900 2 20 Tina 30 2025-06-13 07:34:20.662 +0900 3 21 Uma 25 2025-06-13 08:14:13.864 +0900 3 22 Vera 35 2025-06-13 08:14:13.864 +0900 3 23 Will 28 2025-06-13 08:14:13.864 +0900 3 24 Xena 22 2025-06-13 08:14:13.864 +0900 3 25 Yara 40 2025-06-13 08:14:13.864 +0900 3 26 Zane 29 2025-06-13 08:14:13.864 +0900 3 27 Alice 31 2025-06-13 08:14:13.864 +0900 3 28 Bob 27 2025-06-13 08:14:13.864 +0900 3 29 Charlie 33 2025-06-13 08:14:13.864 +0900 3 30 David 26 2025-06-13 08:14:13.864 +0900 4 31 Eve 32 2025-06-13 08:35:48.155 +0900 4 32 Frank 24 2025-06-13 08:35:48.155 +0900 4 33 Grace 36 2025-06-13 08:35:48.155 +0900 4 34 Hannah 38 2025-06-13 08:35:48.155 +0900 4 35 Ian 34 2025-06-13 08:35:48.155 +0900 4 36 Jack 23 2025-06-13 08:35:48.155 +0900 4 37 Kathy 37 2025-06-13 08:35:48.155 +0900 4 38 Leo 39 2025-06-13 08:35:48.155 +0900 4 39 Mia 30 2025-06-13 08:35:48.155 +0900 4 40 Nina 25 2025-06-13 08:35:48.155 +0900 1 1 Alice 30 2025-06-13 08:58:17.085 +0900 1 2 Bob 25 2025-06-13 08:58:17.085 +0900 1 3 Charlie 35 2025-06-13 08:58:17.085 +0900 1 4 David 28 2025-06-13 08:58:17.085 +0900 1 5 Eve 22 2025-06-13 08:58:17.085 +0900 1 6 Frank 40 2025-06-13 08:58:17.085 +0900 1 7 Grace 29 2025-06-13 08:58:17.085 +0900 1 9 Ian 27 2025-06-13 08:58:17.085 +0900 1 10 Jack 33 2025-06-13 08:58:17.085 +0900 CLIENT_ID AGE_AVG MART_LOAD_TIME 2 31.900000 2025-06-13 08:33:27.667 +0900 3 29.600000 2025-06-13 08:33:27.667 +0900 4 31.800000 2025-06-13 08:35:50.674 +0900 1 29.888889 2025-06-13 08:58:20.079 +0900
中間モデルは、view にして stg と mart のみ incremental にするパターン
mrt でやっていた aggregate 処理を intermediate に移植した。
intermediate
with
source as (
select *
from {{ ref('poc_staging') }}
)
, aggregated as (
select
client_id
, avg(age) as age_avg
from source
group by 1
)
select
*
, current_timestamp() as int_load_time
from aggregated
mart
{{
config(
materialized='incremental',
unique_key='client_id',
incremental_strategy='delete+insert'
)
}}
with
source as (
select *
from {{ ref('poc_intermediate') }}
{% if is_incremental() %}
where client_id = {{ var('poc_client_id') }}
{% endif %}
)
select
*
, current_timestamp() as mart_load_time
from source
この状態で client_id = 1 を指定して実行
dbt run --vars '{"poc_client_id": 1}'
結果
- stagingモデルは client_id 1 のみ更新
- 結果
CLIENT_ID ID NAME AGE STG_LOAD_TIME 2 11 Kathy 26 2025-06-13 07:34:20.662 +0900 2 12 Leo 32 2025-06-13 07:34:20.662 +0900 2 13 Mia 24 2025-06-13 07:34:20.662 +0900 2 14 Nina 36 2025-06-13 07:34:20.662 +0900 2 15 Oscar 38 2025-06-13 07:34:20.662 +0900 2 16 Paul 34 2025-06-13 07:34:20.662 +0900 2 17 Quinn 23 2025-06-13 07:34:20.662 +0900 2 18 Rita 37 2025-06-13 07:34:20.662 +0900 2 19 Sam 39 2025-06-13 07:34:20.662 +0900 2 20 Tina 30 2025-06-13 07:34:20.662 +0900 3 21 Uma 25 2025-06-13 08:14:13.864 +0900 3 22 Vera 35 2025-06-13 08:14:13.864 +0900 3 23 Will 28 2025-06-13 08:14:13.864 +0900 3 24 Xena 22 2025-06-13 08:14:13.864 +0900 3 25 Yara 40 2025-06-13 08:14:13.864 +0900 3 26 Zane 29 2025-06-13 08:14:13.864 +0900 3 27 Alice 31 2025-06-13 08:14:13.864 +0900 3 28 Bob 27 2025-06-13 08:14:13.864 +0900 3 29 Charlie 33 2025-06-13 08:14:13.864 +0900 3 30 David 26 2025-06-13 08:14:13.864 +0900 4 31 Eve 32 2025-06-13 08:35:48.155 +0900 4 32 Frank 24 2025-06-13 08:35:48.155 +0900 4 33 Grace 36 2025-06-13 08:35:48.155 +0900 4 34 Hannah 38 2025-06-13 08:35:48.155 +0900 4 35 Ian 34 2025-06-13 08:35:48.155 +0900 4 36 Jack 23 2025-06-13 08:35:48.155 +0900 4 37 Kathy 37 2025-06-13 08:35:48.155 +0900 4 38 Leo 39 2025-06-13 08:35:48.155 +0900 4 39 Mia 30 2025-06-13 08:35:48.155 +0900 4 40 Nina 25 2025-06-13 08:35:48.155 +0900 1 1 Alice 30 2025-06-13 10:34:40.622 +0900 1 2 Bob 25 2025-06-13 10:34:40.622 +0900 1 3 Charlie 35 2025-06-13 10:34:40.622 +0900 1 4 David 28 2025-06-13 10:34:40.622 +0900 1 5 Eve 22 2025-06-13 10:34:40.622 +0900 1 6 Frank 40 2025-06-13 10:34:40.622 +0900 1 7 Grace 29 2025-06-13 10:34:40.622 +0900 1 9 Ian 27 2025-06-13 10:34:40.622 +0900 1 10 Jack 33 2025-06-13 10:34:40.622 +0900
- 結果
- intermediate モデルは全データ分計算が走りそう
- 結果
CLIENT_ID AGE_AVG INT_LOAD_TIME 1 29.888889 2025-06-13 10:38:08.605 +0900 2 31.900000 2025-06-13 10:38:08.605 +0900 3 29.600000 2025-06-13 10:38:08.605 +0900 4 31.800000 2025-06-13 10:38:08.605 +0900
- 結果
- mart はとくていのモデルのみ更新される
- 結果
CLIENT_ID AGE_AVG MART_LOAD_TIME 2 31.900000 2025-06-13 08:33:27.667 +0900 3 29.600000 2025-06-13 08:33:27.667 +0900 4 31.800000 2025-06-13 08:35:50.674 +0900 1 29.888889 2025-06-13 10:34:44.649 +0900
- 結果
この時に mart 実行時の動きを見るためQuery History を見てみた。 以下の様な時系列順になっている
poc_mart の tmp テーブル作成部分
create or replace temporary table sample_db.sample_schema.poc_mart__dbt_tmp
as
(
with
source as (
select *
from sample_db.sample_schema.poc_intermediate
where client_id = 1
)
select
*
, current_timestamp() as mart_load_time
from source
);
delete 側
delete from sample_db.sample_schema.poc_mart
where (
client_id) in (
select (client_id)
from sample_db.sample_schema.poc_mart__dbt_tmp
);
最後に insert。
insert into sample_db.sample_schema.poc_mart ("CLIENT_ID", "AGE_AVG", "MART_LOAD_TIME")
(
select "CLIENT_ID", "AGE_AVG", "MART_LOAD_TIME"
from sample_db.sample_schema.poc_mart__dbt_tmp
);
そして、内部の挙動見るために create tmp table 時の Query Profile を見ると、以下の様な順番で処理されている。
TableScan → Filter → Aggregate → CreateTableAsSelect → Result の順。
ということは、先に、 Filter されていそうなので、指定した client_id 以外の中間モデルの処理はしらなさそうだった。
まとめ
今回は、incremental model について挙動を確認した時のメモを書きました。実際にデータやSQLをみながら検証していくとどう言う挙動をするのかがわかりやすくよかったです。