Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
############################################################################
import os
from pygls import uris
from pygls.types import TextDocumentItem, WorkspaceFolder
from pygls.workspace import Workspace
# Shared fixtures for the workspace tests: a tiny plaintext document whose
# URI is derived from this module's own path.
DOC_TEXT = 'test'
DOC_URI = uris.from_fs_path(__file__)
DOC = TextDocumentItem(
    DOC_URI,
    'plaintext',
    0,
    DOC_TEXT,
)
def test_add_folder(workspace):
    """Check that a folder added via ``add_folder`` can be looked up by its URI.

    The folder is expected in ``workspace.folders`` keyed by the URI it was
    created with, carrying the name it was given.
    """
    folder_name = 'test'
    # Strip the last path component of this module's URI to get its directory.
    folder_uri = os.path.dirname(DOC_URI)
    folder = WorkspaceFolder(folder_uri, folder_name)

    workspace.add_folder(folder)

    registered = workspace.folders[folder_uri]
    assert registered.name == folder_name
def test_get_document(workspace):
    """Check that a document stored with ``put_document`` keeps its source text."""
    workspace.put_document(DOC)

    fetched = workspace.get_document(DOC_URI)
    assert fetched.source == DOC_TEXT
def workspace(tmpdir):
    """Build a ``Workspace`` rooted at *tmpdir*.

    The root URI is derived from the string form of *tmpdir*; a fresh
    ``Mock`` is passed as the second constructor argument.
    """
    root_uri = uris.from_fs_path(str(tmpdir))
    return Workspace(root_uri, Mock())
import pytest
from mock import Mock
from pygls import features, uris
from pygls.feature_manager import FeatureManager
from pygls.server import LanguageServer
from pygls.workspace import Document, Workspace
from tests.ls_setup import setup_ls_features
# Presumably a timeout for client/server calls in these tests; the unit and
# the use sites are not visible here — confirm before changing.
CALL_TIMEOUT = 2
# Sample multi-line document text used as test input.
DOC = """document
for
testing
"""
# URI form of this module's own file path, as produced by uris.from_fs_path.
DOC_URI = uris.from_fs_path(__file__)
@pytest.fixture
def client_server():
""" A fixture to setup a client/server """
# Client to Server pipe
csr, csw = os.pipe()
# Server to client pipe
scr, scw = os.pipe()
# Setup server
server = LanguageServer()
setup_ls_features(server)
server_thread = Thread(target=server.start_io, args=(
def test_win_from_fs_path(path, uri):
    """Check that ``uris.from_fs_path`` turns *path* into the expected *uri*."""
    converted = uris.from_fs_path(path)
    assert converted == uri
def test_get_missing_document(tmpdir, workspace):
    """Check that a document never put into the workspace is still retrievable.

    The file is written to disk only; ``get_document`` is expected to return
    an object whose ``source`` matches what was written.
    """
    target = tmpdir.join("test_document.py")
    target.write(DOC_TEXT)
    target_uri = uris.from_fs_path(str(target))

    fetched = workspace.get_document(target_uri)
    assert fetched.source == DOC_TEXT
def test_from_fs_path(path, uri):
    """Check that ``uris.from_fs_path`` turns *path* into the expected *uri*."""
    converted = uris.from_fs_path(path)
    assert converted == uri
logger.info('Language server initialized {}'.format(params))
self._server.process_id = params.processId
# Initialize server capabilities
client_capabilities = params.capabilities
server_capabilities = ServerCapabilities(self.fm.features.keys(),
self.fm.feature_options,
self.fm.commands,
self._server.sync_kind,
client_capabilities)
logger.debug('Server capabilities: {}'
.format(server_capabilities.__dict__))
root_path = getattr(params, 'rootPath', None)
root_uri = params.rootUri or from_fs_path(root_path)
# Initialize the workspace
workspace_folders = getattr(params, 'workspaceFolders', [])
self.workspace = Workspace(root_uri, self._server.sync_kind, workspace_folders)
return InitializeResult(server_capabilities)