sysadmin HQ

bridging the gap


Batch Processing With Python


System administration problems often require a programmatic approach, straddling the line between admin and developer. At times it’s more accurate to refer to myself as an IT Technologist or System Infrastructure Engineer than a System Administrator. As the systems we work on become increasingly complex in both scale and technology, the methods used to manage them change accordingly.

Recently, when faced with the task of processing over 500 million files (nearly 5TB) as a batch job, I used the opportunity to exercise my developer side. Bash was by no means cut out for the task (have fun forking!), so I turned to Python, which I have used extensively in the past to automate other system functions and most recently to interface with the excellent func libraries (which I highly recommend you explore; more on that later). After spending a couple of hours writing my own thread pool class for Python, I came across an _excellent_ example from an ActiveState contributor and immediately ditched mine.

The recipe from ActiveState provides a great foundation for most threaded Python applications that do batch-type processing. By creating a thread pool with callback functions, one can easily insert tasks into a queue. For my particular task I had a deeply nested tree of directories and files that needed to be queued and processed, and their output written. The most basic example follows:

#!/usr/bin/env python

"""
threadpool.py: ThreadPool Example.
	-i, --input	Input Directory
	-o, --output	Output Directory
	-t, --threads	Number of Threads
	-h, --help	Help
"""

import sys
import getopt
import os
import threading
import subprocess
import glob
from time import sleep

def usage():
	print __doc__

# ThreadPool recipe from ActiveState: http://code.activestate.com/recipes/203871/
# Ensure booleans exist (not needed for Python 2.2.1 or higher)
try:
    True
except NameError:
    False = 0
    True = not False

class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread."""

    def __init__(self, numThreads):

        """Initialize the thread pool with numThreads workers."""

        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """ External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work."""

        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False

        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):

        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""

        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]

    def getThreadCount(self):

        """Return the number of threads in the pool."""

        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""

        if self.__isJoining == True:
            return False
        if not callable(task):
            return False

        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool."""

        self.__taskLock.acquire()
        try:
            if self.__tasks == []:
                return (None, None, None)
            else:
                return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()

    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""

        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(.1)

        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            # Keep references to the threads first: __setThreadCountNolock(0)
            # empties self.__threads, so iterating over it afterwards
            # would join nothing.
            threads = self.__threads[:]
            self.__setThreadCountNolock(0)

            # Wait until all threads have exited
            if waitForThreads:
                for t in threads:
                    t.join()

            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()

class ThreadPoolThread(threading.Thread):

    """ Pooled thread class. """

    threadSleepTime = 0.1

    def __init__(self, pool):

        """ Initialize the thread and remember the pool. """

        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):

        """ Until told to quit, retrieve the next task and execute
        it, calling the callback if any.  """

        while self.__isDying == False:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                callback(cmd(args))

    def goAway(self):

        """ Exit the run loop next time through."""

        self.__isDying = True

def myTask(data):
	file = data[0]
	inputdir = data[1]
	outputdir = data[2]

	print "Processing", file
	# Do Stuff.
	return data

def myTaskCallback(data):
	file = data[0]
	# Do Stuff. Check output status, cleanup, etc.
	print "Finished", file

def main(argv):
	try:
		# Long options that take a value need a trailing "=" in getopt.
		opts, args = getopt.getopt(argv, "hi:o:t:", ["help", "input=", "output=", "threads="])
	except getopt.GetoptError, err:
		print str(err)
		sys.exit(2)
	for opt, arg in opts:
		if opt in ("-h", "--help"):
			usage()
			sys.exit()
		elif opt in ("-i", "--input"):
			inputdir = arg
		elif opt in ("-o", "--output"):
			outputdir = arg
		elif opt in ("-t", "--threads"):
			threads = arg

	if len(opts) < 3:
		usage()
		sys.exit(2)

	# Initialize the threadpool with the number of requested threads.
	pool = ThreadPool(int(threads))

	print "Starting..."
	print

	for file in os.listdir(inputdir):
		print "Queueing", file
		# Insert the tasks into the queue.
		pool.queueTask(myTask, (file, inputdir, outputdir), myTaskCallback)

	# When all tasks are finished, allow the threads to terminate
	pool.joinAll()

if __name__ == "__main__":
	main(sys.argv[1:])
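
The skeleton above only lists a single directory with os.listdir, while the job described earlier involved a deeply nested tree. A minimal sketch of one way to walk such a tree follows. The helper name iter_tasks is hypothetical (it is not part of the ActiveState recipe or of my original script), and it simply yields tuples in the same (file, inputdir, outputdir) shape that myTask expects:

```python
import os

def iter_tasks(inputdir, outputdir):
    """Yield (relative_path, inputdir, outputdir) for every file under inputdir.

    Hypothetical helper for illustration only; the tuple layout mirrors
    the one passed to myTask in the skeleton.
    """
    for dirpath, dirnames, filenames in os.walk(inputdir):
        # Sorting in place makes the traversal order deterministic.
        dirnames.sort()
        for name in sorted(filenames):
            full_path = os.path.join(dirpath, name)
            # Store the path relative to inputdir so the same layout can
            # be recreated under outputdir.
            yield (os.path.relpath(full_path, inputdir), inputdir, outputdir)
```

With the ThreadPool class above, each yielded tuple could be handed straight to the pool, for example pool.queueTask(myTask, task, myTaskCallback) inside a loop over iter_tasks(inputdir, outputdir). Note that the generator itself is lazy, but queueTask still appends every task to an in-memory list.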

 

While this example doesn’t actually do anything, it should provide a workable framework for your own tools. I’ve removed my particular code to protect my client’s privacy and replaced it with the skeleton you see here. (I certainly don’t recommend queueing millions of tasks in a single process. While entirely possible, you’ll get better performance by distributing the work.) You can quickly see how portable and extensible this type of framework is.
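
On the point about not queueing millions of tasks at once: if the work does have to stay in one process, one option is to cap how many tasks are pending at any moment so the queue never grows without bound. The sketch below is not the code I ran and not part of the recipe. It assumes Python 3.2 or later and swaps in the standard library's concurrent.futures.ThreadPoolExecutor for the ThreadPool class; run_bounded, num_threads and max_pending are hypothetical names chosen for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def run_bounded(tasks, worker, callback, num_threads=4, max_pending=100):
    """Run worker(task) for each task, with at most max_pending queued or running.

    Hypothetical helper: callback receives the return value of each
    successful worker call, like taskCallback in the skeleton.
    """
    slots = threading.BoundedSemaphore(max_pending)

    def on_done(future):
        # Free a slot whether the task succeeded or failed.
        slots.release()
        # Failed tasks are skipped here; a real tool would log them.
        if future.exception() is None:
            callback(future.result())

    # Leaving the with-block waits for all submitted tasks to finish.
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for task in tasks:
            # Blocks the producer while max_pending tasks are outstanding.
            slots.acquire()
            executor.submit(worker, task).add_done_callback(on_done)
```

Because the producer blocks on the semaphore, tasks can come from a lazy iterator and only max_pending of them are held in memory at a time. This bounds memory use; it does not replace distributing the work across processes or machines.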

 

Till next time.

Written by Adam Serediuk

December 9, 2008 at 12:51 pm

Posted in Code, Operating Systems
