Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
mp.set_start_method('fork')
if self.dev_name is not None:
net_name = self.get_net_name(self.dev_name)
try:
self.ip_addr = self.get_ip_address(net_name)
except KeyError:
raise unittest.SkipTest('Device {} doesn\'t have net interface'
.format(self.dev_name))
else:
dev_list = d.get_device_list()
for dev in dev_list:
net_name = self.get_net_name(dev.name.decode())
try:
self.ip_addr = self.get_ip_address(net_name)
except IndexError:
continue
else:
ARGS.add_argument(
'-p', '--plot-file-name', action="store",
type=str, default=None,
dest='plot_file_name',
help='file name for plot (default: `%(default)s`)')
ARGS.add_argument(
'-v', '--verbose', action="count", default=0,
help='verbosity level (default: `%(default)s`)')
ARGS.add_argument(
'--profile', action="store_true", default=False,
help='perform aiohttp test profiling, store result as out.prof '
'(default: `%(default)s`)')
if __name__ == '__main__':
set_start_method('spawn')
sys.exit(main(sys.argv))
init_headless(args.directory, args.skip_obfuscation, args.output,
args.parse_libraries, emails, args.debug_mode, [], args.directory, args.skip,
args.commit_size_limit, args.file_size_limit, seed)
else:
initialize(args.directory, args.skip_obfuscation, args.output,
args.parse_libraries, args.email, args.skip_upload, args.debug_mode, args.skip,
args.commit_size_limit, args.file_size_limit)
except KeyboardInterrupt:
print("Cancelled by user")
os._exit(0)
if __name__ == "__main__":
import multiprocessing
multiprocessing.set_start_method('spawn', True)
main()
def main():
"""Test script."""
import sys
if sys.version_info >= (3, 0, 0):
# Only available in Python 3; allows us to test process code as it
# behaves in Windows environments.
multiprocessing.set_start_method('spawn')
import argparse
import json
from bcipy.acquisition.protocols import registry
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--buffer', default='buffer.db',
help='buffer db name')
parser.add_argument('-f', '--filename', default='rawdata.csv')
parser.add_argument('-d', '--device', default='DSI',
choices=registry.supported_devices.keys())
parser.add_argument('-c', '--channels', default='',
help='comma-delimited list')
parser.add_argument('-p', '--params', type=json.loads,
default={'host': '127.0.0.1', 'port': 9000},
help="device connection params; json")
# need different behavior in a GUI
import cytoflow
cytoflow.RUNNING_IN_GUI = True
running_event.set()
RemoteWorkflow().run(parent_workflow_conn, parent_mpl_conn, log_q)
def monitor_remote_process(proc):
proc.join()
if proc.exitcode:
logging.error("Remote process exited with {}".format(proc.exitcode))
if __name__ == '__main__':
multiprocessing.freeze_support()
multiprocessing.set_start_method('spawn')
run_gui()
while True:
logging.info("Hello from proc2")
await client.broadcast(SecondThingHappened("Hit from proc2 "))
await asyncio.sleep(2)
async def display_proc1_events(endpoint):
async for event in endpoint.stream(FirstThingHappened):
logging.info("Received via STREAM API in proc2: %s", event.payload)
if __name__ == "__main__":
# WARNING: The `fork` method does not work well with asyncio yet.
# This might change with Python 3.8 (See https://bugs.python.org/issue22087#msg318140)
multiprocessing.set_start_method("spawn")
# Start two processes
p1 = multiprocessing.Process(target=run_proc1)
p1.start()
p2 = multiprocessing.Process(target=run_proc2)
p2.start()
p1.join()
p2.join()
help='Whether to switch on example rpc mode')
parser.add_argument('--tracker', type=str, default="",
help="Report to RPC tracker")
parser.add_argument('--no-fork', dest='fork', action='store_false',
help="Use spawn mode to avoid fork. This option \
is able to avoid potential fork problems with Metal, OpenCL \
and ROCM compilers.")
parser.set_defaults(fork=True)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
if args.fork is False:
if sys.version_info[0] < 3:
raise RuntimeError(
"Python3 is required for spawn mode."
)
multiprocessing.set_start_method('spawn')
else:
logging.info("If you are running ROCM/Metal, \
fork with cause compiler internal error. Try to launch with arg ```--no-fork```")
main(args)
def __init__(self, *args, **kwargs):
super().__init__(*args, calibrator=CrossCalibrator(fixed_length=50), **kwargs)
multiprocessing.set_start_method('spawn')
self.pyb = PyboardConnection(com_port='COM3')
protocol = MultistimulusExp06Protocol(repetitions=16,
shock_args=dict(burst_freq=1, pulse_amp=3., pulse_n=1,
pulse_dur_ms=5, pyboard=self.pyb),
grating_args=dict(grating_period=10), calibrator=self.calibrator)
self.set_protocol(protocol)
self.finished = False
# Create window and layout:
self.main_layout = QSplitter(Qt.Horizontal)
self.main_layout.addWidget(self.widget_control)
self.setCentralWidget(self.main_layout)
key = self.key
func = fixed_func_registry[key]
assert func is not None
return func(arg)
from itertools import count as _
fixed_func_registry_key_generator = _()
try: # cannot use detect_problem() here (hangs in pool.map())
from multiprocessing.pool import Pool as multiprocessing_Pool
# on macOS restore "fork" instead of new default of "spawn" on Python 3.8
# https://bugs.python.org/issue33725
# may need to re-evaluate if Python is built with macOS 10.13 SDK (or later)
if sys.platform == 'darwin' and sys.hexversion >= 0x03080000:
import multiprocessing
multiprocessing.set_start_method('fork')
except Exception:
multiprocessing_Pool = object
class Pool(multiprocessing_Pool):
"""Subclass of multiprocessing.Pool, used internally by pool_map."""
def __init__(self,
processes=None,
initializer=None,
initargs=(),
maxtasksperchild=None,
fixed_func=None):
if (multiprocessing_Pool is object):
mp_problem = detect_problem()
assert mp_problem is not None
raise RuntimeError(mp_problem)
self.processes = get_processes(processes)
python_version = sys.version_info
if python_version.major == 2:
str1 = unicode(str1)
str2 = unicode(str2)
return distance_function(str1, str2)
# First time initialization on import
# Set Mac OS systems to use the older "fork" method of spawning
# procs for the multiprocessing module. For some reason the newer
# methods don't work (EOFErrors when creating Manager() objects)
system_type = platform.system()
if system_type == "Darwin":
multiprocessing.set_start_method('fork')