Archive for the ‘Code’ Category
Batch Processing With Python
System Administration problems often require a programmatic approach, straddling the line between admin and developer. At times it’s more accurate to refer to myself as an IT Technologist or System Infrastructure Engineer than as a System Administrator. As the systems we work on become increasingly complex in both scale and technology, the methods used to manage them change accordingly.
Recently, when faced with the task of processing over 500 million files (nearly 5TB) as a batch job, I used the opportunity to exercise my developer side. Bash was by no means cut out for the task (have fun forking!). I have used Python extensively in the past to automate other system functions and most recently to interface with the excellent func libraries (which I highly recommend you explore; more on that later), so I reached for it here. After spending a couple of hours writing my own thread pool class for Python, I came across an _excellent_ example from an ActiveState contributor and immediately ditched mine.
The recipe from ActiveState provides a great foundation for most threaded Python applications that do batch-type processing. By creating a thread pool with callback functions, one can easily insert tasks into a queue. For my particular task I had a deeply nested tree of directories and files that needed to be queued and processed, and their output written. The most basic example follows:
#!/usr/bin/env python
"""
threadpool.py: ThreadPool Example.
-i, --input Input Directory
-o, --output Output Directory
-t, --threads Number of Threads
-h, --help Help
"""
import collections
import getopt
import glob
import os
import subprocess
import sys
import threading
import traceback
from time import sleep
def usage():
    """Print the module docstring, which doubles as the command-line help."""
    # The parenthesised single-argument form prints the same text on both
    # Python 2 and Python 3.
    print(__doc__)
# ThreadPool recipe from ActiveState: http://code.activestate.com/recipes/203871/
# NOTE: the recipe's True/False compatibility shim (only needed before
# Python 2.2.1) is intentionally omitted. This script imports subprocess,
# which needs Python 2.4+, so the shim could never run; on Python 3 the
# assignment "False = 0" is a SyntaxError.


class ThreadPool:
    """Flexible thread pool class.

    Creates a pool of ThreadPoolThread workers, then accepts tasks that
    will be dispatched to the next available thread. Tasks are handed out
    in the order they were queued.
    """

    def __init__(self, numThreads):
        """Initialize the thread pool with numThreads workers."""
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        # A deque pops from the left in O(1); list.pop(0) shifts the whole
        # list on every task, which is quadratic for large batches.
        self.__tasks = collections.deque()
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):
        """Set the current pool size.

        Acquires the resizing lock, then calls the internal version to do
        the real work. Returns False (and does nothing) while the pool is
        shutting down, True otherwise.
        """
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        with self.__resizeLock:
            self.__setThreadCountNolock(newNumThreads)
        return True

    def __setThreadCountNolock(self, newNumThreads):
        """Grow or shrink the pool to newNumThreads workers.

        Internal use only; assumes the resizing lock is held.
        """
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so. The retired worker exits
        # its run loop the next time it checks its flag.
        while newNumThreads < len(self.__threads):
            self.__threads.pop(0).goAway()

    def getThreadCount(self):
        """Return the number of threads in the pool."""
        with self.__resizeLock:
            return len(self.__threads)

    def queueTask(self, task, args=None, taskCallback=None):
        """Insert a task into the queue.

        task must be callable; args and taskCallback can be None. The
        worker calls task(args) and, if taskCallback is given, passes the
        result to taskCallback. Returns True if the task was queued, False
        if the pool is shutting down or task is not callable.
        """
        if self.__isJoining:
            return False
        if not callable(task):
            return False
        with self.__taskLock:
            self.__tasks.append((task, args, taskCallback))
        return True

    def getNextTask(self):
        """Retrieve the next (task, args, taskCallback) from the queue.

        Returns (None, None, None) when the queue is empty. For use only
        by ThreadPoolThread objects contained in the pool.
        """
        with self.__taskLock:
            if not self.__tasks:
                return (None, None, None)
            return self.__tasks.popleft()

    def joinAll(self, waitForTasks=True, waitForThreads=True):
        """Clear the task queue and terminate all pooled threads.

        If waitForTasks is true, block until the queue has been drained.
        If waitForThreads is true, block until every worker has exited,
        which also lets tasks that are already running finish. Any tasks
        still queued afterwards are discarded, and the pool can be reused.
        """
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish. With no workers left nothing can drain
        # the queue, so waiting in that case would never end.
        if waitForTasks:
            while self.__tasks and self.__threads:
                sleep(.1)

        with self.__resizeLock:
            # Snapshot the workers first: shrinking to zero empties
            # self.__threads, so iterating it afterwards joins nothing.
            retiring = list(self.__threads)
            # Tell all the threads to quit
            self.__setThreadCountNolock(0)
            # Wait until all threads have exited
            if waitForThreads:
                for worker in retiring:
                    worker.join()

        # Drop whatever is still queued so a reused pool starts empty.
        with self.__taskLock:
            self.__tasks.clear()

        # Reset the pool for potential reuse
        self.__isJoining = False
