Allow UPSERT when calling SnowflakeHook.insert_rows #34860

@DEworkerDE

Description

Apache Airflow version

2.5.0

What happened

When you use the 'insert_rows' method from DbApiHook, the replace=True option does not work on databases that don't support the REPLACE INTO statement: MySQL supports it, but Snowflake, for example, does not.
From the source code (https://airflow.apache.org/docs/apache-airflow/2.3.3/_modules/airflow/hooks/dbapi.html#DbApiHook.insert_rows):
[screenshot of the DbApiHook.insert_rows source]
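
For reference, the SQL generation in the linked source roughly does the following (a paraphrased sketch, not the exact Airflow code): the statement keyword is hard-coded, so replace=True always produces MySQL/SQLite-style REPLACE INTO, which Snowflake rejects with a SQL compilation error.

def _generate_insert_sql(table: str, columns: list[str], replace: bool) -> str:
    # Paraphrased sketch of what DbApiHook builds; not the actual implementation.
    placeholders = ", ".join(["%s"] * len(columns))
    target_fields = f"({', '.join(columns)})"
    keyword = "REPLACE" if replace else "INSERT"  # no per-database hook here
    return f"{keyword} INTO {table} {target_fields} VALUES ({placeholders})"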

What you think should happen instead

Maybe I am missing something in the docs, but either the documentation should state which databases this works with, or the method should be made database-agnostic. We can create hooks for many different connections, yet insert_rows with 'replace' = True cannot work on a database that doesn't support REPLACE INTO. A sketch of one possible direction follows below.
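
For example, PostgresHook already overrides the generic SQL generation to do an ON CONFLICT upsert, and a Snowflake hook could do the same with MERGE. A rough sketch, assuming the statement is still built in a _generate_insert_sql helper (the class name, the merge_keys kwarg, and the exact signature are my assumptions here, not the current provider API):

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

class MergingSnowflakeHook(SnowflakeHook):
    # Sketch only: emit a Snowflake MERGE instead of the unsupported REPLACE INTO.
    def _generate_insert_sql(self, table, values, target_fields, replace, **kwargs):
        if not replace:
            return super()._generate_insert_sql(table, values, target_fields, replace, **kwargs)
        keys = kwargs["merge_keys"]  # hypothetical kwarg: the natural-key columns
        select = ", ".join(f"%s AS {c}" for c in target_fields)
        on = " AND ".join(f"t.{c} = s.{c}" for c in keys)
        updates = ", ".join(f"{c} = s.{c}" for c in target_fields if c not in keys)
        cols = ", ".join(target_fields)
        src = ", ".join(f"s.{c}" for c in target_fields)
        return (
            f"MERGE INTO {table} t USING (SELECT {select}) s ON {on} "
            f"WHEN MATCHED THEN UPDATE SET {updates} "
            f"WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({src})"
        )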

How to reproduce

Just create any DAG with a task containing this code:

from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
import pandas as pd

snowflake_conn_id = "snowflake_default"  # your Snowflake connection id
table_name = "my_table"                  # your target table

@task()
def rep_bug():
    source_df = pd.DataFrame()  # some data if needed
    sf_hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)
    # replace=True makes the generic hook emit "REPLACE INTO ...", which Snowflake rejects
    sf_hook.insert_rows(table=table_name, rows=source_df.values.tolist(), replace=True)
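
As a workaround until insert_rows gets an UPSERT path for Snowflake, the rows can be loaded with a plain INSERT into a staging table and then merged into the target with hook.run(); MERGE is Snowflake's supported upsert statement. Table and column names below are placeholders:

from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
import pandas as pd

@task()
def upsert_workaround():
    sf_hook = SnowflakeHook(snowflake_conn_id="snowflake_default")  # placeholder conn id
    source_df = pd.DataFrame({"id": [1, 2], "val": ["a", "b"]})     # placeholder data

    # Plain INSERT works because replace is left at its default (False).
    # Use a regular (or transient) staging table: each hook call opens its own
    # session, so a TEMPORARY table would not survive between calls.
    sf_hook.run("CREATE OR REPLACE TABLE my_table_stage LIKE my_table")
    sf_hook.insert_rows(
        table="my_table_stage",
        rows=source_df.values.tolist(),
        target_fields=list(source_df.columns),
    )

    # UPSERT into the target with MERGE instead of the unsupported REPLACE INTO.
    sf_hook.run(
        """
        MERGE INTO my_table t
        USING my_table_stage s
        ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET val = s.val
        WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
        """
    )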

Operating System

Windows 10

Versions of Apache Airflow Providers

From the docker-compose.yaml file:

version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.0-python3.10}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- pandas==1.5.2 apache-airflow==2.5.0 apache-airflow-providers-snowflake==4.0.2 snowflake-connector-python[pandas]==2.9.0 apache-airflow-providers-microsoft-azure==5.0.1 apache-airflow-providers-postgres==5.4.0 }

Deployment

Docker-Compose

Deployment details

Docker Compose version v2.15.1

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
