Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_write_texture2():
    """Prepare a 1D ``r32float`` texture plus source/destination float arrays.

    The texture measures 100x1x1 texels and is usable as both copy source
    and copy destination. ``data0`` holds one random float per texel and
    ``data1`` is a zero-initialised ctypes array of the same type.
    """
    device = wgpu.utils.get_default_device()

    nx, ny, nz = 100, 1, 1
    texel_count = nx * ny * nz
    float_array_type = ctypes.c_float * 100

    # CPU-side arrays: random source values and an empty destination.
    data0 = float_array_type(*(random.random() for _ in range(texel_count)))
    data1 = float_array_type()

    # Byte size of the whole array, and from that the bytes per texel.
    nbytes = ctypes.sizeof(data1)
    bpp = nbytes // texel_count

    texture_format = wgpu.TextureFormat.r32float
    texture_dim = wgpu.TextureDimension.d1
    copy_both_ways = wgpu.TextureUsage.COPY_SRC | wgpu.TextureUsage.COPY_DST

    # Create buffers and textures
    tex3 = device.create_texture(
        size=(nx, ny, nz),
        dimension=texture_dim,
        format=texture_format,
        usage=copy_both_ways,
    )
def test_write_buffer3():
    """Upload a ``bytes`` object through the default queue and read it back."""
    payload = b"abcdefghijkl"

    device = wgpu.utils.get_default_device()
    nbytes = len(payload)

    # Destination buffer that can be written by a copy and mapped for reading.
    usage = wgpu.BufferUsage.COPY_DST | wgpu.BufferUsage.MAP_READ
    buf4 = device.create_buffer(size=nbytes, usage=usage)

    # Upload from CPU to buffer, using bytes
    device.create_command_encoder()  # we seem to need to create one
    device.default_queue.write_buffer(buf4, 0, payload, 0, nbytes)
    device.default_queue.submit([])

    # Download from buffer to CPU and compare with what was sent.
    downloaded = buf4.read_data().tobytes()
    assert downloaded == payload
def test_shader_module_creation():
    """Check which kinds of ``code`` objects ``create_shader_module`` accepts.

    Raw SPIR-V bytes, an object exposing ``to_bytes`` and an object exposing
    ``to_spirv`` must all produce a module with empty compilation info. An
    object with neither attribute must raise ``TypeError``.
    """
    device = wgpu.utils.get_default_device()

    spirv = compute_shader.to_spirv()
    assert isinstance(spirv, bytes)

    # Throw-away classes (passed as-is, not instantiated) that expose the
    # SPIR-V through a zero-argument callable, or expose nothing at all.
    has_to_bytes = type("CodeObject", (object,), {"to_bytes": lambda: spirv})
    has_to_spirv = type("CodeObject", (object,), {"to_spirv": lambda: spirv})
    has_neither = type("CodeObject", (object,), {})

    accepted_sources = (spirv, has_to_bytes, has_to_spirv)
    modules = [device.create_shader_module(code=src) for src in accepted_sources]

    for module in modules:
        assert module.compilation_info() == []

    with raises(TypeError):
        device.create_shader_module(code=has_neither)
def test_struct_checking():
    """Verify that unknown struct keys are rejected with ``ValueError``.

    The check is exercised directly via ``_check_struct`` and, when the wgpu
    library is usable, indirectly via ``create_compute_pipeline``.
    """
    check_struct = wgpu.backends.rs._check_struct
    struct_name = "ProgrammableStageDescriptor"

    # Only known keys: accepted.
    check_struct(struct_name, {"module": None, "entry_point": None})

    # An unknown key ("foo"): rejected.
    with raises(ValueError) as excinfo:
        check_struct(struct_name, {"module": None, "foo": None})
    assert "Unexpected keys" in str(excinfo.value)

    if can_use_wgpu_lib:
        # The same bad struct must also be rejected through the device API.
        device = wgpu.utils.get_default_device()
        bad_stage = {"module": None, "foo": None}
        with raises(ValueError) as excinfo:
            device.create_compute_pipeline(layout=None, compute_stage=bad_stage)
        assert "Unexpected keys" in str(excinfo.value)
