Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
jp(DATA_PREFIX, 'covcormoments_csr_2.csv'),
jp(DATA_PREFIX, 'covcormoments_csr_3.csv'),
jp(DATA_PREFIX, 'covcormoments_csr_4.csv')
]
if __name__ == "__main__":
    # Identify this process inside the MPI world communicator
    comm = MPI.COMM_WORLD
    rankId = comm.Get_rank()

    # Each rank loads the sparse input file that matches its own rank index
    localFileName = datasetFileNames[rankId]
    dataTable = createSparseTable(localFileName)

    # Step-1 (local node) PCA algorithm whose inner covariance algorithm
    # uses the fastCSR method
    localAlgorithm = pca.Distributed(step1Local)
    localCovariance = covariance.Distributed(step1Local, method=covariance.fastCSR)
    localAlgorithm.parameter.covariance = localCovariance

    # Hand the input table to the algorithm and compute the partial result
    localAlgorithm.input.setDataset(pca.data, dataTable)
    pres = localAlgorithm.compute()

    # Pack the partial result into an array so it can be sent over MPI
    dataArch = InputDataArchive()
    pres.serialize(dataArch)
    nodeResults = dataArch.getArchiveAsArray()

    # Gather the serialized partial results from all ranks
    serializedData = comm.gather(nodeResults)
# Add partial results computed on local nodes to the algorithm on the master node
for _, val in parts_list:
dataArch = OutputDataArchive(val)
deserialized_val = covariance.PartialResult()
deserialized_val.deserialize(dataArch)
covarianceMaster.input.add(covariance.partialResults, deserialized_val)
# Compute a dense variance-covariance matrix on the master node
covarianceMaster.compute()
# Finalize computations and retrieve the results
res = covarianceMaster.finalizeCompute()
result = {}
result['covariance'] = res.get(covariance.covariance)
result['mean'] = res.get(covariance.mean)
return result
def computeOnMasterNode():
    """Combine the per-block partial results on the master node.

    Reads the module-level ``nBlocks`` and ``partialResult``, runs a
    step-2 covariance algorithm (fastCSR method) configured to output a
    correlation matrix, and stores the value returned by
    ``finalizeCompute()`` in the module-level ``result``.
    """
    global result

    # Step-2 (master) algorithm using the fastCSR method
    masterAlgorithm = covariance.Distributed(
        step2Master,
        method=covariance.fastCSR,
    )

    # Register the partial result of every block as input
    for blockIndex in range(nBlocks):
        blockPartial = partialResult[blockIndex]
        masterAlgorithm.input.add(covariance.partialResults, blockPartial)

    # Ask for a correlation matrix as the output matrix type
    masterAlgorithm.parameter.outputMatrixType = covariance.correlationMatrix

    # Merge the partial results, then finalize and publish the outcome
    masterAlgorithm.compute()
    result = masterAlgorithm.finalizeCompute()
def computeOnMasterNode():
    """Combine the per-block partial results on the master node.

    Reads the module-level ``nBlocks`` and ``partialResult``, runs a
    step-2 covariance algorithm (no explicit method argument) configured
    to output a correlation matrix, and stores the value returned by
    ``finalizeCompute()`` in the module-level ``result``.
    """
    global result

    # Step-2 (master) algorithm; the method argument is left at its default
    masterAlgorithm = covariance.Distributed(step2Master)

    # Register the partial result of every block as input
    for blockIndex in range(nBlocks):
        blockPartial = partialResult[blockIndex]
        masterAlgorithm.input.add(covariance.partialResults, blockPartial)

    # Ask for a correlation matrix as the output matrix type
    masterAlgorithm.parameter.outputMatrixType = covariance.correlationMatrix

    # Merge the partial results, then finalize and publish the outcome
    masterAlgorithm.compute()
    result = masterAlgorithm.finalizeCompute()
