Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _mount_nfs(self, mount_path, nfs_share, options=None, ensure=False):
    """Mount an NFS export on the given mount path.

    The mount point is created with ``mkdir -p`` before ``mount -t nfs``
    is executed as root.

    :param mount_path: local directory the share is mounted on
    :param nfs_share: NFS export handed to the mount command
    :param options: optional string of extra mount arguments, split on
        single spaces
    :param ensure: when true, a failure whose message contains
        'already mounted' is logged as a warning instead of being raised
    :raises processutils.ProcessExecutionError: re-raised when the mount
        fails and the failure is not tolerated via ``ensure``
    """
    utils.execute('mkdir', '-p', mount_path)

    # Assemble the argument vector for the NFS mount.
    argv = ['mount', '-t', 'nfs']
    configured_opts = CONF.libvirt.nfs_mount_options
    if configured_opts is not None:
        argv += ['-o', configured_opts]
    if options:
        argv += options.split(' ')
    argv += [nfs_share, mount_path]

    try:
        utils.execute(*argv, run_as_root=True)
    except processutils.ProcessExecutionError as exc:
        # Only an "already mounted" failure is tolerated, and only when the
        # caller asked for it via ``ensure``.
        if not ensure or 'already mounted' not in six.text_type(exc):
            raise
        LOG.warning(_LW("%s is already mounted"), nfs_share)
def _mount_glusterfs(self, mount_path, glusterfs_share,
                     options=None, ensure=False):
    """Mount a glusterfs export on the given mount path.

    The mount point is created with ``mkdir -p`` before
    ``mount -t glusterfs`` is executed as root.

    :param mount_path: local directory the share is mounted on
    :param glusterfs_share: glusterfs export handed to the mount command
    :param options: optional string of extra mount arguments, split on
        single spaces; ``None`` and the empty string both mean "no extra
        arguments"
    :param ensure: when true, a failure whose message contains
        'already mounted' is logged as a warning instead of being raised
    :raises processutils.ProcessExecutionError: re-raised when the mount
        fails and the failure is not tolerated via ``ensure``
    """
    utils.execute('mkdir', '-p', mount_path)

    gluster_cmd = ['mount', '-t', 'glusterfs']
    # Truthiness check rather than ``is not None``: ''.split(' ') yields
    # [''], which would hand mount a bogus empty argument. This also keeps
    # the behaviour in line with _mount_nfs.
    if options:
        gluster_cmd.extend(options.split(' '))
    gluster_cmd.extend([glusterfs_share, mount_path])

    try:
        utils.execute(*gluster_cmd, run_as_root=True)
    except processutils.ProcessExecutionError as exc:
        if ensure and 'already mounted' in six.text_type(exc):
            LOG.warning(_LW("%s is already mounted"), glusterfs_share)
        else:
            raise
LOG.exception(_LE("Failed to upload %(image_location)s "
"to %(image_path)s"), log_vars)
_update_image_state(context, image_uuid, 'failed_upload')
return
metadata = {'status': 'active',
'properties': {'image_state': 'available'}}
self.service.update(context, image_uuid, metadata,
purge_props=False)
shutil.rmtree(image_path)
except exception.ImageNotFound:
LOG.info(_LI("Image %s was deleted underneath us"), image_uuid)
return
utils.spawn_n(delayed_create)
return image
def _ovs_vsctl(args):
    """Run ``ovs-vsctl`` with the configured timeout and the given arguments.

    :param args: list of arguments appended after the ``--timeout`` option
    :returns: whatever ``utils.execute`` returns for the command
    :raises exception.AgentError: when executing the command raises any
        exception; the original error is logged first
    """
    timeout_opt = '--timeout=%s' % CONF.ovs_vsctl_timeout
    full_args = ['ovs-vsctl', timeout_opt] + args
    try:
        result = utils.execute(*full_args)
    except Exception as err:
        log_vars = {'cmd': full_args, 'exception': err}
        LOG.error(_LE("Unable to execute %(cmd)s. Exception: %(exception)s"),
                  log_vars)
        raise exception.AgentError(method=full_args)
    return result
def product_destroy(context, product_id):
    """Mark the product row with the given id as deleted.

    The row is not removed; its ``deleted`` flag and ``deleted_at``
    timestamp are set inside a single session transaction.

    :param context: request context (not used by this function)
    :param product_id: id of the product row to update
    """
    session = get_session()
    with session.begin():
        product_query = session.query(models.Product).filter_by(id=product_id)
        soft_delete_values = {
            'deleted': True,
            'deleted_at': utils.utcnow(),
            # updated_at is assigned the column expression of the same name;
            # presumably this keeps the stored value unchanged -- confirm.
            'updated_at': literal_column('updated_at'),
        }
        product_query.update(soft_delete_values)
