Source code for metawards._outputfiles


from pathlib import Path as _Path

__all__ = ["OutputFiles"]


def _get_bool(arg):
    """Simple function to make sure that flags that are supposed
       to be true/false are actually stored as bools
    """
    if arg:
        return True
    else:
        return False


def _is_empty(outdir):
    """Simple function that checks whether or not the passed directory
       is empty
    """
    import os

    if os.path.isfile(outdir):
        # this is a file, so it definitely is not 'empty'
        return False
    elif os.path.isdir(outdir):
        # the directory is empty if it contains no hidden files
        # or other directories - sorry to wipe out dotfiles...!
        return len(os.listdir(outdir)) == 0
    else:
        return True


def _rmdir(directory):
    """Function modified from one copied from 'mitch' on stackoverflow
       https://stackoverflow.com/questions/13118029/deleting-folders-in-python-recursively
    """
    directory = _Path(directory)

    # first, check for removing important directories such as $HOME or root
    if directory == _Path.home():
        raise FileExistsError(f"We WILL NOT remove your "
                              f"home directory ${directory}")

    if directory == _Path("/"):
        raise FileExistsError(f"We WILL NOT remove the root directory "
                              f"{directory}")

    # get the directory containing '$HOME'
    if directory == _Path.home().parent:
        raise FileExistsError(f"We WILL NOT remove the users/home "
                              f"directory {directory}")

    if not directory.is_dir():
        directory.unlink()
        return

    from .utils._console import Console

    for item in directory.iterdir():
        if item.is_dir():
            _rmdir(item)
        else:
            item.unlink()

    Console.print(f"removing directory {directory}", style="warning")
    directory.rmdir()


def _check_remove(outdir, prompt):
    """Function to check if the user wants to remove the directory,
       giving them the option to continue, quit or remove all files
    """
    if prompt is None:
        raise FileExistsError(f"Cannot continue as {outdir} already exists!")

    from .utils._console import Console
    Console.warning(f"{outdir} already exists.")
    y = prompt("Do you want to remove it? (y/n) ")

    y = y.strip().lower()

    if len(y) > 0 and y == "y":
        Console.print(f"Removing all files in {outdir}", style="warning")
        _rmdir(_Path(outdir))
        return

    Console.warning(f"Continuing with this run will mix its output with "
                    f"the files already in {outdir}.")

    y = prompt("Do you want to continue with this run? (y/n) ")

    y = y.strip().lower()

    if len(y) == 0 or y != "y":
        from .utils._console import Console
        Console.error(f"Exiting the program as we cannot run any more.")
        import sys
        sys.exit(-1)


def _force_remove(outdir, prompt):
    """Function to force the removal of a directory, using the
       passed prompt to double-check with the user. If 'prompt'
       is None, then we go ahead
    """
    import os
    if not os.path.exists(outdir):
        return

    from .utils._console import Console

    if prompt:
        Console.warning(f"{outdir} already exists")

        y = prompt("Do you want to remove it? (y/n) ")
        y = y.strip().lower()

        if len(y) == 0 or y != "y":
            raise FileExistsError(
                f"Cannot continue as {outdir} already exists")

    Console.print(f"Removing all files in {outdir}", style="red")
    _rmdir(_Path(outdir))


def _expand(path):
    """Expand all variables and user indicators in the passed path"""
    import os
    return os.path.expanduser(os.path.expandvars(path))


def _bz2compress(filename, bz2filename):
    """bz2 compress 'filename' to write 'bz2filename'"""
    if filename == bz2filename:
        raise IOError(f"Cannot be equal {filename} vs {bz2filename}")

    import bz2 as _bz2

    compressor = _bz2.BZ2Compressor()
    BLOCK_SIZE = 2048

    with open(bz2filename, "wb") as BZ2FILE:
        with open(filename, "rb") as FILE:
            while True:
                block = FILE.read(BLOCK_SIZE)

                if not block:
                    remaining = compressor.flush()
                    BZ2FILE.write(remaining)
                    return

                compressed = compressor.compress(block)
                BZ2FILE.write(compressed)


