Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_clone_bare(self):
    """Fetch from a freshly initialised bare remote and expect no refs back."""
    local_repo = repo.Repo.init(self.temp_d, mkdir=True)
    swift.SwiftRepo.init_bare(self.scon, self.conf)
    tcp_client = client.TCPGitClient(self.server_address,
                                     port=self.port)
    remote_refs = tcp_client.fetch(self.fakerepo, local_repo)
    # The remote repo is empty (no refs retrieved)
    self.assertIsNone(remote_refs)
def test_push_commit(self):
    """Push one commit over TCP and compare the master ref on both sides.

    The ref read back through ``swift.SwiftRepo`` must equal the loose
    ``refs/heads/master`` ref of the local repository.
    """
    source_repo = repo.Repo.init(self.temp_d, mkdir=True)
    # Nothing in the staging area
    source_repo.do_commit('Test commit', 'fbo@localhost')
    expected_sha = source_repo.refs.read_loose_ref('refs/heads/master')

    def wanted_refs(*_ignored):
        # Always ask the remote to point master at the local HEAD.
        return {"refs/heads/master": source_repo.refs["HEAD"]}

    swift.SwiftRepo.init_bare(self.scon, self.conf)
    git_client = client.TCPGitClient(self.server_address, port=self.port)
    git_client.send_pack(
        self.fakerepo,
        wanted_refs,
        source_repo.object_store.generate_pack_data,
    )

    pushed_repo = swift.SwiftRepo("fakerepo", self.conf)
    actual_sha = pushed_repo.refs.read_loose_ref('refs/heads/master')
    self.assertEqual(expected_sha, actual_sha)
def init(pod, theme_url, force=False):
    """Populate ``pod`` with the files of a theme repository.

    The theme's ``refs/heads/master`` tree is cloned into memory, checked out
    into a temporary directory, and then copied into the pod.

    Args:
        pod: Destination pod. This function calls ``pod.list_dir('/')`` and
            reads ``pod.root``.
        theme_url: A full repository URL, or a short name that is expanded
            with ``THEME_REPO_URL``.
        force: Passed through to ``_copy_files_to_pod``. Switched on
            automatically when ``pod.list_dir('/')`` is an empty list.

    Returns:
        None.
    """
    import errno

    # Anything without both '//' and ':' is treated as a short theme name.
    if '//' not in theme_url or ':' not in theme_url:
        repo_url = THEME_REPO_URL.format(theme_url)
    else:
        repo_url = theme_url
    git_client = client.HttpGitClient(repo_url)
    logging.info('Initializing with {}...'.format(repo_url))
    temp_repo = MemoryRepo()
    temp_repo.clone(git_client)
    tree = temp_repo['refs/heads/master'].tree

    # Build the tree into a temp directory.
    temp_root = tempfile.mkdtemp()
    try:
        local_repo = repo.Repo.init(temp_root)
        index_file = local_repo.index_path()
        index.build_index_from_tree(
            local_repo.path, index_file, temp_repo.object_store, tree)
        # Only the checked-out files are wanted, not the Git metadata.
        shutil.rmtree(os.path.join(temp_root, '.git'))

        # Automatically enable "force" for empty directories.
        if pod.list_dir('/') == []:
            force = True

        try:
            _copy_files_to_pod(temp_root, pod, force=force)
        except OSError as e:
            # Prefer the errno; keep the message check for errors that
            # carry the text but no errno.
            if e.errno == errno.EEXIST or 'File exists' in str(e):
                text = ('{} already exists and is not empty. Delete the directory before'
                        ' proceeding or use --force.')
                logging.info(text.format(pod.root))
                return
            # Previously swallowed without a trace; still non-fatal, but
            # now visible in the log.
            logging.exception('Could not copy theme files into the pod.')
    finally:
        # The scratch checkout is never needed after the copy attempt.
        shutil.rmtree(temp_root, ignore_errors=True)
