Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def on_connect():
    """Yield a broadcast handshake for harvesters carrying the pool public keys."""
    # Decode every hex-encoded pool secret key from the farmer's key config first.
    secret_keys = []
    for hex_sk in farmer.key_config["pool_sks"]:
        secret_keys.append(PrivateKey.from_bytes(bytes.fromhex(hex_sk)))

    # The handshake carries only the public halves of those keys.
    public_keys = []
    for secret_key in secret_keys:
        public_keys.append(secret_key.get_public_key())
    handshake = HarvesterHandshake(public_keys)

    outbound = Message("harvester_handshake", handshake)
    yield OutboundMessage(NodeType.HARVESTER, outbound, Delivery.BROADCAST)
# NOTE(review): these assignments belong to a method whose `def` line is outside
# this excerpt (presumably the UI class's __init__ -- confirm).
self.num_top_block_pools: int = 5
# Both winner lists hold (uint64, bytes32) pairs and start out empty.
self.top_winners: List[Tuple[uint64, bytes32]] = []
self.our_winners: List[Tuple[uint64, bytes32]] = []
# The previous and the current route both start at "home/".
self.prev_route: str = "home/"
self.route: str = "home/"
self.focused: bool = False
# Callback handed in by the caller; presumably invoked when the UI closes -- verify.
self.parent_close_cb = parent_close_cb
# Key bindings are produced by setup_keybindings(); the style defines a single
# "error" class rendered in #ff0044.
self.kb = self.setup_keybindings()
self.style = Style([("error", "#ff0044")])
# Pool public keys derived from the secret keys stored in config/keys.yaml.
# The list stays empty when the keys file does not exist.
self.pool_pks: List[PublicKey] = []
key_config_filename = os.path.join(ROOT_DIR, "config", "keys.yaml")
if os.path.isfile(key_config_filename):
    # Read through a context manager so the file handle is closed as soon as
    # parsing finishes instead of lingering until garbage collection.
    with open(key_config_filename, "r") as key_config_file:
        config = safe_load(key_config_file)
    self.pool_pks = [
        PrivateKey.from_bytes(bytes.fromhex(ce)).get_public_key()
        for ce in config["pool_sks"]
    ]
