From 824de5c08279d98edba5a741f075f5671ba8a33d Mon Sep 17 00:00:00 2001 From: Joost Agterhoek Date: Wed, 26 Mar 2025 19:55:28 +0100 Subject: [PATCH] moved to /src --- abuseipdb_api.py | 52 ---------------- host_lookup.py | 151 ---------------------------------------------- virustotal_api.py | 101 ------------------------------- 3 files changed, 304 deletions(-) delete mode 100644 abuseipdb_api.py delete mode 100644 host_lookup.py delete mode 100644 virustotal_api.py diff --git a/abuseipdb_api.py b/abuseipdb_api.py deleted file mode 100644 index df6260f..0000000 --- a/abuseipdb_api.py +++ /dev/null @@ -1,52 +0,0 @@ -from base64 import decode -import json -import os -import requests -import requests_cache -from dotenv import load_dotenv -from pprint import pprint - - -class API_error(Exception): - pass - - -def environment(): - requests_cache.install_cache(expire_after=360, allowable_methods=("POST")) - load_dotenv() - api_key = os.getenv("ABUSEIPDB_API") - return api_key - - -def lookup(api_key, host): - url = "https://api.abuseipdb.com/api/v2/check" - payload = {"ipAddress": "", "maxAgeInDays": "90"} - payload.update({"ipAddress": host}) - headers = {"Accept": "application/json", "Key": api_key} - response = requests.request( - method="GET", url=url, params=payload, headers=headers, verify=False - ) # TODO: remove SSL verify=False and add signed certificate if possible. - # Figure out how caching functions here: https://requests-cache.readthedocs.io/en/stable/examples.html - response_dict = json.loads(response.text) - lookup = dict.fromkeys( - ["score", "last_reported", "IP_address", "CDN", "Tor", "total_reports"] - ) - print(response_dict) - lookup["score"] = response_dict["data"]["abuseConfidenceScore"] - lookup["last_reported"] = response_dict["data"]["lastReportedAt"] - lookup["IP_address"] = response_dict["data"]["ipAddress"] - lookup["usage"] = response_dict["data"]["usageType"] - lookup["Tor"] = response_dict["data"]["isTor"] - lookup["total_reports"] = response_dict["data"]["totalReports"] - - print(requests_cache.get_cache()) - print("Cached:") - print("\n".join(requests_cache.get_cache().urls())) - - return lookup - - -def analyse(host): - api_key = environment() - result = lookup(api_key, host) - return result diff --git a/host_lookup.py b/host_lookup.py deleted file mode 100644 index ff5f812..0000000 --- a/host_lookup.py +++ /dev/null @@ -1,151 +0,0 @@ -# TODO: make this module only have wrapper functions, no direct lookups or imports. Move all the direct lookup functions (emailsec) into (a) separate module(s). - -import re -from ipaddress import ip_address -from checkdmarc.dmarc import check_dmarc -from checkdmarc.spf import check_spf -import validators -from ipwhois import IPWhois -from whois import whois - -# from constants import DOMAIN, EMAIL, IPV4, IPV6, URL -import abuseipdb_api -import virustotal_api -import socket -from urllib.parse import urlparse -import host_data -import tldextract - - -class Lookedup(object): - def __init__(self, host): - self.host = host - self.host_type = determine(self.host) - self = self.specific() - # TODO: consolidate all below functions if possible - - def url_lookup(self): - self.domain = urlparse(self.host).netloc - self.ip_address = socket.gethostbyname(self.domain) - self.email_security = spf_dmarc(self.domain) - return self - - def ip_lookup(self): - self.metadata = domain(self.host) - self.domain = self.metadata["domain_name"] - self.email_security = spf_dmarc(self.domain) - self.vt, self.vt_dict = virustotal_api.analyse2(self.host, self.host_type) - self.abuseipdb = abuseipdb_api.analyse(self.host) - return self - - def domain_lookup(self): - self.ip_address = socket.gethostbyname(self.host) - self.metadata = domain(self.host) - self.domain = self.metadata["domain_name"] - self.email_security = spf_dmarc(self.domain) - self.vt, self.vt_dict = virustotal_api.analyse2(self.host, self.host_type) - self.abuseipdb = abuseipdb_api.analyse(self.ip_address) - return self - - def email_lookup(self): - self.domain = self.host.split("@")[1] - self.ip_address = socket.gethostbyname(self.domain) - self.email_security = spf_dmarc(self.domain) - self.vt, self.vt_dict = virustotal_api.analyse2(self.domain, self.host_type) - self.abuseipdb = abuseipdb_api.analyse(self.ip_address) - return self - - def specific(self): - if self.host_type == "url": - return self.url_lookup() - elif self.host_type == "domain": - return self.domain_lookup() - elif self.host_type == "ip": - return self.ip_lookup() - elif self.host_type == "email address": - return self.email_lookup() - - -def sanitize(user_input): - sanitized = [] - if user_input.strip() != "": - sanitized = re.split("; |, | |\n", user_input) - return sanitized - - -def determine(host): - host_type = "" - if validators.url(host): - host_type = "url" - elif validators.domain(host): - host_type = "domain" - elif validators.ip_address.ipv4(host): - host_type = "ip" - elif validators.ip_address.ipv6(host): - host_type = "ip" - elif validators.email(host): - host_type = "email address" - else: - host_type = "no host" - return host_type - - -def extract(user_input): - hosts = [] - errors = [] - for item in user_input: - if validators.url(item): - hosts.append(item) - elif validators.domain(item): - hosts.append(item) - elif validators.ip_address.ipv4(item): - hosts.append(item) - elif validators.ip_address.ipv6(item): - hosts.append(item) - elif validators.email(item): - hosts.append(item) - else: - errors.append(item) - return hosts, errors - - -def domain(host): - result = dict(whois(host)) - if type(result["creation_date"]) is list: - result["creation_date"] = result["creation_date"][0].strftime("%d-%m-%Y") - else: - result["creation_date"] = result["creation_date"].strftime("%d-%m-%Y") - if type(result["domain_name"]) is list: - result["domain_name"] = result["domain_name"][0] - return result - - -def spf_dmarc(domain): - spf = "" - dmarc = "" - result_spf = check_spf(domain) - if result_spf["valid"]: - spf = result_spf["record"] - result_dmarc = check_dmarc(domain) - if result_dmarc["valid"]: - dmarc = result_dmarc["record"] - return spf, dmarc - - -def process_input(user): - results = [] - sanitized = sanitize(user) - hosts, errors = extract(sanitized) - # for host in hosts: - # host_analyzed = Host(host).lookup() - # results.append(host_analyzed) - # return results - return hosts, errors - - -def process_file(file_content): - hosts = [] - for host in file_content: - result = Host(host) - hosts.append(result) - return hosts diff --git a/virustotal_api.py b/virustotal_api.py deleted file mode 100644 index 3d16446..0000000 --- a/virustotal_api.py +++ /dev/null @@ -1,101 +0,0 @@ -import json -import time -import os -import requests -from dotenv import load_dotenv -from constants import URL, DOMAIN, IPV4, IPV6, domain_lookup - - -def environment(): - load_dotenv() - api_key = os.getenv("VT_API") - return api_key - - -def analysis_object(api_key, host): - url = "https://www.virustotal.com/api/v3/urls" - payload = {"url": ""} - payload.update({"url": host}) - headers = { - "accept": "application/json", - "content-type": "application/x-www-form-urlencoded", - "x-apikey": api_key, - } - response = requests.post(url, data=payload, headers=headers) - response_dict = json.loads(response.text) - response_id = response_dict["data"]["id"] - return response_id - - -def analyse_domain(api_key, host): - url = "https://www.virustotal.com/api/v3/domains/" + host - headers = { - "accept": "application/json", - "content-type": "application/x-www-form-urlencoded", - "x-apikey": api_key, - } - analysis_json = requests.get(url, headers=headers) - response_dict = json.loads(analysis_json.text) - return response_dict, analysis_json - - -def analyse_URL(api_key, response_id): - url = "https://www.virustotal.com/api/v3/analyses/{}".format(response_id) - headers = {"accept": "application/json", "x-apikey": api_key} - analysis_json = requests.get(url, headers=headers) - analysis_dict = json.loads(analysis_json.text) - return analysis_dict, analysis_json - - -def analyse_IP(api_key, host): - analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host) - headers = {"accept": "application/json", "x-apikey": api_key} - analysis_json = requests.get(analysis_url, headers=headers) - response_dict = json.loads(analysis_json.text) - # Implement this: https://docs.virustotal.com/reference/ip-info - return response_dict, analysis_json - - -def analyse(host, host_type): - api_key = environment() - if host_type == URL: - response_id = analysis_object(api_key, host) - result, analysis_json = analyse_URL(api_key, response_id) - elif host_type == DOMAIN: - result, analysis_json = analyse_domain(api_key, host) - elif host_type == IPV4 or IPV6: - result, analysis_json = analyse_IP(api_key, host) - return result, analysis_json - - -def analyse2(host, host_type): - api_key = environment() - if host_type == "url": - response_id = analysis_object(api_key, host) - result, analysis_json = analyse_URL(api_key, response_id) - elif host_type == "domain" or "email_address": - result, analysis_json = analyse_domain(api_key, host) - elif host_type == "IPv4" or "IPv6": - result, analysis_json = analyse_IP(api_key, host) - - vt_stats = result["data"]["attributes"]["last_analysis_stats"] - vt_results = result["data"]["attributes"]["last_analysis_results"] - last_update = result["data"]["attributes"]["last_analysis_date"] - - summary = dict.fromkeys(["total", "score", "vendors", "last_update"]) - total = 0 - vendors = [] - for key, value in vt_stats.items(): - total += value - for key, value in vt_results.items(): - if value["category"] == "malicious": - vendors.append(key) - - summary["total"] = total - summary["score"] = vt_stats["malicious"] - summary["vendors"] = vendors - summary["last_update"] = time.strftime( - "%d-%m-%Y", - time.gmtime(last_update), - ) - return summary, analysis_json