Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
createdSubsystem (subsystemContainer): The created subsystem to check.
expectedSubsystem (expectedSubsystemValues): The corresponding expected subsystem values.
Returns:
bool: True if all assertions passed (which is implicitly true if we get to the end of the function).
"""
# Ensure that we've determined the file location subsystem properly.
assert createdSubsystem.fileLocationSubsystem == expectedSubsystem.fileLocationSubsystem
# Check the directories
assert createdSubsystem.baseDir == expectedSubsystem.baseDir
assert createdSubsystem.imgDir == expectedSubsystem.imgDir
assert createdSubsystem.jsonDir == expectedSubsystem.jsonDir
# Check keys first.
assert list(createdSubsystem.files) == [utilities.extractTimeStampFromFilename(filename) for filename in expectedSubsystem.filenames]
# Then check filenames themselves.
assert [fileCont.filename for fileCont in itervalues(createdSubsystem.files)] == [os.path.join(expectedSubsystem.baseDir, filename) for filename in expectedSubsystem.filenames]
# Check the start and end times
assert createdSubsystem.startOfRun == expectedSubsystem.startOfRun
assert createdSubsystem.endOfRun == expectedSubsystem.endOfRun
# New files are added.
assert createdSubsystem.newFile is True
# Check whether root files will be shown.
assert createdSubsystem.showRootFiles is expectedSubsystem.showRootFiles
return True
ExportConverters.
Yields:
Converted values. Converted values may be of different types.
Raises:
NoConverterFound: in case no suitable converters were found for a value in
metadata_value_pairs. This error is only raised after
all values in metadata_value_pairs are attempted to be
converted. If there are multiple value types that could
not be converted because of the lack of corresponding
converters, only the last one will be specified in the
exception message.
"""
no_converter_found_error = None
for metadata_values_group in itervalues(
collection.Group(
metadata_value_pairs, lambda pair: pair[1].__class__.__name__)):
_, first_value = metadata_values_group[0]
converters_classes = ExportConverter.GetConvertersByValue(first_value)
if not converters_classes:
no_converter_found_error = "No converters found for value: %s" % str(
first_value)
continue
converters = [cls(options) for cls in converters_classes]
for converter in converters:
for result in converter.BatchConvert(metadata_values_group, token=token):
yield result
if no_converter_found_error is not None:
default=None,
enum_name=None,
enum=None,
enum_descriptions=None,
enum_labels=None,
**kwargs):
super(ProtoEnum, self).__init__(**kwargs)
if enum_name is None:
raise type_info.TypeValueError("Enum groups must be given a name.")
self.enum_name = enum_name
self.proto_type_name = enum_name
if isinstance(enum, EnumContainer):
enum = enum.enum_dict
for v in itervalues(enum):
if not (v.__class__ is int or v.__class__ is long):
raise type_info.TypeValueError("Enum values must be integers.")
self.enum_container = EnumContainer(
name=enum_name,
descriptions=enum_descriptions,
enum_labels=enum_labels,
values=(enum or {}))
self.enum = self.enum_container.enum_dict
self.reverse_enum = self.enum_container.reverse_enum
# Ensure the default is a valid enum value.
if default is not None:
self.default = self.Validate(default)
def GetAvailableReportPlugins():
    """Return the registered report plugin classes, ordered by class name.

    Returns:
        A new list holding every value yielded by
        ``itervalues(REGISTRY.GetRegisteredPlugins())``, sorted ascending by
        each entry's ``__name__``.
    """
    plugin_classes = list(itervalues(REGISTRY.GetRegisteredPlugins()))
    plugin_classes.sort(key=lambda plugin_cls: plugin_cls.__name__)
    return plugin_classes
def calc_hints(self, frame):
    """Fill in the ``'hints'`` entry of every value stored in ``frame``.

    For each value of ``frame``, every element of its ``'btn_points'`` is
    turned into an ``(x, y)`` tuple: the point's centre offset from
    ``self.sprite``'s ``x``/``y``, divided by the sprite's ``width``/``height``
    and rounded to three decimals. Entries with no button points receive an
    empty list.

    Args:
        frame: Mapping whose values are dict-like objects carrying a
            ``'btn_points'`` sequence; each one is updated in place.
    """
    for polygon in itervalues(frame):
        hints = []
        if len(polygon['btn_points']):
            for pt in polygon['btn_points']:
                rel_x = float(pt.center_x - self.sprite.x) / self.sprite.width
                rel_y = float(pt.center_y - self.sprite.y) / self.sprite.height
                hints.append((round(rel_x, 3), round(rel_y, 3)))
        polygon['hints'] = hints
