# --- src/abuseipdb_api.py ---
# New file from patch c37883bc8bbdec644663d387c867111bfa69e302
# (Joost Agterhoek, Wed, 26 Mar 2025): "reorganizing to match better with
# flaskr tutorial". The patch also creates src/host_lookup.py and
# src/virustotal_api.py.
import json
import logging
import os
from base64 import decode
from pprint import pprint

import requests
import requests_cache
from dotenv import load_dotenv

logger = logging.getLogger(__name__)


class API_error(Exception):
    """Raised when an AbuseIPDB request fails or its response is unusable."""


def environment():
    """Install the global requests cache and return the AbuseIPDB API key.

    Returns:
        The value of the ``ABUSEIPDB_API`` environment variable (loaded from a
        ``.env`` file when present), or ``None`` when it is not set.
    """
    # The original passed ("POST"), which is a plain string rather than a
    # one-element tuple; the trailing comma makes it the tuple it was meant
    # to be without changing which methods are cached.
    # NOTE(review): lookup() below issues a GET, so with this setting its
    # responses are not cached - confirm whether "GET" should be listed too.
    requests_cache.install_cache(expire_after=360, allowable_methods=("POST",))
    load_dotenv()
    return os.getenv("ABUSEIPDB_API")


def lookup(api_key, host, verify=True, timeout=10):
    """Query the AbuseIPDB ``check`` endpoint for one IP address.

    Args:
        api_key: AbuseIPDB API key, sent in the ``Key`` header.
        host: IP address to look up.
        verify: Passed to ``requests`` as the TLS verification setting. The
            original hard-coded ``verify=False`` (with a TODO to remove it);
            verification is now on by default and can be overridden here.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the caller indefinitely.

    Returns:
        A dict with the keys ``score``, ``last_reported``, ``IP_address``,
        ``CDN``, ``Tor``, ``total_reports`` and ``usage``.

    Raises:
        API_error: If the request fails, the server answers with an HTTP
            error status, or the body lacks the expected fields.
    """
    url = "https://api.abuseipdb.com/api/v2/check"
    payload = {"ipAddress": host, "maxAgeInDays": "90"}
    headers = {"Accept": "application/json", "Key": api_key}

    try:
        response = requests.get(
            url, params=payload, headers=headers, verify=verify, timeout=timeout
        )
        response.raise_for_status()
        data = json.loads(response.text)["data"]
        result = {
            "score": data["abuseConfidenceScore"],
            "last_reported": data["lastReportedAt"],
            "IP_address": data["ipAddress"],
            # The original seeded a "CDN" key that was never filled in while
            # storing usageType under "usage"; both keys are kept so existing
            # consumers of either name keep working.
            "CDN": None,
            "Tor": data["isTor"],
            "total_reports": data["totalReports"],
            "usage": data["usageType"],
        }
    except (requests.RequestException, ValueError, KeyError, TypeError) as err:
        raise API_error(f"AbuseIPDB lookup failed for {host!r}") from err

    # Replaces the original unconditional print() debugging.
    logger.debug("AbuseIPDB response for %s: %s", host, data)
    return result


def analyse(host):
    """Load the API key and return the AbuseIPDB lookup result for ``host``."""
    return lookup(environment(), host)


# --- src/host_lookup.py ---
# TODO: make this module only have wrapper functions, no direct lookups or imports. Move all the direct lookup functions (emailsec) into (a) separate module(s).
import re
from ipaddress import ip_address
from checkdmarc.dmarc import check_dmarc
from checkdmarc.spf import check_spf
import validators
from ipwhois import IPWhois
from whois import whois

# from constants import DOMAIN, EMAIL, IPV4, IPV6, URL
from src import abuseipdb_api, virustotal_api
import socket
from urllib.parse import urlparse
import host_data
import tldextract


