summaryrefslogtreecommitdiff
path: root/new/lib/git.py
blob: ba6f02dc440295745ce223c1cfde2ce1d7eb3497 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os.path
import re

from lib.helpers import execute, quote_all, ProcessException


class Git:
    """Thin wrapper around the ``git`` command line for one local repository.

    Every command is built as a shell string and run through ``execute``;
    arguments are passed through ``quote_all`` first (presumably shell-quoting
    them and returning a tuple -- confirm against ``lib.helpers``).
    """

    # Fragments of git error output that indicate one of the requested commits
    # is unknown to the repository (e.g. after a force-push or a repo swap).
    _MISSING_COMMIT_MARKERS = (
        "Could not access",
        "bad object",
        "unknown revision",
        "bad revision",
    )

    def __init__(self, repo_path):
        """Remember the path of the local working copy; no I/O is done here."""
        self.repo_path = repo_path

    def exists(self) -> bool:
        """Return True if ``repo_path`` exists on disk."""
        return os.path.exists(self.repo_path)

    def clone(self, git_url, branch=None):
        """Clone ``git_url`` into ``repo_path``.

        When ``branch`` is given, only that branch is cloned
        (``-b <branch> --single-branch``).
        """
        if branch is not None:
            execute("git clone -b %s --single-branch %s %s" % quote_all(
                branch, git_url, self.repo_path
            ))
        else:
            execute("git clone %s %s" % quote_all(
                git_url, self.repo_path
            ))

    def checkout(self, pathspec, branch=None):
        """Check out ``pathspec``; with ``branch``, create that branch from it."""
        if branch is not None:
            execute("git -C %s checkout -b %s %s" % quote_all(self.repo_path, branch, pathspec))
        else:
            execute("git -C %s checkout %s" % quote_all(self.repo_path, pathspec))

    def add_remote(self, git_url, remote_name):
        """Register ``git_url`` as a remote called ``remote_name``."""
        execute("git -C %s remote add %s %s" % quote_all(self.repo_path, remote_name, git_url))

    def rm_remote(self, remote_name):
        """Remove the remote called ``remote_name``."""
        execute("git -C %s remote rm %s" % quote_all(self.repo_path, remote_name))

    def get_remote_url(self, remote_name):
        """Return the configured URL of ``remote_name`` (whitespace stripped)."""
        return execute("git -C %s config --get remote.%s.url" % quote_all(self.repo_path, remote_name)).strip()

    def set_remote_url(self, remote_name, git_url):
        """Point the existing remote ``remote_name`` at ``git_url``."""
        execute("git -C %s remote set-url %s %s" % quote_all(self.repo_path, remote_name, git_url))

    def fetch(self):
        """Fetch from all configured remotes."""
        execute("git -C %s fetch --all" % quote_all(self.repo_path))

    def pull(self, remote=None, branch=None, ff_only=False):
        """Discard local modifications, then pull.

        ``git reset --hard`` is run first, so uncommitted changes to tracked
        files are lost. ``branch`` is only used when ``remote`` is also given.
        ``ff_only`` appends ``--ff-only`` to the pull command.
        """
        execute("git -C %s reset --hard" % quote_all(self.repo_path))

        extra = ""
        if remote:
            extra += " %s" % quote_all(remote)
            if branch:
                extra += " %s" % quote_all(branch)
        if ff_only:
            extra += " --ff-only"

        # ``extra`` is already quoted piece by piece, so it is appended verbatim.
        execute("git -C %s pull%s" % tuple(quote_all(self.repo_path) + (extra,)))

    def push(self, remote):
        """Push to ``remote`` and return the command output."""
        return execute("git -C %s push %s" % quote_all(self.repo_path, remote))

    def add(self):
        """Stage every change in the working tree (``git add --all``)."""
        execute("git -C %s add --all" % quote_all(self.repo_path))

    def commit(self, message):
        """Commit the staged changes with ``message``."""
        execute("git -C %s commit -m %s" % quote_all(self.repo_path, message))

    def get_last_commit_hash(self):
        """Return the hash of ``HEAD`` (whitespace stripped)."""
        return execute("git -C %s rev-parse HEAD" % quote_all(self.repo_path)).strip()

    def get_changed_files(self, ref1, ref2):
        """Return the paths that differ between ``ref1`` and ``ref2``.

        The result is the stripped output of ``git diff --name-only``: one
        path per line, or an empty string when nothing differs. An empty
        string is also returned when git reports that one of the commits does
        not exist (repository replaced or history force-pushed); any other
        failure is re-raised as ``ProcessException``.
        """
        try:
            # Run inside the repository, like every other command of this class.
            return execute(
                "git -C %s diff --name-only %s %s" % quote_all(self.repo_path, ref1, ref2)
            ).strip()
        except ProcessException as e:
            # git exits with 1 for "Could not access" and with 128 for fatal
            # errors such as "bad object" or "unknown revision".
            if e.exit_code in (1, 128) and any(
                marker in e.output for marker in self._MISSING_COMMIT_MARKERS
            ):
                return ""  # ignore non-existing commits (caused by repo changed or force-push)
            raise

    @staticmethod
    def _pattern_to_regex(pattern):
        """Compile a glob-like change pattern into a case-insensitive regex.

        A single ``*`` matches one or more characters within a path segment
        (``[^/]+``); a run of two or more stars matches anything, including
        ``/`` (``.*``). Every other character is matched literally and the
        whole path must match.
        """
        escaped = re.escape(pattern)
        # After re.escape each star is the two characters ``\*``. Translating
        # whole runs in one pass keeps the ``*`` of a generated ``.*`` from
        # being rewritten a second time.
        translated = re.sub(
            r"(?:\\\*)+",
            lambda run: ".*" if len(run.group(0)) > 2 else "[^/]+",
            escaped,
        )
        return re.compile(r"^%s$" % translated, flags=re.I)

    def resolve_changes(self, change_patterns, previous_hash):
        """Pull the repository and report whether relevant files changed.

        Returns True when the repository does not exist locally yet, when
        there is no ``previous_hash`` to compare against and HEAD differs from
        it, when any pattern is a catch-all (only stars), or when at least one
        file changed between ``previous_hash`` and the new HEAD matches one of
        ``change_patterns``. Returns False when HEAD is unchanged or no
        changed file matches.
        """
        if not self.exists():
            return True

        self.pull()
        current_hash = self.get_last_commit_hash()
        if current_hash == previous_hash:
            return False
        if not previous_hash:
            return True

        regexes = []
        for pattern in change_patterns:
            if re.search(r"^[*]+$", pattern):
                return True  # catch-all pattern
            regexes.append(self._pattern_to_regex(pattern))

        # get_changed_files returns one path per line; match whole paths.
        changed_files = self.get_changed_files(current_hash, previous_hash).splitlines()
        return any(
            regex.search(path)
            for path in changed_files
            for regex in regexes
        )