diff --git a/airflow/models.py b/airflow/models.py
index c866ef668bba6..38ad4423e342e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -41,6 +41,7 @@
 import logging
 import numbers
 import os
+import os.path
 import pendulum
 import pickle
 import re
@@ -50,6 +51,7 @@
 import traceback
 import warnings
 import hashlib
+import glob
 import uuid
 
 from datetime import datetime
@@ -324,13 +326,49 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
         the module and look for dag objects within it.
         """
         found_dags = []
-
         # if the source file no longer exists in the DB or in the filesystem,
         # return an empty list
         # todo: raise exception?
         if filepath is None or not os.path.isfile(filepath):
             return found_dags
 
+        # Derive the cache location: a "__pickle_cache__" directory next to
+        # the DAG file, holding one pickle per (file name, mtime) pair.
+        path_split = os.path.split(filepath)
+        pickle_dir = os.path.join(path_split[0], "__pickle_cache__")
+        file_name = os.path.splitext(path_split[1])[0]
+
+        # Create the pickle cache directory if it does not exist yet.
+        try:
+            os.mkdir(pickle_dir)
+            self.log.info("Directory %s created.", pickle_dir)
+        except FileExistsError:
+            self.log.debug("Directory %s already exists.", pickle_dir)
+        except OSError:
+            self.log.exception("Failed to create directory %s", pickle_dir)
+
+        # If a pickle matching the file's current mtime exists, load the
+        # DAGs from it and return early, skipping the expensive module import.
+        pickle_found = True
+        timestamp = str(os.path.getmtime(filepath)).replace(".", "_")
+        pickle_name = ".".join((file_name, timestamp, "p"))
+        pickle_path = os.path.join(pickle_dir, pickle_name)
+        try:
+            # NOTE: only unpickle caches this process wrote itself;
+            # pickle.load on untrusted files can execute arbitrary code.
+            with open(pickle_path, "rb") as pickle_file:
+                pickled_dags = pickle.load(pickle_file)
+                for dag in pickled_dags:
+                    self.dags[dag.dag_id] = dag
+                return pickled_dags
+        except FileNotFoundError:
+            # A cache miss is the normal path for a new/changed file:
+            # no traceback, just fall through to the regular import.
+            pickle_found = False
+            self.log.debug("Couldn't load pickle path: %s", pickle_path)
+        except Exception:
+            pickle_found = False
+            self.log.exception("Failed to load pickle path: %s", pickle_path)
+
         try:
             # This failed before in what may have been a git sync
             # race condition
@@ -442,6 +480,27 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
                     file_last_changed_on_disk
 
         self.file_last_changed[filepath] = file_last_changed_on_disk
+
+        # Write-back: the module import path was taken, so refresh the cache.
+        if not pickle_found:
+
+            # we delete unused pickles (stale mtimes for this file)
+            pickle_regex_name = ".".join((file_name, "*", "p"))
+            pickle_regex = os.path.join(pickle_dir, pickle_regex_name)
+            pickle_list = glob.glob(pickle_regex)
+            for pickle_file in pickle_list:
+                try:
+                    os.remove(pickle_file)
+                except OSError:
+                    self.log.exception("Failed to delete last pickle %s", pickle_file)
+            # Pickle the freshly parsed DAGs for the next pass.
+            try:
+                self.log.debug("Pickling at path %s", pickle_path)
+                with open(pickle_path, "wb") as pickle_file:
+                    pickle.dump(found_dags, pickle_file)
+            except Exception:
+                self.log.exception("Failed to pickle DAGs at path %s", pickle_path)
+
         return found_dags
 
     @provide_session
diff --git a/airflow/www/app.py b/airflow/www/app.py
index ad769b20bd839..95b189666fb4b 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -169,7 +169,7 @@ def shutdown_session(exception=None):
 
 
 def root_app(env, resp):
-    resp(b'404 Not Found', [(b'Content-Type', b'text/plain')])
+    resp('404 Not Found', [('Content-Type', 'text/plain')])
     return [b'Apache Airflow is not at this location']
 
 