self.renderers = {'copy': self.render_copy, 'md': self.render_md}
self.r = {}
#ensure cache folder is created
if not os.path.exists(self.cache_path):
os.makedirs(self.cache_path)
#ensure doc folder is created
#ensure repo is initiated
if self.repo_root:
if self.git_repos:
for _repo in self.git_repos:
if not os.path.exists(self.repo_root + _repo):
os.makedirs(self.repo_root + _repo + self.doc)
if not os.path.exists(self.cache_path + _repo):
os.makedirs(self.cache_path + _repo)
try:
self.r[_repo] = repo.Repo(self.repo_root + _repo)
except:
self.r[_repo] = repo.Repo.init(
self.repo_root + _repo)
if self.hg_repos:
for _repo in self.hg_repos:
if not os.path.exists(self.repo_root + _repo):
os.makedirs(self.repo_root + _repo + self.doc)
if not os.path.exists(self.cache_path + _repo):
os.makedirs(self.cache_path + _repo)
try:
self.r[_repo] = localrepo.localrepository(
ui.ui(),
self.repo_root + _repo)
except:
hg.init(ui.ui(), self.repo_root + _repo)
self.r[_repo] = localrepo.localrepository(
def _fetch_pack(self):
"""Fetch changes and store them in a pack.

Talks to the remote derived from ``self.uri`` and, when any pack data is
received, adds it to the object store of the repository at
``self.dirpath``.

NOTE(review): the visible code ends after ``pack_name`` is computed and
shows no return statement - confirm against the full source.
"""
# Keep only the hashes (UTF-8 encoded) of refs whose name does not end
# with '^{}' - presumably peeled-tag entries; TODO confirm.
def prepare_refs(refs):
return [ref.hash.encode('utf-8') for ref in refs
if not ref.refname.endswith('^{}')]
# Callback handed to fetch_pack. It ignores its ``refs`` argument and
# rediscovers remote and local refs itself, returning the remote hashes
# that are not present locally.
def determine_wants(refs):
remote_refs = prepare_refs(self._discover_refs(remote=True))
local_refs = prepare_refs(self._discover_refs())
wants = [ref for ref in remote_refs if ref not in local_refs]
return wants
client, repo_path = dulwich.client.get_transport_and_path(self.uri)
repo = dulwich.repo.Repo(self.dirpath)
# In-memory buffer that receives the pack bytes through fd.write.
fd = io.BytesIO()
local_refs = self._discover_refs()
graph_walker = _GraphWalker(local_refs)
result = client.fetch_pack(repo_path,
determine_wants,
graph_walker,
fd.write)
# Decode the ref names and hashes reported in the fetch result.
refs = [GitRef(ref_hash.decode('utf-8'), ref_name.decode('utf-8'))
for ref_name, ref_hash in result.refs.items()]
# Only touch the object store when some pack data was actually written.
if len(fd.getvalue()) > 0:
# Rewind so add_thin_pack reads the buffer from the start.
fd.seek(0)
pack = repo.object_store.add_thin_pack(fd.read, None)
pack_name = pack.name().decode('utf-8')
def _find_reporoot(self, reporoot_opt, relnotessubdir_opt):
    """Find root directory of project.

    Args:
        reporoot_opt: Path from which repository discovery starts.
        relnotessubdir_opt: Sub-directory whose presence marks a fallback
            candidate as the project root.

    Returns:
        The ``path`` of the repository found by ``repo.Repo.discover`` when
        discovery succeeds; otherwise the first of ``'.'``, ``'..'``,
        ``'../..'`` that contains ``relnotessubdir_opt``.

    Raises:
        Exception: If discovery fails and no candidate directory contains
            ``relnotessubdir_opt``.
    """
    reporoot = os.path.abspath(reporoot_opt)
    # When building on RTD.org the root directory may not be
    # the current directory, so look for it.
    try:
        return repo.Repo.discover(reporoot).path
    except Exception:
        # Deliberate best-effort: fall back to probing relative directories.
        pass
    candidates = ('.', '..', '../..')
    for root in candidates:
        if os.path.exists(os.path.join(root, relnotessubdir_opt)):
            return root
    raise Exception(
        'Could not discover root directory; tried: %s' % ', '.join(
            os.path.abspath(root) for root in candidates
        )
    )
def get_changelog(repo_path, from_commit=None):
"""
Given a repo path and an optional commit/tag/refspec to start from,
return the rpm compatible changelog.
Args:
repo_path (str): path to the git repo
from_commit (str): refspec (partial commit hash, tag, branch, full
refspec, partial refspec) to start the changelog from; when None,
inclusion is enabled from the start
Returns:
str: Rpm compatible changelog
"""
# NOTE(review): the visible code is cut off inside the ``for`` header below;
# the comments here describe only what is shown.
repo = dulwich.repo.Repo(repo_path)
tags = get_tags(repo)
refs = get_refs(repo)
changelog = []
# Version counters, all starting at zero.
maj_version = 0
feat_version = 0
fix_version = 0
start_including = False
cur_line = ''
# Without a starting point, include entries right away.
if from_commit is None:
start_including = True
prev_version = (maj_version, feat_version, fix_version)
# Iterates the mapping from get_children_per_first_parent in reverse order.
for commit_sha, children in reversed(
get_children_per_first_parent(repo_path).items()
try:
import wx
import librian.librian_util.wxcef as wxcef
except ModuleNotFoundError:
logging.warning('沒能import wx,改爲使用pyside2。')
import librian.librian_util.fake_wx as wx
import librian.librian_util.qtcef as wxcef
from librian.librian本體.帶有vue的山彥 import 帶有vue的山彥
from librian.librian本體.librian虛擬機 import 虛擬機環境
from librian.librian_util import 加載器, 文件, 路徑, 更新器
# Open the Git repository located in the current working directory.
rp = dulwich.repo.Repo('.')
# Author timestamp of the commit that HEAD resolves to
# (Unix time, going by the variable name).
最後提交unix時間 = rp[rp.head()].author_time
# Converted to a naive datetime in the local timezone (no tz is passed).
最後提交時間 = datetime.datetime.fromtimestamp(最後提交unix時間)
class 山彥(帶有vue的山彥):
    """Window subclass that publishes save data and offers JavaScript helpers."""

    def __init__(self, *li, **d):
        """Initialise the base class, then fill ``self.vue`` with startup data.

        Sets the save data loaded from the panel's YAML file and the date of
        the last commit formatted as ``'%y-%m-%d'``.
        """
        super().__init__(*li, **d)
        save_file = 路徑.librian面板 / '存檔資料/存檔資料.yaml'
        self.vue.存檔資料 = 加載器.yaml(save_file)
        commit_date = 最後提交時間.strftime('%y-%m-%d')
        self.vue.最後提交時間 = commit_date

    def js(self, x):
        """Execute the JavaScript source ``x`` in this window's browser."""
        browser = self.窗口.browser
        browser.ExecuteJavascript(x)

    def alert(self, title, icon=None, text=None):
        """Call ``Swal.fire`` in the page with title, icon and text as JSON."""
        payload = json.dumps({"title": title, "icon": icon, "text": text})
        self.js(f'Swal.fire({payload})')
def main(repo_dir, options):
    """Open the repository at ``repo_dir`` and emit it as xdot using ``options``."""
    repository = dulwich.repo.Repo(repo_dir)
    emit_repo_as_xdot(repository, options)
parser.add_argument('--source-repo',
help='URL of repository being converted')
parser.add_argument('git_repo', help='Path to Git repository to linearize')
parser.add_argument('ref', help='ref to linearize')
args = parser.parse_args()
configure_logging()
kwargs = get_commit_rewriter_kwargs(args)
if args.source_repo:
kwargs['source_repo'] = args.source_repo
repo = dulwich.repo.Repo(args.git_repo)
rewriter = commit_metadata_rewriter(repo, **kwargs)
try:
linearize_git_repo(repo, args.ref,
exclude_dirs=args.exclude_dirs,
commit_rewriter=rewriter)
except RewriteError as e:
logger.error('abort: %s' % str(e))