Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def Search(agent, param):
s = spade.DF.Service()
s.setName("unittest_name_1_"+param)
#s.setOwner(spade.AID.aid(param+"@"+host,["xmpp://"+param+"@"+host]))
#s.setInputs(['login','password'])
agent.result = agent.searchService(s)
def testComposePlanBestQoS(self):
tbcbp = TBCBP()
s1 = Service(P=["P(P1)", "P(P2)"], Q=["Q(Q1)", "Q(Q2)"], inputs=["i1"], outputs=["o1"], name="s1")
s2 = Service(P=["P(P3)", "P(P4)"], Q=["Q(Q3)"], inputs=["o1"], outputs=["o2"], name="s2")
s3 = Service(P=["P(P3)", "P(P4)"], Q=["Q(Q3)"], inputs=["o1"], outputs=["o2"], name="s3")
tbcbp.registerService(s1, time=1, QoS=1)
tbcbp.registerService(s2, time=1, QoS=1)
tbcbp.registerService(s3, time=1, QoS=100)
kb0 = KB()
kb0.tell("P(P1)")
kb0.tell("P(P2)")
kb0.set("i1", 1024)
g = Goal("Q(Q3)")
plan = tbcbp.composePlan(Goal=g, kb=kb0, use_rl=False)
for s in plan.getServices():
def testComposePlan1CaseNoP(self):
tbcbp = TBCBP()
s1 = Service(P=["P(P1)", "P(P2)"], Q=["Q(Q1)"], inputs=["i1"], outputs=["o1"], name="s1")
tbcbp.registerService(s1)
kb0 = KB()
kb0.tell("P(P1)")
#Missing kb0.tell("P(P2)")
kb0.set("i1", 1024)
g = Goal("Q(Q1)")
plan = tbcbp.composePlan(Goal=g, kb=kb0)
assert plan is None
def testComposePlanBestTime(self):
tbcbp = TBCBP()
s1 = Service(P=["P(P1)", "P(P2)"], Q=["Q(Q1)", "Q(Q2)"], inputs=["i1"], outputs=["o1"], name="s1")
s2 = Service(P=["P(P3)", "P(P4)"], Q=["Q(Q3)"], inputs=["o1"], outputs=["o2"], name="s2")
s3 = Service(P=["P(P3)", "P(P4)"], Q=["Q(Q3)"], inputs=["o1"], outputs=["o2"], name="s3")
tbcbp.registerService(s1, time=10, QoS=1)
tbcbp.registerService(s2, time=200, QoS=1)
tbcbp.registerService(s3, time=20, QoS=1)
kb0 = KB()
kb0.tell("P(P1)")
kb0.tell("P(P2)")
kb0.set("i1", 1024)
g = Goal("Q(Q3)")
plan = tbcbp.composePlan(Goal=g, kb=kb0, use_rl=False)
for s in plan.getServices():
assert s in ["s1", "s3"]
def testComposePlanBestQoS(self):
tbcbp = TBCBP()
s1 = Service(P=["P(P1)", "P(P2)"], Q=["Q(Q1)", "Q(Q2)"], inputs=["i1"], outputs=["o1"], name="s1")
s2 = Service(P=["P(P3)", "P(P4)"], Q=["Q(Q3)"], inputs=["o1"], outputs=["o2"], name="s2")
s3 = Service(P=["P(P3)", "P(P4)"], Q=["Q(Q3)"], inputs=["o1"], outputs=["o2"], name="s3")
tbcbp.registerService(s1, time=1, QoS=1)
tbcbp.registerService(s2, time=1, QoS=1)
tbcbp.registerService(s3, time=1, QoS=100)
kb0 = KB()
kb0.tell("P(P1)")
kb0.tell("P(P2)")
kb0.set("i1", 1024)
g = Goal("Q(Q3)")
plan = tbcbp.composePlan(Goal=g, kb=kb0, use_rl=False)