Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#
# if not x:
# y = 9
# y = 4
label = Label()
code = Bytecode([Instr('LOAD_NAME', 'x'),
Instr('UNARY_NOT'),
Instr('POP_JUMP_IF_FALSE', label),
Instr('LOAD_CONST', 9),
Instr('STORE_NAME', 'y'),
label,
Instr('LOAD_CONST', 4),
Instr('STORE_NAME', 'y')])
code = self.optimize_blocks(code)
label = Label()
self.check(code,
Instr('LOAD_NAME', 'x'),
Instr('POP_JUMP_IF_TRUE', label),
Instr('LOAD_CONST', 9),
Instr('STORE_NAME', 'y'),
label,
Instr('LOAD_CONST', 4),
Instr('STORE_NAME', 'y'))
def test_eq_labels(self):
# equal
code1 = BytecodeBlocks()
label1 = Label()
code1[0][:] = [Instr(1, "JUMP_FORWARD", label1),
Instr(1, "NOP"),
label1]
code2 = BytecodeBlocks()
label2 = Label()
code2[0][:] = [Instr(1, "JUMP_FORWARD", label2),
Label(), # unused label
Instr(1, "NOP"),
label2]
self.assertEqual(code2, code1)
# not equal
code3 = BytecodeBlocks()
label3 = Label()
code3[0][:] = [Instr(1, "JUMP_FORWARD", label3),
label3,
def test_disassemble(self):
code = get_code("""
if test:
x = 1
else:
x = 2
""")
bytecode = Bytecode.from_code(code)
label_else = Label()
label_exit = Label()
self.assertEqual(bytecode,
[Instr(1, 'LOAD_NAME', 'test'),
Instr(1, 'POP_JUMP_IF_FALSE', label_else),
Instr(2, 'LOAD_CONST', 1),
Instr(2, 'STORE_NAME', 'x'),
Instr(2, 'JUMP_FORWARD', label_exit),
label_else,
Instr(4, 'LOAD_CONST', 2),
Instr(4, 'STORE_NAME', 'x'),
label_exit,
Instr(4, 'LOAD_CONST', None),
Instr(4, 'RETURN_VALUE')])
def _(ast: If, ctx: Ctx):
label1 = Label()
label2 = Label()
ctx.visit(ast.cond)
ctx.bc.append(
Instr("POP_JUMP_IF_FALSE", arg=label1, lineno=ast.cond.lineno + 1))
ctx.visit(ast.iftrue)
ctx.bc.append(
Instr('JUMP_FORWARD', arg=label2, lineno=ast.iftrue.lineno + 1))
ctx.bc.append(label1)
ctx.visit(ast.iffalse)
ctx.bc.append(label2)
def _(ast: If, ctx: Ctx):
label1 = Label()
label2 = Label()
ctx.visit(ast.cond)
ctx.bc.append(
Instr("POP_JUMP_IF_FALSE", arg=label1, lineno=ast.cond.lineno + 1))
ctx.visit(ast.iftrue)
ctx.bc.append(
Instr('JUMP_FORWARD', arg=label2, lineno=ast.iftrue.lineno + 1))
ctx.bc.append(label1)
ctx.visit(ast.iffalse)
ctx.bc.append(label2)
def convert1(self, vm_op, py_token=None, data=None):
"""
:param vm_op:
:param py_token:
:param data:
:return:
"""
start_addr = self._address
vmtoken = VMToken(vm_op=vm_op, addr=start_addr,
pytoken=py_token, data=data)
self._address += 1
if vmtoken.data is not None and type(vmtoken.data) is not Label:
self._address += len(data)
self.insert_vm_token_at(vmtoken, start_addr)
return vmtoken
def try_squash_raise(self):
""" A context manager for squashing tracebacks.
The code written during this context will be wrapped so that
any exception raised will appear to have been generated from
the code, rather than any function called by the code.
"""
exc_label = bc.Label()
end_label = bc.Label()
op_code = "SETUP_FINALLY" if PY38 else "SETUP_EXCEPT"
self.code_ops.append(
bc.Instr(op_code, exc_label), # TOS
)
yield
self.code_ops.extend([ # TOS
bc.Instr("POP_BLOCK"), # TOS
bc.Instr("JUMP_FORWARD", end_label), # TOS
exc_label, # TOS -> tb -> val -> exc
bc.Instr("ROT_THREE"), # TOS -> exc -> tb -> val
bc.Instr("ROT_TWO"), # TOS -> exc -> val -> tb
bc.Instr("POP_TOP"), # TOS -> exc -> val
bc.Instr("RAISE_VARARGS", 2), # TOS
bc.Instr("JUMP_FORWARD", end_label), # TOS
bc.Instr("END_FINALLY"), # TOS
line = ''.join(fields)
line = format_line(offset, line)
else:
fields.append("% 3s %s" % (offset, format_instr(instr)))
line = ''.join(fields)
io(line)
offset += instr.size
elif isinstance(bytecode, Bytecode):
labels = {}
for index, instr in enumerate(bytecode):
if isinstance(instr, Label):
labels[instr] = 'label_instr%s' % index
for index, instr in enumerate(bytecode):
if isinstance(instr, Label):
label = labels[instr]
line = "%s:" % label
if index != 0:
io()
else:
if instr.lineno is not None:
cur_lineno = instr.lineno
line = format_instr(instr, labels)
line = indent + format_line(index, line)
io(line)
io()
elif isinstance(bytecode, ControlFlowGraph):
labels = {}
for block_index, block in enumerate(bytecode, 1):
labels[id(block)] = 'block%s' % block_index
def __init__(self, instruction, expression, index, fallback_ln):
self.instruction = instruction
self.expression = expression
self.index = index
self._methodname = None
self.jump_from = None
self.jump_target = None
self.jump_found = False
self.jump_from_addr = None
self.jump_to_addr = None
# self.jump_to_addr_abs = None
# self.jump_from_addr_abs = None
if isinstance(instruction, Label):
self.jump_target = instruction
self.instruction = Instr("NOP", lineno=fallback_ln)
elif isinstance(instruction.arg, Label):
self.jump_from = instruction.arg
Parameters
----------
iter_var : str
The name of the loop iter variable.
fast_var : bool, optional
Whether the iter_var lives in fast locals. The default is
True. If False, the iter_var is loaded from globals.
"""
start_label = bc.Label()
jump_label = bc.Label()
# Unused under Python 3.8+ since the compiler handle the blocks
# automatically
end_label = bc.Label()
load_op = "LOAD_FAST" if fast_var else "LOAD_GLOBAL"
if PY38:
self.code_ops.append(bc.Instr("SETUP_LOOP", end_label),)
self.code_ops.extend([
bc.Instr(load_op, iter_var),
bc.Instr("GET_ITER"),
start_label,
bc.Instr("FOR_ITER", jump_label),
])
yield
self.code_ops.extend([
bc.Instr("JUMP_ABSOLUTE", start_label),
jump_label,
])
if PY38:
self.code_ops.extend(