
Databricks configurations

Configuring tables

When materializing a model as a table, you may include several optional configs that are specific to the dbt-databricks plugin, in addition to the standard model configs.

dbt-databricks v1.9 adds support for the table_format: iceberg config. Try it now on the dbt Cloud "Latest" release track. All other table configurations were also supported in 1.8.

| Option | Description | Required? | Model support | Example |
|---|---|---|---|---|
| table_format | Whether or not to provision Iceberg compatibility for the materialization | Optional | SQL, Python | iceberg |
| file_format† | The file format to use when creating tables (parquet, delta, hudi, csv, json, text, jdbc, orc, hive or libsvm). | Optional | SQL, Python | delta |
| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | SQL, Python | /mnt/root |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | SQL, Python | date_day |
| liquid_clustered_by | Cluster the created table by the specified columns. Clustering method is based on Delta's Liquid Clustering feature. Available since dbt-databricks 1.6.2. | Optional | SQL, Python | date_day |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | SQL, Python | country_code |
| buckets | The number of buckets to create while clustering | Required if clustered_by is specified | SQL, Python | 8 |
| tblproperties | Tblproperties to be set on the created table | Optional | SQL, Python* | {'this.is.my.key': 12} |
| databricks_tags | Tags to be set on the created table | Optional | SQL, Python | {'my_tag': 'my_value'} |
| compression | Set the compression algorithm. | Optional | SQL, Python | zstd |

* We do not yet have a PySpark API to set tblproperties at table creation, so this feature is primarily to allow users to annotate their Python-derived tables with tblproperties.

† When table_format is iceberg, file_format must be delta.

databricks_tags are currently only supported at the table level and are applied via ALTER statements.
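As an illustrative sketch of how several of these options fit together (the model name and values below are hypothetical, and not every option is shown), a table model could be configured in a schema.yml entry like this:

schema.yml
models:
  - name: my_sales_table # hypothetical model name
    config:
      materialized: table
      file_format: delta
      location_root: /mnt/root
      liquid_clustered_by: date_day
      databricks_tags:
        my_tag: my_value
      tblproperties:
        this.is.my.key: 12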

Python submission methods

In dbt-databricks v1.9 (try it now in the dbt Cloud "Latest" release track), you can use these four options for submission_method:

  • all_purpose_cluster: Executes the Python model either directly using the Command API or by uploading a notebook and creating a one-off job run
  • job_cluster: Creates a new job cluster to execute an uploaded notebook as a one-off job run
  • serverless_cluster: Uses a serverless cluster to execute an uploaded notebook as a one-off job run
  • workflow_job: Creates/updates a reusable workflow and uploaded notebook, for execution on all-purpose, job, or serverless clusters.

caution

The workflow_job approach gives you maximum flexibility, but it creates persistent artifacts in Databricks (the workflow) that users could run outside of dbt.

We are currently in a transitional period where there is a disconnect between the old submission methods (which were grouped by compute) and the logically distinct submission methods (command, job run, workflow).

As such, the supported config matrix is somewhat complicated:

| Config | Use | Default |
|---|---|---|
| create_notebook | if false, use Command API, otherwise upload notebook and use job run | false |
| timeout | maximum time to wait for command/job to run | 0 (No timeout) |
| job_cluster_config | configures a new cluster for running the model | {} |
| access_control_list | directly configures access control for the job | {} |
| packages | list of packages to install on the executing cluster | [] |
| index_url | url to install packages from | None (uses pypi) |
| additional_libs | directly configures libraries | [] |
| python_job_config | additional configuration for jobs/workflows (see table below) | {} |
| cluster_id | id of existing all purpose cluster to execute against | None |
| http_path | path to existing all purpose cluster to execute against | None |

When using all_purpose_cluster with create_notebook set to false, only timeout and cluster_id/http_path are supported.
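As an illustrative sketch (the model name, cluster spec, and package list are hypothetical), a Python model might be configured to run on a new job cluster like this:

schema.yml
models:
  - name: my_python_model # hypothetical Python model
    config:
      submission_method: job_cluster
      job_cluster_config:
        spark_version: "15.3.x-scala2.12"
        node_type_id: "i3.xlarge"
        num_workers: 2
      packages: ["scikit-learn"] # installed on the executing cluster
      timeout: 3600 # maximum time to wait for the job run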

With the introduction of the workflow_job submission method, we chose to segregate further configuration of the python model submission under a top level configuration named python_job_config. This keeps configuration options for jobs and workflows namespaced in such a way that they do not interfere with other model config, allowing us to be much more flexible with what is supported for job execution.

The support matrix for this feature is divided into workflow_job and all others (assuming all_purpose_cluster with create_notebook==true). Each config option listed must be nested under python_job_config:

| Config | Use | Default |
|---|---|---|
| name | The name to give (or used to look up) the created workflow | None |
| grants | A simplified way to specify access control for the workflow | {} |
| existing_job_id | Id to use to look up the created workflow (in place of name) | None |
| post_hook_tasks | Tasks to include after the model notebook execution | [] |
| additional_task_settings | Additional task config to include in the model task | {} |
| Other job run settings | Config will be copied into the job run request, outside of the model task | None |
| Other workflow settings | Config will be copied into the workflow request, outside of the model task | None |

This example uses the new configuration options in the previous table:

schema.yml
models:
  - name: my_model
    config:
      submission_method: workflow_job

      # Define a job cluster to create for running this workflow
      # Alternately, could specify cluster_id to use an existing cluster, or provide neither to use a serverless cluster
      job_cluster_config:
        spark_version: "15.3.x-scala2.12"
        node_type_id: "rd-fleet.2xlarge"
        runtime_engine: "{{ var('job_cluster_defaults.runtime_engine') }}"
        data_security_mode: "{{ var('job_cluster_defaults.data_security_mode') }}"
        autoscale: { "min_workers": 1, "max_workers": 4 }

      python_job_config:
        # These settings are passed in, as is, to the request
        email_notifications: { on_failure: ["me@example.com"] }
        max_retries: 2

        name: my_workflow_name

        # Override settings for your model's dbt task. For instance, you can
        # change the task key
        additional_task_settings: { "task_key": "my_dbt_task" }

        # Define tasks to run before/after the model
        # This example assumes you have already uploaded a notebook to /my_notebook_path to perform optimize and vacuum
        post_hook_tasks:
          [
            {
              "depends_on": [{ "task_key": "my_dbt_task" }],
              "task_key": "OPTIMIZE_AND_VACUUM",
              "notebook_task": { "notebook_path": "/my_notebook_path", "source": "WORKSPACE" },
            },
          ]

        # Simplified structure, rather than having to specify permission separately for each user
        grants:
          view: [{ "group_name": "marketing-team" }]
          run: [{ "user_name": "other_user@example.com" }]
          manage: []

Incremental models

The dbt-databricks plugin leans heavily on the incremental_strategy config. This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of five values:

  • append: Insert new records without updating or overwriting any existing data.
  • insert_overwrite: If partition_by is specified, overwrite partitions in the table with new data. If no partition_by is specified, overwrite the entire table with new data.
  • merge (default; Delta and Hudi file format only): Match records based on a unique_key, updating old records, and inserting new ones. (If no unique_key is specified, all new data is inserted, similar to append.)
  • replace_where (Delta file format only): Match records based on incremental_predicates, replacing all records that match the predicates from the existing table with records matching the predicates from the new data. (If no incremental_predicates are specified, all new data is inserted, similar to append.)
  • microbatch (Delta file format only): Implements the microbatch strategy using replace_where with predicates generated based on event_time.

Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, incremental_strategy may be specified in dbt_project.yml or within a model file's config() block.
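For example, a project-wide default strategy could be set in dbt_project.yml and overridden for a specific folder (the project and folder names here are hypothetical):

dbt_project.yml
models:
  my_project: # hypothetical project name
    +incremental_strategy: merge # default for all models in the project
    staging: # hypothetical folder
      +incremental_strategy: append # override for models in this folder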

The append strategy

Following the append strategy, dbt will perform an insert into statement with all new data. The appeal of this strategy is that it is straightforward and functional across all platforms, file types, connection methods, and Apache Spark versions. However, this strategy cannot update, overwrite, or delete existing data, so it is likely to insert duplicate records for many data sources.

databricks_incremental.sql
{{ config(
    materialized='incremental',
    incremental_strategy='append',
) }}

-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}

The insert_overwrite strategy

caution

This strategy is currently only compatible with All Purpose Clusters, not SQL Warehouses.

This strategy is most effective when specified alongside a partition_by clause in your model config. dbt will run an atomic insert overwrite statement that dynamically replaces all partitions included in your query. Be sure to re-select all of the relevant data for a partition when using this incremental strategy.

If no partition_by is specified, then the insert_overwrite strategy will atomically replace all contents of the table, overriding all existing data with only the new records. The column schema of the table remains the same, however. This can be desirable in some limited circumstances, since it minimizes downtime while the table contents are overwritten. The operation is comparable to running truncate and insert on other databases. For atomic replacement of Delta-formatted tables, use the table materialization (which runs create or replace) instead.

databricks_incremental.sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['date_day'],
    file_format='parquet'
) }}

/*
Every partition returned by this query will be overwritten
when this model runs
*/

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
date_day,
count(*) as users

from new_events
group by 1

The merge strategy

The merge incremental strategy requires:

  • file_format: delta or hudi
  • Databricks Runtime 5.1 and above for delta file format
  • Apache Spark for hudi file format

The Databricks adapter will run an atomic merge statement similar to the default merge behavior on Snowflake and BigQuery. If a unique_key is specified (recommended), dbt will update old records with values from new records that match on the key column. If a unique_key is not specified, dbt will forgo match criteria and simply insert all new records (similar to append strategy).

Specifying merge as the incremental strategy is optional since it's the default strategy used when none is specified.

merge_incremental.sql
-- file_format can be 'delta' or 'hudi'
{{ config(
    materialized='incremental',
    file_format='delta',
    unique_key='user_id',
    incremental_strategy='merge'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from new_events
group by 1

Beginning with 1.9, merge behavior can be modified with the following additional configuration options:

  • target_alias, source_alias: Aliases for the target and source to allow you to describe your merge conditions more naturally. These default to DBT_INTERNAL_DEST and DBT_INTERNAL_SOURCE, respectively.
  • skip_matched_step: If set to true, the 'matched' clause of the merge statement will not be included.
  • skip_not_matched_step: If set to true, the 'not matched' clause will not be included.
  • matched_condition: Condition to apply to the WHEN MATCHED clause. You should use the target_alias and source_alias to write a conditional expression, such as DBT_INTERNAL_DEST.col1 = hash(DBT_INTERNAL_SOURCE.col2, DBT_INTERNAL_SOURCE.col3). This condition further restricts the matched set of rows.
  • not_matched_condition: Condition to apply to the WHEN NOT MATCHED [BY TARGET] clause. This condition further restricts the set of rows in the target that do not match the source that will be inserted into the merged table.
  • not_matched_by_source_condition: Condition used to further filter the WHEN NOT MATCHED BY SOURCE clause. Only used in conjunction with not_matched_by_source_action.
  • not_matched_by_source_action: The action to apply when the condition is met. Configure as an expression. For example: not_matched_by_source_action: "update set t.attr1 = 'deleted', t.tech_change_ts = current_timestamp()".
  • merge_with_schema_evolution: If set to true, the merge statement includes the WITH SCHEMA EVOLUTION clause.

For more details on the meaning of each merge clause, please see the Databricks documentation.

The following is an example demonstrating the use of these new options:

merge_incremental_options.sql
{{ config(
    materialized = 'incremental',
    unique_key = 'id',
    incremental_strategy='merge',
    target_alias='t',
    source_alias='s',
    matched_condition='t.tech_change_ts < s.tech_change_ts',
    not_matched_condition='s.attr1 IS NOT NULL',
    not_matched_by_source_condition='t.tech_change_ts < current_timestamp()',
    not_matched_by_source_action='delete',
    merge_with_schema_evolution=true
) }}

select
id,
attr1,
attr2,
tech_change_ts
from
{{ ref('source_table') }} as s

The replace_where strategy

The replace_where incremental strategy requires:

  • file_format: delta
  • Databricks Runtime 12.0 and above

dbt will run an atomic replace where statement which selectively overwrites data matching one or more incremental_predicates specified as a string or array. Only rows matching the predicates will be inserted. If no incremental_predicates are specified, dbt will perform an atomic insert, as with append.

caution

replace_where inserts data into columns in the order provided, rather than by column name. If you reorder columns and the data is compatible with the existing schema, you may silently insert values into an unexpected column. If the incoming data is incompatible with the existing schema, you will instead receive an error.

replace_where_incremental.sql
-- Never replace users with ids < 10000
{{ config(
    materialized='incremental',
    file_format='delta',
    incremental_strategy='replace_where',
    incremental_predicates='user_id >= 10000'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from new_events
group by 1

The microbatch strategy

The Databricks adapter implements the microbatch strategy using replace_where. Note the requirements and caution statements for replace_where above. For more information about this strategy, see the microbatch reference page.

In the following example, the upstream table events has been annotated with an event_time column called ts in its schema file.

microbatch_incremental.sql
-- Use 'date' as the grain for this microbatch table
{{ config(
    materialized='incremental',
    file_format='delta',
    incremental_strategy='microbatch',
    event_time='date'
) }}

with new_events as (

select * from {{ ref('events') }}

)

select
user_id,
date,
count(*) as visits

from new_events
group by 1, 2

Selecting compute per model

Beginning in version 1.7.2, you can assign which compute resource to use on a per-model basis. For SQL models, you can select a SQL Warehouse (serverless or provisioned) or an all purpose cluster. For details on how this feature interacts with python models, see Specifying compute for Python models.

note

This is an optional setting. If you do not configure this as shown below, we will default to the compute specified by http_path in the top level of the output section in your profile. This is also the compute that will be used for tasks not associated with a particular model, such as gathering metadata for all tables in a schema.

To take advantage of this capability, you will need to add compute blocks to your profile:

profiles.yml

profile-name:
  target: target-name # this is the default target
  outputs:
    target-name:
      type: databricks
      catalog: optional catalog name if you are using Unity Catalog
      schema: schema name # Required
      host: yourorg.databrickshost.com # Required

      ### This path is used as the default compute
      http_path: /sql/your/http/path # Required

      ### New compute section
      compute:

        ### Name that you will use to refer to an alternate compute
        Compute1:
          http_path: '/sql/your/http/path' # Required for each alternate compute

        ### Another named compute, use whatever name you like
        Compute2:
          http_path: '/some/other/path' # Required for each alternate compute
      ...

    target-name: # additional targets
      ...
      ### For each target, you need to define the same compute,
      ### but you can specify different paths
      compute:

        ### Name that you will use to refer to an alternate compute
        Compute1:
          http_path: '/sql/your/http/path' # Required for each alternate compute

        ### Another named compute, use whatever name you like
        Compute2:
          http_path: '/some/other/path' # Required for each alternate compute
      ...

The new compute section is a map of user-chosen names to objects with an http_path property. Each compute is keyed by a name that is used in the model definition/configuration to indicate which compute you wish to use for that model or selection of models. We recommend choosing a name that is easily recognized as the compute resource you're using, such as the name of the compute resource inside the Databricks UI.

note

You need to use the same set of names for compute across your outputs, though you may supply different http_paths, allowing you to use different computes in different deployment scenarios.

To configure this inside of dbt Cloud, use the extended attributes feature on the desired environments:


compute:
  Compute1:
    http_path: /SOME/OTHER/PATH
  Compute2:
    http_path: /SOME/OTHER/PATH

Specifying the compute for models

As with many other configuration options, you can specify the compute for a model in multiple ways, using databricks_compute. In your dbt_project.yml, the selected compute can be specified for all the models in a given directory:

dbt_project.yml

...

models:
  +databricks_compute: "Compute1" # use the `Compute1` warehouse/cluster for all models in the project...
  my_project:
    clickstream:
      +databricks_compute: "Compute2" # ...except for the models in the `clickstream` folder, which will use `Compute2`.

snapshots:
  +databricks_compute: "Compute1" # all Snapshot models are configured to use `Compute1`.

For an individual model, the compute can be specified in the model config in your schema file.

schema.yml

models:
  - name: table_model
    config:
      databricks_compute: Compute1
    columns:
      - name: id
        data_type: int

Alternatively, the compute can be specified in the config block of a model's SQL file.

model.sql

{{
  config(
    materialized='table',
    databricks_compute='Compute1'
  )
}}
select * from {{ ref('seed') }}

To validate that the specified compute is being used, look for lines in your dbt.log like:

Databricks adapter ... using default compute resource.

or

Databricks adapter ... using compute resource <name of compute>.

Specifying compute for Python models

Materializing a python model requires execution of SQL as well as python. Specifically, if your python model is incremental, the current execution pattern involves executing python to create a staging table that is then merged into your target table using SQL.

The python code needs to run on an all purpose cluster (or serverless cluster, see Python Submission Methods), while the SQL code can run on an all purpose cluster or a SQL Warehouse.

When you specify your databricks_compute for a python model, you are currently only specifying which compute to use when running the model-specific SQL. If you wish to use a different compute for executing the python itself, you must specify an alternate compute in the config for the model. For example:

model.py

def model(dbt, session):
    dbt.config(
        http_path="sql/protocolv1/..."
    )

If your default compute is a SQL Warehouse, you will need to specify an all purpose cluster http_path in this way.

Persisting model descriptions

Relation-level docs persistence is supported in dbt v0.17.0 and later. For more information on configuring docs persistence, see the docs.

When the persist_docs option is configured appropriately, you'll be able to see model descriptions in the Comment field of describe [table] extended or show table extended in [database] like '*'.
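As a minimal sketch (my_project is a hypothetical project name), docs persistence can be enabled in dbt_project.yml:

dbt_project.yml
models:
  my_project:
    +persist_docs:
      relation: true # persist model descriptions as table comments
      columns: true # persist column descriptions as column comments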

Default file format configurations

To access advanced features such as snapshots and the merge incremental strategy, you will want to use the Delta or Hudi file format as the default file format when materializing models as tables.

It's quite convenient to do this by setting a top-level configuration in your project file:

dbt_project.yml
models:
  +file_format: delta # or hudi

seeds:
  +file_format: delta # or hudi

snapshots:
  +file_format: delta # or hudi

Materialized views and streaming tables

Materialized views and streaming tables are alternatives to incremental tables that are powered by Delta Live Tables. See What are Delta Live Tables? for more information and use cases.

In order to adopt these materialization strategies, you will need a workspace that is enabled for Unity Catalog and serverless SQL Warehouses.

materialized_view.sql
{{ config(
    materialized = 'materialized_view'
) }}

or

streaming_table.sql
{{ config(
    materialized = 'streaming_table'
) }}

We support on_configuration_change for most available properties of these materializations. The following table summarizes our configuration support:

| Databricks Concept | Config Name | MV/ST support |
|---|---|---|
| PARTITIONED BY | partition_by | MV/ST |
| COMMENT | description | MV/ST |
| TBLPROPERTIES | tblproperties | MV/ST |
| SCHEDULE CRON | schedule: { 'cron': '<cron schedule>', 'time_zone_value': '<time zone value>' } | MV/ST |
| query | defined by your model sql | on_configuration_change for MV only |
mv_example.sql

{{ config(
    materialized='materialized_view',
    partition_by='id',
    schedule = {
        'cron': '0 0 * * * ? *',
        'time_zone_value': 'Etc/UTC'
    },
    tblproperties={
        'key': 'value'
    },
) }}
select * from {{ ref('my_seed') }}

Configuration Details

partition_by

partition_by works the same as for views and tables; it can be a single column or an array of columns to partition by.

description

As with views and tables, adding a description to your configuration will lead to a table-level comment getting added to your materialization.

tblproperties

tblproperties works the same as for views and tables, with an important exception: the adapter maintains a list of keys that are set by Databricks when creating a materialized view or streaming table, and these keys are ignored for the purpose of determining configuration changes.

schedule

Use this to set the refresh schedule for the model. If you use the schedule key, a cron key is required in the associated dictionary, but time_zone_value is optional (see the example above). The cron value should be formatted as documented by Databricks. If a schedule is set on the materialization in Databricks and your dbt project does not specify a schedule for it (when on_configuration_change is set to apply), the refresh schedule will be set to manual when you next run the project. Even when schedules are set, dbt will request that the materialization be refreshed manually when run.

query

For materialized views, if the compiled query for the model differs from the query in the database, we will take the configured on_configuration_change action. Changes to query are not currently detectable for streaming tables; see the next section for details.

on_configuration_change

on_configuration_change is supported for materialized views and streaming tables, though the two materializations handle it in different ways.

Materialized Views

Currently, the only change that can be applied without recreating the materialized view in Databricks is to update the schedule. This is due to limitations in the Databricks SQL API.

Streaming Tables

For streaming tables, only a change to partitioning currently requires the table to be dropped and recreated. For any other supported configuration change, we use CREATE OR REFRESH (plus an ALTER statement for changes to the schedule) to apply the changes. There is currently no mechanism for the adapter to detect whether the streaming table query has changed, so in this case, regardless of the behavior requested by on_configuration_change, we will use a create or refresh statement (assuming partition_by hasn't changed); this will cause the query to be applied to future rows without rerunning it on any previously processed rows. If your source data is still available, running with --full-refresh will reprocess the available data with the updated query.

Setting table properties

Table properties can be set with your configuration for tables or views using tblproperties:

with_table_properties.sql
{{ config(
    tblproperties={
      'delta.autoOptimize.optimizeWrite' : 'true',
      'delta.autoOptimize.autoCompact' : 'true'
    }
) }}
caution

These properties are sent directly to Databricks without validation in dbt, so be thoughtful with how you use this feature. You will need to do a full refresh of incremental materializations if you change their tblproperties.

One application of this feature is making delta tables compatible with iceberg readers using the Universal Format.

{{ config(
    tblproperties={
      'delta.enableIcebergCompatV2': 'true',
      'delta.universalFormat.enabledFormats': 'iceberg'
    }
) }}

tblproperties can be specified for python models, but they will be applied via an ALTER statement after table creation. This is due to a limitation in PySpark.
