1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import os.path
import re
from lib.helpers import execute, quote_all, ProcessException
class Git:
    """Thin wrapper that drives the ``git`` command line for one working copy.

    Every operation builds a command string and hands it to ``execute``; all
    interpolated arguments go through ``quote_all`` first (presumably shell
    quoting -- confirm against ``lib.helpers``).
    """

    def __init__(self, repo_path):
        """Store the path of the local working copy; no command is run."""
        self.repo_path = repo_path

    def exists(self) -> bool:
        """Return True if ``repo_path`` exists on disk."""
        return os.path.exists(self.repo_path)

    def clone(self, git_url, branch=None):
        """Clone ``git_url`` into ``repo_path``.

        When ``branch`` is given, only that branch is cloned
        (``-b <branch> --single-branch``).
        """
        if branch is not None:
            execute("git clone -b %s --single-branch %s %s" % quote_all(
                branch, git_url, self.repo_path
            ))
        else:
            execute("git clone %s %s" % quote_all(
                git_url, self.repo_path
            ))

    def checkout(self, pathspec, branch=None):
        """Check out ``pathspec``; with ``branch``, create that branch from it."""
        if branch is not None:
            execute("git -C %s checkout -b %s %s" % quote_all(self.repo_path, branch, pathspec))
        else:
            execute("git -C %s checkout %s" % quote_all(self.repo_path, pathspec))

    def add_remote(self, git_url, remote_name):
        """Register ``git_url`` as a remote called ``remote_name``."""
        execute("git -C %s remote add %s %s" % quote_all(self.repo_path, remote_name, git_url))

    def rm_remote(self, remote_name):
        """Remove the remote called ``remote_name``."""
        execute("git -C %s remote rm %s" % quote_all(self.repo_path, remote_name))

    def get_remote_url(self, remote_name):
        """Return the configured URL of ``remote_name`` with whitespace stripped."""
        return execute("git -C %s config --get remote.%s.url" % quote_all(self.repo_path, remote_name)).strip()

    def set_remote_url(self, remote_name, git_url):
        """Point the remote ``remote_name`` at ``git_url``."""
        execute("git -C %s remote set-url %s %s" % quote_all(self.repo_path, remote_name, git_url))

    def fetch(self):
        """Fetch from all remotes."""
        execute("git -C %s fetch --all" % quote_all(self.repo_path))

    def pull(self, remote=None, branch=None, ff_only=False):
        """Hard-reset the working copy, then pull.

        ``git reset --hard`` runs first, so local modifications to tracked
        files are discarded before pulling. ``remote``, ``branch`` and
        ``--ff-only`` are appended to the pull command only when set.
        """
        execute("git -C %s reset --hard" % quote_all(self.repo_path))
        extra = ""
        if remote:
            extra += " %s" % quote_all(remote)
        if branch:
            extra += " %s" % quote_all(branch)
        if ff_only:
            extra += " --ff-only"
        # ``extra`` is already quoted piece by piece, so it is appended verbatim.
        execute("git -C %s pull%s" % tuple(quote_all(self.repo_path) + (extra,)))

    def push(self, remote):
        """Push to ``remote`` and return whatever ``execute`` returns."""
        return execute("git -C %s push %s" % quote_all(self.repo_path, remote))

    def add(self):
        """Stage every change in the working copy (``git add --all``)."""
        execute("git -C %s add --all" % quote_all(self.repo_path))

    def commit(self, message):
        """Commit the staged changes with ``message``."""
        execute("git -C %s commit -m %s" % quote_all(self.repo_path, message))

    def get_last_commit_hash(self):
        """Return the output of ``git rev-parse HEAD`` with whitespace stripped."""
        return execute("git -C %s rev-parse HEAD" % quote_all(self.repo_path)).strip()

    def get_changed_files(self, ref1, ref2):
        """Return the ``git diff --name-only`` output between two refs.

        The result is the raw, stripped command output (one path per line),
        not a list. An empty string is returned when the command fails with
        exit code 1 and "Could not access" in its output; any other
        ``ProcessException`` propagates.
        """
        try:
            # Run inside the repository like every other command of this
            # class instead of relying on the current working directory.
            return execute(
                "git -C %s diff --name-only %s %s" % quote_all(self.repo_path, ref1, ref2)
            ).strip()
        except ProcessException as e:
            if e.exit_code == 1 and "Could not access" in e.output:
                return ""  # ignore non-existing commits (caused by repo changed or force-push)
            raise

    @staticmethod
    def _compile_change_pattern(pattern):
        """Compile one glob-like change pattern into an anchored regex.

        A single ``*`` matches one or more characters within a path segment
        (no ``/``); a run of two or more stars matches anything, including
        ``/``. Everything else is matched literally, case-insensitively.
        """
        escaped = re.escape(pattern).replace("\\*", "*")  # keep stars as wildcards
        # Translate each run of stars in ONE pass so that the ``*`` inside a
        # freshly produced ``.*`` is never rewritten a second time.
        body = re.sub(
            r"[*]+",
            lambda m: ".*" if len(m.group()) > 1 else "[^/]+",
            escaped,
        )
        return re.compile(r"^%s$" % body, flags=re.I)

    def resolve_changes(self, change_patterns, previous_hash) -> bool:
        """Pull and report whether a relevant change happened since ``previous_hash``.

        Returns True when the working copy does not exist yet. Otherwise the
        repository is pulled; if HEAD equals ``previous_hash`` the result is
        False, and if ``previous_hash`` is empty the result is True. In all
        other cases the result is True only if a pattern consisting solely of
        stars is present, or at least one changed file matches one of
        ``change_patterns``.
        """
        if not self.exists():
            return True
        self.pull()
        current_hash = self.get_last_commit_hash()
        changed = current_hash != previous_hash
        if not changed or not previous_hash:
            return changed

        regexes = []
        for pattern in change_patterns:
            if re.search(r"^[*]+$", pattern):
                return True  # catch-all pattern
            regexes.append(self._compile_change_pattern(pattern))

        # get_changed_files returns one string; split it into paths so whole
        # file names (not single characters) are tested against the patterns.
        # NOTE(review): an empty result (e.g. previous commit vanished after a
        # force-push) is reported as "no change" -- confirm this is intended.
        diff_output = self.get_changed_files(current_hash, previous_hash)
        changed_files = [line for line in diff_output.splitlines() if line]
        return any(
            regex.search(path)
            for path in changed_files
            for regex in regexes
        )
|