Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_expect_basic(self):
    # Send three lines to a spawned ``cat`` and expect each of them, in the
    # same order, followed by EOF once end-of-file has been sent.
    greetings = (
        'Hello',
        'there',
        'Mr. þython',  # þ is more like th than p, but never mind
    )
    child = pexpect.spawnu('cat')
    # Everything is written first; the expectations are checked afterwards.
    for text in greetings:
        child.sendline(text)
    for text in greetings:
        child.expect(text)
    child.sendeof()
    child.expect(pexpect.EOF)
# Recreate every extra disk image as a fresh 10G file and append it to
# extra_disks as a virtio-blk device on 31:<n>.  Numbering starts at 1
# because 31:0 is used for the ISO in the install command below.
for i, d in enumerate(test_config['disks'], 1):
    os.system("rm -f %s" % d)
    os.system("truncate -s 10G %s" % d)
    extra_disks += " -s 31:%d,virtio-blk,%s" % (i, d)

# Part 1: Do a clean install using the ISO
# Destroy any VM left over under the same name.  The command is echoed first,
# like the two commands that follow.
cmd = "bhyvectl --destroy --vm=%s" % test_config['vm_name']
print(cmd)
ret = os.system(cmd)

# Load the installer from the ISO and wait until the loader exits.
cmd = "bhyveload -m %s -d %s %s" % (test_config['ram'], test_config['iso'], test_config['vm_name'])
print(cmd)
child1 = pexpect.spawn(cmd)
child1.logfile = sys.stdout
child1.expect(pexpect.EOF)

# The MAC address is optional; `in` replaces dict.has_key(), which no longer
# exists on Python 3.
macaddress = ",mac=%s" % test_config['mac'] if 'mac' in test_config else ""

cmd = (
    "bhyve -c 2 -m %s -AI -H -P -g 0 "
    "-s 0:0,hostbridge -s 1:0,lpc "
    "-s 2:0,virtio-net,%s%s "
    "-s 3:0,virtio-blk,%s "
    "-l com1,stdio "
    "-s 31:0,virtio-blk,%s %s"
) % (
    test_config['ram'],
    test_config['tap'],
    macaddress,
    test_config['disk_img'],
    test_config['iso'],
    test_config['vm_name'],
)
print(cmd)
child2 = pexpect.spawn(cmd)
child2.logfile = sys.stdout

# Drive the installer menus: option 1, accept the drive selection, confirm.
child2.expect(['Install'])
child2.sendline("1")
child2.expect(['Select the drive'])
child2.sendline("\n")
child2.expect(['Proceed with the installation'])
child2.sendline("Y")
# Very long timeout (in seconds) while waiting for the final message.
child2.expect(['Please remove'], timeout=250000)
# Send '3' and then '4'; after each one wait for PROMPT and echo the text
# that matched it.
for choice in ('3', '4'):
    p.sendline(choice)
    p.expect(PROMPT)
    print('===> %r' % p.match.group(0))

# After sending '1' the child is expected to reach EOF; show what it printed
# before that.
p.sendline('1')
p.expect(pexpect.EOF)
print('===> %r' % p.before)

# Start a new session via _login_duo and finish it with '2'.
p = _login_duo(confs)
p.sendline('2')
p.expect(pexpect.EOF)
print('===> %r' % p.before)
def test_maxread():
    """Expect through a file descriptor with ``maxread`` lowered to 100.

    Opens ``file`` (resolved from the enclosing scope) read-only, wraps the
    descriptor in ``fdpexpect.fdspawn`` and expects ``'2'``, the end-of-data
    marker and finally EOF.  The text left in ``before`` at EOF must be
    ``' END'`` followed by a newline.

    Raises:
        AssertionError: if the text before EOF differs from the expected one.
    """
    fd = os.open(file, os.O_RDONLY)
    s = fdpexpect.fdspawn(fd)
    try:
        s.maxread = 100
        s.expect('2')
        s.expect('This is the end of test data:')
        s.expect(pexpect.EOF)
        assert s.before == ' END\n'
    finally:
        # Release the descriptor even when an expect or the assertion fails.
        s.close()
# Handle a (classfile, jarfile) pair that is not yet in self.opened_files.
# NOTE(review): indentation was lost in this excerpt; presumably the lines
# below form the body of this `if` - confirm against the original file.
if not (classfile, jarfile) in self.opened_files:
# The existence check has no effect: its only branch is `pass`.
if not os.path.exists(jarfile): pass
# Keep only the first three dot-separated components of the class name.
classparts = classfile.split(".")
classgroup = ".".join(classparts[0:3])
print "%s : %s" % (jarfile, classgroup)
# NOTE(review): the membership test above looks up (classfile, jarfile) but
# the tuple stored here is (classgroup, jarfile) - confirm which key is meant.
self.opened_files.append((classgroup, jarfile))
def flush(self):
    """Do nothing and return None."""
    return None
