Source code for refinegems.curation.curate

#!/usr/bin/env python
"""General functions for curating a model

This module provides functionalities for curating models, including special functions for CarveMe models.

Since CarveMe version 1.5.1, the draft models from CarveMe contain pieces of information that are not correctly added to the
annotations. To address this, this module includes the following functionalities:

    - Add URIs from the entity IDs to the annotation field for metabolites & reactions
    - Transfer URIs from the notes field to the annotations for metabolites & reactions
    - Add URIs from the GeneProduct IDs to the annotations

The functionalities for CarveMe models, along with some of the following further functionalities, are gathered in the
main function :py:func:`~refinegems.curation.curate.polish_model`.

Further functionalities:

        - Setting boundary condition & constant for metabolites & reactions
        - Unit handling to add units & UnitDefinitions & to set units for parameters
        - Addition of default settings for compartments & metabolites
        - Addition of URIs to GeneProducts

            - via a mapping from model IDs to valid database IDs
            - via the KEGG API

        - Changing the CURIE pattern/CVTerm qualifier & qualifier type
        - Directionality control
"""

__author__ = "Famke Baeuerle and Carolin Brune and Gwendolyn O. Döbel"

################################################################################
# requirements
################################################################################

import cobra
import logging
import pandas as pd
import re
import warnings

from bioservices.kegg import KEGG
from cobra.io.sbml import _f_specie, _f_reaction
from libsbml import Model as libModel
from libsbml import GeneProduct, Species, ListOfSpecies, ListOfReactions, UnitDefinition
from pathlib import Path
from tqdm.auto import tqdm
from typing import Literal, Union

from .miriam import polish_annotations, change_all_qualifiers

from ..utility.cvterms import (
    add_cv_term_genes,
    add_cv_term_metabolites,
    add_cv_term_reactions,
    DB2PREFIX_METABS,
    DB2PREFIX_REACS,
    get_id_from_cv_term,
)
from ..utility.entities import get_gpid_mapping, create_fba_units, print_UnitDefinitions
from ..utility.io import load_a_table_from_database
from ..utility.util import DB2REGEX, test_biomass_presence

################################################################################
# variables
################################################################################

NH_PATTERN = re.compile(r"nh[3-4]")  #: :meta:

################################################################################
# functions
################################################################################

# sychronise annotations
# ----------------------


