mpiutils package
At its core, the package implements a simple load-balancing task scheduler in which the dispatcher distributes tasks to the workers as they become available. This is wrapped in the command line interface mpimap and in the map() and farm() functions in the mpiutils.dispatcher module. The dispatcher runs on the first MPI process, so if you request n processes, you get n-1 workers.
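The scheduling idea described above can be sketched without MPI. The following is only an illustration that uses threads and a shared queue in place of MPI processes; `run_tasks` is a hypothetical name and is not part of mpiutils, and the package's actual implementation may differ.

```python
import queue
import threading


def run_tasks(tasks, function, num_workers):
    """Toy stand-in for the dispatcher/worker pattern, using threads instead of MPI."""
    pending = queue.Queue()
    for task in tasks:
        pending.put(task)

    results = []
    results_lock = threading.Lock()

    def worker():
        # Each worker pulls the next task as soon as it is free,
        # so faster workers naturally take on more tasks.
        while True:
            try:
                task = pending.get_nowait()
            except queue.Empty:
                return
            value = function(task)
            with results_lock:
                results.append((task, value))

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


# With n = 4 processes, one acts as the dispatcher, leaving n - 1 = 3 workers.
n = 4
squares = run_tasks(range(10), lambda x: x * x, num_workers=n - 1)
print(sorted(squares))
```

Results arrive in completion order, which is why the sketch sorts them before printing.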
Submodules
mpimap command
Usage: mpimap [OPTIONS] [BASH_COMMAND]...
Options:
  -i, --input PATH              Input directory or tarball [required]
  -o, --output PATH             Output directory or tarball [required]
  -ip, --input_pattern TEXT     Input filename pattern
  -op, --output_pattern TEXT    Output filename pattern
  -l, --log_path PATH           Log file location
  -L, --log_level [DEBUG|INFO|WARNING|ERROR|CRITICAL]
                                Log level
  --replace / --no_replace      Replace existing output files
  --help                        Show this message and exit.
mpimap maps files from the input path to files in the output path. It matches {} in the input_pattern (or {i} in the BASH_COMMAND) against each input filename and substitutes the matched text for {} in the output_pattern (or {o} in the BASH_COMMAND). For example, id.fasta maps to id.json if the input_pattern is {}.fasta and the output_pattern is {}.json.
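The filename mapping can be illustrated with a short sketch. `map_filename` is a hypothetical helper written for this example, not a function from mpiutils, and it assumes each pattern contains exactly one {} placeholder.

```python
import re


def map_filename(filename, input_pattern, output_pattern):
    """Return the output filename for `filename`, or None if it does not match.

    Hypothetical helper: the single {} in each pattern stands for the shared stem.
    """
    prefix, _, suffix = input_pattern.partition("{}")
    # Build a regex that captures whatever {} stands for in the input pattern.
    regex = re.escape(prefix) + "(.+)" + re.escape(suffix)
    match = re.fullmatch(regex, filename)
    if match is None:
        return None
    return output_pattern.replace("{}", match.group(1))


print(map_filename("id.fasta", "{}.fasta", "{}.json"))  # id.json
```

Files that do not match the input pattern return None here; how mpimap itself treats such files is not stated above.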
By default, the log file is left in the output directory or tarball.
The command can also operate on tarballs (gzipped or not) instead of directories. In this case, tar must be in your path and the tarball must contain a single tarred directory. In this instance, mpimap works by untarring the tarball, executing the commands on the files inside, then tarring up the result. If the input is gzipped, the output will be gzipped. (Un)tarring and (un)gzipping occur on a single thread.
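The untar, process, retar sequence can be sketched as follows. This is only an illustration under stated assumptions: it uses Python's tarfile module rather than the tar executable that mpimap relies on, `process_tarball` and `transform` are hypothetical names, the files are assumed to be plain text, and gzip is detected from the file extension.

```python
import os
import tarfile
import tempfile


def process_tarball(input_tar, output_tar, transform):
    """Untar, apply `transform(text) -> text` to each file, then tar the results.

    Assumes the tarball holds a single top-level directory of regular text files.
    """
    gzipped = input_tar.endswith(".gz")  # assumption: extension signals gzip
    with tempfile.TemporaryDirectory() as work:
        in_root = os.path.join(work, "in")
        out_root = os.path.join(work, "out")
        with tarfile.open(input_tar, "r:*") as tar:
            tar.extractall(in_root)
        (top,) = os.listdir(in_root)  # the single tarred directory
        os.makedirs(os.path.join(out_root, top))
        for name in sorted(os.listdir(os.path.join(in_root, top))):
            with open(os.path.join(in_root, top, name)) as src:
                text = src.read()
            with open(os.path.join(out_root, top, name), "w") as dst:
                dst.write(transform(text))
        # Gzip the output only if the input was gzipped.
        with tarfile.open(output_tar, "w:gz" if gzipped else "w") as tar:
            tar.add(os.path.join(out_root, top), arcname=top)
```

In this sketch the per-file loop is serial; in mpimap that is the step distributed over the workers, while the (un)tarring stays on a single thread.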
The command works by having each worker call the appropriate bash command, so all workers must have access to the necessary file paths.
Examples:
To create a directory full of text files from a directory full of text files, with every instance of the string “Tony” in the files replaced with the string “Malcolm”:
mpimap -i /path/to/input/dir -o /path/to/output/dir sed 's/Tony/Malcolm/g'
To unzip a directory of gzipped files into a different directory using 2048 cores you could run:
mpirun -n 2048 mpimap -i /path/to/input/dir -o /path/to/output/dir gzip -dc {i}.gz
To create a tarball full of *.md files from a tarball full of *.rst files using 4 cores:
mpirun -n 4 mpimap -i /path/to/input.tar.gz -o /path/to/output.tar.gz cp {i}.rst {o}.md
Note that the above command only works if your reStructuredText files are also valid Markdown files. A more realistic (but less portable) example would use, for instance, pandoc.
mpiutils.dispatcher module
-
mpiutils.dispatcher.farm(function, *args)
Apply the function to the elements of args, as if they were zipped together.
Parameters:
- function (callable) – function that takes as many parameters as there are args
- args – iterables
Returns: the function results, in no particular order
Return type: iterable
Distributes the function calls over the workers and makes the results available to the dispatcher process as soon as the workers return.
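The semantics described for farm() (arguments zipped together, results yielded as they complete) can be sketched with the standard library. `farm_sketch` is a hypothetical stand-in that uses a thread pool rather than MPI workers; it is not the mpiutils implementation.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def farm_sketch(function, *args, max_workers=3):
    """Yield function(*items) for items zipped from args, in completion order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(function, *items) for items in zip(*args)]
        for future in as_completed(futures):
            yield future.result()


def add(x, y):
    return x + y


results = list(farm_sketch(add, [1, 2, 3], [10, 20, 30]))
print(sorted(results))  # sorted, because completion order is not guaranteed
```

Because results come back in completion order, callers that need to know which input produced which result would have to return the input alongside the output.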
-
mpiutils.dispatcher.map(function, *args)
Like map(), but work gets distributed over the workers.
Example: To calculate π in parallel (the slow way):
    >>> from random import random
    >>> from mpiutils.dispatcher import map, am_dispatcher
    >>>
    >>> def am_in_circle(i):
    ...     x = 2*random() - 1
    ...     y = 2*random() - 1
    ...     return x*x + y*y < 1
    ...
    >>> num_points = 100000
    >>> num_in_circle = 0
    >>> for in_circle in map(am_in_circle, range(num_points)):
    ...     num_in_circle += in_circle
    ...
    >>> if am_dispatcher():
    ...     print(4.*num_in_circle/num_points)
-
mpiutils.dispatcher.exit(errorcode=0)
Calls MPI Abort. Does a better job than sys.exit() of killing all processes.
-
mpiutils.dispatcher.am_dispatcher()
True if this process is the dispatcher, False if it is a worker.
-
mpiutils.dispatcher.barrier()
All processes must call this method before any will return.
-
mpiutils.dispatcher.broadcast(obj)
Synchronises the workers’ copies of obj to the dispatcher’s obj.
Parameters: obj (picklable object) – object to be broadcast
Example:
    >>> obj = RandomPicklableObject()  # all the processes have different objs
    >>> obj = broadcast(obj)  # all the processes have a copy of the same obj
-
mpiutils.dispatcher.checkmakedirs(dirname)
Creates the input path if it doesn’t exist.
Parameters: dirname (str) – path to be created
Checks whether the path exists, then creates it if it doesn’t. Must be called by all MPI processes or it will block. Not safe from race conditions if something external creates the directory between the check and the creation.
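The check-then-create pattern and the race it leaves open can be sketched in plain Python. Both functions below are hypothetical illustrations, not the body of checkmakedirs, and neither reproduces its MPI synchronisation; the second shows a standard-library way to tolerate a directory that already exists.

```python
import os
import tempfile


def check_then_make(dirname):
    """Check-then-create: another process could create dirname between the two calls."""
    if not os.path.exists(dirname):
        os.makedirs(dirname)  # raises FileExistsError if the race is lost


def make_tolerant(dirname):
    """Create dirname, treating 'already exists' as success."""
    os.makedirs(dirname, exist_ok=True)


with tempfile.TemporaryDirectory() as base:
    target = os.path.join(base, "results", "run1")
    check_then_make(target)
    make_tolerant(target)  # second call is a no-op rather than an error
    created = os.path.isdir(target)
print(created)
```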
-
mpiutils.dispatcher.rank()
The MPI rank of this process.
-
mpiutils.dispatcher.size()
The number of MPI processes.
mpiutils.mpi_logging module
-
class mpiutils.mpi_logging.MPIFileHandler(filename, filemode='a')
An MPI-safe logging.StreamHandler.
Example:
    >>> import logging, os, socket
    >>> from mpiutils.mpi_logging import MPIFileHandler
    >>> handler = MPIFileHandler(log_file)
    >>> handler.setLevel(logging.INFO)
    >>> hostpid = socket.gethostname()+':'+str(os.getpid())+':'
    >>> formatter = logging.Formatter('%(asctime)s:'+hostpid+
    ...                               '%(levelname)s:%(message)s')
    >>> handler.setFormatter(formatter)
    >>> logging.root.addHandler(handler)
    >>> logging.root.setLevel(log_level)
-
close()
-
emit(record)
Emit a record.
If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.
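The behaviour described for emit() can be observed with the standard library's logging.StreamHandler. This sketch assumes MPIFileHandler follows the same formatting contract; it does not use MPIFileHandler itself, and the logger name is hypothetical.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))

logger = logging.getLogger("emit_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output out of the root logger
logger.addHandler(handler)

# A plain record: formatted, then written with a trailing newline.
logger.info("task finished")

# A record with exception information: the traceback is appended.
try:
    1 / 0
except ZeroDivisionError:
    logger.exception("task failed")

output = stream.getvalue()
print(output)
```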
-