# Presumably draw_initial() prepares self.layout, which is read just below -- verify.
self.draw_initial()
# Full-screen application with mouse support, wired to self.style, self.layout
# and self.kb.
self.app = Application(
style=self.style,
layout=self.layout,
full_screen=True,
key_bindings=self.kb,
mouse_support=True,
)
self.closed = False
# update_ui() and update_data() are scheduled as tasks on the currently running
# event loop; asyncio.get_running_loop() raises RuntimeError if no loop is running.
self.update_ui_task = asyncio.get_running_loop().create_task(self.update_ui())
# NOTE(review): the statement below has no closing parenthesis in this excerpt.
self.update_data_task = asyncio.get_running_loop().create_task(
self.update_data()
def _privkey_from_int(privkey: int) -> PrivateKey:
    """Build a ``PrivateKey`` from an integer.

    The integer is serialized big-endian into ``PrivateKey.PRIVATE_KEY_SIZE``
    bytes, which are then passed to ``PrivateKey.from_bytes``.

    Raises:
        ValueError: if ``PrivateKey.from_bytes`` rejects the bytes with a
            ``RuntimeError``; the original error is attached as the cause.
        OverflowError: propagated unchanged from ``int.to_bytes`` when the
            integer is negative or does not fit in the key size.
    """
    privkey_bytes = privkey.to_bytes(PrivateKey.PRIVATE_KEY_SIZE, "big")
    try:
        return PrivateKey.from_bytes(privkey_bytes)
    except RuntimeError as error:
        # Chain explicitly so the traceback reports the RuntimeError as the
        # direct cause rather than as an error raised while handling another.
        raise ValueError(f"Bad private key: {privkey}, {error}") from error
# We need the keys file, to access pool keys (if they exist), and the sk_seed.
args = parser.parse_args()
if not os.path.isfile(key_config_filename):
    raise RuntimeError("Keys not generated. Run ./scripts/regenerate_keys.py.")

# The seed is what will be used to generate a private key for each plot.
# Read the config through a context manager so the file handle is closed as
# soon as parsing finishes instead of lingering until garbage collection.
with open(key_config_filename, "r") as key_config_file:
    key_config = safe_load(key_config_file)
sk_seed: bytes = bytes.fromhex(key_config["sk_seed"])

pool_pk: PublicKey
if len(args.pool_pub_key) > 0:
    # Use the provided pool public key, useful for using an external pool
    pool_pk = PublicKey.from_bytes(bytes.fromhex(args.pool_pub_key))
else:
    # Use the pool public key from the config, useful for solo farming
    pool_sk = PrivateKey.from_bytes(bytes.fromhex(key_config["pool_sks"][0]))
    pool_pk = pool_sk.get_public_key()

print(
    f"Creating {args.num_plots} plots of size {args.size}, sk_seed {sk_seed.hex()} ppk {pool_pk}"
)

for i in range(args.num_plots):
    # Generate a sk based on the seed, plot size (k), and index.
    # The size is encoded in 1 byte and the index in 4 bytes, both big-endian.
    sk: PrivateKey = PrivateKey.from_seed(
        sk_seed + args.size.to_bytes(1, "big") + i.to_bytes(4, "big")
    )

    # The plot seed is based on the pool and plot pks
    plot_seed: bytes32 = ProofOfSpace.calculate_plot_seed(
        pool_pk, sk.get_public_key()
    )
# NOTE(review): the `def` line and the `try:` that pairs with the `except KeyError`
# below are outside this excerpt.
# Look up the (challenge_hash, filename, index) entry stored for the requested quality.
challenge_hash, filename, index = self.challenge_hashes[request.quality]
except KeyError:
# Unknown quality: log a warning and stop without yielding a response.
log.warning(f"Quality {request.quality} not found")
return
if index is not None:
proof_xs: bytes
try:
proof_xs = self.provers[filename].get_full_proof(challenge_hash, index)
except RuntimeError:
# On RuntimeError, rebuild the DiskProver for this file and retry once;
# a failure of the retry propagates to the caller.
self.provers[filename] = DiskProver(filename)
proof_xs = self.provers[filename].get_full_proof(challenge_hash, index)
# The pool public key is stored in hex form in the plot config entry for this file.
pool_pubkey = PublicKey.from_bytes(
bytes.fromhex(self.plot_config["plots"][filename]["pool_pk"])
)
# The plot public key is derived from the plot's hex-encoded secret key ("sk").
plot_pubkey = PrivateKey.from_bytes(
bytes.fromhex(self.plot_config["plots"][filename]["sk"])
).get_public_key()
# Assemble the proof: challenge, both public keys, the prover's size as a uint8,
# and the proof bytes as a list of uint8 values.
proof_of_space: ProofOfSpace = ProofOfSpace(
challenge_hash,
pool_pubkey,
plot_pubkey,
uint8(self.provers[filename].get_size()),
[uint8(b) for b in proof_xs],
)
response = harvester_protocol.RespondProofOfSpace(
request.quality, proof_of_space
)
if response:
# NOTE(review): the statement below is cut off in this excerpt.
yield OutboundMessage(
NodeType.FARMER,
async def request_partial_proof(
    self, request: harvester_protocol.RequestPartialProof
):
    """
    Answer the farmer's request for a signature over the farmer target hash.
    The plot is found through the quality in the request, and that plot's private
    key (from the plot config) prepend-signs the hash. The signature is sent back
    to the farmer and will be used as a pool share.
    """
    # Only the plot filename (the middle element of the stored triple) is needed.
    _challenge_hash, plot_filename, _index = self.challenge_hashes[request.quality]

    # The plot's secret key is kept hex-encoded in the plot config.
    sk_hex = self.plot_config["plots"][plot_filename]["sk"]
    signing_key = PrivateKey.from_bytes(bytes.fromhex(sk_hex))

    signature: PrependSignature = signing_key.sign_prepend(request.farmer_target_hash)
    reply: harvester_protocol.RespondPartialProof = (
        harvester_protocol.RespondPartialProof(request.quality, signature)
    )

    outbound = Message("respond_partial_proof", reply)
    yield OutboundMessage(NodeType.FARMER, outbound, Delivery.RESPOND)
async def respond_proof_of_space(
self, response: harvester_protocol.RespondProofOfSpace
):
"""
This is a response from the harvester with a proof of space. We check its validity,
and request a pool partial, a header signature, or both, if the proof is good enough.
"""
# Rebuild the pool private keys from the hex strings stored in the key config.
pool_sks: List[PrivateKey] = [
PrivateKey.from_bytes(bytes.fromhex(ce))
for ce in self.key_config["pool_sks"]
]
# The proof must name one of our pool public keys.
# NOTE(review): `assert` is removed under `python -O`, so this check disappears
# in optimized runs.
assert response.proof.pool_pubkey in [sk.get_public_key() for sk in pool_sks]
# Map the reported quality back to its challenge, then to that challenge's weight
# and height; a missing dictionary entry raises KeyError.
challenge_hash: bytes32 = self.harvester_responses_challenge[response.quality]
challenge_weight: uint64 = self.challenge_to_weight[challenge_hash]
challenge_height: uint32 = self.challenge_to_height[challenge_hash]
new_proof_height: uint32 = uint32(challenge_height + 1)
# Difficulty starts at 0, which doubles as the "not found" marker checked below.
difficulty: uint64 = uint64(0)
# Scan the entries stored under this weight for the matching challenge hash.
for posf in self.challenges[challenge_weight]:
if posf.challenge_hash == challenge_hash:
difficulty = posf.difficulty
if difficulty == 0:
raise RuntimeError("Did not find challenge")
# NOTE(review): computed_quality is not used within the visible lines; the rest of
# the method appears to lie beyond this excerpt.
computed_quality = response.proof.verify_and_get_quality()