[docs] def update_annotations_from_others(model: libModel) -> libModel: """Synchronizes metabolite annotations for core, periplasm and extracelullar Args: - model (libModel): Model loaded with libSBML Returns: libModel: Modified model with synchronized annotations """ for metab in model.getListOfSpecies(): base = metab.getId()[:-2] for comp in ["_c", "_e", "_p"]: other_metab = model.getSpecies(base + comp) if other_metab is not None: if not other_metab.isSetMetaId(): other_metab.setMetaId("meta_" + other_metab.getId()) for db_id, code in DB2PREFIX_METABS.items(): id = get_id_from_cv_term(metab, code) for entry in id: if entry is not None: add_cv_term_metabolites(entry, db_id, other_metab) return model
[docs] def extend_gp_annots_via_mapping_table( model: libModel, mapping_tbl_file: Union[str,Path] = None, gff_paths: list[str] = None, email: str = None, contains_locus_tags: bool = False, lab_strain: bool = False, outpath: str = None, ) -> libModel: """ | Extend GenePoduct annotations via mapping table. | If no mapping table is provided, a mapping table will be generated. Args: - model (libModel): Model loaded with libSBML - mapping_tbl_file (str|Path, optional): Path to a file containing a mapping table with columns ``model_id | X...`` where X can be ``REFSEQ``, ``NCBI``, ``locus_tag`` or ``UNCLASSIFIED``. The table can contain all of the ``X`` columns or at least one of them. Defaults to None. - gff_paths (list[str], optional): Path(s) to GFF file(s). Allowed GFF formats are: RefSeq, NCBI and Prokka. This is only used when mapping_tbl_file == None. Defaults to None. - email (str, optional): E-mail for NCBI queries. This is only used when mapping_tbl_file == None. Defaults to None. - contains_locus_tags (bool, optional): Specifies if provided model has locus tags within the label tag if set to True. This is only used when mapping_tbl_file == None. Defaults to False. - lab_strain (bool, optional): Specifies if a strain from no database was provided and thus has only homolog mappings if set to True. Defaults to False. - outpath (str, optional): Output path for location where the generated mapping table should be written to. This is only used when mapping_tbl_file == None. Defaults to None. Returns: libModel: Modified model with extended annotations for the GeneProducts """ # 1. Get mapping # If no mapping table provided, get via function print("Get mapping information...") if not mapping_tbl_file: mapping_table = get_gpid_mapping( model, gff_paths, email, contains_locus_tags, outpath ) else: # Otherwise read in table from file mapping_tbl_file = mapping_tbl_file if isinstance(mapping_tbl_file, str) else str(mapping_tbl_file) mapping_table = pd.read_csv(mapping_tbl_file) # Drop all rows without model_id entries mapping_table.dropna(subset="model_id", inplace=True) # Use model_id as index mapping_table.set_index("model_id", inplace=True) # Replace all NaN values with empty string mapping_table.fillna("", inplace=True) # 2. Use table to fill in information in model # Get gene list gene_list = model.getPlugin("fbc").getListOfGeneProducts() print("Extending GeneProduct information...") for gene in tqdm(gene_list): # Get row of mapping table for current model_id gp_infos = mapping_table.loc[gene.getId(), :] # Add infos to current GeneProduct if gp_infos["name"]: gene.setName(gp_infos["name"]) if gp_infos["locus_tag"]: gene.setLabel(gp_infos["locus_tag"]) if ("REFSEQ" in gp_infos.index.to_list()) and gp_infos["REFSEQ"]: add_cv_term_genes(gp_infos["REFSEQ"], "REFSEQ", gene, lab_strain) if ("NCBI" in gp_infos.index.to_list()) and gp_infos["NCBI"]: add_cv_term_genes(gp_infos["NCBI"], "NCBI", gene, lab_strain) return model
[docs] def extend_gp_annots_via_KEGG(gene_list: list[GeneProduct], kegg_organism_id: str): """Adds KEGG gene & UniProt identifiers to the GeneProduct annotations Args: gene_list (list[GeneProduct]): libSBML ListOfGenes kegg_organism_id (str): Organism identifier in the KEGG database """ k = KEGG() mapping_kegg_uniprot = k.conv("uniprot", kegg_organism_id) no_valid_kegg = [] for gp in tqdm(gene_list): if gp.getId() != "G_spontaneous": kegg_gene_id = f"{kegg_organism_id}:{gp.getLabel()}" try: uniprot_id = mapping_kegg_uniprot[kegg_gene_id] add_cv_term_genes(kegg_gene_id, "KEGG", gp) add_cv_term_genes(uniprot_id.split(r"up:")[1], "UNIPROT", gp) except KeyError: no_valid_kegg.append(gp.getLabel()) if no_valid_kegg: logging.info( f"The following {len(no_valid_kegg)} locus tags form no valid KEGG Gene ID: {no_valid_kegg} with the provided KEGG Organism ID: {kegg_organism_id}." )
[docs] def extend_metab_reac_annots_via_id( entity_list: Union[ListOfSpecies, ListOfReactions], id_db: str ) -> None: """Extends metabolite or reaction annotations by extracting the core ID from the entity ID and adding this ID as valid annotation if possible Args: - entity_list (Union[ListOfSpecies, ListOfReactions]): List of entities, either species (metabolites, ListOfSpecies) or reactions (ListOfReactions) - id_db (str): The database prefix to validate IDs against. Must correspond to a valid prefix in the Bioregistry Raises: - TypeError: Unsupported type for entity_list """ # Initialise id_db correctly id_db = id_db.upper() # Set-up case-dependent variables match entity_list: case ListOfSpecies(): bigg_db = "bigg_metabolites" bigg_id_type = "universal_bigg_id" add_cv_term = add_cv_term_metabolites id_handler = _f_specie db2prefix = DB2PREFIX_METABS case ListOfReactions(): bigg_db = "bigg_reactions" bigg_id_type = "id" add_cv_term = add_cv_term_reactions id_handler = _f_reaction db2prefix = DB2PREFIX_REACS case _: raise TypeError( f"Unsupported type for entity_list {type(entity_list)}. Must be ListOfSpecies or ListOfReactions." ) # Set-up default variables try: id_db_prefix = db2prefix[id_db] except KeyError: print( f""" KeyError: with id_db=\'{id_db}\' id_db must be one of the valid database names: {db2prefix.keys()} If your id_db is not part of the list, please contact the developers. """ ) return try: db_pattern = DB2REGEX[id_db_prefix] except KeyError: print( f"KeyError: id_db_prefix = {id_db_prefix} must be one of the valid prefixes in https://bioregistry.io/." ) return # Get BiGG IDs for VMH ID == BiGG ID validation if "vmh" in id_db_prefix.lower(): bigg_ids = load_a_table_from_database(f"SELECT bigg_id FROM {bigg_db}") bigg_ids = set(bigg_ids[bigg_id_type].tolist()) # Get ID from entity & add annotation if valid database ID for entity in entity_list: # Get ID current_id = entity.getId() # Use current_id as metaid if no metaid is present if not entity.isSetMetaId(): entity.setMetaId(f"meta_{current_id}") if re.fullmatch(db_pattern, current_id, re.IGNORECASE): # Remove prefix current_id = id_handler(current_id) # Unset annotations if no CV terms exist if entity.getNumCVTerms() == 0: entity.unsetAnnotation() # Get ID for annotation if isinstance(entity, Species): # Remove compartment suffix id_for_anno = re.sub( f"(_|\[){entity.getCompartment()}\]?$", "", current_id ) else: id_for_anno = current_id # Add ID as URI to annotation add_cv_term(id_for_anno, id_db, entity) # Add BiGG ID to annotation, additionally, if VMH and valid BiGG ID if ("vmh" in id_db_prefix.lower()) and (id_for_anno in bigg_ids): add_cv_term(id_for_anno, id_db, entity)
[docs] def extend_metab_reac_annots_via_notes( entity_list: Union[ListOfSpecies, ListOfReactions], ) -> None: """Extends metabolite or reaction annotations by extracting valid URIs from the notes section of the provided entities and cleans up the notes by removing processed elements Args: - entity_list (Union[ListOfSpecies, ListOfReactions]): List of entities, either species (metabolites, ListOfSpecies) or reactions (ListOfReactions) Raises: - TypeError: Unsupported type for entity_list """ # Set-up case-dependent variables match entity_list: case ListOfSpecies(): db2prefix = DB2PREFIX_METABS add_cv_term = add_cv_term_metabolites case ListOfReactions(): db2prefix = DB2PREFIX_REACS add_cv_term = add_cv_term_reactions case _: raise TypeError( f"Unsupported type for entity_list {type(entity_list)}. Must be ListOfSpecies or ListOfReactions." ) # Get notes & add annotation from notes to CVTerms if valid URI for entity in entity_list: if not entity.isSetMetaId(): entity.setMetaId("meta_" + entity.getId()) notes_list = [] elem_used = [] notes_string = entity.getNotesString().split("\n") for elem in notes_string: for id_db in db2prefix.keys(): if "<p>" + id_db in elem.upper(): elem_used.append(elem) # @DEBUG print(elem.strip()[:-4].split(r': ')[1]) fill_in = re.split(r":\s*", elem.strip()[:-4])[1] if (";") in fill_in: if not re.search(r"inchi", id_db, re.IGNORECASE): entries = fill_in.split(r";") for entry in entries: if not re.fullmatch( r"^nan$", entry.strip(), re.IGNORECASE ): add_cv_term(entry.strip(), id_db, entity) else: if not re.fullmatch(r"^nan$", fill_in, re.IGNORECASE): add_cv_term(fill_in.strip(), id_db, entity) for elem in notes_string: if elem not in elem_used and elem not in notes_list: notes_list.append(elem) # Adding new, shortened notes new_notes = " ".join([str(elem) + "\n" for elem in notes_list]) entity.unsetNotes() entity.setNotes(new_notes)
# @DEBUG print(species.getAnnotationString()) # correct basic model set-up # ---------------------------
[docs] def polish_model_units(model: libModel) -> None: """Replaces the list of unit definitions with the unit definitions needed for FBA: - mmol per gDW per h - mmol per gDW - hour (h) - femto litre (fL) Args: - model (libModel): Model loaded with libSBML """ # Get FBA unit definitions per refineGEMs definition fba_unit_defs = create_fba_units(model) # Get model unit definitions model_unit_defs = model.getListOfUnitDefinitions().clone() # If list of unit definitions is not empty, replace all units with the defined FBA units # & Print the non-FBA unit definitions if model_unit_defs: # List to collect all non-FBA unit definitions removed_unit_defs = [] # Check if model unit definitions fit to the fba unit definitions for model_ud in model_unit_defs: for fba_ud in fba_unit_defs: # In case of identical unit definitions, remove unit definition from fba unit def list if not UnitDefinition.areIdentical(fba_ud, model_ud): removed_unit_defs.append(model_ud) # Remove all model unit definitions model.getListOfUnitDefinitions().clear(doDelete=True) # Only print list if UnitDefinitions were removed if len(removed_unit_defs) == 0: logging.warning( """ The following UnitDefinition objects were removed. The reasoning is that \t(a) these UnitDefinitions are not contained in the UnitDefinition list of this program and \t(b) the UnitDefinitions defined within this program are handled as ground truth. Thus, the following UnitDefinitions are not seen as relevant for the model. """ ) print_UnitDefinitions(removed_unit_defs) # Add all defined FBA units to the model for unit_def in fba_unit_defs: model.getListOfUnitDefinitions().append(unit_def)
[docs] def set_model_default_units(model: libModel): """Sets default units of model Args: - model (libModel): Model loaded with libSBML """ for unit in model.getListOfUnitDefinitions(): unit_id = unit.getId() if re.fullmatch(r"mmol_per_gDW", unit_id, re.IGNORECASE): if not (model.isSetExtentUnits() and model.getExtentUnits() == unit_id): model.setExtentUnits(unit_id) if not ( model.isSetSubstanceUnits() and model.getSubstanceUnits() == unit_id ): model.setSubstanceUnits(unit_id) if not ( model.isSetTimeUnits() and model.getTimeUnits() == unit_id ) and re.fullmatch(r"hr?", unit_id, re.IGNORECASE): model.setTimeUnits(unit_id) if not ( model.isSetVolumeUnits() and model.getVolumeUnits() == unit_id ) and re.fullmatch(r"fL", unit_id, re.IGNORECASE): model.setVolumeUnits(unit_id)
[docs] def set_units_of_parameters(model: libModel): """Sets units of parameters in model Args: - model (libModel): Model loaded with libSBML """ for ( param ) in ( model.getListOfParameters() ): # needs to be added to list of unit definitions aswell if any( ( unit_id := re.fullmatch( r"mmol_per_gDW_per_hr?", unit.getId(), re.IGNORECASE ) ) for unit in model.getListOfUnitDefinitions() ): if not (param.isSetUnits() and param.getUnits() == unit_id.group(0)): param.setUnits(unit_id.group(0))
[docs] def add_compartment_structure_specs(model: libModel): """| Adds the required specifications for the compartment structure | if not set (size & spatial dimension) Args: - model (libModel): Model loaded with libSBML """ for compartment in model.getListOfCompartments(): if not compartment.isSetSize(): compartment.setSize(float("NaN")) if not compartment.isSetSpatialDimensions(): compartment.setSpatialDimensions(3) if any( (unit_id := re.fullmatch(r"fL", unit.getId(), re.IGNORECASE)) for unit in model.getListOfUnitDefinitions() ): if not ( compartment.isSetUnits() and compartment.getUnits() == unit_id.group(0) ): compartment.setUnits(unit_id.group(0))
[docs] def set_initial_amount_metabs(model: libModel): """Sets initial amount to all metabolites if not already set or if initial concentration is not set Args: - model (libModel): Model loaded with libSBML """ for species in model.getListOfSpecies(): if not (species.isSetInitialAmount() or species.isSetInitialConcentration()): species.setInitialAmount(float("NaN"))
[docs] def polish_entity_conditions(entity_list: Union[ListOfSpecies, ListOfReactions]): """Sets boundary condition and constant if not set for an entity Args: - entity_list (Union[ListOfSpecies, ListOfReactions]): libSBML ListOfSpecies or ListOfReactions """ match entity_list: case ListOfSpecies(): for entity in entity_list: if not entity.getBoundaryCondition(): entity.setBoundaryCondition(False) if not entity.getConstant(): entity.setConstant(False) case ListOfReactions(): pass case _: logging.warning( f"Unsupported type for entity_list {type(entity_list)}. Must be ListOfSpecies or ListOfReactions." )
# duplicates # ----------
[docs] def resolve_duplicate_reactions( model: cobra.Model, based_on: str = "reaction", remove_reac: bool = True ) -> cobra.Model: """Resolve and remove duplicate reaction based on their reaction equation and matching database identifiers. Only if all match or a comparison with nan occurs will one of the reactions be removed. Args: - model (cobra.Model): A model loaded with COBRApy. - based_on (str, optional): Label to base the resolvement process on . Can be 'reaction' or any other annotation label. Defaults to 'reaction'. - remove_reac (bool, optional): When True, combines and remove duplicates. Otherwise only reports the findings. Defaults to True. Returns: cobra.Model: The model. """ # get annotation and compartment information anno_reac = [] for r in model.reactions: anno_reac.append( {"id": r.id, "compartment": str(r.compartments), "reaction": r.reaction} | r.annotation ) df_reac = pd.DataFrame.from_dict(anno_reac) # check if based_on is valid if not based_on in df_reac.columns.tolist(): warnings.warn( f"Warning: Annotation column {based_on} does not exists. Search for duplicates will be skipped." ) return model # set basic parameters skip_cols = ["id", "compartment", "bigg.reaction", "reaction", based_on] colnames = df_reac.columns.tolist() for c in df_reac.groupby("compartment"): # note: using groupby drops nans for mnx in c[1].groupby(based_on): # find possible duplicates dupl = True annotations = {} if len(mnx[1]) > 1: # check annotations for col in [_ for _ in colnames if not _ in skip_cols]: if len(mnx[1][col].dropna().value_counts()) < 2: annotations[col] = ( mnx[1][col].dropna().explode().unique().tolist() ) else: dupl = False break # if duplicate found if dupl: if remove_reac: # choose reaction to keep keep_reac = model.reactions.get_by_id(mnx[1]["id"].tolist()[0]) # resolve annotations for key, value in annotations.items(): if len(value) > 0 and not key in keep_reac.annotation: keep_reac.annotation[key] = value # combine gene reaction rules for r_id in mnx[1]["id"].tolist()[1:]: gpr_to_add = model.reactions.get_by_id( r_id ).gene_reaction_rule if gpr_to_add and gpr_to_add != "": if ( keep_reac.gene_reaction_rule and keep_reac.gene_reaction_rule != "" ): # add two existing rules keep_reac.gene_reaction_rule = ( keep_reac.gene_reaction_rule + " or " + gpr_to_add ) else: keep_reac.gene_reaction_rule = ( gpr_to_add # add the one existing the other ) else: pass # nothing to add model.reactions.get_by_id(r_id).remove_from_model() print( f"\tDuplicate reaction {r_id} found. Combined to {keep_reac.id} and deleted." ) else: print( f'\tDuplicate reactions {", ".join(mnx[1]["id"].tolist())} found.' ) return model
[docs] def resolve_duplicate_metabolites( model: cobra.Model, based_on: str = "metanetx.chemical", replace: bool = True ) -> cobra.Model: """Resolve duplicate metabolites in a model. Metabolites are considered duplicate if they share the same annotations (same or nan). .. note:: Depending on the starting database, the results might differ. Args: - model (cobra.Model): The model loaded with COBRApy. - based_on (str, optional): Label to base the resolvement process on . Can be any annotation label. Defaults to 'metanetx.chemical'. - replace (bool, optional): Either report the duplicates (False) or replace them with one (True). Defaults to True. Returns: cobra.Model: The model. """ # get annotation and compartment information anno_meta = [] for m in model.metabolites: anno_meta.append({"id": m.id, "compartment": m.compartment} | m.annotation) df_meta = pd.DataFrame.from_dict(anno_meta) # check if based_on is valid if not based_on in df_meta.columns.tolist(): warnings.warn( f"Warning: Annotation {based_on} not found. Search for metabolite duplicates skipped." ) return model # set basic parameters skip_cols = ["id", "compartment", "bigg.metabolite", based_on] colnames = df_meta.columns.tolist() # get objective function bof_list = test_biomass_presence(model) if len(bof_list) == 1: objective_function = bof_list[0] elif len(bof_list) > 1: mes = f"Multiple BOFs detected. Will be using {bof_list[0]}" warnings.warn(mes, category=UserWarning) objective_function = bof_list[0] else: mes = "No BOF detected. Might lead to problems during duplicate removal." warnings.warn(mes, category=UserWarning) for c in df_meta.groupby("compartment"): # note: using groupby drops nans # note: mnx as starting point was choosen, as it seems to have the most annotations (easy to get) c[1][based_on] = c[1][based_on].apply( lambda x: tuple(x) if isinstance(x, list) else x ) for mnx in c[1].groupby(based_on): # find possible duplicates dupl = True annotations = {} if len(mnx[1]) > 1: for col in [_ for _ in colnames if not _ in skip_cols]: # check annotation, if current group truly consists of duplicates if len(mnx[1][col].dropna().value_counts()) < 2: annotations[col] = ( mnx[1][col].dropna().explode().unique().tolist() ) else: dupl = False break # no duplicates = check next else: continue # if duplicate is found: if dupl: # either replace ... if replace: # choose metabolite to keep keep_meta = model.metabolites.get_by_id(mnx[1]["id"].tolist()[0]) # resolve annotations for key, value in annotations.items(): if len(value) > 0 and not key in keep_meta.annotation: keep_meta.annotations[key] = value # note: charge and formula should be in valid range and be corrected by MCC (if needed) # replace duplicates with the metabolite to be kept # to ensure consistency, only delete duplicate metabolites, which # do NOT share ANY reactions # retrieve reactions for metabolites set for keeping keep_reac = [ _.id for _ in model.metabolites.get_by_id(keep_meta.id).reactions ] # iterate over metabolites set for deletion for del_meta_id in mnx[1]["id"].tolist()[1:]: # retrieve reaction for metabolites set for deletion del_reac = [ _.id for _ in model.metabolites.get_by_id(del_meta_id).reactions ] # get intersection of reactions (keep + del) reac_intersec = list(set(keep_reac) & set(del_reac)) # if intersection empty, metabolite is with a high probability indeed a duplicate # Special case: NH3 / NH4 # intersection does not have to be emtpy, know 'problem' caused by CarveMe if len(reac_intersec) == 0 or all( [ re.search(NH_PATTERN, _) for _ in [keep_meta.id, del_meta_id] ] ): # automated deletion is only advisable, if consistency can be retained perform_deletion = True with model as model_del: # if the special case is detected ... if all( [ re.search(NH_PATTERN, _) for _ in [keep_meta.id, del_meta_id] ] ): print( f"\tSpecial case -Duplicate NH4/NH3- detected.\n\tTrying to solve by additionally removing reactions containing both metabolites." ) # ... remove reactions with nh3 and nh4 both present for del_reac_id in reac_intersec: # if objective_function is part of the set # automated deletion is (currently) not possible if del_reac_id == objective_function: perform_deletion = False break model_del.reactions.get_by_id( del_reac_id ).remove_from_model() # set the metabolites to be deleted to be the one NOT in the objective functions # to avoid inconsistencies if del_meta_id in [ _.id for _ in model_del.reactions.get_by_id( objective_function ).metabolites ]: temp = del_meta_id del_meta_id = keep_meta.id keep_meta = model_del.metabolites.get_by_id( temp ) # try replacing metabolite with the kept duplicate ... for reac in model_del.metabolites.get_by_id( del_meta_id ).reactions: reac.reaction = reac.reaction.replace( del_meta_id, keep_meta.id ) # skip if objective function is found if reac.id == objective_function: continue # check if consistency is still intact balance_test = reac.check_mass_balance() if not reac.boundary and len(balance_test) > 0: # try fixing H-balance if "H" in balance_test.keys(): # .............................. # TODO: # get H according to compartment # current implementation relies heavily # on 'correct' use input: compartment should have format C_c or c (C_p, p, C_e, e etc.) # .............................. reac_comp = reac.compartments.pop()[-1] if reac_comp == "c": reac.subtract_metabolites( {"h_c": balance_test["H"]} ) elif reac_comp == "p": reac.subtract_metabolites( {"h_p": balance_test["H"]} ) elif reac_comp == "e": reac.subtract_metabolites( {"h_e": balance_test["H"]} ) else: perform_deletion = False break # .............................. # TODO: # fix other possible problems # .............................. # finally, check balance again (continue only if fixed, else break) if len(reac.check_mass_balance()) > 0: perform_deletion = False break else: continue # if not problems are found, duplicate is removed if perform_deletion: model = model_del.copy() print( f"\tDuplicate metabolite {del_meta_id} found. Replaced with {keep_meta.id}." ) # if problems are not solvable, duplicate is kept and only reported else: print( f"\tDuplicate metabolite {del_meta_id} found (duplicate to {keep_meta.id} based on annotation).\n\t\tAutomated deletion not possible due to problems with consistency." ) # else, metabolite is kept # since it might be an isomer, elongation, or other explanation # for the same annotation else: print( f"\tDuplicate metabolite {del_meta_id} found (duplicate to {keep_meta.id} based on annotation).\n\t\tKept, as reaction containing both metabolites was found." ) # ... or only report duplicates else: print( f'\tDuplicate metabolite(s) {", ".join(mnx[1]["id"].tolist())} found.' ) return model
[docs] def resolve_duplicates( model: cobra.Model, check_reac: bool = True, check_meta: Literal["default", "exhaustive", "skip"] = "default", replace_dupl_meta: bool = True, remove_unused_meta: bool = False, remove_dupl_reac: bool = True, ) -> cobra.Model: """Resolve and remove (optional) duplicate metabolites and reactions in the model. Args: - model (cobra.Model): The model loaded with COBRApy. - check_reac (bool, optional): Whether to check reactions for duplicates. Defaults to True. - check_meta (Literal['default','exhaustive','skip'], optional): Whether to check for duplicate metabolites. Defaults to 'default'. - replace_dupl_meta (bool, optional): Option to replace/remove duplicate metabolites. Defaults to True. - remove_unused_meta (bool, optional): Option to remove unused metabolites. Defaults to False. - remove_dupl_reac (bool, optional): Option to combine/remove duplicate reactions. Defaults to True. Returns: cobra.Model: The (edited) model. """ # resolve duplicate metabolites if check_meta == "default": # resolve duplicates starting with the metanetx.chemical database identifiers model = resolve_duplicate_metabolites(model, replace=replace_dupl_meta) elif check_meta == "exhaustive": # resolve duplicates by starting at every database identifer one after another # note: bigg and sbo are skipped as sbo gives not much information and bigg is # usually the one that differs (naming issue) anno_types = set() # get all database annotation types present in the model for m in model.metabolites: anno_types = anno_types | set(m.annotation.keys()) for colname in [_ for _ in anno_types if not _ in ["bigg.metabolite", "sbo"]]: model = resolve_duplicate_metabolites( model, colname, replace=replace_dupl_meta ) elif check_meta == "skip": print("\tSkip check for duplicate metabolites.") else: warnings.warn( f"Warning: Unknown option for metabolites duplicate checking {check_meta}. Search for metabolite duplicates skipped." ) # remove now unused metabolites if remove_unused_meta: model, removed = cobra.manipulation.delete.prune_unused_metabolites(model) print( f'\tThe following metabolites () have been removed: {", ".join([x.id for x in removed])}' ) # resolve duplicate reactions if check_reac: model = resolve_duplicate_reactions( model, based_on="reaction", remove_reac=remove_dupl_reac ) return model
# Directionality Control # ----------------------
[docs] def check_direction(model: cobra.Model, data: Union[pd.DataFrame, str]) -> cobra.Model: """Check the direction of reactions by searching for matching MetaCyc, KEGG and MetaNetX IDs as well as EC number in a downloaded BioCyc (MetaCyc) database table or dataFrame (need to contain at least the following columns: Reactions (MetaCyc ID),EC-Number,KEGG reaction,METANETX,Reaction-Direction. Args: model (cobra.Model): The model loaded with COBRApy. data (pd.DataFrame | str): Either a pandas DataFrame or a path to a CSV file containing the BioCyc smart table. Raises: - TypeError: Unknown data type for parameter data Returns: cobra.Model: The edited model. """ match data: # already a DataFrame case pd.DataFrame(): pass case str(): # load from a table data = pd.read_csv(data, sep="\t") # rewrite the columns into a better comparable/searchable format data["KEGG reaction"] = data["KEGG reaction"].str.extract(r".*>(R\d*)<.*") data["METANETX"] = data["METANETX"].str.extract(r".*>(MNXR\d*)<.*") data["EC-Number"] = data["EC-Number"].str.extract(r"EC-(.*)") case _: mes = f"Unknown data type for parameter data: {type(data)}" raise TypeError(mes) # check direction # -------------------- for r in model.reactions: direction = None # easy case: metacyc is already (corretly) annotated if ( "metacyc.reaction" in r.annotation and len(data[data["Reactions"] == r.annotation["metacyc.reaction"]]) != 0 ): direction = data[data["Reactions"] == r.annotation["metacyc.reaction"]][ "Reaction-Direction" ].iloc[0] r.notes["BioCyc direction check"] = f"found {direction}" # complicated case: no metacyc annotation else: annotations = [] # collect matches if ( "kegg.reaction" in r.annotation and r.annotation["kegg.reaction"] in data["KEGG reaction"].tolist() ): annotations.append( data[data["KEGG reaction"] == r.annotation["kegg.reaction"]][ "Reactions" ].tolist() ) if ( "metanetx.reaction" in r.annotation and r.annotation["metanetx.reaction"] in data["METANETX"].tolist() ): annotations.append( data[data["METANETX"] == r.annotation["metanetx.reaction"]][ "Reactions" ].tolist() ) if ( "ec-code" in r.annotation and r.annotation["ec-code"] in data["EC-Number"].tolist() ): annotations.append( data[data["EC-Number"] == r.annotation["ec-code"]][ "Reactions" ].tolist() ) # check results # no matches if len(annotations) == 0: r.notes["BioCyc direction check"] = "not found" # matches found else: # built intersection intersec = set(annotations[0]).intersection(*annotations) # case 1: exactly one match remains if len(intersec) == 1: entry = intersec.pop() direction = data[data["Reactions"] == entry][ "Reaction-Direction" ].iloc[0] r.annotation["metacyc.reaction"] = entry r.notes["BioCyc direction check"] = f"found {direction}" # case 2: multiple matches found -> inconclusive else: r.notes["BioCyc direction check"] = f"found, but inconclusive" # update direction if possible and needed if not pd.isnull(direction): if "REVERSIBLE" in direction: # set reaction as reversible by setting default values for upper and lower bounds r.lower_bound = cobra.Configuration().lower_bound elif "RIGHT-TO-LEFT" in direction: # invert the default values for the boundaries r.lower_bound = cobra.Configuration().lower_bound r.upper_bound = 0.0 elif "LEFT-To-RIGHT" in direction: # In case direction was already wrong r.lower_bound = 0.0 r.upper_bound = cobra.Configuration().upper_bound else: # left to right case is the standard for adding reactions # = nothing left to do continue return model
# Perform all clean-up steps # --------------------------
[docs] def polish_model( model: libModel, id_db: str = "BiGG", mapping_tbl_file: str = None, gff_paths: list[str] = None, email: str = None, contains_locus_tags: bool = False, lab_strain: bool = False, kegg_organism_id: str = None, reaction_direction: str = None, outpath: str = None, ) -> libModel: """Completes all steps to polish a model .. note:: So far only tested for models having either BiGG or VMH identifiers. Args: - model (libModel): Model loaded with libSBML - id_db (str, optional): Main database where identifiers in model come from. Defaults to 'BiGG'. - mapping_tbl_file (str, optional): Path to a file containing a mapping table with columns ``model_id | X...`` where X can be ``REFSEQ``, ``NCBI``, ``locus_tag`` or ``UNCLASSIFIED``. The table can contain all of the ``X`` columns or at least one of them. Defaults to None. - gff_paths (list[str], optional): Path(s) to GFF file(s). Allowed GFF formats are: RefSeq, NCBI and Prokka. This is only used when mapping_tbl_file == None. Defaults to None. - email (str, optional): E-mail for NCBI queries. This is only used when mapping_tbl_file == None. Defaults to None. - contains_locus_tags (bool, optional): Specifies if provided model has locus tags within the label tag if set to True. This is only used when mapping_tbl_file == None. Defaults to False. - lab_strain (bool, optional): Specifies if a strain from no database was provided and thus has only homolog mappings, if set to True. Defaults to False. - kegg_organism_id (str, optional): KEGG organism identifier if available. Defaults to None. - reaction_direction (str, optional): Path to a CSV file containing the BioCyc smart table with the columns ``Reactions (MetaCyc ID) | EC-Number | KEGG reaction | METANETX | Reaction-Direction``. For more details see :py:func:`~refinegems.curation.curate.check_direction` Defaults to None. - outpath (str, optional): Output path for mapping table from model ID to valid database IDs (if mapping_tbl_file == None) & incorrect annotations file(s). Defaults to None. Returns: libModel: Polished libSBML model """ ### Set-up # Get ListOf objects metab_list = model.getListOfSpecies() reac_list = model.getListOfReactions() gene_list = model.getPlugin("fbc").getListOfGeneProducts() ### unit definition ### polish_model_units(model) set_model_default_units(model) set_units_of_parameters(model) add_compartment_structure_specs(model) set_initial_amount_metabs(model) ### improve metabolite, reaction and gene annotations ### extend_metab_reac_annots_via_id(metab_list, id_db) extend_metab_reac_annots_via_id(reac_list, id_db) extend_metab_reac_annots_via_notes(metab_list) extend_metab_reac_annots_via_notes(reac_list) update_annotations_from_others(model) ### Extend annotations for GeneProducts ### extend_gp_annots_via_mapping_table( model, mapping_tbl_file, gff_paths, email, contains_locus_tags, lab_strain, outpath ) if kegg_organism_id: extend_gp_annots_via_KEGG(gene_list, kegg_organism_id) ### Check reaction direction ### if reaction_direction: check_direction(model, reaction_direction) ### set boundaries and constants ### polish_entity_conditions(metab_list) polish_entity_conditions(reac_list) ### MIRIAM compliance of CVTerms ### print( "Remove duplicates & transform all CURIEs to the new identifiers.org pattern (: between db and ID):" ) model = polish_annotations(model, True, outpath) print("Changing all qualifiers to be MIRIAM compliant:") model = change_all_qualifiers(model, lab_strain) return model