def computestep1Local(block):
    """Compute the step-1 partial covariance result for one data block.

    Args:
        block: Index used both to pick the input file from
            ``datasetFileNames`` and to choose the slot of the
            module-level ``partialResult`` that receives the outcome.
    """
    global partialResult

    # Load the sparse input table for this block
    sparseTable = createSparseTable(datasetFileNames[block])

    # Step-1 (local node) algorithm using the fastCSR method
    localAlgorithm = covariance.Distributed(
        step1Local,
        method=covariance.fastCSR,
    )
    localAlgorithm.input.set(covariance.data, sparseTable)

    # Compute first, then store the partial result in this block's slot
    blockPartial = localAlgorithm.compute()
    partialResult[block] = blockPartial
def computeOnMasterNode():
    """Combine the per-block partial results on the master node.

    Reads the module-level ``nBlocks`` and ``partialResult``, runs a
    step-2 covariance algorithm (fastCSR method) with the output matrix
    type left untouched, and stores the value returned by
    ``finalizeCompute()`` in the module-level ``result``.
    """
    global result

    # Step-2 (master) algorithm using the fastCSR method
    masterAlgorithm = covariance.Distributed(
        step2Master,
        method=covariance.fastCSR,
    )

    # Register the partial result of every block as input
    for blockIndex in range(nBlocks):
        blockPartial = partialResult[blockIndex]
        masterAlgorithm.input.add(covariance.partialResults, blockPartial)

    # Merge the partial results, then finalize and publish the outcome
    masterAlgorithm.compute()
    result = masterAlgorithm.finalizeCompute()
# Input data set parameters: number of data blocks and one CSR file per block
nBlocks = 4
datasetFileNames = [
    os.path.join(DAAL_PREFIX, 'distributed', csrFileName)
    for csrFileName in (
        'covcormoments_csr_1.csv',
        'covcormoments_csr_2.csv',
        'covcormoments_csr_3.csv',
        'covcormoments_csr_4.csv',
    )
]
if __name__ == "__main__":
    # Online PCA algorithm working in float64
    algorithm = pca.Online(fptype=np.float64)

    # Plug in an online covariance algorithm that uses the fastCSR method
    sparseCovariance = covariance.Online(
        fptype=np.float64,
        method=covariance.fastCSR,
    )
    algorithm.parameter.covariance = sparseCovariance

    # Feed the data block by block, updating the decomposition each time
    for i in range(nBlocks):
        # Build the sparse input table for the current block
        dataTable = createSparseTable(datasetFileNames[i])

        # Pass the block to the algorithm and update the partial state
        algorithm.input.setDataset(pca.data, dataTable)
        algorithm.compute()

    # Finalize once every block has been processed
    result = algorithm.finalizeCompute()

    # Report the decomposition
    eigenvalues = result.get(pca.eigenvalues)
    printNumericTable(eigenvalues, "Eigenvalues:")
    eigenvectors = result.get(pca.eigenvectors)
    printNumericTable(eigenvectors, "Eigenvectors:")
os.path.join(DAAL_PREFIX, 'distributed', 'covcormoments_csr_4.csv')
]
if __name__ == "__main__":
    # Step-2 (master node) PCA algorithm working in float64
    masterAlgorithm = pca.Distributed(step2Master, fptype=np.float64)

    for i in range(nBlocks):
        # Build the sparse input table for the current block
        dataTable = createSparseTable(datasetFileNames[i])

        # Step-1 (local node) PCA algorithm for this block
        localAlgorithm = pca.Distributed(step1Local, fptype=np.float64)

        # Plug in a step-1 covariance algorithm that uses the fastCSR method
        localCovariance = covariance.Distributed(
            step1Local,
            fptype=np.float64,
            method=covariance.fastCSR,
        )
        localAlgorithm.parameter.covariance = localCovariance

        # Pass the block to the local algorithm
        localAlgorithm.input.setDataset(pca.data, dataTable)

        # Compute the local partial result and register it with the master
        localPartial = localAlgorithm.compute()
        masterAlgorithm.input.add(pca.partialResults, localPartial)

    # Plug a step-2 covariance algorithm (fastCSR method) into the master
    masterCovariance = covariance.Distributed(
        step2Master,
        fptype=np.float64,
        method=covariance.fastCSR,
    )
    masterAlgorithm.parameter.covariance = masterCovariance

    # Merge the partial results and finalize the decomposition
    masterAlgorithm.compute()
    result = masterAlgorithm.finalizeCompute()