Subcommand function names follow the Python snake_case convention, which looks odd on the command line. Fix this by converting underscores to dashes, e.g. list_keyboards -> list-keyboards.
		
			
				
	
	
		
			744 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			744 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | ||
| # coding=utf-8
 | ||
| """MILC - A CLI Framework
 | ||
| 
 | ||
| PYTHON_ARGCOMPLETE_OK
 | ||
| 
 | ||
| MILC is an opinionated framework for writing CLI apps. It optimizes for the
 | ||
| most common unix tool pattern- small tools that are run from the command
 | ||
| line but generally do not feature any user interaction while they run.
 | ||
| 
 | ||
| For more details see the MILC documentation:
 | ||
| 
 | ||
|     <https://github.com/clueboard/milc/tree/master/docs>
 | ||
| """
 | ||
| from __future__ import division, print_function, unicode_literals
 | ||
| import argparse
 | ||
| import logging
 | ||
| import os
 | ||
| import re
 | ||
| import shlex
 | ||
| import sys
 | ||
| from decimal import Decimal
 | ||
| from tempfile import NamedTemporaryFile
 | ||
| from time import sleep
 | ||
| 
 | ||
| try:
 | ||
|     from ConfigParser import RawConfigParser
 | ||
| except ImportError:
 | ||
|     from configparser import RawConfigParser
 | ||
| 
 | ||
| try:
 | ||
|     import thread
 | ||
|     import threading
 | ||
| except ImportError:
 | ||
|     thread = None
 | ||
| 
 | ||
| import argcomplete
 | ||
| import colorama
 | ||
| from appdirs import user_config_dir
 | ||
| 
 | ||
# Disable logging until we can configure it how the user wants.
# os.devnull is used rather than a hard-coded '/dev/null' so that importing
# this module does not fail on platforms where that path does not exist.
logging.basicConfig(filename=os.devnull)

# Log Level Representations
# Maps a logging level name to the decorated string shown in its place. The
# {fg_*}/{bg_*}/{style_*} tokens are color placeholders filled in later.
EMOJI_LOGLEVELS = {
    'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}',
    'ERROR': '{fg_red}☒{style_reset_all}',
    'WARNING': '{fg_yellow}⚠{style_reset_all}',
    'INFO': '{fg_blue}ℹ{style_reset_all}',
    'DEBUG': '{fg_cyan}☐{style_reset_all}',
    'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯'
}
# The logging module's alias level names share the same representation.
EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL']
EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING']

# sys.stdout.encoding may be None (or the attribute may be missing) when
# stdout has been redirected or replaced; treat that as "no unicode support"
# instead of crashing at import time.
UNICODE_SUPPORT = (getattr(sys.stdout, 'encoding', None) or '').lower().startswith('utf')
 | ||
| 
 | ||
# ANSI Color setup
# Regex was gratefully borrowed from kfir on stackoverflow:
# https://stackoverflow.com/a/45448194
ansi_regex = (
    r'\x1b('
    r'(\[\??\d+[hl])|'
    r'([=<>a-kzNM78])|'
    r'([\(\)][a-b0-2])|'
    r'(\[\d{0,2}[ma-dgkjqi])|'
    r'(\[\d+;\d+[hfy]?)|'
    r'(\[;?[hf])|'
    r'(#[3-68])|'
    r'([01356]n)|'
    r'(O[mlnp-z]?)|'
    r'(/Z)|'
    r'(\d+)|'
    r'(\[\?\d;\d0c)|'
    r'(\d;\dR))'
)
ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE)

# (token prefix, colorama code table) pairs used to build ansi_colors below.
ansi_styles = (
    ('fg', colorama.ansi.AnsiFore()),
    ('bg', colorama.ansi.AnsiBack()),
    ('style', colorama.ansi.AnsiStyle()),
)

# Maps '<prefix>_<lowercased attribute name>' to the attribute's value for
# every public instance attribute of each code table above.
ansi_colors = {}

for prefix, obj in ansi_styles:
    for color in obj.__dict__:
        if color.startswith('_'):
            continue
        ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color)
 | ||
| 
 | ||
| 
 | ||
