Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def spinnaker_pipeline(mock_os, mock_get_details, mock_get_prop):
"""Sets up pipeline fixture object"""
mock_get_prop.return_value = TEST_SETTINGS
pipelineObj = SpinnakerPipeline(
app='appgroup',
trigger_job='a_group_app', )
pipelineObj.generated = TEST_FORMAT_GENERATOR
pipelineObj.app_name = 'appgroup'
pipelineObj.group_name = 'group'
return pipelineObj
@mock.patch.object(SpinnakerPipeline, 'render_wrapper')
@mock.patch('foremast.pipeline.create_pipeline.get_subnets')
@mock.patch('foremast.pipeline.create_pipeline.construct_pipeline_block')
@mock.patch('foremast.pipeline.create_pipeline.renumerate_stages')
@mock.patch.object(SpinnakerPipeline, 'post_pipeline')
def test_create_pipeline_ec2(mock_post, mock_renumerate, mock_construct, mock_subnets, mock_wrapper, mock_clean,
spinnaker_pipeline):
"""test pipeline creation if ec2 pipeline."""
test_block_data = {
"env": "dev",
"generated": TEST_FORMAT_GENERATOR,
"previous_env": None,
"region": "us-east-1",
"settings": spinnaker_pipeline.settings["dev"]["us-east-1"],
"pipeline_data": spinnaker_pipeline.settings['pipeline'],
"region_subnets": {
'us-east-1': ['us-east-1d', 'us-east-1a', 'us-east-1e']
if pipeline_type not in consts.ALLOWED_TYPES and pipeline_type not in consts.MANUAL_TYPES:
raise NotImplementedError('Pipeline type "{0}" not permitted.'.format(pipeline_type))
if not onetime:
if pipeline_type == 'lambda':
spinnakerpipeline = pipeline.SpinnakerPipelineLambda(**kwargs)
elif pipeline_type == 's3':
spinnakerpipeline = pipeline.SpinnakerPipelineS3(**kwargs)
elif pipeline_type == 'datapipeline':
spinnakerpipeline = pipeline.SpinnakerPipelineDataPipeline(**kwargs)
elif pipeline_type in consts.MANUAL_TYPES:
spinnakerpipeline = pipeline.SpinnakerPipelineManual(**kwargs)
else:
# Handles all other pipelines
spinnakerpipeline = pipeline.SpinnakerPipeline(**kwargs)
else:
spinnakerpipeline = pipeline.SpinnakerPipelineOnetime(onetime=onetime, **kwargs)
spinnakerpipeline.create_pipeline()
def create_pipeline(self, onetime=None):
""" Creates the spinnaker pipeline """
utils.banner("Creating Pipeline")
if not onetime:
spinnakerpipeline = pipeline.SpinnakerPipeline(
app=self.app,
trigger_job=self.trigger_job,
prop_path=self.json_path,
base=None,
token_file=self.gitlab_token_path)
else:
spinnakerpipeline = pipeline.SpinnakerPipelineOnetime(
app=self.app,
trigger_job=self.trigger_job,
prop_path=self.json_path,
base=None,
token_file=self.gitlab_token_path,
onetime=onetime)
spinnakerpipeline.create_pipeline()