from typing import Union as _Union
from typing import List as _List
from typing import IO as _IO
from datetime import datetime as _datetime
from contextlib import contextmanager as _contextmanager
# Public names exported by this module
__all__ = ["Console", "Table"]
# Global rich.Console() - created on first use by Console._get_console()
# and temporarily swapped out by Console.redirect_output()
_console = None
# Global console theme - set by Console.set_theme(), otherwise created
# on first use by Console._get_theme()
_theme = None
class _NullProgress:
    """Do-nothing progress object, used when the user disables progress.

    It offers the same context-manager, 'add_task' and 'update' calls
    as _Progress, but none of them display anything.
    """

    def __init__(self):
        """Nothing to set up"""

    def __enter__(self, *args, **kwargs):
        """Enter the (empty) context, returning this object"""
        return self

    def __exit__(self, *args, **kwargs):
        """Leave the context. Returns False so exceptions propagate"""
        return False

    def add_task(self, *args, **kwargs):
        """Ignore the task and always return the task id 0"""
        return 0

    def update(self, *args, **kwargs):
        """Ignore the update"""
        return None
class _Progress:
    """Wrapper around a rich Progress display.

    The underlying display is only started when a task with more than
    'show_limit' steps is added, and it is refreshed at most once every
    0.1 seconds unless an update is forced.

    This must be used as a context manager: 'add_task' unpacks the
    arguments that were recorded by '__enter__'.
    """

    def __init__(self, show_limit: int = 50):
        """Create the (not yet started) progress display. Tasks whose
        total is not greater than 'show_limit' are never displayed.
        """
        from rich.progress import Progress, BarColumn, TimeRemainingColumn

        bar_column = BarColumn(style="bar.back",
                               complete_style="bar.complete",
                               finished_style="bar.complete")

        # auto_refresh is off - 'update' decides when to redraw
        self._progress = Progress(
            "[progress.description]{task.description}",
            bar_column,
            "[progress.percentage]{task.percentage:>3.0f}%",
            TimeRemainingColumn(),
            auto_refresh=False)

        self._last_update = _datetime.now()
        self._show_limit = show_limit
        # task_id -> number of completed steps recorded so far
        self._completed = {}
        self._have_entered = False
        self._enter_args = None

    def __enter__(self, *args, **kwargs):
        """Record the arguments so that the rich display can be entered
        later, when the first large-enough task is added.
        """
        self._enter_args = (args, kwargs)
        return self

    def __exit__(self, *args, **kwargs):
        """Exit the rich display if it was started. Returns False so
        that exceptions propagate.
        """
        if self._have_entered:
            self._progress.__exit__(*args, **kwargs)

        return False

    def add_task(self, description: str, total: int):
        """Add a task with 'total' steps. Returns the task id, or None
        if 'total' is not greater than the show limit (the task is then
        not displayed).
        """
        if not (total > self._show_limit):
            return None

        if not self._have_entered:
            # first task worth displaying - start the rich display now
            enter_args, enter_kwargs = self._enter_args
            self._progress.__enter__(*enter_args, **enter_kwargs)
            self._have_entered = True

        new_id = self._progress.add_task(description=description,
                                         total=total)
        self._completed[new_id] = 0
        return new_id

    def update(self, task_id: int, completed: float = None,
               advance: float = None, force_update: bool = False):
        """Record progress for 'task_id', either as an absolute
        'completed' value or as an 'advance' on the current value.
        The display is redrawn only if 'force_update' is True or more
        than 0.1 seconds have passed since the last redraw.
        """
        if task_id is None:
            # this task was never displayed
            return

        if completed is not None:
            self._completed[task_id] = completed
        elif advance is not None:
            self._completed[task_id] += advance
        else:
            return

        now = _datetime.now()
        elapsed = (now - self._last_update).total_seconds()

        if force_update or elapsed > 0.1:
            self._progress.update(task_id=task_id,
                                  completed=self._completed[task_id])
            self._progress.refresh()
            self._last_update = now
