139 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Functions that help us generate and use info.json files.
 | |
| """
 | |
| import json
 | |
| import hjson
 | |
| import jsonschema
 | |
| from collections.abc import Mapping
 | |
| from functools import lru_cache
 | |
| from typing import OrderedDict
 | |
| from pathlib import Path
 | |
| 
 | |
| from milc import cli
 | |
| 
 | |
| 
 | |
| def _dict_raise_on_duplicates(ordered_pairs):
 | |
|     """Reject duplicate keys."""
 | |
|     d = {}
 | |
|     for k, v in ordered_pairs:
 | |
|         if k in d:
 | |
|             raise ValueError("duplicate key: %r" % (k,))
 | |
|         else:
 | |
|             d[k] = v
 | |
|     return d
 | |
| 
 | |
| 
 | |
def json_load(json_file, strict=True):
    """Load a JSON/HJSON document and return the decoded data.

    Args:
        json_file: Either a ``pathlib.Path`` to read from disk (opened as
            UTF-8 and closed again before returning) or an already-open
            text stream such as stdin, which is left open for the caller.
        strict: When true, duplicate object keys are rejected.

    Returns:
        The decoded document.

    Raises:
        SystemExit: With exit code 1 after logging an error, if the input
            cannot be opened or decoded.
    """
    pairs_hook = _dict_raise_on_duplicates if strict else None

    try:
        if isinstance(json_file, Path):
            # Use a context manager so the handle is closed even when decoding fails.
            with json_file.open(encoding='utf-8') as stream:
                return hjson.load(stream, object_pairs_hook=pairs_hook)

        # Not a Path: assume the caller handed us a readable stream (e.g. stdin).
        return hjson.load(json_file, object_pairs_hook=pairs_hook)

    except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
        # json_file is never rebound, so the log shows the path rather than a stream repr.
        cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        # Raise SystemExit directly; the exit() builtin is only injected by the site module.
        raise SystemExit(1)
    except Exception as e:
        cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        raise SystemExit(1)
 | |
| 
 | |
| 
 | |
# NOTE(review): lru_cache(maxsize=0) stores no results, so every call re-reads
# the file from disk. Confirm this is intentional before changing it.
@lru_cache(maxsize=0)
def load_jsonschema(schema_name):
    """Read a jsonschema file from disk.

    Args:
        schema_name: Either a path (``str`` or ``pathlib.Path``) to an
            existing schema file, or the bare name of a schema stored as
            ``data/schemas/<schema_name>.jsonschema``.

    Returns:
        The decoded schema. If ``schema_name`` is neither an existing path
        nor a known schema name, ``data/schemas/false.jsonschema`` is loaded
        instead.
    """
    # json_load() only opens Path objects; convert so that a plain string
    # naming an existing file is read from disk instead of being treated
    # as an open stream.
    direct_path = Path(schema_name)
    if direct_path.exists():
        return json_load(direct_path)

    schema_path = Path(f'data/schemas/{schema_name}.jsonschema')

    if not schema_path.exists():
        schema_path = Path('data/schemas/false.jsonschema')

    return json_load(schema_path)
 | |
| 
 | |
| 
 | |
# NOTE(review): lru_cache(maxsize=0) stores no results, so the store is
# rebuilt on every call. Confirm this is intentional.
@lru_cache(maxsize=0)
def compile_schema_store():
    """Collect every schema under ``data/schemas`` into one lookup table.

    Returns:
        A dict mapping each schema's ``$id`` to its decoded contents. Files
        whose top-level value is not a dict are skipped with a debug message.
    """
    store = {}
    schema_dir = Path('data/schemas')

    for path in schema_dir.glob('*.jsonschema'):
        contents = load_jsonschema(path)
        if isinstance(contents, dict):
            store[contents['$id']] = contents
        else:
            cli.log.debug('Skipping schema file %s', path)

    return store
 | |
| 
 | |
| 
 | |
# NOTE(review): lru_cache(maxsize=0) stores no results, so a new validator is
# built on every call. Confirm this is intentional.
@lru_cache(maxsize=0)
def create_validator(schema):
    """Build a validation callable for the schema with the given id.

    Args:
        schema: The ``$id`` of a schema present in the compiled schema store.

    Returns:
        The bound ``validate`` method of a ``Draft202012Validator`` whose
        references resolve against the whole schema store.

    Raises:
        KeyError: If ``schema`` is not in the schema store.
    """
    store = compile_schema_store()
    target_schema = store[schema]
    ref_resolver = jsonschema.RefResolver.from_schema(target_schema, store=store)
    validator = jsonschema.Draft202012Validator(target_schema, resolver=ref_resolver)

    return validator.validate
 | |
| 
 | |
| 
 | |
def validate(data, schema):
    """Check ``data`` against the schema identified by ``schema``.

    Args:
        data: The decoded document to check.
        schema: The ``$id`` of the schema to validate against.

    Returns:
        Whatever the validator callable from ``create_validator`` returns.
    """
    return create_validator(schema)(data)
 | |
| 
 | |
| 
 | |
def deep_update(origdict, newdict):
    """Recursively merge ``newdict`` into ``origdict`` in place.

    Mapping values in ``newdict`` are merged key-by-key into the mapping
    already stored under the same key. Every other value overwrites the
    existing one.

    Args:
        origdict: The mapping to update. It is modified in place.
        newdict: The mapping whose contents take precedence.

    Returns:
        ``origdict``, to allow chaining and recursive use.
    """
    for key, value in newdict.items():
        if isinstance(value, Mapping):
            existing = origdict.get(key)

            # A missing or non-mapping value cannot be merged into; start from
            # a fresh dict so the incoming mapping replaces it instead of
            # raising TypeError on item assignment.
            if not isinstance(existing, Mapping):
                existing = {}

            origdict[key] = deep_update(existing, value)

        else:
            origdict[key] = value

    return origdict
 | |
| 
 | |
| 
 | |
def merge_ordered_dicts(dicts):
    """Merges nested OrderedDict objects resulting from reading a hjson file.

    Later input dicts override earlier dicts for plain values.

    If any value is "!delete!", the existing value will be removed from its parent.
    Deleting a key that does not exist is a no-op.

    Arrays will be appended. If the first entry of an array is "!reset!", the
    contents of the array will be cleared and replaced with RHS.

    Dictionaries will be recursively merged. If any entry is "!reset!", the
    contents of the dictionary will be cleared and replaced with RHS.

    Args:
        dicts: Iterable of dict-like objects, lowest priority first.

    Returns:
        A new OrderedDict with the merged contents. The input dicts are not
        modified.
    """
    result = OrderedDict()

    def add_entry(target, k, v):
        # OrderedDict is a dict subclass, so one isinstance check covers both.
        if k in target and isinstance(v, dict):
            if "!reset!" in v:
                # Copy without the marker instead of aliasing v, so the
                # caller's input is not mutated when the marker is dropped.
                target[k] = OrderedDict((ik, iv) for ik, iv in v.items() if ik != "!reset!")
            else:
                merged = merge_ordered_dicts([target[k], v])
                # A marker may have been carried in by the earlier value.
                merged.pop("!reset!", None)
                target[k] = merged
        elif k in target and isinstance(v, list):
            # Guard against an empty list before looking at its first entry.
            if v and v[0] == '!reset!':
                target[k] = v[1:]
            else:
                target[k] = target[k] + v
        elif v == "!delete!" and isinstance(target, dict):
            # pop() tolerates a "!delete!" aimed at a key that was never set.
            target.pop(k, None)
        else:
            target[k] = v

    for d in dicts:
        for (k, v) in d.items():
            add_entry(result, k, v)

    return result
 |