Module III - 2. Multiprocessing Module of Python

MULTIPROCESSING MODULE OF PYTHON
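CPYTHON
• CPython is the default and most widely used implementation of the Python programming language.
• CPython's global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads alone cannot run Python code on several cores in parallel.
• Threads can be coordinated only within a single process on a single node.
• Threads cannot coordinate work across processes or across compute nodes.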
INTRO TO MULTIPROCESSING
• The multiprocessing module lets a program spawn multiple processes, so the programmer can fully use the computing power of multiple processors.
• It sidesteps the limits of threads: each process runs its own interpreter, and managers can share objects with processes on other nodes over a network.
• It offers a thread-like interface to multiple processes.
• Running work in separate processes makes efficient use of multiple cores.
• Communication between processes is slower than communication between threads, because data must be serialized (pickled) and copied.
• A thread is a single sequence of execution within a program.
• Threads share the memory and state of their parent process.
• A process is an instance of a computer program that is being executed.
• Processes share nothing by default; they communicate through inter-process communication (IPC) mechanisms.
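The difference between sharing memory and sharing nothing can be sketched as follows. This is a minimal illustration, not part of the original slides; the names counter, thread_demo and process_demo are hypothetical. A thread changes the variable its parent sees, while a child process changes only its own copy.

```python
import threading
from multiprocessing import Process, Queue

counter = 0  # module-level state; every process gets its own copy


def increment():
    """Add 1 to the counter of whichever process runs this function."""
    global counter
    counter += 1


def increment_and_report(queue):
    """Run increment() in a child process and report the change seen there."""
    before = counter
    increment()
    queue.put(counter - before)


def thread_demo():
    """Return the change the parent sees after a thread increments."""
    before = counter
    t = threading.Thread(target=increment)
    t.start()
    t.join()
    return counter - before


def process_demo():
    """Return (change seen by the parent, change seen by the child)."""
    before = counter
    queue = Queue()
    p = Process(target=increment_and_report, args=(queue,))
    p.start()
    child_change = queue.get()
    p.join()
    return counter - before, child_change


if __name__ == '__main__':
    print('thread: parent sees a change of', thread_demo())        # 1
    parent_change, child_change = process_demo()
    print('process: parent sees a change of', parent_change)       # 0
    print('process: child saw a change of', child_change)          # 1
```

The child has to send its result back through a Queue, because nothing it writes to counter reaches the parent.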
EXAMPLE
[Figure: a job distributed across four processes]
PROCESS
• The Process class spawns processes: create one Process object for each process.
• Call start() to start the new process.
• Call join() to wait until the process has finished.
from multiprocessing import Process

def printName(name):
    print('hello', name)

if __name__ == '__main__':
    # Run printName('foo') in a child process
    p = Process(target=printName, args=('foo',))
    p.start()
    p.join()  # wait for the child to exit
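The same start()/join() pattern extends to several processes working on parts of one job. The sketch below is an illustration under assumed names (work, run_job and the part labels are hypothetical): it starts one process per part, waits for all of them, and returns their exit codes.

```python
from multiprocessing import Process, current_process


def work(part):
    """Handle one part of the job inside a child process."""
    print(current_process().name, 'handles', part)


def run_job(parts):
    """Start one process per part, wait for all, and return the exit codes."""
    workers = [Process(target=work, args=(part,)) for part in parts]
    for w in workers:
        w.start()   # all workers now run concurrently
    for w in workers:
        w.join()    # block until each worker has finished
    return [w.exitcode for w in workers]  # 0 means a clean exit


if __name__ == '__main__':
    print(run_job(['part 1', 'part 2', 'part 3', 'part 4']))
```

Starting every process before joining any of them matters: calling join() right after each start() would run the parts one after another.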
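COMMUNICATION AMONG PROCESSES
• Pipe: a connection between two processes, used through send() and recv()
• Queue: a process-safe FIFO queue, used through put() and get()
• Shared memory: objects that live in memory shared among the processes
• Semaphores and condition variables: a semaphore acts like a gate to a resource, open while its counter is above 0 and closed when it reaches 0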
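The gate behaviour of a semaphore can be sketched as follows, assuming multiprocessing.Semaphore and Value. The function names and the 0.05 second delay are illustrative choices, not from the slides. gate_states() shows the open and closed states directly; peak_users() checks that no more than the allowed number of processes pass the gate at once.

```python
import time
from multiprocessing import Process, Semaphore, Value


def gate_states():
    """Show the gate open (counter > 0) and closed (counter == 0)."""
    gate = Semaphore(1)            # counter starts at 1: open
    first = gate.acquire(False)    # non-blocking: succeeds, counter -> 0
    second = gate.acquire(False)   # counter is 0: closed, returns False
    gate.release()                 # counter -> 1: open again
    return first, second


def use_resource(gate, active, peak):
    """Hold one slot of the gated resource for a short time."""
    with gate:                     # blocks while the counter is 0
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.05)           # pretend to use the resource
        with active.get_lock():
            active.value -= 1


def peak_users(slots=2, workers=5):
    """Return the largest number of workers inside the gate at once."""
    gate = Semaphore(slots)
    active = Value('i', 0)
    peak = Value('i', 0)
    procs = [Process(target=use_resource, args=(gate, active, peak))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value


if __name__ == '__main__':
    print(gate_states())   # (True, False)
    print(peak_users())    # never more than 2
```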
PIPE
• The multiprocessing library provides two classes for exchanging information between processes: Pipe and Queue.
• Pipe() connects two processes. It returns a pair of connection objects, one for each end, and is duplex (two-way) by default.
from multiprocessing import Process, Pipe

def writer(conn):
    conn.send([5, 'foo', None])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=writer, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints [5, 'foo', None]
    p.join()
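Because the pipe is duplex by default, each end can both send and receive. A minimal sketch of a two-way exchange follows; the names square_server and ask_squares, and the use of None as a stop signal, are assumptions made for this illustration.

```python
from multiprocessing import Process, Pipe


def square_server(conn):
    """Reply to each number with its square; stop when None arrives."""
    while True:
        n = conn.recv()
        if n is None:
            break
        conn.send(n * n)
    conn.close()


def ask_squares(numbers):
    """Send numbers to the child and collect its replies over one pipe."""
    parent_conn, child_conn = Pipe()  # duplex: both ends send and receive
    p = Process(target=square_server, args=(child_conn,))
    p.start()
    answers = []
    for n in numbers:
        parent_conn.send(n)                 # request travels to the child
        answers.append(parent_conn.recv())  # reply travels back
    parent_conn.send(None)                  # tell the child to stop
    p.join()
    return answers


if __name__ == '__main__':
    print(ask_squares([1, 2, 3]))  # prints [1, 4, 9]
```

Passing duplex=False to Pipe() would instead give a one-way pipe, where the first connection can only receive and the second can only send.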
QUEUE
• Queue is built on top of Pipe, with locks added so that several processes can use it safely.
• Use it to exchange objects among two or more processes.
• put() adds an item to the queue.
• get() removes and returns an item, blocking until one is available.
Example:
• A reader and a writer share a single queue.
• The writer sends a series of integers to the reader.
• When the writer runs out of numbers, it sends 'DONE'.
• The reader then knows to break out of its read loop.
from multiprocessing import Process, Queue
import time

def reader(queue):
    # Read from the queue and discard each message until 'DONE' arrives
    while True:
        msg = queue.get()
        if msg == 'DONE':
            break

def writer(count, queue):
    # Write 'count' numbers into the queue, then the 'DONE' sentinel
    for ii in range(count):
        queue.put(ii)
    queue.put('DONE')

if __name__ == '__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = Queue()
        # reader() reads from the queue; writer() writes to it
        reader_p = Process(target=reader, args=(queue,))
        reader_p.daemon = True
        reader_p.start()      # launch the reader as a separate process
        _start = time.time()
        writer(count, queue)  # send a lot of numbers to the reader
        reader_p.join()       # wait for the reader to finish
        print("Sending %s numbers to Queue() took %s seconds"
              % (count, time.time() - _start))
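The sentinel idea carries over to more than two processes. In the sketch below, which uses hypothetical names (worker, sum_with_workers) and an assumed default of three workers, several readers share one task queue, each reader stops at its own 'DONE' marker, and a second queue carries the partial sums back.

```python
from multiprocessing import Process, Queue

DONE = 'DONE'  # sentinel that tells a worker to stop


def worker(tasks, results):
    """Add up numbers taken from the task queue until the sentinel arrives."""
    total = 0
    while True:
        item = tasks.get()
        if item == DONE:
            break
        total += item
    results.put(total)


def sum_with_workers(numbers, n_workers=3):
    """Share the numbers among several workers and combine their sums."""
    tasks, results = Queue(), Queue()
    workers = [Process(target=worker, args=(tasks, results))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for n in numbers:
        tasks.put(n)
    for _ in workers:
        tasks.put(DONE)  # one sentinel per worker
    # Collect the results before joining, so no worker is left
    # waiting to flush data into a queue that nobody reads
    partial_sums = [results.get() for _ in workers]
    for w in workers:
        w.join()
    return sum(partial_sums)


if __name__ == '__main__':
    print(sum_with_workers(range(100)))  # prints 4950
```

Which worker receives which number is not predictable, but the combined total is.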
SYNCHRONIZATION PRIMITIVE (LOCK)
• Synchronization primitives such as Lock are used when processes need to share state or a resource without interfering with each other.
import multiprocessing

x = 0  # module-level global; each process works on its own copy

def do_this(lock):
    global x
    with lock:  # acquire the lock; it is released when the block ends
        while x < 300:
            x += 1
        print(x)

def do_after(lock):
    with lock:
        x = 450  # local variable, separate from the global x
        while x < 600:
            x += 1
        print(x)

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(target=do_this, args=(lock,))
    nw = multiprocessing.Process(target=do_after, args=(lock,))
    w.start()
    nw.start()
    w.join()
    nw.join()

'''
Outputs (w normally acquires the lock first):
300
600
'''
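In the example above each process counts in its own copy of x, so the lock only keeps the two printouts from interleaving. A minimal sketch of a lock protecting a value that really is shared follows, assuming multiprocessing.Value for the shared integer; the names add_many and count_with_lock are hypothetical.

```python
from multiprocessing import Process, Lock, Value


def add_many(counter, lock, n):
    """Increment the shared counter n times, one locked step at a time."""
    for _ in range(n):
        with lock:              # only one process may update at a time
            counter.value += 1  # read-modify-write on shared memory


def count_with_lock(n_procs=4, n=1000):
    """Let several processes increment one shared integer."""
    counter = Value('i', 0, lock=False)  # raw shared int, no built-in lock
    lock = Lock()
    procs = [Process(target=add_many, args=(counter, lock, n))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == '__main__':
    print(count_with_lock())  # prints 4000
```

Without the lock, two processes could read the same old value and each write back old value + 1, losing an increment.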
SHARE STATE BETWEEN PROCESSES
• Shared memory: Value and Array objects store data in memory that the processes share.
• Server process: a Manager object controls a server process that holds Python objects and allows other processes to access and manipulate them.
• A change made to the shared object by any process is visible to all the others.
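# Server (run as its own script)
from multiprocessing.managers import BaseManager
from queue import Queue

queue = Queue()  # the object that the server shares

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue', callable=lambda: queue)
# authkey is an example value; server and client must use the same key
m = QueueManager(address=('', 50000), authkey=b'example-key')
s = m.get_server()
s.serve_forever()

# Client (run as a second script while the server is running)
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue')
m = QueueManager(address=('127.0.0.1', 50000), authkey=b'example-key')
m.connect()
queue = m.get_queue()
queue.put('Hello World')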
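Both ways of sharing state can also be tried inside a single script. The sketch below assumes multiprocessing.Manager for the server process and multiprocessing.Array for shared memory; record and share_state are hypothetical names. Each child writes into both objects, and the parent then reads the combined result.

```python
from multiprocessing import Process, Manager, Array


def record(shared_dict, shared_array, index):
    """Store a value under this child's index in both shared objects."""
    shared_dict[index] = index * 10   # forwarded to the manager's server process
    shared_array[index] = index * 10  # written directly into shared memory


def share_state(n=3):
    """Let n child processes update a managed dict and a shared array."""
    with Manager() as manager:        # starts the server process
        shared_dict = manager.dict()  # proxy to a dict held by the server
        shared_array = Array('i', n)  # n integers, initialized to 0
        procs = [Process(target=record, args=(shared_dict, shared_array, i))
                 for i in range(n)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy the data out before the manager shuts down
        return shared_dict.copy(), shared_array[:]


if __name__ == '__main__':
    print(share_state())  # prints ({0: 0, 1: 10, 2: 20}, [0, 10, 20])
```

A managed object can hold arbitrary picklable Python objects but every access is a round trip to the server process; Value and Array are limited to ctypes data but avoid that round trip.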
POOL
• The multiprocessing library provides a Pool class that implements multiprocessing for simple concurrent programs.
• It distributes the work among worker processes.
• It collects the return values as a list.
• Important methods:
  • apply / apply_async
  • map / map_async
• It takes care of managing the queues, the processes, and the shared data/state.
• This makes it easy to write quick, simple concurrent Python programs.
from multiprocessing import Pool

def cube(x):
    return x**3

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # apply() blocks until each call returns, so the calls run one at a time
        results = [pool.apply(cube, args=(x,)) for x in range(1, 101)]
    print(results)

Outputs:
[1, 8, 27, 64, …, 941192, 970299, 1000000]
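Since apply() waits for each result before the next call is submitted, the list comprehension above does not actually run the calls in parallel. A minimal sketch of the other listed methods, map and apply_async, which do spread the calls over the workers; the wrapper function names are hypothetical.

```python
from multiprocessing import Pool


def cube(x):
    return x ** 3


def cubes_with_map(numbers, workers=4):
    """Split the inputs among the workers and return results in input order."""
    with Pool(processes=workers) as pool:
        return pool.map(cube, numbers)


def cubes_with_apply_async(numbers, workers=4):
    """Submit every call without waiting, then collect the results."""
    with Pool(processes=workers) as pool:
        pending = [pool.apply_async(cube, args=(x,)) for x in numbers]
        # get() blocks until that particular result is ready
        return [result.get() for result in pending]


if __name__ == '__main__':
    print(cubes_with_map(range(1, 6)))          # prints [1, 8, 27, 64, 125]
    print(cubes_with_apply_async(range(1, 6)))  # prints [1, 8, 27, 64, 125]
```

map() is the shorter choice when one function is applied to every item; apply_async() fits when the calls differ or when the parent has other work to do before it collects the results.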
REFERENCES
• Python 2.6 documentation, http://docs.python.org/library/multiprocessing.html
• PyMOTW by Doug Hellmann, http://www.doughellmann.com/PyMOTW/multiprocessing/