Test parallelization


Introduction

Automated tests are awesome. A good test suite makes refactoring easier and allows developers to catch bugs before the code is deployed to production or a bug is spotted by a tester.

Once the advantages of automated testing are discovered, the number of tests increases exponentially. At Paylogic, there were about 200 tests in the early testing stage, and they took about 5 minutes to run. At the moment we have about 2500 tests, and running them in a single test process takes 1 hour and 40 minutes.

For developers, waiting almost 2 hours for the whole test suite to finish is simply not an option: they want results as soon as possible. They might then take the risk of pushing the code untested. However, since we have continuous integration in place, any failing tests will come back to the developers to fix anyway, which costs extra time.

It became clear that something had to change. We could completely rewrite the tests, make them very focused and try to mock most of the dependencies (such as the database). However, that would mean the team dedicating a few months exclusively to improving the tests while no new features get developed.

Another solution is to distribute the tests over several executors that run on one or several machines. Running the same tests in parallel on many nodes reduces the wall-clock time. Everybody is happy: developers do not need to wait hours for their test results (though they lose an excuse to go for a coffee). Product managers do not need to change the long-term planning. Customers are happy. Profit.

A closer look at the test suite

The testing tool we use is py.test. We keep our tests in a separate folder called tests, which is organized like this:

tests
├── conftest.py
├── deployment
├── fixtures
├── functional
├── __init__.py
└── unit

In conftest.py we store the py.test configuration and import fixtures which are defined in the fixtures folder. The folder functional contains functional tests, unit contains unit tests and so on.
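
As an illustration only, pulling the shared fixtures into conftest.py can be as simple as importing them; the module names under fixtures are hypothetical and simplified here:

# tests/conftest.py -- simplified sketch; the fixture module names are hypothetical.

# Importing the fixture functions here makes them available to every test
# below the tests/ folder.
from tests.fixtures.database import database_connection, database_settings
from tests.fixtures.application import port, application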

The tests require the database to be available, as well as memcache. Functional tests also expect the web servers required for the tests to be listening on specific ports. By web servers I mean the servers that serve the applications whose code we cover with our tests. Among them can be the Django development server, the Flask development server, the Paste HTTP server and so on.

A typical development session looks like this:

virtualenv env  # Create a virtualenv.
source env/bin/activate  # Activate it.
pip install -r requirements-testing.txt  # Install all the needed packages.
<generate settings command>  # Generate settings (set up the database connection string etc).
<database schema generation command> # Generate the database schema and insert initial data into it.

# Implement changes for given feature.

# Start web application(s)

py.test tests  # Run the tests.

Dependency installation, configuration, database instantiation and population, together with the startup of the required web application(s), are done outside of the test run. This makes sense, because none of them has to be repeated before every test run. Of course, a developer has to install a package when a new dependency is introduced and regenerate the settings when a new configuration parameter is added.

Test parallelization in theory and practice

If we simply run the tests in parallel, for example in two sessions, both sessions will share the same settings (most importantly the database connection string) and the same web applications. This has several limitations. If two tests access the same application simultaneously, their requests will be processed by a single application worker, which of course degrades performance.

Another more serious limitation comes from the way our tests are written. There are for example tests for ticket generation that check PDF generation. On a high level the tests look like this:

  1. Create an order.
  2. Execute the ticket generation function.
  3. Check that 1 ticket was generated.

The trick is in the second step. The ticket generation function is triggered by a periodic job. It selects from the database all the orders for which tickets have to be generated and generates them. In a sequential test run this is not a big deal, because one call to the ticket generation function will never generate tickets for more than one order. The performed actions are:

Time | Test   | Action
-----|--------|-----------------------------------------
1    | Test 1 | Create an order.
2    | Test 1 | Execute the ticket generation function.
3    | Test 1 | Check that 1 ticket was generated.
4    | Test 2 | Create another order.
5    | Test 2 | Execute the ticket generation function.
6    | Test 2 | Check that 1 ticket was generated.

In a parallel run however, two orders may be generated simultaneously. Then, the generation function will get both orders, and consequently generate tickets for both. Imagine situations like this:

Time | Test 1                                                                 | Test 2
-----|------------------------------------------------------------------------|----------------------------------------------------------------------
1    | Create an order.                                                       | Create another order.
2    | Execute the ticket generation function. (Generates 2 tickets.)        |
3    | Check that 1 ticket was generated. (Fails! 2 tickets were generated.) | Execute the ticket generation function. (Does nothing!)
4    |                                                                        | Check that 1 ticket was generated. (Fails! 0 tickets were generated.)

Because tests are usually not written with parallel execution in mind, situations like this can happen quite often.
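
To make the clash concrete, here is a simplified sketch of what such a generation function does. The Order and Ticket models and the query are made up, but the essential part is that it selects all pending orders from the shared database:

def generate_tickets(session):
    """Periodic job: generate tickets for every order that needs one.

    Simplified sketch; Order and Ticket are hypothetical models.
    """
    # Selects every pending order in the shared database, including orders
    # that were just created by other concurrent test sessions.
    pending_orders = session.query(Order).filter(Order.ticket == None)
    for order in pending_orders:
        session.add(Ticket(order=order))
    session.commit()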

The art of mocking

The simplest way to avoid situations where tests influence each other is to get rid of the shared resources. In our case, this means that each test session would have its own unique database connection string, which leads to a non-shared database.

The problem is that, following a common approach, we store settings in Python modules and instantiate them from templates before the test run. Configuration via Python modules can be found in Django settings, Flask configuration, etc.

We could check out the Paylogic sources into two folders and change the settings in each to the ones we want. This would entail some crazy text-editing scripts to alter the settings. Besides, it is not the way pytest-xdist works.

Another way is to mock the connection string using a fixture:

import pytest


@pytest.fixture(scope='session')
def database_settings(database_connection):
    """Mock the database settings.

    :param str database_connection: the database connection string.

    """
    # Override the connection string.
    from config import database
    database.database_connection = database_connection

To make the mock successful, our code should behave accordingly. Instead of:

from config.database import database_connection


def connect_to_db():
    """Connect to the database.

    A completely made up function to illustrate *incorrect* settings import.

    """
    return Connection(database_connection)

we write:

from config import database


def connect_to_db():
    """Connect to the database.

    A completely made up function to illustrate a *better* settings import.

    """
    return Connection(database.database_connection)

Web applications

For the unit tests, mocking the database connection is sufficient. If we want to start two instances of a web application, we need to change:

  1. The database connection string.
  2. The port the application is listening on.
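
Both of these can be provided by fixtures. A minimal sketch of a port fixture that asks the operating system for a free TCP port (not the exact Paylogic implementation) could look like this:

import socket

import pytest


@pytest.fixture(scope='session')
def port():
    """Return a free TCP port for the application to listen on."""
    # Binding to port 0 makes the OS pick a free port for us.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('127.0.0.1', 0))
    free_port = sock.getsockname()[1]
    sock.close()
    return free_port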

If we use Circus, the application could be a fixture that starts a subprocess and passes it the custom port:

import pytest

from circus.watcher import Watcher


@pytest.fixture(scope='session')
def application(request, port, app_script):
    """Start the application in a separate process.

    :param port: a random port the application should listen to.
    :param app_script: the path to the application runner script.

    """
    watcher = Watcher(
        name='application',
        cmd=app_script,
        args='runserver {0}'.format(port),
    )

    watcher.start()
    request.addfinalizer(watcher.stop)

    return watcher

This is a rather limited solution, because we did not set up the database connection string, and we could not pass it as an environment variable or pass the path to custom settings. It is, however, possible to pass parameters to the script (app_script in the example), which would let us override the needed settings on the remote side. But then we would have to somehow marshal complex data structures via the command line, which means writing more custom code.

The first solution that came to mind was to use multiprocessing. This way we can use a Python function as the worker for our application instead of a script file. The code would look as follows:

import multiprocessing

import pytest


def app_worker(database_connection, port):
    """Start the web application.

    :param str database_connection: the database connection string.
    :param port: the port number that will be used by the server.

    """
    # Remove modules that happen to be imported by the parent process.
    import sys
    for module in set(sys.modules).difference(sys.builtin_module_names):
        if not module.startswith('multiprocessing') and module != __name__:
            del sys.modules[module]

    # Monkey patch the database connection.
    from config import database
    database.database_connection = database_connection

    import tornado.httpserver
    import tornado.ioloop
    import tornado.web
    import tornado.wsgi

    # app_wsgi_handler and media_path come from the application being served.
    wsgi_app = tornado.wsgi.WSGIContainer(
        app_wsgi_handler)
    tornado_app = tornado.web.Application([
        (r"/media/(.*)", tornado.web.StaticFileHandler, {"path": media_path}),
        ('.*', tornado.web.FallbackHandler, dict(fallback=wsgi_app)),
    ])

    server = tornado.httpserver.HTTPServer(tornado_app)
    server.listen(port)
    tornado.ioloop.IOLoop.instance().start()


@pytest.fixture(scope='session')
def application(request, port, database_connection):
    """Start the application in a separate process.

    :param port: a random port the application should listen to.
    :param str database_connection: the database connection string.

    """
    process = multiprocessing.Process(
        target=app_worker,
        args=(database_connection, port),
    )
    request.addfinalizer(process.terminate)
    process.start()
    return process

This has one big downside: memory. Multiprocessing uses fork to do its work, which means that much of the memory allocated in the parent process ends up copied into the child process. The pages are nominally copy-on-write, but in practice CPython is not that efficient here: reference counting writes to every object it touches, so most shared pages get copied anyway.

So we decided to combine the two approaches: use a subprocess to run Python, but without marshalling the parameters manually via the command line. The nice execnet library allows us to transparently run a Python function inside a remote Python process. Here is a complete example:

import execnet

import pytest


def app_worker(channel, database_connection, port):
    """Start web application.

    :param channel: execnet channel to talk to the master process.
    :param str database_connection: the database connection string.
    :param port: the port number that will be used by runserver.

    """
    # monkey patch the database connection
    from config import database
    database.database_connection = database_connection

    import tornado.httpserver
    import tornado.ioloop
    import tornado.web
    import tornado.wsgi

    # app_wsgi_handler and media_path come from the application being served.
    wsgi_app = tornado.wsgi.WSGIContainer(
        app_wsgi_handler)
    tornado_app = tornado.web.Application([
        (r"/media/(.*)", tornado.web.StaticFileHandler, {"path": media_path}),
        ('.*', tornado.web.FallbackHandler, dict(fallback=wsgi_app)),
    ])

    server = tornado.httpserver.HTTPServer(tornado_app)
    server.listen(port)
    channel.send('started app on port: {0}'.format(port))
    tornado.ioloop.IOLoop.instance().start()


@pytest.fixture(scope='session')
def application(request, port, database_connection, timeout=10):
    """Start the application in a separate process.

    :param port: a random port the application should listen to.
    :param str database_connection: the database connection string.
    :param int timeout: seconds to wait for the application to start.

    """
    # Create an execnet gateway (a separate python process).
    gw = execnet.makegateway()

    # Set the same python system path on the remote python as on the current one.
    import sys
    gw.remote_exec('\n'.join(
        [
            "import sys",
            "sys.path = {0}".format(sys.path)
        ]
    )).waitclose()

    # Create a channel running the worker function.
    channel = gw.remote_exec(
        app_worker,
        port=port,
        database_connection=database_connection,
    )
    # Wait until the worker reports that the application has started.
    channel.receive(timeout)
    request.addfinalizer(gw.exit)
    return gw

In this way we can attach any customizations before starting the application.
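
For completeness, a functional test using these session-scoped fixtures could look roughly like this; the URL and the expected response are made up:

import urllib2


def test_application_answers(application, port):
    """Smoke test: the application started by the fixture serves HTTP requests."""
    # The path and expected status code are illustrative only.
    response = urllib2.urlopen('http://127.0.0.1:{0}/'.format(port))
    assert response.getcode() == 200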

Other isolated resources

Apart from the database connection string, there are other shared resources. One of them is the folder where file artifacts are stored. It has to be isolated as well, because filenames can clash between concurrent test processes (a.k.a. sessions). The mocking can be done in the same way as for the connection string.
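
For example, a session-scoped fixture can point the artifacts setting at a unique temporary folder; config.storage and its artifacts_path attribute are made-up names used only for illustration:

import shutil
import tempfile

import pytest


@pytest.fixture(scope='session')
def artifacts_folder(request):
    """Give each test session its own folder for file artifacts."""
    folder = tempfile.mkdtemp(prefix='artifacts-')
    request.addfinalizer(lambda: shutil.rmtree(folder, ignore_errors=True))

    # Mock the setting the same way as the database connection string.
    # config.storage / artifacts_path are hypothetical names.
    from config import storage
    storage.artifacts_path = folder
    return folder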

It is also possible to use a single server machine but with isolated databases: we then start as many MySQL instances as we have concurrent test sessions.
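
The database_connection fixture used above can then produce a connection string that is unique per test session. A rough sketch based on the process id; the host, credentials and naming scheme are made up:

import os

import pytest


@pytest.fixture(scope='session')
def database_connection():
    """Return a database connection string unique to this test session."""
    # In practice this points at the MySQL instance started for this session;
    # the credentials and naming scheme here are illustrative only.
    return 'mysql://user:password@127.0.0.1/tests_{0}'.format(os.getpid())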

Requirements

Another nontrivial part is distributing the requirements to each node. We do this together with the code distribution, as a virtualenv. Each node then activates it before running the tests:

import os.path


def pytest_addoption(parser):
    """Add custom pytest options."""
    group = parser.getgroup("xdist", "distributed and subprocess testing")
    group._addoption(
        '--activate-script',
        action="store", dest="activate_script",
        default='env/bin/activate_this.py',
        help="Activate virtual environment script (relative path). "
        "This is to make the remote python aware of all the dependencies the project needs.")


def pytest_configure_node(node):
    """Configure node information before it gets instantiated.

    Activate the virtual env, so the node is able to import Paylogic
    dependencies.

    """
    here = os.path.basename(os.path.dirname(os.path.dirname(__file__)))
    activate_script = os.path.normpath(os.path.join(here, node.config.option.activate_script))

    # Remove pyc files and activate the virtual environment on the remote side.
    node.gateway.remote_exec('\n'.join(
        [
            "import os.path",
            "import subprocess",
            "subprocess.check_call(['find', '.', '-name', '*.pyc', '-delete'])",
            "activate_this = '{0}'".format(activate_script),
            "if os.path.exists(activate_this):",
            "    execfile(activate_this, {'__file__': activate_this})",
        ]
    )).waitclose()
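
With these hooks in place, a distributed run uses the standard pytest-xdist options plus our custom --activate-script option. An illustrative invocation (host names and paths are made up):

py.test tests --dist=load \
    --tx ssh=tester@node1//chdir=/home/tester/paylogic \
    --tx ssh=tester@node2//chdir=/home/tester/paylogic \
    --rsyncdir . \
    --activate-script=env/bin/activate_this.py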

Results

Test parallelization dramatically reduced the time needed to run the unit and functional tests: about 5 minutes on a cluster of 6 old dual-core machines, each of them running 2 test sessions.

An experiment in the early stages gave these results:

[Graph: parallelization performance comparison]

The blue line shows test distribution over the cluster machines, with one worker on each of them. The pink line represents the "ideal situation", where doubling the number of workers halves the test execution time. Finally, the yellow line is the run executed on a single developer's machine.

pytest-xdist behaves very well when it comes to parallel execution, and the overhead is relatively small.

Open source

We are open sourcing pytest plugins that simplify running services (memcached, MySQL, etc.) on demand for every concurrent test session. We will also open source a helper for scheduling test jobs among the test slave nodes.

Conclusion

Automated testing facilitates the development of complex software. However, if getting a test result takes a lot of time, automated testing will be rejected by the majority of the team. Test parallelization and execution over several nodes solves this problem, with the extra effort needed to make the tests ready for parallelization as the trade-off.

