"""Functions that help us generate and use info.json files.
 | 
						|
"""
 | 
						|
import json
import sys
import hjson
import jsonschema
from collections.abc import Mapping
from collections import OrderedDict
from functools import lru_cache
from pathlib import Path
from copy import deepcopy

from milc import cli


def _dict_raise_on_duplicates(ordered_pairs):
    """Reject duplicate keys."""
    d = {}
    for k, v in ordered_pairs:
        if k in d:
            raise ValueError("duplicate key: %r" % (k,))
        else:
            d[k] = v
    return d


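# Behaviour sketch for _dict_raise_on_duplicates: hjson calls this hook with
# the decoded (key, value) pairs, so a duplicate key in a source file surfaces
# as an error instead of silently winning (keys below are illustrative):
#
#     _dict_raise_on_duplicates([('rows', 4), ('rows', 5)])
#     # -> ValueError: duplicate key: 'rows'

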
@lru_cache(maxsize=20)
def _json_load_impl(json_file, strict=True):
    """Load a json or hjson file from disk.

    Note: file must be a Path object, or an already-open stream (such as sys.stdin).
    """
    try:
        # Get the IO stream for Path objects
        # Not necessary if the data is provided via stdin
        if isinstance(json_file, Path):
            json_file = json_file.open(encoding='utf-8')
        return hjson.load(json_file, object_pairs_hook=_dict_raise_on_duplicates if strict else None)

    except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
        cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        sys.exit(1)
    except Exception as e:
        cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        sys.exit(1)


def json_load(json_file, strict=True):
    """Load a json file from disk, returning a deep copy of the cached result.

    The deep copy keeps callers from mutating the dictionary held by the lru_cache.
    """
    return deepcopy(_json_load_impl(json_file=json_file, strict=strict))


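# Usage sketch for json_load: the lru_cache in _json_load_impl makes repeated
# loads of the same Path cheap, while the deepcopy keeps a caller's edits out
# of the cache (the path below is illustrative):
#
#     data = json_load(Path('keyboards/example/info.json'))
#     data['keyboard_name'] = 'changed'  # mutates only this copy
#     json_load(Path('keyboards/example/info.json'))  # still the on-disk data

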
@lru_cache(maxsize=20)
def load_jsonschema(schema_name):
    """Read a jsonschema file from disk.
    """
    if Path(schema_name).exists():
        return json_load(Path(schema_name))

    schema_path = Path(f'data/schemas/{schema_name}.jsonschema')

    if not schema_path.exists():
        # Unknown schema names fall back to false.jsonschema, a schema that validates nothing
        schema_path = Path('data/schemas/false.jsonschema')

    return json_load(schema_path)


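# Usage sketch for load_jsonschema: a bare schema name is resolved relative to
# data/schemas, so load_jsonschema('keyboard') would read
# data/schemas/keyboard.jsonschema ('keyboard' is illustrative); unknown names
# fall back to false.jsonschema.

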
@lru_cache(maxsize=1)
def compile_schema_store():
    """Compile all our schemas into a schema store.
    """
    schema_store = {}

    for schema_file in Path('data/schemas').glob('*.jsonschema'):
        schema_data = load_jsonschema(schema_file)
        if not isinstance(schema_data, dict):
            cli.log.debug('Skipping schema file %s', schema_file)
            continue
        # Key the store by each schema's '$id' so $refs can be resolved by id
        schema_store[schema_data['$id']] = schema_data

    return schema_store


@lru_cache(maxsize=20)
def create_validator(schema):
    """Creates a validator for the given schema id.
    """
    schema_store = compile_schema_store()
    resolver = jsonschema.RefResolver.from_schema(schema_store[schema], store=schema_store)

    return jsonschema.Draft202012Validator(schema_store[schema], resolver=resolver).validate


def validate(data, schema):
    """Validates data against a schema.
    """
    validator = create_validator(schema)

    return validator(data)


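# Usage sketch for validate()/create_validator(): the schema argument is a
# '$id' from the schema store, and jsonschema raises ValidationError on
# non-conforming data (the id below is illustrative):
#
#     try:
#         validate(info_data, 'qmk.keyboard.v1')
#     except jsonschema.ValidationError as e:
#         cli.log.error('Invalid info.json: %s', e.message)

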
def deep_update(origdict, newdict):
    """Update a dictionary in place, recursing to merge nested mappings depth-first.
    """
    for key, value in newdict.items():
        if isinstance(value, Mapping):
            origdict[key] = deep_update(origdict.get(key, {}), value)

        else:
            origdict[key] = value

    return origdict


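# Worked example for deep_update: nested mappings merge key-by-key, plain
# values are overwritten, and origdict is modified in place:
#
#     deep_update({'a': {'x': 1}, 'b': 1}, {'a': {'y': 2}, 'b': 2})
#     # -> {'a': {'x': 1, 'y': 2}, 'b': 2}

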
def merge_ordered_dicts(dicts):
    """Merges nested OrderedDict objects resulting from reading a hjson file.

    Later input dicts override earlier dicts for plain values.

    If any value is "!delete!", the existing value will be removed from its parent.

    Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with the RHS.

    Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with the RHS.
    """
    result = OrderedDict()

    def add_entry(target, k, v):
        if k in target and isinstance(v, (OrderedDict, dict)):
            if "!reset!" in v:
                target[k] = v
            else:
                target[k] = merge_ordered_dicts([target[k], v])
            # Strip the directive itself so it never leaks into the merged output
            if "!reset!" in target[k]:
                del target[k]["!reset!"]
        elif k in target and isinstance(v, list):
            if v and v[0] == '!reset!':
                target[k] = v[1:]
            else:
                target[k] = target[k] + v
        elif v == "!delete!" and isinstance(target, (OrderedDict, dict)):
            # Guard against deleting a key that was never set
            if k in target:
                del target[k]
        else:
            target[k] = v

    for d in dicts:
        for (k, v) in d.items():
            add_entry(result, k, v)

    return result
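

# Worked example for merge_ordered_dicts (keys and values illustrative):
#
#     base     = OrderedDict(features=OrderedDict(rgb=True), layers=[1], name='a')
#     override = OrderedDict(features='!delete!', layers=['!reset!', 2], name='b')
#     merge_ordered_dicts([base, override])
#     # -> OrderedDict([('layers', [2]), ('name', 'b')])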