Source code for pcasuite.pcz_similarity

#!/usr/bin/env python3

"""Module containing the PCZsimilarity class and the command line interface."""
from typing import Optional
import json
import numpy as np
from pathlib import Path, PurePath
from math import exp
from biobb_common.generic.biobb_object import BiobbObject
from biobb_common.tools.file_utils import launchlogger


[docs] class PCZsimilarity(BiobbObject): """ | biobb_flexserv PCZsimilarity | Compute PCA similarity between two given compressed PCZ files. | Wrapper of the pczdump tool from the PCAsuite FlexServ module. Args: input_pcz_path1 (str): Input compressed trajectory file 1. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874). input_pcz_path2 (str): Input compressed trajectory file 2. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874). output_json_path (str): Output json file with PCA Similarity results. File type: output. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/reference/pcasuite/pcz_similarity.json>`_. Accepted formats: json (edam:format_3464). properties (dict - Python dictionary object containing the tool parameters, not input/output files): * **amplifying_factor** (*float*) - ("0.0") common displacement (dx) along the different eigenvectors. If 0, the result is the absolute similarity index (dot product). * **binary_path** (*str*) - ("pczdump") pczdump binary path to be used. * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. * **container_path** (*str*) - (None) Container path definition. * **container_image** (*str*) - ('afandiadib/ambertools:serial') Container image definition. * **container_volume_path** (*str*) - ('/tmp') Container volume path definition. * **container_working_dir** (*str*) - (None) Container working directory definition. * **container_user_id** (*str*) - (None) Container user_id definition. * **container_shell_path** (*str*) - ('/bin/bash') Path to default shell inside the container. Examples: This is a use example of how to use the building block from Python:: from biobb_flexserv.pcasuite.pcz_similarity import pcz_similarity pcz_similarity( input_pcz_path1='/path/to/pcazip_input1.pcz', input_pcz_path2='/path/to/pcazip_input2.pcz', output_json_path='/path/to/pcz_similarity.json', properties=prop) Info: * wrapped_software: * name: FlexServ PCAsuite * version: >=1.0 * license: Apache-2.0 * ontology: * name: EDAM * schema: http://edamontology.org/EDAM.owl """ def __init__(self, input_pcz_path1: str, input_pcz_path2: str, output_json_path: str, properties: Optional[dict] = None, **kwargs) -> None: properties = properties or {} # Call parent class constructor super().__init__(properties) self.locals_var_dict = locals().copy() # Input/Output files self.io_dict = { 'in': {'input_pcz_path1': input_pcz_path1, 'input_pcz_path2': input_pcz_path2}, 'out': {'output_json_path': output_json_path} } # Properties specific for BB self.properties = properties self.amplifying_factor = properties.get('amplifying_factor') self.binary_path = properties.get('binary_path', 'pczdump') # Check the properties self.check_properties(properties) self.check_arguments() # Check two eigenvectors to be compatible for dot product # i.e. same number of vectors and values per vector
[docs] def are_compatible(self, eigenvectors_1, eigenvectors_2): # Check the number of eigenvectors and the number of values in both eigenvectors to match if len(eigenvectors_1) != len(eigenvectors_2): print('WARNING: Number of eigenvectors does not match') return False if len(eigenvectors_1[0]) != len(eigenvectors_2[0]): print('WARNING: Number of values in eigenvectors does not match') return False return True
# Weighted Cross Product (WCP). # Get the weighted cross product between eigenvectors # This is meant to compare PCA results for molecular dynamics structural conformations # The number of eigenvectors to be compared may be specified. All (0) by default # DISCLAIMER: This code has been translated from a perl script signed by Alberto Perez (13/09/04) # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda, # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
[docs] def get_similarity_index(self, eigenvalues_1, eigenvectors_1, eigenvalues_2, eigenvectors_2, dx=None): # Check the number of eigenvectors and the number of values in both eigenvectors to match if not self.are_compatible(eigenvectors_1, eigenvectors_2): raise SystemExit('Eigenvectors are not compatible') # Find out the total number of eigenvectors # Set the number of eigenvectors to be analyzed in case it is not set or it exceeds the total eigenvectors_number = min(len(eigenvectors_1), len(eigenvectors_2)) # Find out the number of atoms in the structure # Eigenvectors are atom coordinates and each atom has 3 coordinates (x, y, z) if len(eigenvectors_1[0]) % 3 != 0: raise SystemExit('Something is wrong with eigenvectors since number of values is not divisor of 3') atom_number = int(len(eigenvectors_1[0]) / 3) # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product. # The value of the ~10th eigenvalue is usually taken. if dx is not None: amplifying_factor = dx else: amplifying_factor = eigenvalues_1[eigenvectors_number-1] # Get the denominator # Find new denominator # for ($i=0;$i<$nvec;$i++){ # $cte1+=exp(-1/$val_1[$i]*$ampf); # $cte2+=exp(-1/$val_2[$i]*$ampf); # $part1+=exp(-2/$val_1[$i]*$ampf)*exp(-2/$val_1[$i]*$ampf); # $part2+=exp(-2/$val_2[$i]*$ampf)*exp(-2/$val_2[$i]*$ampf); # } cte1 = part1 = cte2 = part2 = 0 for eigenvalue in eigenvalues_1: cte1 += exp(-1 / eigenvalue * amplifying_factor) part1 += exp(-2 / eigenvalue * amplifying_factor) ** 2 for eigenvalue in eigenvalues_2: cte2 += exp(-1 / eigenvalue * amplifying_factor) part2 += exp(-2 / eigenvalue * amplifying_factor) ** 2 denominator = part1 * cte2 * cte2 / cte1 / cte1 + part2 * cte1 * cte1 / cte2 / cte2 # Get all eigenvector values together eigenvector_values_1 = [v for ev in eigenvectors_1 for v in ev] eigenvector_values_2 = [v for ev in eigenvectors_2 for v in ev] # IDK what it is doing now total_summatory = 0 for i in range(eigenvectors_number): for j in range(eigenvectors_number): # Array has vectors in increasing order of vap, get last one first a = (eigenvectors_number - 1 - i) * atom_number * 3 b = (eigenvectors_number - i) * atom_number * 3 - 1 c = (eigenvectors_number - 1 - j) * atom_number * 3 d = (eigenvectors_number - j) * atom_number * 3 - 1 temp1 = eigenvector_values_1[a:b] temp2 = eigenvector_values_2[c:d] if len(temp1) != len(temp2): raise ValueError("Projection of vectors of different size!!") # Project the two vectors add = 0 for k, value_1 in enumerate(temp1): value_2 = temp2[k] add += value_1 * value_2 add = add * exp(-1 / eigenvalues_1[i] * amplifying_factor - 1 / eigenvalues_2[j] * amplifying_factor) add2 = add ** 2 total_summatory += add2 similarity_index = total_summatory * 2 / denominator return similarity_index
# Weighted Cross Product (WCP). # DF implementation of Alberto's formula # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda, # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
[docs] def eigenmsip(self, eigenvalues_1, eigenvectors_1, eigenvalues_2, eigenvectors_2, dx=None): evals1 = np.array(eigenvalues_1) evals2 = np.array(eigenvalues_2) evecs1 = np.array(eigenvectors_1) evecs2 = np.array(eigenvectors_2) n_components = len(eigenvectors_1) # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product. # The value of the ~10th eigenvalue is usually taken. if dx is not None: amplifying_factor = dx else: amplifying_factor = evals1[n_components-1] e1 = np.exp(-(amplifying_factor)**2/evals1) e2 = np.exp(-(amplifying_factor)**2/evals2) e1_2 = np.exp(-2*(amplifying_factor)**2/evals1) e2_2 = np.exp(-2*(amplifying_factor)**2/evals2) sume1 = np.sum(e1) sume2 = np.sum(e2) denominator = np.sum((e1_2/sume1**2)**2)+np.sum((e2_2/sume2**2)**2) # numerator_df = np.square(np.dot(evecs1, evecs2)*np.outer(e1, e2)/(sume1*sume2)) # numerator_df = 2 * np.sum(numerator_df) val_tmp = 0 accum_a = 0 c = sume1*sume2 for pc in range(0, n_components): for pc2 in range(0, n_components): eve1 = evecs1[pc] eve2 = evecs2[pc2] eva1 = evals1[pc] eva2 = evals2[pc2] a = np.dot(eve1, eve2) b = np.exp(-(amplifying_factor)**2/eva1 - (amplifying_factor)**2/eva2) val_tmp = val_tmp + ((a * b)/c)**2 accum_a = accum_a + a numerator = 2 * val_tmp return numerator/(denominator)
# Get the dot product matrix of two eigenvectors
[docs] def dot_product(self, eigenvectors_1, eigenvectors_2): # Check the number of eigenvectors and the number of values in both eigenvectors to match if not self.are_compatible(eigenvectors_1, eigenvectors_2): raise SystemExit('Eigenvectors are not compatible') # Get the dot product dpm = np.dot(eigenvectors_1, np.transpose(eigenvectors_2)) return dpm
# Get the dot product matrix of two eigenvectors (squared and normalized). # Absolute Similarity Index (Hess 2000, 2002)
[docs] def dot_product_accum(self, eigenvectors_1, eigenvectors_2): n_components = len(eigenvectors_1) # Get the dot product dpm = self.dot_product(eigenvectors_1, eigenvectors_2) sso = (dpm * dpm).sum() / n_components # sso = (dpm * dpm).sum() / n_components return sso
# Get the subspace overlap # Same as before, but with a square root
[docs] def get_subspace_overlap(self, eigenvectors_1, eigenvectors_2): # Get the number of eigenvectors n_components = len(eigenvectors_1) # Get the dot product dpm = self.dot_product(eigenvectors_1, eigenvectors_2) sso = np.sqrt((dpm * dpm).sum() / n_components) # sso = (dpm * dpm).sum() / n_components return sso
# Classic RMSip (Root Mean Square Inner Product), gives the same results as the previous function get_subspace_overlap
[docs] def get_rmsip(self, eigenvectors_1, eigenvectors_2): # Get the number of eigenvectors n_components = len(eigenvectors_1) accum = 0 for pc in range(0, n_components): for pc2 in range(0, n_components): dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2]) val = dpm * dpm accum = accum + val sso = np.sqrt(accum / n_components) # sso = (dpm * dpm).sum() / n_components return sso
# RWSIP, Root Weighted Square Inner Product. Same as before, but weighted using the eigen values. # See Edvin Fuglebakk and others, Measuring and comparing structural fluctuation patterns in large protein datasets, # Bioinformatics, Volume 28, Issue 19, October 2012, Pages 2431–2440, https://doi.org/10.1093/bioinformatics/bts445
[docs] def get_rwsip(self, eigenvalues_1, eigenvectors_1, eigenvalues_2, eigenvectors_2): # Get the number of eigenvectors n_components = len(eigenvectors_1) accum = 0 norm = 0 for pc in range(0, n_components): for pc2 in range(0, n_components): dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2]) val = dpm * dpm * eigenvalues_1[pc] * eigenvalues_2[pc2] accum = accum + val norm = norm + eigenvalues_1[pc] * eigenvalues_2[pc] sso = np.sqrt(accum / norm) # sso = (dpm * dpm).sum() / n_components return sso
[docs] @launchlogger def launch(self): """Launches the execution of the FlexServ pcz_similarity module.""" # Setup Biobb if self.check_restart(): return 0 self.stage_files() if self.container_path: working_dir = self.container_volume_path if self.container_volume_path else "/data" else: working_dir = self.stage_io_dict.get("unique_dir", "") unique_dir = Path(self.stage_io_dict.get("unique_dir", "")) # Temporary output temp_out_1 = "output1.dat" temp_out_2 = "output2.dat" temp_out_1_path = unique_dir.joinpath(temp_out_1) temp_out_2_path = unique_dir.joinpath(temp_out_2) staged_output_json_path = unique_dir.joinpath(Path(self.stage_io_dict["out"]["output_json_path"]).name) # Command line 1 # pczdump -i structure.ca.std.pcz --evals -o evals.txt # self.cmd = [self.binary_path, # Evals pcz 1 # "-i", input_pcz_1, # "-o", temp_out_1, # "--evals", ';', # self.binary_path, # Evals pcz 2 # "-i", input_pcz_2, # "-o", temp_out_2, # "--evals" # ] self.cmd = ['cd', working_dir, ';', self.binary_path, # Evals pcz 1 "-i", PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name, "-o", temp_out_1, "--evals", ';', self.binary_path, # Evals pcz 2 "-i", PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name, "-o", temp_out_2, "--evals" ] # Run Biobb block 1 self.run_biobb() # Parse output evals info_dict = {} info_dict['evals_1'] = [] info_dict['evals_2'] = [] with open(temp_out_1_path, 'r') as file: for line in file: info = float(line.strip()) info_dict['evals_1'].append(info) with open(temp_out_2_path, 'r') as file: for line in file: info = float(line.strip()) info_dict['evals_2'].append(info) num_evals_1 = len(info_dict['evals_1']) num_evals_2 = len(info_dict['evals_2']) num_evals_min = min(num_evals_1, num_evals_2) info_dict['num_evals_min'] = num_evals_min # Command line 2 # pczdump -i structure.ca.std.pcz --evals -o evals.txt self.cmd = ['cd', working_dir, ';'] for pc in (range(1, num_evals_min+1)): # Evecs pcz 1 self.cmd.append(self.binary_path) self.cmd.append("-i") self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name) self.cmd.append("-o") self.cmd.append("evecs_1_pc{}".format(pc)) self.cmd.append("--evec={}".format(pc)) self.cmd.append(";") # Evals pcz 2 self.cmd.append(self.binary_path) self.cmd.append("-i") self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name) self.cmd.append("-o") self.cmd.append("evecs_2_pc{}".format(pc)) self.cmd.append("--evec={}".format(pc)) self.cmd.append(";") # Run Biobb block 2 self.run_biobb() # Parse output evecs info_dict['evecs_1'] = {} info_dict['evecs_2'] = {} eigenvectors_1 = [] eigenvectors_2 = [] for pc in (range(1, num_evals_min+1)): pc_id = "pc{}".format(pc) info_dict['evecs_1'][pc_id] = [] info_dict['evecs_2'][pc_id] = [] with open(unique_dir.joinpath("evecs_1_pc{}".format(pc)), 'r') as file: list_evecs = [] for line in file: info = line.strip().split(' ') for nums in info: if nums: list_evecs.append(float(nums)) info_dict['evecs_1'][pc_id] = list_evecs eigenvectors_1.append(list_evecs) with open(unique_dir.joinpath("evecs_2_pc{}".format(pc)), 'r') as file: list_evecs = [] for line in file: info = line.strip().split(' ') for nums in info: if nums: list_evecs.append(float(nums)) info_dict['evecs_2'][pc_id] = list_evecs eigenvectors_2.append(list_evecs) # simIndex = self.get_similarity_index(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor) # info_dict['similarityIndex_WCP2'] = float("{:.3f}".format(simIndex)) # dotProduct = self.get_subspace_overlap(eigenvectors_1, eigenvectors_2) # info_dict['similarityIndex_rmsip2'] = float("{:.3f}".format(dotProduct)) eigenmsip = self.eigenmsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor) info_dict['similarityIndex_WCP'] = float("{:.3f}".format(eigenmsip)) rmsip = self.get_rmsip(eigenvectors_1, eigenvectors_2) info_dict['similarityIndex_rmsip'] = float("{:.3f}".format(rmsip)) rwsip = self.get_rwsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2) info_dict['similarityIndex_rwsip'] = float("{:.3f}".format(rwsip)) dotp = self.dot_product_accum(eigenvectors_1, eigenvectors_2) info_dict['similarityIndex_dotp'] = float("{:.3f}".format(dotp)) with open(staged_output_json_path, 'w') as out_file: out_file.write(json.dumps(info_dict, indent=4)) # Copy files to host self.copy_to_host() # Remove temporary folder(s) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) return self.return_code
[docs] def pcz_similarity(input_pcz_path1: str, input_pcz_path2: str, output_json_path: str, properties: Optional[dict] = None, **kwargs) -> int: """Create :class:`PCZsimilarity <flexserv.pcasuite.pcz_similarity>`flexserv.pcasuite.PCZsimilarity class and execute :meth:`launch() <flexserv.pcasuite.pcz_similarity.launch>` method""" return PCZsimilarity(**dict(locals())).launch()
pcz_similarity.__doc__ = PCZsimilarity.__doc__ main = PCZsimilarity.get_main(pcz_similarity, "Compute PCA Similarity from a given pair of compressed PCZ files.") if __name__ == '__main__': main()