Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_put_incoming_creates_a_blob_using_filesystem(self):
self.prepare('filesystem')
user_id, doc_id = self.user_id, uuid4().hex
content = 'Hi'
formatter = IncomingFormatter()
incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
db = self.state.open_database(user_id)
consumer = DummyRequest([''])
yield db.read_blob(user_id, doc_id, consumer, namespace='MX')
flags = yield db.get_flags(user_id, doc_id, namespace='MX')
data = consumer.written.pop()
expected_preamble = formatter.preamble(content, doc_id)
expected_preamble = decode_preamble(expected_preamble, True)
written_preamble, written_content = data.split()
written_preamble = decode_preamble(written_preamble, True)
self.assertEquals(expected_preamble, written_preamble)
self.assertEquals(content, written_content)
self.assertIn(Flags.PENDING, flags)
def test_put_incoming_creates_a_document_using_couch(self):
self.prepare('couch')
user_id, doc_id = self.user_id, uuid4().hex
content, scheme = 'Hi', EncryptionSchemes.PUBKEY
formatter = IncomingFormatter()
incoming_endpoint = self.uri + '%s/%s' % (user_id, doc_id)
yield treq.put(incoming_endpoint, BytesIO(content), persistent=False)
db = self.state.open_database(user_id)
doc = db.get_doc(doc_id)
self.assertEquals(doc.content, formatter.format(content, scheme))
def deliver_using_incoming_api(url, user_uuid, token, data):
auth = 'Token %s' % base64.b64encode('%s:%s' % (user_uuid, token))
uri = "%s/incoming/%s/%s?namespace=MX" % (url, user_uuid, uuid.uuid4().hex)
return treq.put(uri, headers={'Authorization': auth}, data=BytesIO(data))
def put_mapping(_):
print("Putting mapping {0}".format(event_mapping))
with open(event_mapping) as f:
d = treq.put('http://localhost:9200/history/event/_mapping',
f.read())
return d.addCallback(treq.json_content).addCallback(print)
records_base = base.child("zones").child(zone_id).child("dns_records")
records_query_url = str(records_base
.set("type", "TXT")
.set("name", full_name))
response = yield treq.get(records_query_url, headers=self._headers())
data = yield response.json()
records = data['result']
dns_record = {
"type": "TXT",
"ttl": 120,
"name": full_name,
"content": validation
}
if records:
put_to = str(records_base.child(records[0]["id"]))
response = yield treq.put(
put_to, json=dns_record,
headers=self._headers()
)
else:
post_to = str(records_base)
response = yield treq.post(post_to, json=dns_record, headers=self._headers())
yield response.json()
yield ConsistencyChecker.default(self._reactor).check(full_name, validation)
def ensure(_):
d = treq.put('http://localhost:9200/history')
d.addCallback(treq.json_content)
return d.addCallback(print)
def sendAll(self, args={}):
logging.critical('Setting All Lights')
data = args.get('lightCommand')
url = self._api_url + 'all/state'
treq.put(url, headers=self.header, data=data)
#if r.status_code == 200:
return True
#return False
def upload(self, local_path):
log.debug("Uploading %s...", local_path)
yield self.await_ready()
with open(local_path, "rb") as f:
resp = yield treq.put("{}uri".format(self.nodeurl), f)
if resp.code == 200:
content = yield treq.content(resp)
log.debug("Successfully uploaded %s", local_path)
return content.decode("utf-8")
content = yield treq.content(resp)
raise TahoeWebError(content.decode("utf-8"))