def format_ansi(text):
    """Return a copy of text with ``{name}`` color tokens replaced by ANSI codes.

    Each key of ``ansi_colors`` is recognized as a ``{key}`` token. The
    ``style_reset_all`` sequence is always appended to the result.
    """
    # Plain str.replace() is used instead of .format() so that any other
    # braces in the log content are left untouched.
    for name, code in ansi_colors.items():
        text = text.replace('{%s}' % name, code)
    return text + ansi_colors['style_reset_all']
 | ||
| 
 | ||
| 
 | ||
class ANSIFormatter(logging.Formatter):
    """A log formatter that turns color tokens in the message into ANSI codes.
    """

    def format(self, record):
        """Format record the standard way, then pass the text to format_ansi()."""
        return format_ansi(super(ANSIFormatter, self).format(record))
 | ||
| 
 | ||
| 
 | ||
class ANSIEmojiLoglevelFormatter(ANSIFormatter):
    """A log formatter that makes the loglevel an emoji on UTF capable terminals.
    """

    def format(self, record):
        """Format record, showing its level as an emoji when unicode is supported.

        Level names without an entry in EMOJI_LOGLEVELS (for example custom
        levels) are left as they are instead of raising KeyError.
        """
        if not UNICODE_SUPPORT:
            return super(ANSIEmojiLoglevelFormatter, self).format(record)

        original_levelname = record.levelname
        emoji = EMOJI_LOGLEVELS.get(original_levelname)
        if emoji is not None:
            record.levelname = emoji.format(**ansi_colors)

        try:
            return super(ANSIEmojiLoglevelFormatter, self).format(record)
        finally:
            # The same record object is handed to every handler; put the real
            # level name back so other formatters are not affected.
            record.levelname = original_levelname
 | ||
| 
 | ||
| 
 | ||
class ANSIStrippingFormatter(ANSIFormatter):
    """A log formatter whose output has all ANSI escape sequences removed.
    """

    def format(self, record):
        """Format record via ANSIFormatter, then delete every ansi_escape match."""
        formatted = super(ANSIStrippingFormatter, self).format(record)
        return ansi_escape.sub('', formatted)
 | ||
| 
 | ||
| 
 | ||
