From fc5e53eee1b9011ef1d477326c6a21e1846857b1 Mon Sep 17 00:00:00 2001
From: Joost Agterhoek
Date: Tue, 7 Jan 2025 09:50:09 +0100
Subject: [PATCH] refactor underway, will consolidate these components
---
 host_lookup/abuseipdb.py | 41 ----------------
 host_lookup/metadata.py | 30 ------------
 host_lookup/otx_api.py | 3 --
 host_lookup/parse_URI.py | 9 ----
 host_lookup/spf_dmarc.py | 9 ----
 host_lookup/virustotal.py | 24 ----------
 host_lookup/virustotal_api_test.py | 77 ------------------------------
 7 files changed, 193 deletions(-)
 delete mode 100644 host_lookup/abuseipdb.py
 delete mode 100644 host_lookup/metadata.py
 delete mode 100644 host_lookup/otx_api.py
 delete mode 100644 host_lookup/parse_URI.py
 delete mode 100644 host_lookup/spf_dmarc.py
 delete mode 100644 host_lookup/virustotal.py
 delete mode 100644 host_lookup/virustotal_api_test.py
diff --git a/host_lookup/abuseipdb.py b/host_lookup/abuseipdb.py
deleted file mode 100644
index cae93c5..0000000
--- a/host_lookup/abuseipdb.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from base64 import decode
-import json
-import os
-import requests
-import requests_cache
-from dotenv import load_dotenv
-from pprint import pprint
-
-
-class API_error(Exception):
- pass
-
-
-def environment():
- requests_cache.install_cache(expire_after=360, allowable_methods=("POST"))
- load_dotenv()
- api_key = os.getenv("ABUSEIPDB_API")
- return api_key
-
-
-def lookup(api_key, host):
- url = "https://api.abuseipdb.com/api/v2/check"
- payload = {"ipAddress": "", "maxAgeInDays": "90"}
- payload.update({"ipAddress": host})
- headers = {"Accept": "application/json", "Key": api_key}
- response = requests.request(
- method="GET", url=url, params=payload, headers=headers, verify=False
- ) # TODO: remove SSL verify=False and add signed certificate if possible.
- # Figure out how caching functions here: https://requests-cache.readthedocs.io/en/stable/examples.html
- print(requests_cache.get_cache())
- print("Cached:")
- print("\n".join(requests_cache.get_cache().urls()))
-
- return response
-
-
-def analyse(host):
- api_key = environment()
- result = lookup(api_key, host)
- decoded_result = json.loads(result.text)
- return decoded_result
diff --git a/host_lookup/metadata.py b/host_lookup/metadata.py
deleted file mode 100644
index 20c59fc..0000000
--- a/host_lookup/metadata.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from ipaddress import ip_address
-from whois import whois
-from ipwhois import IPWhois
-import validators
-from constants import URL, DOMAIN, IPV4, IPV6
-
-
-def check(host):
- if validators.url(host):
- host_type = URL
- elif validators.domain(host):
- host_type = DOMAIN
- elif validators.ip_address.ipv4(host):
- host_type = IPV4
- elif validators.ip_address.ipv6(host):
- host_type = IPV6
- return host_type
-
-
-# def lookup(host_type):
-def lookup(host):
- result = dict(whois(host))
- return result
-
-
-# result = whois(host_type[1])
-# return result, host_type[0]
-# obj = IPWhois(host_type[1])
-# res = obj.lookup_rdap()
-# return res, host_type[0]
diff --git a/host_lookup/otx_api.py b/host_lookup/otx_api.py
deleted file mode 100644
index ee80000..0000000
--- a/host_lookup/otx_api.py
+++ /dev/null
@@ -1,3 +0,0 @@
-# Try to get historical telemetry like this page shows: https://otx.alienvault.com/indicator/ip/8.8.8.8
-# Apparently this API does not provide this information :( f.e. the below curl request does not provide information about historical OTX telemetry.
-# curl https://otx.alienvault.com/api/v1/indicators/url/http://www.freputation.com/spreputation_san_ponso/slides/IMG_0068.html/general -H "X-OTX-API-KEY: ec672963e435bb7a09c494534b79a6a7a273a5bde5ea560874cccd72e2bc76fc"
diff --git a/host_lookup/parse_URI.py b/host_lookup/parse_URI.py
deleted file mode 100644
index a8626fe..0000000
--- a/host_lookup/parse_URI.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# This module should extract any and all URIs (IPs or URLs) from copy and pasted text.
-
-def parse(text):
- split_text = text.split()
- for URI in split_text:
- print(URI)
-
-
-
diff --git a/host_lookup/spf_dmarc.py b/host_lookup/spf_dmarc.py
deleted file mode 100644
index 2267aa8..0000000
--- a/host_lookup/spf_dmarc.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from checkdmarc.dmarc import check_dmarc
-from checkdmarc.spf import check_spf
-import validators
-
-
-def lookup(host: str) -> tuple:
- result_dmarc = check_dmarc(host)
- result_spf = check_spf(host)
- return (result_dmarc, result_spf)
diff --git a/host_lookup/virustotal.py b/host_lookup/virustotal.py
deleted file mode 100644
index 789b313..0000000
--- a/host_lookup/virustotal.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import vt
-import os
-import requests
-import virustotal_python
-from dotenv import load_dotenv
-from pprint import pprint
-from base64 import urlsafe_b64encode
-
-# todo: implement my own API request module to then try and cache the response (see -> https://realpython.com/caching-external-api-requests/#requests-cache)
-
-def vt_lookup(URL):
- load_dotenv()
- api_key = os.getenv("VT_API")
- with virustotal_python.Virustotal(api_key) as vtotal:
- try:
- resp = vtotal.request("urls", data={"url": URL}, method="POST")
- print(resp)
- # Safe encode URL in base64 format
- # https://developers.virustotal.com/reference/url
- url_id = urlsafe_b64encode(URL.encode()).decode().strip("=")
- report = vtotal.request(f"urls/{url_id}")
- return report.data
- except virustotal_python.VirustotalError as err:
- print(f"Failed to send URL: {URL} for analysis and get the report: {err}")
diff --git a/host_lookup/virustotal_api_test.py b/host_lookup/virustotal_api_test.py
deleted file mode 100644
index c5c0d38..0000000
--- a/host_lookup/virustotal_api_test.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import json
-import os
-import requests
-from dotenv import load_dotenv
-from pprint import pprint
-from constants import URL, DOMAIN, IPV4, IPV6
-
-# Would be nice to define some constants, f.e. for the various API urls, the headers, etc.
-
-
-def environment():
- load_dotenv()
- api_key = os.getenv("VT_API")
- return api_key
-
-
-# Unfortunately this works for actual URLs, not domains. See: https://docs.virustotal.com/reference/domain-info
-# This also doesn't work for IPv6 addresses, where the response_dict does not have a 'data' key. So I would have to revamp this module and make separate functions called based on host type (URL, IPv4 and -6, domain).
-
-
-def analysis_object(api_key, host):
- url = "https://www.virustotal.com/api/v3/urls"
- payload = {"url": ""}
- payload.update({"url": host})
- headers = {
- "accept": "application/json",
- "content-type": "application/x-www-form-urlencoded",
- "x-apikey": api_key,
- }
- response = requests.post(url, data=payload, headers=headers)
- response_dict = json.loads(response.text)
- response_id = response_dict["data"]["id"]
- return response_id
-
-
-def analyse_domain(api_key, host):
- url = "https://www.virustotal.com/api/v3/domains/" + host
- headers = {
- "accept": "application/json",
- "content-type": "application/x-www-form-urlencoded",
- "x-apikey": api_key,
- }
- analysis_response = requests.get(url, headers=headers)
- response_dict = json.loads(analysis_response.text)
- # Probably still need to turn the requests.get into a json like below
- return response_dict
-
-
-def analyse_URL(api_key, response_id):
- analysis_url = "https://www.virustotal.com/api/v3/analyses/{}".format(response_id)
- headers = {"accept": "application/json", "x-apikey": api_key}
- analysis_response = requests.get(analysis_url, headers=headers)
- analysis_dict = json.loads(analysis_response.text)
- # return analysis_response.text
- return analysis_dict
-
-
-def analyse_IP(api_key, host):
- analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host)
- headers = {"accept": "application/json", "x-apikey": api_key}
- analysis_response = requests.get(analysis_url, headers=headers)
- analysis_dict = json.loads(analysis_response.text)
- # Implement this: https://docs.virustotal.com/reference/ip-info
- return analysis_dict
-
-
-def analyse(host, host_type):
- api_key = environment()
- if host_type == URL:
- response_id = analysis_object(api_key, host)
- result = analyse_URL(api_key, response_id)
- elif host_type == DOMAIN:
- result = analyse_domain(api_key, host)
- # elif for IPv4 and IPv6.
- elif host_type == IPV4 or IPV6:
- result = analyse_IP(api_key, host)
- return result