Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_alt_collection(self):
db = self.db
alt = motor.MotorGridFS(db, 'alt')
oid = yield alt.put(b"hello world")
gridout = yield alt.get(oid)
self.assertEqual(b"hello world", (yield gridout.read()))
self.assertEqual(1, (yield self.db.alt.files.count_documents({})))
self.assertEqual(1, (yield self.db.alt.chunks.count_documents({})))
yield alt.delete(oid)
with self.assertRaises(NoFile):
yield alt.get(oid)
self.assertEqual(0, (yield self.db.alt.files.count_documents({})))
self.assertEqual(0, (yield self.db.alt.chunks.count_documents({})))
with self.assertRaises(NoFile):
yield alt.get("foo")
oid = yield alt.put(b"hello world", _id="foo")
def test_put_unacknowledged(self):
client = self.motor_client(w=0)
with self.assertRaises(ConfigurationError):
motor.MotorGridFS(client.motor_test)
client.close()
@tornado.web.authenticated
@tornado.gen.coroutine
def post(self):
user = self.get_secure_cookie("mechtari")
info = json_util.loads(user)
email = info["_id"]
namep = self.get_argument("namep").lower()
description = self.get_argument("description")[:160].lower()
tags = [ tag.lower() for tag in self.get_arguments("tags")]
#tag = [x.strip('\\\"\',!*&^%#$;:+') for x in set(tags.split())] this will cut a sentence to words, uncomment it if you want to use it
if "" in (namep,description, tags):
self.redirect("/pirate")
else:
try:
fs = motor.MotorGridFS(db)
prix = int(self.get_argument("prix")) # html uses strings, so dont forget to convert what you need, here we need integer
echange = self.get_argument("echange")
if echange not in ["non", "oui"]:
self.redirect("/pirate")
etat = self.get_argument("etat")
if etat not in ["se", "be", "ac"]:
self.redirect("/pirate")
date = datetime.datetime.now()
ava = self.request.files['photo'][0]
avat = ava["body"]
avctype = ava["content_type"]
image = Image.open(StringIO.StringIO(buf=avat))
(x, y) = image.size
if x < y:
orientation = "portrait"
@tornado.web.authenticated
@tornado.gen.coroutine
def get(self):
smin = int(self.get_argument("sommemin"))
smax = int(self.get_argument("sommemax"))
s = int(self.get_argument("s"))
url = self.request.uri
lin = spliter.split(url)[0]
link = spliter.split(url)[1]
user = self.get_secure_cookie("mechtari")
info = json_util.loads(user)
email = info["_id"]
try:
fs = motor.MotorGridFS(db)
up = yield db.users.find_one({"_id":email})
achat = []
try:
for i in up["pdn"]:
achat.append(str(i["avt"]["fto"]))
except KeyError:
achat = []
produits = yield db.users.aggregate([{"$unwind":"$pup"},{"$match":{"pup.spec.pri":{"$gte": smin, "$lte": smax}}}, {"$sort":{"pup.spec.pri":-1}}, {"$skip":s}, {"$limit":5}, {"$group":{"_id":0,"pup":{"$push":"$pup"}}}])
avatar = []
produit = produits["result"]
if produit:
for prod in produit:
for i in prod["pup"]:
gridout = yield fs.get(i["avt"]["fto"])
avatar.append(gridout.filename)
produits = prod["pup"]
def open_gridfs():
for collection in ('avatars',):
def set_fs(fs, error):
if error: raise error
global fss
fss[collection] = GridFSWrapper(fs, collection)
motor.MotorGridFS(get_db(), collection=collection).open(set_fs)
from tornado.options import define, options
from tornado.concurrent import run_on_executor
import bson
import motor
from PIL import Image
define('port', default=8000, help='run on the given port', type=int)
define('db_uri', default='localhost', help='mongodb uri')
define('db_name', default='habr_tornado', help='name of database')
define('debug', default=True, help='debug mode', type=bool)
options.parse_command_line()
db = motor.MotorClient(options.db_uri)[options.db_name]
gridfs = motor.MotorGridFS(db)
class UploadHandler(web.RequestHandler):
executor = ThreadPoolExecutor(max_workers=os.cpu_count())
@gen.coroutine
def get(self):
imgs = yield db.imgs.find().sort('_id', -1).to_list(20)
self.render('upload.html', imgs=imgs)
@gen.coroutine
def post(self):
file = self.request.files['file'][0]
try:
thumbnail = yield self.make_thumbnail(file.body)
except OSError:
def store_image(self, name, content, content_type):
"""Put an image in GridFS, and return the URL."""
fs = motor.MotorGridFS(self.settings['db'])
# This is the tail end of the URL, like 2012/06/foo.png.
now = datetime.datetime.utcnow()
fullname = media_link(now.year, now.month, name)
gridin = yield fs.new_file(
filename=fullname,
content_type=content_type)
yield gridin.write(content)
yield gridin.close()
raise gen.Return(self.application.reverse_url('media', fullname))
def post(self):
media_id = self.get_argument('media_id')
fs = yield motor.MotorGridFS(self.settings['db']).open()
yield fs.delete(ObjectId(media_id))
self.redirect(self.reverse_url('media-page'))