Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
ids = []
for v in request.args.get('ids', '').split(','):
id = int(v.strip())
if id > 0:
ids.append(id)
if len(ids) == 0:
raise BadRequest()
query = Address.query.filter(Address.id.in_(ids))
addresses = {address.id: AddressSchema().dump(address)
for address in query}
return json_response(addresses=addresses)
"""
user_id = request.args.get('user_id', type=int)
order_direction = request.args.get('order_direction', 'desc')
limit = request.args.get(
'limit', current_app.config['PAGINATION_PER_PAGE'], type=int)
offset = request.args.get('offset', 0, type=int)
order_by = Order.id.asc() if order_direction == 'asc' else Order.id.desc()
query = Order.query
if user_id is not None:
query = query.filter(Order.user_id == user_id)
total = query.count()
query = query.order_by(order_by).limit(limit).offset(offset)
return json_response(orders=OrderSchema().dump(query, many=True), total=total)
order_direction = request.args.get('order_direction', 'desc')
limit = request.args.get(
'limit', current_app.config['PAGINATION_PER_PAGE'], type=int)
offset = request.args.get('offset', 0, type=int)
order_by = Product.id.asc() if order_direction == 'asc' else Product.id.desc()
query = Product.query
if shop_id is not None:
query = query.filter(Product.shop_id == shop_id)
if keywords != '':
query = query.filter(
or_(Product.title.match(keywords), Product.description.match(keywords)))
total = query.count()
query = query.order_by(order_by).limit(limit).offset(offset)
return json_response(products=ProductSchema().dump(query, many=True), total=total)
def create_file():
"""保存表单上传文件到 GridFS
"""
if 'file' not in request.files or request.files['file'].filename == '':
raise NotFound()
id = mongo.save_file(request.files['file'].filename, request.files["file"])
_, ext = path.splitext(request.files['file'].filename)
return json_response(id='{}{}'.format(id, ext))
def pay(id):
"""支付订单
"""
resp = TbBuy(current_app).get_json('/orders/{}'.format(id))
order = resp['data']['order']
resp = TbUser(current_app).get_json('/users/{}'.format(order['user_id']))
user = resp['data']['user']
if user['wallet_money'] < order['pay_amount']:
return json_response(ResponseCode.NO_ENOUGH_MONEY)
resp = TbMall(current_app).get_json('/products/infos', params={
'ids': ','.join([str(v['product_id']) for v in order['order_products']]),
})
products = resp['data']['products']
# 对订单中的每个商品,创建一笔交易来完成订单创建者向商品店主的付款
for order_product in order['order_products']:
product = products.get(str(order_product['product_id']))
resp = TbUser(current_app).post_json('/wallet_transactions', json={
'amount': order_product['amount'] * order_product['price'],
'note': '支付订单({})商品({})'.format(order['id'], product['id']),
'payer_id': order['user_id'],
'payee_id': product['shop']['user_id'],
})
if resp['code'] != 0:
def order_info(id):
"""查询订单
"""
order = Order.query.get(id)
if order is None:
return json_response(ResponseCode.NOT_FOUND)
return json_response(order=OrderSchema().dump(order))
def cancel(id):
"""取消订单
"""
resp = TbBuy(current_app).post_json('/orders/{}'.format(id), json={
'status': 'cancelled',
})
return json_response(resp['code'], resp['message'], **resp['data'])