Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# The default working directory is the directory from which Jupyter
# server is launched, which is not the same as the root notebook
# directory assuming either --notebook-dir= is used from the
# command line or c.NotebookApp.notebook_dir is set in the jupyter
# configuration. This line ensures that all repos are cloned
# relative to server_root_dir, so that all repos are always in
# scope after cloning. Sometimes server_root_dir will include
# things like `~` and so the path must be expanded.
repo_dir = os.path.join(os.path.expanduser(self.settings['server_root_dir']),
repo.split('/')[-1])
# We're going to send out event streams!
self.set_header('content-type', 'text/event-stream')
self.set_header('cache-control', 'no-cache')
gp = GitPuller(repo, branch, repo_dir, depth=depth, parent=self.settings['nbapp'])
q = Queue()
def pull():
# Worker that copies everything produced by gp.pull() onto the queue q.
# It is handed to threading.Thread as the target right after this
# definition, so it runs off the calling thread.
try:
# Forward each item yielded by gp.pull() without blocking on the queue.
for line in gp.pull():
q.put_nowait(line)
# Sentinel when we're done
q.put_nowait(None)
except Exception as e:
# Queue the exception object itself, then re-raise it in this thread.
# NOTE(review): presumably the reader of q uses the queued exception to
# report the failure; the consuming loop is not visible here, so confirm.
q.put_nowait(e)
raise e
self.gp_thread = threading.Thread(target=pull)
self.gp_thread.start()
while True:
def __init__(self, git_url, branch_name, repo_dir, **kwargs):
    """Record what to pull and where to put it, then initialise the parent.

    git_url and branch_name must both be truthy, otherwise an
    AssertionError is raised. repo_dir is stored unchanged. Keyword
    arguments whose value is None are dropped; the remaining ones are
    forwarded to the parent class initializer.
    """
    assert all((git_url, branch_name))

    self.git_url = git_url
    self.branch_name = branch_name
    self.repo_dir = repo_dir

    # Options left as None are not forwarded to the parent initializer.
    forwarded = {}
    for option, value in kwargs.items():
        if value is not None:
            forwarded[option] = value
    super(GitPuller, self).__init__(**forwarded)