class Configuration(object):
    """Represents the running configuration.

    Entries can be read as items (``config['general']``) or as attributes
    (``config.general``). This class never raises KeyError for a missing
    section; looking one up creates an empty ConfigurationSection instead.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are accepted but not used.
        self._config = {}

    def __contains__(self, key):
        return key in self._config

    def __iter__(self):
        return iter(self._config)

    def __len__(self):
        return len(self._config)

    def __repr__(self):
        return repr(self._config)

    def keys(self):
        return self._config.keys()

    def items(self):
        return self._config.items()

    def values(self):
        return self._config.values()

    def __getattr__(self, key):
        """Attribute-style access to entries; only called when normal lookup fails.

        Names starting with an underscore are never treated as entries. Without
        this guard a missing ``_config`` (as happens while an instance is being
        copied or unpickled) recurses forever, and protocol probes such as
        ``hasattr(cfg, '__deepcopy__')`` would silently create bogus sections.
        This matches __delitem__, which also refuses to touch underscore
        attributes.
        """
        if key.startswith('_'):
            raise AttributeError(key)
        return self.__getitem__(key)

    def __getitem__(self, key):
        """Returns a config section, creating it if it doesn't exist yet.
        """
        if key not in self._config:
            self.__dict__[key] = self._config[key] = ConfigurationSection(self)

        return self._config[key]

    def __setitem__(self, key, value):
        # Mirror the value into the instance dict so attribute access finds it
        # without going through __getattr__.
        self.__dict__[key] = value
        self._config[key] = value

    def __delitem__(self, key):
        # Never delete private attributes (such as _config itself).
        if key in self.__dict__ and not key.startswith('_'):
            del self.__dict__[key]
        if key in self._config:
            del self._config[key]
 | ||
| 
 | ||
| 
 | ||
class ConfigurationSection(Configuration):
    """One section of a Configuration, holding that section's option values.
    """

    def __init__(self, parent, *args, **kwargs):
        super(ConfigurationSection, self).__init__(*args, **kwargs)
        # The Configuration this section belongs to; used for the `user` fallback.
        self.parent = parent

    def __getitem__(self, key):
        """Returns a config value, pulling from the `user` section as a fallback.

        Returns None when neither this section nor `user` has the key.
        """
        try:
            return self._config[key]
        except KeyError:
            pass

        user_section = self.parent.user
        if key in user_section:
            return user_section[key]

        return None
 | ||
| 
 | ||
| 
 | ||
def handle_store_boolean(self, *args, **kwargs):
    """Does the add_argument for action='store_boolean'.

    Registers two flags through ``self.add_argument``: the given flag(s) as a
    ``store_true`` argument and a ``--no-<name>`` counterpart as ``store_false``
    writing to the same dest. ``<name>`` is taken from the first flag in
    ``args`` that starts with ``--``.

    Returns the tuple ``(args, kwargs, disabled_args, disabled_kwargs)`` that
    was used for the two registrations.

    Raises ValueError if none of the flags is a long (``--``) option. This is
    checked before anything is registered, so a bad call leaves the parser
    untouched.
    """
    long_flag = next((flag for flag in args if flag[:2] == '--'), None)
    if long_flag is None:
        raise ValueError("action='store_boolean' requires a long option such as --flag, got %r" % (args,))

    # Fall back to the bare flag name when no help text was supplied.
    help_text = kwargs.get('help') or long_flag[2:]

    disabled_args = ('--no-' + long_flag[2:],)
    disabled_kwargs = kwargs.copy()
    disabled_kwargs['action'] = 'store_false'
    # Both flags must write to the same destination.
    disabled_kwargs['dest'] = self.get_argument_name(*args, **kwargs)
    disabled_kwargs['help'] = 'Disable ' + help_text
    kwargs['action'] = 'store_true'
    kwargs['help'] = 'Enable ' + help_text

    self.add_argument(*args, **kwargs)
    self.add_argument(*disabled_args, **disabled_kwargs)

    return (args, kwargs, disabled_args, disabled_kwargs)
 | ||
| 
 | ||
| 
 | ||
class SubparserWrapper(object):
    """Wrap subparsers so we can populate the normal and the shadow parser.
    """

    def __init__(self, cli, submodule, subparser):
        """Wrap `subparser`, the real parser of subcommand `submodule` of `cli`."""
        self.cli = cli
        self.submodule = submodule
        self.subparser = subparser

        # Expose everything the real subparser offers that this wrapper does
        # not define itself (set_defaults, print_help, ...).
        for attr in dir(subparser):
            if not hasattr(self, attr):
                setattr(self, attr, getattr(subparser, attr))

    def completer(self, completer):
        """Add an argcomplete completer to this subcommand.
        """
        self.subparser.completer = completer

    def get_argument_name(self, *args, **kwargs):
        """Return the dest name for the given argparse arguments.

        handle_store_boolean() calls this on whatever object it is given, so
        the wrapper needs it for action='store_boolean' to work on subcommands.
        """
        return self.cli.get_argument_name(*args, **kwargs)

    def add_argument(self, *args, **kwargs):
        """Add an argument to both the real subparser and its shadow parser.

        The shadow parser gets the same argument without a default, and with
        store_false turned into store_true.
        """
        if kwargs.get('action') == 'store_boolean':
            return handle_store_boolean(self, *args, **kwargs)

        self.cli.acquire_lock()
        try:
            self.subparser.add_argument(*args, **kwargs)

            # Populate the shadow parser
            kwargs.pop('default', None)
            if kwargs.get('action') == 'store_false':
                # This was a no-op `==` comparison before; assign so the shadow
                # value is truthy when the flag is passed.
                kwargs['action'] = 'store_true'
            self.cli.subcommands_default[self.submodule].add_argument(*args, **kwargs)
        finally:
            # Release even if argparse rejects the argument.
            self.cli.release_lock()
 | ||
| 
 | ||
| 
 | ||
class MILC(object):
    """MILC - An Opinionated Batteries Included Framework

    Ties together argument parsing, a config file and logging. Arguments are
    registered on two parsers: the real one, and a "shadow" parser without
    defaults that is parsed alongside it (see parse_args() and arg_passed()).
    """

    def __init__(self):
        """Initialize the MILC object.
        """
        # Setup a lock for thread safety
        self._lock = threading.RLock() if thread else None

        # Define some basic info
        self.acquire_lock()
        self._description = None
        self._entrypoint = None
        self._inside_context_manager = False
        self.ansi = ansi_colors
        self.arg_only = []
        self.config = Configuration()
        self.config_file = None
        self.version = os.environ.get('QMK_VERSION', 'unknown')
        self.release_lock()

        # Figure out our program name
        self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0]
        self.prog_name = self.prog_name.split('/')[-1]

        # Initialize all the things
        self.initialize_argparse()
        self.initialize_logging()

    @property
    def description(self):
        return self._description

    @description.setter
    def description(self, value):
        self._description = self._arg_parser.description = self._arg_defaults.description = value

    def echo(self, text, *args, **kwargs):
        """Print colorized text to stdout.

        ANSI color strings (such as {fg_blue}) will be converted into ANSI
        escape sequences, and the ANSI reset sequence will be added to all
        strings.

        If *args or **kwargs are passed they will be used to %-format the strings.

        Raises RuntimeError if both *args and **kwargs are given.
        """
        if args and kwargs:
            raise RuntimeError('You can only specify *args or **kwargs, not both!')

        args = args or kwargs
        text = format_ansi(text)

        print(text % args)

    def initialize_argparse(self):
        """Prepare to process arguments from sys.argv.
        """
        kwargs = {
            'fromfile_prefix_chars': '@',
            'conflict_handler': 'resolve',
        }

        self.acquire_lock()
        try:
            self.subcommands = {}
            self.subcommands_default = {}
            self._subparsers = None
            self._subparsers_default = None
            self.argwarn = argcomplete.warn
            self.args = None
            # _arg_defaults is the shadow parser, _arg_parser the real one.
            self._arg_defaults = argparse.ArgumentParser(**kwargs)
            self._arg_parser = argparse.ArgumentParser(**kwargs)
            self.set_defaults = self._arg_parser.set_defaults
            self.print_usage = self._arg_parser.print_usage
            self.print_help = self._arg_parser.print_help
        finally:
            self.release_lock()

    def completer(self, completer):
        """Add an argcomplete completer to this subcommand.
        """
        self._arg_parser.completer = completer

    def add_argument(self, *args, **kwargs):
        """Wrapper to add arguments to both the main and the shadow argparser.

        Unless ``add_dest=False`` is passed, the dest of an optional argument
        is prefixed with ``general_``. The shadow parser gets the argument
        without a default, and with store_false turned into store_true.
        """
        if kwargs.get('action') == 'store_boolean':
            return handle_store_boolean(self, *args, **kwargs)

        add_dest = kwargs.pop('add_dest', True)
        if add_dest and args and args[0][0] == '-':
            kwargs['dest'] = 'general_' + self.get_argument_name(*args, **kwargs)

        self.acquire_lock()
        try:
            self._arg_parser.add_argument(*args, **kwargs)

            # Populate the shadow parser
            kwargs.pop('default', None)
            if kwargs.get('action') == 'store_false':
                # This was a no-op `==` comparison before; assign so the shadow
                # value is truthy when the flag is passed.
                kwargs['action'] = 'store_true'
            self._arg_defaults.add_argument(*args, **kwargs)
        finally:
            # Release even if argparse rejects the argument.
            self.release_lock()

    def initialize_logging(self):
        """Prepare the defaults for the logging infrastructure.
        """
        self.acquire_lock()
        self.log_file = None
        self.log_file_mode = 'a'
        self.log_file_handler = None
        self.log_print = True
        self.log_print_to = sys.stderr
        self.log_print_level = logging.INFO
        self.log_file_level = logging.DEBUG
        self.log_level = logging.INFO
        self.log = logging.getLogger(self.__class__.__name__)
        self.log.setLevel(logging.DEBUG)
        logging.root.setLevel(logging.DEBUG)
        self.release_lock()

        self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit')
        self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose')
        self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes')
        self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output')
        self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.')
        self.add_argument('--log-file', help='File to write log messages to')
        self.add_argument('--color', action='store_boolean', default=True, help='color in output')
        self.add_argument('-c', '--config-file', help='The config file to read and/or write')
        self.add_argument('--save-config', action='store_true', help='Save the running configuration to the config file')

    def add_subparsers(self, title='Sub-commands', **kwargs):
        """Create the subparser groups on both the real and the shadow parser.

        Raises RuntimeError when called inside the context manager.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before the with statement!')

        self.acquire_lock()
        try:
            self._subparsers_default = self._arg_defaults.add_subparsers(title=title, dest='subparsers', **kwargs)
            self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs)
        finally:
            self.release_lock()

    def acquire_lock(self):
        """Acquire the MILC lock for exclusive access to properties.
        """
        if self._lock:
            self._lock.acquire()

    def release_lock(self):
        """Release the MILC lock.
        """
        if self._lock:
            self._lock.release()

    def find_config_file(self):
        """Locate the config file.

        Preference order: self.config_file, the --config-file argument, then
        '<prog_name>.ini' inside the user config directory.
        """
        if self.config_file:
            return self.config_file

        if self.args and self.args.general_config_file:
            return self.args.general_config_file

        return os.path.join(user_config_dir(appname='qmk', appauthor='QMK'), '%s.ini' % self.prog_name)

    def get_argument_name(self, *args, **kwargs):
        """Takes argparse arguments and returns the dest name.
        """
        try:
            return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest']
        except ValueError:
            # Not an option string, so treat it as a positional argument.
            return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest']

    def argument(self, *args, **kwargs):
        """Decorator to call self.add_argument or self.<subcommand>.add_argument.

        Pass ``arg_only=True`` to record the argument's dest in self.arg_only,
        which keeps it out of the config (see read_config()).

        Raises RuntimeError inside the context manager, or when the decorated
        function is neither the entrypoint nor a registered subcommand.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before the with statement!')

        def argument_function(handler):
            # Always strip arg_only, even when it is falsy, so that it is never
            # forwarded to argparse (which would reject the unknown keyword).
            if kwargs.pop('arg_only', False):
                self.arg_only.append(self.get_argument_name(*args, **kwargs))

            # Subcommands are registered under their dashed name.
            name = handler.__name__.replace("_", "-")
            if handler is self._entrypoint:
                self.add_argument(*args, **kwargs)

            elif name in self.subcommands:
                self.subcommands[name].add_argument(*args, **kwargs)

            else:
                raise RuntimeError('Decorated function is not entrypoint or subcommand!')

            return handler

        return argument_function

    def arg_passed(self, arg):
        """Returns True if arg was passed on the command line.

        `arg` is the dest name as stored on the parsed namespace. The shadow
        parser has no defaults, so anything it did not see is None (or False
        for flags).
        """
        return getattr(self.args_passed, arg, None) not in (None, False)

    def parse_args(self):
        """Parse the CLI args.
        """
        if self.args:
            self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!')
            return

        argcomplete.autocomplete(self._arg_parser)

        self.acquire_lock()
        try:
            self.args = self._arg_parser.parse_args()
            self.args_passed = self._arg_defaults.parse_args()

            if 'entrypoint' in self.args:
                self._entrypoint = self.args.entrypoint

            if self.args.general_config_file:
                self.config_file = self.args.general_config_file
        finally:
            # argparse exits via SystemExit on bad input; do not keep the lock.
            self.release_lock()

    @staticmethod
    def _coerce_config_value(value):
        """Turn a raw config-file string into a bool, int or Decimal where it looks like one."""
        lowered = value.lower()
        if lowered in ('1', 'yes', 'true', 'on'):
            return True
        if lowered in ('0', 'no', 'false', 'none', 'off'):
            return False
        # Allow at most one dot so values like '1.2.3' stay strings instead of
        # blowing up inside Decimal().
        if value.replace('.', '', 1).isdigit():
            return Decimal(value) if '.' in value else int(value)
        return value

    def read_config(self):
        """Parse the configuration file and determine the runtime configuration.
        """
        self.acquire_lock()
        try:
            self.config_file = self.find_config_file()

            if self.config_file and os.path.exists(self.config_file):
                # NOTE(review): self.config is passed as the parser's defaults;
                # it is normally still empty here - confirm this is intended.
                config = RawConfigParser(self.config)
                config.read(self.config_file)

                # Iterate over the config file options and write them into self.config
                for section in config.sections():
                    for option in config.options(section):
                        self.config[section][option] = self._coerce_config_value(config.get(section, option))

            # Fold the CLI args into self.config
            for argument in vars(self.args):
                if argument in ('subparsers', 'entrypoint'):
                    continue

                # 'general_verbose' -> section 'general', option 'verbose';
                # names without an underscore belong to the entrypoint.
                if '_' in argument:
                    section, option = argument.split('_', 1)
                else:
                    section = self._entrypoint.__name__
                    option = argument

                if option not in self.arg_only:
                    if hasattr(self.args_passed, argument):
                        arg_value = getattr(self.args, argument)
                        if arg_value:
                            self.config[section][option] = arg_value
                    else:
                        if option not in self.config[section]:
                            self.config[section][option] = getattr(self.args, argument)
        finally:
            self.release_lock()

    def save_config(self):
        """Save the current configuration to the config file.

        The config is written to a temporary file in the target directory and
        then renamed over the config file. The `save_config` option of the
        `general` section is never written.
        """
        self.log.debug("Saving config file to '%s'", self.config_file)

        if not self.config_file:
            self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__)
            return

        self.acquire_lock()
        try:
            config = RawConfigParser()
            # abspath() so a bare filename resolves to the current directory
            # instead of an empty string that os.makedirs() would reject.
            config_dir = os.path.dirname(os.path.abspath(self.config_file))

            for section_name, section in self.config._config.items():
                config.add_section(section_name)
                for option_name, value in section.items():
                    if section_name == 'general':
                        if option_name in ['save_config']:
                            continue
                    config.set(section_name, option_name, str(value))

            if not os.path.exists(config_dir):
                os.makedirs(config_dir)

            with NamedTemporaryFile(mode='w', dir=config_dir, delete=False) as tmpfile:
                config.write(tmpfile)

            # Move the new config file into place atomically
            if os.path.getsize(tmpfile.name) > 0:
                os.rename(tmpfile.name, self.config_file)
            else:
                self.log.warning('Config file saving failed, not replacing %s with %s.', self.config_file, tmpfile.name)
        finally:
            self.release_lock()

        # Log through this instance, not the module-level `cli` global.
        self.log.info('Wrote configuration to %s', shlex.quote(self.config_file))

    def __call__(self):
        """Execute the entrypoint function.

        Raises RuntimeError if no entrypoint was registered or selected.
        """
        if not self._inside_context_manager:
            # If they didn't use the context manager use it ourselves
            with self:
                return self.__call__()

        if not self._entrypoint:
            raise RuntimeError('No entrypoint provided!')

        return self._entrypoint(self)

    def entrypoint(self, description):
        """Set the entrypoint for when no subcommand is provided.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before cli()!')

        self.acquire_lock()
        self.description = description
        self.release_lock()

        def entrypoint_func(handler):
            self.acquire_lock()
            self._entrypoint = handler
            self.release_lock()

            return handler

        return entrypoint_func

    def add_subcommand(self, handler, description, name=None, **kwargs):
        """Register a subcommand.

        If name is not provided we use `handler.__name__` with underscores
        converted to dashes (list_keyboards -> list-keyboards).

        The subcommand's parser wrapper is also exposed as an attribute of this
        object, under the name itself and under its underscore form so that it
        can be written as a normal identifier (``cli.list_keyboards``). Names
        that are already taken, including method names, are not overwritten.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before the with statement!')

        if self._subparsers is None:
            self.add_subparsers()

        if not name:
            name = handler.__name__.replace("_", "-")

        self.acquire_lock()
        try:
            kwargs['help'] = description
            self.subcommands_default[name] = self._subparsers_default.add_parser(name, **kwargs)
            self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs))
            self.subcommands[name].set_defaults(entrypoint=handler)

            attribute_names = [name]
            if '-' in name:
                attribute_names.append(name.replace('-', '_'))

            for attribute_name in attribute_names:
                # hasattr() rather than a __dict__ check, so that a subcommand
                # called e.g. 'echo' cannot shadow the method of that name.
                if not hasattr(self, attribute_name):
                    self.__dict__[attribute_name] = self.subcommands[name]
                else:
                    self.log.debug("Could not add subcommand '%s' to attributes, key already exists!", attribute_name)
        finally:
            self.release_lock()

        return handler

    def subcommand(self, description, **kwargs):
        """Decorator to register a subcommand.
        """

        def subcommand_function(handler):
            return self.add_subcommand(handler, description, **kwargs)

        return subcommand_function

    def setup_logging(self):
        """Called by __enter__() to setup the logging configuration.
        """
        if len(logging.root.handlers) != 0:
            # MILC is the only thing that should have root log handlers
            logging.root.handlers = []

        self.acquire_lock()
        try:
            if self.config['general']['verbose']:
                self.log_print_level = logging.DEBUG

            self.log_file = self.config['general']['log_file'] or self.log_file
            self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt'])

            if self.config.general.color:
                self.log_format = ANSIEmojiLoglevelFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
            else:
                self.log_format = ANSIStrippingFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)

            if self.log_file:
                self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode)
                self.log_file_handler.setLevel(self.log_file_level)
                self.log_file_handler.setFormatter(self.log_file_format)
                logging.root.addHandler(self.log_file_handler)

            if self.log_print:
                self.log_print_handler = logging.StreamHandler(self.log_print_to)
                self.log_print_handler.setLevel(self.log_print_level)
                self.log_print_handler.setFormatter(self.log_format)
                logging.root.addHandler(self.log_print_handler)
        finally:
            self.release_lock()

    def __enter__(self):
        """Parse arguments, read the config and set up logging.

        When --save-config was requested the config is written and the program
        exits with status 0.
        """
        if self._inside_context_manager:
            self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.')
            # Still hand back the object so `with cli as c:` never binds None.
            return self

        self.acquire_lock()
        self._inside_context_manager = True
        self.release_lock()

        colorama.init()
        self.parse_args()
        self.read_config()
        self.setup_logging()

        if 'save_config' in self.config.general and self.config.general.save_config:
            self.save_config()
            # sys.exit() rather than the interactive-only exit() helper from site.
            sys.exit(0)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager; log any unexpected exception and exit 255.

        SystemExit (and subclasses) pass through untouched.
        """
        self.acquire_lock()
        self._inside_context_manager = False
        self.release_lock()

        if exc_type is not None and not issubclass(exc_type, SystemExit):
            print(exc_type)
            # Pass the exception explicitly instead of relying on sys.exc_info().
            logging.error(exc_val, exc_info=(exc_type, exc_val, exc_tb))
            sys.exit(255)
 | ||
