Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send2ws(self, data, **kwargs):
    """Append ``data`` as one row to the worksheet chosen by the title options.

    Optional keyword arguments read from ``kwargs``:
        title_prefix: passed to ``create_ws_title`` as ``prefix`` (default ``""``).
        title_suffix: passed to ``create_ws_title`` as ``suffix`` (default ``""``).
        date_format: passed to ``create_ws_title`` as ``date_format``
            (default ``None``).

    The row is produced by ``format_ws_row(data)`` and written with the
    worksheet's ``append_row``; that call's return value is returned.

    If ``is_token_valid()`` is false, the client is re-authorized and the
    worksheet fetched again before appending. If the first append raises
    ``gspread.exceptions.RequestError``, the client is re-authorized, the
    worksheet re-fetched and the append retried exactly once; an error
    raised by the retry propagates to the caller.
    """
    sheet_title = self.create_ws_title(
        prefix=kwargs.get("title_prefix", ""),
        suffix=kwargs.get("title_suffix", ""),
        date_format=kwargs.get("date_format", None),
    )
    formatted_row = self.format_ws_row(data)

    def reopen():
        # Refresh credentials, then fetch the worksheet again.
        self.authorize_gspread()
        return self.get_ws(sheet_title)

    worksheet = self.get_ws(sheet_title)
    if not self.is_token_valid():
        worksheet = reopen()
        logger.info("access_token_expired reauthorize.")

    try:
        outcome = worksheet.append_row(formatted_row)
    except gspread.exceptions.RequestError as err:
        logger.error("401 error reopen retry:{}".format(err))
        # Single retry against a freshly authorized worksheet handle.
        outcome = reopen().append_row(formatted_row)

    logger.debug("result:{}".format(outcome))
    return outcome
title_suffix = kwargs.get("title_suffix", "")
date_format = kwargs.get("date_format", None)
insert_date = kwargs.get("insert_date", True)
ws_title = self.create_ws_title(prefix=title_prefix,
suffix=title_suffix,
date_format=date_format,
insert_date=insert_date)
row = self.format_ws_row(data)
ws = self.get_ws(ws_title)
if not self.is_token_valid():
self.authorize_gspread()
ws = self.get_ws(ws_title)
logger.info("access_token_expired reauthorize.")
try:
result = ws.append_row(row)
except gspread.exceptions.RequestError as e:
logger.error("401 error reopen retry:{}".format(e))
self.authorize_gspread()
ws = self.get_ws(ws_title)
result = ws.append_row(row)
logger.debug("result:{}".format(result))
return result
if data and not headers.get('Content-Type', None):
headers['Content-Type'] = 'application/x-www-form-urlencoded'
request_headers = self.headers.copy()
if headers:
for k, v in headers.items():
if v is None:
del request_headers[k]
else:
request_headers[k] = v
try:
func = getattr(self.requests_session, method.lower())
except AttributeError:
raise RequestError("HTTP method '{}' is not supported".format(method))
response = func(url, data=data, params=params, headers=request_headers, files=files, json=json)
if response.status_code > 399:
raise RequestError(response.status_code, "{0}: {1}".format(
response.status_code, response.content))
return response
if headers:
for k, v in headers.items():
if v is None:
del request_headers[k]
else:
request_headers[k] = v
try:
func = getattr(self.requests_session, method.lower())
except AttributeError:
raise RequestError("HTTP method '{}' is not supported".format(method))
response = func(url, data=data, params=params, headers=request_headers, files=files, json=json)
if response.status_code > 399:
raise RequestError(response.status_code, "{0}: {1}".format(
response.status_code, response.content))
return response