MPI in Python (Open MPI): correlation matrix of a large dataset

I have a 20,000 * 515 matrix of biological data. I need to compute the correlation coefficients between all pairs of rows, so the result is a 20,000 * 20,000 matrix of correlation values. I then populate a numpy array with 1 or 0 depending on whether each correlation coefficient is greater than a threshold value.

I used numpy.corrcoef to find the correlation coefficient, and it works well on my machine.
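
On a single machine the computation is essentially the following (a simplified sketch: random data stands in for the real file, and 0.9 is the threshold I use):

    import numpy

    # simplified single-machine version; random data stands in for MyData.csv
    data = numpy.random.rand(20000, 515)
    corr = numpy.corrcoef(data)                                   # 20,000 * 20,000 matrix
    adjacency = (numpy.absolute(corr) >= 0.9).astype(numpy.int8)  # 1/0 thresholding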

Then I wanted to run it on a cluster (10 computers, each with 2 to 8 cores). When I tried it on the cluster, with each node generating 40 random numbers and picking those 40 random columns from the biological data (giving a 20,000 * 40 matrix per process), I ran into a memory problem:

mpirun noticed that process rank # with PID # on node <name> exited on signal 9 (Killed).
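
For scale, a single dense 20,000 * 20,000 float64 correlation matrix already takes about 3.2 GB per process before any temporaries, which seems consistent with the processes being killed:

    # rough memory estimate for one dense correlation matrix per process
    n = 20000
    print(n * n * 8 / 1e9)   # float64 -> about 3.2 GB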

Then I tried to rewrite the program to work row by row: for each row, compute its correlation coefficients and store 1 in the matrix if the value is greater than the threshold and 0 otherwise, without ever building the full correlation matrix. But that program takes about 1 hour 30 minutes to run, and I need to run it 100 times.

Can someone suggest a better way to do this, for example how to solve the memory problem by handing out tasks as each core finishes its work? I am new to MPI. My code is below.

If you need more information, please let me know. Thank you.

    import numpy
    from mpi4py import MPI
    import time


    Size = MPI.COMM_WORLD.Get_size()
    Rank = MPI.COMM_WORLD.Get_rank()
    Name = MPI.Get_processor_name()

    RandomNumbers = {}

    # each process draws 40 random columns out of the 515 available
    rndm_indx = numpy.random.choice(range(515), 40, replace=False)
    rndm_indx = numpy.sort(rndm_indx)

    # load only the selected columns: a 20,000 * 40 matrix per process
    Data = numpy.genfromtxt('MyData.csv', usecols=rndm_indx)

    RandomNumbers[Rank] = rndm_indx
    CORR_CR = numpy.zeros((Data.shape[0], Data.shape[0]))

    start = time.time()
    for i in range(0, Data.shape[0]):
        Data[i] = Data[i] - numpy.mean(Data[i])
        alpha1 = 1. / numpy.linalg.norm(Data[i])
        for j in range(i, Data.shape[0]):
            if i == j:
                CORR_CR[i][j] = 1
            else:
                Data[j] = Data[j] - numpy.mean(Data[j])
                alpha2 = 1. / numpy.linalg.norm(Data[j])
                corr = numpy.inner(Data[i], Data[j]) * (alpha1 * alpha2)
                corr = int(numpy.absolute(corr) >= 0.9)
                CORR_CR[i][j] = CORR_CR[j][i] = corr
    end = time.time()

    # remove the 1s placed on the diagonal
    CORR_CR = CORR_CR - numpy.eye(CORR_CR.shape[0])

    elapsed = (end - start)
    print('Total Time', elapsed)
3 answers

Here are a few hints to improve the performance of the sequential program and to reduce its memory footprint. The timings below were measured on a 2000-sample test case (the dummy data used further down), for which your original double loop takes about 96 s on my machine.

  • The mean and the norm of each sample are recomputed over and over inside the loops (for instance alpha1=1./numpy.linalg.norm(Data[i]); and the corresponding operations on Data[j] in the inner loop). They can be computed once for all samples, before the double loop:

    alpha=numpy.zeros(Data.shape[0])
    for i in range(0,Data.shape[0]):
      Data[i]=Data[i]-numpy.mean(Data[i])
      alpha[i]=1./numpy.linalg.norm(Data[i])
    
    for i in range(0,Data.shape[0]):
      for j in range(i,Data.shape[0]):
        if(i==j):
           CORR_CR[i][j]=1;
        else:
           corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
           corr=int(numpy.absolute(corr)>=0.9)
           CORR_CR[i][j]=CORR_CR[j][i]=corr
    

The run time drops to about 17 s.
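
As a side note (not part of the original answer), the same precomputation can be written without an explicit Python loop; this sketch assumes Data and alpha as defined above and gives the same result:

    # vectorised precomputation: centre each row and store the inverse norms
    Data = Data - Data.mean(axis=1, keepdims=True)
    alpha = 1. / numpy.linalg.norm(Data, axis=1)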

  • The resulting matrix contains only 0s and 1s, and with a high threshold it is likely to be sparse (most pairs of samples are not strongly correlated). A scipy.sparse.coo_matrix is therefore a better container: only the non-zero entries are stored, as lists of i, j indices and values.

    data=[]
    ii=[]
    jj=[]
    ...
      if(corr!=0):
               data.append(corr)
               ii.append(i)
               jj.append(j)
               data.append(corr)
               ii.append(j)
               jj.append(i)
    ...
    CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))
    

This version runs in about 13 s (a marginal improvement?), but the memory footprint is drastically reduced: only the non-zero entries are kept, instead of a dense 20,000 * 20,000 array.
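
To see why the footprint shrinks, compare a dense matrix with a coo_matrix that only stores the non-zero pairs; the number of stored entries below is a purely hypothetical figure, used only for the arithmetic:

    # back-of-the-envelope comparison (hypothetical nnz, int32 indices assumed)
    n = 20000
    dense_bytes = n * n * 8            # dense float64 matrix: ~3.2 GB
    nnz = 1000000                      # hypothetical number of stored entries
    coo_bytes = nnz * (8 + 4 + 4)      # value + row index + column index: ~16 MB
    print(dense_bytes / 1e9, coo_bytes / 1e6)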

The dot products themselves can be handed over to BLAS through Cython. Here is correlator.pyx, inspired by Numpy vs Cython speed:

import numpy

cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array, numpy.ndarray[DTYPE_t, ndim=1] alpha, int imin, int imax):

    cdef unsigned int rows = array.shape[0]
    cdef int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef int one = 1
    ii = []
    jj = []
    data = []

    # handle the rows in [imin, imax); rows must already be centred
    for row in range(imin, imax):
        for row2 in range(row, rows):
            if row == row2:
                # keep an explicit 0 on the diagonal
                data.append(0)
                ii.append(row)
                jj.append(row2)
            else:
                # BLAS dot product of the two rows, scaled by the inverse norms
                corr = scipy.linalg.cython_blas.ddot(&cols, &array[row,0], &one, &array[row2,0], &one) * alpha[row] * alpha[row2]
                corr = int(numpy.absolute(corr) >= 0.9)
                if corr != 0:
                    # the matrix is symmetric: store (row, row2) and (row2, row)
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)

                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)

    return ii, jj, data

The function scipy.linalg.cython_blas.ddot() performs the dot product using BLAS.
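
A quick plain-numpy check (not in the original answer) that the dot product of centred, normalised rows really is the Pearson correlation returned by numpy.corrcoef:

import numpy

x = numpy.random.rand(515)
y = numpy.random.rand(515)
xc = (x - x.mean()) / numpy.linalg.norm(x - x.mean())
yc = (y - y.mean()) / numpy.linalg.norm(y - y.mean())
print(numpy.inner(xc, yc), numpy.corrcoef(x, y)[0, 1])   # the two values should match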

To cythonize the .pyx file, a makefile such as the one below can be used (assuming Linux...):

all: correlator correlatorb


correlator: correlator.pyx
    cython -a correlator.pyx

correlatorb: correlator.c
    gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c
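
If maintaining a makefile is inconvenient, a minimal setup.py can build the same extension; this is only a sketch (the numpy include directory is the one assumption) and is run with python setup.py build_ext --inplace:

# setup.py -- minimal build script for the correlator extension (sketch)
from setuptools import setup
from Cython.Build import cythonize
import numpy

setup(
    name="correlator",
    ext_modules=cythonize("correlator.pyx"),
    include_dirs=[numpy.get_include()],   # correlator.pyx cimports numpy
)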

Then, in Python:

import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])

The computation now takes 5.4 s! That is already almost 18 times faster than the original 96 s, and it still runs on a single process!
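
In the single-process case, the sparse adjacency matrix is then assembled from the returned lists exactly as in the pure-Python version above (a small sketch, reusing Data, ii, jj and data as defined so far):

import scipy.sparse

CORR_CR = scipy.sparse.coo_matrix((data, (ii, jj)),
                                  shape=(Data.shape[0], Data.shape[0]))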

Now for the MPI parallelization. Notice that the function process takes two extra arguments, imin and imax: instead of building the whole CORR_CR matrix, each process handles only the rows between imin and imax. Since the inner loop over j starts at i, the amount of work per row shrinks as i grows, so splitting the range of i evenly between the processes would leave the first ranks with much more work than the last ones.

The splitting of the for loop over i is therefore chosen so that every process performs roughly the same number of dot products. For example, with 8 rows and 2 processes, an even split gives the first process 8+7+6+5 = 26 pairs and the second only 4+3+2+1 = 10, whereas the balanced split assigns rows 0-2 (21 pairs) to the first process and rows 3-7 (15 pairs) to the second.

The general framework is:

  • Process 0 (the "root process") reads the Data matrix.
  • Data is broadcast to all processes with MPI bcast().
  • Each process computes the range of rows i it is responsible for.
  • Each process computes its share of the correlations using Data, and stores the results in the lists ii, jj and data.
  • Finally, the partial results are gathered on the root process with MPI gather(); this produces Size groups of 3 lists, which are concatenated there.

Here is the code:

import numpy
from mpi4py import MPI
import time
import scipy.sparse

import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here. 
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
         Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#broadcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)

RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-numpy.mean(Data[i])
        if numpy.linalg.norm(Data[i])==0:
            print "process "+str(Rank)+" is facing a big problem"
        else:
            alpha[i]=1./numpy.linalg.norm(Data[i])


#balancing the computational load between the processes. 
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlation.
#each process cares for a set of rows. 
#Of course, the last rank must care about more rows than the first one.

ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)          

#the "local" job has been cythonized in correlator.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every processes on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
   #concatenate the lists
   data2=sum(data,[])
   ii2=sum(ii,[])
   jj2=sum(jj,[])
   #create the adjency matrix
   CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))

   print CORR_CR

end=time.time();           

elapsed=(end-start)
print('Total Time',elapsed)

By running mpirun -np 4 main.py, the computation takes 3.4 s instead of 5.4 s. It is not 4 times faster: 5.4/3.4 is roughly a 1.6x speedup, i.e. about 40% parallel efficiency, so there is still room for improvement...

Possible improvements: at the moment the whole Data matrix is broadcast to every process, and every process recomputes all the means and norms... Only the rows each process actually needs could be scattered to it... likewise, the computation of alpha could be split across the ranks and recombined with an MPI allreduce(), as sketched below...
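
Here is a minimal sketch of the allreduce idea (a sketch only, not part of the program above): each rank fills only its slice of alpha, and the element-wise sum reassembles the full array because every index is written by exactly one rank. It reuses Data, ilimits and Rank as defined above; note that the rows of Data would still have to be centred on every rank before the dot products are taken.

# distributed computation of alpha, recombined with Allreduce() (sketch)
alpha_local = numpy.zeros(Data.shape[0], dtype=numpy.float64)
for i in range(ilimits[Rank], ilimits[Rank+1]):
    row = Data[i] - numpy.mean(Data[i])
    alpha_local[i] = 1. / numpy.linalg.norm(row)

alpha = numpy.zeros_like(alpha_local)
MPI.COMM_WORLD.Allreduce(alpha_local, alpha, op=MPI.SUM)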

What do you intend to do with the resulting matrix? Since what is being built is the adjacency matrix of a graph, the functions of sparse.csgraph such as connected_components() or laplacian() could be of interest. Indeed, you are not very far from spectral clustering!

For instance, if connected_components() is what you are looking for:

import scipy.sparse.csgraph

if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print S
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)

    print "number of different families "+str(n_components)

    numpy.set_printoptions(threshold=numpy.nan)
    print labels
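
And if you want to take one more step toward spectral clustering, the graph Laplacian mentioned above is a one-liner on the same matrix (a sketch, assuming S is the csr adjacency matrix built above):

# normalised graph Laplacian of the thresholded correlation graph (sketch)
import scipy.sparse.csgraph
lap = scipy.sparse.csgraph.laplacian(S.astype(float), normed=True)
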
A further simplification: if each sample is centred and normalised once, when the data is prepared, the alpha array is no longer needed afterwards:

if Rank==0:

    Data=numpy.ascontiguousarray(numpy.random.rand(20000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
         Data[i]+=numpy.sin((1+i%10)*10*lin)*100     
         Data[i] = (Data[i] - numpy.mean(Data[i]))/numpy.linalg.norm(Data[i] - numpy.mean(Data[i]))  

start=time.time(); 
Data=MPI.COMM_WORLD.bcast(Data, root=0)

The Cython kernel then reduces to a thresholded BLAS dot product (same correlator.pyx, with the alpha argument removed):

def process(numpy.ndarray[DTYPE_t, ndim=2] array, int imin, int imax):

    cdef unsigned int rows = array.shape[0]
    cdef int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef int one = 1
    ii = []
    jj = []
    data = []

    for row in range(imin, imax):
        for row2 in range(row, rows):
            if row == row2:
                data.append(0)
                ii.append(row)
                jj.append(row2)
            else:
                # rows are already centred and normalised, so the dot product is the correlation
                corr = scipy.linalg.cython_blas.ddot(&cols, &array[row,0], &one, &array[row2,0], &one)
                corr = int(numpy.absolute(corr) >= 0.9)
                if corr != 0:
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)

                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)

    return ii, jj, data
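
The call site changes accordingly, since the alpha argument is gone (a sketch mirroring the earlier usage, with Data, ilimits and Rank as defined before):

import correlator
ii, jj, data = correlator.process(Data, ilimits[Rank], ilimits[Rank+1])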
