Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test data: data0 holds the random source values, data1 receives a copy of
# them and is the array that gets uploaded. Both arrays are sized from the
# texture extent (instead of a hard-coded 100) so the element count always
# agrees with the initializer, with bpp, and with the (nx, ny, nz) size that
# is passed to write_texture below.
data0 = (ctypes.c_float * (nx * ny * nz))(
    *(random.random() for _ in range(nx * ny * nz))
)
data1 = (ctypes.c_float * (nx * ny * nz))()
nbytes = ctypes.sizeof(data1)
bpp = nbytes // (nx * ny * nz)  # bytes per element
texture_format = wgpu.TextureFormat.r32float
texture_dim = wgpu.TextureDimension.d1

# Create buffers and textures
tex3 = device.create_texture(
    size=(nx, ny, nz),
    dimension=texture_dim,
    format=texture_format,
    usage=wgpu.TextureUsage.COPY_SRC | wgpu.TextureUsage.COPY_DST,
)
buf4 = device.create_buffer(
    size=nbytes, usage=wgpu.BufferUsage.COPY_DST | wgpu.BufferUsage.MAP_READ
)

# Copy the source values element-wise into the array that is uploaded
data1[:] = data0

# Upload from CPU to texture
command_encoder = device.create_command_encoder()
device.default_queue.write_texture(
    {"texture": tex3},
    data1,
    {"bytes_per_row": bpp * nx, "rows_per_image": ny},
    (nx, ny, nz),
)
# device.default_queue.submit([]) -> call further down
# Invalidate the data now, to show that write_texture has made a copy
def test_buffer_init3():
    """Create a buffer without initial data, write bytes to it, read them back.

    Also checks that ``create_buffer`` raises ``ValueError`` when called with
    ``mapped_at_creation=True`` and ``MAP_READ`` as the only usage flag.
    """
    payload = b"abcdefghijkl"
    device = wgpu.utils.get_default_device()

    # This combination of arguments must be rejected
    with raises(ValueError):
        device.create_buffer(
            mapped_at_creation=True,
            size=len(payload),
            usage=wgpu.BufferUsage.MAP_READ,
        )

    # A buffer that can be mapped for both reading and writing
    gpu_buffer = device.create_buffer(
        size=len(payload),
        usage=wgpu.BufferUsage.MAP_READ | wgpu.BufferUsage.MAP_WRITE,
    )

    # Round trip: CPU -> buffer -> CPU
    gpu_buffer.write_data(payload)
    readback = gpu_buffer.read_data()
    assert payload == readback
usage=wgpu.TextureUsage.STORAGE | wgpu.TextureUsage.COPY_SRC,
)
# One view per texture; the bindings below refer to these views
texture_view1 = texture1.create_view()
texture_view2 = texture2.create_view()

# Pick the texture component type based on the name of the format:
# "...norm" / "...float" -> float, contains "uint" -> uint, otherwise sint
texture_component_type = (
    wgpu.TextureComponentType.float
    if texture_format.endswith(("norm", "float"))
    else (
        wgpu.TextureComponentType.uint
        if "uint" in texture_format
        else wgpu.TextureComponentType.sint
    )
)

# Buffer that is needed to upload the data; it is created from data1
buffer_usage = (
    wgpu.BufferUsage.MAP_READ
    | wgpu.BufferUsage.COPY_SRC
    | wgpu.BufferUsage.COPY_DST
)
buffer = device.create_buffer_with_data(data=data1, usage=buffer_usage)
assert buffer.usage == buffer_usage

# Define bindings: slot 0 -> first view, slot 1 -> second view.
# This shows why 2 textures are needed: one is readonly, one writeonly.
bindings = [
    {"binding": slot, "resource": view}
    for slot, view in enumerate((texture_view1, texture_view2))
]
binding_layouts = [
{
"binding": 0,
"visibility": wgpu.ShaderStage.COMPUTE,
# Format of the render target (rgba8unorm or bgra8unorm_srgb)
texture_format = wgpu.TextureFormat.rgba8unorm

# Texture to render to; its byte size is width * height * bpp
nx, ny = size[0], size[1]
bpp = 4
nbytes = nx * ny * bpp
texture = device.create_texture(
    size=(nx, ny, 1),
    dimension=wgpu.TextureDimension.d2,
    format=texture_format,
    usage=wgpu.TextureUsage.OUTPUT_ATTACHMENT | wgpu.TextureUsage.COPY_SRC,
)
current_texture_view = texture.create_view()

# Buffer of the same byte size, used to read the data back to the CPU
buffer = device.create_buffer(
    size=nbytes,
    usage=wgpu.BufferUsage.MAP_READ | wgpu.BufferUsage.COPY_DST,
)

# Shader modules for the vertex and fragment stages
vshader = device.create_shader_module(code=vertex_shader)
fshader = device.create_shader_module(code=fragment_shader)
render_pipeline = device.create_render_pipeline(
layout=pipeline_layout,
vertex_stage={"module": vshader, "entry_point": "main"},
fragment_stage={"module": fshader, "entry_point": "main"},
primitive_topology=topology,
rasterization_state={
"front_face": wgpu.FrontFace.ccw,
"cull_mode": wgpu.CullMode.none,
"depth_bias": 0,
"depth_bias_slope_scale": 0.0,
"depth_bias_clamp": 0.0,
nx, ny, nz = int(n[0]), int(n[1]), int(n[2])
else:
raise TypeError("compute_with_buffers: n must be None, an int, or 3-int tuple.")
# Every component of n must be at least 1
if not all(count >= 1 for count in (nx, ny, nz)):
    raise ValueError("compute_with_buffers: n value(s) must be >= 1.")

# Get the default device and build the shader module
device = wgpu.utils.get_default_device()
cshader = device.create_shader_module(code=shader)

# Create buffers for input and output arrays, keyed by binding index
buffers = {}

# Inputs: storage buffers initialized with the array data. An index that
# is also listed as an output additionally gets MAP_READ usage.
for index, array in input_arrays.items():
    usage = wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.MAP_WRITE
    if index in output_arrays:
        usage = usage | wgpu.BufferUsage.MAP_READ
    buffer = buffers[index] = device.create_buffer_with_data(
        data=array, usage=usage
    )

# Outputs: only indices that did not already get a buffer as an input
for index, info in output_infos.items():
    if index not in input_arrays:
        usage = wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.MAP_READ
        buffers[index] = device.create_buffer(size=info["nbytes"], usage=usage)

# Create bindings and binding layouts (filled in below)
bindings = []
binding_layouts = []
for index, buffer in buffers.items():
bindings.append(
{
"binding": index,
"resource": {"buffer": buffer, "offset": 0, "size": buffer.size},