latest versions from v2
This commit is contained in:
parent
f6b4b0b278
commit
bd33372cf2
52
abuseipdb_api.py
Normal file
52
abuseipdb_api.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
from base64 import decode
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import requests_cache
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from pprint import pprint
|
||||||
|
|
||||||
|
|
||||||
|
class API_error(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def environment():
|
||||||
|
requests_cache.install_cache(expire_after=360, allowable_methods=("POST"))
|
||||||
|
load_dotenv()
|
||||||
|
api_key = os.getenv("ABUSEIPDB_API")
|
||||||
|
return api_key
|
||||||
|
|
||||||
|
|
||||||
|
def lookup(api_key, host):
|
||||||
|
url = "https://api.abuseipdb.com/api/v2/check"
|
||||||
|
payload = {"ipAddress": "", "maxAgeInDays": "90"}
|
||||||
|
payload.update({"ipAddress": host})
|
||||||
|
headers = {"Accept": "application/json", "Key": api_key}
|
||||||
|
response = requests.request(
|
||||||
|
method="GET", url=url, params=payload, headers=headers, verify=False
|
||||||
|
) # TODO: remove SSL verify=False and add signed certificate if possible.
|
||||||
|
# Figure out how caching functions here: https://requests-cache.readthedocs.io/en/stable/examples.html
|
||||||
|
response_dict = json.loads(response.text)
|
||||||
|
lookup = dict.fromkeys(
|
||||||
|
["score", "last_reported", "IP_address", "CDN", "Tor", "total_reports"]
|
||||||
|
)
|
||||||
|
print(response_dict)
|
||||||
|
lookup["score"] = response_dict["data"]["abuseConfidenceScore"]
|
||||||
|
lookup["last_reported"] = response_dict["data"]["lastReportedAt"]
|
||||||
|
lookup["IP_address"] = response_dict["data"]["ipAddress"]
|
||||||
|
lookup["usage"] = response_dict["data"]["usageType"]
|
||||||
|
lookup["Tor"] = response_dict["data"]["isTor"]
|
||||||
|
lookup["total_reports"] = response_dict["data"]["totalReports"]
|
||||||
|
|
||||||
|
print(requests_cache.get_cache())
|
||||||
|
print("Cached:")
|
||||||
|
print("\n".join(requests_cache.get_cache().urls()))
|
||||||
|
|
||||||
|
return lookup
|
||||||
|
|
||||||
|
|
||||||
|
def analyse(host):
|
||||||
|
api_key = environment()
|
||||||
|
result = lookup(api_key, host)
|
||||||
|
return result
|
43
host_data.py
Normal file
43
host_data.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
from ipaddress import ip_address
|
||||||
|
from checkdmarc.dmarc import check_dmarc
|
||||||
|
from checkdmarc.spf import check_spf
|
||||||
|
import validators
|
||||||
|
from ipwhois import IPWhois
|
||||||
|
from whois import whois
|
||||||
|
from constants import DOMAIN, EMAIL, IPV4, IPV6, URL
|
||||||
|
|
||||||
|
|
||||||
|
def determine(host):
|
||||||
|
host_type = ""
|
||||||
|
if validators.url(host):
|
||||||
|
host_type = URL
|
||||||
|
elif validators.domain(host):
|
||||||
|
host_type = DOMAIN
|
||||||
|
elif validators.ip_address.ipv4(host):
|
||||||
|
host_type = IPV4
|
||||||
|
elif validators.ip_address.ipv6(host):
|
||||||
|
host_type = IPV6
|
||||||
|
elif validators.email(host):
|
||||||
|
host_type = EMAIL
|
||||||
|
else:
|
||||||
|
print("NO HOST TYPE")
|
||||||
|
return host_type
|
||||||
|
|
||||||
|
|
||||||
|
def domain(host):
|
||||||
|
result = dict(whois(host))
|
||||||
|
if type(result["domain_name"]) is list:
|
||||||
|
result["domain_name"] = result["domain_name"][0]
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def emailsec(host):
|
||||||
|
spf = ""
|
||||||
|
dmarc = ""
|
||||||
|
result_spf = check_spf(host)
|
||||||
|
if result_spf["valid"]:
|
||||||
|
spf = result_spf["record"]
|
||||||
|
result_dmarc = check_dmarc(host)
|
||||||
|
if result_dmarc["valid"]:
|
||||||
|
dmarc = result_dmarc["record"]
|
||||||
|
return spf, dmarc
|
Loading…
x
Reference in New Issue
Block a user