Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_next_file_path(self, service, operation):
    """Return the path of the next recorded response to replay.

    The lookup key is ``"<service>.<operation>"``, prefixed with
    ``self.prefix`` and a dot when a prefix is set.  ``self._index`` keeps a
    per-key counter (starting at 1) of the next response to hand out, and
    candidate paths are built as ``<data_path>/<key>_<counter>.json`` with
    backslashes converted to forward slashes before being looked up in
    ``self._files``.

    When the counter has advanced past the last recorded response it is
    reset to 1 and the lookup is retried, so playback wraps around.

    Raises:
        IOError: if even the first response (``_1.json``) is not present.
    """
    key = "{0}.{1}".format(service, operation)
    if self.prefix:
        key = "{0}.{1}".format(self.prefix, key)
    pill.LOG.debug("get_next_file_path: %s", key)

    while True:
        position = self._index.setdefault(key, 1)
        candidate = os.path.join(
            self._data_path, key + "_{0}.json".format(position)
        ).replace("\\", "/")

        if candidate in self._files:
            # Found it: advance the counter for the next call.
            self._index[key] += 1
            # NOTE(review): the entry is already in _files, so this add is
            # presumably a no-op (assuming _files is a set) — confirm.
            self._files.add(candidate)
            return candidate

        if position == 1:
            # we are looking for the first index and it's not here
            raise IOError("response file ({0}) not found".format(candidate))

        # Ran past the last recorded response: start over from the first.
        self._index[key] = 1
def save_response(self, service, operation, response_data, http_response=200):
    """Serialize a response to JSON and store it in the archive.

    The target path comes from ``self.get_new_file_path``.  The stored
    document is a JSON object with two keys: ``status_code`` (the value of
    ``http_response``) and ``data`` (``response_data``); values the JSON
    encoder cannot handle natively are passed to ``pill.serialize``.  The
    entry is written through ``self.archive.writestr`` with DEFLATE
    compression, and the path is then recorded in ``self._files``.
    """
    target = self.get_new_file_path(service, operation)
    pill.LOG.debug("save_response: path=%s", target)

    payload = json.dumps(
        {"status_code": http_response, "data": response_data},
        indent=4,
        default=pill.serialize,
    )
    self.archive.writestr(target, payload, zipfile.ZIP_DEFLATED)

    # Remember the new entry so later lookups can see it.
    self._files.add(target)
def get_new_file_path(self, service, operation):
    """Return the path under which the next response should be recorded.

    The base name is ``"<service>.<operation>"``, prefixed with
    ``self.prefix`` and a dot when a prefix is set.  Existing entries in
    ``self._files`` named ``<data_path>/<base>_<index>.json`` are scanned
    (the index is extracted with ``self.filename_re``), and the returned
    path uses the highest index found plus one, or 1 when there is none.

    The returned path always uses forward slashes, matching the form that
    ``get_next_file_path`` uses when it looks entries up in ``self._files``.
    """
    base_name = "{0}.{1}".format(service, operation)
    if self.prefix:
        base_name = "{0}.{1}".format(self.prefix, base_name)
    pill.LOG.debug("get_new_file_path: %s", base_name)

    # Normalize to "/" so that the path recorded by the caller can later be
    # found by get_next_file_path, which normalizes the same way.  Without
    # this, paths built with a backslash separator would never match.
    stem = os.path.join(self._data_path, base_name).replace("\\", "/")

    # Anchor the pattern on the "_" that precedes the index.  A bare
    # "<base>*" would also match operations that merely share a prefix
    # (e.g. "s3.ListObjects*" matches "s3.ListObjectsV2_1.json") and
    # inflate the index, leaving no "_1.json" for playback to start from.
    glob_pattern = stem + "_*"

    index = 0
    for file_path in fnmatch.filter(self._files, glob_pattern):
        m = self.filename_re.match(os.path.basename(file_path))
        if m:
            index = max(index, int(m.group("index")))

    return "{0}_{1}.json".format(stem, index + 1)
def load_response(self, service, operation):
    """Load the next recorded response for an operation from the archive.

    The path is obtained from ``self.get_next_file_path`` and recorded in
    ``self._used``.  The archive entry is parsed as JSON, with
    ``pill.deserialize`` as the object hook.

    Returns:
        A ``(response, data)`` tuple: a ``pill.FakeHttpResponse`` built from
        the stored ``status_code``, and the stored ``data`` value.
    """
    path = self.get_next_file_path(service, operation)
    self._used.add(path)
    pill.LOG.debug("load_responses: %s", path)

    raw = self.archive.read(path)
    record = json.loads(raw, object_hook=pill.deserialize)

    fake_http = pill.FakeHttpResponse(record["status_code"])
    return (fake_http, record["data"])