Source code for refinegems.utility.io

#!/usr/bin/env python
"""Provides functions to load and write models, media definitions and the manual annotation table

Depending on the application the model needs to be loaded with COBRApy (e.g. memote)
or with libSBML (e.g. activation of groups). Some might even require both (e.g. gap filling).
The manual_annotations table has to follow the specific layout given in the data folder in order to work with this module.
"""

__author__ = "Carolin Brune, Tobias Fehrenbach, Famke Baeuerle and Gwendolyn O. Döbel"

################################################################################
# requirements
################################################################################


import cobra
import gffutils
import logging
import os
import pandas as pd
import re
import sqlalchemy
import sqlite3

from ols_client import EBIClient
from Bio import SeqIO
from libsbml import Model as libModel
from libsbml import SBMLReader, writeSBMLToFile, SBMLValidator, SBMLDocument
from pathlib import Path
from typing import Literal, Union

from .databases import PATH_TO_DB

################################################################################
# variables
################################################################################

################################################################################
# functions
################################################################################

# models
# ------


[docs] def load_model( modelpath: Union[str, list[str]], package: Literal["cobra", "libsbml"] ) -> Union[cobra.Model, list[cobra.Model], libModel, list[libModel]]: """Load a model. Args: - modelpath (str | list[str]): Path to the model or list of paths to models (string format). - package (Literal['cobra','libsbml']): Package to use to load the model. Returns: cobra.Model|list[cobra.Model]|libModel|list[libModel]: The loaded model(s). """ def load_cobra_model(modelpath: str) -> cobra.Model: """Load a model using COBRApy. Args: modelpath (str): Path to the model. Can be a xml, json, yml or mat file. Raises: - ValueError: Unknown file extension Returns: cobra.Model: The loaded model object. """ extension = os.path.splitext(modelpath)[1].replace(".", "") match extension: case "xml" | "sbml": data = cobra.io.read_sbml_model(modelpath) case "json": data = cobra.io.load_json_model(modelpath) case "yml" | "yaml": data = cobra.io.load_yaml_model(modelpath) case "mat": data = cobra.io.load_matlab_model(modelpath) case _: raise ValueError("Unknown file extension for model: ", extension) return data def load_libsbml_model(modelpath: str) -> libModel: """Load a model with libsbml. Args: modelpath (str): Path to the model. Should be xml. Returns: libModel: The loaded model object """ extension = os.path.splitext(modelpath)[1].replace(".", "") match extension: case "xml" | "sbml": reader = SBMLReader() read = reader.readSBMLFromFile(modelpath) # read from file mod = read.getModel() case "json" | "yml" | "mat": data = load_cobra_model(modelpath) mod = cobra.io.sbml._model_to_sbml(data).getModel() case _: raise ValueError("Unknown file extension for model: ", extension) return mod match modelpath: # read in multiple models case list(): loaded_models = [] for m in modelpath: if package == "cobra": loaded_models.append(load_cobra_model(m)) elif package == "libsbml": loaded_models.append(load_libsbml_model(m)) else: mes = f"Unknown type for package. Unable to load model." raise ValueError(mes) return loaded_models # read in a single model case str(): if package == "cobra": return load_cobra_model(modelpath) elif package == "libsbml": return load_libsbml_model(modelpath) else: mes = f"Unknown type for package. Unable to load model." raise ValueError(mes) case Path(): modelpath = str(modelpath) if package == "cobra": return load_cobra_model(modelpath) elif package == "libsbml": return load_libsbml_model(modelpath) else: mes = f"Unknown type for package. Unable to load model." raise ValueError(mes) case _: mes = f"Unkown type for modelpath: {modelpath}" raise TypeError(mes)
[docs] def load_document_libsbml(modelpath: str) -> SBMLDocument: """Loads model document using libSBML Args: - modelpath (str): Path to GEM Returns: SBMLDocument: Loaded document by libSBML """ reader = SBMLReader() read = reader.readSBMLFromFile(modelpath) # read from file return read
[docs] def write_model_to_file( model: Union[libModel, cobra.Model], filename: Union[str, Path] ): """Save a model into a file. Args: - model (libModel|cobra.Model): The model to be saved - filename (str|Path): The filename/path to save the model to. Raises: - ValueError: Unknown file extension for model - TypeError: Unknown model type """ def _write_cobra_model_to_file(model: cobra.Model, filepath: Path): """Helper function of :py:func:`~refingems.utility.io.write_model_to_file`. Write a model loaded with COBRApy to a file. Args: - model (cobra.Model): The COBRApy model. - filepath (Path): The file path to save the model to. Extensions sets the file format. Raises: - ValueError: Unknown file extension for model """ try: extension = filepath.suffix.replace(".", "") match extension: case "xml": cobra.io.write_sbml_model(model, filepath) case "json": cobra.io.save_json_model(model, filepath) case "yml": cobra.io.save_yaml_model(model, str(filepath)) case "mat": cobra.io.save_matlab_model(model, filepath) case _: raise ValueError("Unknown file extension for model: ", extension) logging.info("Modified model written to " + str(filepath)) except OSError as e: print("Could not write to file. Wrong path?") # Cast filename to Path object if string is provided if isinstance(filename, str): filename = Path(filename) # save cobra model if isinstance(model, cobra.core.model.Model): _write_cobra_model_to_file(model, filename) # save libsbml model elif isinstance(model, libModel): try: extension = filename.suffix.replace(".", "") match extension: case "xml": new_document = model.getSBMLDocument() writeSBMLToFile(new_document, str(filename)) case "json" | "yml" | "mat": data = cobra.io.sbml._sbml_to_model(model) _write_cobra_model_to_file(data, filename) case _: raise ValueError("Unknown file extension for model: ", extension) logging.info("Modified model written to " + str(filename)) except OSError as e: print("Could not write to file. Wrong path?") # unknown model type or no model else: message = f"Unknown model type {type(model)}. Cannot save." raise TypeError(message)
# media # -----
[docs] def load_substance_table_from_db( mediumname: str, database: str, type: Literal["testing", "standard"] = "standard" ) -> pd.DataFrame: """Load a substance table from a database. Currently available types: - 'testing': for debugging - 'standard': The standard format containing all information in long format. Note: 'documentation' currently object to change Args: - name (str): The name (or identifier) of the medium. - database (str): Path to the database. - type (Literal['testing','standard'], optional): How to load the table. Defaults to 'standard'. Raises: - ValueError: Unknown type for loading the substance table. Returns: pd.DataFrame: The substance table in the specified type retrieved from the database. """ # build connection to DB connection = sqlite3.connect(database) cursor = connection.cursor() match type: # use this for debugging case "testing": result = cursor.execute( """SELECT substance.id, substance.name, substance.formula, medium2substance.flux FROM medium, medium2substance, substance WHERE medium.name = ? AND medium.id = medium2substance.medium_id AND medium2substance.substance_id = substance.id """, (mediumname,), ) substance_table = result.fetchall() substance_table = pd.DataFrame( substance_table, columns=["id", "name", "formula", "flux"] ) # create table with all information, standard for generating the Medium table case "standard": result = cursor.execute( """SELECT substance.name, substance.formula, medium2substance.flux , medium2substance.source, substance2db.db_id, substance2db.db_type FROM medium, medium2substance, substance, substance2db WHERE medium.name = ? AND medium.id = medium2substance.medium_id AND medium2substance.substance_id = substance.id AND substance2db.substance_id = substance.id """, (mediumname,), ) substance_table = result.fetchall() substance_table = pd.DataFrame( substance_table, columns=["name", "formula", "flux", "source", "db_id", "db_type"], ) # default: throw error case _: connection.close() raise ValueError(f"Unknown type for loading the substance table: {type}") # close connection connection.close() return substance_table
[docs] def load_subset_from_db(subset_name: str) -> tuple[str, str, pd.DataFrame]: """Load a subset from the database. Args: - subset_name(str): Name of the subset to be loaded. Returns: tuple of (1) str, (2) str and (3) pd.DataFrame (1) name of the subset (2) description of the subset (3) substance table for the subset """ # open connection to database connection = sqlite3.connect(PATH_TO_DB) cursor = connection.cursor() # check if subset name is valid result = cursor.execute( """SELECT * FROM subset WHERE subset.name = ?""", (subset_name,) ) check = result.fetchone() if check: # set name name = subset_name # set description description = check[2] # retrieve subset from database db_res = cursor.execute( """SELECT substance.name, subset2substance.percent FROM substance, subset, subset2substance WHERE subset.name = ? AND subset.id = subset2substance.subset_id AND subset2substance.substance_id = substance.id """, (subset_name,), ) substance_table = pd.DataFrame(db_res.fetchall(), columns=["name", "percent"]) return (name, description, substance_table)
# other # -----
[docs] def load_a_table_from_database( table_name_or_query: str, query: bool = True ) -> pd.DataFrame: """| Loads the table for which the name is provided or a table containing all rows for which the query evaluates to | true from the refineGEMs database ('data/database/data.db') Args: - table_name_or_query (str): Name of a table contained in the database 'data.db'/ a SQL query - query (bool): Specifies if a query or a table name is provided with table_name_or_query Returns: pd.DataFrame: Containing the table for which the name was provided from the database 'data.db' """ table_name_or_query = ( sqlalchemy.text(table_name_or_query) if query else table_name_or_query ) sqlalchemy_engine_input = f"sqlite:///{PATH_TO_DB}" engine = sqlalchemy.create_engine(sqlalchemy_engine_input) open_con = engine.connect() db_table = pd.read_sql(table_name_or_query, open_con) open_con.close() return db_table
[docs] def parse_dict_to_dataframe(str2list: dict) -> pd.DataFrame: """| Parses dictionary of form {str: list} & | Transforms it into a table with a column containing the strings and a column containing the lists Args: str2list (dict): Dictionary mapping strings to lists Returns: pd.DataFrame: Table with column containing the strings and column containing the lists """ # Get max number of list length max_len_of_list = max(map(len, str2list.values())) # Fill lists with None until all lists have the same size -> Required for pd.DataFrame for key in str2list: current_list = str2list.get(key) while len(current_list) != max_len_of_list: str2list.get(key).append(None) df = pd.DataFrame.from_dict(str2list).stack().T.reset_index() df = df.drop("level_0", axis=1) return df
[docs] def validate_libsbml_model(model: libModel) -> int: """Debug method: Validates a libSBML model with the libSBML validator Args: - model (libModel): A libSBML model Returns: int: Integer specifying if validate was successful or not """ validator = SBMLValidator() doc = model.getSBMLDocument() return validator.validate(doc)
# FASTA # -----
[docs] def create_missing_genes_protein_fasta( fasta: str, missing_genes: pd.DataFrame, outdir: str = None, ) -> str: """Creates a FASTA file containing proteins for missing_genes .. note:: Please keep in mind that the input FASTA file has to have Genbank format. Args: - fasta (str): Path to the FASTA protein file. - missing_genes (pd.DataFrame): The table of missing genes. - outdir (str, optional): Path to a directory to write the output to. Defaults to None. Returns: str: Path to the FASTA protein file for the missing genes. """ # format the missing genes' locus tags locus_tags_dict = { _: "[locus_tag=" + _ + "]" for _ in list(missing_genes["locus_tag"]) } # parse the protein FASTA protfasta = SeqIO.parse(fasta, "fasta") # extract the sequences of the missing genes only missing_seqs = [] for seq in protfasta: # Case 1: locus tag equals FASTA header if seq.id in locus_tags_dict.keys(): missing_seqs.append(seq) # Case 2: locus tag as descriptor elif any([k for k, v in locus_tags_dict.items() if v in seq.description]): for k, v in locus_tags_dict.items(): if v in seq.description: seq.id = k missing_seqs.append(seq) break # Case _: locus tag either in model or errounous else: pass # save the collected sequences in a new file if outdir: outfile = str(Path(outdir, "missing_genes.fasta")) else: outfile = str(Path("missing_genes.fasta")) SeqIO.write(missing_seqs, outfile, "fasta") return outfile
# GFF # ---
[docs] def parse_gff_for_cds( gffpath: str, keep_attributes: dict[str:str] = None ) -> Union[pd.DataFrame, tuple[pd.DataFrame, str]]: """Parses a GFF file to obtain a mapping for the corresponding attributes listed in keep_attributes Args: - gffpath (str): Path to the GFF file. - keep_attributes (dict, optional): Dictionary of attributes to be kept and the corresponding column for the table. Defaults to None. Returns: (1) Case: ``return_variety = False`` pd.DataFrame: Dataframe containing a mapping for the corresponding attributes listed in keep_attributes (2) Case: ``return_variety = True`` tuple: tuple of (1) pd.DataFrame & (2) str: (1) pd.DataFrame: Dataframe containing a mapping for the corresponding attributes listed in keep_attributes (2) str: Found variety for provided GFF """ # load the gff gff = gffutils.create_db(gffpath, ":memory:", merge_strategy="create_unique") # extract the attributes of the CDS cds = pd.DataFrame.from_dict([_.attributes for _ in gff.features_of_type("CDS")]) if "locus_tag" in cds.columns: cds = cds.explode("locus_tag") genes = pd.DataFrame.from_dict([_.attributes for _ in gff.features_of_type("gene")]) if "locus_tag" in genes.columns: genes = genes.explode("locus_tag") if "old_locus_tag" in genes.columns and "locus_tag" in genes.columns: cds = cds.merge( genes[["locus_tag", "old_locus_tag"]], how="left", on="locus_tag" ) cds.drop(columns=["locus_tag"], inplace=True) cds.rename(columns={"old_locus_tag": "locus_tag"}, inplace=True) # keep only certain columns if keep_attributes: cds = cds[[_ for _ in cds.columns if _ in list(keep_attributes.keys())]] # rename columns cds.rename( columns={k: v for k, v in keep_attributes.items() if k in cds.columns}, inplace=True, ) if "locus_tag" in cds.columns: cds = cds.explode("locus_tag") return cds
# GBFF # ----
[docs] def parse_gbff_for_cds(file_path: str) -> pd.DataFrame: """Retrieves a table containg information about the following qualifiers from a Genbank file: ['protein_id','locus_tag','db_xref','old_locus_tag','EC_number']. Args: - file_path (str): Path to the Genbank (.gbff) file. Returns: pd.DataFrame: A table containing the information above. Has the following columns= ['ncbi_accession_version', 'locus_tag_ref','old_locus_tag','GeneID','EC number']. """ temp_table = pd.DataFrame( columns=[ "ncbi_accession_version", "locus_tag_ref", "old_locus_tag", "GeneID", "EC number", ] ) attributes = ["protein_id", "locus_tag", "old_locus_tag", "db_xref", "EC_number"] for record in SeqIO.parse(file_path, "genbank"): if record.features: for feature in record.features: if feature.type == "CDS": temp_list = [] for a in attributes: if a in feature.qualifiers.keys(): temp_list.append(feature.qualifiers[a][0]) else: temp_list.append("-") temp_table.loc[len(temp_table)] = temp_list # reformat pat = re.compile(r"\D") temp_table["GeneID"] = [pat.sub("", x) for x in temp_table["GeneID"]] return temp_table
# else: # -----
[docs] def search_sbo_label(sbo_number: str) -> str: """Looks up the SBO label corresponding to a given SBO Term number Args: - sbo_number (str): Last three digits of SBO-Term as str Returns: str: Denoted label for given SBO Term """ sbo_number = str(sbo_number) client = EBIClient() sbo = client.get_term("sbo", "http://biomodels.net/SBO/SBO_0000" + sbo_number) return sbo["_embedded"]["terms"][0]["label"]