from typing import Union as _Union
from typing import List as _List
from typing import IO as _IO
from datetime import datetime as _datetime
from contextlib import contextmanager as _contextmanager
__all__ = ["Console", "Table"]
# Global rich.Console()
_console = None
# Global console theme
_theme = None
class _NullProgress:
    """Null progress to use if user disables progress"""
    def __init__(self):
        pass
    def __enter__(self, *args, **kwargs):
        return self
    def __exit__(self, *args, **kwargs):
        return False
    def add_task(self, *args, **kwargs):
        return 0
    def update(self, *args, **kwargs):
        return
class _Progress:
    def __init__(self, show_limit: int = 50):
        from rich.progress import Progress
        self._progress = Progress(auto_refresh=False)
        self._last_update = _datetime.now()
        self._show_limit = show_limit
        self._completed = {}
    def __enter__(self, *args, **kwargs):
        return self
    def __exit__(self, *args, **kwargs):
        if len(self._completed) > 0:
            self._progress.refresh()
            Console.print("")
        return False
    def add_task(self, description: str, total: int):
        if total > self._show_limit:
            task_id = self._progress.add_task(description=description,
                                              total=total)
            self._completed[task_id] = 0
            return task_id
        else:
            return None
    def update(self, task_id: int, completed: float = None,
               advance: float = None, force_update: bool = False):
        if task_id is None:
            return
        elif completed is not None:
            self._completed[task_id] = completed
        elif advance is not None:
            self._completed[task_id] += advance
        else:
            return
        now = _datetime.now()
        if force_update or (now - self._last_update).total_seconds() > 0.1:
            self._progress.update(task_id=task_id,
                                  completed=self._completed[task_id])
            self._progress.refresh()
            self._last_update = now
class _NullSpinner:
    """Null spinner to use if yaspin is not available
       disables spinners"""
    def __init__(self):
        pass
    def __enter__(self, *args, **kwargs):
        return self
    def __exit__(self, *args, **kwargs):
        return False
    def success(self):
        pass
    def failure(self):
        pass
class Table:
    """This a rich.table, with an additional "to_string()" function"""
    def __init__(self, title=None, show_footer=False, show_edge=True):
        from rich.table import Table as _Table
        self._table = _Table(title=title, show_edge=show_edge,
                             show_footer=show_footer)
    def add_column(self, header, justify="center", style=None, no_wrap=False,
                   footer=None):
        """Add a column called 'header', with specified justification,
           style and wrapping
        """
        if footer is not None:
            footer = str(footer)
        self._table.add_column(header=header, style=style, justify=justify,
                               no_wrap=no_wrap, footer=footer)
    def add_row(self, row):
        """Add the passed row of data to the table"""
        row = [str(x) if x is not None else None for x in row]
        self._table.add_row(*row)
    def to_string(self):
        """Return this table rendered to a string"""
        from rich.console import Console as _Console
        console = _Console()
        output = ""
        for s in console.render(self._table):
            output += s.text
        return output
[docs]class Console:
    """This is a singleton class that provides access to printing
       and logging functions to the console. This uses 'rich'
       for rich console printing
    """
[docs]    @staticmethod
    def supports_emojis():
        """Return whether or not you can print emojis to this console"""
        import sys
        if sys.platform == "win32":
            return False
        else:
            return True 
[docs]    @staticmethod
    def set_debugging_enabled(enabled, level=None):
        """Switch on or off debugging output"""
        console = Console._get_console()
        console._debugging_enabled = bool(enabled)
        console._debugging_level = level 
[docs]    @staticmethod
    def set_theme(theme):
        """Set the theme used for the console - this should be
           one of the themes in metawards.themes
        """
        global _theme
        if isinstance(theme, str):
            if theme.lower().strip() == "simple":
                from metawards.themes import Simple
                _theme = Simple()
            elif theme.lower().strip() == "default":
                from metawards.themes import SpringFlowers
                _theme = SpringFlowers()
        else:
            _theme = theme 
    @staticmethod
    def _get_theme():
        global _theme
        if _theme is None:
            from metawards.themes import SpringFlowers
            _theme = SpringFlowers()
        return _theme
    @staticmethod
    def _get_console():
        global _console
        if _console is None:
            from rich.console import Console as _Console
            theme = Console._get_theme()
            _console = _Console(record=True,
                                highlight=theme.should_highlight(),
                                highlighter=theme.highlighter(),
                                markup=theme.should_markup(),
                                log_time=True, log_path=True,
                                emoji=Console.supports_emojis())
            _console._use_spinner = True
            _console._use_progress = True
            _console._debugging_enabled = False
            _console._debugging_level = None
            # also install pretty traceback support
            from rich.traceback import install as _install_rich
            _install_rich()
        return _console
