From bd33372cf29265904942a506e54d7020de224cdc Mon Sep 17 00:00:00 2001
From: Joost Agterhoek
Date: Thu, 30 Jan 2025 21:30:14 +0100
Subject: [PATCH] latest versions from v2
---
abuseipdb_api.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++
host_data.py | 43 +++++++++++++++++++++++++++++++++++++++
2 files changed, 95 insertions(+)
create mode 100644 abuseipdb_api.py
create mode 100644 host_data.py
diff --git a/abuseipdb_api.py b/abuseipdb_api.py
new file mode 100644
index 0000000..df6260f
--- /dev/null
+++ b/abuseipdb_api.py
@@ -0,0 +1,52 @@
+from base64 import decode
+import json
+import os
+import requests
+import requests_cache
+from dotenv import load_dotenv
+from pprint import pprint
+
+
+class API_error(Exception):
+ pass
+
+
+def environment():
+ requests_cache.install_cache(expire_after=360, allowable_methods=("POST"))
+ load_dotenv()
+ api_key = os.getenv("ABUSEIPDB_API")
+ return api_key
+
+
+def lookup(api_key, host):
+ url = "https://api.abuseipdb.com/api/v2/check"
+ payload = {"ipAddress": "", "maxAgeInDays": "90"}
+ payload.update({"ipAddress": host})
+ headers = {"Accept": "application/json", "Key": api_key}
+ response = requests.request(
+ method="GET", url=url, params=payload, headers=headers, verify=False
+ ) # TODO: remove SSL verify=False and add signed certificate if possible.
+ # Figure out how caching functions here: https://requests-cache.readthedocs.io/en/stable/examples.html
+ response_dict = json.loads(response.text)
+ lookup = dict.fromkeys(
+ ["score", "last_reported", "IP_address", "CDN", "Tor", "total_reports"]
+ )
+ print(response_dict)
+ lookup["score"] = response_dict["data"]["abuseConfidenceScore"]
+ lookup["last_reported"] = response_dict["data"]["lastReportedAt"]
+ lookup["IP_address"] = response_dict["data"]["ipAddress"]
+ lookup["usage"] = response_dict["data"]["usageType"]
+ lookup["Tor"] = response_dict["data"]["isTor"]
+ lookup["total_reports"] = response_dict["data"]["totalReports"]
+
+ print(requests_cache.get_cache())
+ print("Cached:")
+ print("\n".join(requests_cache.get_cache().urls()))
+
+ return lookup
+
+
+def analyse(host):
+ api_key = environment()
+ result = lookup(api_key, host)
+ return result
diff --git a/host_data.py b/host_data.py
new file mode 100644
index 0000000..a67fa27
--- /dev/null
+++ b/host_data.py
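Review bot: In `lookup` (abuseipdb_api.py), `verify=False` turns off certificate validation on the request that carries the AbuseIPDB key in the `Key` header, so anyone on the network path can read the key and return a forged reputation verdict; the TODO acknowledges this but the commit still ships it. requests verifies against its bundled CA store by default, so no certificate of your own is needed: drop the argument, and if a TLS-inspecting proxy was the reason for it, pass that proxy's CA bundle path to `verify` instead. The call also has no timeout, and the body is indexed as `response_dict["data"][...]` without a status check, so an error response (bad key, rate limit) would most likely surface as a `KeyError` while the `API_error` class defined in this file goes unused. Separately, `allowable_methods=("POST")` in `environment` is a plain string and this lookup is a GET, so the cache probably never stores it; `("GET",)` looks like the intent.

```python
    headers = {"Accept": "application/json", "Key": api_key}
    # Certificate verification stays at the requests default (on); the timeout value is illustrative.
    response = requests.get(url, params=payload, headers=headers, timeout=10)
    if not response.ok:
        raise API_error(f"AbuseIPDB lookup failed with HTTP {response.status_code}")
    response_dict = response.json()
    if "data" not in response_dict:
        raise API_error("AbuseIPDB response contained no 'data' object")
    # … unchanged: lookup dict filled from response_dict["data"] and returned …
```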
@@ -0,0 +1,43 @@
+from ipaddress import ip_address
+from checkdmarc.dmarc import check_dmarc
+from checkdmarc.spf import check_spf
+import validators
+from ipwhois import IPWhois
+from whois import whois
+from constants import DOMAIN, EMAIL, IPV4, IPV6, URL
+
+
+def determine(host):
+ host_type = ""
+ if validators.url(host):
+ host_type = URL
+ elif validators.domain(host):
+ host_type = DOMAIN
+ elif validators.ip_address.ipv4(host):
+ host_type = IPV4
+ elif validators.ip_address.ipv6(host):
+ host_type = IPV6
+ elif validators.email(host):
+ host_type = EMAIL
+ else:
+ print("NO HOST TYPE")
+ return host_type
+
+
+def domain(host):
+ result = dict(whois(host))
+ if type(result["domain_name"]) is list:
+ result["domain_name"] = result["domain_name"][0]
+ return result
+
+
+def emailsec(host):
+ spf = ""
+ dmarc = ""
+ result_spf = check_spf(host)
+ if result_spf["valid"]:
+ spf = result_spf["record"]
+ result_dmarc = check_dmarc(host)
+ if result_dmarc["valid"]:
+ dmarc = result_dmarc["record"]
+ return spf, dmarc
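Review bot: `determine` (host_data.py) reports an unrecognised value only by printing `NO HOST TYPE` and returning `""`, so the rejection is easy to miss; the callers are outside this patch, but any of them that does not test for the empty string would hand unvalidated input on to `whois()` or `check_spf()`. Raising makes the failure impossible to ignore: replace the `else` branch with `raise ValueError(f"unsupported host value: {host!r}")`, or return `None` and state that in a docstring. In `domain`, `isinstance(result["domain_name"], list)` is the idiomatic form of the `type(...) is list` test. Each function in this file would also benefit from a one-line docstring naming the accepted input and the shape of the return value, for example that `emailsec` returns an `(spf, dmarc)` tuple of record strings that are empty when the record is not valid.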