Source code for metawards.utils._parallel


from array import array
from sys import platform

__all__ = ["guess_num_threads_and_procs",
           "get_available_num_threads",
           "create_thread_generators"]


def guess_num_threads_and_procs(njobs: int,
                                nthreads: int = None,
                                nprocs: int = None,
                                ncores: int = None,
                                parallel_scheme: str = None):
    """Guess the number of processes and threads per process
       to use to make most-efficient processing of
       'njobs' jobs.

       Parameters
       ----------
       njobs: int
         The number of jobs (model runs) that must be performed.
         If this is None or less than 1 then (1, 1) is returned
       nthreads: int
         The number of threads requested - if None (or less than 1)
         then this will be guessed
       nprocs: int
         The number of processes requested - if None (or less than 1)
         then this will be guessed
       ncores: int
         The number of available cores on each node (or this computer).
         This will be obtained from get_available_num_threads() if it
         is not supplied. A missing or non-positive value is treated as 1
       parallel_scheme: str
         The parallelisation scheme that is being used to parallelise
         over processes. Defaults to "multiprocessing"

       Returns
       -------
       (nthreads, nprocs): Tuple[int, int]
         Tuple of the best-guessed 'nthreads' and 'nprocs' values to use.
         Any value that is guessed by this function is at least 1
    """
    if nthreads is None:
        nthreads = 0

    if nprocs is None:
        nprocs = 0

    # Without at least one job there is nothing to divide up. Zero must
    # be rejected along with negative values, as njobs is used as a
    # divisor below and could otherwise produce a process count of zero
    if njobs is None or njobs < 1:
        return (1, 1)

    from metawards.utils import is_openmp_supported

    if not is_openmp_supported():
        if nthreads > 1:
            from metawards.utils import Console
            Console.warning("Cannot use more than one thread as "
                            "this MetaWards executable has been "
                            "compiled without OpenMP support")

        nthreads = 1

    if parallel_scheme is None:
        parallel_scheme = "multiprocessing"

    if nprocs < 1 and parallel_scheme != "multiprocessing":
        # we have been told the number of processes to use by
        # the scoop or mpi4py scheduler
        from metawards.utils import get_number_of_processes
        nprocs = get_number_of_processes(parallel_scheme, nprocs)

    if ncores is None:
        ncores = get_available_num_threads()

    # guard against a core count that is still unknown or is not
    # positive, as it is used in the divisions and comparisons below
    if ncores is None or ncores < 1:
        ncores = 1

    if nthreads < 1 and nprocs < 1:
        # we will need to calculate the best values of nprocs and
        # nthreads to use to divide the work most efficiently. The
        # goal should be to run as many model runs in parallel as
        # possible

        if njobs >= 0.8 * ncores:   # not worth going parallel for <80%
            nthreads = 1
            nprocs = min(ncores, njobs)
        else:
            # share the spare cores evenly between the jobs
            nthreads = max(1, ncores // njobs)
            nprocs = max(1, min(njobs, ncores // nthreads))
    elif nthreads < 1:
        # the number of processes is fixed, so give each one an
        # equal share of the cores
        nthreads = max(1, ncores // nprocs)
    elif nprocs < 1:
        # the number of threads is fixed - run as many processes as fit
        # on the cores, but never more than there are jobs and never
        # fewer than one (e.g. when nthreads is larger than ncores)
        nprocs = max(1, min(njobs, ncores // nthreads))

    return (nthreads, nprocs)


def get_available_num_threads():
    """Return the maximum number of threads that are recommended
       for this computer (the OMP_NUM_THREADS value)

       Returns
       -------
       nthreads: int
         1 if OpenMP is not supported. Otherwise the (first) positive
         integer given in the OMP_NUM_THREADS environment variable,
         falling back to the number of CPUs reported by the OS, or 1
         if that cannot be determined
    """
    from ._check_openmp import is_openmp_supported

    if not is_openmp_supported():
        return 1

    import os

    # try OMP_NUM_THREADS as this is the accepted way to
    # override the number in a queueing system
    omp_num_threads = os.getenv("OMP_NUM_THREADS", None)

    if omp_num_threads is not None:
        # OpenMP allows a comma-separated list here (one entry per
        # nesting level) - the first entry is the outermost level
        try:
            nthreads = int(omp_num_threads.split(",")[0])
        except ValueError:
            # not a number, so ignore it and ask the OS instead
            nthreads = 0

        # only trust values that are a usable thread count
        if nthreads > 0:
            return nthreads

    # ok, get this from 'os' - note that cpu_count() returns None
    # if the number of CPUs cannot be determined
    ncpus = os.cpu_count()

    return ncpus if ncpus else 1
def create_thread_generators(rng, nthreads):
    """Return an array of random number generators, one for each
       thread. These are seeded using the next 'nthreads' random
       numbers drawn from the passed generator, and each seed is
       reported via the Console.

       If 'nthreads' is 1, 0 or None, then the returned array holds
       just the passed 'rng'

       Parameters
       ----------
       rng:
         The random number generator from which the seeds are drawn
       nthreads: int
         The number of per-thread generators to create

       Returns
       -------
       rngs: array
         Array of the generators, of typecode "Q" on Windows and
         "L" on all other platforms
    """
    # the generators are returned as an array so that they are more
    # easily accessible from the OpenMP loops - rng is a unsigned 64-bit
    # integer as a uintptr_t. This best corresponds to unsigned long
    # ("L"), except on Windows, which has a 32-bit long and so needs
    # unsigned long long ("Q")
    typecode = "Q" if platform == "win32" else "L"

    if nthreads is None or nthreads <= 1:
        # nothing to seed - just hand back the generator we were given
        return array(typecode, [rng])

    from ._ran_binomial import seed_ran_binomial, ran_int
    from ._console import Console

    generators = []
    messages = []

    for thread_id in range(nthreads):
        seed = ran_int(rng)
        messages.append(
            f"* Random seed for thread {thread_id} equals **{seed}**")
        generators.append(seed_ran_binomial(seed))

    Console.print("\n".join(messages), markdown=True)

    return array(typecode, generators)