Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def set_tag(self, key, value):
    """Store ``value`` as a tag of this object's crawler.

    The value is serialised with ``dump_json`` and written under a key
    built from the crawler, the literal ``"tag"`` and ``key``. The entry
    expires after ``self.crawler.expire``.

    Returns whatever ``conn.set`` returns.
    """
    payload = dump_json(value)
    tag_key = make_key(self.crawler, "tag", key)
    return conn.set(tag_key, payload, ex=self.crawler.expire)
def save(self):
    """Persist ``self.session`` and remember its storage key.

    The session is pickled and base64-encoded. The first 15 hex digits of
    the SHA-1 of that encoded blob are combined with the context's run id
    to form the storage key. The blob is written with a ``QUEUE_EXPIRE``
    expiry and the key is recorded in ``self.context.state`` under
    ``self.STATE_SESSION``.
    """
    encoded = codecs.encode(pickle.dumps(self.session), 'base64')
    # Content-derived suffix: identical session blobs map to the same key.
    digest = sha1(encoded).hexdigest()
    session_key = make_key(self.context.run_id, "session", digest[:15])
    conn.set(session_key, encoded, ex=QUEUE_EXPIRE)
    self.context.state[self.STATE_SESSION] = session_key
def operation_start(cls, crawler, stage, run_id):
    """Record the start of one operation of ``stage`` within run ``run_id``.

    If ``run_id`` is not yet in the crawler's "runs" set, it is added and
    the run's "start" value is written. Four counters are then incremented
    (the run's counter, the run's "total_ops", the crawler's per-stage
    counter and the crawler's "total_ops"), and the crawler's "last_run"
    and "current_run" values are updated.

    NOTE(review): the source indentation was lost, so the extent of the
    ``if`` body is reconstructed. Only the two first-seen writes are
    treated as conditional -- confirm against the upstream file.
    """
    runs_key = make_key(crawler, "runs")
    already_known = conn.sismember(runs_key, run_id)
    if not already_known:
        conn.sadd(runs_key, run_id)
        conn.set(make_key("run", run_id, "start"), pack_now())

    # Each tuple holds the make_key() arguments of one counter to bump.
    counter_parts = (
        ("run", run_id),
        ("run", run_id, "total_ops"),
        (crawler, stage),
        (crawler, "total_ops"),
    )
    for parts in counter_parts:
        conn.incr(make_key(*parts))

    conn.set(make_key(crawler, "last_run"), pack_now())
    conn.set(make_key(crawler, "current_run"), run_id)
def operation_start(cls, crawler, stage, run_id):
    """Record the start of one operation of ``stage`` within run ``run_id``.

    If ``run_id`` is not yet in the crawler's "runs" set, it is added and
    the run's "start" value is written. Four counters are then incremented
    (the run's counter, the run's "total_ops", the crawler's per-stage
    counter and the crawler's "total_ops"), and the crawler's "last_run"
    and "current_run" values are updated.

    NOTE(review): the source indentation was lost, so the extent of the
    ``if`` body is reconstructed. Only the two first-seen writes are
    treated as conditional -- confirm against the upstream file.
    """
    runs_key = make_key(crawler, "runs")
    already_known = conn.sismember(runs_key, run_id)
    if not already_known:
        conn.sadd(runs_key, run_id)
        conn.set(make_key("run", run_id, "start"), pack_now())

    # Each tuple holds the make_key() arguments of one counter to bump.
    counter_parts = (
        ("run", run_id),
        ("run", run_id, "total_ops"),
        (crawler, stage),
        (crawler, "total_ops"),
    )
    for parts in counter_parts:
        conn.incr(make_key(*parts))

    conn.set(make_key(crawler, "last_run"), pack_now())
    conn.set(make_key(crawler, "current_run"), run_id)
def operation_end(cls, crawler, run_id):
    """Record the end of one operation within run ``run_id``.

    Updates the crawler's "last_run" value and decrements the run's
    counter. When the decremented value, read through ``unpack_int``,
    equals zero, the run's "end" value is written as well.
    """
    conn.set(make_key(crawler, "last_run"), pack_now())
    remaining = unpack_int(conn.decr(make_key("run", run_id)))
    if remaining != 0:
        return
    # The counter reached zero, so stamp the run's end.
    conn.set(make_key("run", run_id, "end"), pack_now())