class _NullSpinner:
    """Do-nothing spinner, used if yaspin is not available or if
    spinners have been disabled.
    """

    def __init__(self):
        """Nothing to set up"""

    def __enter__(self, *args, **kwargs):
        """Enter the (empty) context, returning this object"""
        return self

    def __exit__(self, *args, **kwargs):
        """Leave the context. Returns False so exceptions propagate"""
        return False

    def success(self):
        """Ignore the request to mark the spinner as successful"""
        return None

    def failure(self):
        """Ignore the request to mark the spinner as failed"""
        return None
class Table:
    """This is a rich.table, with an additional "to_string()" function"""

    def __init__(self, title=None, show_footer=False, show_edge=True):
        """Create the wrapped rich table with the passed title,
        footer and edge settings.
        """
        from rich.table import Table as _RichTable

        self._table = _RichTable(title=title, show_edge=show_edge,
                                 show_footer=show_footer)

    def add_column(self, header, justify="center", style=None, no_wrap=False,
                   footer=None):
        """Add a column called 'header', with specified justification,
        style and wrapping. A footer, if given, is converted to a string.
        """
        footer_text = None if footer is None else str(footer)

        self._table.add_column(header=header, style=style, justify=justify,
                               no_wrap=no_wrap, footer=footer_text)

    def add_row(self, row):
        """Add the passed row of data to the table. Every item is
        converted to a string, except None, which is passed through.
        """
        cells = (None if cell is None else str(cell) for cell in row)
        self._table.add_row(*cells)

    def to_string(self):
        """Return this table rendered to a string"""
        from rich.console import Console as _RichConsole

        # render with a fresh rich console and join the text of
        # every rendered segment
        segments = _RichConsole().render(self._table)
        return "".join(segment.text for segment in segments)
