Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def upload_to_texture(device, texture, data, nx, ny, nz):
    """Copy ``data`` into ``texture`` through a temporary COPY_SRC buffer.

    The bytes-per-pixel value is the total size of ``data`` (as reported by
    ``ctypes.sizeof``) divided by the ``nx * ny * nz`` texel count. A single
    buffer-to-texture copy covering ``(nx, ny, nz)`` is encoded and submitted
    on the device's default queue.
    """
    total_bytes = ctypes.sizeof(data)
    bytes_per_pixel = total_bytes // (nx * ny * nz)

    # Stage the data in a GPU buffer that can serve as the copy source
    staging = device.create_buffer_with_data(
        data=data,
        usage=wgpu.BufferUsage.COPY_SRC,
    )

    encoder = device.create_command_encoder()

    # rows_per_image must only be nonzero for 3D textures; 0 is passed here
    source = {
        "buffer": staging,
        "offset": 0,
        "bytes_per_row": bytes_per_pixel * nx,
        "rows_per_image": 0,
    }
    destination = {"texture": texture, "mip_level": 0, "origin": (0, 0, 0)}
    extent = (nx, ny, nz)

    encoder.copy_buffer_to_texture(source, destination, extent)
    device.default_queue.submit([encoder.finish()])
data0 = (ctypes.c_float * 100)(*[random.random() for i in range(nx * ny * nz)])
data1 = (ctypes.c_float * 100)()
nbytes = ctypes.sizeof(data1)
bpp = nbytes // (nx * ny * nz)
texture_format = wgpu.TextureFormat.r32float
texture_dim = wgpu.TextureDimension.d1
# Create buffers and textures
tex3 = device.create_texture(
size=(nx, ny, nz),
dimension=texture_dim,
format=texture_format,
usage=wgpu.TextureUsage.COPY_SRC | wgpu.TextureUsage.COPY_DST,
)
buf4 = device.create_buffer(
size=nbytes, usage=wgpu.BufferUsage.COPY_DST | wgpu.BufferUsage.MAP_READ
)
for i in range(len(data1)):
data1[i] = data0[i]
# Upload from CPU to texture
command_encoder = device.create_command_encoder()
device.default_queue.write_texture(
{"texture": tex3},
data1,
{"bytes_per_row": bpp * nx, "rows_per_image": ny},
(nx, ny, nz),
)
# device.default_queue.submit([]) -> call further down
# Invalidate the data now, to show that write_texture has made a copy
def test_write_texture2():
device = wgpu.utils.get_default_device()
nx, ny, nz = 100, 1, 1
data0 = (ctypes.c_float * 100)(*[random.random() for i in range(nx * ny * nz)])
data1 = (ctypes.c_float * 100)()
nbytes = ctypes.sizeof(data1)
bpp = nbytes // (nx * ny * nz)
texture_format = wgpu.TextureFormat.r32float
texture_dim = wgpu.TextureDimension.d1
# Create buffers and textures
tex3 = device.create_texture(
size=(nx, ny, nz),
dimension=texture_dim,
format=texture_format,
usage=wgpu.TextureUsage.COPY_SRC | wgpu.TextureUsage.COPY_DST,
)
def test_write_texture1():
device = wgpu.utils.get_default_device()
nx, ny, nz = 100, 1, 1
data1 = memoryview(np.random.random(size=100).astype(np.float32))
bpp = data1.nbytes // (nx * ny * nz)
texture_format = wgpu.TextureFormat.r32float
texture_dim = wgpu.TextureDimension.d1
# Create buffers and textures
tex3 = device.create_texture(
size=(nx, ny, nz),
dimension=texture_dim,
format=texture_format,
usage=wgpu.TextureUsage.COPY_SRC | wgpu.TextureUsage.COPY_DST,
)
buf4 = device.create_buffer(
size=data1.nbytes, usage=wgpu.BufferUsage.COPY_DST | wgpu.BufferUsage.MAP_READ
)
# Upload from CPU to texture
command_encoder = device.create_command_encoder()
device.default_queue.write_texture(
nx, ny, nz, nc = texture_size
nbytes = ctypes.sizeof(data1)
bpp = nbytes // (nx * ny * nz) # bytes per pixel
if can_use_vulkan_sdk:
pyshader.dev.validate(compute_shader)
device = get_default_device()
cshader = device.create_shader_module(code=compute_shader)
# Create textures and views
texture1 = device.create_texture(
size=(nx, ny, nz),
dimension=texture_dim,
format=texture_format,
usage=wgpu.TextureUsage.STORAGE | wgpu.TextureUsage.COPY_DST,
)
texture2 = device.create_texture(
size=(nx, ny, nz),
dimension=texture_dim,
format=texture_format,
usage=wgpu.TextureUsage.STORAGE | wgpu.TextureUsage.COPY_SRC,
)
texture_view1 = texture1.create_view()
texture_view2 = texture2.create_view()
# Determine texture component type from the format
if texture_format.endswith(("norm", "float")):
texture_component_type = wgpu.TextureComponentType.float
elif "uint" in texture_format:
texture_component_type = wgpu.TextureComponentType.uint
else:
device = wgpu.utils.get_default_device()
nx, ny, nz = 100, 1, 1
data0 = (ctypes.c_float * 100)(*[random.random() for i in range(nx * ny * nz)])
data1 = (ctypes.c_float * 100)()
nbytes = ctypes.sizeof(data1)
bpp = nbytes // (nx * ny * nz)
texture_format = wgpu.TextureFormat.r32float
texture_dim = wgpu.TextureDimension.d1
# Create buffers and textures
tex3 = device.create_texture(
size=(nx, ny, nz),
dimension=texture_dim,
format=texture_format,
usage=wgpu.TextureUsage.COPY_SRC | wgpu.TextureUsage.COPY_DST,
)
buf4 = device.create_buffer(
size=nbytes, usage=wgpu.BufferUsage.COPY_DST | wgpu.BufferUsage.MAP_READ
)
for i in range(len(data1)):
data1[i] = data0[i]
# Upload from CPU to texture
command_encoder = device.create_command_encoder()
device.default_queue.write_texture(
{"texture": tex3},
data1,
{"bytes_per_row": bpp * nx, "rows_per_image": ny},
(nx, ny, nz),
)
def test_write_texture1():
device = wgpu.utils.get_default_device()
nx, ny, nz = 100, 1, 1
data1 = memoryview(np.random.random(size=100).astype(np.float32))
bpp = data1.nbytes // (nx * ny * nz)
texture_format = wgpu.TextureFormat.r32float
texture_dim = wgpu.TextureDimension.d1
# Create buffers and textures
tex3 = device.create_texture(
size=(nx, ny, nz),
dimension=texture_dim,
format=texture_format,
usage=wgpu.TextureUsage.COPY_SRC | wgpu.TextureUsage.COPY_DST,
)
buf4 = device.create_buffer(
size=data1.nbytes, usage=wgpu.BufferUsage.COPY_DST | wgpu.BufferUsage.MAP_READ
)
# Upload from CPU to texture
command_encoder = device.create_command_encoder()
device.default_queue.write_texture(
{"texture": tex3},
def test_write_buffer3():
    """Write a 12-byte ``bytes`` payload to a buffer via the queue and read it back."""
    device = wgpu.utils.get_default_device()
    payload = b"abcdefghijkl"
    size = len(payload)  # 12 bytes

    # Destination buffer: writable by a copy, mappable for reading
    target = device.create_buffer(
        size=size,
        usage=wgpu.BufferUsage.COPY_DST | wgpu.BufferUsage.MAP_READ,
    )

    # Upload from CPU to buffer, using bytes
    device.create_command_encoder()  # we seem to need to create one
    device.default_queue.write_buffer(target, 0, payload, 0, size)
    device.default_queue.submit([])

    # Download from buffer to CPU and compare with what was written
    downloaded = target.read_data().tobytes()
    assert downloaded == payload
# Fragment shader, wrapped by the python2shader decorator.
# Its single parameter, out_color, is annotated as (RES_OUTPUT, 0, vec4) --
# presumably output slot 0 holding a vec4; confirm against the shader library.
# The body assigns the constant vec4(1.0, 0.499, 0.0, 1.0) to out_color
# (presumably an RGBA colour). The "# noqa" suppresses the linter warning for
# an assignment whose value is never read in Python.
@python2shader
def fragment_shader(out_color: (RES_OUTPUT, 0, vec4),):
out_color = vec4(1.0, 0.499, 0.0, 1.0) # noqa
# Bindings and layout
bind_group_layout = device.create_bind_group_layout(entries=[]) # zero bindings
bind_group = device.create_bind_group(layout=bind_group_layout, entries=[])
pipeline_layout = device.create_pipeline_layout(
bind_group_layouts=[bind_group_layout]
)
# Index buffer
indices = (ctypes.c_int32 * 6)(0, 1, 2, 2, 1, 3)
ibo = device.create_buffer_with_data(
data=indices, usage=wgpu.BufferUsage.INDEX | wgpu.BufferUsage.MAP_WRITE,
)
# Render
render_args = device, vertex_shader, fragment_shader, pipeline_layout, bind_group
# render_to_screen(*render_args, topology=wgpu.PrimitiveTopology.triangle_list, ibo=ibo)
a = render_to_texture(
*render_args,
size=(64, 64),
topology=wgpu.PrimitiveTopology.triangle_list,
ibo=ibo,
)
# Check that the background is all zero
bg = a.copy()
bg[16:-16, 16:-16, :] = 0
assert np.all(bg == 0)
data1: ("buffer", 0, Array(i32)),
data2: ("buffer", 1, Array(i32)),
):
data2[index] = data1[index] + 1
# Create an array of 100 random int32
n = 100
in1 = [int(random.uniform(0, 100)) for i in range(n)]
in1 = (c_int32 * n)(*in1)
# Create device and shader object
device = wgpu.utils.get_default_device()
cshader = device.create_shader_module(code=compute_shader)
# Create input buffer and upload data to in
buffer1 = device.create_buffer_with_data(data=in1, usage=wgpu.BufferUsage.STORAGE)
# Create output buffer
buffer2 = device.create_buffer(
size=ctypes.sizeof(in1),
usage=wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.MAP_READ,
)
# Create buffer to hold the dispatch parameters for the indirect call
params = (ctypes.c_int32 * 3)(n - 2, 1, 1) # note the minus 2!
buffer3 = device.create_buffer_with_data(
data=params, usage=wgpu.BufferUsage.INDIRECT,
)
# Setup layout and bindings
binding_layouts = [
{