def test_compute_indirect():
    """Set up the shader and buffers for an indirect compute dispatch.

    The shader writes ``data1[index] + 1`` into ``data2[index]``. Three
    buffers are created: the input data, an output buffer of the same byte
    size, and a buffer holding the dispatch parameters for the indirect call.
    """

    # Kept exactly as written: python2shader processes this function itself.
    @python2shader
    def compute_shader(
        index: ("input", "GlobalInvocationId", i32),
        data1: ("buffer", 0, Array(i32)),
        data2: ("buffer", 1, Array(i32)),
    ):
        data2[index] = data1[index] + 1

    # Create an array of 100 random int32
    n = 100
    in1 = (c_int32 * n)(*(int(random.uniform(0, 100)) for _ in range(n)))

    # Create device and shader object
    device = wgpu.utils.get_default_device()
    cshader = device.create_shader_module(code=compute_shader)

    # Input buffer, created with the data already in it.
    buffer1 = device.create_buffer_with_data(
        data=in1,
        usage=wgpu.BufferUsage.STORAGE,
    )

    # Output buffer: same byte size as the input, readable from the CPU.
    readable_storage = wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.MAP_READ
    buffer2 = device.create_buffer(size=ctypes.sizeof(in1), usage=readable_storage)

    # Create buffer to hold the dispatch parameters for the indirect call
    params = (ctypes.c_int32 * 3)(n - 2, 1, 1)  # note the minus 2!
    buffer3 = device.create_buffer_with_data(
        data=params,
        usage=wgpu.BufferUsage.INDIRECT,
    )
# %% The short version, using numpy
# import numpy as np
#
# numpy_data = np.frombuffer(data, np.int32)
# out = compute_with_buffers({0: numpy_data}, {1: numpy_data.nbytes}, compute_shader, n=n)
# result = np.frombuffer(out[1], dtype=np.int32)
# print(result)
# %% The long version using the wgpu API
# Create the device and compile the compute shader into a shader module.
# NOTE: `compute_shader` and `data` are defined outside this excerpt.
device = wgpu.utils.get_default_device()
cshader = device.create_shader_module(code=compute_shader)
# Create the buffer objects. buffer1 is created already holding `data`.
# buffer2 is given the same byte size (`data.nbytes`) and additionally the
# MAP_READ usage - presumably so the result can be read back on the CPU.
buffer1 = device.create_buffer_with_data(data=data, usage=wgpu.BufferUsage.STORAGE)
buffer2 = device.create_buffer(
size=data.nbytes, usage=wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.MAP_READ
)
# Setup layout and bindings
binding_layouts = [
{
"binding": 0,
"visibility": wgpu.ShaderStage.COMPUTE,
"type": wgpu.BindingType.storage_buffer,
},
{
# Get nx, ny, nz from n
# - None: use the "length" of the first entry in output_infos, as a 1D size
# - int: 1D size
# - 3-tuple: explicit (nx, ny, nz)
# Anything else is a TypeError.
if n is None:
output_info = list(output_infos.values())[0]
nx, ny, nz = output_info["length"], 1, 1
elif isinstance(n, int):
nx, ny, nz = int(n), 1, 1
elif isinstance(n, tuple) and len(n) == 3:
nx, ny, nz = int(n[0]), int(n[1]), int(n[2])
else:
raise TypeError("compute_with_buffers: n must be None, an int, or 3-int tuple.")
# Every dimension must be at least 1.
if not (nx >= 1 and ny >= 1 and nz >= 1):
raise ValueError("compute_with_buffers: n value(s) must be >= 1.")
# Create a device and compile the shader
device = wgpu.utils.get_default_device()
cshader = device.create_shader_module(code=shader)
# Create buffers for input and output arrays
# `buffers` maps each binding index to its buffer object.
buffers = {}
# Input buffers are created pre-filled with their array. An index that also
# appears in output_arrays (defined outside this excerpt) gets MAP_READ too.
for index, array in input_arrays.items():
usage = wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.MAP_WRITE
if index in output_arrays:
usage |= wgpu.BufferUsage.MAP_READ
buffer = device.create_buffer_with_data(data=array, usage=usage)
buffers[index] = buffer
# Output-only buffers are allocated by byte size (info["nbytes"]).
for index, info in output_infos.items():
if index in input_arrays:
continue # We already have this buffer
usage = wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.MAP_READ
buffers[index] = device.create_buffer(size=info["nbytes"], usage=usage)