[docs]    @staticmethod
    @_contextmanager
    def redirect_output(outdir: str, auto_bzip: bool = True):
        """Redirect all output and error to the directory 'outdir'"""
        import os as os
        import sys as sys
        import bz2
        from rich.console import Console as _Console
        outfile = os.path.join(outdir, "output.txt")
        if auto_bzip:
            outfile += ".bz2"
            OUTFILE = bz2.open(outfile, "wt", encoding="utf-8")
        else:
            OUTFILE = open(outfile, "wt", encoding="utf-8")
        console = Console._get_console()
        if console is None:
            raise AssertionError("The global console should never be None")
        new_out = _Console(file=OUTFILE, record=False, log_time=True,
                           log_path=True, emoji=Console.supports_emojis())
        new_out._use_spinner = False
        new_out._use_progress = False
        new_out._debugging_enabled = console._debugging_enabled
        new_out._debugging_level = console._debugging_level
        old_out = console
        global _console
        _console = new_out
        try:
            yield new_out
        finally:
            _console = old_out
            OUTFILE.close() 
[docs]    @staticmethod
    def debugging_enabled(level: int = None):
        """Return whether debug output is enabled (optionally for
           the specified level) - if not, then
           anything sent to 'debug' (for that level) is not printed
        """
        console = Console._get_console()
        if not console._debugging_enabled:
            return False
        elif console._debugging_level is not None:
            if level is None:
                return True
            else:
                return level <= console._debugging_level
        else:
            return level is None 
    @staticmethod
    def _retrieve_name(variable):
        # thanks to scohe001 on stackoverflow
        # https://stackoverflow.com/questions/18425225/getting-the-name-of-a-variable-as-a-string
        import inspect
        callers_local_vars = inspect.currentframe().f_back.f_back.f_locals.items()
        return [var_name for var_name, var_val in callers_local_vars
                if var_val is variable][-1]
[docs]    @staticmethod
    def debug(text: str, variables: _List[any] = None, level: int = None,
              markdown: bool = False, **kwargs):
        """Print a debug string to the console. This will only be
           printed if debugging is enabled. You can also print the
           values of variables by passing them as a list to
           'variables'
        """
        if not Console.debugging_enabled(level=level):
            return
        if hasattr(text, "__call__"):
            # the user passed in a lambda for delayed printing
            text = text()
        if not isinstance(text, str):
            text = str(text)
        if level is not None:
            text = f"Level {level}: {text}"
        console = Console._get_console()
        if markdown:
            from rich.markdown import Markdown as _Markdown
            try:
                text = _Markdown(text)
            except Exception:
                text = _Markdown(str(text))
        console.log(text, justify="center", _stack_offset=2, **kwargs)
        if variables is not None:
            from rich.table import Table
            from rich import box
            import inspect
            # get the local variables in the caller's scope
            callers_local_vars = inspect.currentframe().f_back.f_locals.items()
            table = Table(box=box.MINIMAL_DOUBLE_HEAD, style="on magenta")
            table.add_column("Name", justify="right", style="cyan",
                             no_wrap=True)
            table.add_column("Value", justify="left", style="green")
            for variable in variables:
                # get the name of the variable in the caller
                try:
                    # go for the last matching variable in the caller's scope
                    name = [var_name for var_name, var_val
                            in callers_local_vars
                            if var_val is variable][-1]
                except Exception:
                    name = "variable"
                table.add_row(name, str(variable))
            console.print(table) 
[docs]    @staticmethod
    def print(text: str, markdown: bool = False, style: str = None,
              *args, **kwargs):
        """Print to the console"""
        if markdown:
            from rich.markdown import Markdown as _Markdown
            try:
                text = _Markdown(text)
            except Exception:
                text = _Markdown(str(text))
        theme = Console._get_theme()
        style = theme.text(style)
        try:
            Console._get_console().print(text, style=style)
        except UnicodeEncodeError:
            # this output can't cope with a complex theme - switch
            # to theme 'simple'
            Console.set_theme("simple")
            if isinstance(text, str):
                # try to print this as latin-1
                try:
                    text = text.encode("latin-1",
                                       errors="replace").decode("latin-1")
                    Console._get_console().print(text)
                except Exception:
                    # accept that this isn't going to be printable
                    pass 
