Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@unittest.skipIf(six.PY3, 'Not supported in Python 3.')
def test_cast_long(self):
    """Check that a Python 2 ``long`` is accepted and cast to ``Decimal``.

    ``self.type.test`` must report ``True`` for the value and
    ``self.type.cast`` must return ``Decimal('141414')``.
    """
    big_value = long('141414')
    accepted = self.type.test(big_value)
    self.assertEqual(accepted, True)
    converted = self.type.cast(big_value)
    self.assertEqual(converted, Decimal('141414'))
@unittest.skipIf(not hadoop_installed(), 'Hadoop not installed')
def test_textgz_in_local(self):
    """Run the word-count helper on the gzipped input via the local launcher."""
    gz_input = 'wc-input-alice.txt.gz'
    self._run_wc(gz_input, launcher=hadoopy.launch_local)
@unittest.skipIf(configuration.TOPOLOGY_FROM_RESOURCE_SERVER, 'Skip for topology testing from resource server: Reads Vault')
def test_iscan_data_object(self):
    """Check that a non-admin user is refused by ``iscan -d``.

    The refusal message must be the same whether or not the physical file
    behind the data object is present on disk.
    """
    user = self.user0
    admin_only_msg = 'User must be a rodsadmin to scan iRODS data objects.'

    # A name that was never uploaded is reported as not found.
    user.assert_icommand(
        'iscan -d non_existent_file',
        'STDOUT_SINGLELINE',
        'Could not find the requested data object or collection in iRODS.')

    # Upload a small file, then ask (as admin) where it was stored.
    local_path = os.path.join(user.local_session_dir, 'existent_file')
    lib.make_file(local_path, 1)
    user.assert_icommand('iput ' + local_path)
    query_result = self.admin.run_icommand('iquest "SELECT DATA_PATH WHERE DATA_NAME = \'existent_file\'"')[1]
    # Trim whitespace and '-' characters, then drop the first 12 characters
    # (presumably the 'DATA_PATH = ' label -- confirm against iquest output).
    trimmed = query_result.strip().strip('-').strip()
    data_path = trimmed[12:]

    # Same refusal with the physical file present ...
    user.assert_icommand('iscan -d existent_file', 'STDOUT_SINGLELINE', admin_only_msg)
    # ... and after it has been removed from disk.
    os.remove(data_path)
    user.assert_icommand('iscan -d existent_file', 'STDOUT_SINGLELINE', admin_only_msg)

    # Put the physical file back so the forced remove can proceed.
    lib.make_file(data_path, 1)
    user.assert_icommand('irm -f existent_file')

    # Upload an empty file as well.
    empty_path = os.path.join(user.local_session_dir, 'zero_file')
    lib.touch(empty_path)
    user.assert_icommand('iput ' + empty_path)
@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.')
def test_50_docker_rabbitmq(self):
    """Exercise the new-task queue with RabbitMQ settings taken from the environment.

    Three ``RABBITMQ_*`` environment variables are set, a CLI context is
    built in testing mode and invoked, and one item is put on the resulting
    ``newtask_queue`` before the queue is deleted.  Any exception raised
    along the way is reported as a test failure.

    All three variables are removed afterwards, whether or not the test
    passed, so they cannot leak into later tests.
    """
    rabbitmq_env = (
        ('RABBITMQ_NAME', 'rabbitmq'),
        ('RABBITMQ_PORT_5672_TCP_ADDR', 'localhost'),
        ('RABBITMQ_PORT_5672_TCP_PORT', '5672'),
    )
    try:
        for name, value in rabbitmq_env:
            os.environ[name] = value
        ctx = run.cli.make_context('test', [], None,
                                   obj=dict(testing_mode=True))
        ctx = run.cli.invoke(ctx)
        queue = ctx.obj.newtask_queue
        queue.put('abc')
        queue.delete()
    except Exception as e:
        # ``e`` is never None here, so this always fails the test and
        # reports the exception in the assertion message.
        self.assertIsNone(e)
    finally:
        # pop() with a default: cleanup must cover every variable set above
        # and must not raise if one of them is already gone.
        for name, _ in rabbitmq_env:
            os.environ.pop(name, None)
standard_library.install_aliases()
except:
# we might be on py3.2, which the future library doesn't support
pass
import os
import sys
if sys.version_info < (2, 7, 0): # pragma: no cover
import unittest2 as unittest
else:
import unittest
from layeredconfig import LayeredConfig, Defaults, Environment, INIFile
@unittest.skipIf(sys.version_info[0] == 3 and sys.version_info[1] < 3,
                 "Python 3.2 and lower doesn't support the future module")
class TestFuture(unittest.TestCase):
    """LayeredConfig checks that are skipped on Python 3.0-3.2."""

    def test_newint(self):
        """An ``int``-typed default converts the environment string to ``3``.

        ``FERENDA_DOWNLOADMAX`` is set to ``'3'`` for the duration of the
        test; the value read back through the ``Environment`` source must
        equal ``3`` and be an ``int`` instance.
        """
        env_name = 'FERENDA_DOWNLOADMAX'
        previous = os.environ.get(env_name)
        # Register the cleanup before mutating the environment so the
        # variable never leaks into other tests, even if an assertion fails.
        if previous is None:
            self.addCleanup(os.environ.pop, env_name, None)
        else:
            self.addCleanup(os.environ.__setitem__, env_name, previous)
        os.environ[env_name] = '3'

        config = LayeredConfig(Defaults({'downloadmax': int}),
                               Environment(prefix="FERENDA_"))
        self.assertEqual(3, config.downloadmax)
        self.assertIsInstance(config.downloadmax, int)
@unittest.skipIf(not hasattr(Point, "move"), "Point has no move method")
def test_move(self):
    """Check that ``move`` updates both coordinates of a ``Point``.

    A point created at (4, 5) must report ``x == 2`` and ``y == 3`` after
    ``move(2, 3)``.
    """
    point = Point(4, 5)
    point.move(2, 3)
    expectations = (
        (point.x, 2, "pt.x did not move from 4 to 2"),
        (point.y, 3, "pt.y did not move from 5 to 3"),
    )
    for actual, wanted, message in expectations:
        self.assertEqual(actual, wanted, message)
@unittest.skipIf(BSD, "broken on BSD, see #595")
@unittest.skipIf(APPVEYOR,
                 "can't find any process file on Appveyor")
def test_open_files(self):
    """Check that a file opened by the current process appears in ``open_files()``.

    NOTE(review): the source was flattened, so the nesting below is
    reconstructed -- confirm the final loop belongs outside the ``with``.
    """
    # current process
    proc = psutil.Process()
    before = proc.open_files()
    self.assertFalse(TESTFN in before)
    with open(TESTFN, 'w'):
        # Wait for the kernel to notice the new file (call_until presumably
        # polls until the expression holds -- confirm against the helper).
        call_until(proc.open_files, "len(ret) != %i" % len(before))
        open_paths = [entry.path for entry in proc.open_files()]
        self.assertIn(TESTFN, open_paths)
    # Every reported path must refer to a regular file.
    for path in open_paths:
        assert os.path.isfile(path), path
    # another process
@unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                 '/proc/diskstats not available on this Linux version')
@skip_if_linux()
def test_disk_io_counters(self):
    """Pass the name 'disk_io_counters' to ``self.execute``."""
    target = 'disk_io_counters'
    self.execute(target)
@unittest.skipIf(ast is None,
                 'ast module not available on this python version')
def test_nettop(self):
    """Pass 'nettop.py' to ``self.assert_syntax``."""
    script_name = 'nettop.py'
    self.assert_syntax(script_name)