Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def main():
    """Insert the ``globals`` row for this deployment.

    The scope is read from the first command-line argument. A ``deploy``
    scope gets the larger worker sizing; any other scope gets the small one.
    A random instance id and a random internal token are generated and stored
    alongside the sizing values.
    """
    scope = sys.argv[1]
    worker_type = 'standard'

    # (worker_cores, worker_disk_size_gb, max_instances, pool_size)
    if scope == 'deploy':
        sizing = (8, 100, 10, 10)
    else:
        sizing = (1, 10, 3, 2)
    worker_cores, worker_disk_size_gb, max_instances, pool_size = sizing

    db = Database()
    await db.async_init()

    # 22 characters from a 62-symbol alphabet carry 16 bytes of entropy:
    # math.log((2**8)**16, 62) = 21.497443706501368
    alphabet = string.ascii_letters + string.digits
    instance_id = ''.join(secrets.choice(alphabet) for _ in range(22))
    internal_token = secrets.token_urlsafe(32)

    # The statement text is kept flush-left so the SQL sent to the server is
    # character-for-character what it was before.
    insert_globals = '''
INSERT INTO globals (
instance_id, internal_token,
worker_cores, worker_type, worker_disk_size_gb, max_instances, pool_size)
VALUES (%s, %s, %s, %s, %s, %s, %s);
'''
    globals_row = (
        instance_id,
        internal_token,
        worker_cores,
        worker_type,
        worker_disk_size_gb,
        max_instances,
        pool_size,
    )
    await db.execute_insertone(insert_globals, globals_row)
database_name = create_database_config['database_name']
cant_create_database = create_database_config['cant_create_database']
if cant_create_database:
assert sql_config.get('db') is not None
await write_user_config(namespace, database_name, 'admin', sql_config)
await write_user_config(namespace, database_name, 'user', sql_config)
return
scope = create_database_config['scope']
_name = create_database_config['_name']
admin_username = create_database_config['admin_username']
user_username = create_database_config['user_username']
db = Database()
await db.async_init()
if scope == 'deploy':
assert _name == database_name
# create if not exists
rows = db.execute_and_fetchall(
"SHOW DATABASES LIKE '{database_name}';")
rows = [row async for row in rows]
if len(rows) > 0:
assert len(rows) == 1
return
admin_password = secrets.token_urlsafe(16)
user_password = secrets.token_urlsafe(16)
async def on_startup(app):
    """Populate ``app`` with the resources shared after startup.

    Stores a thread pool under ``blocking_pool`` and an initialized
    ``Database`` under ``db``. Copies ``worker_type``, ``worker_cores`` and
    ``instance_id`` from the ``globals`` table, and builds ``driver_headers``
    with the internal token as a bearer credential.
    """
    app['blocking_pool'] = concurrent.futures.ThreadPoolExecutor()

    database = Database()
    await database.async_init()
    app['db'] = database

    row = await database.select_and_fetchone(
        'SELECT worker_type, worker_cores, instance_id, internal_token FROM globals;')

    # These columns are copied into the app under their own names.
    for column in ('worker_type', 'worker_cores'):
        app[column] = row[column]

    instance_id = row['instance_id']
    log.info(f'instance_id {instance_id}')
    app['instance_id'] = instance_id

    app['driver_headers'] = {
        'Authorization': f'Bearer {row["internal_token"]}'
    }
# Fragment of a migration routine: its ``def`` header and the end of the final
# SQL statement are outside this view.
# Fetch the admin secret as JSON through kubectl. admin_secret_name is passed
# through shq (presumably shell quoting -- confirm); namespace is interpolated
# as-is.
out, _ = await check_shell_output(
f'''
kubectl -n {namespace} get -o json secret {shq(admin_secret_name)}
''')
admin_secret = json.loads(out)
# Each entry under 'data' is base64-decoded and written to disk as raw bytes.
with open('/sql-config.json', 'wb') as f:
f.write(base64.b64decode(admin_secret['data']['sql-config.json']))
with open('/sql-config.cnf', 'wb') as f:
f.write(base64.b64decode(admin_secret['data']['sql-config.cnf']))
# Exported before Database() is constructed; presumably Database reads these
# environment variables to locate its config -- confirm.
os.environ['HAIL_DATABASE_CONFIG_FILE'] = '/sql-config.json'
os.environ['HAIL_SCOPE'] = scope
db = Database()
await db.async_init()
# Look for the per-database migration-version table.
rows = db.execute_and_fetchall(
f"SHOW TABLES LIKE '{database_name}_migration_version';")
rows = [row async for row in rows]
# No such table yet: create the version table, seed it with version 1, and
# begin creating the migrations table (the statement continues past this view).
if len(rows) == 0:
await db.just_execute(f'''
CREATE TABLE `{database_name}_migration_version` (
`version` BIGINT NOT NULL
) ENGINE = InnoDB;
INSERT INTO `{database_name}_migration_version` (`version`) VALUES (1);
CREATE TABLE {database_name}_migrations (
`version` BIGINT NOT NULL,
`name` VARCHAR(100),
`script_sha1` VARCHAR(40),
# Startup hook that fills `app` with shared resources: a thread pool, a
# Kubernetes CoreV1 client, an initialized Database, and the instance id and
# internal token read from the `globals` table.
# NOTE(review): the visible lines end right after `gservices` is built and it
# is not used here, so the function presumably continues past this view.
async def on_startup(app):
pool = concurrent.futures.ThreadPoolExecutor()
app['blocking_pool'] = pool
# Load the in-cluster Kubernetes configuration before creating the client.
kube.config.load_incluster_config()
k8s_client = kube.client.CoreV1Api()
app['k8s_client'] = k8s_client
db = Database()
await db.async_init()
app['db'] = db
# Read the instance id and internal token from `globals`.
row = await db.select_and_fetchone(
'SELECT instance_id, internal_token FROM globals;')
instance_id = row['instance_id']
log.info(f'instance_id {instance_id}')
app['instance_id'] = instance_id
app['internal_token'] = row['internal_token']
# Machine-name prefix embeds DEFAULT_NAMESPACE; it is handed to GServices
# together with credentials loaded from the key file at /gsa-key/key.json.
machine_name_prefix = f'batch-worker-{DEFAULT_NAMESPACE}-'
credentials = google.oauth2.service_account.Credentials.from_service_account_file(
'/gsa-key/key.json')
gservices = GServices(machine_name_prefix, credentials)