Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def boot(self):
    """Create the 'scheduler' rate limit and keep it on ``self.scheduler``.

    The limit is requested with ``unit=60``, ``limit=1`` and an interval
    taken from ``settings.SCHEDULER_INTERVAL``.
    """
    scheduler_limit = get_rate_limit(
        'scheduler',
        unit=60,
        interval=settings.SCHEDULER_INTERVAL,
        limit=1,
    )
    self.scheduler = scheduler_limit
def _rate_limit(self, url):
    """Comply with the rate limit associated with ``url``.

    The rate limit is keyed on the network location (host[:port]) parsed
    from ``url``; when the URL has no network location, the raw ``url``
    value is used as the key instead. The allowed rate is read from the
    ``http_rate_limit`` entry of ``self.context``, falling back to
    ``settings.HTTP_RATE_LIMIT``.
    """
    netloc = urlparse(url).netloc
    # Fall back to the full value for inputs without a netloc part.
    key = netloc if netloc else url
    allowed = self.context.get('http_rate_limit', settings.HTTP_RATE_LIMIT)
    get_rate_limit(key, limit=allowed).comply()
def _upsert(context, params, data):
    """Insert or update data and add/update appropriate timestamps.

    ``params["table"]`` names the target table and ``params["unique"]``
    lists the key column(s) used to match an existing row. ``data`` is
    mutated in place: ``__last_seen`` is always set to the current UTC
    time, and ``__first_seen`` is set to the same value only when the row
    ends up being inserted. ``context`` is accepted but not used here.
    """
    table_name = params.get("table")
    table = datastore.get_table(table_name, primary_id=False)
    unique_keys = ensure_list(params.get("unique"))
    data["__last_seen"] = datetime.datetime.utcnow()

    # With unique keys available, try an update first; a truthy count
    # means an existing row was refreshed and no insert is needed.
    if len(unique_keys) > 0 and table.update(
        data, unique_keys, return_count=True
    ):
        return

    # New row: first and last sighting share the same timestamp.
    data["__first_seen"] = data["__last_seen"]
    # Only the insert path goes through the "db" rate limit.
    get_rate_limit("db", limit=settings.DB_RATE_LIMIT).comply()
    table.insert(data)