Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_initialization(self):
    """Constructing a HivemindOpinion without arguments yields an instance of that class."""
    fresh_opinion = HivemindOpinion()
    assert isinstance(fresh_opinion, HivemindOpinion)
def test_setting_ranked_choice_only_contains_valid_option_hashes(self):
    """Setting a ranked choice must raise for each of the invalid rankings below."""
    candidate = HivemindOpinion()
    candidate.set_hivemind_state(hivemind_state_hash=STRING_STATE_HASH)

    rejected_rankings = (
        # a list that mixes the two option hashes with 'foo' and 'bar'
        [STRING_OPTION1_HASH, STRING_OPTION2_HASH, 'foo', 'bar'],
        # a bare hash: must be a list of option hashes
        STRING_OPTION1_HASH,
    )
    for bad_ranking in rejected_rankings:
        with pytest.raises(Exception):
            candidate.set(opinionator='me', ranked_choice=bad_ranking)
# Excerpt of a test body: the enclosing definition is not visible here, and
# `option`, `address`, `signature`, `option_values` and `hivemind_state` are defined before this excerpt.
# Register the option hash on the hivemind state together with its address and signature.
hivemind_state.add_option(option_hash=option.multihash(), address=address, signature=signature)
print('')
print('All options:')
print(hivemind_state.options)
# The state is expected to hold exactly one option per entry in option_values.
assert len(hivemind_state.options) == len(option_values)
print('')
# The value returned by save() is used below as the state hash each opinion is attached to.
hivemind_state_hash = hivemind_state.save()
print('Hivemind state hash:', hivemind_state_hash)
print(hivemind_state.hivemind_issue_hash)
n_opinions = 10
# Create, save, sign and register one opinion per wallet address (account 3, indexes 1..n_opinions).
for i in range(n_opinions):
opinionator = get_address_from_wallet(account=3, index=i+1)
opinion = HivemindOpinion()
opinion.set_hivemind_state(hivemind_state_hash=hivemind_state_hash)
# NOTE(review): unless `options` returns a fresh copy, ranked_choice is the same list object as
# hivemind_state.options, so the in-place random.shuffle() below also reorders the state's own
# option list - confirm this is intended.
ranked_choice = hivemind_state.options
random.shuffle(ranked_choice)
opinion.set(opinionator=opinionator, ranked_choice=ranked_choice)
opinion.save()
print('%s = %s' % (opinionator, opinion.ranked_choice))
print('saved as %s' % opinion.multihash())
# The signed message is the string '/ipfs/<opinion multihash>'; the private key is looked up by the opinionator's address.
signature = sign_message(private_key=get_private_key_from_wallet(account=3, index=i+1)[opinionator], message='/ipfs/%s' % opinion.multihash())
hivemind_state.add_opinion(opinion_hash=opinion.multihash(), signature=signature, weight=1.0)
print('')
print('All opinions:')
print(hivemind_state.opinions[0])
# All opinions added above are expected under index 0 of hivemind_state.opinions.
assert len(hivemind_state.opinions[0]) == n_opinions
print('')
def test_setting_ranked_choice_does_not_contain_duplicate_option_hashes(self):
    """Setting a ranked choice that lists the same option hash twice must raise."""
    candidate = HivemindOpinion()
    candidate.set_hivemind_state(hivemind_state_hash=STRING_STATE_HASH)

    with pytest.raises(Exception):
        # STRING_OPTION1_HASH appears both first and last in the ranking
        candidate.set(
            opinionator='me',
            ranked_choice=[STRING_OPTION1_HASH, STRING_OPTION2_HASH, STRING_OPTION1_HASH],
        )
