Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Root directory of the example data sets, relative to the working directory
DAAL_PREFIX = os.path.join('..', 'data')

# Input data set parameters
datasetFileName = os.path.join(DAAL_PREFIX, 'batch', 'kmeans_csr.csv')

# K-Means algorithm parameters
nClusters = 20

if __name__ == "__main__":
    # Load the input file into a sparse table
    dataTable = createSparseTable(datasetFileName)

    # Choose the starting centroids for the K-Means algorithm.
    # NOTE(review): the initializer is built with `randomDense` while the
    # clustering step below uses `lloydCSR` on the same table -- confirm the
    # dense initializer is intended for this sparse input.
    init = kmeans.init.Batch(nClusters, method=kmeans.init.randomDense)
    init.input.set(kmeans.init.data, dataTable)
    centroids = init.compute().get(kmeans.init.centroids)

    # Build the K-Means algorithm object, feed it the data and the starting
    # centroids, and run it
    algorithm = kmeans.Batch(nClusters, 0, method=kmeans.lloydCSR)
    algorithm.input.set(kmeans.data, dataTable)
    algorithm.input.set(kmeans.inputCentroids, centroids)
    res = algorithm.compute()

    # Show the clustering results
    printNumericTable(res.get(kmeans.assignments), "First 10 cluster assignments:", 10)
# Calculate centroids: on every iteration each data block produces a partial
# result from the current centroids, and the master algorithm merges them
# into the centroids used by the next iteration.
for it in range(nIterations):
    for i in range(nBlocks):
        # Create an algorithm object for the K-Means algorithm on block i
        localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        # Hand this block's partial result to the master algorithm
        masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

    masterAlgorithm.compute()
    res = masterAlgorithm.finalizeCompute()

    centroids = res.get(kmeans.centroids)
    objectiveFunction = res.get(kmeans.objectiveFunction)

# Calculate assignments of every block against the final centroids
for i in range(nBlocks):
    # Create an algorithm object for the K-Means algorithm.
    # The keyword is spelled `fptype`, matching the step-1 call above; the
    # original `fptyep` did not name that option.
    localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType)

    # Set the input data to the algorithm
    localAlgorithm.input.set(kmeans.data, data[i])
    localAlgorithm.input.set(kmeans.inputCentroids, centroids)

    assignments.append(localAlgorithm.compute().get(kmeans.assignments))

# Print the clusterization results
printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
printNumericTable(objectiveFunction, "Objective function value:")
# Dimensions of the initial centroid table
nRows = initialCentroids.getNumberOfRows()
nCols = initialCentroids.getNumberOfColumns()

# State carried through the computation
assignments = []
centroids = initialCentroids
objectiveFunction = None

# Calculate centroids: on every iteration each data block produces a partial
# result from the current centroids, and the master algorithm merges them
# into the centroids used by the next iteration.
for it in range(nIterations):
    for i in range(nBlocks):
        # Create an algorithm object for the K-Means algorithm.
        # The variant is passed as `method=` (the original `methods=` did not
        # name that option), matching the Batch call below.
        localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType, method=kmeans.lloydCSR)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        # Hand this block's partial result to the master algorithm
        masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

    masterAlgorithm.compute()
    res = masterAlgorithm.finalizeCompute()

    centroids = res.get(kmeans.centroids)
    objectiveFunction = res.get(kmeans.objectiveFunction)

# Calculate assignments
for i in range(nBlocks):
    # Create an algorithm object for the K-Means algorithm.
    # The keyword is spelled `fptype`, matching the step-1 call above; the
    # original `fptyep` did not name that option.
    localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType, method=kmeans.lloydCSR)
    # Set the input data to the algorithm
def computeMaster(partsRDDcompute):
    """Merge the collected partial results on the master node.

    Each item yielded by ``partsRDDcompute.collect()`` is unpacked as a
    two-element pair; the second element is passed through
    ``deserializePartialResult`` and added to a step-2 master K-Means
    algorithm.

    Returns whatever ``get(kmeans.centroids)`` yields on the finalized
    master result.
    """
    # Master-node algorithm that accumulates the local partial results
    kmeansMaster = kmeans.Distributed(step2Master, nClusters, method=kmeans.defaultDense)

    # Feed every collected partial result into the master algorithm
    for _key, serialized in partsRDDcompute.collect():
        partial = deserializePartialResult(serialized, kmeans)
        kmeansMaster.input.add(kmeans.partialResults, partial)

    # Run the master step, then finalize and extract the centroids
    kmeansMaster.compute()
    return kmeansMaster.finalizeCompute().get(kmeans.centroids)
def mapper(tup):
    """Run the local K-Means step on one ``(key, value)`` pair.

    The value and the ``centroids`` name from the surrounding scope are both
    passed through ``deserializeNumericTable`` and used as the data and the
    input centroids of a step-1 local algorithm.

    Returns ``(key, serialized partial result)``.
    """
    key, val = tup

    # Local-node algorithm for this block
    kmeansLocal = kmeans.Distributed(step1Local, nClusters, method=kmeans.defaultDense)

    # Rebuild the inputs from their serialized form
    blockTable = deserializeNumericTable(val)
    currentCentroids = deserializeNumericTable(centroids)

    kmeansLocal.input.set(kmeans.data, blockTable)
    kmeansLocal.input.set(kmeans.inputCentroids, currentCentroids)

    # Compute the partial result and serialize it for the return trip
    return (key, serializeNumericTable(kmeansLocal.compute()))
