Skip to content

Issue using the PostGresOperator #56

@r39132

Description

@r39132

Specifically, what is expected with respect to the "conn_id" argument?

The code below successfully connects to Postgres. I then pass the db_conn object to the PostgresOperator and get an exception shown at the bottom

db_conn_string = "host='localhost' dbname='cousteau_dev' user='siddharth' password='secret'"
db_conn = psycopg2.connect(db_conn_string)
print "Successfully Connected to database\n    ->%s and conn=%s" % (db_conn_string, db_conn)

wait_for_new_data_in_db = PostgresOperator(
    task_id='wait_for_new_data_in_db',
    postgres_conn_id=db_conn,
    sql='select count(*) from receiver_domain_aggregate;',
    dag=dag)
wait_for_new_data_in_db.set_upstream(wait_for_first_sqs_message)

Here's the exception

2015-06-21 20:21:06,789 - root - INFO - Executing <Task(PostgresOperator): wait_for_new_data_in_db> for 2015-06-12 00:00:00
2015-06-21 20:21:06,801 - root - INFO - Executing: select count(*) from receiver_domain_aggregate;
2015-06-21 20:21:06,805 - root - ERROR - (psycopg2.ProgrammingError) can't adapt type 'psycopg2.extensions.connection' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.password AS connection_password, connection.port AS connection_port, connection.extra AS connection_extra \nFROM connection \nWHERE connection.conn_id = %(conn_id_1)s) AS anon_1'] [parameters: {'conn_id_1': <connection object at 0x10dabe640; dsn: 'host='localhost' dbname='cousteau_dev' user='siddharth' password=xxxxxxxx', closed: 0>}]
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 746, in run
    task_copy.execute(context=self.get_template_context())
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/postgres_operator.py", line 36, in execute
    self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
  File "/usr/local/lib/python2.7/site-packages/airflow/hooks/postgres_hook.py", line 25, in __init__
    if db.count() == 0:
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2734, in count
    return self.from_self(col).scalar()
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2503, in scalar
    ret = self.one()
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2472, in one
    ret = list(self)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2515, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2530, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
    context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
    context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
    cursor.execute(statement, parameters)
ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'psycopg2.extensions.connection' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.password AS connection_password, connection.port AS connection_port, connection.extra AS connection_extra \nFROM connection \nWHERE connection.conn_id = %(conn_id_1)s) AS anon_1'] [parameters: {'conn_id_1': <connection object at 0x10dabe640; dsn: 'host='localhost' dbname='cousteau_dev' user='siddharth' password=xxxxxxxx', closed: 0>}]

Since I can connect using psycopg2, I'm going to revert to using a PythonOperator. My flow is entirely made up of PythonOperators. I found it easier to use.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions