Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from src.types.fees_target import FeesTarget
from src.types.full_block import FullBlock
from src.types.header import Header, HeaderData
from src.types.header_block import HeaderBlock
from src.types.proof_of_space import ProofOfSpace
from src.types.proof_of_time import ProofOfTime
from src.types.sized_bytes import bytes32
from src.util.errors import NoProofsOfSpaceFound
from src.util.ints import uint8, uint32, uint64
# Module-level key material and parameters used when generating test blocks.
# Can't go much lower than 19, since plots start having no solutions
k: uint8 = uint8(19)
# Uses many plots for testing, in order to guarantee proofs of space at every height
num_plots = 80
# Use the empty string as the seed for the private key
pool_sk: PrivateKey = PrivateKey.from_seed(b"")
pool_pk: PublicKey = pool_sk.get_public_key()
# One private key per plot; each seed is the plot index encoded as 4 big-endian bytes
plot_sks: List[PrivateKey] = [
PrivateKey.from_seed(pn.to_bytes(4, "big")) for pn in range(num_plots)
]
# Public keys of plot_sks, in the same order
plot_pks: List[PublicKey] = [sk.get_public_key() for sk in plot_sks]
# Farmer key, seeded with the fixed bytes b"coinbase"
farmer_sk: PrivateKey = PrivateKey.from_seed(b"coinbase")
# Both targets are computed from the same expression: the sha256 digest of the bytes
# of the farmer's public key
coinbase_target = sha256(bytes(farmer_sk.get_public_key())).digest()
fee_target = sha256(bytes(farmer_sk.get_public_key())).digest()
# NOTE(review): presumably a parameter for Wesolowski proofs of time; its use is not
# visible in this section -- confirm
n_wesolowski = uint8(3)
class BlockTools:
"""
Tools to generate blocks for testing.
"""
from src.util.errors import NoProofsOfSpaceFound
from src.util.ints import uint8, uint32, uint64
# Module-level key material and parameters used when generating test blocks.
# Can't go much lower than 19, since plots start having no solutions
k: uint8 = uint8(19)
# Uses many plots for testing, in order to guarantee proofs of space at every height
num_plots = 80
# Use the empty string as the seed for the private key
pool_sk: PrivateKey = PrivateKey.from_seed(b"")
pool_pk: PublicKey = pool_sk.get_public_key()
# One private key per plot; each seed is the plot index encoded as 4 big-endian bytes
plot_sks: List[PrivateKey] = [
PrivateKey.from_seed(pn.to_bytes(4, "big")) for pn in range(num_plots)
]
# Public keys of plot_sks, in the same order
plot_pks: List[PublicKey] = [sk.get_public_key() for sk in plot_sks]
# Farmer key, seeded with the fixed bytes b"coinbase"
farmer_sk: PrivateKey = PrivateKey.from_seed(b"coinbase")
# Both targets are computed from the same expression: the sha256 digest of the bytes
# of the farmer's public key
coinbase_target = sha256(bytes(farmer_sk.get_public_key())).digest()
fee_target = sha256(bytes(farmer_sk.get_public_key())).digest()
# NOTE(review): presumably a parameter for Wesolowski proofs of time; its use is not
# visible in this section -- confirm
n_wesolowski = uint8(3)
class BlockTools:
"""
Tools to generate blocks for testing.
"""
def __init__(self):
plot_seeds: List[bytes32] = [
ProofOfSpace.calculate_plot_seed(pool_pk, plot_pk) for plot_pk in plot_pks
]
self.filenames: List[str] = [
os.path.join(
async def test_harvester_signature(self, initial_blockchain):
    """
    Rebuilds block 9 with its header signature swapped for a signature over
    b"random junk" made by a key seeded with b"0", and expects the blockchain to
    report the block as invalid.
    """
    blocks, b = initial_blockchain
    source_block = blocks[9]
    source_header_block = source_block.header_block
    # Header data is reused untouched; only the signature is replaced
    forged_signature = PrivateKey.from_seed(b"0").sign_prepend(b"random junk")
    forged_header = Header(source_header_block.header.data, forged_signature)
    block_bad = FullBlock(
        HeaderBlock(
            source_header_block.proof_of_space,
            source_header_block.proof_of_time,
            source_header_block.challenge,
            forged_header,
        ),
        source_block.body,
    )
    result = await b.receive_block(block_bad)
    assert result == ReceiveBlockResult.INVALID_BLOCK
from src.types.proof_of_time import ProofOfTime
from src.types.sized_bytes import bytes32
from src.util.errors import NoProofsOfSpaceFound
from src.util.ints import uint8, uint32, uint64
# Module-level key material and parameters used when generating test blocks.
# Can't go much lower than 19, since plots start having no solutions
k: uint8 = uint8(19)
# Uses many plots for testing, in order to guarantee proofs of space at every height
num_plots = 80
# Use the empty string as the seed for the private key
pool_sk: PrivateKey = PrivateKey.from_seed(b"")
pool_pk: PublicKey = pool_sk.get_public_key()
# One private key per plot; each seed is the plot index encoded as 4 big-endian bytes
plot_sks: List[PrivateKey] = [
PrivateKey.from_seed(pn.to_bytes(4, "big")) for pn in range(num_plots)
]
# Public keys of plot_sks, in the same order
plot_pks: List[PublicKey] = [sk.get_public_key() for sk in plot_sks]
# Farmer key, seeded with the fixed bytes b"coinbase"
farmer_sk: PrivateKey = PrivateKey.from_seed(b"coinbase")
# Both targets are computed from the same expression: the sha256 digest of the bytes
# of the farmer's public key
coinbase_target = sha256(bytes(farmer_sk.get_public_key())).digest()
fee_target = sha256(bytes(farmer_sk.get_public_key())).digest()
# NOTE(review): presumably a parameter for Wesolowski proofs of time; its use is not
# visible in this section -- confirm
n_wesolowski = uint8(3)
class BlockTools:
"""
Tools to generate blocks for testing.
"""
def __init__(self):
plot_seeds: List[bytes32] = [
ProofOfSpace.calculate_plot_seed(pool_pk, plot_pk) for plot_pk in plot_pks
]
yn = input(f"The keys file {key_config_filename} already exists. Are you sure"
f" you want to override the keys? Plots might become invalid. (y/n): ")
if not (yn.lower() == "y" or yn.lower() == "yes"):
quit()
else:
# Create the file if it doesn't exist
open(key_config_filename, "a").close()
key_config = safe_load(open(key_config_filename, "r"))
if key_config is None:
key_config = {}
if args.farmer:
# Replaces the farmer's private key. The farmer target allows spending
# of the fees.
farmer_sk = PrivateKey.from_seed(token_bytes(32))
farmer_target = sha256(bytes(farmer_sk.get_public_key())).digest()
key_config["farmer_sk"] = bytes(farmer_sk).hex()
key_config["farmer_target"] = farmer_target.hex()
with open(key_config_filename, "w") as f:
safe_dump(key_config, f)
if args.harvester:
# Replaces the harvester's sk seed. Used to generate plot private keys, which are
# used to sign farmed blocks.
key_config["sk_seed"] = token_bytes(32).hex()
with open(key_config_filename, "w") as f:
safe_dump(key_config, f)
if args.pool:
# Replaces the pool's keys and targets. Only useful if running a pool, or doing
# solo farming. The pool target allows spending of the coinbase.
pool_sks = [PrivateKey.from_seed(token_bytes(32)) for _ in range(2)]
pool_target = sha256(bytes(pool_sks[0].get_public_key())).digest()
for head in heads:
assert head.challenge
if head.challenge.get_hash() == request.challenge_hash:
target_head = head
if target_head is None:
# TODO: should we still allow the farmer to farm?
log.warning(
f"Challenge hash: {request.challenge_hash} not in one of three heads"
)
return
# TODO: use mempool to grab best transactions, for the selected head
transactions_generator: bytes32 = sha256(b"").digest()
# TODO: calculate the fees of these transactions
fees: FeesTarget = FeesTarget(request.fees_target_puzzle_hash, uint64(0))
aggregate_sig: Signature = PrivateKey.from_seed(b"12345").sign(b"anything")
# TODO: calculate aggregate signature based on transactions
# TODO: calculate cost of all transactions
cost = uint64(0)
# Creates a block with transactions, coinbase, and fees
body: Body = Body(
request.coinbase,
request.coinbase_signature,
fees,
aggregate_sig,
transactions_generator,
cost,
)
# Creates the block header
prev_header_hash: bytes32 = target_head.header.get_hash()
async def on_connect():
    """
    Yields the handshake message that is broadcast to harvesters. The handshake
    carries the public keys of the pool private keys listed, hex encoded, under
    "pool_sks" in the farmer's key config.
    """
    # Decode every private key first, then derive the public keys
    pool_keys = []
    for sk_hex in farmer.key_config["pool_sks"]:
        pool_keys.append(PrivateKey.from_bytes(bytes.fromhex(sk_hex)))
    pool_pks = [key.get_public_key() for key in pool_keys]

    handshake = HarvesterHandshake(pool_pks)
    outbound = OutboundMessage(
        NodeType.HARVESTER,
        Message("harvester_handshake", handshake),
        Delivery.BROADCAST,
    )
    yield outbound
def _privkey_from_int(privkey: int) -> PrivateKey:
    """
    Converts an integer into a PrivateKey.

    The integer is encoded as PrivateKey.PRIVATE_KEY_SIZE big-endian bytes and passed
    to PrivateKey.from_bytes.

    Raises:
        ValueError: if PrivateKey.from_bytes rejects the bytes with a RuntimeError.
            The RuntimeError is attached as the cause.
        OverflowError: raised by int.to_bytes when privkey is negative or does not
            fit in PRIVATE_KEY_SIZE bytes. It is not translated.
    """
    privkey_bytes = privkey.to_bytes(PrivateKey.PRIVATE_KEY_SIZE, "big")
    try:
        return PrivateKey.from_bytes(privkey_bytes)
    except RuntimeError as error:
        # Chain explicitly so the traceback shows the library error as the direct cause
        raise ValueError(f"Bad private key: {privkey}, {error}") from error
from src.util.ints import uint32
# Shared pretty-printer: indent of 1, lines up to 120 characters, compact sequences
pp = pprint.PrettyPrinter(indent=1, width=120, compact=True)
# TODO: Remove hack, this allows streaming these objects from binary
# Keyed by class name; each value is that class's size constant
# (presumably the serialized length in bytes -- confirm against blspy)
size_hints = {
"PrivateKey": PrivateKey.PRIVATE_KEY_SIZE,
"PublicKey": PublicKey.PUBLIC_KEY_SIZE,
"Signature": Signature.SIGNATURE_SIZE,
"InsecureSignature": InsecureSignature.SIGNATURE_SIZE,
"PrependSignature": PrependSignature.SIGNATURE_SIZE,
"ExtendedPublicKey": ExtendedPublicKey.EXTENDED_PUBLIC_KEY_SIZE,
"ExtendedPrivateKey": ExtendedPrivateKey.EXTENDED_PRIVATE_KEY_SIZE,
"ChainCode": ChainCode.CHAIN_CODE_KEY_SIZE
}
# NOTE(review): going by the name, these are the types treated as unhashable; how the
# list is consumed is not visible in this section -- confirm
unhashable_types = [PrivateKey, PublicKey, Signature, PrependSignature, InsecureSignature,
ExtendedPublicKey, ExtendedPrivateKey, ChainCode]
def streamable(cls: Any):
"""
This is a decorator for class definitions. It applies the strictdataclass decorator,
which checks all types at construction. It also defines a simple serialization format,
and adds parse, from bytes, stream, and __bytes__ methods.
Serialization format:
- Each field is serialized in order, by calling from_bytes/__bytes__.
- For Lists, there is a 4 byte prefix for the list length.
- For Optionals, there is a one byte prefix, 1 iff object is present, 0 iff not.
All of the constituents must have parse/from_bytes, and stream/__bytes__ and therefore
be of fixed size. For example, int cannot be a constituent since it is not a fixed size,
import io
import dataclasses
import pprint
from typing import Type, BinaryIO, get_type_hints, Any, List
from hashlib import sha256
from blspy import (PrivateKey, PublicKey, InsecureSignature, Signature, PrependSignature,
ExtendedPrivateKey, ExtendedPublicKey, ChainCode)
from src.util.type_checking import strictdataclass, is_type_List, is_type_SpecificOptional
from src.types.sized_bytes import bytes32
from src.util.ints import uint32
# Shared pretty-printer: indent of 1, lines up to 120 characters, compact sequences
pp = pprint.PrettyPrinter(indent=1, width=120, compact=True)
# TODO: Remove hack, this allows streaming these objects from binary
# Keyed by blspy class name; each value is that class's size constant
# (presumably the serialized length in bytes -- confirm against blspy)
size_hints = {
"PrivateKey": PrivateKey.PRIVATE_KEY_SIZE,
"PublicKey": PublicKey.PUBLIC_KEY_SIZE,
"Signature": Signature.SIGNATURE_SIZE,
"InsecureSignature": InsecureSignature.SIGNATURE_SIZE,
"PrependSignature": PrependSignature.SIGNATURE_SIZE,
"ExtendedPublicKey": ExtendedPublicKey.EXTENDED_PUBLIC_KEY_SIZE,
"ExtendedPrivateKey": ExtendedPrivateKey.EXTENDED_PRIVATE_KEY_SIZE,
"ChainCode": ChainCode.CHAIN_CODE_KEY_SIZE
}
# NOTE(review): going by the name, these are the blspy types treated as unhashable;
# how the list is consumed is not visible in this section -- confirm
unhashable_types = [PrivateKey, PublicKey, Signature, PrependSignature, InsecureSignature,
ExtendedPublicKey, ExtendedPrivateKey, ChainCode]
def streamable(cls: Any):
"""
This is a decorator for class definitions. It applies the strictdataclass decorator,
which checks all types at construction. It also defines a simple serialization format,