Skip to content

DictionaryInclude

Source code in mmcif/api/DictionaryInclude.py
class DictionaryInclude(object):
    def __init__(self, **kwargs):
        """Set up the rename lookup tables and the include path context.

        Args:
            **kwargs: optional settings; only "dirPath" is read here. It is the directory that
                      local include paths are reported relative to (default: the current
                      working directory).
        """
        # Dictionary items whose values hold an item name; consulted when an included item is renamed.
        self.__itemNameRelatives = [
            "_item.name",
            "_item_examples.name",
            "_ndb_item_description.name",
            "_item_related.name",
            "_category_key.name",
            "_item_structure.name",
            "_item_methods.name",
            "_item_aliases.name",
            "_item_dependent.dependent_name",
            "_item_default.name",
            "_pdbx_item_examples.name",
            "_item_units.name",
            "_item_related.related_name",
            "_item_description.name",
            "_item_dependent.name",
            "_item_range.name",
            "_item_sub_category.name",
            "_pdbx_item_range.name",
            "_pdbx_item_linked.condition_child_name",
            "_ndb_item_examples.name",
            "_pdbx_item_value_condition.item_name",
            "_ndb_item_range.name",
            "_item_linked.child_name",
            "_pdbx_item_description.name",
            "_pdbx_item_context.item_name",
            "_pdbx_item_enumeration_details.name",
            "_pdbx_item_linked_group_list.child_name",
            "_pdbx_item_linked_group_list.parent_name",
            "_pdbx_item_value_condition_list.target_item_name",
            "_ndb_item_enumeration.name",
            "_pdbx_item_linked.child_name",
            "_pdbx_item_value_condition.dependent_item_name",
            "_pdbx_item_enumeration.name",
            "_item_linked.parent_name",
            "_pdbx_item_value_condition_list.dependent_item_name",
            "_item_type.name",
            "_item_type_conditions.name",
            "_pdbx_item_linked.parent_name",
            "_item_enumeration.name",
        ]
        # Dictionary items whose values hold a category identifier; consulted when an included category is renamed.
        self.__categoryIdRelatives = [
            "_category.id",
            "_category_key.id",
            "_pdbx_item_linked_group.category_id",
            "_pdbx_category_examples.id",
            "_item.category_id",
            "_pdbx_category_context.category_id",
            "_pdbx_item_linked_group_list.parent_category_id",
            "_category_group.category_id",
            "_pdbx_category_description.id",
            "_ndb_category_examples.id",
            "_category_examples.id",
            "_category_methods.category_id",
            "_ndb_category_description.id",
            "_pdbx_item_linked_group_list.child_category_id",
        ]
        # Path context: the working directory at construction time and the include base directory.
        workingDirPath = os.getcwd()
        self.__cwd = workingDirPath
        self.__dirPath = kwargs["dirPath"] if "dirPath" in kwargs else workingDirPath
        logger.info("Local dictionary include path relative to %s", self.__dirPath)
        # Stack of directories for nested local includes, and the locators fetched so far.
        self.__dirStack = []
        self.__locatorIndexD = {}

    def processIncludedContent(self, containerList, cleanup=False):
        """Process any dictionary, category or item include instructions in any data containers in the
        input list of dictionary data and definition containers.

        Args:
            containerList (list): list of input PdbxContainer data or definition container objects
            cleanup (bool, optional): flag to remove generator category objects after parsing (default: False)

        Returns:
            (list): list of data and definition containers incorporating included content

        Examples:
            ```python
            pathDdlIncludeDictionary = "mmcif_ddl-generator.dic"
            myIo = IoAdapter(raiseExceptions=True)
            containerList = myIo.readFile(inputFilePath=pathDdlIncludeDictionary)
            logger.info("Starting container list length (%d)", len(containerList))
            dIncl = DictionaryInclude()
            inclL = dIncl.processIncludedContent(containerList)
            logger.info("Processed included container length (%d)", len(inclL))
            ```

        """
        includeD = self.__getIncludeInstructions(containerList, cleanup=cleanup)
        includeContentD = self.__fetchIncludedContent(includeD, cleanup=cleanup)
        return self.__addIncludedContent(containerList, includeContentD)

    def __addIncludedContent(self, containerList, includeContentD):
        """Incorporate included content described in the input dictionary of include instructions produced by
        internal method __getIncludeInstructions().

        Args:
            containerList (list): list of input PdbxContainer data or definition container objects
            includeContentD (dict): {"dictionaryIncludeDict": {dictionary_id: {...include details...}},
                                    "categoryIncludeDict": {dictionary_id: {category_id: {...include details... }}},
                                    "itemIncludeDict": {dictionary_id: {category_id: {itemName: {...include details...}}}}
                                    }

        Returns:
            (list): list of data and definition containers incorporating included content
        """
        # Index the current container list...
        cD = OrderedDict()
        datablockName = "unnamed_1"
        for container in containerList:
            if container.getType() == "data":
                datablockName = container.getName()
            # Handle potentially unconsolidated definitions --
            cD.setdefault(datablockName, OrderedDict()).setdefault(container.getName(), []).append(container)
        #
        #
        for datablockName in cD:
            if datablockName in includeContentD:
                if "replace" in includeContentD[datablockName]:
                    # Organize the replacements by name
                    replaceDefinitionD = OrderedDict()
                    replaceDataD = OrderedDict()
                    for container in includeContentD[datablockName]["replace"]:
                        if container.getType() == "definition":
                            replaceDefinitionD.setdefault(container.getName(), []).append(container)
                        else:
                            replaceDataD.setdefault(datablockName, []).append(container)
                    #
                    for rN, rL in replaceDefinitionD.items():
                        if rN in cD[datablockName]:
                            cD[datablockName][rN] = rL
                    # replace data sections in the base container
                    baseContainer = cD[datablockName][datablockName][0]
                    for rN, containerL in replaceDataD.items():
                        for container in containerL:
                            for nm in container.getObjNameList():
                                obj = container.getObj(nm)
                                baseContainer.replace(obj)
                    #
                if "extend" in includeContentD[datablockName]:
                    extendDataD = OrderedDict()
                    for container in includeContentD[datablockName]["extend"]:
                        if container.getType() == "definition":
                            cD.setdefault(datablockName, OrderedDict()).setdefault(container.getName(), []).append(container)
                        else:
                            extendDataD.setdefault(datablockName, []).append(container)
                    # extend data sections in the base container
                    baseContainer = cD[datablockName][datablockName][0]
                    for rN, containerL in extendDataD.items():
                        for container in containerL:
                            for nm in container.getObjNameList():
                                obj = container.getObj(nm)
                                if baseContainer.exists(nm):
                                    baseObj = baseContainer.getObj(nm)
                                    for ii in range(obj.getRowCount()):
                                        rowD = obj.getRowAttributeDict(ii)
                                        baseObj.append(rowD)
                                else:
                                    baseContainer.append(obj)
        #
        # Unwind the container index
        #
        fullL = []
        for datablockName in cD:
            for cL in cD[datablockName].values():
                fullL.extend(cL)
        #
        return fullL

    def __getIncludeInstructions(self, containerList, cleanup=False):
        """Extract include instructions from categories pdbx_include_dictionary,  pdbx_include_category, and pdbx_include_item.

        Args:
          containerList (list): list of input PdbxContainer data or definition container objects
          cleanup (optional, bool): flag to remove generator category objects after parsing (default: False)

        Returns:
          A dictionary containing the dictionary, category and and item level include details.
          For example,

            ```python
            {
            "dictionaryIncludeDict": {dictionary_id: {...include details...}},
            "categoryIncludeDict": {dictionary_id: {category_id: {...include details... }}},
            "itemIncludeDict": {dictionary_id: {category_id: {itemName: {...include details...}}}},
            }
            ```


        """
        includeD = OrderedDict()
        try:
            unNamed = 1
            for container in containerList:
                if container.getType() == "data":
                    dictionaryIncludeDict = OrderedDict()
                    categoryIncludeDict = OrderedDict()
                    itemIncludeDict = OrderedDict()
                    if container.getName():
                        datablockName = container.getName()
                    else:
                        datablockName = str(unNamed)
                        unNamed += 1
                    logger.debug("Adding data sections from container name %s  type  %s", datablockName, container.getType())
                    tl = container.getObj("pdbx_include_dictionary")
                    if tl is not None:
                        for row in tl.getRowList():
                            tD = OrderedDict()
                            for atName in ["dictionary_id", "dictionary_locator", "include_mode", "dictionary_namespace_prefix", "dictionary_namespace_prefix_replace"]:
                                tD[atName] = row[tl.getIndex(atName)] if tl.hasAttribute(atName) else None
                            dictionaryIncludeDict[tD["dictionary_id"]] = tD
                        #
                        tl = container.getObj("pdbx_include_category")
                        if tl is not None:
                            for row in tl.getRowList():
                                tD = OrderedDict()
                                for atName in ["dictionary_id", "category_id", "include_as_category_id", "include_mode"]:
                                    tD[atName] = row[tl.getIndex(atName)] if tl.hasAttribute(atName) else None
                                categoryIncludeDict.setdefault(tD["dictionary_id"], {}).setdefault(tD["category_id"], tD)
                        #
                        tl = container.getObj("pdbx_include_item")
                        if tl is not None:
                            for row in tl.getRowList():
                                tD = OrderedDict()
                                for atName in ["dictionary_id", "item_name", "include_as_item_name", "include_mode"]:
                                    tD[atName] = row[tl.getIndex(atName)] if tl.hasAttribute(atName) else None
                                categoryId = CifName.categoryPart(tD["item_name"])
                                itemIncludeDict.setdefault(tD["dictionary_id"], {}).setdefault(categoryId, {}).setdefault(tD["item_name"], tD)
                    if cleanup:
                        for catName in ["pdbx_include_dictionary", "pdbx_include_category", "pdbx_include_item"]:
                            if container.exists(catName):
                                container.remove(catName)
                    #
                    includeD[datablockName] = {
                        "dictionaryIncludeDict": dictionaryIncludeDict,
                        "categoryIncludeDict": categoryIncludeDict,
                        "itemIncludeDict": itemIncludeDict,
                    }
        except Exception as e:
            logger.exception("Include processing failing with %s", str(e))
        return includeD

    def __fetchIncludedContent(self, includeD, cleanup=False):
        """Fetch included content following the instructions encoded in the input data structure.

        Each dictionary locator is fetched at most once per instance, processed recursively for
        its own include instructions, and its containers are sorted into "extend" and "replace"
        lists keyed by the including data block. Any exception is logged and the content
        collected up to that point is returned.

        Args:
            includeD (dict):  include instructions keyed by data block name,
                              {datablockName: {"dictionaryIncludeDict": {dictionary_id: {...include details...}},
                                               "categoryIncludeDict": {dictionary_id: {category_id: {...include details... }}},
                                               "itemIncludeDict": {dictionary_id: {category_id: {itemName: {...include details...}}}},
                                               }}
            cleanup (optional, bool): flag to remove generator category objects after parsing (default: false)

        Returns:
            (dict): {datablockName: {"extend": [container,...], "replace": [container, ...]}, ... }

        """

        includeDataD = {}
        try:
            for datablockName, inclD in includeD.items():
                # (container, include mode) pairs gathered for this data block
                cL = []
                for dictName, iD in inclD["dictionaryIncludeDict"].items():
                    locator = iD["dictionary_locator"]
                    # The index is keyed by the locator exactly as written in the instruction
                    # (before any relative path resolution) and lives for the life of the instance.
                    if locator in self.__locatorIndexD:
                        logger.info("Skipping redundant include for %r at %r", dictName, locator)
                        continue
                    self.__locatorIndexD[locator] = dictName
                    #
                    # --- Fetch the dictionary component -
                    #
                    updateStack = self.__isLocal(locator)
                    if updateStack:
                        if not self.__dirStack:
                            # top-level include case: seed the stack with the configured include directory
                            self.__dirStack.append(os.path.abspath(self.__dirPath))

                        # embedded include case (push directory containing the locator)
                        if not os.path.isabs(locator):
                            # handle the relative path case -
                            locator = os.path.abspath(os.path.join(self.__dirStack[-1], locator))
                            logger.debug("modified local relative locator is %r", locator)
                        self.__dirStack.append(os.path.dirname(locator))
                        logger.debug("dirStack (%d) top %r", len(self.__dirStack), self.__dirStack[-1])
                    # Recursive step: the fetched dictionary may carry include instructions of its own.
                    containerList = self.processIncludedContent(self.__fetchLocator(locator), cleanup=cleanup)
                    if updateStack:
                        # restore stack context (only the locator directory is popped; the seed entry stays)
                        self.__dirStack.pop()
                    #
                    nsPrefix = iD["dictionary_namespace_prefix"]
                    nsPrefixReplace = iD["dictionary_namespace_prefix_replace"]
                    dictInclMode = iD["include_mode"]
                    # NOTE(review): __getIncludeInstructions() does not populate "data_include_mode",
                    # so data sections take the "extend" default unless the caller's dict adds that key.
                    dataIncludeMode = iD["data_include_mode"] if "data_include_mode" in iD else "extend"
                    catInclD = inclD["categoryIncludeDict"][dictName] if dictName in inclD["categoryIncludeDict"] else None
                    itemInclD = inclD["itemIncludeDict"][dictName] if dictName in inclD["itemIncludeDict"] else None
                    #
                    #  Do data sections first.
                    for container in containerList:
                        if container.getType() == "data":
                            logger.debug("Including data container %r with %r", container.getName(), container.getObjNameList())
                            cL.append((container, dataIncludeMode))
                    #
                    if catInclD or itemInclD:
                        # Process only explicitly included categories/items in the dictionary component
                        if catInclD:
                            for container in containerList:
                                if container.getType() == "data":
                                    continue
                                cName = container.getName()
                                # Category definitions are named by the category; item definitions by the full item name.
                                catName = cName if container.isCategory() else CifName.categoryPart(cName)
                                #
                                if catName in catInclD:
                                    if container.isAttribute() and itemInclD and catName in itemInclD and cName in itemInclD[catName]:
                                        # An item-level instruction takes precedence; its mode falls back to the dictionary mode.
                                        inclMode = itemInclD[catName][cName]["include_mode"] if itemInclD[catName][cName]["include_mode"] else dictInclMode
                                        cL.append((self.__renameItem(container, itemInclD[catName][cName]["include_as_item_name"]), inclMode))
                                    else:
                                        # Category-level instruction; its mode falls back to the dictionary mode.
                                        inclMode = catInclD[catName]["include_mode"] if catInclD[catName]["include_mode"] else dictInclMode
                                        cL.append((self.__renameCategory(container, catInclD[catName]["include_as_category_id"]), inclMode))
                        elif itemInclD:
                            # Process only explicitly included items exclusive of explicitly included categories in the dictionary component
                            for container in containerList:
                                if container.getType() == "data":
                                    continue
                                cName = container.getName()
                                catName = cName if container.isCategory() else CifName.categoryPart(cName)
                                #
                                if container.isAttribute() and catName in itemInclD and cName in itemInclD[catName]:
                                    inclMode = itemInclD[catName][cName]["include_mode"] if itemInclD[catName][cName]["include_mode"] else dictInclMode
                                    cL.append((self.__renameItem(container, itemInclD[catName][cName]["include_as_item_name"]), inclMode))
                    else:
                        # Process the full content of the dictionary component
                        for container in containerList:
                            if container.getType() == "data":
                                continue
                            cName = container.getName()
                            catName = cName if container.isCategory() else CifName.categoryPart(cName)
                            #
                            # Apply the namespace prefix substitution to every included definition.
                            if container.isAttribute():
                                newName = self.__substituteItemPrefix(cName, nsPrefix, nsPrefixReplace)
                                cL.append((self.__renameItem(container, newName), dictInclMode))
                            else:
                                newName = self.__substituteCategoryPrefix(catName, nsPrefix, nsPrefixReplace)
                                cL.append((self.__renameCategory(container, newName), dictInclMode))
                #
                # Sort the gathered containers by mode; any mode other than "replace"/"extend" is dropped.
                for container, inclMode in cL:
                    if inclMode == "replace":
                        includeDataD.setdefault(datablockName, {}).setdefault("replace", []).append(container)
                    elif inclMode == "extend":
                        logger.debug("%r extending with %r", datablockName, container.getName())
                        includeDataD.setdefault(datablockName, {}).setdefault("extend", []).append(container)
                #
            # Summary counts for debugging only
            for nm in includeDataD:
                numReplace = len(includeDataD[nm]["replace"]) if "replace" in includeDataD[nm] else 0
                numExtend = len(includeDataD[nm]["extend"]) if "extend" in includeDataD[nm] else 0
                logger.debug("includeDataD %s replace (%d) extend (%d)", nm, numReplace, numExtend)
            #
        except Exception as e:
            # Best effort: log and fall through to return whatever was collected.
            logger.exception("Failing with %s", str(e))

        return includeDataD

    def __isLocal(self, locator):
        try:
            locSp = urlsplit(locator)
            return locSp.scheme in ["", "file"]
        except Exception as e:
            logger.error("Bad include file path (%r) : %s", locator, str(e))
        return False

    def __fetchLocator(self, locator, **kwargs):
        """Read the dictionary file at the input locator and return its containers.

        Args:
            locator (str): path or URL of the dictionary file; a local non-absolute locator is
                           resolved against the directory on top of the include directory stack
                           (or the configured include directory when the stack is empty)
            **kwargs: optional reader settings - workPath (default None), enforceAscii (default False),
                      raiseExceptions (default True) and useCharRefs (default True)

        Returns:
            (list): containers read from the locator, or an empty list when nothing could be read
        """
        # Bound before the try block so the final return is defined whatever fails below.
        containerList = []
        try:
            # Locate non-absolute paths relative to the dictionary include file
            if self.__isLocal(locator) and not os.path.isabs(locator):
                # Fall back to the configured include directory when no include is in progress.
                baseDirPath = self.__dirStack[-1] if self.__dirStack else self.__dirPath
                logger.info("locator is %r", locator)
                logger.info("dirStack (%d) top %r", len(self.__dirStack), baseDirPath)
                # join() anchors the locator at the include directory; relpath() would only
                # re-express a path that had been resolved against the process working directory.
                locator = os.path.abspath(os.path.join(baseDirPath, locator))
            #
            workPath = kwargs.get("workPath", None)
            enforceAscii = kwargs.get("enforceAscii", False)
            raiseExceptions = kwargs.get("raiseExceptions", True)
            useCharRefs = kwargs.get("useCharRefs", True)
            #
            myIo = IoAdapterPy(raiseExceptions=raiseExceptions, useCharRefs=useCharRefs)
            # Normalize a falsy reader result to an empty list so callers can always iterate it.
            containerList = myIo.readFile(locator, enforceAscii=enforceAscii, outDirPath=workPath) or []
            logger.info("Fetched %r dictionary container length (%d)", locator, len(containerList))
            logger.debug("%r", [container.getName() for container in containerList])
        except Exception as e:
            logger.exception("Failing for %s with %s", locator, str(e))
        return containerList

    def __substituteCategoryPrefix(self, catName, curPrefix, newPrefix):
        return catName.replace(curPrefix, newPrefix, 1) if catName and catName.startswith(curPrefix) else catName

    def __substituteItemPrefix(self, itemName, curPrefix, newPrefix):
        atName = CifName.attributePart(itemName)
        atName = atName.replace(curPrefix, newPrefix, 1) if atName and atName.startswith(curPrefix) else atName
        catName = CifName.categoryPart(itemName)
        catName = catName.replace(curPrefix, newPrefix, 1) if atName and catName.startswith(curPrefix) else catName
        return CifName.itemName(catName, atName)

    def __renameItem(self, container, newItemName):
        if not container and not container.isAttribute() or not newItemName:
            return container
        #
        itemNameCur = container.getName()
        if itemNameCur == newItemName:
            return container
        #
        try:
            for item in self.__itemNameRelatives:
                catName = CifName.categoryPart(item)
                if container.exists(catName):
                    cObj = container.getObj(catName)
                    atName = CifName.attributePart(item)
                    if cObj.hasAttribute(atName):
                        for iRow in range(cObj.getRowCount()):
                            curVal = cObj.getValue(atName, iRow)
                            if curVal == itemNameCur:
                                cObj.setValue(newItemName, atName, iRow)
        except Exception as e:
            logger.exception("Failing with %s", str(e))
        return container

    def __renameCategory(self, container, newCategoryName):
        if not container and not container.isCategory() or not newCategoryName:
            return container
        #
        catNameCur = container.getName()
        if catNameCur == newCategoryName:
            return container
        try:
            for item in self.__categoryIdRelatives:
                catName = CifName.categoryPart(item)
                if container.exists(catName):
                    cObj = container.getObj(catName)
                    atName = CifName.attributePart(item)
                    if cObj.hasAttribute(atName):
                        for iRow in range(cObj.getRowCount()):
                            testVal = cObj.getValue(atName, iRow)
                            if testVal == catNameCur:
                                cObj.setValue(newCategoryName, atName, iRow)
        except Exception as e:
            logger.exception("Failing with %s", str(e))
        return container