[docs]    @staticmethod
    def rule(title: str = None, style=None, **kwargs):
        """Write a rule across the screen with optional title"""
        from rich.rule import Rule as _Rule
        Console.print("")
        theme = Console._get_theme()
        style = theme.rule(style)
        Console.print(_Rule(title, style=style)) 
[docs]    @staticmethod
    def panel(text: str, markdown: bool = False, width=None,
              padding: bool = True, style: str = None,
              expand=True, *args, **kwargs):
        """Print within a panel to the console"""
        from rich.panel import Panel as _Panel
        if markdown:
            from rich.markdown import Markdown as _Markdown
            text = _Markdown(text)
        theme = Console._get_theme()
        padding_style = theme.padding_style(style)
        style = theme.panel(style)
        box = theme.panel_box(style)
        from rich.padding import Padding as _Padding
        if padding:
            text = _Padding(text, (1, 2), style=padding_style)
        else:
            text = _Padding(text, (0, 1), style=padding_style)
        Console.print(_Panel(text, box=box, width=width,
                             expand=expand,
                             style=style, *args, **kwargs)) 
[docs]    @staticmethod
    def error(text: str, *args, **kwargs):
        """Print an error to the console"""
        Console.rule("ERROR", style="error")
        Console.print(text, style="error", *args, **kwargs)
        Console.rule(style="error") 
[docs]    @staticmethod
    def warning(text: str, *args, **kwargs):
        """Print a warning to the console"""
        Console.rule("WARNING", style="warning")
        Console.print(text, style="warning", *args, **kwargs)
        Console.rule(style="warning") 
[docs]    @staticmethod
    def info(text: str, *args, **kwargs):
        """Print an info section to the console"""
        Console.rule("INFO", style="info")
        Console.print(text, style="info", *args, **kwargs)
        Console.rule(style="info") 
    @staticmethod
    def center(text: str, *args, **kwargs):
        from rich.text import Text as _Text
        Console.print(_Text(str, justify="center"), *args, **kwargs)
    @staticmethod
    def command(text: str, *args, **kwargs):
        Console.panel(text, style="command", padding=False)
    @staticmethod
    def print_population(population, demographics=None,
                         *args, **kwargs):
        Console.print(population.summary(demographics=demographics))
    @staticmethod
    def print_profiler(profiler, *args, **kwargs):
        Console.print(str(profiler))
[docs]    @staticmethod
    def print_exception():
        """Print the current-handled exception"""
        console = Console._get_console()
        console.print_exception() 
    @staticmethod
    def set_use_spinner(use_spinner: bool = True):
        console = Console._get_console()
        console._use_spinner = use_spinner
    @staticmethod
    def set_use_progress(use_progress: bool = True):
        console = Console._get_console()
        console._use_progress = use_progress
    @staticmethod
    def progress(visible: bool = True, show_limit: int = 50):
        console = Console._get_console()
        if visible and console._use_progress:
            return _Progress(show_limit=show_limit)
        else:
            return _NullProgress()
    @staticmethod
    def spinner(text: str = ""):
        try:
            from yaspin import yaspin, Spinner
            have_yaspin = True
        except ImportError:
            have_yaspin = False
        if not have_yaspin:
            return _NullSpinner()
        console = Console._get_console()
        if console._use_spinner:
            theme = Console._get_theme()
            frames, delay = theme.get_frames(
                width=console.width - len(text) - 10)
            sp = Spinner(frames, delay)
            y = yaspin(sp, text=text, side="right")
            y.success = lambda: theme.spinner_success(y)
            y.failure = lambda: theme.spinner_failure(y)
            return y
        else:
            return _NullSpinner()
[docs]    @staticmethod
    def save(file: _Union[str, _IO]):
        """Save the accumulated printing to the console to 'file'.
           This can be a file or a filehandle. The buffer is
           cleared after saving
        """
        # get the console contents
        try:
            text = Console._get_console().export_text(clear=True,
                                                      styles=False)
        except Exception as e:
            Console.error(f"Cannot get console output: {e.__class__} {e}")
            return
        if isinstance(file, str):
            with open(file, "w", encoding="UTF-8") as FILE:
                FILE.write(text)
        else:
            try:
                if file.encoding.lower() != "utf-8":
                    # make sure that encoding the file to the right character
                    # set will replace invalid characters, rather than throw
                    # unicode encoding errors
                    text = text.encode(file.encoding,
                                       errors="replace").decode(file.encoding)
                file.write(text)
            except UnicodeEncodeError:
                # this still didn't work - use a latin-1 round-robin
                # to make everything ascii...
                text = text.encode("latin-1",
                                   errors="replace").decode("latin-1")
                file.write(text)