class Lookedup:
    """Collect WHOIS, SPF/DMARC, VirusTotal and AbuseIPDB data for one host.

    The constructor classifies ``host`` with ``determine`` and immediately
    runs the matching ``*_lookup`` method, which stores its findings as
    attributes (``domain``, ``ip_address``, ``metadata``, ``email_security``,
    ``vt``, ``vt_dict``, ``abuseipdb``). When the host type is ``"no host"``
    no lookup runs and only ``host`` and ``host_type`` are set.
    """

    def __init__(self, host):
        self.host = host
        self.host_type = determine(self.host)
        # The original wrote `self = self.specific()`, which only rebinds the
        # local name; the call is made for its attribute side effects.
        self.specific()

    def _reputation(self, vt_target, abuse_target):
        """Run the checks every host type shares and return ``self``."""
        self.email_security = spf_dmarc(self.domain)
        self.vt, self.vt_dict = virustotal_api.analyse2(vt_target, self.host_type)
        self.abuseipdb = abuseipdb_api.analyse(abuse_target)
        return self

    def url_lookup(self):
        """Look up a URL: the network location part is the domain."""
        self.domain = urlparse(self.host).netloc
        self.ip_address = socket.gethostbyname(self.domain)
        self.metadata = domain(self.domain)
        return self._reputation(self.host, self.ip_address)

    def ip_lookup(self):
        """Look up an IP address; the domain comes from its WHOIS record."""
        self.metadata = domain(self.host)
        self.domain = self.metadata["domain_name"]
        return self._reputation(self.host, self.host)

    def domain_lookup(self):
        """Look up a bare domain name."""
        self.ip_address = socket.gethostbyname(self.host)
        self.metadata = domain(self.host)
        self.domain = self.metadata["domain_name"]
        return self._reputation(self.host, self.ip_address)

    def email_lookup(self):
        """Look up an email address via the domain after the ``@``."""
        self.domain = self.host.split("@")[1]
        self.metadata = domain(self.domain)
        self.ip_address = socket.gethostbyname(self.domain)
        # VirusTotal is queried with the domain, not the full address.
        return self._reputation(self.domain, self.ip_address)

    def specific(self):
        """Dispatch to the lookup for ``self.host_type``.

        Returns:
            ``self`` after the lookup, or ``None`` when the host type has no
            lookup (e.g. ``"no host"``).
        """
        handlers = {
            "url": self.url_lookup,
            "domain": self.domain_lookup,
            "ip": self.ip_lookup,
            "email address": self.email_lookup,
        }
        handler = handlers.get(self.host_type)
        return handler() if handler is not None else None


def sanitize(user_input):
    """Split raw user input into candidate host strings.

    Separators are ``"; "``, ``", "``, a single space and a line break
    (``\\n`` or ``\\r\\n``). Empty pieces produced by repeated separators or
    surrounding whitespace are dropped, so they are no longer reported as
    invalid hosts downstream.

    Returns:
        A list of non-empty tokens; empty for blank input.
    """
    pieces = re.split(r"; |, | |\r?\n", user_input)
    return [piece.strip() for piece in pieces if piece.strip()]


def determine(host):
    """Classify ``host`` as ``"url"``, ``"domain"``, ``"ip"``,
    ``"email address"`` or ``"no host"`` using the ``validators`` package.

    The checks run in that order and the first match wins.
    """
    if validators.url(host):
        return "url"
    if validators.domain(host):
        return "domain"
    if validators.ip_address.ipv4(host) or validators.ip_address.ipv6(host):
        return "ip"
    if validators.email(host):
        return "email address"
    return "no host"


def extract(user_input):
    """Partition candidate strings into recognised hosts and rejects.

    Args:
        user_input: Iterable of candidate strings (e.g. from ``sanitize``).

    Returns:
        ``(hosts, errors)``: items ``determine`` recognises, and the rest.
    """
    hosts = []
    errors = []
    for item in user_input:
        # Reuse determine() so both functions always agree on what counts
        # as a host instead of duplicating the validator chain.
        if determine(item) != "no host":
            hosts.append(item)
        else:
            errors.append(item)
    return hosts, errors


