summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnatoliiER <70503278+AnatoliiER@users.noreply.github.com>2020-09-30 17:54:38 +0300
committerGitHub <noreply@github.com>2020-09-30 17:54:38 +0300
commitcc67e747f7242c8f4b4147fd401828efb28874b3 (patch)
tree6a05cc48b2b6d31bfdcccdfd433d9f1cdfe6e8f6
parentb38d46fca491853c3453bbcc8f23eeb9d1b44818 (diff)
downloadcve-checker-cc67e747f7242c8f4b4147fd401828efb28874b3.tar.gz
cve-checker-cc67e747f7242c8f4b4147fd401828efb28874b3.zip
Update checker.py
Added false positive processing
-rw-r--r--checker.py128
1 files changed, 87 insertions, 41 deletions
diff --git a/checker.py b/checker.py
index 1b9cbd1..15d1ed5 100644
--- a/checker.py
+++ b/checker.py
@@ -31,7 +31,7 @@ class VyosDf:
self.DEBTRACK_LOG = self.config.get('LOGS', 'DEBTRACK_LOG')
self.CVE_for_update = set()
self.debtrack_cve = set()
-
+ self.result_cve = set()
def logger(self, logname, logrec, type):
file = open(logname, type, encoding='utf-8')
@@ -81,7 +81,7 @@ class Vuln(VyosDf):
def vuln_update_v2(self):
_cve = list(self.CVE_for_update)
- self.CVE_DATA = self.vulners_api.documentList(_cve[:1000], fields=['index',
+ self.CVE_DATA = self.vulners_api.documentList(_cve[:500], fields=['index',
'id',
'score',
'sort',
@@ -111,8 +111,8 @@ class Vuln(VyosDf):
try:
self.vulndb.insert_one({"id":key, "_source":value})
except errors.DuplicateKeyError:
- pass
-
+ pass
+
def save_cve_f(self):
handle = open(self.FILECVE, "w")
handle.write(json.dumps(self.all_cve))
@@ -131,17 +131,41 @@ class Vuln(VyosDf):
except errors.DuplicateKeyError:
pass
- def processing_packages(self):
+ def processing_packages(self, distributive):
for rec in self.packages.find({}):
- self.search_cve(rec['packname'], rec['packvers'], rec['fullpackname'])
+ self.search_cve(rec['packname'], rec['packvers'], rec['fullpackname'], distributive)
- def search_cve(self, pname, version, fullpackname):
- a = list(self.vulndb.find({"$and": [{"$text": {"$search": pname}},
+ def search_cve(self, pname, version, fullpackname, distributive):
+ cve_set = list(self.vulndb.find({"$and": [{"$text": {"$search": pname}},
{'_source.affectedSoftware': {"$elemMatch": {'version': version}}}]}))
- print("\n", pname, fullpackname, "--->", len(a))
- for cve in a:
- print(cve['id'])
- rec = " ".join((cve['id'], pname, fullpackname, "\n"))
+ if len(cve_set) > 0:
+ print(pname, fullpackname, "--->", len(cve_set))
+ for cve in cve_set:
+ cveid = cve['id']
+ pattern = f"{pname}.{cve['id']}.{'releases'}.{distributive}"
+
+ debtr_set = list(self.debtr.find({pattern:{"$exists":True, "$ne":None}}))
+ debtr_fl = True
+ for item in debtr_set:
+ try:
+ debtr_fix = item[pname][cveid]["releases"][distributive]["fixed_version"]
+ except:
+ debtr_fix = False
+ if debtr_fix and debtr_fix != '0':
+ if debtr_fix <= fullpackname:
+ debtr_fl = False
+
+ if debtr_fl:
+ rec = " ".join((pname, fullpackname, cve['id'], "\n"))
+ self.result_cve.add(rec)
+ else:
+ rec = " ".join((pname, fullpackname, cve['id'], "patched", "\n"))
+ self.result_cve.add(rec)
+
+ def log_proc(self):
+ _log = sorted(list(self.result_cve))
+ self.logger(self.CHECKER_LOG, "List of vulnerabilities (contains false positive)\n", "w")
+ for rec in _log:
self.logger(self.CHECKER_LOG, rec, "a")
@@ -166,11 +190,32 @@ class Packages(VyosDf):
packvers = (re.split("-", allwords[1]))[0]
if "+" in packvers:
packvers = (re.split("\+", allwords[1]))[0]
+ a = 2
return {"packname": packname,
"packvers": packvers,
"fullpackname": fullpackname}
+class Trackers(VyosDf):
+ """Receiving and processing information about updates of system packages and libraries """
+ def pull_debupdates(self):
+ source = requests.get(self.debtrack_link).json()
+ self.logger(self.DEBTRACK_LOG, json.dumps(source), "w")
+
+ def tst_debupdates(self, file):
+ with open(file, encoding='utf-8') as f:
+ _ttt = json.loads(f.read())
+ for package, value in _ttt.items():
+ try:
+ package_mod = package.replace(".", "")
+ self.debtr.insert_one({"id":package_mod, package_mod:value})
+ except errors.DuplicateKeyError:
+ pass
+ for cve, value1 in value.items():
+ self.debtrack_cve.add(cve)
+
+
+
def help():
print("""
@@ -181,11 +226,8 @@ def help():
4.) --update-db - get updates of vulnerabilities database (Upgrade based on free databases)
5.) --update-info - get information about the number of new CVEs, missing in the database.
Checking is carried out on the basis of MitreCVE db.
- 6.) --start [name of file with packages information] - This is the output of the command:
- apt list --installed
- it is something like this:
- acl/stable,stable,now 2.2.53-4 amd64 [installed,automatic]
- acpid/stable,stable,now 1:2.0.31-1 amd64 [installed]
+ 6.) --start [name of file with packages information, (This is the output of the command: apt list --installed)
+ Code name of Debian version on which VyOS is based (for example: Stretch, Buster)]
""")
def init_db(config):
@@ -195,13 +237,13 @@ def init_db(config):
def updatedb(start_dt, end_dt, config):
print("Start updating databases")
- updb = Vuln(config)
- updb.pull_vulners_cve(start_dt, end_dt)
- updb.save_cve_f()
- updb.open_cve_f()
- updb.handle_set()
- #debupd = Trackers(config)
- #debupd.pull_debupdates()
+ tst = Vuln(config)
+ tst.pull_vulners_cve(start_dt, end_dt)
+ tst.save_cve_f()
+ tst.open_cve_f()
+ tst.handle_set()
+ debupd = Trackers(config)
+ debupd.pull_debupdates()
print("Vulnerabilities database updated successfully")
def updatedb_v2(config):
@@ -213,8 +255,8 @@ def updatedb_v2(config):
print("Start updating databases")
v.vuln_update_v2()
v.handler_mitre_cve()
- print("Vulnerabilities database updated successfully")
-
+ print("Vulnerabilities database updated successfully")
+
def update_info(config):
v = Vuln(config)
print("Starting analyze...")
@@ -222,30 +264,34 @@ def update_info(config):
print("Сhecking the CVE...")
v.handler_mitre_cve()
-def start(filename, config):
+def start(filename, distributive, config):
print("Starting analyze...")
pac = Packages(config)
pac.drop_pack()
pac.get_packages(filename)
tst = Vuln(config)
- tst.processing_packages()
+ tst.processing_packages(distributive)
+ tst.log_proc()
if __name__ == "__main__":
cnf = configparser.ConfigParser()
- if sys.argv[1] == "--help":
- help()
- elif sys.argv[1] == "--init-db":
- init_db(cnf)
- elif sys.argv[1] == "--update-vulners-db":
- updatedb(sys.argv[2], sys.argv[3], cnf)
- elif sys.argv[1] == "--update-db":
- updatedb_v2(cnf)
- elif sys.argv[1] == "--update-info":
- update_info(cnf)
- elif sys.argv[1] == "--start":
- start(sys.argv[2], cnf)
- else:
+ try:
+ if sys.argv[1] == "--help":
+ help()
+ elif sys.argv[1] == "--init-db":
+ init_db(cnf)
+ elif sys.argv[1] == "--update-vulners-db":
+ updatedb(sys.argv[2], sys.argv[3], cnf)
+ elif sys.argv[1] == "--update-db":
+ updatedb_v2(cnf)
+ elif sys.argv[1] == "--update-info":
+ update_info(cnf)
+ elif sys.argv[1] == "--start":
+ start(sys.argv[2], sys.argv[3], cnf)
+ else:
+ help()
+ except:
help()