Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_04_create_basic_cluster(self):
myoutput = self.a.Clusters.create_basic_rs(name=self.TEST_CLUSTER2_NAME_UNIQUE, version=mdb_version.v4_2,
size=InstanceSizeName.M10)
self.assertEqual(type(myoutput), AtlasBasicReplicaSet)
pprint(myoutput.config.as_dict())
print('-------------------Waiting a bit to allow the cluster to be created......-------------')
sleep(30)
print('-----------------------------------Done Sleeping -------------------------------------')
replication_specs = data_dict.get('replicationSpecs', {})
backup_enabled = data_dict.get('backupEnabled', None)
disk_size_gb = data_dict.get('diskSizeGB', None)
name = data_dict.get('name', None)
num_shards = data_dict.get('numShards', 1)
clusterType = data_dict.get('clusterType', None)
cluster_type = ClusterType[clusterType]
mongo_uri = data_dict.get('mongoURI', None)
mongo_uri_with_options = data_dict.get('mongoURIWithOptions', None)
try:
mongo_uri_updated = datetime.strptime(data_dict.get('mongoURIUpdated', None), FORMAT).astimezone(
tz=pytz.UTC)
except ValueError:
mongo_uri_updated = None
try:
mongodb_major_version = MongoDBMajorVersion(data_dict.get('mongoDBMajorVersion'))
except ValueError:
mongodb_major_version = MongoDBMajorVersion.vX_x
mongodb_version = data_dict.get('mongoDBVersion', None)
paused = data_dict.get('paused', False)
pit_enabled = data_dict.get('pitEnabled', False)
replication_factor = data_dict.get('replicationFactor', None)
state_name = ClusterStates[data_dict.get('stateName', ClusterStates.UNKNOWN)]
autoscaling = data_dict.get('autoScaling', {})
if len(replication_specs) > 1:
raise ValueError('We are limited to a single replication spec.')
replication_specs = replication_specs[0]
repl_spec_obj = ReplicationSpecs.from_dict(replication_specs)
replication_specs = repl_spec_obj
provider_settings_dict = data_dict.get('providerSettings', None)
srv_address = data_dict.get('srvAddress', None)
disk_size_gb = data_dict.get('diskSizeGB', None)
name = data_dict.get('name', None)
num_shards = data_dict.get('numShards', 1)
clusterType = data_dict.get('clusterType', None)
cluster_type = ClusterType[clusterType]
mongo_uri = data_dict.get('mongoURI', None)
mongo_uri_with_options = data_dict.get('mongoURIWithOptions', None)
try:
mongo_uri_updated = datetime.strptime(data_dict.get('mongoURIUpdated', None), FORMAT).astimezone(
tz=pytz.UTC)
except ValueError:
mongo_uri_updated = None
try:
mongodb_major_version = MongoDBMajorVersion(data_dict.get('mongoDBMajorVersion'))
except ValueError:
mongodb_major_version = MongoDBMajorVersion.vX_x
mongodb_version = data_dict.get('mongoDBVersion', None)
paused = data_dict.get('paused', False)
pit_enabled = data_dict.get('pitEnabled', False)
replication_factor = data_dict.get('replicationFactor', None)
state_name = ClusterStates[data_dict.get('stateName', ClusterStates.UNKNOWN)]
autoscaling = data_dict.get('autoScaling', {})
if len(replication_specs) > 1:
raise ValueError('We are limited to a single replication spec.')
replication_specs = replication_specs[0]
repl_spec_obj = ReplicationSpecs.from_dict(replication_specs)
replication_specs = repl_spec_obj
provider_settings_dict = data_dict.get('providerSettings', None)
srv_address = data_dict.get('srvAddress', None)
providerSettings = ProviderSettings.from_dict(provider_settings_dict)
links = data_dict.get('links', [])
def __init__(self, backup_enabled: bool = False,
cluster_type: ClusterType = ClusterType.REPLICASET,
disk_size_gb: int = 32,
name: str = None,
mongodb_major_version: MongoDBMajorVersion = MongoDBMajorVersion.v4_0,
mongodb_version: Optional[str] = None,
num_shards: int = 1,
mongo_uri: Optional[str] = None,
mongo_uri_updated: Optional[str] = None,
mongo_uri_with_options: Optional[str] = None,
paused: bool = False,
pit_enabled: bool = False,
replication_factor: Optional[int] = None,
state_name: Optional[ClusterStates] = None,
autoscaling: dict = None,
replication_specs: ReplicationSpecs = None,
srv_address: Optional[str] = None,
providerSettings: Optional[ProviderSettings] = None,
links: list = None,
id: Optional[str] = None
) -> None:
def create_basic_rs(self, name: str,
size: InstanceSizeName = InstanceSizeName.M10,
disk_size: int = 10,
provider: ProviderName = ProviderName.AWS,
region: str = 'US_WEST_2',
version: MongoDBMajorVersion = MongoDBMajorVersion.v4_0) -> AtlasBasicReplicaSet:
"""
Simplified method for creating a basic replica set with basic options.
:param name: The name for the cluster
:param size: The Atlas Instance size, found in The InstanceSizeName enum
:param disk_size: The size in GB for disk
:param provider: The cloud provider, found in ProviderName enum
:param region: The provider region to place the cluster.
:param version: The mongodb major version (enum)
:return: AtlasBasicReplicaSet
"""
cluster: AtlasBasicReplicaSet = AtlasBasicReplicaSet(name=name, size=size, disk_size=disk_size,
provider=provider, region=region,
version=version)
result = self.create_cluster(cluster=cluster.config)