Introduction

Logging and debugging are great ways to get insight into your programs, especially while developing code. Multiprocessing in Python introduces some quirks that make programs more challenging to develop and monitor. Using VSCode on Windows, I have not been able to use the debugger with multiprocessing code without crashes, so I switched to logging for the same purpose. Print statements can help, but logging gives a much nicer interface and the ability to send output to different locations.
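
For example, attaching multiple handlers lets a single logger send the same message to several destinations at once (a minimal sketch; the names 'example' and 'example.log' are just placeholders):

import logging

logger = logging.getLogger('example')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())             # write to the console (stderr)
logger.addHandler(logging.FileHandler('example.log'))  # and to a file
logger.info('this message goes to both destinations')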

Naive Logging Approach

After the VSCode debugger crashed for me, I switched to logging and wrote some code like this (see my previous post on basic multiprocessing):

import logging
import multiprocessing

logger = logging.getLogger()
fh = logging.FileHandler('my_log.log', encoding='utf-8')
logger.addHandler(fh)
logger.setLevel(logging.INFO)

data_to_process = [1, 2, 3]

def my_function(num, logger):
    logger.info('hello from inside process')
    return num

def run():
    num_processes = 3
    with multiprocessing.Pool(num_processes) as pool:
        logger.info('hello from before process')
        params = []
        for data in data_to_process:
            params.append((data, logger)) # Logger is passed to my_function
        for result in pool.starmap(my_function, params):
            logger.info(f'result of process: {result}')

if __name__ == '__main__':
    run()

It seems sensible to just pass the logger to my_function and have it operate normally, and it appears to work! Unfortunately, logging to a single file from multiple processes is not supported, and you will eventually hit frustrating bugs such as interleaved or missing log messages.

Queue Handler Approach to Logging Events

Using the logging cookbook as a starting point, I’ve developed a few relatively simple functions to help facilitate logging from multiprocess code. The solution I’ve settled on is to create a listener process that monitors a queue. The queue is passed to the child processes, which log to it, and the listener process writes the messages to the log file.

import logging
import logging.handlers
import multiprocessing
from os import path
import sys
import traceback

log_file_path = '' # Wherever your log files live
log_name = 'my_log'

def listener_configurer(log_name, log_file_path):
    """ Configures and returns a log file based on 
    the given name

    Arguments:
        log_name (str): String of the log name to use
        log_file_path (str): String of the log file path

    Returns:
        logger: configured logging object
    """
    logger = logging.getLogger(log_name)

    fh = logging.FileHandler(
        path.join(log_file_path, f'{log_name}.log'), encoding='utf-8')
    fmtr = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(fmtr)
    logger.setLevel(logging.INFO)
    # Only add the handler if one for this file isn't already attached,
    # which prevents multiple handlers logging to the same file
    current_fh_names = [getattr(h, 'baseFilename', '') for h in logger.handlers]
    if fh.baseFilename not in current_fh_names:
        logger.addHandler(fh)

    return logger

def listener_process(queue, configurer, log_name):
    """ Listener process is a target for a multiprocess process
    that runs and listens to a queue for logging events.

    Arguments:
        queue (multiprocessing Manager queue proxy): queue to monitor
        configurer (func): configures loggers
        log_name (str): name of the log to use

    Returns:
        None
    """
    configurer(log_name, log_file_path)

    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except Exception:
            print('Failure in listener_process', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

def my_function(num, log_queue, i):
    # Attach a QueueHandler so records from this worker are sent to the listener.
    # The guard avoids adding duplicate handlers when the pool reuses a worker process.
    logger = logging.getLogger(log_name)
    if not any(isinstance(h, logging.handlers.QueueHandler) for h in logger.handlers):
        logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.INFO)

    logger = logger.getChild(f'child_{i}') # The child name shows which worker logged the message
    logger.setLevel(logging.INFO)

    logger.info('hello from inside process')

    return num

def run():
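    # The main process logs directly to the file; child processes log via the queue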
    logger = listener_configurer(log_name, log_file_path)

    manager = multiprocessing.Manager()
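    # A manager-based queue can be shared with pool workers;
    # a plain multiprocessing.Queue cannot be passed as a pool argument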
    log_queue = manager.Queue()
    listener = multiprocessing.Process(target=listener_process,
                                        args=(log_queue, listener_configurer, log_name))
    listener.start() # Start the listener process

    data_to_process = [1, 2, 3]

    num_processes = 3
    with multiprocessing.Pool(num_processes) as pool:
        logger.info('hello from before process')
        params = []
        for i, data in enumerate(data_to_process):
            params.append((data, log_queue, i)) # The log queue (not the logger) is passed to my_function, along with i
        for result in pool.starmap(my_function, params):
            logger.info(f'result of process: {result}')

    log_queue.put_nowait(None) # Sentinel value tells the listener to stop
    listener.join() # Wait for the listener to finish

if __name__ == '__main__':
    run()

Output:
2020-06-06 13:56:56,531 - my_log - INFO - hello from before process
2020-06-06 13:56:56,825 - my_log.child_0 - INFO - hello from inside process
2020-06-06 13:56:56,834 - my_log.child_1 - INFO - hello from inside process
2020-06-06 13:56:56,847 - my_log.child_2 - INFO - hello from inside process
2020-06-06 13:56:56,849 - my_log - INFO - result of process: 1
2020-06-06 13:56:56,850 - my_log - INFO - result of process: 2
2020-06-06 13:56:56,850 - my_log - INFO - result of process: 3

Summary

Creating a few simple functions to help with multiprocessing logging can make life a lot easier. I keep these functions in a library so they can be called whenever needed. This approach works on both Windows and Linux.
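
As an example of how they can be packaged, the queue setup and teardown from run() fit naturally behind a context manager. Below is a minimal sketch reusing listener_process and listener_configurer from above; log_listener is a hypothetical helper name, not the exact code from my library:

import contextlib
import multiprocessing

@contextlib.contextmanager
def log_listener(log_name):
    """ Starts a listener process and yields a queue that
    worker processes can send logging events to """
    manager = multiprocessing.Manager()
    log_queue = manager.Queue()
    listener = multiprocessing.Process(target=listener_process,
                                        args=(log_queue, listener_configurer, log_name))
    listener.start()
    try:
        yield log_queue # Hand the queue to the caller for the duration of the block
    finally:
        log_queue.put_nowait(None) # Sentinel value tells the listener to stop
        listener.join()

# Usage: the with-block replaces the manual start/stop in run()
# with log_listener(log_name) as log_queue:
#     ...  # pass log_queue to pool workers via starmap as before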