1 change: 1 addition & 0 deletions README.md
@@ -169,6 +169,7 @@ Currently **officially** using Airflow:
 1. [Glassdoor](https://github.com/Glassdoor) [[@syvineckruyk](https://github.com/syvineckruyk)]
 1. [Global Fashion Group](http://global-fashion-group.com) [[@GFG](https://github.com/GFG)]
 1. [GovTech GDS](https://gds-gov.tech) [[@chrissng](https://github.com/chrissng) & [@datagovsg](https://github.com/datagovsg)]
+1. [Grab](https://www.grab.com/sg/) [[@grab](https://github.com/grab)]
 1. [Gradeup](https://gradeup.co) [[@gradeup](https://github.com/gradeup)]
 1. [Grand Rounds](https://www.grandrounds.com/) [[@richddr](https://github.com/richddr), [@timz1290](https://github.com/timz1290), [@wenever](https://github.com/@wenever), & [@runongirlrunon](https://github.com/runongirlrunon)]
 1. [Groupalia](http://es.groupalia.com) [[@jesusfcr](https://github.com/jesusfcr)]
74 changes: 41 additions & 33 deletions airflow/configuration.py
@@ -158,9 +158,9 @@ class AirflowConfigParser(ConfigParser):
     def __init__(self, default_config=None, *args, **kwargs):
         super(AirflowConfigParser, self).__init__(*args, **kwargs)

-        self.defaults = ConfigParser(*args, **kwargs)
+        self.airflow_defaults = ConfigParser(*args, **kwargs)
         if default_config is not None:
-            self.defaults.read_string(default_config)
+            self.airflow_defaults.read_string(default_config)

         self.is_validated = False

@@ -250,9 +250,9 @@ def get(self, section, key, **kwargs):
             return option

         # ...then the default config
-        if self.defaults.has_option(section, key):
+        if self.airflow_defaults.has_option(section, key):
             return expand_env_var(
-                self.defaults.get(section, key, **kwargs))
+                self.airflow_defaults.get(section, key, **kwargs))

         else:
             log.warning(
@@ -308,8 +308,8 @@ def remove_option(self, section, option, remove_default=True):
         if super(AirflowConfigParser, self).has_option(section, option):
             super(AirflowConfigParser, self).remove_option(section, option)

-        if self.defaults.has_option(section, option) and remove_default:
-            self.defaults.remove_option(section, option)
+        if self.airflow_defaults.has_option(section, option) and remove_default:
+            self.airflow_defaults.remove_option(section, option)

     def getsection(self, section):
         """
@@ -318,10 +318,11 @@ def getsection(self, section):
         :param section: section from the config
         :return: dict
         """
-        if section not in self._sections and section not in self.defaults._sections:
+        if (section not in self._sections and
+                section not in self.airflow_defaults._sections):
             return None

-        _section = copy.deepcopy(self.defaults._sections[section])
+        _section = copy.deepcopy(self.airflow_defaults._sections[section])

         if section in self._sections:
             _section.update(copy.deepcopy(self._sections[section]))
@@ -340,47 +341,51 @@ def getsection(self, section):
             _section[key] = val
         return _section

-    def as_dict(self, display_source=False, display_sensitive=False):
+    def as_dict(
+            self, display_source=False, display_sensitive=False, raw=False):
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
:param display_source: If False, the option value is returned. If True,
a tuple of (option_value, source) is returned. Source is either
'airflow.cfg' or 'default'.
'airflow.cfg', 'default', 'env var', or 'cmd'.
:type display_source: bool
:param display_sensitive: If True, the values of options set by env
vars and bash commands will be displayed. If False, those options
are shown as '< hidden >'
:type display_sensitive: bool
:param raw: Should the values be output as interpolated values, or the
"raw" form that can be fed back in to ConfigParser
:type raw: bool
"""
-        cfg = copy.deepcopy(self.defaults._sections)
-        cfg.update(copy.deepcopy(self._sections))
-
-        # remove __name__ (affects Python 2 only)
-        for options in cfg.values():
-            options.pop('__name__', None)
-
-        # add source
-        if display_source:
-            for section in cfg:
-                for k, v in cfg[section].items():
-                    cfg[section][k] = (v, 'airflow config')
+        cfg = {}
+        configs = [
+            ('default', self.airflow_defaults),
+            ('airflow.cfg', self),
+        ]
+
+        for (source_name, config) in configs:
+            for section in config.sections():
+                sect = cfg.setdefault(section, OrderedDict())
+                for (k, val) in config.items(section=section, raw=raw):
+                    if display_source:
+                        val = (val, source_name)
+                    sect[k] = val

         # add env vars and overwrite because they have priority
         for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
             try:
                 _, section, key = ev.split('__')
                 opt = self._get_env_var_option(section, key)
             except ValueError:
-                opt = None
-            if opt:
-                if (
-                        not display_sensitive and
-                        ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
-                    opt = '< hidden >'
-                if display_source:
-                    opt = (opt, 'env var')
-                cfg.setdefault(section.lower(), OrderedDict()).update(
-                    {key.lower(): opt})
+                continue
+            if (not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
+                opt = '< hidden >'
+            elif raw:
+                opt = opt.replace('%', '%%')
+            if display_source:
+                opt = (opt, 'env var')
+            cfg.setdefault(section.lower(), OrderedDict()).update(
+                {key.lower(): opt})

         # add bash commands
         for (section, key) in self.as_command_stdout:
@@ -389,8 +394,11 @@ def as_dict(self, display_source=False, display_sensitive=False):
                 if not display_sensitive:
                     opt = '< hidden >'
                 if display_source:
-                    opt = (opt, 'bash cmd')
+                    opt = (opt, 'cmd')
+                elif raw:
+                    opt = opt.replace('%', '%%')
                 cfg.setdefault(section, OrderedDict()).update({key: opt})
                 del cfg[section][key + '_cmd']

         return cfg

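For context, a minimal sketch of how the reworked `as_dict` might be exercised after this change. It assumes an Airflow install from this era, where `airflow.configuration.conf` is the module-level AirflowConfigParser; the section and key names are only illustrative.

# Illustrative sketch only; assumes `airflow.configuration.conf` is the
# module-level AirflowConfigParser.
import os
from airflow.configuration import conf

# Env vars named AIRFLOW__<SECTION>__<KEY> override file values and are
# now reported with source 'env var' (hidden unless display_sensitive=True).
os.environ['AIRFLOW__CORE__PARALLELISM'] = '64'

for section, options in conf.as_dict(display_source=True).items():
    for key, (value, source) in options.items():
        # source is one of: 'default', 'airflow.cfg', 'env var', 'cmd'
        print('[{}] {} = {} ({})'.format(section, key, value, source))

# raw=True escapes '%' as '%%', so the dump can be written out and fed
# back into a ConfigParser without interpolation errors.
raw_dump = conf.as_dict(raw=True)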
3 changes: 1 addition & 2 deletions airflow/contrib/auth/backends/google_auth.py
@@ -112,8 +112,7 @@ def login(self, request):
         log.debug('Redirecting user to Google login')
         return self.google_oauth.authorize(callback=url_for(
             'google_oauth_callback',
-            _external=True,
-            _scheme='https'),
+            _external=True),
             state=request.args.get('next') or request.referrer or None)

     def get_google_user_profile_info(self, google_token):
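A hedged illustration of what dropping `_scheme='https'` changes: with `_scheme` forced, the OAuth callback URL was always https, which broke deployments served over plain HTTP; with only `_external=True`, Flask derives the scheme from the current request (or PREFERRED_URL_SCHEME). The app and host name below are made up.

# Illustrative only: how Flask's url_for picks the callback scheme.
from flask import Flask, url_for

app = Flask(__name__)

@app.route('/google_oauth_callback')
def google_oauth_callback():
    return 'ok'

with app.test_request_context(base_url='http://airflow.local'):
    # Scheme now follows the request instead of being pinned to https.
    print(url_for('google_oauth_callback', _external=True))
    # -> http://airflow.local/google_oauth_callback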
40 changes: 22 additions & 18 deletions airflow/contrib/hooks/aws_hook.py
@@ -97,33 +97,36 @@ def _get_credentials(self, region_name):
         if self.aws_conn_id:
             try:
                 connection_object = self.get_connection(self.aws_conn_id)
+                extra_config = connection_object.extra_dejson
                 if connection_object.login:
                     aws_access_key_id = connection_object.login
                     aws_secret_access_key = connection_object.password

-                elif 'aws_secret_access_key' in connection_object.extra_dejson:
-                    aws_access_key_id = connection_object.extra_dejson[
+                elif 'aws_secret_access_key' in extra_config:
+                    aws_access_key_id = extra_config[
                         'aws_access_key_id']
-                    aws_secret_access_key = connection_object.extra_dejson[
+                    aws_secret_access_key = extra_config[
                         'aws_secret_access_key']

-                elif 's3_config_file' in connection_object.extra_dejson:
+                elif 's3_config_file' in extra_config:
                     aws_access_key_id, aws_secret_access_key = \
                         _parse_s3_config(
-                            connection_object.extra_dejson['s3_config_file'],
-                            connection_object.extra_dejson.get('s3_config_format'))
+                            extra_config['s3_config_file'],
+                            extra_config.get('s3_config_format'),
+                            extra_config.get('profile'))

                 if region_name is None:
-                    region_name = connection_object.extra_dejson.get('region_name')
+                    region_name = extra_config.get('region_name')

-                role_arn = connection_object.extra_dejson.get('role_arn')
-                external_id = connection_object.extra_dejson.get('external_id')
-                aws_account_id = connection_object.extra_dejson.get('aws_account_id')
-                aws_iam_role = connection_object.extra_dejson.get('aws_iam_role')
+                role_arn = extra_config.get('role_arn')
+                external_id = extra_config.get('external_id')
+                aws_account_id = extra_config.get('aws_account_id')
+                aws_iam_role = extra_config.get('aws_iam_role')

                 if role_arn is None and aws_account_id is not None and \
                         aws_iam_role is not None:
-                    role_arn = "arn:aws:iam::" + aws_account_id + ":role/" + aws_iam_role
+                    role_arn = "arn:aws:iam::{}:role/{}" \
+                        .format(aws_account_id, aws_iam_role)

                 if role_arn is not None:
                     sts_session = boto3.session.Session(
@@ -143,11 +146,12 @@ def _get_credentials(self, region_name):
                         RoleSessionName='Airflow_' + self.aws_conn_id,
                         ExternalId=external_id)

-                aws_access_key_id = sts_response['Credentials']['AccessKeyId']
-                aws_secret_access_key = sts_response['Credentials']['SecretAccessKey']
-                aws_session_token = sts_response['Credentials']['SessionToken']
+                credentials = sts_response['Credentials']
+                aws_access_key_id = credentials['AccessKeyId']
+                aws_secret_access_key = credentials['SecretAccessKey']
+                aws_session_token = credentials['SessionToken']

-                endpoint_url = connection_object.extra_dejson.get('host')
+                endpoint_url = extra_config.get('host')

             except AirflowException:
                 # No connection found: fallback on boto3 credential strategy
@@ -183,7 +187,7 @@ def get_credentials(self, region_name=None):
         This contains the attributes: access_key, secret_key and token.
         """
         session, _ = self._get_credentials(region_name)
-        # Credentials are refreshable, so accessing your access key / secret key
-        # separately can lead to a race condition.
+        # Credentials are refreshable, so accessing your access key and
+        # secret key separately can lead to a race condition.
         # See https://stackoverflow.com/a/36291428/8283373
         return session.get_credentials().get_frozen_credentials()
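For context, a hedged sketch of the connection "extra" JSON that drives the role-assumption branch above. Only the key names come from the hook; the account id, role name, and connection id are made up.

# Hypothetical connection extras; only the key names come from the hook.
# With aws_account_id + aws_iam_role (and no explicit role_arn), the hook
# now builds the ARN via str.format and assumes the role through STS.
extra = {
    "aws_account_id": "123456789012",
    "aws_iam_role": "airflow-worker",
    "external_id": "my-external-id",  # optional
    "region_name": "eu-west-1",
}

role_arn = "arn:aws:iam::{}:role/{}".format(
    extra["aws_account_id"], extra["aws_iam_role"])
assert role_arn == "arn:aws:iam::123456789012:role/airflow-worker"

# Downstream callers keep using the frozen credentials, e.g.:
#   hook = AwsHook(aws_conn_id='aws_analytics')  # hypothetical conn id
#   creds = hook.get_credentials()
#   print(creds.access_key, creds.secret_key, creds.token)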
41 changes: 41 additions & 0 deletions airflow/contrib/hooks/wasb_hook.py
@@ -18,6 +18,7 @@
 # under the License.
 #

+from airflow import AirflowException
 from airflow.hooks.base_hook import BaseHook

 from azure.storage.blob import BlockBlobService
@@ -148,3 +149,43 @@ def read_file(self, container_name, blob_name, **kwargs):
         return self.connection.get_blob_to_text(container_name,
                                                 blob_name,
                                                 **kwargs).content
+
+    def delete_file(self, container_name, blob_name, is_prefix=False,
+                    ignore_if_missing=False, **kwargs):
+        """
+        Delete a file from Azure Blob Storage.
+
+        :param container_name: Name of the container.
+        :type container_name: str
+        :param blob_name: Name of the blob.
+        :type blob_name: str
+        :param is_prefix: If blob_name is a prefix, delete all matching files.
+        :type is_prefix: bool
+        :param ignore_if_missing: if True, then return success even if the
+            blob does not exist.
+        :type ignore_if_missing: bool
+        :param kwargs: Optional keyword arguments that
+            `BlockBlobService.delete_blob()` takes.
+        :type kwargs: object
+        """
+
+        if is_prefix:
+            blobs_to_delete = [
+                blob.name for blob in self.connection.list_blobs(
+                    container_name, prefix=blob_name, **kwargs
+                )
+            ]
+        elif self.check_for_blob(container_name, blob_name):
+            blobs_to_delete = [blob_name]
+        else:
+            blobs_to_delete = []
+
+        if not ignore_if_missing and len(blobs_to_delete) == 0:
+            raise AirflowException('Blob(s) not found: {}'.format(blob_name))
+
+        for blob_uri in blobs_to_delete:
+            self.log.info("Deleting blob: " + blob_uri)
+            self.connection.delete_blob(container_name,
+                                        blob_uri,
+                                        delete_snapshots='include',
+                                        **kwargs)
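A hedged usage sketch of the new hook method; `wasb_default` is Airflow's stock connection id, while the container and blob names are made up.

# Usage sketch; assumes a configured wasb_default connection.
from airflow.contrib.hooks.wasb_hook import WasbHook

hook = WasbHook(wasb_conn_id='wasb_default')

# Delete one blob; raises AirflowException if it is missing.
hook.delete_file('my-container', 'data/2018-05-01.csv')

# Delete everything under a prefix, tolerating an empty match.
hook.delete_file('my-container', 'data/', is_prefix=True,
                 ignore_if_missing=True)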
71 changes: 71 additions & 0 deletions airflow/contrib/operators/wasb_delete_blob_operator.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class WasbDeleteBlobOperator(BaseOperator):
+    """
+    Deletes blob(s) on Azure Blob Storage.
+
+    :param container_name: Name of the container. (templated)
+    :type container_name: str
+    :param blob_name: Name of the blob. (templated)
+    :type blob_name: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param check_options: Optional keyword arguments that
+        `WasbHook.check_for_blob()` takes.
+    :param is_prefix: If blob_name is a prefix, delete all files matching prefix.
+    :type is_prefix: bool
+    :param ignore_if_missing: if True, then return success even if the
+        blob does not exist.
+    :type ignore_if_missing: bool
+    """
+
+    template_fields = ('container_name', 'blob_name')
+
+    @apply_defaults
+    def __init__(self, container_name, blob_name,
+                 wasb_conn_id='wasb_default', check_options=None,
+                 is_prefix=False, ignore_if_missing=False,
+                 *args,
+                 **kwargs):
+        super(WasbDeleteBlobOperator, self).__init__(*args, **kwargs)
+        if check_options is None:
+            check_options = {}
+        self.wasb_conn_id = wasb_conn_id
+        self.container_name = container_name
+        self.blob_name = blob_name
+        self.check_options = check_options
+        self.is_prefix = is_prefix
+        self.ignore_if_missing = ignore_if_missing
+
+    def execute(self, context):
+        self.log.info(
+            'Deleting blob: {self.blob_name}\n'
+            'in wasb://{self.container_name}'.format(**locals())
+        )
+        hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+
+        hook.delete_file(self.container_name, self.blob_name,
+                         self.is_prefix, self.ignore_if_missing,
+                         **self.check_options)
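Finally, a hedged example of wiring the new operator into a DAG; the dag id, schedule, container, and blob prefix are all made up for illustration. Since container_name and blob_name are templated, each daily run can target its own partition.

# Example DAG using the new operator; names and dates are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.wasb_delete_blob_operator import (
    WasbDeleteBlobOperator
)

dag = DAG('wasb_cleanup_example', start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

# blob_name is templated, so {{ ds }} resolves to the run date and the
# task deletes that day's partition under the prefix.
cleanup = WasbDeleteBlobOperator(
    task_id='delete_stale_blobs',
    container_name='my-container',
    blob_name='data/{{ ds }}/',
    is_prefix=True,
    ignore_if_missing=True,
    dag=dag,
)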