Coroutine_python

So, when porting my Python software to iOS (yes, Python on iOS), I ran into a problem: my multi-threaded program depended on a library that was not thread-safe.

Without the ability to change the library itself, I had only two options to serialize access to the library:

1. Rewrite hundreds of lines of code to wrap every call into the library with locks

2. Find an easier way.

I chose easier. Enter: Python coroutines.
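
For readers new to the mechanism, here is a minimal sketch of the one property this trick relies on: a generator-based coroutine pauses at `yield`, and when it is resumed with `send()`, its body runs in whichever thread made the `send()` call, not in the thread that created it. The names `record_thread`, `log`, `handoff`, and `worker-1` are made up for this illustration.

```python
import threading


def record_thread(log):
    # Pause here until some thread calls send(); everything after
    # the yield then runs in *that* thread.
    value = yield
    log.append((value, threading.current_thread().name))


log = []
handoff = []


def worker():
    co = record_thread(log)
    next(co)            # prime: run up to the first yield
    handoff.append(co)  # hand the paused coroutine to another thread


t = threading.Thread(target=worker, name="worker-1")
t.start()
t.join()

co = handoff[0]
try:
    co.send(42)         # resumes the coroutine body in the calling thread
except StopIteration:
    pass                # expected: the coroutine ends after one send

print(log)
```

The thread name recorded in `log` is the sender's (the main thread when this runs as a script), not `worker-1`, even though `worker-1` created and primed the coroutine.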

Serializing non-thread-safe code using Python Coroutines

#!/usr/bin/env python3
'''
Author: Michael R. Hines

This is an example of using Python coroutines to serialize parts
of a multi-threaded program without locking.

I ran into this need one day when a library I was using was not
thread-safe (and I could not change the library).
Thus, I needed specific parts of each thread to run serially
*in the context* of a different, common thread: the only thread
that has access to the library.
'''

import queue
import sys
from threading import Thread, current_thread
from time import sleep


# The code to be serialized in a different thread context
# (the main thread in this example).
# This is our coroutine.
def serial_function():
    (msg, input_var, rq) = (yield)
    result = input_var + 10
    rq.put(str(current_thread()) + " : Computed: " + str(result))
    # sleep(5) here, for example, would make the serialization of this
    # code in the main thread much more obvious


# A producer thread with both parallel and serial code requirements
def counter(q):
    x = 1
    rq = queue.Queue()
    while True:
        # some parallel code here
        co = serial_function()
        next(co)  # prime the generator-based coroutine
        q.put((co, x, rq))
        # Get the result of the computation from the main thread
        result = rq.get()
        rq.task_done()
        print(str(current_thread()) + " result: " + result + " from " + str(x))
        sleep(2)
        x += 1
        # maybe some more parallel code here, if you like


# A consumer which serializes a specific function requested by the producer
def consumer():
    try:
        while True:
            # Block with a timeout rather than busy-waiting, so that
            # Ctrl-C is still handled promptly
            try:
                (co, x, rq) = q.get(timeout=1)
            except queue.Empty:
                continue
            try:
                # Send the input from the thread to the coroutine.
                # The coroutine's body will execute in the context of
                # the main thread (or another thread, if you wish).
                co.send(("sending: " + str(x), x, rq))
            except StopIteration:
                # Expected: the coroutine finishes after a single send
                pass
            q.task_done()
    except KeyboardInterrupt:
        sys.exit(1)


q = queue.Queue()
num_threads = 2

for idx in range(num_threads):
    a = Thread(target=counter, args=[q])
    a.daemon = True
    a.start()

consumer()
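
The script above runs until interrupted. As a rough sketch of how the same pattern could shut down cleanly, the variant below has each producer send a fixed number of requests and then post a `None` sentinel. The sentinel, the `producer` and `consume` names, and the `results` list are assumptions added for illustration. They are not part of the original design.

```python
import queue
import threading


def serial_function():
    # Runs in whichever thread calls send(); here, the consuming thread.
    input_var, rq = yield
    rq.put((threading.current_thread().name, input_var + 10))


def producer(q, n, results):
    rq = queue.Queue()
    for x in range(1, n + 1):
        co = serial_function()
        next(co)                  # prime the coroutine
        q.put((co, x, rq))
        results.append(rq.get())  # wait for the serialized result
    q.put(None)                   # hypothetical sentinel: this producer is done


def consume(q, num_producers):
    remaining = num_producers
    while remaining:
        item = q.get()
        if item is None:
            remaining -= 1
            continue
        co, x, rq = item
        try:
            co.send((x, rq))      # the coroutine body executes in this thread
        except StopIteration:
            pass                  # expected after a single send


q = queue.Queue()
results = []
threads = [threading.Thread(target=producer, args=(q, 3, results))
           for _ in range(2)]
for t in threads:
    t.start()
consume(q, len(threads))
for t in threads:
    t.join()

print(sorted(value for _, value in results))
```

Every entry in `results` carries the same thread name, which is the point of the technique: all calls into the "library" code happened in the one thread that ran `consume`.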