# NOTE(joel-coffman): cryptsetup will strip trailing newlines from
# input specified on stdin unless --key-file=- is specified.
cmd = ["cryptsetup", "create", "--key-file=-"]
cipher = kwargs.get("cipher", None)
if cipher is not None:
cmd.extend(["--cipher", cipher])
key_size = kwargs.get("key_size", None)
if key_size is not None:
cmd.extend(["--key-size", key_size])
cmd.extend([self.dev_name, self.dev_path])
utils.execute(*cmd, process_input=passphrase,
check_exit_code=True, run_as_root=True)
def __init__(self, host):
    """Initialise the driver and build its DRBD initiator connector.

    :param host: passed through unchanged to the parent initializer
    """
    super(LibvirtDRBDVolumeDriver, self).__init__(host)
    # The connector is created for the DRBD protocol using the root helper
    # returned by utils.get_root_helper().
    root_helper = utils.get_root_helper()
    self.connector = connector.InitiatorConnector.factory(
        initiator.DRBD, root_helper)
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import utils
# Module-level logger, registered under the name 'nova.virt.conn_common'.
LOG = logging.getLogger('nova.virt.conn_common')
# Shared flags object; flags defined below are read through it
# (e.g. FLAGS.injected_network_template).
FLAGS = flags.FLAGS
# Define the string flag 'injected_network_template'. The second argument is
# the absolute path of 'virt/interfaces.template' as resolved by
# utils.abspath; the third is the flag's descriptive text.
flags.DEFINE_string('injected_network_template',
utils.abspath('virt/interfaces.template'),
'Template file for injected network')
def get_injectables(inst):
key = str(inst['key_data'])
net = None
network_ref = db.network_get_by_instance(context.get_admin_context(),
inst['id'])
if network_ref['injected']:
admin_context = context.get_admin_context()
address = db.instance_get_fixed_address(admin_context, inst['id'])
ra_server = network_ref['ra_server']
if not ra_server:
ra_server = "fd00::"
with open(FLAGS.injected_network_template) as f:
net = f.read() % {'address': address,
def _cp_template(self, template_name, dest_path, params):
    """Render a dodai template to ``dest_path`` using plain text substitution.

    The template ``virt/dodai/<template_name>.template`` (resolved through
    ``utils.abspath``) is read, every occurrence of each key in ``params``
    is replaced by ``str(value)``, and the result is written to
    ``dest_path``. The destination directory is created if it is missing.

    :param template_name: template base name, without the ``.template``
        suffix
    :param dest_path: path of the file to write
    :param params: mapping of placeholder text to replacement value
    """
    template_path = utils.abspath("virt/dodai/" + template_name + ".template")
    # Context managers guarantee both files are closed, even on error.
    with open(template_path, "r") as template_file:
        content = template_file.read()

    path = os.path.dirname(dest_path)
    # dirname is '' for a bare filename; os.makedirs('') would raise, so
    # only create the directory when there is one to create.
    if path and not os.path.exists(path):
        os.makedirs(path)

    # items() works on both Python 2 and 3, unlike iteritems().
    for key, value in params.items():
        content = content.replace(key, str(value))

    # The original ended with ``f.close`` (no call), so the output file was
    # never explicitly closed or flushed.
    with open(dest_path, "w") as dest_file:
        dest_file.write(content)
def initialize_instance_snapshot_metadata(context, instance, name,
extra_properties=None):
"""Initialize new metadata for a snapshot of the given instance.
:param context: authenticated RequestContext; note that this may not
be the owner of the instance itself, e.g. an admin creates a
snapshot image of some user instance
:param instance: nova.objects.instance.Instance object
:param name: string for name of the snapshot
:param extra_properties: dict of extra metadata properties to include
:returns: the new instance snapshot metadata
"""
image_meta = utils.get_image_from_system_metadata(
instance.system_metadata)
image_meta['name'] = name
# If the user creating the snapshot is not in the same project as
# the owner of the instance, then the image visibility should be
# "shared" so the owner of the instance has access to the image, like
# in the case of an admin creating a snapshot of another user's
# server, either directly via the createImage API or via shelve.
extra_properties = extra_properties or {}
if context.project_id != instance.project_id:
# The glance API client-side code will use this to add the
# instance project as a member of the image for access.
image_meta['visibility'] = 'shared'
extra_properties['instance_owner'] = instance.project_id
# TODO(mriedem): Should owner_project_name and owner_user_name
# be removed from image_meta['properties'] here, or added to