Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def aio_fn(conn, *args, **kwargs):
parameters = kwargs if len(kwargs) > 0 else args
if op_type == SQLOperationType.INSERT_RETURNING:
return await driver_adapter.insert_returning(conn, query_name, sql, parameters)
elif op_type == SQLOperationType.INSERT_UPDATE_DELETE:
return await driver_adapter.insert_update_delete(conn, query_name, sql, parameters)
elif op_type == SQLOperationType.INSERT_UPDATE_DELETE_MANY:
return await driver_adapter.insert_update_delete_many(
conn, query_name, sql, *parameters
)
elif op_type == SQLOperationType.SCRIPT:
return await driver_adapter.execute_script(conn, sql)
elif op_type == SQLOperationType.SELECT:
return await driver_adapter.select(conn, query_name, sql, parameters)
else:
raise ValueError(f"Unknown op_type: {op_type}")
def load_methods(sql_text, driver_adapter):
lines = sql_text.strip().splitlines()
query_name = lines[0].replace("-", "_")
if query_name.endswith("