Simple Concurrency in Python

Created on April 29, 2018, 11:26 a.m.

Whenever I attempt concurrency in Python it seems that something like this happens:

KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "", line 1, in 
F  File "", line 1, in 
  File "", line 1, in 
  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in 
a  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in 
  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in 
i    from . import context
    from . import context
l  File "C:\Python35\lib\multiprocessing\context.py", line 5, in 
    from . import context
  File "C:\Python35\lib\multiprocessing\context.py", line 3, in 
e    from . import process
    import threading
d  File "C:\Python35\lib\multiprocessing\context.py", line 3, in 
  File "", line 969, in _find_and_load
  File "C:\Python35\lib\threading.py", line 7, in 
     import threading
  File "", line 958, in _find_and_load_unlocked
t    from traceback import format_exc as _format_exc
  File "C:\Python35\lib\threading.py", line 7, in 
  File "", line 673, in _load_unlocked
o  File "C:\Python35\lib\traceback.py", line 5, in 
    from traceback import format_exc as _format_exc
  File "", line 669, in exec_module
     import linecache
  File "C:\Python35\lib\traceback.py", line 5, in 
  File "", line 773, in get_code
i  File "C:\Python35\lib\linecache.py", line 11, in 
    import linecache
    import tokenize
m  File "", line 484, in _compile_bytecode
  File "C:\Python35\lib\linecache.py", line 11, in 
  File "C:\Python35\lib\tokenize.py", line 32, in 
pKeyboardInterrupt
    import tokenize
o    import re
  File "C:\Python35\lib\tokenize.py", line 32, in 
r  File "", line 969, in _find_and_load
t    import re
  File "", line 954, in _find_and_load_unlocked
   File "C:\Python35\lib\re.py", line 335, in 
t  File "", line 896, in _find_spec
h  File "", line 1147, in find_spec
e  File "", line 1121, in _get_spec
   File "", line 1229, in find_spec
s  File "", line 82, in _path_stat
iKeyboardInterrupt
te module
Traceback (most recent call last):
  File "C:\Python35\lib\site.py", line 563, in 
    main()
  File "C:\Python35\lib\site.py", line 550, in main
    known_paths = addsitepackages(known_paths)
  File "C:\Python35\lib\site.py", line 327, in addsitepackages
    addsitedir(sitedir, known_paths)
  File "C:\Python35\lib\site.py", line 206, in addsitedir
    addpackage(sitedir, name, known_paths)
  File "C:\Python35\lib\site.py", line 167, in addpackage
    import copyreg
  File "", line 969, in _find_and_load
    exec(line)
  File "", line 958, in _find_and_load_unlocked
  File "", line 1, in 
  File "", line 673, in _load_unlocked

And I'm sure it isn't just me - the simple fact is that doing concurrency in Python sucks - and even the libraries which try to make it nicer often don't do that good a job or only make your life easy if you structure your program in a very particular way.

This is all made worse by the fact that threads in Python (in the standard CPython interpreter, at least) are not really executed in parallel, but just have their operations interleaved by the interpreter, which only lets one thread run Python code at a time. This means most forms of concurrency in Python don't actually make use of multiple CPU cores and are therefore fairly useless for speeding up CPU-heavy work. To get true parallelism in Python you have to use the multiprocessing module, which at first looks like a drop-in replacement for threads, but in reality works in a completely different way - by launching multiple Python interpreters in separate processes and allowing some communication between them.
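As a small illustration of the difference (a sketch, not a benchmark): threads all live inside a single interpreter process, which we can check by having each thread record the id of the process it runs in. The record_pid helper is just a name made up for this example.

```python
import os
import threading

def record_pid(pids):
    # Each thread appends the id of the process it is running in
    pids.append(os.getpid())

pids = []
threads = [threading.Thread(target=record_pid, args=(pids,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Four threads, but only one distinct process id between them
print(len(pids), len(set(pids)))
```

Doing the same with multiprocessing.Process would give a different process id for each worker, since each one is a separate interpreter.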


The saving grace of all of this is that the Python multiprocessing module actually provides some pretty nice basic tools, and if you understand how it works at a high level then making use of these tools to achieve concurrency is actually a fairly painless experience.

But first a little revision on how Python multiprocessing actually works. Let's take a look at the following basic Python script.

from multiprocessing import Process, Pipe
    
def greet(name):
    print('Hello %s!' % (name))
    
if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()

What actually happens when we run this script? Well, everything runs basically as expected until we reach the line p.start().

When this line is executed a few important things happen (at least on Windows, where each new process starts from a fresh interpreter). First, the arguments we supplied to the args parameter are pickled, which basically means they are converted into a raw stream of bytes and saved for later. Next, the script currently being run is run again by a new instance of Python. This second run of the script is given a different value for the __name__ variable, and has all of its output re-directed to the original process. The new run of the script executes everything as usual until it reaches the end of the script. At this point it unpickles the arguments that were saved and executes the function that was specified by the target parameter. Once it finishes executing this function it ends.

We can see this behavior if we run the same script as before but include another print statement in the middle:

from multiprocessing import Process, Pipe

def greet(name):
    print('Hello %s!' % (name))
    
print(__name__)
    
if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()

When we run this version it outputs something like this:

__main__
__mp_main__
Hello Daniel!

We can see the different values given to the __name__ variable for the different runs, and if we were to look in task manager while it was executing we would see two instances of python.exe too (on Windows).
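The re-running of the script described above corresponds to what the multiprocessing module calls the spawn start method, which is what Windows uses. Other platforms may default to a different method, so the output can differ there: under fork, for example, the child is a copy of the running parent, the script is not run a second time, and __name__ would only be printed once. A quick way to check which method is in use on your machine:

```python
import multiprocessing

# One of 'spawn', 'fork' or 'forkserver', depending on platform and Python version
method = multiprocessing.get_start_method()
print(method)
```

Whichever method is in use, keeping the process-launching code inside the if __name__ == '__main__': block is the safe choice, since it is required under spawn.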

This little experiment highlights the behavior and also the two main limitations of Python's multiprocessing module. First, any values given as the args parameter must be pickleable, which usually means simple, serializable data. Second, the target parameter must always be a top level function in the script, independent of anything that happens inside the if __name__ == '__main__': part of the script.
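To get a feel for the first limitation, here is a minimal sketch that checks whether a value survives pickling. The is_picklable helper is hypothetical (it is not part of the multiprocessing module), and it simply tries pickle.dumps and reports whether that worked.

```python
import pickle
import threading

def is_picklable(obj):
    # Try to serialize the object; unpicklable objects raise one of these
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

args = ('Daniel',)

# Simple data survives a round trip through bytes unchanged
restored = pickle.loads(pickle.dumps(args))
print(restored == args)

# Plain tuples are fine; locks and lambdas are not
print(is_picklable(args))
print(is_picklable(threading.Lock()))
print(is_picklable(lambda x: x))
```

Running a check like this on your arguments before handing them to Process can save some confusing error messages later.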

(It is worth noting that while the multiprocessing module was designed to have the same interface as the Python threading module, and as such looks like it can be used as a drop-in replacement, I find it better to think about the two as completely separate entities, as the above limitations mean this is almost never the case.)


So here is my little recipe for multiprocessing in Python. It starts with a function which calls another function with a given set of arguments and keyword arguments asynchronously - that is, it calls the provided function without waiting for it to finish. What it returns instead is a handle, something we can use later to wait for the called function to finish and get the return value. We are going to communicate with this process using a Pipe, which is a little object split into two parts (one for the parent process and one for the child) that we can use to send and receive things between the two processes. This asynchronous call function looks something like this:

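from multiprocessing import Process, Pipe

def call_async(f, *args, **kwargs):
    # One end of the pipe stays with the parent, the other goes to the child
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    # The function is sent by name and looked up again inside the child
    pipe_parent.send((f.__name__, args, kwargs))
    return (process, pipe_parent)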

This function creates a new Process and runs a function called call_dispatch in it (which we will define next). Once the process has been started, we use the Pipe to send it the name of the function we wish to call, as well as the arguments we want to use.
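If you haven't used a Pipe before, it can help to play with one inside a single process first. This is only a sketch of the message format used in this recipe (a tuple of name, arguments and keyword arguments); normally the two ends would live in different processes.

```python
from multiprocessing import Pipe

# Pipe() returns two connected ends; by default both can send and receive
pipe_parent, pipe_child = Pipe()

# Parent side: send a request as a (name, args, kwargs) tuple
pipe_parent.send(('greet', (0, 'Dan'), {}))

# Child side: poll() reports whether a message is waiting, recv() reads it
ready = pipe_child.poll(1)
name, args, kwargs = pipe_child.recv()
print(ready, name, args, kwargs)

# The child can reply over the same pipe
pipe_child.send(0)
reply = pipe_parent.recv()
print(reply)
```

Everything sent through the pipe is pickled on the way in and unpickled on the way out, so the same restrictions apply as for the args parameter.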

The call_dispatch function is itself very simple. It just waits for the function name and arguments to come from the pipe, calls the function at the top level with the given name, and then sends the return value back into the pipe once it is done.

def call_dispatch(pipe):
    name, args, kwargs = pipe.recv()
    output = globals()[name](*args, **kwargs)
    pipe.send(output)
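Looking the function up with globals() is what ties this recipe to top level functions: a lambda, for example, has the name '<lambda>', which will not be found. If you would rather be explicit about what the child is allowed to call, one possible variation (my assumption here, not something the multiprocessing module provides) is a small lookup table. The REGISTRY and dispatch names are made up for this sketch.

```python
def square(i):
    return i * i

def greet(i, name):
    return '%i Hello %s!' % (i, name)

# Hypothetical table of functions the child process may be asked to run
REGISTRY = {f.__name__: f for f in (square, greet)}

def dispatch(name, args, kwargs):
    # Look the function up by name and fail loudly if it is unknown
    try:
        f = REGISTRY[name]
    except KeyError:
        raise ValueError('unknown function: %r' % (name,))
    return f(*args, **kwargs)

print(dispatch('square', (3,), {}))
print(dispatch('greet', (0, 'Dan'), {}))
```

Inside call_dispatch the call to globals()[name](*args, **kwargs) could then be swapped for dispatch(name, args, kwargs).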

Later on, when we want the result, we can use another function called call_await, which waits for the called function to finish and then returns its return value.

def call_await(h):
    process, pipe_parent = h
    output = pipe_parent.recv()
    process.join()
    return output

Using these little functions makes some things easy, such as asynchronously calling a function for each element of an array and then gathering the return values once they are all ready.

import time

def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i
    
if __name__ == '__main__':
    
    names = ['Dan', 'Chess', 'Tom', 'Mike']
    
    handles = []
    for i, name in enumerate(names):
        h = call_async(greet, i, name)
        handles.append(h)
        
    for h in handles:
        print(call_await(h))

Notice how evaluating this script takes only about one second instead of four, since each call to greet is performed in a separate process. Also notice that although the evaluation order can vary, the return order is always the same, since we await each of the processes in sequence.

1 Hello Chess!
2 Hello Tom!
0 Hello Dan!
3 Hello Mike!
0
1
2
3

One little limitation of this approach is that each time we want to call a function asynchronously we have to spin up a whole new process. Unless the function you are calling takes at least a few seconds to execute, most of the time is going to be spent starting up new processes. To avoid this issue we can start each of our processes ahead of time, before using them to call functions asynchronously. Let us make a new function which does this called fork:

def fork():
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    return (process, pipe_parent)

And we'll change what happens in the call_dispatch function. Now the dispatch function will run in an infinite loop, each time waiting for a new top level function to call with a given set of arguments.

def call_dispatch(pipe):
    while True:
        name, args, kwargs = pipe.recv()
        if name == 'exit':
            break
        else:
            output = globals()[name](*args, **kwargs)
            pipe.send(output)

You'll notice that if this process gets asked to call a function named exit it will just break out of the loop and finish gracefully. We can make another function called join which sends this request to any process allocated with fork.

def join(h):
    process, pipe_parent = h
    pipe_parent.send(('exit', (), {}))
    process.join()

Now we just need to define call_async and call_await. These are similar to before but just assume the process has already been started:

def call_async(h, f, *args, **kwargs):
    process, pipe_parent = h
    pipe_parent.send((f.__name__, args, kwargs))

def call_await(h):
    process, pipe_parent = h
    return pipe_parent.recv()

Now we can start processes ahead of time, and re-use them to call multiple different functions in parallel.

import time
    
def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i
    
def square(i):
    return i*i
    
if __name__ == '__main__':
    
    names = ['Dan', 'Chess', 'Tom', 'Mike']
    
    handles = [fork() for _ in range(len(names))]
    
    # Greet in Parallel
    
    for h, (i, name) in zip(handles, enumerate(names)):
        call_async(h, greet, i, name)
    
    for h in handles:
        print(call_await(h))

    # Square in Parallel
        
    for h, (i, name) in zip(handles, enumerate(names)):
        call_async(h, square, i)
    
    for h in handles:
        print(call_await(h))
        
    # Finish
        
    for h in handles:
        join(h)
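When tinkering with the dispatch loop it can be handy to exercise the message protocol without launching any new interpreters. One way to do that, sketched below, is to run the loop in a thread instead of a process. This is only for testing the protocol and gives no parallelism. To keep the sketch self-contained I pass an explicit FUNCTIONS table (a made-up name) in place of the globals() lookup.

```python
import threading
from multiprocessing import Pipe

def square(i):
    return i * i

# Stand-in for the globals() lookup used by the real dispatch function
FUNCTIONS = {'square': square}

def call_dispatch(pipe, functions):
    # Same loop as before: serve requests until asked to exit
    while True:
        name, args, kwargs = pipe.recv()
        if name == 'exit':
            break
        pipe.send(functions[name](*args, **kwargs))

pipe_parent, pipe_child = Pipe()
worker = threading.Thread(target=call_dispatch, args=(pipe_child, FUNCTIONS))
worker.start()

# Re-use the same worker for several calls
results = []
for i in range(4):
    pipe_parent.send(('square', (i,), {}))
    results.append(pipe_parent.recv())

# Ask the loop to finish, then wait for it
pipe_parent.send(('exit', (), {}))
worker.join()
print(results)
```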

Of course it isn't perfect, and there are times when this abstraction will fail, but I hope I've shown how it isn't too difficult to build simple useful things with the basic tools provided by the multiprocessing module as long as you have a high level understanding of how it works. With this sort of setup you can quickly start to think how you might build worker pools and other sorts of useful structures for parallel programming.
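One case where it fails is when the called function raises an exception: the dispatch function never gets as far as sending anything back, so the parent never receives a return value. A possible fix, sketched here under my own assumptions, is to tag each reply with a status. The dispatch_once function and FUNCTIONS table are hypothetical names, and both ends of the pipe are driven from one process just to keep the example short.

```python
from multiprocessing import Pipe

def divide(a, b):
    return a / b

# Hypothetical table standing in for the globals() lookup
FUNCTIONS = {'divide': divide}

def dispatch_once(pipe):
    # Handle one request and always reply, tagging the reply with a status
    name, args, kwargs = pipe.recv()
    try:
        pipe.send(('ok', FUNCTIONS[name](*args, **kwargs)))
    except Exception as e:
        # Send a description, since not every exception object can be pickled
        pipe.send(('error', repr(e)))

def call_await(pipe_parent):
    # Re-raise failures on the parent side rather than returning them
    status, value = pipe_parent.recv()
    if status == 'error':
        raise RuntimeError('remote call failed: %s' % value)
    return value

pipe_parent, pipe_child = Pipe()

pipe_parent.send(('divide', (6, 3), {}))
dispatch_once(pipe_child)
print(call_await(pipe_parent))

pipe_parent.send(('divide', (1, 0), {}))
dispatch_once(pipe_child)
try:
    call_await(pipe_parent)
except RuntimeError as e:
    print(e)
```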

With that all said, good luck and happy concurrent programming!
