Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# These names are gathered later in this function by inspecting the output from Stan.
self.sample_and_sampler_param_names: Sequence[str]
num_flat_params = sum(np.product(dims_ or 1) for dims_ in dims) # if dims == [] then it is a scalar
assert num_flat_params == len(constrained_param_names)
num_samples_saved = (self.num_samples + self.num_warmup * self.save_warmup) // self.num_thin
# self._draws holds all the draws. We cannot allocate it before looking at the draws
# because we do not know how many sampler-specific parameters are present. Later in this
# function we count them and only then allocate the array for `self._draws`.
self._draws: np.ndarray
for chain_index, stan_output in zip(range(self.num_chains), self.stan_outputs):
draw_index = 0
for msg in stan_output:
if msg.topic == callbacks_writer_pb2.WriterMessage.Topic.Value("SAMPLE"):
# Ignore sample message which is mixed together with proper draws.
if msg.feature and msg.feature[0].name == "":
continue
draw_row = [] # a "row" of values from a single draw from Stan C++
# for the first draw: collect sample and sampler parameter names.
if not hasattr(self, "_draws"):
feature_names = tuple(fea.name for fea in msg.feature)
self.sample_and_sampler_param_names = tuple(
name for name in feature_names if name.endswith("__")
)
num_rows = len(self.sample_and_sampler_param_names) + num_flat_params
# column-major order ("F") aligns with how the draws are stored (in cols).
self._draws = np.empty((num_rows, num_samples_saved, num_chains), order="F")
# rudimentary check of parameter order (sample & sampler params must be first)
for chain in range(1, num_chains + 1):
payload = {"function": "stan::services::sample::hmc_nuts_diag_e_adapt"}
payload.update(kwargs)
payload["chain"] = chain
payload["data"] = self.data
payload["init"] = init.pop(0)
if self.random_seed is not None:
payload["random_seed"] = self.random_seed
# fit needs to know num_samples, num_warmup, num_thin, save_warmup
# progress bar needs to know some of these
num_warmup = payload.get("num_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "num_warmup"))
num_samples = payload.get(
"num_samples", arguments.lookup_default(arguments.Method["SAMPLE"], "num_samples"),
)
num_thin = payload.get("num_thin", arguments.lookup_default(arguments.Method["SAMPLE"], "num_thin"))
save_warmup = payload.get(
"save_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "save_warmup"),
)
payloads.append(payload)
def extract_protobuf_messages(fit_bytes):
varint_decoder = google.protobuf.internal.decoder._DecodeVarint32
next_pos, pos = 0, 0
while pos < len(fit_bytes):
msg = callbacks_writer_pb2.WriterMessage()
next_pos, pos = varint_decoder(fit_bytes, pos)
msg.ParseFromString(fit_bytes[pos : pos + next_pos])
yield msg
pos += next_pos
async def go():
if len(init) != num_chains:
raise ValueError("Initial values must be provided for each chain.")
payloads = []
for chain in range(1, num_chains + 1):
payload = {"function": "stan::services::sample::hmc_nuts_diag_e_adapt"}
payload.update(kwargs)
payload["chain"] = chain
payload["data"] = self.data
payload["init"] = init.pop(0)
if self.random_seed is not None:
payload["random_seed"] = self.random_seed
# fit needs to know num_samples, num_warmup, num_thin, save_warmup
# progress bar needs to know some of these
num_warmup = payload.get("num_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "num_warmup"))
num_samples = payload.get(
"num_samples", arguments.lookup_default(arguments.Method["SAMPLE"], "num_samples"),
)
num_thin = payload.get("num_thin", arguments.lookup_default(arguments.Method["SAMPLE"], "num_thin"))
save_warmup = payload.get(
"save_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "save_warmup"),
)
payloads.append(payload)
def extract_protobuf_messages(fit_bytes):
varint_decoder = google.protobuf.internal.decoder._DecodeVarint32
next_pos, pos = 0, 0
while pos < len(fit_bytes):
msg = callbacks_writer_pb2.WriterMessage()
next_pos, pos = varint_decoder(fit_bytes, pos)
msg.ParseFromString(fit_bytes[pos : pos + next_pos])
payload.update(kwargs)
payload["chain"] = chain
payload["data"] = self.data
payload["init"] = init.pop(0)
if self.random_seed is not None:
payload["random_seed"] = self.random_seed
# fit needs to know num_samples, num_warmup, num_thin, save_warmup
# progress bar needs to know some of these
num_warmup = payload.get("num_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "num_warmup"))
num_samples = payload.get(
"num_samples", arguments.lookup_default(arguments.Method["SAMPLE"], "num_samples"),
)
num_thin = payload.get("num_thin", arguments.lookup_default(arguments.Method["SAMPLE"], "num_thin"))
save_warmup = payload.get(
"save_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "save_warmup"),
)
payloads.append(payload)
def extract_protobuf_messages(fit_bytes):
varint_decoder = google.protobuf.internal.decoder._DecodeVarint32
next_pos, pos = 0, 0
while pos < len(fit_bytes):
msg = callbacks_writer_pb2.WriterMessage()
next_pos, pos = varint_decoder(fit_bytes, pos)
msg.ParseFromString(fit_bytes[pos : pos + next_pos])
yield msg
pos += next_pos
async def go():
progress_bar = ProgressBar(ConsoleIO())
progress_bar.set_format("very_verbose")
payloads = []
for chain in range(1, num_chains + 1):
payload = {"function": "stan::services::sample::hmc_nuts_diag_e_adapt"}
payload.update(kwargs)
payload["chain"] = chain
payload["data"] = self.data
payload["init"] = init.pop(0)
if self.random_seed is not None:
payload["random_seed"] = self.random_seed
# fit needs to know num_samples, num_warmup, num_thin, save_warmup
# progress bar needs to know some of these
num_warmup = payload.get("num_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "num_warmup"))
num_samples = payload.get(
"num_samples", arguments.lookup_default(arguments.Method["SAMPLE"], "num_samples"),
)
num_thin = payload.get("num_thin", arguments.lookup_default(arguments.Method["SAMPLE"], "num_thin"))
save_warmup = payload.get(
"save_warmup", arguments.lookup_default(arguments.Method["SAMPLE"], "save_warmup"),
)
payloads.append(payload)
def extract_protobuf_messages(fit_bytes):
varint_decoder = google.protobuf.internal.decoder._DecodeVarint32
next_pos, pos = 0, 0
while pos < len(fit_bytes):
msg = callbacks_writer_pb2.WriterMessage()
next_pos, pos = varint_decoder(fit_bytes, pos)
msg.ParseFromString(fit_bytes[pos : pos + next_pos])
yield msg
pos += next_pos
def extract_protobuf_messages(fit_bytes):
varint_decoder = google.protobuf.internal.decoder._DecodeVarint32
next_pos, pos = 0, 0
while pos < len(fit_bytes):
msg = callbacks_writer_pb2.WriterMessage()
next_pos, pos = varint_decoder(fit_bytes, pos)
msg.ParseFromString(fit_bytes[pos : pos + next_pos])
yield msg
pos += next_pos
async def go():
io = ConsoleIO()
io.error("Building...")
async with stan.common.httpstan_server() as (host, port):
# Check to see if model is in cache.
model_name = httpstan.models.calculate_model_name(program_code)
path, payload = f"/v1/{model_name}/params", {"data": data}
async with aiohttp.request("POST", f"http://{host}:{port}{path}", json=payload) as resp:
model_in_cache = resp.status != 404
io.error_line(" Found model in cache." if model_in_cache else " This may take some time.")
# Note: during compilation `httpstan` redirects stderr to /dev/null, making `print` impossible.
path, payload = "/v1/models", {"program_code": program_code}
async with aiohttp.request("POST", f"http://{host}:{port}{path}", json=payload) as resp:
if resp.status != 201:
raise RuntimeError((await resp.json())["message"])
assert model_name == (await resp.json())["name"]
path, payload = f"/v1/{model_name}/params", {"data": data}
async with aiohttp.request("POST", f"http://{host}:{port}{path}", json=payload) as resp:
if resp.status != 200:
raise RuntimeError((await resp.json())["message"])
params_list = (await resp.json())["params"]
def __init__(
self,
model_name: str,
program_code: str,
data: dict,
param_names: typing.Tuple[str],
constrained_param_names: typing.Tuple[str],
dims: typing.Tuple[typing.Tuple[int]],
random_seed: typing.Optional[int],
) -> None:
if model_name != httpstan.models.calculate_model_name(program_code):
raise ValueError("`model_name` does not match `program_code`.")
self.model_name = model_name
self.program_code = program_code
self.data = data or {}
self.param_names = param_names
self.constrained_param_names = constrained_param_names
self.dims = dims
self.random_seed = random_seed
async def httpstan_server():
"""Manage starting and stopping the httpstan HTTP server."""
host, port = "127.0.0.1", unused_tcp_port()
app = httpstan.app.make_app()
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, host, port)
await site.start()
yield (host, port)
await runner.cleanup()