Module III - 2. Multiprocessing Module of Python
MULTIPROCESSING MODULE OF PYTHON
CPYTHON
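CPython is the default and most widely used
implementation of the Python programming language.
In standard CPython builds, the global interpreter lock (GIL) lets only one
thread execute Python bytecode at a time
Threads can only be coordinated within a single process on a single node
Threads cannot coordinate work across compute nodes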
INTRO TO MULTIPROCESSING
The multiprocessing module lets a program spawn multiple processes,
allowing the programmer to fully leverage the computing
power of multiple processors.
Solution to the inter-node communication barrier
Thread-like interface to multiple processes
Ability to communicate between processes and nodes enables efficient
use of multiple cores
Communication between processes is slower than communication between threads
A thread is a thread of execution in a program.
Threads share the memory and state of their parent process.
A process is an instance of a computer program that is
being executed.
Processes share nothing by default; they use inter-process
communication mechanisms to communicate.
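The difference between threads and processes described above can be sketched as follows. This is a minimal illustration, not part of the original slides; the names items, append_item and run_demo are made up for the example. A thread appends to a module-level list and the parent sees the change, while a child process only changes its own copy.

```python
import threading
from multiprocessing import Process

items = []  # module-level state owned by the parent process (hypothetical name)


def append_item(value):
    """Append to the module-level list in whichever thread or process runs this."""
    items.append(value)


def run_demo():
    """Return the parent's list after a thread, then a process, appends to it."""
    items.clear()

    t = threading.Thread(target=append_item, args=('from thread',))
    t.start()
    t.join()
    after_thread = list(items)

    p = Process(target=append_item, args=('from process',))
    p.start()
    p.join()
    after_process = list(items)

    return after_thread, after_process


if __name__ == '__main__':
    after_thread, after_process = run_demo()
    print(after_thread)   # the thread's append is visible to the parent
    print(after_process)  # unchanged: the child process modified only its own copy
```

To get data back from the child process, one of the communication mechanisms (Pipe, Queue, shared memory or a Manager) is needed.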
EXAMPLE
[Diagram: a Job divided among four Processes]
PROCESS
The Process class is used to spawn processes by creating
Process objects
Call start() to start the new process
Call join() to wait until the process has finished.
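from multiprocessing import Process

def printName(name):
    print('hello', name)

if __name__ == '__main__':
    # Run printName('foo') in a child process
    p = Process(target=printName, args=('foo',))
    p.start()
    p.join()  # wait for the child to finish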
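The same start()/join() pattern extends to several processes. The sketch below is an illustration only; greet and run_workers are hypothetical names. It starts one process per name, waits for all of them, and reads each process's exitcode attribute, which is 0 after a normal exit.

```python
import os
from multiprocessing import Process


def greet(name):
    # Each child reports its own process id
    print('hello', name, 'from pid', os.getpid())


def run_workers(names):
    """Start one process per name, wait for all, and return their exit codes."""
    procs = [Process(target=greet, args=(name,)) for name in names]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # blocks until that child has exited
    return [p.exitcode for p in procs]


if __name__ == '__main__':
    print(run_workers(['foo', 'bar', 'baz']))
```

Starting all processes before joining any of them lets the children run concurrently; joining each one right after starting it would run them one after another.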
COMMUNICATION AMONG PROCESSES
Pipe: send() recv()
Queue
Synchronization objects living in shared memory among processes
Semaphores and condition variables; a semaphore is like a gate to a
resource: counter > 0 open, counter 0 closed
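The gate behaviour of a semaphore can be sketched with non-blocking acquires. To keep the result deterministic, this illustration stays in a single process; gate_demo is a hypothetical name. With an initial counter of 2, two acquires succeed, the third finds the gate closed, and a release opens it again.

```python
from multiprocessing import Semaphore


def gate_demo():
    """Return the results of non-blocking acquires on a semaphore of size 2."""
    sem = Semaphore(2)
    results = []
    results.append(sem.acquire(False))  # counter 2 -> 1, gate open
    results.append(sem.acquire(False))  # counter 1 -> 0, gate open
    results.append(sem.acquire(False))  # counter is 0, gate closed: returns False
    sem.release()                       # counter 0 -> 1, gate open again
    results.append(sem.acquire(False))  # succeeds
    return results


if __name__ == '__main__':
    print(gate_demo())  # [True, True, False, True]
```

In a real program the semaphore would be passed to several processes, and a blocking acquire() would make a process wait at the gate until another one calls release().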
PIPE
The Python multiprocessing library provides two mechanisms for
exchanging information between processes: Pipe and
Queue.
A Pipe connects two processes. Pipe() returns a
pair of connection objects, and the pipe is duplex (two-way) by default.
from multiprocessing import Process, Pipe

def writer(conn):
    conn.send([5, 'foo', None])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=writer, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # will print [5, 'foo', None]
    p.join()
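Because the pipe is duplex by default, both ends can send and receive. The following sketch is an illustration with made-up names (doubler, run_exchange): the parent sends numbers, the child replies with each number doubled, and None is used here as an agreed stop signal.

```python
from multiprocessing import Process, Pipe


def doubler(conn):
    """Receive numbers until None arrives, replying with each number doubled."""
    while True:
        value = conn.recv()
        if value is None:  # stop signal chosen for this example
            break
        conn.send(value * 2)
    conn.close()


def run_exchange(values):
    """Send each value to a child process and collect its replies in order."""
    parent_conn, child_conn = Pipe()  # duplex: both ends can send and recv
    p = Process(target=doubler, args=(child_conn,))
    p.start()
    replies = []
    for v in values:
        parent_conn.send(v)
        replies.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to stop
    p.join()
    parent_conn.close()
    return replies


if __name__ == '__main__':
    print(run_exchange([1, 2, 3]))  # [2, 4, 6]
```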
QUEUE
Queue is built on top of Pipe.
Used when we need to exchange objects between two or more
processes.
Use the put() method to add data to the queue.
Use the get() method to retrieve data from the queue.
Example:
A reader and a writer share a single queue.
The writer sends a series of integers to the reader.
When the writer runs out of numbers, it sends 'DONE'.
The reader then knows to break out of the read loop.
from multiprocessing import Process, Queue
import time

def reader(queue):
    # Read from the queue and discard each message until 'DONE' arrives
    while True:
        msg = queue.get()
        if msg == 'DONE':
            break

def writer(count, queue):
    # Write 'count' numbers into the queue, then the 'DONE' marker
    for ii in range(count):
        queue.put(ii)
    queue.put('DONE')

if __name__ == '__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = Queue()
        # reader() reads from the queue; writer() writes to it
        reader_p = Process(target=reader, args=(queue,))
        reader_p.daemon = True
        reader_p.start()      # launch reader as a process
        _start = time.time()
        writer(count, queue)  # send a lot to the reader
        reader_p.join()       # wait for the reader to finish
        print("Sending %s numbers to Queue() took %s seconds"
              % (count, time.time() - _start))
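A Queue also works with more than two processes. The sketch below is illustrative only; square_worker and collect_squares are hypothetical names, and None is the end marker chosen for this example. Several writer processes share one queue, and the parent reads until it has seen one end marker per writer.

```python
from multiprocessing import Process, Queue


def square_worker(numbers, out_queue):
    """Put the square of each number on the queue, then an end marker."""
    for n in numbers:
        out_queue.put(n * n)
    out_queue.put(None)  # end marker: this worker is done


def collect_squares(chunks):
    """Run one worker per chunk and return all squares, sorted."""
    out_queue = Queue()
    procs = [Process(target=square_worker, args=(chunk, out_queue))
             for chunk in chunks]
    for p in procs:
        p.start()

    results = []
    finished = 0
    while finished < len(procs):  # read until every worker sent its marker
        item = out_queue.get()
        if item is None:
            finished += 1
        else:
            results.append(item)

    for p in procs:
        p.join()  # join only after the queue has been drained
    return sorted(results)


if __name__ == '__main__':
    print(collect_squares([[1, 2], [3, 4]]))  # [1, 4, 9, 16]
```

The results are sorted because the arrival order from different workers is not guaranteed.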
SYNCHRONIZATION PRIMITIVE (LOCK)
Synchronization primitives such as Lock are used when processes need
to share state or resources safely.
import multiprocessing

x = 0  # defined at module level; each process works on its own copy

def do_this(lock):
    global x
    # Hold the lock so only one process runs its loop at a time
    with lock:
        while x < 300:
            x += 1
        print(x)

def do_after(lock):
    x = 450  # local variable, separate from the global x
    with lock:
        while x < 600:
            x += 1
        print(x)

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(target=do_this, args=(lock,))
    nw = multiprocessing.Process(target=do_after, args=(lock,))
    w.start()
    nw.start()
    w.join()
    nw.join()

'''
Outputs (the order depends on which process acquires the lock first):
300
600
'''
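In the example above each process counts on its own copy of x. A lock matters most when the processes update the same value. The following sketch, with the hypothetical names add_many and count_with_lock, places an integer in shared memory with multiprocessing.Value and uses a Lock so that increments from different processes do not overwrite each other.

```python
from multiprocessing import Process, Value, Lock


def add_many(counter, lock, n):
    """Increment the shared counter n times, one locked step at a time."""
    for _ in range(n):
        with lock:  # read-modify-write must not be interrupted
            counter.value += 1


def count_with_lock(num_procs, n):
    """Have num_procs processes each add n to one shared counter."""
    # 'i' is a C int; lock=False gives a raw shared value, locked explicitly here
    counter = Value('i', 0, lock=False)
    lock = Lock()
    procs = [Process(target=add_many, args=(counter, lock, n))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == '__main__':
    print(count_with_lock(4, 1000))  # 4000
```

Without the lock, two processes could read the same old value and both write back old + 1, losing an increment.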
SHARE STATE BETWEEN PROCESSES
Shared memory
Server process
A Manager object controls a server process that holds
Python objects and allows other processes to
access and manipulate them.
The shared object is updated for all processes when
any one of them modifies it.
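# Server (run as its own script)
from multiprocessing.managers import BaseManager
from queue import Queue

queue = Queue()  # the object the server shares with clients

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue', callable=lambda: queue)
# authkey must match on server and client; this value is only an example
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()

# Client (run as a separate script while the server is running)
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue')
m = QueueManager(address=('127.0.0.1', 50000), authkey=b'abracadabra')
m.connect()
queue = m.get_queue()
queue.put('Hello World')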
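For processes on the same machine, a manager can be used without a hand-written server script. The sketch below is an illustration, with record and run_manager_demo as hypothetical names. It uses multiprocessing.Manager() to start the server process and shares a dictionary proxy that several child processes write to.

```python
from multiprocessing import Manager, Process


def record(shared, key, value):
    """Store one entry in the manager-backed dictionary."""
    shared[key] = value


def run_manager_demo():
    """Let three processes write to one shared dict and return a plain copy."""
    with Manager() as manager:  # starts the server process
        shared = manager.dict()  # proxy to a dict held by the server
        procs = [Process(target=record, args=(shared, i, i * i))
                 for i in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return dict(shared)  # copy out before the manager shuts down


if __name__ == '__main__':
    print(run_manager_demo())  # {0: 0, 1: 1, 2: 4}
```

Every access through the proxy is a request to the server process, so this is more flexible than shared memory but also slower.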
POOL
The Python multiprocessing library provides a Pool class that
implements multiprocessing for simple concurrent
programs.
Distributes the work among workers.
Collects the return values as a list.
Important methods:
apply/apply_async
map/map_async
Takes care of managing the queue, processes, and shared
data/stats.
Makes it easy to implement quick/simple concurrent
Python programs.
from multiprocessing import Pool

def cube(x):
    return x**3

if __name__ == '__main__':
    pool = Pool(processes=4)
    # apply() blocks until each call returns, so the calls run one at a time
    results = [pool.apply(cube, args=(x,)) for x in range(1, 101)]
    pool.close()
    pool.join()
    print(results)

Outputs:
[1, 8, 27, 64, …, 941192, 970299, 1000000]
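The other methods listed above, map and apply_async, can be sketched in the same way. This is an illustration rather than the original author's code; cubes_with_map and cubes_with_apply_async are hypothetical names. map splits the input among the workers and returns the results in input order, while apply_async submits each call without waiting and returns a result object whose get() blocks until the value is ready.

```python
from multiprocessing import Pool


def cube(x):
    return x ** 3


def cubes_with_map(values, processes=4):
    """Distribute the values among the workers and return results in order."""
    with Pool(processes=processes) as pool:
        return pool.map(cube, values)


def cubes_with_apply_async(values, processes=4):
    """Submit every call first, then wait for each result."""
    with Pool(processes=processes) as pool:
        pending = [pool.apply_async(cube, args=(x,)) for x in values]
        return [r.get() for r in pending]  # get() blocks until ready


if __name__ == '__main__':
    print(cubes_with_map(range(1, 6)))          # [1, 8, 27, 64, 125]
    print(cubes_with_apply_async(range(1, 6)))  # [1, 8, 27, 64, 125]
```

Unlike a loop over apply(), both variants let the four workers compute at the same time.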