# Imports as in the PyDAAL distributed examples; createSparseTable and data_dir
# are defined elsewhere in the original example.
from os.path import join as jp

from mpi4py import MPI

from daal import step1Local, step2Master
from daal.algorithms import covariance
from daal.data_management import InputDataArchive, OutputDataArchive

MPI_ROOT = 0

datasetFileNames = [
    jp(data_dir, 'covcormoments_csr_1.csv'),
    jp(data_dir, 'covcormoments_csr_2.csv'),
    jp(data_dir, 'covcormoments_csr_3.csv'),
    jp(data_dir, 'covcormoments_csr_4.csv')
]
if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rankId = comm.Get_rank()

    # Read the sparse input data from a .csv file into a CSR numeric table
    dataTable = createSparseTable(datasetFileNames[rankId])

    # Create an algorithm to compute a sparse variance-covariance matrix on local nodes
    localAlgorithm = covariance.Distributed(step1Local, method=covariance.fastCSR)

    # Set the input data set to the algorithm
    localAlgorithm.input.set(covariance.data, dataTable)

    # Compute a sparse variance-covariance matrix
    pres = localAlgorithm.compute()

    # Serialize partial results required by step 2
    dataArch = InputDataArchive()
    pres.serialize(dataArch)
    perNodeArchLength = dataArch.getSizeOfArchive()
    nodeResults = dataArch.getArchiveAsArray()

    # Transfer partial results to step 2 on the root node
    serializedData = comm.gather(nodeResults, root=MPI_ROOT)
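    # A minimal sketch (not part of the original fragment) of the matching step-2
    # master side, following the PyDAAL covariance distributed example.
    if rankId == MPI_ROOT:
        # Create an algorithm to merge the partial results on the master node
        masterAlgorithm = covariance.Distributed(step2Master, method=covariance.fastCSR)
        for i in range(comm.Get_size()):
            # Deserialize the partial result transferred from node i
            dataArch = OutputDataArchive(serializedData[i])
            partialResult = covariance.PartialResult()
            partialResult.deserialize(dataArch)
            masterAlgorithm.input.add(covariance.partialResults, partialResult)
        # Merge the partial results and finalize the covariance matrix and means
        masterAlgorithm.compute()
        result = masterAlgorithm.finalizeCompute()
        covMatrix = result.get(covariance.covariance)
        means = result.get(covariance.mean)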
def computeStep1Local(partialResultLocal):
    # Create an algorithm object to perform the first step of the implicit ALS training algorithm on local-node data
    algorithm = training.Distributed(step=step1Local)
    algorithm.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    algorithm.input.set(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep1))

    # Compute partial results of the first step on local nodes
    # (returns a DistributedPartialResultStep1 object from training)
    return algorithm.compute()
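# A minimal sketch (not part of the original fragment) of the matching master-side
# step, following the PyDAAL implicit ALS distributed example; nBlocks and the list
# step1LocalResults of step-1 results are assumed.
def computeStep2Master(step1LocalResults):
    # Create an algorithm object to perform the second step of the implicit ALS training algorithm
    algorithm = training.Distributed(step=step2Master)
    algorithm.parameter.nFactors = nFactors

    # Add the partial results computed in step 1 on the local nodes
    for i in range(nBlocks):
        algorithm.input.add(training.inputOfStep2FromStep1, step1LocalResults[i])

    # Compute a partial estimate on the master node (DistributedPartialResultStep2)
    return algorithm.compute()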
# Create an algorithm object to build the final Naive Bayes model on the master node
masterAlgorithm = training.Distributed(step2Master, nClasses)

for i in range(nBlocks):
    # Initialize FileDataSource to retrieve the input data from a .csv file
    trainDataSource = FileDataSource(
        trainDatasetFileNames[i], DataSourceIface.notAllocateNumericTable,
        DataSourceIface.doDictionaryFromContext
    )

    # Create numeric tables for the training data and labels
    trainData = HomogenNumericTable(nFeatures, 0, NumericTableIface.doNotAllocate)
    trainGroundTruth = HomogenNumericTable(1, 0, NumericTableIface.doNotAllocate)
    mergedData = MergedNumericTable(trainData, trainGroundTruth)

    # Retrieve the data from the input file
    trainDataSource.loadDataBlock(mergedData)

    # Create an algorithm object to train the Naive Bayes model on the local-node data
    localAlgorithm = training.Distributed(step1Local, nClasses)

    # Pass the training data set and dependent values to the algorithm
    localAlgorithm.input.set(classifier.training.data, trainData)
    localAlgorithm.input.set(classifier.training.labels, trainGroundTruth)

    # Build the Naive Bayes model on the local node and
    # set the local model as input for the master-node algorithm
    masterAlgorithm.input.add(training.partialModels, localAlgorithm.compute())

# Merge and finalize the Naive Bayes model on the master node
masterAlgorithm.compute()
trainingResult = masterAlgorithm.finalizeCompute()  # Retrieve the algorithm results
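# A minimal sketch (not part of the original fragment) of scoring with the trained
# model; assumes "prediction" is imported from daal.algorithms.multinomial_naive_bayes
# and testData is a numeric table of unlabeled observations.
predictionAlgorithm = prediction.Batch(nClasses)
predictionAlgorithm.input.setTable(classifier.prediction.data, testData)
predictionAlgorithm.input.setModel(classifier.prediction.model, trainingResult.get(classifier.training.model))
predictedLabels = predictionAlgorithm.compute().get(classifier.prediction.prediction)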
# Initialize FileDataSource to retrieve the input data from a .csv file
trainDataSource = FileDataSource(
    trainDatasetFileNames[rankId],
    DataSourceIface.doAllocateNumericTable,
    DataSourceIface.doDictionaryFromContext
)
trainLabelsSource = FileDataSource(
    trainGroundTruthFileNames[rankId],
    DataSourceIface.doAllocateNumericTable,
    DataSourceIface.doDictionaryFromContext
)
# Retrieve the data from input files
trainDataSource.loadDataBlock()
trainLabelsSource.loadDataBlock()
# Create an algorithm object to train the Naive Bayes model based on the local-node data
localAlgorithm = training.Distributed(step1Local, nClasses)
# Pass a training data set and dependent values to the algorithm
localAlgorithm.input.set(classifier.training.data, trainDataSource.getNumericTable())
localAlgorithm.input.set(classifier.training.labels, trainLabelsSource.getNumericTable())
# Train the Naive Bayes model on local nodes
pres = localAlgorithm.compute()
# Serialize partial results required by step 2
dataArch = InputDataArchive()
pres.serialize(dataArch)
nodeResults = dataArch.getArchiveAsArray()
# Transfer partial results to step 2 on the root node
serializedData = comm.gather(nodeResults)
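# A minimal sketch (not part of the original fragment) of the matching master-node
# step, following the PyDAAL distributed Naive Bayes example: the root rank
# deserializes the gathered partial results and finalizes the model.
if rankId == 0:
    masterAlgorithm = training.Distributed(step2Master, nClasses)
    for i in range(comm.Get_size()):
        # Deserialize the partial result transferred from node i
        dataArch = OutputDataArchive(serializedData[i])
        partialResult = training.PartialResult()
        partialResult.deserialize(dataArch)
        masterAlgorithm.input.add(training.partialModels, partialResult)
    # Merge and finalize the Naive Bayes model on the master node
    masterAlgorithm.compute()
    trainingResult = masterAlgorithm.finalizeCompute()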
def initializeStep1Local(block):
    global itemsPartialResultLocal
    global itemStep3LocalInput
    global userOffsets

    # Create an algorithm object to initialize the implicit ALS model with the fastCSR method
    initAlgorithm = init.Distributed(step=step1Local, method=init.fastCSR)
    initAlgorithm.parameter.fullNUsers = nUsers
    initAlgorithm.parameter.nFactors = nFactors
    initAlgorithm.parameter.seed += block

    # The partition parameter is a 1x1 numeric table holding the number of data parts
    usersPartitionArray = np.array(usersPartition, dtype=np.float64)
    usersPartitionArray.shape = (1, 1)
    initAlgorithm.parameter.partition = HomogenNumericTable(usersPartitionArray)

    # Pass a training data set to the algorithm
    initAlgorithm.input.set(init.data, dataTable[block])

    # Initialize the implicit ALS model
    partialResult = initAlgorithm.compute()

    itemStep3LocalInput[block] = partialResult.getCollection(init.outputOfInitForComputeStep3)
    userOffsets[block] = partialResult.getCollection(init.offsets, block)
    partialModelLocal = partialResult.getPartialModel(init.partialModel)

    itemsPartialResultLocal[block] = training.DistributedPartialResultStep4()
    itemsPartialResultLocal[block].set(training.outputOfStep4ForStep1, partialModelLocal)
def initializeStep1Local():
    global itemsPartialResultLocal, itemStep3LocalInput, userOffset, usersPartition

    # Create an algorithm object to initialize the implicit ALS model with the fastCSR method
    initAlgorithm = init.Distributed(step=step1Local, method=init.fastCSR)
    initAlgorithm.parameter.fullNUsers = nUsers
    initAlgorithm.parameter.nFactors = nFactors
    initAlgorithm.parameter.seed += rankId

    # The partition parameter is a 1x1 numeric table holding the number of data parts
    initAlgorithm.parameter.partition = HomogenNumericTable(np.array(usersPartition, dtype=np.float64).reshape(1, 1))

    # Pass a training data set to the algorithm
    initAlgorithm.input.set(init.data, transposedDataTable)

    # Initialize the implicit ALS model
    partialResult = initAlgorithm.compute()

    itemStep3LocalInput = partialResult.getCollection(init.outputOfInitForComputeStep3)
    userOffset = partialResult.getCollection(init.offsets, rankId)
    partialModelLocal = partialResult.getPartialModel(init.partialModel)

    itemsPartialResultLocal = training.DistributedPartialResultStep4()
    itemsPartialResultLocal.set(training.outputOfStep4ForStep1, partialModelLocal)
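# Usage sketch (not part of the original fragment): each MPI rank initializes its
# local part of the model, then runs ALS training step 1 on it (computeStep1Local
# as defined above).
initializeStep1Local()
step1LocalResult = computeStep1Local(itemsPartialResultLocal)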
def computestep1Local(block):
    global dataFromStep1ForStep2, dataFromStep1ForStep3

    # Initialize FileDataSource to retrieve the input data from a .csv file
    dataSource = FileDataSource(
        datasetFileNames[block],
        DataSourceIface.doAllocateNumericTable,
        DataSourceIface.doDictionaryFromContext
    )

    # Retrieve the input data
    dataSource.loadDataBlock()

    # Create an algorithm to compute SVD on the local node
    algorithm = svd.Distributed(step1Local, fptype=np.float64)
    algorithm.input.set(svd.data, dataSource.getNumericTable())

    # Compute SVD and get an OnlinePartialResult object from daal.algorithms.svd
    pres = algorithm.compute()

    dataFromStep1ForStep2[block] = pres.get(svd.outputOfStep1ForStep2)
    dataFromStep1ForStep3[block] = pres.get(svd.outputOfStep1ForStep3)
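# A minimal sketch (not part of the original fragment) of the matching step-2 master
# side, following the structure of the PyDAAL SVD distributed example; nBlocks and
# the dataFromStep1ForStep2 entries filled above are assumed.
def computeOnMasterNode():
    global Sigma, V

    # Create an algorithm to compute SVD on the master node
    algorithm = svd.Distributed(step2Master, fptype=np.float64)
    for i in range(nBlocks):
        algorithm.input.add(svd.inputOfStep2FromStep1, i, dataFromStep1ForStep2[i])

    # Reduce the step-1 partial results and retrieve the singular values
    # and the right singular matrix
    algorithm.compute()
    res = algorithm.getResult()
    Sigma = res.get(svd.singularValues)
    V = res.get(svd.rightSingularMatrix)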
# Create an algorithm object to merge the K-Means initialization results on the master node
masterInitAlgorithm = init.Distributed(step2Master, nClusters, method=init.randomDense)

for i in range(nBlocks):
    # Create an algorithm object for the local K-Means initialization
    localInit = init.Distributed(step1Local, nClusters, nBlocks * nVectorsInBlock, i * nVectorsInBlock, method=init.randomDense)
    localInit.input.set(init.data, dataTable[i])
    res = localInit.compute()
    masterInitAlgorithm.input.add(init.partialResults, res)

masterInitAlgorithm.compute()
res = masterInitAlgorithm.finalizeCompute()
centroids = res.get(init.centroids)

for it in range(nIterations):
    # Create an algorithm object to merge the partial results on the master node
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, method=kmeans.lloydDense)

    for i in range(nBlocks):
        # Create an algorithm object for the local K-Means computation;
        # compute assignments only on the last iteration
        localAlgorithm = kmeans.Distributed(step1Local, nClusters, it == nIterations - 1, method=kmeans.lloydDense)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, dataTable[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        pres = localAlgorithm.compute()
        masterAlgorithm.input.add(kmeans.partialResults, pres)

    masterAlgorithm.compute()
    result = masterAlgorithm.finalizeCompute()

    centroids = result.get(kmeans.centroids)
    goalFunction = result.get(kmeans.goalFunction)
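# Sketch (not part of the original fragment): inspect the final results with
# printNumericTable, the helper from the utils module shipped with the PyDAAL examples.
printNumericTable(centroids, "Final centroids:")
printNumericTable(goalFunction, "Objective function value:")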
# Create an algorithm object to merge the K-Means initialization results on the master node
masterInitAlgorithm = init.Distributed(step2Master, nClusters, method=init.randomDense)

for i in range(nBlocks):
    # Create an algorithm object for the local K-Means initialization
    localInit = init.Distributed(step1Local, nClusters, nBlocks * nVectorsInBlock, i * nVectorsInBlock, method=init.randomDense)
    localInit.input.set(init.data, dataTable[i])

    # Compute the local partial result and add it as input for the master node
    masterInitAlgorithm.input.add(init.partialResults, localInit.compute())

masterInitAlgorithm.compute()
res = masterInitAlgorithm.finalizeCompute()
centroids = res.get(init.centroids)

for it in range(nIterations):
    # Create an algorithm object to merge the partial results on the master node
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, method=kmeans.lloydCSR)

    for i in range(nBlocks):
        # Create an algorithm object for the local K-Means computation;
        # compute assignments only on the last iteration
        localAlgorithm = kmeans.Distributed(step1Local, nClusters, it == nIterations - 1, method=kmeans.lloydCSR)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, dataTable[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        pres = localAlgorithm.compute()
        masterAlgorithm.input.add(kmeans.partialResults, pres)

    masterAlgorithm.compute()
    result = masterAlgorithm.finalizeCompute()

    centroids = result.get(kmeans.centroids)
    objectiveFunction = result.get(kmeans.objectiveFunction)
def computeStep1Local(partialResultLocal):
    # Create an algorithm object to compute the implicit ALS algorithm in the distributed processing mode on the local node, using the default method
    algorithm = training.Distributed(step1Local)
    algorithm.parameter.nFactors = nFactors

    # Set input objects for the algorithm
    algorithm.input.set(training.partialModel, partialResultLocal.get(training.outputOfStep4ForStep1))

    # Compute partial estimates on local nodes
    return algorithm.compute()
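# Usage sketch (not part of the original fragment): run step 1 over all local data
# blocks; itemsPartialResultLocal is assumed to be the per-block list filled during
# initialization.
step1LocalResults = [None] * nBlocks
for i in range(nBlocks):
    step1LocalResults[i] = computeStep1Local(itemsPartialResultLocal[i])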