Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
next_file = None
serializer_format = None
index = self._index.setdefault(base_name, 1)
while not next_file:
file_name = os.path.join(
self._data_path, base_name + '_{0}'.format(index))
next_file, serializer_format = self.find_file_format(file_name)
if next_file:
self._index[base_name] += 1
elif index != 1:
index = 1
self._index[base_name] = 1
else:
raise IOError('response file ({0}.[{1}]) not found'.format(
file_name, "|".join(Format.ALLOWED)))
return next_file, serializer_format
def __init__(self, prefix=None, debug=False, record_format=Format.JSON):
if debug:
self._set_logger(__name__, logging.DEBUG)
record_format = record_format.lower()
if record_format not in Format.ALLOWED:
LOG.warning("Record format not allowed. Falling back to default.")
record_format = Format.DEFAULT
self._serializer = get_serializer(record_format)
self._filename_re = re.compile(r'.*\..*_(?P\d+).{0}'.format(
record_format))
self._record_format = record_format
self.prefix = prefix
self._uuid = str(uuid.uuid4())
self._data_path = None
self._mode = None
self._session = None
self._index = {}
self.events = []
self.clients = []
def find_file_format(file_name):
"""
Returns a tuple with the file path and format found, or (None, None)
"""
for file_format in Format.ALLOWED:
file_path = '.'.join((file_name, file_format))
if os.path.exists(file_path):
return file_path, file_format
return None, None