HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/env/lib64/python3.10/site-packages/sqlalchemy_serializer/lib/schema.py
import logging
from collections import defaultdict


logger = logging.getLogger('serializer')


class Tree(defaultdict):
    def __init__(self, to_include=None, to_exclude=None, is_greedy=True, *args, **kwargs):
        super(Tree, self).__init__(*args, **kwargs)
        self.default_factory = Tree
        self.to_include = to_include
        self.to_exclude = to_exclude
        self.is_greedy = is_greedy

    def apply(self, node: 'Tree'):
        if self.is_greedy and not node.is_greedy:
            # Apply strictness to all subtrees
            # not included into the node
            for k, subtree in self.items():
                if k not in node.keys() and subtree:
                    subtree.to_strict()

        self.is_greedy = node.is_greedy and self.is_greedy

        if node.to_include is not None:
            self.to_include = node.to_include

        if node.to_exclude is not None:
            self.to_exclude = node.to_exclude

    def to_strict(self):
        self.is_greedy = False
        for tree in self.values():
            if not tree:
                continue  # Exclude leafs
            tree.to_strict()

    def __repr__(self):
        include = f'to_include={self.to_include}'
        exclude = f'to_exclude={self.to_exclude}'
        greedy = f'is_greedy={self.is_greedy}'
        keys = '\n'.join(f'{k}: {v}'.replace('\n', '\n  ') for k, v in self.items())
        keys = f'\n{keys}' if keys else ''
        return f'Tree({include}, {exclude}, {greedy})[{keys}]'


class Rule:
    DELIM = '.'  # Delimiter to separate nested rules
    NEGATION = '-'  # Prefix for negative rules

    def __init__(self, rule: str):
        self.is_negative = rule.startswith(self.NEGATION)
        rule = rule.replace(self.NEGATION, '')
        self.keys = rule.split(self.DELIM)

    def __repr__(self):
        prefix = self.NEGATION if self.is_negative else ""
        return f'{prefix}{self.DELIM.join(self.keys)}'


class Schema:
    def __init__(self, tree: Tree = None):
        self._tree = tree or Tree()

    @property
    def keys(self) -> set:
        return {k for k, t in self._tree.items() if t.to_include}

    @property
    def is_greedy(self) -> bool:
        return self._tree.is_greedy

    def update(self, extend=(), only=()):
        if extend:
            self.apply(rules=extend, is_greedy=True)
        if only:
            self.apply(rules=only, is_greedy=False)

    def apply(self, rules, is_greedy):
        rules_tree = Tree()
        for raw in rules:
            logger.debug('Checking rule:%s', raw)
            rule = Rule(raw)

            current = self._tree
            chain = Tree()
            chain.is_greedy = is_greedy

            keys_num = len(rule.keys)
            new = chain
            for i, k in enumerate(rule.keys):
                is_last_key = keys_num == i + 1
                parent = current
                node = current.get(k, Tree())  # Does not create a new node
                new = new[k]  # Creates a new node

                if not (is_last_key or rule.is_negative) and node.is_greedy:
                    new.is_greedy = is_greedy

                if not node and node.to_exclude:
                    logger.debug('Ignore rule:%s leaf excludes key:%s', raw, k)
                    break

                if rule.is_negative:
                    new.to_exclude = True
                else:
                    new.to_include = True

                if is_last_key:
                    if not parent.is_greedy:
                        logger.debug('Ignore rule:%s parent does not accept new rules', raw)
                    elif rule.is_negative and node.to_include:
                        logger.debug('Ignore rule:%s leaf includes key:%s', raw, k)
                    else:
                        merge_trees(rules_tree, chain)
                else:
                    current = node  # Go deeper

        if rules_tree:
            logger.debug('Updating tree with rules:%s is_greedy:%s', rules, is_greedy)
            merge_trees(self._tree, rules_tree)

    def is_included(self, key: str) -> bool:
        node = self._tree.get(key)
        if self._tree.is_greedy:
            if not node:
                return node is None or not node.to_exclude
            return True
        else:
            return bool(node is not None and node.to_include)

    def fork(self, key: str) -> 'Schema':
        return Schema(tree=self._tree[key])


def merge_trees(old: Tree, *trees):
    for tree in trees:
        # logger.debug('Merging trees:\nold->\n%s\nnew->\n:%s', old, tree)
        old.apply(tree)
        for k in tree.keys():
            merge_trees(old[k], tree[k])