processIncludedContent(self, containerList, cleanup=False)

Process any dictionary, category or item include instructions in any data containers in the input list of dictionary data and definition containers.

Parameters:

Name Type Description Default
containerList list

list of input PdbxContainer data or definition container objects

required
cleanup bool

flag to remove generator category objects after parsing (default: False)

False

Returns:

Type Description
(list)

list of data and definition containers incorporating included content

Examples:

pathDdlIncludeDictionary = "mmcif_ddl-generator.dic"
myIo = IoAdapter(raiseExceptions=True)
containerList = myIo.readFile(inputFilePath=pathDdlIncludeDictionary)
logger.info("Starting container list length (%d)", len(containerList))
dIncl = DictionaryInclude()
inclL = dIncl.processIncludedContent(containerList)
logger.info("Processed included container length (%d)", len(inclL))
Source code in mmcif/api/DictionaryInclude.py
def processIncludedContent(self, containerList, cleanup=False):
    """Resolve the dictionary, category and item include instructions found in the data
    containers of the input list and merge the included content into that list.

    Args:
        containerList (list): list of input PdbxContainer data or definition container objects
        cleanup (bool, optional): flag to remove generator category objects after parsing (default: False)

    Returns:
        (list): list of data and definition containers incorporating included content

    Examples:
        ```python
        pathDdlIncludeDictionary = "mmcif_ddl-generator.dic"
        myIo = IoAdapter(raiseExceptions=True)
        containerList = myIo.readFile(inputFilePath=pathDdlIncludeDictionary)
        dIncl = DictionaryInclude()
        inclL = dIncl.processIncludedContent(containerList)
        logger.info("Processed included container length (%d)", len(inclL))
        ```

    """
    # Step 1: collect the include instructions; step 2: fetch what they reference;
    # step 3: merge the fetched containers into the input list.
    instructionD = self.__getIncludeInstructions(containerList, cleanup=cleanup)
    fetchedContentD = self.__fetchIncludedContent(instructionD, cleanup=cleanup)
    mergedContainerList = self.__addIncludedContent(containerList, fetchedContentD)
    return mergedContainerList