def domain(host):
    """Return the WHOIS record for ``host`` as a plain dict.

    ``creation_date`` is reduced to its first entry when WHOIS returns a
    list and formatted as ``DD-MM-YYYY``; ``domain_name`` is likewise reduced
    to its first entry. A creation date that is missing, ``None`` or an empty
    list becomes ``None``, and a value without ``strftime`` is left as-is,
    instead of raising as the original did.
    """
    result = dict(whois(host))

    created = result.get("creation_date")
    if isinstance(created, list):
        created = created[0] if created else None
    if hasattr(created, "strftime"):
        created = created.strftime("%d-%m-%Y")
    result["creation_date"] = created

    names = result.get("domain_name")
    if isinstance(names, list):
        names = names[0] if names else None
    result["domain_name"] = names
    return result


def spf_dmarc(domain):
    """Return ``(spf, dmarc)`` records for ``domain``.

    Each element is the record reported by ``check_spf`` / ``check_dmarc``
    when that check's result is marked ``valid``, otherwise ``""``.
    """
    result_spf = check_spf(domain)
    spf = result_spf["record"] if result_spf["valid"] else ""
    result_dmarc = check_dmarc(domain)
    dmarc = result_dmarc["record"] if result_dmarc["valid"] else ""
    return spf, dmarc


def process_input(user):
    """Tokenise free-text input and split it into hosts and rejects.

    Returns:
        ``(hosts, errors)`` as produced by ``extract``.
    """
    # TODO: per-host analysis (previously sketched here as commented-out
    # code appending looked-up hosts to a results list) is not done yet;
    # callers receive the raw host strings.
    return extract(sanitize(user))


def process_file(file_content):
    """Run a ``Lookedup`` for every entry of ``file_content``.

    The original instantiated ``Host``, a name this module never defines or
    imports, so every call raised ``NameError``; ``Lookedup`` is the class
    this module provides for that purpose.
    """
    return [Lookedup(host) for host in file_content]


# --- src/virustotal_api.py ---
import json
import time
import os
import requests
from dotenv import load_dotenv
from constants import URL, DOMAIN, IPV4, IPV6, domain_lookup


def environment():
    """Return the VirusTotal API key from ``VT_API`` (``None`` if unset)."""
    load_dotenv()
    return os.getenv("VT_API")


def analysis_object(api_key, host, timeout=10):
    """Submit ``host`` (a URL) to VirusTotal and return the analysis id.

    Args:
        api_key: VirusTotal API key.
        host: URL to submit.
        timeout: Seconds to wait for the server; the original had no timeout
            and could block forever.
    """
    url = "https://www.virustotal.com/api/v3/urls"
    headers = {
        "accept": "application/json",
        "content-type": "application/x-www-form-urlencoded",
        "x-apikey": api_key,
    }
    response = requests.post(
        url, data={"url": host}, headers=headers, timeout=timeout
    )
    return json.loads(response.text)["data"]["id"]


def analyse_domain(api_key, host, timeout=10):
    """Fetch the VirusTotal domain report for ``host``.

    Returns:
        ``(response_dict, response)``: the decoded JSON body and the raw
        ``requests`` response object.
    """
    url = "https://www.virustotal.com/api/v3/domains/" + host
    headers = {
        "accept": "application/json",
        "content-type": "application/x-www-form-urlencoded",
        "x-apikey": api_key,
    }
    analysis_json = requests.get(url, headers=headers, timeout=timeout)
    return json.loads(analysis_json.text), analysis_json


def analyse_URL(api_key, response_id, timeout=10):
    """Fetch the VirusTotal analysis identified by ``response_id``.

    Returns:
        ``(analysis_dict, response)``: the decoded JSON body and the raw
        ``requests`` response object.
    """
    url = "https://www.virustotal.com/api/v3/analyses/{}".format(response_id)
    headers = {"accept": "application/json", "x-apikey": api_key}
    analysis_json = requests.get(url, headers=headers, timeout=timeout)
    return json.loads(analysis_json.text), analysis_json


