Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Tear down and re-create the drive hardware objects.
# NOTE(review): only the two encoder names are declared global in this excerpt;
# any matching declarations for the motors and the button are not visible here.
global left_encoder
global right_encoder
# Release the current motor, encoder and button objects before rebuilding them.
left_motor.deinit()
right_motor.deinit()
left_encoder.deinit()
right_encoder.deinit()
button.deinit()
# Left motor
left_motor = Motor(0)
# Left encoder, constructed with the left motor passed as `motor`
left_encoder = Encoder(0, is_quad_freq=False, motor=left_motor)
# Right motor
right_motor = Motor(1)
# Right encoder, constructed with the right motor passed as `motor`
right_encoder = Encoder(1, is_quad_freq=False, motor=right_motor)
# Re-create button 0, passing `callback` as its callback argument.
button = Button(0, callback=callback)
'''
Motor test
-----------------
Drive the left and right wheels at a fixed PWM setting.
'''
from car_config import gpio_dict, car_property
from motor import Motor
import time

# Left motor
lmotor = Motor(gpio_dict['LEFT_MOTOR_A'], gpio_dict['LEFT_MOTOR_B'],
               motor_install_dir=car_property['LEFT_MOTOR_INSTALL_DIR'])
lmotor.pwm(300)
# Right motor
rmotor = Motor(gpio_dict['RIGHT_MOTOR_A'], gpio_dict['RIGHT_MOTOR_B'],
               motor_install_dir=car_property['RIGHT_MOTOR_INSTALL_DIR'])
rmotor.pwm(300)

try:
    # Keep the script alive so both motors stay driven until interrupted.
    while True:
        pass
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the test; exit quietly.
    pass
finally:
    # Always release both motors, whatever ended the loop. Unexpected
    # errors still propagate instead of being silently swallowed.
    lmotor.deinit()
    rmotor.deinit()
# Excerpt from a map_reduce test; the enclosing test function and the
# definition of `expected_result` are outside this excerpt.
# Map function (JavaScript): emits a count of 1 per document, keyed by `_id % 2`.
map_fn = bson.Code('function map() { emit(this._id % 2, 1); }')
# Reduce function (JavaScript): sums the emitted values for a key.
reduce_fn = bson.Code('''
function reduce(key, values) {
r = 0;
values.forEach(function(value) { r += value; });
return r;
}''')
# Drop the 'tmp_mr' output collection before running the map-reduce.
yield self.db.tmp_mr.drop()
# First do a standard mapreduce, should return MotorCollection
collection = self.collection
tmp_mr = yield collection.map_reduce(map_fn, reduce_fn, 'tmp_mr')
self.assertTrue(
isinstance(tmp_mr, motor.MotorCollection),
'map_reduce should return MotorCollection, not %s' % tmp_mr)
# Read the output collection back in ascending _id order and compare it
# with the expected documents.
result = yield tmp_mr.find().sort([('_id', 1)]).to_list(length=1000)
self.assertEqual(expected_result, result)
# Standard mapreduce with full response
yield self.db.tmp_mr.drop()
response = yield collection.map_reduce(
map_fn, reduce_fn, 'tmp_mr', full_response=True)
self.assertTrue(
isinstance(response, dict),
'map_reduce should return dict, not %s' % response)
# The response dict names the output collection under the 'result' key.
self.assertEqual('tmp_mr', response['result'])
# Re-read the output through the MotorCollection returned by the first run.
result = yield tmp_mr.find().sort([('_id', 1)]).to_list(length=1000)
def test_gridfs_find(self):
    """Check ``fs.find()`` with sort/skip/limit and an invalid extra argument.

    Four files are stored (three named "two", one named "one"). The cursor
    is sorted by ``_id`` descending, skips one match and is limited to two.
    The first file it yields is expected to contain ``b"test1"``. Passing a
    second positional argument to ``find`` is expected to raise TypeError.
    """
    yield from self.fs.put(b"test2", filename="two")
    yield from self.fs.put(b"test2+", filename="two")
    yield from self.fs.put(b"test1", filename="one")
    yield from self.fs.put(b"test2++", filename="two")

    cursor = self.fs.find().sort("_id", -1).skip(1).limit(2)
    self.assertTrue((yield from cursor.fetch_next))
    grid_out = cursor.next_object()
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)) which only reports "False is not true".
    self.assertIsInstance(grid_out, AsyncIOMotorGridOut)
    self.assertEqual(b"test1", (yield from grid_out.read()))

    self.assertRaises(TypeError, self.fs.find, {}, {"_id": True})
