Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_product(id):
    """Update a product.

    The JSON body of the current request is handed unchanged to a
    query-level ``update()`` on the ``Product`` rows whose ``id`` column
    equals ``id``.

    Returns a ``ResponseCode.NOT_FOUND`` response when the update matched no
    row; otherwise commits the session and returns the product serialized
    with ``ProductSchema``.
    """
    payload = request.get_json()
    target_rows = Product.query.filter(Product.id == id)
    if target_rows.update(payload) == 0:
        return json_response(ResponseCode.NOT_FOUND)
    # The row is fetched first and the session committed afterwards.
    refreshed = Product.query.get(id)
    session.commit()
    return json_response(product=ProductSchema().dump(refreshed))
def update_product(id):
    """Update a product from the JSON body of the current request.

    A query-level ``update()`` is issued against ``Product`` filtered by
    ``Product.id == id`` with the request JSON as the values. If no row was
    matched a ``ResponseCode.NOT_FOUND`` response is returned. Otherwise the
    product is fetched, the session is committed, and the product is returned
    serialized with ``ProductSchema``.
    """
    changes = request.get_json()
    matched = Product.query.filter(Product.id == id).update(changes)
    if matched != 0:
        updated = Product.query.get(id)
        session.commit()
        return json_response(product=ProductSchema().dump(updated))
    return json_response(ResponseCode.NOT_FOUND)
def update_shop(id):
    """Update a shop.

    The JSON body of the current request is handed unchanged to a
    query-level ``update()`` on the ``Shop`` rows whose ``id`` column equals
    ``id``.

    Returns a ``ResponseCode.NOT_FOUND`` response when the update matched no
    row; otherwise commits the session and returns the shop serialized with
    ``ShopSchema``.
    """
    payload = request.get_json()
    target_rows = Shop.query.filter(Shop.id == id)
    if target_rows.update(payload) == 0:
        return json_response(ResponseCode.NOT_FOUND)
    # The row is fetched first and the session committed afterwards.
    refreshed = Shop.query.get(id)
    session.commit()
    return json_response(shop=ShopSchema().dump(refreshed))
def update_cart_product(id):
    """Update a cart product, for example its quantity.

    A query-level ``update()`` is issued against ``CartProduct`` filtered by
    ``CartProduct.id == id`` with the request JSON as the values. If no row
    was matched a ``ResponseCode.NOT_FOUND`` response is returned. Otherwise
    the cart product is fetched, the session is committed, and the cart
    product is returned serialized with ``CartProductSchema``.
    """
    changes = request.get_json()
    matched = CartProduct.query.filter(CartProduct.id == id).update(changes)
    if matched != 0:
        updated = CartProduct.query.get(id)
        session.commit()
        return json_response(cart_product=CartProductSchema().dump(updated))
    return json_response(ResponseCode.NOT_FOUND)
def update_shop(id):
    """Update a shop from the JSON body of the current request.

    A query-level ``update()`` is issued against ``Shop`` filtered by
    ``Shop.id == id`` with the request JSON as the values. If no row was
    matched a ``ResponseCode.NOT_FOUND`` response is returned. Otherwise the
    shop is fetched, the session is committed, and the shop is returned
    serialized with ``ShopSchema``.
    """
    changes = request.get_json()
    matched = Shop.query.filter(Shop.id == id).update(changes)
    if matched != 0:
        updated = Shop.query.get(id)
        session.commit()
        return json_response(shop=ShopSchema().dump(updated))
    return json_response(ResponseCode.NOT_FOUND)
order = Order.query.get(id)
if order is None:
return json_response(ResponseCode.NOT_FOUND)
if data.get('address_id') is not None:
order.address_id = data.get('address_id')
if data.get('note') is not None:
order.note = data.get('note')
if data.get('status') is not None:
order.status = data.get('status')
if data.get('order_products') is not None:
order_products = []
for op in data.get('order_products'):
order_product = OrderProduct.query.get(op.get('id'))
if order_product is None:
return json_response(ResponseCode.NOT_FOUND)
if op.get('amount') is not None:
order_product.amount = op.get('amount')
if op.get('price') is not None:
order_product.price = op.get('price')
order_products.append(order_product)
order.order_products = order_products
session.commit()
return json_response(order=OrderSchema().dump(order))
def shop_info(id):
    """Look up a shop.

    Fetches the ``Shop`` identified by ``id``. Returns a
    ``ResponseCode.NOT_FOUND`` response when no such shop exists; otherwise
    returns the shop serialized with ``ShopSchema``.
    """
    found = Shop.query.get(id)
    if found is not None:
        return json_response(shop=ShopSchema().dump(found))
    return json_response(ResponseCode.NOT_FOUND)
def favorite_product_info(id):
    """Look up a favorite product.

    Fetches the ``FavoriteProduct`` identified by ``id``. Returns a
    ``ResponseCode.NOT_FOUND`` response when no such record exists; otherwise
    returns the record serialized with ``FavoriteProductSchema``.
    """
    found = FavoriteProduct.query.get(id)
    if found is not None:
        serialized = FavoriteProductSchema().dump(found)
        return json_response(favorite_product=serialized)
    return json_response(ResponseCode.NOT_FOUND)
def create_wallet_transaction():
"""创建交易
"""
data = request.get_json()
wallet_transaction = WalletTransactionSchema().load(data)
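# Use optimistic locking to guard against data inconsistency under concurrency; pessimistic locking (with_for_update on the query) would also work but consumes more resources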
payer = User.query.get(wallet_transaction.payer_id)
if payer is None:
return json_response(ResponseCode.NOT_FOUND)
payee = User.query.get(wallet_transaction.payee_id)
if payee is None:
return json_response(ResponseCode.NOT_FOUND)
count = User.query.filter(
and_(User.id == payer.id, User.wallet_money >= wallet_transaction.amount,
User.wallet_money == payer.wallet_money)
).update({
User.wallet_money: payer.wallet_money - wallet_transaction.amount
})
if count == 0:
session.rollback()
return json_response(ResponseCode.TRANSACTION_FAILURE)
count = User.query.filter(
def update_order(id):
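"""Update an order; partial updates are supported, but only the address, note, status and similar fields can be updated.
Note that the order products are either left untouched or replaced as a whole.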
"""
data = request.get_json()
order = Order.query.get(id)
if order is None:
return json_response(ResponseCode.NOT_FOUND)
if data.get('address_id') is not None:
order.address_id = data.get('address_id')
if data.get('note') is not None:
order.note = data.get('note')
if data.get('status') is not None:
order.status = data.get('status')
if data.get('order_products') is not None:
order_products = []
for op in data.get('order_products'):
order_product = OrderProduct.query.get(op.get('id'))
if order_product is None:
return json_response(ResponseCode.NOT_FOUND)
if op.get('amount') is not None:
order_product.amount = op.get('amount')
if op.get('price') is not None: