Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def vim():
    """Start an embedded, headless nvim child and return it wrapped.

    The child process is launched with ``VIMRC`` as its init file; the
    attached session is handed to ``WrappedVim``.
    """
    command_line = ['nvim', '-u', VIMRC, '--embed', '--headless']
    session = neovim.attach('child', argv=command_line)
    return WrappedVim(session)
def f(argv=None, file=None):
    """Start an embedded, headless nvim child, optionally editing a file.

    Args:
        argv: Extra command-line arguments appended after the fixed nvim
            arguments; ``None`` means no extras.
        file: When not ``None``, a file is opened with ``:edit``. A falsy
            value (such as an empty string) selects ``foo.py`` under
            ``tmp_path`` instead of a caller-supplied name.

    Returns:
        The attached session wrapped in ``WrappedVim``.
    """
    extra_args = [] if argv is None else argv
    command_line = ['nvim', '-u', VIMRC, '--embed', '--headless', *extra_args]
    session = neovim.attach('child', argv=command_line)
    if file is not None:
        target = file if file else (tmp_path / 'foo.py')
        session.command('edit %s' % target)
    return WrappedVim(session)
return f
def __init__(self):
    """Connect to Neovim and pick the key-feed delay.

    If ``NVIM_LISTEN_ADDRESS`` is set in the environment, attach to that
    socket; otherwise spawn an embedded, headless nvim child that loads
    ``init.vim``.
    """
    listen_addr = os.environ.get('NVIM_LISTEN_ADDRESS')
    if not listen_addr:
        child_argv = ["/usr/bin/env", "nvim", "--embed", "--headless", "-n",
                      "-u", "init.vim"]
        self.nvim = attach('child', argv=child_argv)
        # Issue a throwaway request so the embedded Nvim proceeds; see
        # neovim bd8d43c6fef868 (startup: wait for embedder before
        # executing).
        self.eval("0")
    else:
        self.nvim = attach('socket', path=listen_addr)

    # Trusty builds on Travis seem to be more prone to races, so they get
    # the longer delay.
    on_travis = os.environ.get('TRAVIS')
    self.feed_delay = 0.1 if on_travis else 0.02
def __init__(self):
    """Connect to Neovim and pick the key-feed delay.

    If ``NVIM_LISTEN_ADDRESS`` is set in the environment, attach to that
    socket; otherwise spawn an embedded, headless nvim child that loads
    ``init.vim``.
    """
    listen_addr = os.environ.get('NVIM_LISTEN_ADDRESS')
    if not listen_addr:
        child_argv = ["/usr/bin/env", "nvim", "--embed", "--headless", "-n",
                      "-u", "init.vim"]
        self.nvim = attach('child', argv=child_argv)
        # Issue a throwaway request so the embedded Nvim proceeds; see
        # neovim bd8d43c6fef868 (startup: wait for embedder before
        # executing).
        self.eval("0")
    else:
        self.nvim = attach('socket', path=listen_addr)

    # Trusty builds on Travis seem to be more prone to races, so they get
    # the longer delay.
    on_travis = os.environ.get('TRAVIS')
    self.feed_delay = 0.1 if on_travis else 0.02
@neovim.autocmd('VimEnter', pattern='*', sync=True)
def event_vim_enter(self):
    """Locate the live semshi ``Plugin`` object and keep it on ``self._plugin``.

    Raises:
        Exception: If no ``Plugin`` instance is found among the objects
            tracked by the garbage collector.
    """
    # Imported here rather than at module level so semshi is not collected
    # as a plugin a second time.
    from semshi.plugin import Plugin

    # Scanning the garbage collector's object list is a bit hacky, but it
    # works reasonably well and needs no changes to the plugin code itself.
    plugin = next(
        (candidate for candidate in gc.get_objects()
         if isinstance(candidate, Plugin)),
        None,
    )
    if plugin is None:
        raise Exception('Can\'t find plugin instance.')
    self._plugin = plugin
@pynvim.function('MarksmanUpdateSearch', sync=True)
def updateSearch(self, args):
    """Handle the synchronous ``MarksmanUpdateSearch`` function call.

    ``args`` must hold exactly five items, in order: the root path, a
    request id, an offset, a maximum amount, and a path to ignore (the
    last one is currently not read).

    Raises:
        AssertionError: If ``args`` does not have five items or the
            canonicalized root path is not an existing directory.
    """
    # NOTE(review): the code visible here only validates and unpacks the
    # arguments; none of the locals below are consumed in this span, so the
    # body presumably continues elsewhere -- confirm against the full file.
    self._lazyInit()
    assert len(args) == 5, 'Wrong number of arguments to MarksmanUpdateSearch'
    rootPath = self._getCanonicalPath(args[0])
    assert os.path.isdir(rootPath), f"Could not find directory '{rootPath}'"
    requestId = args[1]
    offset = args[2]
    maxAmount = args[3]
    # We could use ignorePath here to hide the current project, but I find that
    # in practice this is more annoying than it is useful
    # ignorePath = self._getCanonicalPath(args[4])
    ignorePath = None
@pynvim.function('MarksmanForceRefresh')
def forceRefresh(self, args):
    """Queue a refresh of the project at ``args[0]`` unless one is pending.

    ``args`` must hold exactly one item, the project root path. When the
    project's ``isUpdating`` flag is already set the call does nothing;
    otherwise the flag is set and the canonical root path is put on
    ``self._refreshQueue``.

    Raises:
        AssertionError: If ``args`` does not have exactly one item.
    """
    self._lazyInit()
    assert len(args) == 1, 'Wrong number of arguments to MarksmanForceRefresh'

    projectRoot = self._getCanonicalPath(args[0])
    projectInfo = self._getProjectInfo(projectRoot)

    if not projectInfo.isUpdating.getValue():
        # Mark the project as updating before enqueueing it, so a second
        # call made in the meantime is ignored.
        projectInfo.isUpdating.setValue(True)
        self._refreshQueue.put(projectRoot)
@pynvim.function('GdbInit', sync=True)
def gdb_init(self, args):
    """Handle the command GdbInit.

    Builds an ``App`` from ``args``, registers it in ``self.apps`` under the
    handle of the current tabpage, and starts it. When it is the only
    registered app, the global UI setup is invoked as well.
    """
    # Prepare configuration: keymaps, hooks, parameters etc.
    vim = self.vim
    config = Config(self)
    common = BaseCommon(vim, config)
    app = App(common, *args)

    tab_handle = self.vim.current.tabpage.handle
    self.apps[tab_handle] = app
    app.start()

    is_first_app = len(self.apps) == 1
    if is_first_app:
        # Initialize the UI commands, autocommands etc
        self.vim.call("nvimgdb#GlobalInit")
def init_neovim_bridge(self):
    """Attach ``self.nvim`` to the Neovim instance at ``self.nvim_listen_address``.

    The socket is tried first. If attaching fails, a Neovim instance is
    launched through ``config.KITTYCMD`` with ``NVIM_LISTEN_ADDRESS`` set and
    ``config.NOTE_PATH`` as its argument, and the attach is retried every
    tenth of a second for up to five seconds.

    Raises:
        SystemExit: If pynvim cannot be imported, or if running the launch
            command raises an exception.
    """
    try:
        from pynvim import attach
    except ImportError as err:
        raise SystemExit('pynvim unavailable') from err

    # Only ``Exception`` is caught below (never a bare ``except``) so that
    # KeyboardInterrupt and SystemExit are not swallowed while connecting.
    try:
        self.nvim = attach('socket', path=self.nvim_listen_address)
    except Exception:
        # Nothing usable is listening on the address yet; start Neovim below.
        pass
    else:
        return

    ncmd = 'env NVIM_LISTEN_ADDRESS={} nvim {}'.format(
        self.nvim_listen_address, config.NOTE_PATH)
    try:
        # NOTE(review): os.system reports a failing command through its
        # return status rather than by raising, so this handler only fires
        # on errors such as a missing ``config.KITTYCMD`` -- confirm whether
        # the status should be checked as well.
        os.system('{} {}'.format(config.KITTYCMD, ncmd))
    except Exception as err:
        raise SystemExit('unable to open new kitty window') from err

    deadline = monotonic() + 5  # 5 second time out
    while monotonic() < deadline:
        try:
            self.nvim = attach('socket', path=self.nvim_listen_address)
            break
        except Exception:
            # keep trying every tenth of a second
            sleep(0.1)
    # NOTE(review): when the deadline passes without a successful attach,
    # ``self.nvim`` is left untouched and no error is raised -- confirm that
    # callers cope with that.
def attach_vim(serveraddr):
    """Attach to a Neovim server and sync its plugin paths into ``sys.path``.

    Args:
        serveraddr: Either ``host:port`` (exactly one colon) for a TCP
            connection, or anything else, which is used as a socket path.

    Returns:
        The attached Neovim session.
    """
    pieces = serveraddr.split(':')
    if len(pieces) == 2:
        host, port_text = pieces
        session = attach('tcp', address=host, port=int(port_text))
    else:
        session = attach('socket', path=serveraddr)

    # sync path: every ``rplugin/python3`` directory found on the
    # runtimepath becomes importable.
    plugin_dirs = session.call(
        'globpath', session.options['runtimepath'],
        'rplugin/python3', 1)
    sys.path.extend(plugin_dirs.split('\n'))

    # Remove current path
    del sys.path[0]
    return session