#!/usr/bin/env python3
'''
Utilities for CountSummarizer, DataFetcher and TaxonomyAdder.
Author: Xun
Last modified: 2023-07-07
Mail: makise.kurisu.yoru@gmail.com
'''
import pandas as pd
import argparse
class CountSummarizer:
    def __init__(self, args):
        self.args = args

    def norm_expr(self, probe_expr_df, norm_type):
        """Normalize the expression matrix per sample (column)."""
        if norm_type == "raw":
            return probe_expr_df
        elif norm_type == "cpm":
            # Counts per million: scale each column by its total count.
            norm_factor = probe_expr_df.sum()
            return probe_expr_df * 1000000 / norm_factor
        raise ValueError(f"Unknown norm_type: {norm_type!r}")
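    # A minimal sketch of what norm_expr produces, assuming a toy two-sample
    # matrix (the names "s1"/"s2"/"g1"/"g2" are illustrative, not from the
    # input spec):
    #
    # >>> df = pd.DataFrame({"s1": [10, 90], "s2": [1, 3]}, index=["g1", "g2"])
    # >>> CountSummarizer(None).norm_expr(df, "cpm")
    #           s1        s2
    # g1  100000.0  250000.0
    # g2  900000.0  750000.0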
    def run(self):
        args = self.args
        probe_expr_df = pd.read_csv(args.input_file, sep="\t", header=0, index_col=0)
        probe_expr_df.index = probe_expr_df.index.map(str)
        # Normalize the expression data once per requested normalization type.
        abundances = [self.norm_expr(probe_expr_df, norm_type) for norm_type in args.norm_type]
        usecols = [0] + args.grp_column
        # Note: pandas returns usecols in file order, so the group columns below
        # (and their separators) are paired in the order they appear in the map file.
        probe_map_df = pd.read_csv(args.map_file, sep="\t", header=0, index_col=None, dtype=str, usecols=usecols)
        key_column = probe_map_df.columns[0]
        grp_columns = probe_map_df.columns[1:]
        # Broadcast a single separator to all group columns so the zip below lines up.
        if len(args.grp_sep) == 1:
            grp_seps = args.grp_sep * len(grp_columns)
        else:
            grp_seps = args.grp_sep
        for grp_column, grp_sep in zip(grp_columns, grp_seps):
            self.process_grp_column(grp_column, grp_sep, key_column, abundances, args.norm_type, args.abundance_keep, probe_map_df)
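    # A hedged sketch of the expected map-file layout (the column names are
    # illustrative; the real headers come from the user's file). With
    # `-c 2,3 -s ',+;'` column 2 is split on "," and column 3 on ";":
    #
    #   probe_id    KO_list         pathway_list
    #   p1          K00001,K00002   map00010;map00020
    #   p2          K00003          map00010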
    def process_grp_column(self, grp_column, grp_sep, key_column, abundances, norm_types, abundance_keep, probe_map_df):
        """Process one group column and save the aggregated data for every normalization type."""
        args = self.args
        # Keep only rows that carry group information for this column.
        grp_df = probe_map_df.loc[probe_map_df[grp_column].fillna("") != "", [key_column, grp_column]]
        if grp_sep == "*":
            # "*" means split the group string into individual characters;
            # an empty pattern makes pandas split between every character.
            grp_sep = ""
        grp_df[grp_column] = grp_df[grp_column].str.split(grp_sep)
        # One row per (probe, group) pair; drop empty group entries left by the split.
        grp_df_explode = grp_df.explode(grp_column)
        grp_df_explode = grp_df_explode.loc[grp_df_explode[grp_column] != "", ]
        for abundance_df, norm_type in zip(abundances, norm_types):
            merged_df = grp_df_explode.merge(abundance_df, left_on=key_column, right_index=True, how="inner")
            # Aggregate every abundance column with the requested statistic.
            aggregation_dict = {col: abundance_keep for col in merged_df.columns if col not in [grp_column, key_column]}
            final_df = merged_df.groupby(grp_column).agg(aggregation_dict).reset_index()
            final_df.to_csv(f"{args.output_prefix}.{grp_column}.{norm_type}.txt", sep="\t", index=False)
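    # A minimal sketch of the explode-then-aggregate step above, assuming toy
    # data (identifiers "p1"/"K1"/"K2" are illustrative only):
    #
    # >>> m = pd.DataFrame({"probe": ["p1", "p2"], "grp": ["K1,K2", "K1"]})
    # >>> m["grp"] = m["grp"].str.split(",")
    # >>> m.explode("grp")
    #   probe grp
    # 0    p1  K1
    # 0    p1  K2
    # 1    p2  K1
    #
    # A subsequent groupby("grp").agg("sum") then adds up every probe's
    # abundance under each group it belongs to.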
class DataFetcher:
    def __init__(self, args):
        self.args = args

    def fetch_data(self):
        args = self.args
        df_input = pd.read_table(args.input, sep='\t', header=None, dtype=str)
        df_reference = pd.read_table(args.reference, sep='\t', header=None, dtype=str)
        # Convert the 1-based column arguments to 0-based positions for matching.
        col1 = int(args.column.split(',')[0]) - 1
        col2 = int(args.column.split(',')[1]) - 1
        # Find the keys present in both files.
        common_elements = set(df_input.iloc[:, col1]) & set(df_reference.iloc[:, col2])
        # Keep only the rows whose key appears in both files.
        df_filtered_input = df_input[df_input.iloc[:, col1].isin(common_elements)]
        df_filtered_ref = df_reference[df_reference.iloc[:, col2].isin(common_elements)]
        if args.show_other_columns:
            # With header=None the column labels are the integer positions,
            # so col1/col2 can serve directly as merge keys.
            df_output = pd.merge(df_filtered_input, df_filtered_ref, left_on=col1, right_on=col2, how='inner')
        else:
            df_output = df_filtered_input
        df_output.to_csv(args.output, index=False, sep='\t', header=False)
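# A hedged sketch of what the "cr" command does, assuming two tiny headerless
# TSV files (contents illustrative):
#
#   input.tsv:        reference.tsv:
#   geneA   12        geneA   K00001
#   geneB   7         geneC   K00003
#
# `anno_db.py cr -i input.tsv -r reference.tsv -c 1,1 -o out.tsv` keeps only
# the geneA row; adding -s also appends the reference columns (K00001).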
def create_parser():
    parser = argparse.ArgumentParser(prog='anno_db.py', description='anno_db includes two main functionalities: data fetching with the "cr" command and abundance summarizing with the "ex" command.')
    subparsers = parser.add_subparsers(dest='command')
    # Subparser for the 'cr' command
    fetch_parser = subparsers.add_parser('cr', description='This command fetches data from the input file and the reference file based on a given column, and writes the matched rows to the output file.')
    fetch_parser.add_argument("-i", "--input", help="The input file.")
    fetch_parser.add_argument("-r", "--reference", help="The reference file.")
    fetch_parser.add_argument("-o", "--output", help="The output file where the result will be saved.")
    fetch_parser.add_argument("-c", "--column", default="1,1", help="The column indices in the input and reference files used for matching rows, separated by a comma. Indexing starts from 1.")
    fetch_parser.add_argument("-s", "--show_other_columns", action="store_true", help="Whether to append the other columns of the reference file.")
    # Subparser for the 'ex' command
    ex_parser = subparsers.add_parser('ex', description='This command runs the abundance summarization on the input file based on the group information in the map file.')
    ex_parser.add_argument("-i", "--input_file", help="Sub-item abundance file with the format specified above", required=True)
    ex_parser.add_argument("-m", "--map_file", help="Map file containing group information", required=True)
    ex_parser.add_argument("-e", "--abundance_keep", default="sum", choices=['sum', 'median', 'min', 'max', 'mean'], help="Aggregate abundance as sum, median, min, max, or mean")
    ex_parser.add_argument("-c", "--grp_column", default=[2], type=lambda s: [int(item) - 1 for item in s.split(',') if item], help="The column(s) containing group information")
    ex_parser.add_argument("-s", "--grp_sep", default=[","], type=lambda s: s.split('+'), help="Separator(s) for each group column, joined by '+'")
    ex_parser.add_argument("-n", "--norm_type", default=["raw", "cpm"], type=lambda s: s.split(','), help="Normalization type(s) to output, separated by a comma")
    ex_parser.add_argument("-o", "--output_prefix", default="output", help="Output file prefix")
    # Subparser for the 'tax' command
    tax_parser = subparsers.add_parser('tax', description='This command adds taxonomy information to the cds.output file.')
    tax_parser.add_argument("-i", "--cds_output_file", help="The cds.output file.")
    tax_parser.add_argument("-t", "--taxonomy_file", help="The taxonomy.tsv file.")
    tax_parser.add_argument("-o", "--output_dir", help="The output directory.")
    return parser
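# Hedged usage sketches (file names are placeholders, not from the source):
#
#   ./anno_db.py cr -i genes.txt -r kegg_map.tsv -c 1,1 -s -o matched.tsv
#   ./anno_db.py ex -i probe_abundance.tsv -m probe_map.tsv -c 2,3 -s ',+;' \
#       -n raw,cpm -e sum -o summary
#
# The second call writes one table per group column and normalization type,
# e.g. summary.<grp_column>.raw.txt and summary.<grp_column>.cpm.txt.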
def main():
    parser = create_parser()
    args = parser.parse_args()
    if args.command == 'cr':
        fetcher = DataFetcher(args)
        fetcher.fetch_data()
    elif args.command == 'ex':
        summarizer = CountSummarizer(args)
        summarizer.run()
    else:
        # 'tax' is declared in the parser but not implemented in this script;
        # running without a subcommand also lands here.
        parser.print_help()


if __name__ == "__main__":
    main()