# This returns a differently shaped JSON and therefore dict:
# analysis_dict keys 'data', 'meta'
# analysis_dict['data'] keys 'id', 'type', 'links', 'attributes'
# analysis_dict['data']['attributes'] keys 'stats', (numbers) 'results', (all the AV engine results) 'date', (Linux
# epoch timestamp) 'status'


def analyse_IP(api_key, host, timeout=10):
    """Fetch the VirusTotal report for the IP address ``host``.

    Args:
        api_key: VirusTotal API key.
        host: IP address to look up.
        timeout: Seconds to wait for the server; the original had no timeout
            and could block forever.

    Returns:
        ``(response_dict, response)``: the decoded JSON body and the raw
        ``requests`` response object.
    """
    analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host)
    headers = {"accept": "application/json", "x-apikey": api_key}
    analysis_json = requests.get(analysis_url, headers=headers, timeout=timeout)
    # Implement this: https://docs.virustotal.com/reference/ip-info
    return json.loads(analysis_json.text), analysis_json


def analyse(host, host_type):
    """Fetch the raw VirusTotal report for ``host``.

    Args:
        host: URL, domain or IP address to analyse.
        host_type: One of the ``URL``, ``DOMAIN``, ``IPV4`` or ``IPV6``
            constants.

    Returns:
        ``(result, response)``: the decoded JSON body and the raw
        ``requests`` response object.

    Raises:
        ValueError: If ``host_type`` is none of the supported constants.
    """
    api_key = environment()
    if host_type == URL:
        response_id = analysis_object(api_key, host)
        return analyse_URL(api_key, response_id)
    if host_type == DOMAIN:
        return analyse_domain(api_key, host)
    # The original condition `host_type == IPV4 or IPV6` is true whenever
    # IPV6 is truthy, so every unrecognised type was sent to the IP endpoint.
    if host_type in (IPV4, IPV6):
        return analyse_IP(api_key, host)
    raise ValueError("unsupported host type: {!r}".format(host_type))


def analyse2(host, host_type):
    """Fetch a VirusTotal report and condense it into a summary.

    Args:
        host: URL, domain or IP address to analyse.
        host_type: ``"url"``, ``"domain"``, ``"email address"`` (treated as
            a domain) or ``"ip"``.

    Returns:
        ``(summary, response)`` where ``summary`` has the keys ``total``
        (sum of all verdict counts), ``score`` (number of ``malicious``
        verdicts), ``vendors`` (names of the engines whose category is
        ``malicious``) and ``last_update`` (``DD-MM-YYYY``, UTC), and
        ``response`` is the raw ``requests`` response object.

    Raises:
        ValueError: If ``host_type`` is not supported. The original fell
            through and failed later with ``UnboundLocalError``.
    """
    api_key = environment()
    if host_type == "url":
        response_id = analysis_object(api_key, host)
        result, analysis_json = analyse_URL(api_key, response_id)
        # URL analyses use different attribute names than domain/IP reports.
        stats_key, results_key, date_key = "stats", "results", "date"
    elif host_type in ("domain", "email address", "ip"):
        if host_type == "ip":
            result, analysis_json = analyse_IP(api_key, host)
        else:
            result, analysis_json = analyse_domain(api_key, host)
        stats_key = "last_analysis_stats"
        results_key = "last_analysis_results"
        date_key = "last_analysis_date"
    else:
        raise ValueError("unsupported host type: {!r}".format(host_type))

    attributes = result["data"]["attributes"]
    vt_stats = attributes[stats_key]
    vt_results = attributes[results_key]
    last_update = attributes[date_key]

    vendors = [
        engine
        for engine, verdict in vt_results.items()
        if verdict["category"] == "malicious"
    ]
    summary = {
        "total": sum(vt_stats.values()),
        "score": vt_stats["malicious"],
        "vendors": vendors,
        "last_update": time.strftime("%d-%m-%Y", time.gmtime(last_update)),
    }
    return summary, analysis_json