return dataRDD.map(mapper)
def computeMaster(partsRDDcompute):
    """Merge the collected partial results on the master node (CSR variant).

    Each item yielded by ``partsRDDcompute.collect()`` is unpacked as a
    two-element pair; the second element is passed through
    ``deserializePartialResult`` and added to a step-2 master K-Means
    algorithm created with ``kmeans.lloydCSR``.

    Returns whatever ``get(kmeans.centroids)`` yields on the finalized
    master result.
    """
    # Master-node algorithm that accumulates the local partial results
    kmeansMaster = kmeans.Distributed(step2Master, nClusters, method=kmeans.lloydCSR)

    # Feed every collected partial result into the master algorithm
    for _key, serialized in partsRDDcompute.collect():
        partial = deserializePartialResult(serialized, kmeans)
        kmeansMaster.input.add(kmeans.partialResults, partial)

    # Run the master step, then finalize and extract the centroids
    kmeansMaster.compute()
    return kmeansMaster.finalizeCompute().get(kmeans.centroids)
# K-Means algorithm parameters
nClusters = 20

if __name__ == "__main__":
    # Set up a FileDataSource that reads the input .csv file
    dataSource = FileDataSource(
        datasetFileName,
        DataSourceIface.doAllocateNumericTable,
        DataSourceIface.doDictionaryFromContext,
    )

    # Pull the data out of the input file
    dataSource.loadDataBlock()

    # Choose the starting centroids for the K-Means algorithm
    initAlg = kmeans.init.Batch(nClusters, method=kmeans.init.randomDense)
    initAlg.input.set(kmeans.init.data, dataSource.getNumericTable())
    centroidsResult = initAlg.compute().get(kmeans.init.centroids)

    # Build the K-Means algorithm object, feed it the data and the starting
    # centroids, and run it
    algorithm = kmeans.Batch(nClusters, 0, method=kmeans.lloydDense)
    algorithm.input.set(kmeans.data, dataSource.getNumericTable())
    algorithm.input.set(kmeans.inputCentroids, centroidsResult)
    res = algorithm.compute()

    # Show the clustering results
    printNumericTable(res.get(kmeans.assignments), "First 10 cluster assignments:", 10)
# Calculate centroids: on every iteration each data block produces a partial
# result from the current centroids, and the master algorithm merges them
# into the centroids used by the next iteration.
for it in range(nIterations):
    for i in range(nBlocks):
        # Create an algorithm object for the K-Means algorithm.
        # The variant is passed as `method=` (the original `methods=` did not
        # name that option), matching the Batch call below.
        localAlgorithm = kmeans.Distributed(step1Local, nClusters, False, fptype=algorithmFPType, method=kmeans.lloydCSR)

        # Set the input data to the algorithm
        localAlgorithm.input.set(kmeans.data, data[i])
        localAlgorithm.input.set(kmeans.inputCentroids, centroids)

        # Hand this block's partial result to the master algorithm
        masterAlgorithm.input.add(kmeans.partialResults, localAlgorithm.compute())

    masterAlgorithm.compute()
    res = masterAlgorithm.finalizeCompute()

    centroids = res.get(kmeans.centroids)
    objectiveFunction = res.get(kmeans.objectiveFunction)

# Calculate assignments of every block against the final centroids
for i in range(nBlocks):
    # Create an algorithm object for the K-Means algorithm.
    # The keyword is spelled `fptype`, matching the step-1 call above; the
    # original `fptyep` did not name that option.
    localAlgorithm = kmeans.Batch(nClusters, 0, fptype=algorithmFPType, method=kmeans.lloydCSR)

    # Set the input data to the algorithm
    localAlgorithm.input.set(kmeans.data, data[i])
    localAlgorithm.input.set(kmeans.inputCentroids, centroids)

    assignments.append(localAlgorithm.compute().get(kmeans.assignments))

# Print the clusterization results
printNumericTable(assignments[0], "First 10 cluster assignments from 1st node:", 10)
printNumericTable(centroids, "First 10 dimensions of centroids:", 20, 10)
os.path.join(DAAL_PREFIX, 'distributed', 'kmeans_dense_1.csv'),
os.path.join(DAAL_PREFIX, 'distributed', 'kmeans_dense_2.csv'),
os.path.join(DAAL_PREFIX, 'distributed', 'kmeans_dense_3.csv'),
os.path.join(DAAL_PREFIX, 'distributed', 'kmeans_dense_4.csv')
]
# K-Means algorithm parameters
nClusters = 20
nIterations = 5
nBlocks = 4
nVectorsInBlock = 2500

# One slot per data block, filled in by the loading loop below
dataTable = [0] * nBlocks

if __name__ == "__main__":
    # Master-node (step 2) K-Means algorithm
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, method=kmeans.lloydDense)

    centroids = None
    assignments = [0] * nBlocks

    # Master-node (step 2) initialization algorithm
    masterInitAlgorithm = init.Distributed(step2Master, nClusters, method=init.randomDense)

    for i in range(nBlocks):
        # Set up a FileDataSource that reads block i from its .csv file
        dataSource = FileDataSource(
            dataFileNames[i],
            DataSourceIface.doAllocateNumericTable,
            DataSourceIface.doDictionaryFromContext,
        )

        # Pull the data out of the input file and keep the resulting table
        dataSource.loadDataBlock()
        dataTable[i] = dataSource.getNumericTable()