class Console:
    """This is a singleton class that provides access to printing
    and logging functions to the console. This uses 'rich'
    for rich console printing.

    Every method is static. They all share the module-level
    '_console' and '_theme' objects, which are created on first use.
    """

    @staticmethod
    def supports_emojis():
        """Return whether or not you can print emojis to this console"""
        import sys

        # emojis are switched off on Windows
        return sys.platform != "win32"

    @staticmethod
    def set_debugging_enabled(enabled, level=None):
        """Switch on or off debugging output. The optional 'level' is
        stored and used by 'debugging_enabled' to filter debug output.
        """
        console = Console._get_console()
        console._debugging_enabled = bool(enabled)
        console._debugging_level = level

    @staticmethod
    def set_theme(theme):
        """Set the theme used for the console - this should be
        one of the themes in metawards.themes

        'theme' is either a theme object, or one of the strings
        "simple" or "default" (case and surrounding spaces are ignored).
        Any other string is ignored and the theme is left unchanged.
        """
        global _theme

        if isinstance(theme, str):
            name = theme.lower().strip()

            if name == "simple":
                from metawards.themes import Simple
                _theme = Simple()
            elif name == "default":
                from metawards.themes import SpringFlowers
                _theme = SpringFlowers()
        else:
            _theme = theme

    @staticmethod
    def _get_theme():
        """Return the global theme, creating the default
        (SpringFlowers) theme if none has been set.
        """
        global _theme

        if _theme is None:
            from metawards.themes import SpringFlowers
            _theme = SpringFlowers()

        return _theme

    @staticmethod
    def _get_console():
        """Return the global rich console, creating it (using the
        current theme) on first use.
        """
        global _console

        if _console is None:
            from rich.console import Console as _Console

            theme = Console._get_theme()

            # record=True keeps a copy of everything printed, so that
            # it can be exported later by 'save'
            _console = _Console(record=True,
                                highlight=theme.should_highlight(),
                                highlighter=theme.highlighter(),
                                markup=theme.should_markup(),
                                log_time=True, log_path=True,
                                emoji=Console.supports_emojis())

            # extra state used by this class is stored on the console
            _console._use_spinner = True
            _console._use_progress = True
            _console._debugging_enabled = False
            _console._debugging_level = None

            # also install pretty traceback support
            from rich.traceback import install as _install_rich
            _install_rich()

        return _console

    @staticmethod
    @_contextmanager
    def redirect_output(outdir: str, auto_bzip: bool = True):
        """Redirect all output and error to the directory 'outdir'

        This is a context manager. While it is active the global
        console is replaced by one that writes to 'outdir/output.txt'
        (or 'outdir/output.txt.bz2' if 'auto_bzip' is True). That
        console is yielded. On exit the previous console is restored
        and the file is closed. This function does not create 'outdir'.
        """
        global _console

        import os
        import bz2
        from rich.console import Console as _Console

        outfile = os.path.join(outdir, "output.txt")

        if auto_bzip:
            outfile += ".bz2"
            opener = bz2.open
        else:
            opener = open

        # the 'with' guarantees the file is closed, even if setting up
        # the new console fails before anything is yielded
        with opener(outfile, "wt", encoding="utf-8") as OUTFILE:
            old_out = Console._get_console()

            if old_out is None:
                raise AssertionError(
                    "The global console should never be None")

            new_out = _Console(file=OUTFILE, record=False, log_time=True,
                               log_path=True,
                               emoji=Console.supports_emojis())

            # no spinners or progress bars when writing to a file
            new_out._use_spinner = False
            new_out._use_progress = False
            new_out._debugging_enabled = old_out._debugging_enabled
            new_out._debugging_level = old_out._debugging_level

            _console = new_out

            try:
                yield new_out
            finally:
                _console = old_out

    @staticmethod
    def debugging_enabled(level: int = None):
        """Return whether debug output is enabled (optionally for
        the specified level) - if not, then
        anything sent to 'debug' (for that level) is not printed
        """
        console = Console._get_console()

        if not console._debugging_enabled:
            return False

        if console._debugging_level is None:
            # no level has been set, so only unlevelled output is shown
            return level is None

        if level is None:
            return True

        return level <= console._debugging_level

    @staticmethod
    def _retrieve_name(variable):
        """Return the name of the last local variable, in the frame
        two levels above this function, that is bound to 'variable'.
        Raises an IndexError if there is no such variable.
        """
        # thanks to scohe001 on stackoverflow
        # https://stackoverflow.com/questions/18425225/getting-the-name-of-a-variable-as-a-string
        import inspect

        callers_local_vars = \
            inspect.currentframe().f_back.f_back.f_locals.items()

        return [var_name for var_name, var_val in callers_local_vars
                if var_val is variable][-1]

    @staticmethod
    def debug(text: str, variables: _List[object] = None, level: int = None,
              markdown: bool = False, **kwargs):
        """Print a debug string to the console. This will only be
        printed if debugging is enabled. You can also print the
        values of variables by passing them as a list to
        'variables'

        'text' may be a callable taking no arguments; it is only
        called if the message is actually going to be printed. Extra
        keyword arguments are passed to the console's 'log' function.
        """
        if not Console.debugging_enabled(level=level):
            return

        if callable(text):
            # the user passed in a lambda for delayed printing
            text = text()

        if not isinstance(text, str):
            text = str(text)

        if level is not None:
            text = f"Level {level}: {text}"

        console = Console._get_console()

        if markdown:
            from rich.markdown import Markdown as _Markdown
            text = _Markdown(text)

        # _stack_offset=2 so that the logged path is the caller of debug
        console.log(text, justify="center", _stack_offset=2, **kwargs)

        if variables is None:
            return

        from rich.table import Table as _RichTable
        from rich import box
        import inspect

        # get the local variables in the caller's scope
        callers_local_vars = inspect.currentframe().f_back.f_locals.items()

        table = _RichTable(box=box.MINIMAL_DOUBLE_HEAD, style="on magenta")
        table.add_column("Name", justify="right", style="cyan",
                         no_wrap=True)
        table.add_column("Value", justify="left", style="green")

        for variable in variables:
            # go for the last matching variable in the caller's scope,
            # falling back to a generic name if nothing matches
            matches = [var_name for var_name, var_val
                       in callers_local_vars
                       if var_val is variable]

            name = matches[-1] if matches else "variable"
            table.add_row(name, str(variable))

        console.print(table)

    @staticmethod
    def print(text: str, markdown: bool = False, style: str = None,
              markup: bool = None, *args, **kwargs):
        """Print to the console

        'style' is resolved via the current theme. If 'markdown' is
        True then the text is rendered as markdown. Extra positional
        and keyword arguments are accepted but are not used.
        """
        if markdown:
            from rich.markdown import Markdown as _Markdown
            try:
                text = _Markdown(text)
            except Exception:
                text = _Markdown(str(text))

        theme = Console._get_theme()
        style = theme.text(style)

        try:
            Console._get_console().print(text, style=style, markup=markup)
        except UnicodeEncodeError:
            # this output can't cope with a complex theme - switch
            # to theme 'simple'
            Console.set_theme("simple")

            if isinstance(text, str):
                # try to print this as latin-1
                try:
                    text = text.encode("latin-1",
                                       errors="replace").decode("latin-1")
                    Console._get_console().print(text, markup=markup)
                except Exception:
                    # deliberate best effort - accept that this
                    # isn't going to be printable
                    pass

    @staticmethod
    def rule(title: str = None, style=None, **kwargs):
        """Write a rule across the screen with optional title.
        A blank line is printed before the rule. Extra keyword
        arguments are accepted but are not used.
        """
        from rich.rule import Rule as _Rule

        Console.print("")

        theme = Console._get_theme()
        style = theme.rule(style)

        Console.print(_Rule(title, style=style))

    @staticmethod
    def panel(text: str, markdown: bool = False, width=None,
              padding: bool = True, style: str = None,
              expand=True, *args, **kwargs):
        """Print within a panel to the console

        The text is wrapped in padding of (1, 2) if 'padding' is True,
        else (0, 1). Extra positional and keyword arguments are passed
        to the rich Panel.
        """
        from rich.panel import Panel as _Panel
        from rich.padding import Padding as _Padding

        if markdown:
            from rich.markdown import Markdown as _Markdown
            text = _Markdown(text)

        theme = Console._get_theme()
        padding_style = theme.padding_style(style)
        style = theme.panel(style)
        # NOTE(review): panel_box is passed the value returned by
        # theme.panel(), not the original 'style' argument - confirm
        # that this is what the themes expect
        box = theme.panel_box(style)

        pad = (1, 2) if padding else (0, 1)
        text = _Padding(text, pad, style=padding_style)

        Console.print(_Panel(text, *args, box=box, width=width,
                             expand=expand, style=style, **kwargs))

    @staticmethod
    def error(text: str, *args, **kwargs):
        """Print an error to the console, between two rules"""
        Console.rule("ERROR", style="error")
        Console.print(text, *args, style="error", **kwargs)
        Console.rule(style="error")

    @staticmethod
    def warning(text: str, *args, **kwargs):
        """Print a warning to the console, between two rules"""
        Console.rule("WARNING", style="warning")
        Console.print(text, *args, style="warning", **kwargs)
        Console.rule(style="warning")

    @staticmethod
    def info(text: str, *args, **kwargs):
        """Print an info section to the console, between two rules"""
        Console.rule("INFO", style="info")
        Console.print(text, *args, style="info", **kwargs)
        Console.rule(style="info")

    @staticmethod
    def center(text: str, *args, **kwargs):
        """Print the passed text centered on the console"""
        from rich.text import Text as _Text

        # wrap the text that was passed in (not the builtin 'str' type)
        Console.print(_Text(text, justify="center"), *args, **kwargs)

    @staticmethod
    def command(text: str, *args, **kwargs):
        """Print the passed text in an unpadded panel, using the
        "command" style. Extra arguments are accepted but not used.
        """
        Console.panel(text, style="command", padding=False)

    @staticmethod
    def print_population(population, demographics=None,
                         *args, **kwargs):
        """Print the summary of the passed population (optionally for
        the passed demographics). Extra arguments are accepted but
        not used.
        """
        Console.print(population.summary(demographics=demographics))

    @staticmethod
    def print_profiler(profiler, *args, **kwargs):
        """Print the string form of the passed profiler. Extra
        arguments are accepted but not used.
        """
        Console.print(str(profiler))

    @staticmethod
    def print_exception():
        """Print the current-handled exception"""
        console = Console._get_console()
        console.print_exception()

    @staticmethod
    def set_use_spinner(use_spinner: bool = True):
        """Set whether or not 'spinner' returns a real spinner"""
        console = Console._get_console()
        console._use_spinner = use_spinner

    @staticmethod
    def set_use_progress(use_progress: bool = True):
        """Set whether or not 'progress' returns a real progress bar"""
        console = Console._get_console()
        console._use_progress = use_progress

    @staticmethod
    def progress(visible: bool = True, show_limit: int = 50):
        """Return a progress object to use as a context manager.
        This is a null (do-nothing) progress object if 'visible' is
        False or if progress has been disabled for the console.
        """
        console = Console._get_console()

        if visible and console._use_progress:
            return _Progress(show_limit=show_limit)

        return _NullProgress()

    @staticmethod
    def spinner(text: str = ""):
        """Return a spinner, labelled with 'text', to use as a context
        manager. This is a null (do-nothing) spinner if yaspin cannot
        be imported or if spinners have been disabled for the console.
        """
        try:
            from yaspin import yaspin, Spinner
        except ImportError:
            return _NullSpinner()

        console = Console._get_console()

        if not console._use_spinner:
            return _NullSpinner()

        theme = Console._get_theme()

        # the frames are sized to what is left of the console width
        # once the text (and a margin of 10) has been removed
        frames, delay = theme.get_frames(
            width=console.width - len(text) - 10)

        sp = Spinner(frames, delay)
        y = yaspin(sp, text=text, side="right")

        # let the theme decide how success and failure are displayed
        y.success = lambda: theme.spinner_success(y)
        y.failure = lambda: theme.spinner_failure(y)

        return y

    @staticmethod
    def save(file: _Union[str, _IO]):
        """Save the accumulated printing to the console to 'file'.
        This can be a file or a filehandle. The buffer is
        cleared after saving

        If the console output cannot be obtained then an error is
        printed and nothing is written.
        """
        # get the console contents
        try:
            text = Console._get_console().export_text(clear=True,
                                                      styles=False)
        except Exception as e:
            Console.error(f"Cannot get console output: {e.__class__} {e}")
            return

        if isinstance(file, str):
            with open(file, "w", encoding="UTF-8") as FILE:
                FILE.write(text)
            return

        # not every filehandle has an encoding (e.g. io.StringIO
        # reports None), so look it up defensively
        encoding = getattr(file, "encoding", None)

        try:
            if encoding is not None and encoding.lower() != "utf-8":
                # make sure that encoding the file to the right character
                # set will replace invalid characters, rather than throw
                # unicode encoding errors
                text = text.encode(encoding,
                                   errors="replace").decode(encoding)

            file.write(text)
        except UnicodeEncodeError:
            # this still didn't work - round-trip through latin-1 so
            # that unsupported characters are replaced
            text = text.encode("latin-1",
                               errors="replace").decode("latin-1")
            file.write(text)