| 
 | ||
| 
 | ||
# Module-level MILC instance; the demo below registers its commands on it.
cli = MILC()

if __name__ == '__main__':

    @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean')
    @cli.entrypoint('My useful CLI tool with subcommands.')
    def main(cli):
        comma = ',' if cli.config.general.comma else ''
        cli.log.info('{bg_green}{fg_red}Hello%s World!', comma)

    @cli.argument('-n', '--name', help='Name to greet', default='World')
    @cli.subcommand('Description of hello subcommand here.')
    def hello(cli):
        comma = ',' if cli.config.general.comma else ''
        cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name)

    def goodbye(cli):
        comma = ',' if cli.config.general.comma else ''
        cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name)

    @cli.argument('-n', '--name', help='Name to greet', default='World')
    @cli.subcommand('Think a bit before greeting the user.')
    def thinking(cli):
        comma = ',' if cli.config.general.comma else ''

        # MILC defines no spinner() method, so the pauses are announced through
        # the log instead of calling cli.spinner(), which raised AttributeError.
        cli.log.info('Just a moment...')
        sleep(2)

        cli.log.info('Almost there!')
        sleep(2)

        cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name)

    @cli.subcommand('Show off our ANSI colors.')
    def pride(cli):
        cli.echo('{bg_red}                    ')
        cli.echo('{bg_lightred_ex}                    ')
        cli.echo('{bg_lightyellow_ex}                    ')
        cli.echo('{bg_green}                    ')
        cli.echo('{bg_blue}                    ')
        cli.echo('{bg_magenta}                    ')

    # You can register subcommands using decorators as seen above, or using functions like this:
    cli.add_subcommand(goodbye, 'This will show up in --help output.')
    cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World')

    cli()  # Automatically picks between main() and the registered subcommands
 |