class ThreadPoolThread(threading.Thread):
    """Pooled worker thread.

    Polls its pool for tasks via getNextTask() and runs them until told to
    quit with goAway().
    """

    # Seconds to sleep between polls when the task queue is empty.
    threadSleepTime = 0.1

    def __init__(self, pool):
        """Initialize the thread and remember the pool."""
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):
        """Until told to quit, retrieve the next task and execute it.

        Calls cmd(args) and, if a callback was queued with the task,
        passes the result to it. An exception raised by the task or its
        callback is reported on stderr and the worker moves on to the next
        task.
        """
        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
                continue
            try:
                result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                # Keep the worker alive: if a failing task killed it, the
                # pool would shrink and the queue might never be drained.
                traceback.print_exc()

    def goAway(self):
        """Exit the run loop next time through."""
        self.__isDying = True
def myTask(data):
    """Example task: announce the file being processed.

    data is the (file name, input directory, output directory) sequence
    queued by the caller. It is returned unchanged so that it is handed on
    to the task callback.
    """
    # Avoid naming a local "file": that shadows the Python 2 builtin.
    filename = data[0]
    inputdir = data[1]
    outputdir = data[2]
    print("Processing %s" % filename)
    # Do Stuff.
    return data
def myTaskCallback(data):
    """Example callback: report that a file has been processed.

    data is whatever the task returned; its first element is taken to be
    the file name.
    """
    # Avoid naming a local "file": that shadows the Python 2 builtin.
    filename = data[0]
    # Do Stuff. Check output status, cleanup, etc.
    print("Finished %s" % filename)
def main(argv):
    """Parse the command line and queue one task per input directory entry.

    argv is the argument list without the program name. The input
    directory, output directory and thread count are all required. Exits
    with status 2 on a usage error: an unknown option, a missing option, a
    thread count that is not a positive integer, or an input path that is
    not a directory.
    """
    try:
        # The trailing "=" marks long options that take a value; without
        # it "--input DIR" is parsed as a bare flag and DIR is dropped.
        opts, args = getopt.getopt(
            argv, "hi:o:t:", ["help", "input=", "output=", "threads="])
    except getopt.GetoptError as err:
        print(str(err))
        sys.exit(2)

    # Start unset so a missing option is detected instead of raising
    # NameError further down.
    inputdir = outputdir = threads = None
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt in ("-i", "--input"):
            inputdir = arg
        elif opt in ("-o", "--output"):
            outputdir = arg
        elif opt in ("-t", "--threads"):
            threads = arg

    if inputdir is None or outputdir is None or threads is None:
        usage()
        sys.exit(2)

    try:
        numThreads = int(threads)
    except ValueError:
        numThreads = 0
    # A pool with no workers would never drain its queue.
    if numThreads < 1:
        print("Invalid number of threads: %s" % threads)
        sys.exit(2)

    if not os.path.isdir(inputdir):
        print("Input directory not found: %s" % inputdir)
        sys.exit(2)

    # List the directory before any worker thread exists: a failure here
    # must not leave non-daemon threads keeping the process alive.
    filenames = os.listdir(inputdir)

    # Initialize the threadpool with the number of requested threads.
    pool = ThreadPool(numThreads)
    print("Starting...")
    print("")
    try:
        for filename in filenames:
            print("Queueing %s" % filename)
            # Insert the tasks into the queue.
            pool.queueTask(myTask, (filename, inputdir, outputdir),
                           myTaskCallback)
    finally:
        # When all tasks are finished, allow the threads to terminate.
        # Done in "finally" so the workers are stopped even if queueing
        # is interrupted.
        pool.joinAll()


if __name__ == "__main__":
    main(sys.argv[1:])
While this example doesn’t actually do anything, it should provide a workable framework for your own tools. I’ve removed my particular code for privacy reasons for my client and replaced it with the skeleton you see here. (I certainly don’t recommend queueing millions of tasks in a single process. While entirely possible, you’ll have better performance by distributing that task.) You can quickly see how portable and extensible this type of framework is.
Till next time.
