From efc346e2602e602cf46595c2e41b1050b299848f Mon Sep 17 00:00:00 2001 From: Abdeselam El-Haman Abdeselam Date: Mon, 19 Aug 2019 17:16:22 +0200 Subject: [PATCH 1/7] fix(airflow): pickle dags already loaded --- airflow/models.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/airflow/models.py b/airflow/models.py index c866ef668bba6..a741f27e195f5 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -41,6 +41,7 @@ import logging import numbers import os +import os.path import pendulum import pickle import re @@ -50,6 +51,7 @@ import traceback import warnings import hashlib +import glob import uuid from datetime import datetime @@ -331,6 +333,19 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): if filepath is None or not os.path.isfile(filepath): return found_dags + # if pickled dag, we return directly after adding them to our DAG + pickle_found = True + timestamp = str(os.path.getmtime(filepath)).replace(".", "_") + try: + pickle_path = filepath[:-2]+timestamp+".p" + pickled_dags = pickle.load(open(pickle_path, "rb")) + for dag in pickled_dags: + self.dags[dag.dag_id] = dag + return pickled_dags + except: + pickle_found = False + self.log.debug("Couldn't load pickle path: %s", pickle_path) + try: # This failed before in what may have been a git sync # race condition @@ -442,6 +457,22 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): file_last_changed_on_disk self.file_last_changed[filepath] = file_last_changed_on_disk + + if not pickle_found: + pickle_path = filepath[:-2]+timestamp+".p" + # before, we delete unused pickles + pickle_regex = filepath[:-2]+"*"+".p" + fileList = glob.glob(pickle_regex) + for filePath in fileList: + try: + os.remove(filePath) + except: + self.log.exception("Failed to delete last pickle %s", filePath) + try: + self.log.debug("Pickling at path %s", pickle_path) + pickle.dump(found_dags, open(pickle_path, "wb")) + except: + self.log.debug("Failed to create new pickle %s", pickle_path) return found_dags @provide_session From b7f1b31e77d1db73f309d23db8284dce7fcc01c7 Mon Sep 17 00:00:00 2001 From: Abdeselam El-Haman Abdeselam Date: Wed, 21 Aug 2019 13:35:51 +0200 Subject: [PATCH 2/7] fix(airflow): Content-Type bugfix --- airflow/www/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/app.py b/airflow/www/app.py index ad769b20bd839..95b189666fb4b 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -169,7 +169,7 @@ def shutdown_session(exception=None): def root_app(env, resp): - resp(b'404 Not Found', [(b'Content-Type', b'text/plain')]) + resp(b'404 Not Found', [('Content-Type', 'text/plain')]) return [b'Apache Airflow is not at this location'] From 5a27f597ebf57b19483354111802c67ffb044ec5 Mon Sep 17 00:00:00 2001 From: Abdeselam El-Haman Abdeselam Date: Fri, 23 Aug 2019 13:25:17 +0200 Subject: [PATCH 3/7] refactor(airflow): refactor based on pull request !1 --- airflow/models.py | 52 +++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index a741f27e195f5..f0a4217b02e2d 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -326,25 +326,41 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): the module and look for dag objects within it. """ found_dags = [] - # if the source file no longer exists in the DB or in the filesystem, # return an empty list # todo: raise exception? if filepath is None or not os.path.isfile(filepath): return found_dags + path_split = filepath.rsplit('/',1) + pickle_dir = path_split[0]+"/__pickle_cache__" + file_name = path_split[1].split(".")[0] + + # we create pickle_dir directory + try: + os.mkdir(pickle_dir) + print("Directory " , pickle_dir , " Created ") + except FileExistsError: + self.log.exception("Directory", pickle_dir, "already exists.") + except: + self.log.exception(e) + # if pickled dag, we return directly after adding them to our DAG pickle_found = True timestamp = str(os.path.getmtime(filepath)).replace(".", "_") + pickle_path = "%s/%s.%s.p" % (pickle_dir, file_name, timestamp) try: - pickle_path = filepath[:-2]+timestamp+".p" - pickled_dags = pickle.load(open(pickle_path, "rb")) - for dag in pickled_dags: - self.dags[dag.dag_id] = dag - return pickled_dags - except: + with open(pickle_path, "rb") as pickle_file: + pickled_dags = pickle.load(pickle_file) + for dag in pickled_dags: + self.dags[dag.dag_id] = dag + return pickled_dags + except FileNotFoundError: pickle_found = False - self.log.debug("Couldn't load pickle path: %s", pickle_path) + self.log.exception("Couldn't load pickle path: %s", pickle_path) + except Exception as e: + pickle_found = False + self.log.exception(e) try: # This failed before in what may have been a git sync @@ -459,20 +475,22 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): self.file_last_changed[filepath] = file_last_changed_on_disk if not pickle_found: - pickle_path = filepath[:-2]+timestamp+".p" - # before, we delete unused pickles - pickle_regex = filepath[:-2]+"*"+".p" - fileList = glob.glob(pickle_regex) - for filePath in fileList: + + # we delete unused pickles + pickle_regex = "%s/%s.*.p" % (pickle_dir, file_name) + pickle_list = glob.glob(pickle_regex) + for pickle_file in pickle_list: try: - os.remove(filePath) + os.remove(pickle_file) except: - self.log.exception("Failed to delete last pickle %s", filePath) + self.log.exception("Failed to delete last pickle %s", pickle_file) try: self.log.debug("Pickling at path %s", pickle_path) - pickle.dump(found_dags, open(pickle_path, "wb")) + with open(pickle_path, "wb") as pickle_file: + pickle.dump(found_dags, pickle_file) except: - self.log.debug("Failed to create new pickle %s", pickle_path) + self.log.exception(e) + return found_dags @provide_session From 21d279a1a218c56034bf8339fb833990c0bf6edf Mon Sep 17 00:00:00 2001 From: Abdeselam El-Haman Abdeselam Date: Fri, 23 Aug 2019 14:48:27 +0200 Subject: [PATCH 4/7] refactor(airflow): string manipulations to path manipulations --- airflow/models.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index f0a4217b02e2d..ac838f2dc0ab1 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -332,23 +332,24 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): if filepath is None or not os.path.isfile(filepath): return found_dags - path_split = filepath.rsplit('/',1) + path_split = os.path.split(filepath) pickle_dir = path_split[0]+"/__pickle_cache__" - file_name = path_split[1].split(".")[0] + file_name = os.path.splitext(path_split[1])[0] # we create pickle_dir directory try: os.mkdir(pickle_dir) - print("Directory " , pickle_dir , " Created ") + self.log.info("Directory " , pickle_dir , " created.") except FileExistsError: - self.log.exception("Directory", pickle_dir, "already exists.") + self.log.debug("Directory ", pickle_dir, "already exists.") except: self.log.exception(e) # if pickled dag, we return directly after adding them to our DAG pickle_found = True timestamp = str(os.path.getmtime(filepath)).replace(".", "_") - pickle_path = "%s/%s.%s.p" % (pickle_dir, file_name, timestamp) + pickle_name = ".".join((file_name, timestamp, ".p")) + pickle_path = os.path.join(pickle_dir, file_name) try: with open(pickle_path, "rb") as pickle_file: pickled_dags = pickle.load(pickle_file) @@ -477,7 +478,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): if not pickle_found: # we delete unused pickles - pickle_regex = "%s/%s.*.p" % (pickle_dir, file_name) + pickle_regex_name = ".".join((file_name, "*", ".p")) + pickle_regex = "%s/%s.*.p" % (pickle_dir, pickle_regex_name) pickle_list = glob.glob(pickle_regex) for pickle_file in pickle_list: try: From a2b44d16b898df63145c06c6f539850aab2ea8bc Mon Sep 17 00:00:00 2001 From: Abdeselam El-Haman Abdeselam Date: Fri, 23 Aug 2019 14:55:04 +0200 Subject: [PATCH 5/7] fix(airflow): extension problem --- airflow/models.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index ac838f2dc0ab1..e61c5144fa1e8 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -341,14 +341,14 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): os.mkdir(pickle_dir) self.log.info("Directory " , pickle_dir , " created.") except FileExistsError: - self.log.debug("Directory ", pickle_dir, "already exists.") + self.log.debug("Directory ", pickle_dir, " already exists.") except: self.log.exception(e) # if pickled dag, we return directly after adding them to our DAG pickle_found = True timestamp = str(os.path.getmtime(filepath)).replace(".", "_") - pickle_name = ".".join((file_name, timestamp, ".p")) + pickle_name = ".".join((file_name, timestamp, "p")) pickle_path = os.path.join(pickle_dir, file_name) try: with open(pickle_path, "rb") as pickle_file: @@ -478,8 +478,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): if not pickle_found: # we delete unused pickles - pickle_regex_name = ".".join((file_name, "*", ".p")) - pickle_regex = "%s/%s.*.p" % (pickle_dir, pickle_regex_name) + pickle_regex_name = ".".join((file_name, "*", "p")) + pickle_regex = os.path.join(pickle_dir, pickle_regex_name) pickle_list = glob.glob(pickle_regex) for pickle_file in pickle_list: try: From bb00ac34a335398868376573f893a99523642a51 Mon Sep 17 00:00:00 2001 From: Abdeselam El-Haman Abdeselam Date: Fri, 23 Aug 2019 15:03:43 +0200 Subject: [PATCH 6/7] fix(airflow): pickle_name was not used --- airflow/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models.py b/airflow/models.py index e61c5144fa1e8..61f6dec2b8b3c 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -349,7 +349,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): pickle_found = True timestamp = str(os.path.getmtime(filepath)).replace(".", "_") pickle_name = ".".join((file_name, timestamp, "p")) - pickle_path = os.path.join(pickle_dir, file_name) + pickle_path = os.path.join(pickle_dir, pickle_name) try: with open(pickle_path, "rb") as pickle_file: pickled_dags = pickle.load(pickle_file) From 8e4ed086b56f10bee704b12b10b6cfcb9459183e Mon Sep 17 00:00:00 2001 From: Abdeselam El-Haman Abdeselam Date: Fri, 23 Aug 2019 15:18:10 +0200 Subject: [PATCH 7/7] refactor(airflow): __pickle__cache os.path.join --- airflow/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models.py b/airflow/models.py index 61f6dec2b8b3c..38ad4423e342e 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -333,7 +333,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): return found_dags path_split = os.path.split(filepath) - pickle_dir = path_split[0]+"/__pickle_cache__" + pickle_dir = os.path.join(path_split[0],"__pickle_cache__") file_name = os.path.splitext(path_split[1])[0] # we create pickle_dir directory