summaryrefslogtreecommitdiff
path: root/new/lib/apt.py
blob: 52bab254778dc6c9aee0535a24b4321801f0b57d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import logging
import os
import re
from shlex import quote

from lib.helpers import quote_all, execute, data_dir, resources_dir, apt_dir


class Apt:
    _repo_dir = None

    def __init__(self, branch, directory):
        self.branch = branch
        self.directory = directory
        self.gpg_keyring_path = os.path.join(data_dir, ".gnupg")

    def scan_for_dist_files(self, directory):
        dsc_files = []
        binary_files = []
        binary_names = []

        for parent, directories, files in os.walk(directory):
            for file_name in files:
                path = os.path.join(parent, file_name)
                name, ext = os.path.splitext(file_name)
                ext = ext.lower()[1:]
                if ext == "dsc":
                    dsc_files.append(path)
                elif ext == "deb":
                    # exclude build dependency packages
                    if "build-deps_" in name:
                        continue

                    # exclude packages that don't follow name_version_arch.deb
                    if not re.search(r"[^_]+_[^_]+_[^_]+", name):
                        continue

                    # exclude duplicates packages
                    if file_name in binary_names:
                        continue
                    binary_names.append(file_name)

                    binary_files.append(path)

        return dsc_files, binary_files

    def initialize_repository(self):
        pub_keyring_path = os.path.join(self.gpg_keyring_path, "pubring.kbx")
        if not os.path.exists(pub_keyring_path):
            logging.info("Generating GPG signing key")
            material_path = os.path.join(resources_dir, "gpg-gen-key.txt")
            execute("gpg --homedir %s --batch --gen-key < %s" % quote_all(self.gpg_keyring_path, material_path))

        conf_dir = os.path.join(apt_dir, self.branch, "conf")
        if not os.path.exists(conf_dir):
            logging.info("Initializing APT repository")
            os.makedirs(conf_dir)

        dist_path = os.path.join(conf_dir, "distributions")
        if not os.path.exists(dist_path):
            material_path = os.path.join(resources_dir, "apt-distributions.txt")
            with open(material_path, "r") as file:
                contents = file.read()
                contents = contents.replace("%branch%", self.branch)
                contents = contents.replace("%keyId%", self.get_key_id())

            with open(dist_path, "w") as file:
                file.write(contents)

        options_path = os.path.join(conf_dir, "options")
        if not os.path.exists(options_path):
            material_path = os.path.join(resources_dir, "apt-options.txt")
            with open(material_path, "r") as file:
                contents = file.read()

            with open(options_path, "w") as file:
                file.write(contents)

        repo_dir = os.path.dirname(conf_dir)
        root_dir = os.path.dirname(repo_dir)

        pub_key_path = os.path.join(root_dir, "apt.gpg.key")
        if not os.path.exists(pub_key_path):
            execute("gpg --homedir %s --armor --output %s --export-options export-minimal --export %s" % (
                self.gpg_keyring_path, pub_key_path, self.get_key_id()
            ))

        return repo_dir

    def get_repo_dir(self):
        if self._repo_dir is None:
            self._repo_dir = self.initialize_repository()
        return self._repo_dir

    def get_key_id(self):
        output = execute("gpg --homedir %s --list-keys --keyid-format=long signing@not-vyos" % self.gpg_keyring_path)
        we_in_pub = False
        key_id = None
        for line in output.split("\n"):
            line = line.strip()
            if line.startswith("pub"):
                we_in_pub = True
            elif we_in_pub:
                key_id = line
                break

        if not key_id:
            raise Exception("Unable to parser gpg key ID from: %s" % output)

        if not re.search(r"^[a-z0-9]+$", key_id, flags=re.I):
            raise Exception("Get invalid gpg key ID '%s' from: %s" % (key_id, output))

        return key_id

    def fill_apt_repository(self, dsc_files, binary_files):
        repo_dir = self.get_repo_dir()

        prefix_len = len(self.directory)

        for dsc_file in dsc_files:
            with open(dsc_file, "r") as file:
                fields = self.parse_package_info(file.read(), dsc_file, ["Source"])

            package = fields["Source"]

            logging.info("Removing sources of %s from the APT repository" % package)

            execute("reprepro --gnupghome %s  -v -b %s removesrc %s %s" % quote_all(
                self.gpg_keyring_path, repo_dir, self.branch, package
            ))

        for binary_file in binary_files:
            output = execute("dpkg-deb -f %s" % quote_all(binary_file))
            fields = self.parse_package_info(output, binary_file, ["Package", "Architecture"])

            package = fields["Package"]
            architecture = fields["Architecture"]

            logging.info("Removing binaries of %s from the APT repository" % package)

            extra = self.construct_reprepro_bin_extra(architecture)
            execute("reprepro --gnupghome %s  -v -b %s%s remove %s %s" % (
                self.gpg_keyring_path, repo_dir, extra, self.branch, package
            ))

        execute("reprepro --gnupghome %s -v -b %s deleteunreferenced" % (
            self.gpg_keyring_path, repo_dir
        ))

        for dsc_file in dsc_files:
            logging.info("Pushing %s to the APT repository" % dsc_file[prefix_len:])

            execute("reprepro --gnupghome %s -v -b %s includedsc %s %s" % quote_all(
                self.gpg_keyring_path, repo_dir, self.branch, dsc_file
            ))

        for binary_file in binary_files:
            logging.info("Pushing %s to the APT repository" % binary_file[prefix_len:])

            output = execute("dpkg-deb -f %s" % quote_all(binary_file))
            fields = self.parse_package_info(output, binary_file, ["Architecture"])

            architecture = fields["Architecture"]

            extra = self.construct_reprepro_bin_extra(architecture)
            execute("reprepro --gnupghome %s -v -b %s%s includedeb %s %s" % (
                self.gpg_keyring_path, repo_dir, extra, self.branch, binary_file
            ))

    def construct_reprepro_bin_extra(self, architecture):
        additional_params = []
        if architecture != "all":
            additional_params.extend(["-A", quote(architecture)])

        extra = " ".join(additional_params)
        if extra:
            extra = " " + extra
        return extra

    def parse_package_info(self, contents, subject, required_keys: list):
        fields = {}
        for line in contents.split("\n"):
            line = line.strip()
            parts = line.split(":", maxsplit=1)
            if len(parts) == 2:
                key = parts[0].strip()
                value = parts[1].strip()
                fields[key] = value

        for key in required_keys:
            if key not in fields:
                raise Exception("%s: unable to parse %s field" % (subject, key))

        return fields

    def validate_package_info(self, dsc_file, fields, required_keys):
        for key in required_keys:
            if key not in fields:
                raise Exception("%s: unable to parse %s field" % (dsc_file, key))