
Commit 4ee4e47

[AIRFLOW-1018] Make processor use logging framework
Until now, the DAG processor had its own logging implementation, making it hard to adjust for certain use cases, such as running in a container. This patch moves everything to the standard logging framework. Closes apache#2728 from bolkedebruin/AIRFLOW-1018
1 parent 4fb7a90 commit 4ee4e47

9 files changed: +294 -175 lines changed
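For orientation: DEFAULT_LOGGING_CONFIG (diffed below) is an ordinary dictConfig dictionary, so the processor logger added by this commit is resolved with the standard library. Airflow applies this configuration itself at startup; the direct dictConfig call below is only an illustrative sketch of what that wiring amounts to.

import logging
import logging.config

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Illustrative only: Airflow normally applies its logging config during startup.
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)

# Once the config is applied, the per-file processor logger added by this
# commit is just a named logger with the 'file.processor' handler attached.
processor_log = logging.getLogger("airflow.processor")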

airflow/config_templates/airflow_local_settings.py (+16)

@@ -25,8 +25,10 @@
 LOG_FORMAT = conf.get('core', 'log_format')
 
 BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
 
 FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
+PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 
 DEFAULT_LOGGING_CONFIG = {
     'version': 1,
@@ -35,6 +37,9 @@
         'airflow.task': {
             'format': LOG_FORMAT,
         },
+        'airflow.processor': {
+            'format': LOG_FORMAT,
+        },
     },
     'handlers': {
         'console': {
@@ -47,6 +52,12 @@
             'formatter': 'airflow.task',
             'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
             'filename_template': FILENAME_TEMPLATE,
+        },
+        'file.processor': {
+            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+            'formatter': 'airflow.processor',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
         }
         # When using s3 or gcs, provide a customized LOGGING_CONFIG
         # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
@@ -67,6 +78,11 @@
         # },
     },
     'loggers': {
+        'airflow.processor' : {
+            'handlers': ['file.processor'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
         'airflow.task': {
             'handlers': ['file.task'],
             'level': LOG_LEVEL,
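Because processor output now flows through this logging configuration, deployments can redirect it by overriding the config rather than by patching file paths, which is what makes the container use case from the commit message workable. A hedged sketch of such an override, placed in a custom airflow_local_settings module on the PYTHONPATH as the comment above suggests; the override itself is illustrative and not part of this commit.

# Hypothetical airflow_local_settings.py on the PYTHONPATH: reuse the default
# config but send DAG-processor output to the console handler defined above
# (handy when running the scheduler in a container and tailing stdout).
import copy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['loggers']['airflow.processor']['handlers'] = ['console']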

airflow/jobs.py (+26 -53)

@@ -18,6 +18,7 @@
 from __future__ import unicode_literals
 
 import getpass
+import logging
 import multiprocessing
 import os
 import psutil
@@ -285,19 +286,16 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
     # Counter that increments everytime an instance of this class is created
     class_creation_counter = 0
 
-    def __init__(self, file_path, pickle_dags, dag_id_white_list, log_file):
+    def __init__(self, file_path, pickle_dags, dag_id_white_list):
         """
         :param file_path: a Python file containing Airflow DAG definitions
         :type file_path: unicode
         :param pickle_dags: whether to serialize the DAG objects to the DB
         :type pickle_dags: bool
         :param dag_id_whitelist: If specified, only look at these DAG ID's
         :type dag_id_whitelist: list[unicode]
-        :param log_file: the path to the file where log lines should be output
-        :type log_file: unicode
         """
         self._file_path = file_path
-        self._log_file = log_file
         # Queue that's used to pass results from the child process.
         self._result_queue = multiprocessing.Queue()
         # The process that was launched to process the given .
@@ -319,17 +317,12 @@ def __init__(self, file_path, pickle_dags, dag_id_white_list, log_file):
     def file_path(self):
         return self._file_path
 
-    @property
-    def log_file(self):
-        return self._log_file
-
     @staticmethod
     def _launch_process(result_queue,
                         file_path,
                         pickle_dags,
                         dag_id_white_list,
-                        thread_name,
-                        log_file):
+                        thread_name):
         """
         Launch a process to process the given file.
 
@@ -345,35 +338,21 @@ def _launch_process(result_queue,
         :type dag_id_white_list: list[unicode]
         :param thread_name: the name to use for the process that is launched
         :type thread_name: unicode
-        :param log_file: the logging output for the process should be directed
-            to this file
-        :type log_file: unicode
         :return: the process that was launched
         :rtype: multiprocessing.Process
         """
         def helper():
             # This helper runs in the newly created process
-
-            # Re-direct stdout and stderr to a separate log file. Otherwise,
-            # the main log becomes too hard to read. No buffering to enable
-            # responsive file tailing
-            parent_dir, _ = os.path.split(log_file)
-
-            _log = LoggingMixin().log
-
-            # Create the parent directory for the log file if necessary.
-            if not os.path.isdir(parent_dir):
-                os.makedirs(parent_dir)
-
-            f = open(log_file, "a")
-            original_stdout = sys.stdout
-            original_stderr = sys.stderr
-
-            sys.stdout = f
-            sys.stderr = f
+            log = logging.getLogger("airflow.processor")
+            for handler in log.handlers:
+                try:
+                    handler.set_context(file_path)
+                except AttributeError:
+                    # Not all handlers need to have context passed in so we ignore
+                    # the error when handlers do not have set_context defined.
+                    pass
 
             try:
-                configure_logging()
                 # Re-configure the ORM engine as there are issues with multiple processes
                 settings.configure_orm()
 
@@ -383,26 +362,20 @@ def helper():
                 threading.current_thread().name = thread_name
                 start_time = time.time()
 
-                _log.info("Started process (PID=%s) to work on %s",
-                          os.getpid(),
-                          file_path)
-                scheduler_job = SchedulerJob(dag_ids=dag_id_white_list)
+                log.info("Started process (PID=%s) to work on %s",
+                         os.getpid(), file_path)
+                scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
                 result = scheduler_job.process_file(file_path,
                                                     pickle_dags)
                 result_queue.put(result)
                 end_time = time.time()
-                _log.info(
-                    "Processing %s took %.3f seconds",
-                    file_path, end_time - start_time
+                log.info(
+                    "Processing %s took %.3f seconds", file_path, end_time - start_time
                 )
             except:
                 # Log exceptions through the logging framework.
-                _log.exception("Got an exception! Propagating...")
+                log.exception("Got an exception! Propagating...")
                 raise
-            finally:
-                sys.stdout = original_stdout
-                sys.stderr = original_stderr
-                f.close()
 
         p = multiprocessing.Process(target=helper,
                                     args=(),
@@ -419,8 +392,7 @@ def start(self):
            self.file_path,
            self._pickle_dags,
            self._dag_id_white_list,
-           "DagFileProcessor{}".format(self._instance_id),
-           self.log_file)
+           "DagFileProcessor{}".format(self._instance_id))
         self._start_time = datetime.utcnow()
 
     def terminate(self, sigkill=False):
@@ -538,6 +510,7 @@ def __init__(
             processor_poll_interval=1.0,
             run_duration=None,
             do_pickle=False,
+            log=None,
             *args, **kwargs):
         """
         :param dag_id: if specified, only schedule tasks with this DAG ID
@@ -574,6 +547,10 @@ def __init__(
 
         self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
         self.max_threads = conf.getint('scheduler', 'max_threads')
+
+        if log:
+            self._log = log
+
         self.using_sqlite = False
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
             if self.max_threads > 1:
@@ -591,9 +568,7 @@ def __init__(
         # Parse and schedule each file no faster than this interval. Default
         # to 3 minutes.
         self.file_process_interval = file_process_interval
-        # Directory where log files for the processes that scheduled the DAGs reside
-        self.child_process_log_directory = conf.get('scheduler',
-                                                    'child_process_log_directory')
+
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
@@ -1548,17 +1523,15 @@ def _execute(self):
         known_file_paths = list_py_file_paths(self.subdir)
         self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
 
-        def processor_factory(file_path, log_file_path):
+        def processor_factory(file_path):
             return DagFileProcessor(file_path,
                                     pickle_dags,
-                                    self.dag_ids,
-                                    log_file_path)
+                                    self.dag_ids)
 
         processor_manager = DagFileProcessorManager(self.subdir,
                                                     known_file_paths,
                                                     self.max_threads,
                                                     self.file_process_interval,
-                                                    self.child_process_log_directory,
                                                     self.num_runs,
                                                     processor_factory)
 
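helper() above calls set_context() on every handler of the "airflow.processor" logger; the handler that actually honours it, FileProcessorHandler, is among the nine changed files but not shown in this excerpt. A minimal sketch of that contract as it can be inferred from this diff; the class below is illustrative, not the real implementation.

import logging
import os


class IllustrativeProcessorHandler(logging.Handler):
    """Sketch of a handler that writes each DAG file's processor log to its own file."""

    def __init__(self, base_log_folder, filename_template):
        super(IllustrativeProcessorHandler, self).__init__()
        self.base_log_folder = base_log_folder
        self.filename_template = filename_template
        self._file_handler = None

    def set_context(self, filename):
        # Called once in the child process, before any records are emitted,
        # so that this DAG file's messages land in their own log file.
        local_name = self.filename_template.replace('{{ filename }}', os.path.basename(filename))
        full_path = os.path.join(self.base_log_folder, local_name)
        directory = os.path.dirname(full_path)
        if directory and not os.path.isdir(directory):
            os.makedirs(directory)
        self._file_handler = logging.FileHandler(full_path)
        if self.formatter:
            self._file_handler.setFormatter(self.formatter)

    def emit(self, record):
        # Records received before set_context() have nowhere to go in this sketch.
        if self._file_handler is not None:
            self._file_handler.emit(record)

    def close(self):
        if self._file_handler is not None:
            self._file_handler.close()
        logging.Handler.close(self)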

airflow/utils/dag_processing.py (+6 -103)

@@ -276,15 +276,6 @@ def start_time(self):
         """
         raise NotImplementedError()
 
-    @property
-    @abstractmethod
-    def log_file(self):
-        """
-        :return: the log file associated with this processor
-        :rtype: unicode
-        """
-        raise NotImplementedError()
-
     @property
     @abstractmethod
     def file_path(self):
@@ -313,7 +304,6 @@ def __init__(self,
                  file_paths,
                  parallelism,
                  process_file_interval,
-                 child_process_log_directory,
                  max_runs,
                  processor_factory):
         """
@@ -330,12 +320,9 @@ def __init__(self,
         :param max_runs: The number of times to parse and schedule each file. -1
             for unlimited.
         :type max_runs: int
-        :param child_process_log_directory: Store logs for child processes in
-            this directory
-        :type child_process_log_directory: unicode
         :type process_file_interval: float
         :param processor_factory: function that creates processors for DAG
-            definition files. Arguments are (dag_definition_path, log_file_path)
+            definition files. Arguments are (dag_definition_path)
         :type processor_factory: (unicode, unicode) -> (AbstractDagFileProcessor)
 
         """
@@ -345,7 +332,6 @@ def __init__(self,
         self._dag_directory = dag_directory
         self._max_runs = max_runs
         self._process_file_interval = process_file_interval
-        self._child_process_log_directory = child_process_log_directory
         self._processor_factory = processor_factory
         # Map from file path to the processor
         self._processors = {}
@@ -447,85 +433,6 @@ def set_file_paths(self, new_file_paths):
                 processor.stop()
         self._processors = filtered_processors
 
-    @staticmethod
-    def _split_path(file_path):
-        """
-        Return the path elements of a path as an array. E.g. /a/b/c ->
-        ['a', 'b', 'c']
-
-        :param file_path: the file path to split
-        :return: a list of the elements of the file path
-        :rtype: list[unicode]
-        """
-        results = []
-        while True:
-            head, tail = os.path.split(file_path)
-            if len(tail) != 0:
-                results.append(tail)
-            if file_path == head:
-                break
-            file_path = head
-        results.reverse()
-        return results
-
-    def _get_log_directory(self):
-        """
-        Log output from processing DAGs for the current day should go into
-        this directory.
-
-        :return: the path to the corresponding log directory
-        :rtype: unicode
-        """
-        now = datetime.utcnow()
-        return os.path.join(self._child_process_log_directory,
-                            now.strftime("%Y-%m-%d"))
-
-    def _get_log_file_path(self, dag_file_path):
-        """
-        Log output from processing the specified file should go to this
-        location.
-
-        :param dag_file_path: file containing a DAG
-        :type dag_file_path: unicode
-        :return: the path to the corresponding log file
-        :rtype: unicode
-        """
-        log_directory = self._get_log_directory()
-        # General approach is to put the log file under the same relative path
-        # under the log directory as the DAG file in the DAG directory
-        relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory)
-        path_elements = self._split_path(relative_dag_file_path)
-
-        # Add a .log suffix for the log file
-        path_elements[-1] += ".log"
-
-        return os.path.join(log_directory, *path_elements)
-
-    def symlink_latest_log_directory(self):
-        """
-        Create symbolic link to the current day's log directory to
-        allow easy access to the latest scheduler log files.
-
-        :return: None
-        """
-        log_directory = self._get_log_directory()
-        latest_log_directory_path = os.path.join(
-            self._child_process_log_directory, "latest")
-        if os.path.isdir(log_directory):
-            # if symlink exists but is stale, update it
-            if os.path.islink(latest_log_directory_path):
-                if os.readlink(latest_log_directory_path) != log_directory:
-                    os.unlink(latest_log_directory_path)
-                    os.symlink(log_directory, latest_log_directory_path)
-            elif (os.path.isdir(latest_log_directory_path) or
-                  os.path.isfile(latest_log_directory_path)):
-                self.log.warning(
-                    "%s already exists as a dir/file. Skip creating symlink.",
-                    latest_log_directory_path
-                )
-            else:
-                os.symlink(log_directory, latest_log_directory_path)
-
     def processing_count(self):
         """
         :return: the number of files currently being processed
@@ -574,8 +481,8 @@ def heartbeat(self):
         for file_path, processor in finished_processors.items():
             if processor.result is None:
                 self.log.warning(
-                    "Processor for %s exited with return code %s. See %s for details.",
-                    processor.file_path, processor.exit_code, processor.log_file
+                    "Processor for %s exited with return code %s.",
+                    processor.file_path, processor.exit_code
                 )
             else:
                 for simple_dag in processor.result:
@@ -622,19 +529,15 @@ def heartbeat(self):
         while (self._parallelism - len(self._processors) > 0 and
                len(self._file_path_queue) > 0):
             file_path = self._file_path_queue.pop(0)
-            log_file_path = self._get_log_file_path(file_path)
-            processor = self._processor_factory(file_path, log_file_path)
+            processor = self._processor_factory(file_path)
 
             processor.start()
             self.log.info(
-                "Started a process (PID: %s) to generate tasks for %s - logging into %s",
-                processor.pid, file_path, log_file_path
+                "Started a process (PID: %s) to generate tasks for %s",
+                processor.pid, file_path
            )
-
            self._processors[file_path] = processor
 
-        self.symlink_latest_log_directory()
-
         # Update scheduler heartbeat count.
         self._run_count[self._heart_beat_key] += 1
 
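With the log-path plumbing gone, the processor factory handed to DagFileProcessorManager is a single-argument callable and the manager no longer needs to know where logs end up. A minimal usage sketch, assuming an installed and configured Airflow at this commit; the path and settings below are illustrative only.

from airflow.jobs import DagFileProcessor

pickle_dags = False
dag_id_white_list = []  # empty list, as the scheduler passes when no dag_id filter is set

def processor_factory(file_path):
    # Mirrors the factory in SchedulerJob._execute(): no log path argument,
    # since log routing is now decided by the logging configuration instead.
    return DagFileProcessor(file_path, pickle_dags, dag_id_white_list)

processor = processor_factory('/path/to/dags/example_dag.py')  # illustrative path
processor.start()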