# Excerpt: populate the `worldprop` and `instanceprop` collections with
# fixture documents. The enclosing function is outside this excerpt.
# World properties: each document carries wid / locid / key / val.
yield motor.Op(self.app.mongodb.worldprop.insert,
{'wid':self.exwid, 'locid':self.exlocid,
'key':'x', 'val':0})
yield motor.Op(self.app.mongodb.worldprop.insert,
{'wid':self.exwid, 'locid':self.exlocid,
'key':'w', 'val':'world'})
# Key 'r' is inserted twice: once with locid None and once with the example
# locid. NOTE(review): presumably a location-independent value versus a
# location-specific one — confirm against the code that reads these.
yield motor.Op(self.app.mongodb.worldprop.insert,
{'wid':self.exwid, 'locid':None,
'key':'r', 'val':11})
yield motor.Op(self.app.mongodb.worldprop.insert,
{'wid':self.exwid, 'locid':self.exlocid,
'key':'r', 'val':12})
# Remove with an empty query document before inserting instance properties.
yield motor.Op(self.app.mongodb.instanceprop.remove,
{})
# Instance properties: each document carries iid / locid / key / val, with
# values covering an int, a list and a dict.
yield motor.Op(self.app.mongodb.instanceprop.insert,
{'iid':self.exiid, 'locid':self.exlocid,
'key':'x', 'val':1})
yield motor.Op(self.app.mongodb.instanceprop.insert,
{'iid':self.exiid, 'locid':self.exlocid,
'key':'y', 'val':2})
yield motor.Op(self.app.mongodb.instanceprop.insert,
{'iid':self.exiid, 'locid':self.exlocid,
'key':'ls', 'val':[1,2,3]})
yield motor.Op(self.app.mongodb.instanceprop.insert,
{'iid':self.exiid, 'locid':self.exlocid,
'key':'map', 'val':{'one':1, 'two':2, 'three':3}})
yield motor.Op(self.app.mongodb.instanceprop.insert,
{'iid':self.exiid, 'locid':None,
'key':'r', 'val':13})
yield motor.Op(self.app.mongodb.instanceprop.insert,
{'iid':self.exiid, 'locid':self.exlocid,
# Field declarations.
# NOTE(review): these look like part of the `Doc` class body whose header
# (and any `foo` field) is outside this excerpt — confirm.
bar = Field(int, required=False)
moo = Field(list)
# Register Doc against the test client and the 'nanotestdb' database.
Doc.register(client=MOTOR_CLIENT, db='nanotestdb')
# Insert a document, then exercise save() after several kinds of change,
# re-reading the document each time and comparing it with the local copy.
d = Doc(foo=six.u('foo value'), bar=42)
d.moo = []
yield motor.Op(d.insert)
# Case 1: delete a field via attribute access, then save.
del d.bar # unset
yield motor.Op(d.save)
result = yield motor.Op(Doc.find_one, {'_id': d._id})
self.assertEqual(d, result)
# Case 2: change fields via both attribute and item assignment, then save
# with atomic=True and look the document up by its new values.
d.foo = six.u('new foo')
d['bar'] = 1337
d.moo = ['moo 0']
yield motor.Op(d.save, atomic=True)
result = yield motor.Op(Doc.find_one, {'foo': six.u('new foo'), 'bar': 1337})
self.assertEqual(d, result)
# Case 3: empty the list and delete a field via item access, then save.
d.moo = []
del d['bar']
yield motor.Op(d.save)
result = yield motor.Op(Doc.find_one, {'_id': d._id})
self.assertEqual(d, result)
async def test_db(error, mocker, request, spawn_client, test_db_name, mock_setup):
    """Post database settings to ``/setup/db`` for the scenario named by ``error``.

    Scenarios visible here:

    * ``"db_connection_error"`` - the form is posted with the host "foobar"
      instead of the configured ``db_host`` option.
    * ``"db_not_empty_error"`` - a document is inserted into the target
      database's ``references`` collection before the form is posted.

    The response status is asserted to be 200 in every case.
    """
    client = await spawn_client(setup_mode=True)

    actual_host = request.config.getoption("db_host")

    update = {
        "db_host": "foobar" if error == "db_connection_error" else actual_host,
        "db_port": 27017,
        "db_name": test_db_name,
        "db_username": "",
        "db_password": ""
    }

    if error == "db_not_empty_error":
        db_client = motor.motor_asyncio.AsyncIOMotorClient(
            io_loop=client.app.loop,
            host=actual_host,
            port=27017
        )
        try:
            await db_client[test_db_name].references.insert_one({"_id": "test"})
        finally:
            # The client is only needed to seed one document; close it so
            # its connections are not leaked for the rest of the test run.
            db_client.close()

    resp = await client.post_form("/setup/db", update)

    assert resp.status == 200

    # NOTE(review): `errors` is not read within the visible part of this
    # function — presumably it feeds a later assertion; confirm.
    errors = None

    if error == "db_connection_error":
        errors = dict(mock_setup["errors"], db_connection_error=True)
def test_io_loop(self):
    """Constructing a MotorClient with a string as ``io_loop`` raises TypeError."""
    self.assertRaises(
        TypeError,
        lambda: motor.MotorClient(test.env.rs_uri, io_loop='foo'))
def test_underscore(self):
    """Underscore-prefixed names work via subscript but not via attribute.

    For the client, the database and the collection, ``parent['_name']``
    must return the matching Motor asyncio wrapper type, while
    ``parent._name`` must raise AttributeError.
    """
    cases = (
        (self.cx, '_db', motor_asyncio.AsyncIOMotorDatabase),
        (self.db, '_collection', motor_asyncio.AsyncIOMotorCollection),
        (self.collection, '_collection',
         motor_asyncio.AsyncIOMotorCollection),
    )

    # Subscript access succeeds and yields the expected wrapper class.
    for parent, child_name, wrapper_cls in cases:
        self.assertIsInstance(parent[child_name], wrapper_cls)

    # Attribute access to the same names is rejected.
    for parent, child_name, _ in cases:
        with self.assertRaises(AttributeError):
            getattr(parent, child_name)
def test_get_item(self):
    """Subscripting a framework-bound client yields the framework-bound database.

    Both the client class and the expected database class are built with
    ``create_class_with_framework`` using the asyncio framework and this
    test's module name.
    """
    client = create_class_with_framework(core.MongoMotorAgnosticClient,
                                         asyncio_framework,
                                         self.__module__)()

    db = client['some-db']
    comp_db = create_class_with_framework(
        core.MongoMotorAgnosticDatabase,
        asyncio_framework,
        self.__module__)

    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)) which only reports "False is not true".
    self.assertIsInstance(db, comp_db)