def test_auto_complete(self):
    """Check the ranking of a single-option opinion without auto-complete, then with 'MAX' and 'MIN'."""
    subject = HivemindOpinion()
    subject.set_hivemind_state(hivemind_state_hash=INTEGER_STATE_HASH)
    subject.set(opinionator='me', ranked_choice=[INTEGER_OPTION3_HASH])

    # Before auto_complete is set, the ranking is exactly what was submitted
    assert subject.ranking() == [INTEGER_OPTION3_HASH]

    # (auto_complete mode, banner printed before the ranking, expected ranking)
    scenarios = (
        ('MAX',
         '\nAuto_completing using opinion as MAX value',
         [INTEGER_OPTION5_HASH, INTEGER_OPTION2_HASH, INTEGER_OPTION3_HASH]),
        ('MIN',
         '\nAuto_completing using opinion as MIN value',
         [INTEGER_OPTION3_HASH, INTEGER_OPTION4_HASH, INTEGER_OPTION1_HASH]),
    )
    for mode, banner, expected_ranking in scenarios:
        print(banner)
        subject.auto_complete = mode
        # Show each ranked option's value next to its hash to ease debugging
        for ranked_hash in subject.ranking():
            print(HivemindOption(multihash=ranked_hash).value, ranked_hash)
        assert subject.ranking() == expected_ranking
