Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@job(queue=queue)
def custom_queue_job(x, y):
return x + y
@job(queue='queue_name')
def hello():
return 'Hi'
result = hello.delay()
@job(queue='queue_name')
def bar():
return 'Firstly'
@job('bucket-keyset-scan', timeout=DEFAULT_TTL, ttl=DEFAULT_TTL,
connection=connection, result_ttl=0)
def process_keyset(bid, key_set):
account, bucket = bid.split(':', 1)
region = connection.hget('bucket-regions', bid)
versioned = bool(int(connection.hget('bucket-versions', bid)))
account_info = json.loads(connection.hget('bucket-accounts', account))
visitors = get_key_visitors(account_info)
object_reporting = account_info.get('object-reporting')
session = get_session(account_info)
patch_ssl()
s3 = session.client('s3', region_name=region, config=s3config)
error_count = sesserr = connerr = enderr = missing_count = 0
throttle_count = denied_count = remediation_count = 0
@job('bucket-inventory', timeout=DEFAULT_TTL, ttl=DEFAULT_TTL,
connection=connection, result_ttl=0)
def process_bucket_inventory(bid, inventory_bucket, inventory_prefix):
"""Load last inventory dump and feed as key source.
"""
log.info("Loading bucket %s keys from inventory s3://%s/%s",
bid, inventory_bucket, inventory_prefix)
account, bucket = bid.split(':', 1)
region = connection.hget('bucket-regions', bid)
versioned = bool(int(connection.hget('bucket-versions', bid)))
session = boto3.Session()
s3 = session.client('s3', region_name=region, config=s3config)
# find any key visitors with inventory filtering
account_info = json.loads(connection.hget('bucket-accounts', account))
ifilters = [v.inventory_filter for v
in get_key_visitors(account_info) if v.inventory_filter]
@job('bucket-set', timeout=3600, connection=connection)
def process_bucket_set(account_info, buckets):
"""Process a collection of buckets.
For each bucket fetch location, versioning and size and
then kickoff processing strategy based on size.
"""
region_clients = {}
log = logging.getLogger('salactus.bucket-set')
log.info("processing account %s", account_info)
session = get_session(account_info)
client = session.client('s3', config=s3config)
for b in buckets:
bid = bucket_id(account_info, b)
with bucket_ops(bid):
info = {'name': b}
def continue_worker(self, oid, restart_point, **kwargs):
"""
Registers continue_worker as a new task in RQ
The delay method executes it asynchronously
@param oid: uuid of the object to be started
@type oid: string
@param restart_point: sets the start point
@type restart_point: string
"""
return job(queue='default', connection=redis_conn)(continue_worker). \
delay(oid, restart_point, **kwargs)
@job("medium", connection=redis_conn)
def make_new_avatar(file_name):
pass
@job(queue=queue)
def build_corpus(pipeline, corpus_id, file_name, text_column, **kw):
""" Async job to build a corpus using the provided pipeline
"""
cpath = corpus_base_path(file_name) # corpus_id
fname = os.path.join(cpath, '%s_docs.json' % corpus_id)
logger.info('Creating corpus for %s at %s', file_name, cpath)
docs = build_pipeline(file_name, text_column, pipeline, preview_mode=False)
stream = DocStream(docs)
io.json.write_json(
stream, fname, mode='wt', lines=True, ensure_ascii=True,
separators=(',', ':')
)
def run_worker(self, workflow_name, data, **kwargs):
"""
Registers run_worker function as a new task in RQ
The delay method executes it asynchronously
:param workflow_name: name of the workflow to be run
:type workflow_name: string
:param data: list of objects for the workflow
:type data: list
"""
return RedisResult(
job(queue='default', connection=redis_conn)(run_worker).delay(
workflow_name, data, **kwargs
)