Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@task
def send_tx(self):
    """Build, sign and submit one ``TransferAsset`` transaction.

    The transfer moves ``0.01`` of ``coin#test`` from ``admin@test`` to
    ``test@test`` and uses ``HOSTNAME`` as its description. The transaction
    is signed with ``ADMIN_PRIVATE_KEY`` and then handed to
    ``self.client.send_tx_await``.
    """
    creator = Iroha('admin@test')
    transfer = creator.command(
        'TransferAsset',
        src_account_id='admin@test',
        dest_account_id='test@test',
        asset_id='coin#test',
        amount='0.01',
        description=HOSTNAME,
    )
    transaction = creator.transaction([transfer])
    ic.sign_transaction(transaction, ADMIN_PRIVATE_KEY)
    self.client.send_tx_await(transaction)
@task(50)
def vote_on_comment(self):
    """
    Send a randomly chosen vote ("true" or "false") for one comment.

    The request is reported as "vote_on_comment" when ``self.verbose`` is
    truthy and as "PATCH_comment" otherwise.

    Calls made (as listed by the original author; the GET calls are
    presumably issued inside ``self._get_comment_id`` -- confirm there):
    GET_thread_list
    GET_comment_list
    PATCH_comment
    """
    vote = random.choice(["true", "false"])
    if self.verbose:
        request_name = "vote_on_comment"
    else:
        request_name = "PATCH_comment"
    self.patch_comment(
        comment_id=self._get_comment_id(),
        data={"voted": vote},
        name=request_name,
    )
@task
def search_notes(self):
"""
Search notes for random text.
"""
path = '/api/v1/search/'
self.get(
path,
{
'user': self._anonymous_user_id,
'course_id': self.course_id,
'text': ' '.join(pick_some(
NOTES_TEXT,
settings.data['NUM_SEARCH_TERMS'],
)),
'highlight': True,
'highlight_tag': HIGHLIGHT_TAG,
@task
def get_config_json(self):
    """Fetch ``/config.json`` and then end the "startup" transaction.

    The request is reported under the name "02 /config.json".
    """
    request_label = "02 /config.json"
    self.client.get("/config.json", name=request_label)
    # The transaction is closed only after the request call has returned.
    self.tm.end_transaction("startup")
from locust import HttpUser, TaskSet, task
import random
class DistroxApiUser(HttpUser):
    """Locust user that exercises the key/value HTTP API rooted at ``host``."""

    # NOTE(review): min_wait/max_wait are the legacy wait settings; locust
    # releases that provide HttpUser configure waiting through ``wait_time``
    # -- confirm these two attributes still have an effect.
    min_wait = 100
    max_wait = 1000
    host = "http://localhost:8080/v1/kv"

    @task(1)
    class DistroxServerTasks(TaskSet):
        """Tasks that store entries in, and read entries back from, the API."""

        # Class-level list, so it is shared by every instance of this TaskSet.
        keys = []
        # The escapes spell out the text "hello world!".
        v = "\x68\x65\x6C\x6C\x6F\x20\x77\x6F\x72\x6C\x64\x21"
        # Not read anywhere inside this class.
        entry_count = 300

        def put_entry(self, key):
            """PUT ``self.v`` under ``/<key>`` and remember the key on a 201.

            Args:
                key: Value formatted into the request path.
            """
            self.client.headers['Content-Type'] = "application/json"
            response = self.client.put("/{0}".format(key), data=self.v)
            if response.status_code == 201:
                self.keys.append(key)

        @task
        def get_entry(self):
            """GET a randomly chosen stored key and assert a 200 response.

            Does nothing while no key has been stored yet.

            Raises:
                AssertionError: if the response status code is not 200.
            """
            # random.choice raises IndexError on an empty sequence, and keys
            # is only filled by put_entry; skip this run instead of crashing.
            if not self.keys:
                return
            response = self.client.get("/{0}".format(random.choice(self.keys)))
            assert response.status_code == 200, "expected OK"
@task
def task3(self):
    """POST the fixed JSON body ``{"one": "two"}`` to ``/post``."""
    payload = {"one": "two"}
    self.client.post("/post", json=payload)
@task
def index_page_with_regex_assertion(self):
    """Request ``/`` and assert that the body matches the title pattern.

    The pattern is ``self.HOME_PAGE_TITLE_REGEX``; it is searched in the
    response's ``text``.

    Raises:
        AssertionError: if the pattern is not found in the response text.
    """
    response = self.client.get("/")
    title_match = self.HOME_PAGE_TITLE_REGEX.search(response.text)
    assert title_match is not None, "Expected title has not been found!"
@task
def index_page_with_response_code_assertion(self):
    """Request ``/`` and assert that the response status code is 200.

    Raises:
        AssertionError: if the status code is not 200; the message contains
            the status code that was received.
    """
    response = self.client.get("/")
    # Compare by value: ``is`` tests object identity, which only happens to
    # hold for small integers as an interpreter caching detail.
    # The code is formatted into the message because concatenating an int to
    # a str raises TypeError, which would hide the assertion failure.
    assert response.status_code == 200, (
        "Unexpected response code: {}".format(response.status_code)
    )
@task(10)
def leaderboard(self):
    """GET the leaderboard page of the event named by ``event_name``."""
    # event_name is not defined in this block; it is read from the
    # enclosing scope when the task runs.
    leaderboard_path = "".join(("/events/", event_name, "/leaderboard"))
    self.client.get(leaderboard_path)
@task
def run_cmd_date(self):
    """Ask the client to run the ``date`` command."""
    command = 'date'
    self.client.run_cmd(command)