Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_07_create_a_cluster(self):
    """Build a cluster configuration from default settings and submit it.

    A default ``ProviderSettings`` and a default ``RegionConfig`` are combined
    into a ``ReplicationSpecs`` whose region map has a single entry keyed by the
    provider settings' ``region_name``. The resulting ``ClusterConfig`` (named
    ``self.TEST_CLUSTER3_NAME_UNIQUE``) is passed to
    ``self.a.Clusters.create_cluster`` and the returned value is pretty-printed.

    The method makes no assertions; it fails only if one of the calls raises.
    """
    settings: ProviderSettings = ProviderSettings()
    default_region = RegionConfig()

    # One-entry region map: the region name maps to the RegionConfig's
    # attribute dictionary.
    region_map = {settings.region_name: default_region.__dict__}
    specs = ReplicationSpecs(regions_config=region_map)

    config = ClusterConfig(
        name=self.TEST_CLUSTER3_NAME_UNIQUE,
        providerSettings=settings,
        replication_specs=specs,
    )

    response = self.a.Clusters.create_cluster(config)
    pprint(response)
def __init__(self, name: str,
             size: InstanceSizeName = InstanceSizeName.M10,
             disk_size: int = 10,
             provider: ProviderName = ProviderName.AWS,
             region: str = 'US_WEST_2',
             version: MongoDBMajorVersion = MongoDBMajorVersion.v4_0,
             ) -> None:
    """Assemble a ``ClusterConfig`` from a handful of simple options.

    Args:
        name: Forwarded to ``ClusterConfig`` as ``name``.
        size: Forwarded to ``ProviderSettings`` as ``size``.
        disk_size: Forwarded to ``ClusterConfig`` as ``disk_size_gb``.
        provider: Forwarded to ``ProviderSettings`` as ``provider``.
        region: Forwarded to ``ProviderSettings`` as ``region`` and also used
            as the single key of the replication specs' region map.
        version: Forwarded to ``ClusterConfig`` as ``mongodb_major_version``.

    The built configuration is stored on ``self.config``;
    ``self.config_running`` and ``self.config_pending`` start out as ``None``.
    """
    settings = ProviderSettings(size=size, provider=provider, region=region)

    # One-entry region map: the chosen region maps to the attribute
    # dictionary of a default RegionConfig.
    region_map = {region: RegionConfig().__dict__}
    specs = ReplicationSpecs(regions_config=region_map)

    self.config: ClusterConfig = ClusterConfig(
        disk_size_gb=disk_size,
        name=name,
        mongodb_major_version=version,
        providerSettings=settings,
        replication_specs=specs,
    )
    self.config_running = self.config_pending = None
# NOTE(review): this is the tail of a constructor signature followed by its body;
# the `def` line and the leading parameters are not part of this excerpt.
# The body below reads backup_enabled, cluster_type, disk_size_gb, name,
# mongodb_major_version, mongodb_version and num_shards, which are presumably
# those earlier parameters -- confirm against the full definition.
mongo_uri: Optional[str] = None,
# NOTE(review): annotated Optional[str] here but stored under an
# Optional[datetime] annotation in the body -- confirm which type is intended.
mongo_uri_updated: Optional[str] = None,
mongo_uri_with_options: Optional[str] = None,
paused: bool = False,
pit_enabled: bool = False,
replication_factor: Optional[int] = None,
state_name: Optional[ClusterStates] = None,
# NOTE(review): autoscaling, replication_specs and links default to None although
# their annotations are not Optional.
autoscaling: dict = None,
replication_specs: ReplicationSpecs = None,
srv_address: Optional[str] = None,
providerSettings: Optional[ProviderSettings] = None,
links: list = None,
# NOTE(review): `id` shadows the builtin of the same name inside this function.
id: Optional[str] = None
) -> None:
# Each argument is stored on the instance; most attributes reuse the parameter name.
self.id: Optional[str] = id
self.providerSettings: Optional[ProviderSettings] = providerSettings
self.backup_enabled: bool = backup_enabled
self.cluster_type: ClusterType = cluster_type
self.disk_size_gb: int = disk_size_gb
self.name: Optional[str] = name
self.mongodb_major_version: MongoDBMajorVersion = mongodb_major_version
self.mongodb_version: Optional[str] = mongodb_version
self.num_shards: int = num_shards
self.mongo_uri: Optional[str] = mongo_uri
# NOTE(review): attribute annotation (datetime) disagrees with the parameter
# annotation (str); no conversion happens on this line.
self.mongo_uri_updated: Optional[datetime] = mongo_uri_updated
# NOTE(review): the attribute is spelled `mongod_uri_with_options` (extra "d")
# while the parameter is `mongo_uri_with_options` -- confirm whether callers
# rely on this spelling before renaming.
self.mongod_uri_with_options: Optional[str] = mongo_uri_with_options
self.paused: bool = paused
self.pit_enabled: bool = pit_enabled
self.replication_factor: Optional[int] = replication_factor
self.state_name: Optional[ClusterStates] = state_name
self.autoscaling: dict = autoscaling
# NOTE(review): excerpt of an alternate constructor (it ends in `return cls(...)`).
# The enclosing `def` line and the statements that bind `cls`, `data_dict`,
# `replication_specs`, `backup_enabled`, `cluster_type`, `disk_size_gb`, `name`,
# `num_shards` and the `mongo_uri*` values are not visible here, and lines appear
# to be missing between the statements below -- read against the full source.
# NOTE(review): `isinstance(replication_specs, list)` is the idiomatic form of this check.
if type(replication_specs) == list:
mongodb_major_version = MongoDBMajorVersion.vX_x
# Optional fields are read from the input mapping with explicit defaults.
mongodb_version = data_dict.get('mongoDBVersion', None)
paused = data_dict.get('paused', False)
pit_enabled = data_dict.get('pitEnabled', False)
replication_factor = data_dict.get('replicationFactor', None)
# NOTE(review): when 'stateName' is absent, the value handed to the
# ClusterStates[...] lookup is the member ClusterStates.UNKNOWN itself rather
# than a name string. If ClusterStates is a standard Enum, subscripting with a
# member raises KeyError -- confirm, and consider defaulting to the name instead.
state_name = ClusterStates[data_dict.get('stateName', ClusterStates.UNKNOWN)]
autoscaling = data_dict.get('autoScaling', {})
# Only one replication spec is supported: more than one is rejected with
# ValueError, otherwise the first element is used.
if len(replication_specs) > 1:
raise ValueError('We are limited to a single replication spec.')
replication_specs = replication_specs[0]
# Convert the raw spec into a ReplicationSpecs object.
repl_spec_obj = ReplicationSpecs.from_dict(replication_specs)
replication_specs = repl_spec_obj
provider_settings_dict = data_dict.get('providerSettings', None)
srv_address = data_dict.get('srvAddress', None)
# NOTE(review): provider_settings_dict is None when 'providerSettings' is
# missing -- confirm ProviderSettings.from_dict accepts None.
providerSettings = ProviderSettings.from_dict(provider_settings_dict)
links = data_dict.get('links', [])
id = data_dict.get('id', None)
# All values are passed positionally, so this call depends on the constructor's
# parameter order staying exactly as listed here.
return cls(backup_enabled, cluster_type, disk_size_gb, name, mongodb_major_version, mongodb_version,
num_shards, mongo_uri, mongo_uri_updated, mongo_uri_with_options, paused, pit_enabled,
replication_factor, state_name, autoscaling, replication_specs,
srv_address, providerSettings, links,id)