#!/usr/bin/env python3
"""Module containing the PCZsimilarity class and the command line interface."""
from typing import Optional
import json
import numpy as np
from pathlib import Path, PurePath
from math import exp
from biobb_common.generic.biobb_object import BiobbObject
from biobb_common.tools.file_utils import launchlogger
[docs]
class PCZsimilarity(BiobbObject):
"""
| biobb_flexserv PCZsimilarity
| Compute PCA similarity between two given compressed PCZ files.
| Wrapper of the pczdump tool from the PCAsuite FlexServ module.
Args:
input_pcz_path1 (str): Input compressed trajectory file 1. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
input_pcz_path2 (str): Input compressed trajectory file 2. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
output_json_path (str): Output json file with PCA Similarity results. File type: output. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/reference/pcasuite/pcz_similarity.json>`_. Accepted formats: json (edam:format_3464).
properties (dict - Python dictionary object containing the tool parameters, not input/output files):
* **amplifying_factor** (*float*) - ("0.0") common displacement (dx) along the different eigenvectors. If 0, the result is the absolute similarity index (dot product).
* **binary_path** (*str*) - ("pczdump") pczdump binary path to be used.
* **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
* **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
* **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
* **container_path** (*str*) - (None) Container path definition.
* **container_image** (*str*) - ('afandiadib/ambertools:serial') Container image definition.
* **container_volume_path** (*str*) - ('/tmp') Container volume path definition.
* **container_working_dir** (*str*) - (None) Container working directory definition.
* **container_user_id** (*str*) - (None) Container user_id definition.
* **container_shell_path** (*str*) - ('/bin/bash') Path to default shell inside the container.
Examples:
This is a use example of how to use the building block from Python::
from biobb_flexserv.pcasuite.pcz_similarity import pcz_similarity
pcz_similarity( input_pcz_path1='/path/to/pcazip_input1.pcz',
input_pcz_path2='/path/to/pcazip_input2.pcz',
output_json_path='/path/to/pcz_similarity.json',
properties=prop)
Info:
* wrapped_software:
* name: FlexServ PCAsuite
* version: >=1.0
* license: Apache-2.0
* ontology:
* name: EDAM
* schema: http://edamontology.org/EDAM.owl
"""
def __init__(self, input_pcz_path1: str, input_pcz_path2: str,
output_json_path: str, properties: Optional[dict] = None, **kwargs) -> None:
properties = properties or {}
# Call parent class constructor
super().__init__(properties)
self.locals_var_dict = locals().copy()
# Input/Output files
self.io_dict = {
'in': {'input_pcz_path1': input_pcz_path1,
'input_pcz_path2': input_pcz_path2},
'out': {'output_json_path': output_json_path}
}
# Properties specific for BB
self.properties = properties
self.amplifying_factor = properties.get('amplifying_factor')
self.binary_path = properties.get('binary_path', 'pczdump')
# Check the properties
self.check_properties(properties)
self.check_arguments()
# Check two eigenvectors to be compatible for dot product
# i.e. same number of vectors and values per vector
[docs]
def are_compatible(self, eigenvectors_1, eigenvectors_2):
# Check the number of eigenvectors and the number of values in both eigenvectors to match
if len(eigenvectors_1) != len(eigenvectors_2):
print('WARNING: Number of eigenvectors does not match')
return False
if len(eigenvectors_1[0]) != len(eigenvectors_2[0]):
print('WARNING: Number of values in eigenvectors does not match')
return False
return True
# Weighted Cross Product (WCP).
# Get the weighted cross product between eigenvectors
# This is meant to compare PCA results for molecular dynamics structural conformations
# The number of eigenvectors to be compared may be specified. All (0) by default
# DISCLAIMER: This code has been translated from a perl script signed by Alberto Perez (13/09/04)
# Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
# Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
[docs]
def get_similarity_index(self,
eigenvalues_1, eigenvectors_1,
eigenvalues_2, eigenvectors_2, dx=None):
# Check the number of eigenvectors and the number of values in both eigenvectors to match
if not self.are_compatible(eigenvectors_1, eigenvectors_2):
raise SystemExit('Eigenvectors are not compatible')
# Find out the total number of eigenvectors
# Set the number of eigenvectors to be analyzed in case it is not set or it exceeds the total
eigenvectors_number = min(len(eigenvectors_1), len(eigenvectors_2))
# Find out the number of atoms in the structure
# Eigenvectors are atom coordinates and each atom has 3 coordinates (x, y, z)
if len(eigenvectors_1[0]) % 3 != 0:
raise SystemExit('Something is wrong with eigenvectors since number of values is not divisor of 3')
atom_number = int(len(eigenvectors_1[0]) / 3)
# Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
# The value of the ~10th eigenvalue is usually taken.
if dx is not None:
amplifying_factor = dx
else:
amplifying_factor = eigenvalues_1[eigenvectors_number-1]
# Get the denominator
# Find new denominator
# for ($i=0;$i<$nvec;$i++){
# $cte1+=exp(-1/$val_1[$i]*$ampf);
# $cte2+=exp(-1/$val_2[$i]*$ampf);
# $part1+=exp(-2/$val_1[$i]*$ampf)*exp(-2/$val_1[$i]*$ampf);
# $part2+=exp(-2/$val_2[$i]*$ampf)*exp(-2/$val_2[$i]*$ampf);
# }
cte1 = part1 = cte2 = part2 = 0
for eigenvalue in eigenvalues_1:
cte1 += exp(-1 / eigenvalue * amplifying_factor)
part1 += exp(-2 / eigenvalue * amplifying_factor) ** 2
for eigenvalue in eigenvalues_2:
cte2 += exp(-1 / eigenvalue * amplifying_factor)
part2 += exp(-2 / eigenvalue * amplifying_factor) ** 2
denominator = part1 * cte2 * cte2 / cte1 / cte1 + part2 * cte1 * cte1 / cte2 / cte2
# Get all eigenvector values together
eigenvector_values_1 = [v for ev in eigenvectors_1 for v in ev]
eigenvector_values_2 = [v for ev in eigenvectors_2 for v in ev]
# IDK what it is doing now
total_summatory = 0
for i in range(eigenvectors_number):
for j in range(eigenvectors_number):
# Array has vectors in increasing order of vap, get last one first
a = (eigenvectors_number - 1 - i) * atom_number * 3
b = (eigenvectors_number - i) * atom_number * 3 - 1
c = (eigenvectors_number - 1 - j) * atom_number * 3
d = (eigenvectors_number - j) * atom_number * 3 - 1
temp1 = eigenvector_values_1[a:b]
temp2 = eigenvector_values_2[c:d]
if len(temp1) != len(temp2):
raise ValueError("Projection of vectors of different size!!")
# Project the two vectors
add = 0
for k, value_1 in enumerate(temp1):
value_2 = temp2[k]
add += value_1 * value_2
add = add * exp(-1 / eigenvalues_1[i] * amplifying_factor - 1 / eigenvalues_2[j] * amplifying_factor)
add2 = add ** 2
total_summatory += add2
similarity_index = total_summatory * 2 / denominator
return similarity_index
# Weighted Cross Product (WCP).
# DF implementation of Alberto's formula
# Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
# Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
[docs]
def eigenmsip(self, eigenvalues_1, eigenvectors_1,
eigenvalues_2, eigenvectors_2,
dx=None):
evals1 = np.array(eigenvalues_1)
evals2 = np.array(eigenvalues_2)
evecs1 = np.array(eigenvectors_1)
evecs2 = np.array(eigenvectors_2)
n_components = len(eigenvectors_1)
# Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
# The value of the ~10th eigenvalue is usually taken.
if dx is not None:
amplifying_factor = dx
else:
amplifying_factor = evals1[n_components-1]
e1 = np.exp(-(amplifying_factor)**2/evals1)
e2 = np.exp(-(amplifying_factor)**2/evals2)
e1_2 = np.exp(-2*(amplifying_factor)**2/evals1)
e2_2 = np.exp(-2*(amplifying_factor)**2/evals2)
sume1 = np.sum(e1)
sume2 = np.sum(e2)
denominator = np.sum((e1_2/sume1**2)**2)+np.sum((e2_2/sume2**2)**2)
# numerator_df = np.square(np.dot(evecs1, evecs2)*np.outer(e1, e2)/(sume1*sume2))
# numerator_df = 2 * np.sum(numerator_df)
val_tmp = 0
accum_a = 0
c = sume1*sume2
for pc in range(0, n_components):
for pc2 in range(0, n_components):
eve1 = evecs1[pc]
eve2 = evecs2[pc2]
eva1 = evals1[pc]
eva2 = evals2[pc2]
a = np.dot(eve1, eve2)
b = np.exp(-(amplifying_factor)**2/eva1 - (amplifying_factor)**2/eva2)
val_tmp = val_tmp + ((a * b)/c)**2
accum_a = accum_a + a
numerator = 2 * val_tmp
return numerator/(denominator)
# Get the dot product matrix of two eigenvectors
[docs]
def dot_product(self, eigenvectors_1, eigenvectors_2):
# Check the number of eigenvectors and the number of values in both eigenvectors to match
if not self.are_compatible(eigenvectors_1, eigenvectors_2):
raise SystemExit('Eigenvectors are not compatible')
# Get the dot product
dpm = np.dot(eigenvectors_1, np.transpose(eigenvectors_2))
return dpm
# Get the dot product matrix of two eigenvectors (squared and normalized).
# Absolute Similarity Index (Hess 2000, 2002)
[docs]
def dot_product_accum(self, eigenvectors_1, eigenvectors_2):
n_components = len(eigenvectors_1)
# Get the dot product
dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
sso = (dpm * dpm).sum() / n_components
# sso = (dpm * dpm).sum() / n_components
return sso
# Get the subspace overlap
# Same as before, but with a square root
[docs]
def get_subspace_overlap(self, eigenvectors_1, eigenvectors_2):
# Get the number of eigenvectors
n_components = len(eigenvectors_1)
# Get the dot product
dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
sso = np.sqrt((dpm * dpm).sum() / n_components)
# sso = (dpm * dpm).sum() / n_components
return sso
# Classic RMSip (Root Mean Square Inner Product), gives the same results as the previous function get_subspace_overlap
[docs]
def get_rmsip(self, eigenvectors_1, eigenvectors_2):
# Get the number of eigenvectors
n_components = len(eigenvectors_1)
accum = 0
for pc in range(0, n_components):
for pc2 in range(0, n_components):
dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
val = dpm * dpm
accum = accum + val
sso = np.sqrt(accum / n_components)
# sso = (dpm * dpm).sum() / n_components
return sso
# RWSIP, Root Weighted Square Inner Product. Same as before, but weighted using the eigen values.
# See Edvin Fuglebakk and others, Measuring and comparing structural fluctuation patterns in large protein datasets,
# Bioinformatics, Volume 28, Issue 19, October 2012, Pages 2431–2440, https://doi.org/10.1093/bioinformatics/bts445
[docs]
def get_rwsip(self,
eigenvalues_1, eigenvectors_1,
eigenvalues_2, eigenvectors_2):
# Get the number of eigenvectors
n_components = len(eigenvectors_1)
accum = 0
norm = 0
for pc in range(0, n_components):
for pc2 in range(0, n_components):
dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
val = dpm * dpm * eigenvalues_1[pc] * eigenvalues_2[pc2]
accum = accum + val
norm = norm + eigenvalues_1[pc] * eigenvalues_2[pc]
sso = np.sqrt(accum / norm)
# sso = (dpm * dpm).sum() / n_components
return sso
[docs]
@launchlogger
def launch(self):
"""Launches the execution of the FlexServ pcz_similarity module."""
# Setup Biobb
if self.check_restart():
return 0
self.stage_files()
if self.container_path:
working_dir = self.container_volume_path if self.container_volume_path else "/data"
else:
working_dir = self.stage_io_dict.get("unique_dir", "")
unique_dir = Path(self.stage_io_dict.get("unique_dir", ""))
# Temporary output
temp_out_1 = "output1.dat"
temp_out_2 = "output2.dat"
temp_out_1_path = unique_dir.joinpath(temp_out_1)
temp_out_2_path = unique_dir.joinpath(temp_out_2)
staged_output_json_path = unique_dir.joinpath(Path(self.stage_io_dict["out"]["output_json_path"]).name)
# Command line 1
# pczdump -i structure.ca.std.pcz --evals -o evals.txt
# self.cmd = [self.binary_path, # Evals pcz 1
# "-i", input_pcz_1,
# "-o", temp_out_1,
# "--evals", ';',
# self.binary_path, # Evals pcz 2
# "-i", input_pcz_2,
# "-o", temp_out_2,
# "--evals"
# ]
self.cmd = ['cd', working_dir, ';',
self.binary_path, # Evals pcz 1
"-i", PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name,
"-o", temp_out_1,
"--evals", ';',
self.binary_path, # Evals pcz 2
"-i", PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name,
"-o", temp_out_2,
"--evals"
]
# Run Biobb block 1
self.run_biobb()
# Parse output evals
info_dict = {}
info_dict['evals_1'] = []
info_dict['evals_2'] = []
with open(temp_out_1_path, 'r') as file:
for line in file:
info = float(line.strip())
info_dict['evals_1'].append(info)
with open(temp_out_2_path, 'r') as file:
for line in file:
info = float(line.strip())
info_dict['evals_2'].append(info)
num_evals_1 = len(info_dict['evals_1'])
num_evals_2 = len(info_dict['evals_2'])
num_evals_min = min(num_evals_1, num_evals_2)
info_dict['num_evals_min'] = num_evals_min
# Command line 2
# pczdump -i structure.ca.std.pcz --evals -o evals.txt
self.cmd = ['cd', working_dir, ';']
for pc in (range(1, num_evals_min+1)):
# Evecs pcz 1
self.cmd.append(self.binary_path)
self.cmd.append("-i")
self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name)
self.cmd.append("-o")
self.cmd.append("evecs_1_pc{}".format(pc))
self.cmd.append("--evec={}".format(pc))
self.cmd.append(";")
# Evals pcz 2
self.cmd.append(self.binary_path)
self.cmd.append("-i")
self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name)
self.cmd.append("-o")
self.cmd.append("evecs_2_pc{}".format(pc))
self.cmd.append("--evec={}".format(pc))
self.cmd.append(";")
# Run Biobb block 2
self.run_biobb()
# Parse output evecs
info_dict['evecs_1'] = {}
info_dict['evecs_2'] = {}
eigenvectors_1 = []
eigenvectors_2 = []
for pc in (range(1, num_evals_min+1)):
pc_id = "pc{}".format(pc)
info_dict['evecs_1'][pc_id] = []
info_dict['evecs_2'][pc_id] = []
with open(unique_dir.joinpath("evecs_1_pc{}".format(pc)), 'r') as file:
list_evecs = []
for line in file:
info = line.strip().split(' ')
for nums in info:
if nums:
list_evecs.append(float(nums))
info_dict['evecs_1'][pc_id] = list_evecs
eigenvectors_1.append(list_evecs)
with open(unique_dir.joinpath("evecs_2_pc{}".format(pc)), 'r') as file:
list_evecs = []
for line in file:
info = line.strip().split(' ')
for nums in info:
if nums:
list_evecs.append(float(nums))
info_dict['evecs_2'][pc_id] = list_evecs
eigenvectors_2.append(list_evecs)
# simIndex = self.get_similarity_index(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
# info_dict['similarityIndex_WCP2'] = float("{:.3f}".format(simIndex))
# dotProduct = self.get_subspace_overlap(eigenvectors_1, eigenvectors_2)
# info_dict['similarityIndex_rmsip2'] = float("{:.3f}".format(dotProduct))
eigenmsip = self.eigenmsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
info_dict['similarityIndex_WCP'] = float("{:.3f}".format(eigenmsip))
rmsip = self.get_rmsip(eigenvectors_1, eigenvectors_2)
info_dict['similarityIndex_rmsip'] = float("{:.3f}".format(rmsip))
rwsip = self.get_rwsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2)
info_dict['similarityIndex_rwsip'] = float("{:.3f}".format(rwsip))
dotp = self.dot_product_accum(eigenvectors_1, eigenvectors_2)
info_dict['similarityIndex_dotp'] = float("{:.3f}".format(dotp))
with open(staged_output_json_path, 'w') as out_file:
out_file.write(json.dumps(info_dict, indent=4))
# Copy files to host
self.copy_to_host()
# Remove temporary folder(s)
self.remove_tmp_files()
self.check_arguments(output_files_created=True, raise_exception=False)
return self.return_code
[docs]
def pcz_similarity(input_pcz_path1: str, input_pcz_path2: str, output_json_path: str,
properties: Optional[dict] = None, **kwargs) -> int:
"""Create :class:`PCZsimilarity <flexserv.pcasuite.pcz_similarity>`flexserv.pcasuite.PCZsimilarity class and
execute :meth:`launch() <flexserv.pcasuite.pcz_similarity.launch>` method"""
return PCZsimilarity(**dict(locals())).launch()
pcz_similarity.__doc__ = PCZsimilarity.__doc__
main = PCZsimilarity.get_main(pcz_similarity, "Compute PCA Similarity from a given pair of compressed PCZ files.")
if __name__ == '__main__':
main()