[docs]class OutputFiles: """This is a class that manages all of the output files that are written to during a model outbreak. This object is used to hold the 'FILE' objects for open files, and will ensure that these files are closed and written to disk as needed. It will also ensure that files are written to the correct output directory, and that they are only opened when they are needed (e.g. only the first call to open the file will actually open it - subsequent calls will return the already-open file handler) Examples -------- >>> output = OutputFiles(output_dir="output", check_empty=True) >>> FILE = output.open("output.txt") >>> FILE.write("some output\\n") >>> FILE = output.open("something.csv.bz2", auto_bzip=True) >>> FILE.write("something,else,is,here\\n") >>> output.flush() >>> FILE = output.open("output.txt") >>> FILE.write("some more output\\n") >>> output.close() Note that you can also use OutputFiles in a contexthandler, to ensure that all output files are automatically closed, e.g. >>> with OutputFiles(output_dir="output") as output: >>> FILE = output.open("output.txt") >>> FILE.write("something\\n") """
[docs] def __init__(self, output_dir: str = "output", check_empty: bool = True, force_empty: bool = False, prompt=input, auto_bzip: bool = False): """Construct a set of OutputFiles. These will all be written to 'output_dir'. Parameters ---------- output_dir: str The directory in which to create all of the output files. This directory will be created automatically if it doesn't exist check_empty: bool Whether or not to check if the directory is empty before continuing. If the directory is not empty, then the user will be prompted to make a decision to either keep going, choose a different directory, remove existing output or exit force_empty: bool Force the output directory to be empty. BE CAREFUL as this will remove all files in that directory! There are checks to stop you doing something silly, but these are not fool-proof. The user will be prompted to confirm that the files should be removed prompt: This is the function that should be called to prompt the user for input, e.g. to confirm whether or not files should be deleted. This defaults to `input`. Set this to None if you *really* want MetaWards to remove files silently (e.g. useful if you are running batch jobs on a cluster and you really know what you are doing) auto_bzip: bool The default flag for `auto_bzip` when opening files. If this is true then all files will be automatically bzipped (compressed) as they are written, unless the code opening the file has explicitly asked otherwise """ self._check_empty = _get_bool(check_empty) self._force_empty = _get_bool(force_empty) self._auto_bzip = _get_bool(auto_bzip) self._prompt = prompt self._output_dir = output_dir self._is_open = False self._open_files = {} self._filenames = {} self._is_database = {} self._open_dir()
def __enter__(self): self._open_dir() return self def __exit__(self, exc_type, exc_value, exc_traceback): self._close_dir() return False def _open_dir(self): """Internal function used to open the directory in which all output files will be placed """ if self._is_open or self._output_dir is None: return import os if self._output_dir is None: raise ValueError("You cannot open an empty OutputFiles!") outdir = _expand(self._output_dir) mask = None if os.path.exists(outdir): outdir = os.path.abspath(outdir) if self._check_empty: if not _is_empty(outdir): if self._force_empty: _force_remove(outdir, self._prompt) else: _check_remove(outdir, self._prompt) # remake the directory after it has been removed try: import sys # Make sure write bit is set on Windows. if sys.platform == "win32": # Safely preserve the permissions of the # current process import stat mask = os.umask(0) os.makedirs(outdir, stat.S_IWRITE) os.umask(mask) mask = None else: os.makedirs(outdir) except FileExistsError: pass finally: if mask is not None: os.umask(mask) mask = None if not os.path.isdir(outdir): from .utils._console import Console Console.error( f"Cannot open {outdir} as it is not a directory!") raise FileExistsError(f"{outdir} is an existing file!") try: import sys # Make sure write bit is set on Windows. if sys.platform == "win32": # Safely preserve the permissions of the current process import stat mask = os.umask(0) os.makedirs(outdir, stat.S_IWRITE) os.umask(mask) mask = None else: os.makedirs(outdir) except FileExistsError as e: # this is no problem, as we have already validated # that the directory already existing is ok if not os.path.isdir(outdir): # but it is a problem if it is not a directory... raise e finally: if mask is not None: os.umask(mask) mask = None self._output_dir = str(_Path(outdir).absolute().resolve()) self._is_open = True def _close_dir(self): """Internal function used to close all of the output files""" if not self._is_open: return errors = [] for filename, handle in self._open_files.items(): try: if self._is_database.get(filename, False): handle.commit() handle.close() if self._filenames[filename].endswith("bz2"): # we need to manually compress this file _bz2compress(filename, self._filenames[filename]) import os as _os _os.remove(filename) else: handle.close() except Exception as e: errors.append(f"Could not close {filename}: " f"{e.__class__} {e}") self._is_open = False self._open_files = {} self._filenames = {} self._is_database = {}
[docs] def is_database(self, filename): """Return whether or not 'filename' is an open database""" return self._is_database.get(filename, False)
[docs] @staticmethod def remove(path, prompt=input): """Remove the passed filename or directory Parameters ---------- path: str The path to the file or directory or remove prompt: Prompt to use to ask the user - if None then no checks! """ path = _Path(_expand(path)).absolute().resolve() _force_remove(path, prompt)
[docs] def is_open(self): """Return whether or not the output files are open""" return self._is_open
[docs] def is_closed(self): """Return whether or not the output files are closed""" return not self.is_open()
[docs] def output_dir(self): """Return the absolute path of the directory to which the output files will be written """ return self._output_dir
[docs] def open_db(self, filename: str, auto_bzip=None, initialise=None): """Open up a SQLite3 database connection to a file called 'filename' in the output directory, returning the SQLite3 connection to the database. Note that this will open the database one, and will return the already-made connection on all subsequence calls. Parameters ---------- filename: str The name of the file containing the database to open. This must be relative to the output directory, and within that directory. It is an error to try to open a database that is not contained in this directory. auto_bzip: bool Whether or not to automatically compress the file using bzip2 when it is closed. The filename will automatically have '.bz2' appended so that this is clear. If 'None' is passed (the default) then the value of 'auto_bzip' that was passed to the constructor of this OutputFiles will be used. Note that this flag is ignored if the database is already open initialise: function A function that is called to initialise the database the first time that it is opened. The function is called with the argument "CONN" (representing the sqlite3 database connection). Use this to create the tables that you need """ import os self._open_dir() outdir = self._output_dir p = _Path(_expand(filename)) if not p.is_absolute(): p = _Path(os.path.join(outdir, filename)) filename = str(p.absolute().resolve()) prefix = os.path.commonprefix([outdir, filename]) if prefix != outdir: raise ValueError(f"You cannot try to open {filename} as " f"this is not in the output directory " f"{outdir} - common prefix is {prefix}") if filename in self._open_files: if self._is_database.get(filename, False): return self._open_files[filename] else: raise IOError(f"{filename} is a file, not a database!") if auto_bzip is None: auto_bzip = self._auto_bzip auto_bzip = _get_bool(auto_bzip) if auto_bzip is None: auto_bzip = self._auto_bzip auto_bzip = _get_bool(auto_bzip) import sqlite3 as _sqlite3 CONN = _sqlite3.connect(filename) if initialise is not None: initialise(CONN) self._open_files[filename] = CONN self._is_database[filename] = True if auto_bzip: if not filename.endswith(".bz2"): suffix = ".bz2" else: suffix = "" self._filenames[filename] = f"{filename}{suffix}" else: self._filenames[filename] = filename return CONN
[docs] def open(self, filename: str, auto_bzip=None, mode="t", headers=None, sep=" "): """Open the file called 'filename' in the output directory, returning a handle to that file. Note that this will open the file once, and will return the already-open file handle on all subsequent calls. Parameters ---------- filename: str The name of the file to open. This must be relative to the output directory, and within that directory. It is an error to try to open a file that is not contained within this directory. auto_bzip: bool Whether or not to open the file in auto-bzip (compression) mode. If this is True then the file will be automatically compressed as it is written. The filename will have '.bz2' automatically appended so that this is clear. If this is False then the file will be written uncompressed. If 'None' is passed (the default) then the value of `auto_bzip` that was passed to the constructor of this OutputFiles will be used. Note that this flag is ignored if the file is already open. mode: str The mode of opening the file, e.g. 't' for text mode, and 'b' for binary mode. The default is text mode headers: list[str] or plain str or function The headers to add to the top of the file, e.g. if it will contain column data. This will be written to the first line when the file is opened. If a list is passed, then this will be written joined together using 'sep'. If a plain string is passed then this will be written. If this is a function then this function will be called with "FILE" as the argument. If nothing is passed then no headers will be written. sep: str The separator used for the headers (e.g. " " or "," are good choices). By default things are space-separated Returns ------- file The handle to the open file """ import os self._open_dir() outdir = self._output_dir p = _Path(_expand(filename)) if not p.is_absolute(): p = _Path(os.path.join(outdir, filename)) filename = str(p.absolute().resolve()) prefix = os.path.commonprefix([outdir, filename]) if prefix != outdir: raise ValueError(f"You cannot try to open {filename} as " f"this is not in the output directory " f"{outdir} - common prefix is {prefix}") if filename in self._open_files: if self._is_database.get(filename, False): raise IOError(f"{filename} is a database, not a file!") else: return self._open_files[filename] if auto_bzip is None: auto_bzip = self._auto_bzip auto_bzip = _get_bool(auto_bzip) if mode is None: mode = "w" elif mode.find("w") == -1: mode = f"w{mode}" if mode.find("b") == -1: # text file = encoding should be "UTF-8" encoding = "UTF-8" else: encoding = None if auto_bzip: import bz2 if not filename.endswith(".bz2"): suffix = ".bz2" else: suffix = "" if encoding: FILE = bz2.open(f"{filename}{suffix}", mode=mode, encoding=encoding) else: FILE = bz2.open(f"{filename}{suffix}", mode=mode) self._open_files[filename] = FILE self._filenames[filename] = f"{filename}{suffix}" else: if encoding: FILE = open(filename, mode=mode, encoding=encoding) else: FILE = open(filename, mode=mode) self._open_files[filename] = FILE self._filenames[filename] = filename if headers is not None: if isinstance(headers, str): FILE.write(headers) FILE.write("\n") elif hasattr(headers, "__call__"): headers(FILE) else: FILE.write(sep.join([str(x) for x in headers])) FILE.write("\n") return FILE
[docs] def open_subdir(self, dirname): """Create and open a sub-directory in this OutputFiles called 'dirname'. This will inherit all properties, e.g. check_empty, auto_bzip etc from this OutputFiles Parameters ---------- dirname: str The name of the subdirectory to open Returns ------- subdir: OutputFiles The open subdirectory """ import os self._open_dir() outdir = self._output_dir p = _Path(_expand(dirname)) if not p.is_absolute(): p = _Path(os.path.join(outdir, dirname)) subdir = str(p.absolute().resolve()) prefix = os.path.commonprefix([outdir, subdir]) if prefix != outdir: raise ValueError(f"You cannot try to open {dirname} as " f"this is not in the output directory " f"{outdir} - common prefix is {prefix}") return OutputFiles(output_dir=subdir, check_empty=self._check_empty, force_empty=self._force_empty, prompt=self._prompt, auto_bzip=self._auto_bzip)
[docs] def auto_bzip(self): """Return whether the default is to automatically bzip2 files""" return self._auto_bzip
[docs] def get_path(self): """Return the full expanded path to this directory""" return self._output_dir
[docs] def get_filename(self, filename): """Return the full expanded filename for 'filename'""" import os self._open_dir() outdir = self._output_dir p = _Path(_expand(filename)) if not p.is_absolute(): p = _Path(os.path.join(outdir, filename)) filename = str(p.absolute().resolve()) if filename in self._filenames: return self._filenames[filename] else: raise FileNotFoundError(f"No open file {filename}")
[docs] def close(self): """Close all of the files and this directory""" self._close_dir()
[docs] def flush(self): """Flush the contents of all files to disk""" for filename, handle in self._open_files.items(): try: handle.flush() except Exception: pass