Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import logging
import numbers
import os
import os.path
import pendulum
import pickle
import re
Expand All @@ -50,6 +51,7 @@
import traceback
import warnings
import hashlib
import glob

import uuid
from datetime import datetime
Expand Down Expand Up @@ -324,13 +326,43 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
the module and look for dag objects within it.
"""
found_dags = []

# if the source file no longer exists in the DB or in the filesystem,
# return an empty list
# todo: raise exception?
if filepath is None or not os.path.isfile(filepath):
return found_dags

path_split = os.path.split(filepath)
pickle_dir = os.path.join(path_split[0],"__pickle_cache__")
file_name = os.path.splitext(path_split[1])[0]

# create the "__pickle_cache__" directory next to the source file if it does not exist yet
try:
os.mkdir(pickle_dir)
self.log.info("Directory " , pickle_dir , " created.")
except FileExistsError:
self.log.debug("Directory ", pickle_dir, " already exists.")
except:
self.log.exception(e)

# if a pickle matching this file's modification time exists, add its DAGs to self.dags and return them directly
pickle_found = True
timestamp = str(os.path.getmtime(filepath)).replace(".", "_")
pickle_name = ".".join((file_name, timestamp, "p"))
pickle_path = os.path.join(pickle_dir, pickle_name)
try:
with open(pickle_path, "rb") as pickle_file:
pickled_dags = pickle.load(pickle_file)
for dag in pickled_dags:
self.dags[dag.dag_id] = dag
return pickled_dags
except FileNotFoundError:
pickle_found = False
self.log.exception("Couldn't load pickle path: %s", pickle_path)
except Exception as e:
pickle_found = False
self.log.exception(e)

try:
# This failed before in what may have been a git sync
# race condition
Expand Down Expand Up @@ -442,6 +474,25 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
file_last_changed_on_disk

self.file_last_changed[filepath] = file_last_changed_on_disk

if not pickle_found:

# delete every existing pickle for this source file before writing the one for the current modification time
pickle_regex_name = ".".join((file_name, "*", "p"))
pickle_regex = os.path.join(pickle_dir, pickle_regex_name)
pickle_list = glob.glob(pickle_regex)
for pickle_file in pickle_list:
try:
os.remove(pickle_file)
except:
Comment thread
hqwisen marked this conversation as resolved.
self.log.exception("Failed to delete last pickle %s", pickle_file)
try:
self.log.debug("Pickling at path %s", pickle_path)
with open(pickle_path, "wb") as pickle_file:
pickle.dump(found_dags, pickle_file)
except:
self.log.exception(e)

return found_dags

@provide_session
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def shutdown_session(exception=None):


def root_app(env, resp):
    """WSGI application that answers every request with a plain-text 404.

    :param env: WSGI environ mapping (not inspected here).
    :param resp: WSGI ``start_response`` callable, invoked with the status
        line and the list of response headers.
    :return: the response body as a single-item list of bytes.
    """
    # start_response must be called exactly once per request; a second call
    # without exc_info is an error under PEP 3333. Header names and values
    # are native strings, as the spec requires.
    # NOTE(review): the status line is still a bytes literal; PEP 3333 asks
    # for a native str there as well on Python 3 -- confirm against the
    # servers this must run under before changing it.
    resp(b'404 Not Found', [('Content-Type', 'text/plain')])
    return [b'Apache Airflow is not at this location']


Expand Down