Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_07_create_a_cluster(self):
provider_settings: ProviderSettings = ProviderSettings()
regions_config = RegionConfig()
replication_specs = ReplicationSpecs(regions_config={provider_settings.region_name: regions_config.__dict__})
cluster_config = ClusterConfig(name=self.TEST_CLUSTER3_NAME_UNIQUE,
providerSettings=provider_settings,
replication_specs=replication_specs)
output = self.a.Clusters.create_cluster(cluster_config)
pprint(output)
def test_02_get_a_cluster_as_obj(self):
cluster = self.a.Clusters.get_single_cluster_as_obj(self.TEST_CLUSTER_NAME)
self.assertTrue(type(cluster) is ClusterConfig)
self.assertEqual(cluster.name, self.TEST_CLUSTER_NAME)
test_02_get_a_cluster_as_obj.basic = True
cluster (str): The cluster name
Returns:
ClusterConfig: Response payload
"""
cluster_data = self.get_single_cluster(cluster=cluster)
try:
if cluster_data.get('clusterType', None) == 'SHARDED':
logger.info("Cluster Type is SHARDED, Returning a ShardedClusterConfig")
out_obj = ShardedClusterConfig.fill_from_dict(data_dict=cluster_data)
elif cluster_data.get('clusterType', None) == 'REPLICASET':
logger.info("Cluster Type is REPLICASET, Returning a ClusterConfig")
out_obj = ClusterConfig.fill_from_dict(data_dict=cluster_data)
else:
logger.info("Cluster Type is not recognized, Returning a REPLICASET")
out_obj = ClusterConfig.fill_from_dict(data_dict=cluster_data)
except Exception as e:
raise e
return out_obj
out_dict = self.as_dict()
out_dict.pop('stateName', None)
out_dict.pop('numShards', None)
out_dict.pop('mongoURI', None)
out_dict.pop('mongoDBVersion', None)
out_dict.pop('mongoURIUpdated', None)
out_dict.pop('mongoURIWithOptions', None)
out_dict.pop('paused', None)
out_dict.pop('srvAddress', None)
out_dict.pop('links', None)
out_dict.pop('state_name', None)
return out_dict
class ShardedClusterConfig(ClusterConfig):
def __init__(self, backup_enabled: bool = False,
cluster_type: ClusterType = ClusterType.REPLICASET,
disk_size_gb: int = 32,
name: str = None,
mongodb_major_version: MongoDBMajorVersion = MongoDBMajorVersion.v4_0,
mongodb_version: Optional[str] = None,
num_shards: int = 1,
mongo_uri: Optional[str] = None,
mongo_uri_updated: Optional[str] = None,
mongo_uri_with_options: Optional[str] = None,
paused: bool = False,
pit_enabled: bool = False,
replication_factor: Optional[int] = None,
state_name: Optional[ClusterStates] = None,
autoscaling: dict = {},
replication_specs: list = [],
url: https://docs.atlas.mongodb.com/reference/api/clusters-get-one/
Args:
cluster (str): The cluster name
Returns:
ClusterConfig: Response payload
"""
cluster_data = self.get_single_cluster(cluster=cluster)
try:
if cluster_data.get('clusterType', None) == 'SHARDED':
logger.info("Cluster Type is SHARDED, Returning a ShardedClusterConfig")
out_obj = ShardedClusterConfig.fill_from_dict(data_dict=cluster_data)
elif cluster_data.get('clusterType', None) == 'REPLICASET':
logger.info("Cluster Type is REPLICASET, Returning a ClusterConfig")
out_obj = ClusterConfig.fill_from_dict(data_dict=cluster_data)
else:
logger.info("Cluster Type is not recognized, Returning a REPLICASET")
out_obj = ClusterConfig.fill_from_dict(data_dict=cluster_data)
except Exception as e:
raise e
return out_obj
def __init__(self, name: str,
size: InstanceSizeName = InstanceSizeName.M10,
disk_size: int = 10,
provider: ProviderName = ProviderName.AWS,
region: str = 'US_WEST_2',
version: MongoDBMajorVersion = MongoDBMajorVersion.v4_0,
) -> None:
provider_settings = ProviderSettings(size=size,
provider=provider, region=region)
regions_config = RegionConfig()
replication_specs = ReplicationSpecs(regions_config={region: regions_config.__dict__})
self.config: ClusterConfig = ClusterConfig(disk_size_gb=disk_size,
name=name,
mongodb_major_version=version,
providerSettings=provider_settings,
replication_specs=replication_specs)
self.config_running = None
self.config_pending = None