# Script entry point: run the command given on the command line under pexpect,
# passing a Profiler instance as the child's logfile.
if __name__=='__main__':
# Wrap every argument in single quotes and join them into one command string.
# NOTE(review): an argument that itself contains a single quote breaks this
# quoting - confirm the expected inputs.
app = "'%s'" % ("' '".join(sys.argv[1:]))
command = app
child = pexpect.spawn(command, logfile=Profiler() )
# Wait for the child to reach EOF with a very large timeout.
child.expect(pexpect.EOF, timeout=999999999)
child.close()
# NOTE(review): this instance is not used in the visible code, and the lost
# indentation hides whether it belongs to the `if` above - confirm.
p = Profiler()
import ldap3
import pexpect
import MySQLdb as mdb
# Run this on iodine.tjhsst.edu
start_date = "2015-09-01"
csl_realm = "CSL.TJHSST.EDU"
host = "iodine.tjhsst.edu"
ldap_realm = "CSL.TJHSST.EDU"
ldap_server = "ldap://iodine-ldap.tjhsst.edu"
base_dn = "ou=people,dc=tjhsst,dc=edu"

# Run kgetcred for the ldap/<host>@<realm> principal and wait for it to end.
kgetcred = pexpect.spawn("/usr/bin/kgetcred ldap/{}@{}".format(host, ldap_realm))
kgetcred.expect(pexpect.EOF)
kgetcred.close()
# exitstatus is None when the child was ended by a signal, so compare with 0
# explicitly instead of relying on truthiness (None would count as success).
if kgetcred.exitstatus != 0:
    print("Authorization to LDAP failed. Try running kinit.")
    sys.exit(1)
print("Successfully authorized to LDAP service")

# Bind to the directory over SASL/GSSAPI.
server = ldap3.Server(ldap_server)
connection = ldap3.Connection(server, authentication=ldap3.SASL, sasl_mechanism='GSSAPI')
# bind() reports failure through its return value; stop here rather than
# continuing with an unbound connection.
if not connection.bind():
    print("Binding to LDAP failed.")
    sys.exit(1)
print("Successfully bound to LDAP with " + connection.extend.standard.who_am_i())
def user_attrs(uid, attr):
sfilter = '(iodineUidNumber=' + str(uid) + ')'
# NOTE(review): this excerpt is the body of a loop whose header is not
# visible (see the `break` below); indentation was lost in extraction.
fd.write('\n')
# Ask the debugger to disassemble from pc up to the address returned by
# calculatePcPlusFour(pc), then hand the captured output to processInstruction.
nextPc = calculatePcPlusFour(pc)
sndline = 'disas /r ' + pc + ',' + nextPc
#child.sendline('disas /r %s, %s' % (pc, nextPc))
child.sendline(sndline)
child.expect(COMMAND_PROMPT)
processInstruction(child.before, fd)
# Send `si` and see whether the exit message or the prompt comes back first.
child.sendline('si')
i = child.expect([EXIT_MESSAGE, COMMAND_PROMPT])
if i == 0: # Finished GDB
child.sendline('quit')
child.expect(pexpect.EOF)
break
# Prompt came back: dump the registers and pass the output to
# processRegisterInfo.
if i == 1:
child.sendline('info registers')
child.expect(COMMAND_PROMPT)
#fd.write(child.before)
processRegisterInfo(child.before, fd)
# NOTE(review): presumably executed once, after the loop ends - confirm.
fd.close()
# Send each command in cmds to the process `p`, wait for the "Vivado%" prompt
# and yield the parsed result.
# NOTE(review): the enclosing `def` and the definition of `p` are outside this
# excerpt and indentation was lost; presumably everything below is the body
# of this loop - confirm.
for cmd in cmds:
if self.firstCmd:
p.expect("Vivado%", timeout=self.timeout) # block while command line init
self.firstCmd = False
# Commands are refused once the GUI has been opened.
if self.guiOpened:
raise Exception("Controller have no acces to Vivado because gui is opened")
p.sendline(cmd)
# @attention: there is a timing issue when reading from the tty; the next command returns a corrupted line
p.readline() # read cmd from tty
# p.expect(cmd, timeout=self.timeout)
# Remember that the GUI was started so that later commands are rejected.
if cmd == VivadoTCL.start_gui():
self.guiOpened = True
try:
p.expect("Vivado%", timeout=self.timeout) # block while cmd ends
except pexpect.EOF:
# EOF while waiting for the prompt is ignored; p.before is still used below.
pass
t = p.before.decode(self.encoding)
if self.logComunication:
print(cmd)
print(t)
# Parse the captured text, raise on reported errors, then yield the result.
res = VivadoCmdResult.fromStdoutStr(cmd, t)
res.raiseOnErrors()
yield res
def process(self, cmds, iterator=False):
def _pexpect_out(self):
    """Collect the child's output: ``before`` + ``after`` + the rest of the stream.

    Returns:
        ``str`` when ``self.subprocess.encoding`` is set, ``bytes`` otherwise:
        the concatenation of ``before`` (if any), ``after`` (only when it
        holds matched text) and whatever ``read()`` still returns.
    """
    child = self.subprocess
    result = "" if child.encoding else b""
    if child.before:
        result += child.before
    # When the last expect ended with EOF or TIMEOUT, pexpect stores the
    # exception class itself in `after`; a class cannot be concatenated, so
    # only real matched text is appended.
    after = child.after
    if after and not isinstance(after, type):
        result += after
    result += child.read()
    return result
def drain_child_buffer(self):
    '''Keep reading 100-byte chunks from the child until a read is empty.

    EOF and TIMEOUT raised by pexpect end the loop as well and are only
    logged at debug level.
    '''
    try:
        while True:
            chunk = self.p.read_nonblocking(size=100, timeout=1)
            if not chunk:
                break
    except (pexpect.EOF, pexpect.TIMEOUT):
        logger.debug('drain buffer to EOF')