def schema(self):
    """ Build the DQL ``CREATE TABLE`` statement describing this table.

    The statement lists the hash key first, then the range key (when one is
    set), then every remaining attribute, followed by the throughput clause
    and the schema of each global index.

    Returns:
        The space-joined statement text terminated by ``;``.
    """
    # Work on a copy so the key attributes can be dropped without touching
    # self.attrs; whatever remains is emitted as the plain attribute list.
    remaining = self.attrs.copy()
    pieces = ["CREATE", "TABLE", self.name, "(%s," % self.hash_key.schema]
    del remaining[self.hash_key.name]

    if self.range_key:
        pieces.append(self.range_key.schema + ",")
        del remaining[self.range_key.name]

    if remaining:
        joined = ", ".join(field.schema for field in itervalues(remaining))
        pieces.append(joined + ",")

    throughput = (self.read_throughput, self.write_throughput)
    pieces.append("THROUGHPUT (%d, %d))" % throughput)
    pieces += [index.schema for index in itervalues(self.global_indexes)]
    return " ".join(pieces) + ";"
def _parse_tags(item):
    """Collect the tag names found under ``item['tags']``.

    Args:
        item: Dict-like object; its optional ``'tags'`` entry is a mapping
            whose values are dict-like tag definitions.

    Returns:
        ``{'tag': [...]}`` listing the ``'name'`` of every tag definition
        whose name is a ``unicode`` instance. Definitions with a missing or
        non-``unicode`` name are left out.
    """
    names = []
    for definition in itervalues(item.get('tags', {})):
        # The {} default is never a unicode instance, so a definition
        # without a 'name' key is filtered out here rather than raising below.
        if isinstance(definition.get('name', {}), unicode):
            names.append(definition['name'])
    return {'tag': names}
def run(self):
try:
logger.info('Scheduler will start to execute jobs in %d seconds', initial_sleep)
# Add default jobs to the ODB and start all of them, the default and user-defined ones
self.init_jobs()
_sleep = self.sleep
_sleep_time = self.sleep_time
with self.lock:
for job in sorted(itervalues(self.jobs)):
if job.max_repeats_reached:
logger.info('Job `%s` already reached max runs count (%s UTC)', job.name, job.max_repeats_reached_at)
else:
self.spawn_job(job)
# Ok, we're good now.
self.ready = True
logger.info('Scheduler started')
while self.keep_running:
_sleep(_sleep_time)
if self.iter_cb:
self.iter_cb(*self.iter_cb_args)
def removed(self):
    """Return the values of the internal ``_removed`` mapping.

    The result is whatever ``itervalues`` produces for ``self._removed``.
    """
    removed_entries = self._removed
    return itervalues(removed_entries)
def Flush(self):
"""Writes the changes in this object to the datastore."""
self.data_store.StoreRequestsAndResponses(
new_requests=self.request_queue,
new_responses=self.response_queue,
requests_to_delete=self.requests_to_delete)
# We need to make sure that notifications are written after the requests so
# we flush after writing all requests and only notify afterwards.
mutation_pool = self.data_store.GetMutationPool()
with mutation_pool:
messages_by_queue = collection.Group(
list(itervalues(self.client_messages_to_delete)),
lambda request: request.queue)
for queue, messages in iteritems(messages_by_queue):
self.Delete(queue, messages, mutation_pool=mutation_pool)
if self.new_client_messages:
for timestamp, messages in iteritems(
collection.Group(self.new_client_messages, lambda x: x[1])):
self.Schedule([x[0] for x in messages],
timestamp=timestamp,
mutation_pool=mutation_pool)
if self.notifications:
for notification in itervalues(self.notifications):
self.NotifyQueue(notification, mutation_pool=mutation_pool)