# Excerpt of a test body: the enclosing definition is not visible here, and
# `option`, `option_values` and `hivemind_state` are defined before this excerpt.
# Register the option hash on the hivemind state (no address or signature is passed here).
hivemind_state.add_option(option_hash=option.multihash())
print('')
print('All options:')
print(hivemind_state.options)
# The state is expected to hold exactly one option per entry in option_values.
assert len(hivemind_state.options) == len(option_values)
print('')
# The value returned by save() is used below as the state hash each opinion is attached to.
hivemind_state_hash = hivemind_state.save()
print('Hivemind state hash:', hivemind_state_hash)
print(hivemind_state.hivemind_issue_hash)
# First opinion: wallet address at account 3, index 0, ranking the options in the order the state lists them.
i = 0
opinionator = get_address_from_wallet(account=3, index=i)
opinion = HivemindOpinion()
opinion.set_hivemind_state(hivemind_state_hash=hivemind_state_hash)
ranked_choice = hivemind_state.options
opinion.set(opinionator=opinionator, ranked_choice=ranked_choice)
opinion.save()
print('%s = %s' % (opinionator, opinion.ranked_choice))
print('saved as %s' % opinion.multihash())
# The signed message is the string '/ipfs/<opinion multihash>'; the private key is looked up by the opinionator's address.
signature = sign_message(private_key=get_private_key_from_wallet(account=3, index=i)[opinionator], message='/ipfs/%s' % opinion.multihash())
hivemind_state.add_opinion(opinion_hash=opinion.multihash(), signature=signature, weight=1.0, question_index=0)
print('')
# Second opinion: same wallet address (i is unchanged), this time ranking the options in reverse order.
opinionator = get_address_from_wallet(account=3, index=i)
opinion = HivemindOpinion()
opinion.set_hivemind_state(hivemind_state_hash=hivemind_state_hash)
# list(reversed(...)) builds a new list, so hivemind_state.options itself is not reordered
ranked_choice = list(reversed(hivemind_state.options))
opinion.set(opinionator=opinionator, ranked_choice=ranked_choice)
opinion.save()
# Excerpt of a test body: the enclosing definition is not visible here, and `opinion`,
# `hivemind_state`, `random_opinionator`, `random_opinion` and `random_values` are defined before this excerpt.
# Attach the opinion to the current state and store random_opinion as random_opinionator's ranking.
opinion.set_hivemind_state(hivemind_state_hash=hivemind_state.multihash())
opinion.set(opinionator=random_opinionator, ranked_choice=random_opinion)
opinion.save()
# Sign the string '/ipfs/<opinion multihash>' with the key looked up by random_opinionator's address (wallet account 0, index 2).
private_key = get_private_key_from_wallet(account=0, index=2)[random_opinionator]
message = '/ipfs/%s' % opinion.multihash()
signature = sign_message(message=message, private_key=private_key)
hivemind_state.add_opinion(opinion_hash=opinion.multihash(), signature=signature, weight=1.0, question_index=0)
# Recalculate question 0 and check that its consensus equals random_values.
hivemind_state.calculate_results(question_index=0)
consensus = hivemind_state.get_consensus(question_index=0)
print(consensus)
assert consensus == random_values
print('\nTest adding the random opinion on question 1')
# Same sequence again with a fresh opinion object, this time registered and evaluated on question 1.
opinion = HivemindOpinion()
opinion.set_hivemind_state(hivemind_state_hash=hivemind_state.multihash())
opinion.set(opinionator=random_opinionator, ranked_choice=random_opinion)
opinion.save()
private_key = get_private_key_from_wallet(account=0, index=2)[random_opinionator]
message = '/ipfs/%s' % opinion.multihash()
signature = sign_message(message=message, private_key=private_key)
hivemind_state.add_opinion(opinion_hash=opinion.multihash(), signature=signature, weight=1.0, question_index=1)
hivemind_state.calculate_results(question_index=1)
consensus = hivemind_state.get_consensus(question_index=1)
print(consensus)
assert consensus == random_values
# Lone statement from a test excerpt (surrounding code not visible): sets correct_opinionator's weight to 0.0 on the state.
hivemind_state.set_weight(opinionator=correct_opinionator, weight=0.0)
# Excerpt of a method body (the enclosing `def` is not visible here; `self` and `question_index` come from it).
# Computes a contribution value per opinionator for the given question and stores the
# resulting dict in self.contributions[question_index].
deviances = {}
total_deviance = 0
multipliers = {}
# sort the option hashes by highest score
option_hashes_by_score = [option[0] for option in sorted(self.results[question_index].items(), key=lambda x: x[1]['score'], reverse=True)]
# sort the opinionators by the timestamp of their opinion
# (the sort key is element 2 of each opinionator's entry in self.opinions[question_index])
opinionators_by_timestamp = [opinionator for opinionator, opinion_data in sorted(self.opinions[question_index].items(), key=lambda x: x[1][2])]
# exclude the opinionators with weight 0
opinionators_by_timestamp = [opinionator for opinionator in opinionators_by_timestamp if self.weights[opinionator] > 0.0]
for i, opinionator in enumerate(opinionators_by_timestamp):
deviance = 0
# element 0 of the opinionator's entry is passed as the multihash to load the opinion from
opinion = HivemindOpinion(multihash=self.opinions[question_index][opinionator][0])
# Calculate the 'early bird' multiplier (whoever gives their opinion first gets the highest multiplier, value is between 0 and 1), if opinion is an empty list, then multiplier is 0
multipliers[opinionator] = 1 - (i/float(len(opinionators_by_timestamp))) if len(opinion.ranked_choice) > 0 else 0
# Calculate the deviance of the opinion, the closer the opinion is to the final result, the lower the deviance
for j, option_hash in enumerate(option_hashes_by_score):
if option_hash in opinion.ranked_choice:
# ranked option: add the distance between its position by score (j) and its position in this opinion
deviance += abs(j - opinion.ranked_choice.index(option_hash))
else:
# option missing from this opinion: add the number of positions from j to the end of the score ordering
deviance += len(option_hashes_by_score)-j
total_deviance += deviance
deviances[opinionator] = deviance
# contribution = (1 - deviance / total_deviance) * early-bird multiplier;
# when total_deviance is 0 nothing is assigned to self.contributions[question_index] here
if total_deviance != 0: # to avoid divide by zero
self.contributions[question_index] = {opinionator: (1-(deviances[opinionator]/float(total_deviance)))*multipliers[opinionator] for opinionator in deviances}
def get_opinion(self, opinionator, question_index=0):
    """
    Get the Opinion object of a certain opinionator

    :param opinionator: The opinionator
    :param question_index: The index of the question in the HivemindQuestion (default=0)
    :return: An Opinion object, or None if the opinionator has no opinion on that question
    """
    opinions_for_question = self.opinions[question_index]
    if opinionator not in opinions_for_question:
        return None

    record = opinions_for_question[opinionator]
    # Other code in this file reads these records as indexable sequences (multihash at
    # index 0, timestamp at index 2), so pass only the multihash on; a record that is
    # already a bare hash is still accepted unchanged.
    opinion_hash = record[0] if isinstance(record, (list, tuple)) else record
    return HivemindOpinion(multihash=opinion_hash)