reorganizing to match better with flaskr tutorial
This commit is contained in:
parent
e6be0a5c19
commit
c37883bc8b
52
src/abuseipdb_api.py
Normal file
52
src/abuseipdb_api.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
from base64 import decode
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import requests_cache
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from pprint import pprint
|
||||||
|
|
||||||
|
|
||||||
|
class API_error(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def environment():
|
||||||
|
requests_cache.install_cache(expire_after=360, allowable_methods=("POST"))
|
||||||
|
load_dotenv()
|
||||||
|
api_key = os.getenv("ABUSEIPDB_API")
|
||||||
|
return api_key
|
||||||
|
|
||||||
|
|
||||||
|
def lookup(api_key, host):
|
||||||
|
url = "https://api.abuseipdb.com/api/v2/check"
|
||||||
|
payload = {"ipAddress": "", "maxAgeInDays": "90"}
|
||||||
|
payload.update({"ipAddress": host})
|
||||||
|
headers = {"Accept": "application/json", "Key": api_key}
|
||||||
|
response = requests.request(
|
||||||
|
method="GET", url=url, params=payload, headers=headers, verify=False
|
||||||
|
) # TODO: remove SSL verify=False and add signed certificate if possible.
|
||||||
|
# Figure out how caching functions here: https://requests-cache.readthedocs.io/en/stable/examples.html
|
||||||
|
response_dict = json.loads(response.text)
|
||||||
|
lookup = dict.fromkeys(
|
||||||
|
["score", "last_reported", "IP_address", "CDN", "Tor", "total_reports"]
|
||||||
|
)
|
||||||
|
print(response_dict)
|
||||||
|
lookup["score"] = response_dict["data"]["abuseConfidenceScore"]
|
||||||
|
lookup["last_reported"] = response_dict["data"]["lastReportedAt"]
|
||||||
|
lookup["IP_address"] = response_dict["data"]["ipAddress"]
|
||||||
|
lookup["usage"] = response_dict["data"]["usageType"]
|
||||||
|
lookup["Tor"] = response_dict["data"]["isTor"]
|
||||||
|
lookup["total_reports"] = response_dict["data"]["totalReports"]
|
||||||
|
|
||||||
|
print(requests_cache.get_cache())
|
||||||
|
print("Cached:")
|
||||||
|
print("\n".join(requests_cache.get_cache().urls()))
|
||||||
|
|
||||||
|
return lookup
|
||||||
|
|
||||||
|
|
||||||
|
def analyse(host):
|
||||||
|
api_key = environment()
|
||||||
|
result = lookup(api_key, host)
|
||||||
|
return result
|
154
src/host_lookup.py
Normal file
154
src/host_lookup.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
# TODO: make this module only have wrapper functions, no direct lookups or imports. Move all the direct lookup functions (emailsec) into (a) separate module(s).
|
||||||
|
|
||||||
|
import re
|
||||||
|
from ipaddress import ip_address
|
||||||
|
from checkdmarc.dmarc import check_dmarc
|
||||||
|
from checkdmarc.spf import check_spf
|
||||||
|
import validators
|
||||||
|
from ipwhois import IPWhois
|
||||||
|
from whois import whois
|
||||||
|
|
||||||
|
# from constants import DOMAIN, EMAIL, IPV4, IPV6, URL
|
||||||
|
from src import abuseipdb_api, virustotal_api
|
||||||
|
import socket
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
import host_data
|
||||||
|
import tldextract
|
||||||
|
|
||||||
|
|
||||||
|
class Lookedup(object):
|
||||||
|
def __init__(self, host):
|
||||||
|
self.host = host
|
||||||
|
self.host_type = determine(self.host)
|
||||||
|
self = self.specific()
|
||||||
|
# TODO: consolidate all below functions if possible
|
||||||
|
|
||||||
|
def url_lookup(self):
|
||||||
|
self.domain = urlparse(self.host).netloc
|
||||||
|
self.ip_address = socket.gethostbyname(self.domain)
|
||||||
|
self.metadata = domain(self.domain)
|
||||||
|
self.email_security = spf_dmarc(self.domain)
|
||||||
|
self.vt, self.vt_dict = virustotal_api.analyse2(self.host, self.host_type)
|
||||||
|
self.abuseipdb = abuseipdb_api.analyse(self.ip_address)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def ip_lookup(self):
|
||||||
|
self.metadata = domain(self.host)
|
||||||
|
self.domain = self.metadata["domain_name"]
|
||||||
|
self.email_security = spf_dmarc(self.domain)
|
||||||
|
self.vt, self.vt_dict = virustotal_api.analyse2(self.host, self.host_type)
|
||||||
|
self.abuseipdb = abuseipdb_api.analyse(self.host)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def domain_lookup(self):
|
||||||
|
self.ip_address = socket.gethostbyname(self.host)
|
||||||
|
self.metadata = domain(self.host)
|
||||||
|
self.domain = self.metadata["domain_name"]
|
||||||
|
self.email_security = spf_dmarc(self.domain)
|
||||||
|
self.vt, self.vt_dict = virustotal_api.analyse2(self.host, self.host_type)
|
||||||
|
self.abuseipdb = abuseipdb_api.analyse(self.ip_address)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def email_lookup(self):
|
||||||
|
self.domain = self.host.split("@")[1]
|
||||||
|
self.metadata = domain(self.domain)
|
||||||
|
self.ip_address = socket.gethostbyname(self.domain)
|
||||||
|
self.email_security = spf_dmarc(self.domain)
|
||||||
|
self.vt, self.vt_dict = virustotal_api.analyse2(self.domain, self.host_type)
|
||||||
|
self.abuseipdb = abuseipdb_api.analyse(self.ip_address)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def specific(self):
|
||||||
|
if self.host_type == "url":
|
||||||
|
return self.url_lookup()
|
||||||
|
elif self.host_type == "domain":
|
||||||
|
return self.domain_lookup()
|
||||||
|
elif self.host_type == "ip":
|
||||||
|
return self.ip_lookup()
|
||||||
|
elif self.host_type == "email address":
|
||||||
|
return self.email_lookup()
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize(user_input):
|
||||||
|
sanitized = []
|
||||||
|
if user_input.strip() != "":
|
||||||
|
sanitized = re.split("; |, | |\n", user_input)
|
||||||
|
return sanitized
|
||||||
|
|
||||||
|
|
||||||
|
def determine(host):
|
||||||
|
host_type = ""
|
||||||
|
if validators.url(host):
|
||||||
|
host_type = "url"
|
||||||
|
elif validators.domain(host):
|
||||||
|
host_type = "domain"
|
||||||
|
elif validators.ip_address.ipv4(host):
|
||||||
|
host_type = "ip"
|
||||||
|
elif validators.ip_address.ipv6(host):
|
||||||
|
host_type = "ip"
|
||||||
|
elif validators.email(host):
|
||||||
|
host_type = "email address"
|
||||||
|
else:
|
||||||
|
host_type = "no host"
|
||||||
|
return host_type
|
||||||
|
|
||||||
|
|
||||||
|
def extract(user_input):
|
||||||
|
hosts = []
|
||||||
|
errors = []
|
||||||
|
for item in user_input:
|
||||||
|
if validators.url(item):
|
||||||
|
hosts.append(item)
|
||||||
|
elif validators.domain(item):
|
||||||
|
hosts.append(item)
|
||||||
|
elif validators.ip_address.ipv4(item):
|
||||||
|
hosts.append(item)
|
||||||
|
elif validators.ip_address.ipv6(item):
|
||||||
|
hosts.append(item)
|
||||||
|
elif validators.email(item):
|
||||||
|
hosts.append(item)
|
||||||
|
else:
|
||||||
|
errors.append(item)
|
||||||
|
return hosts, errors
|
||||||
|
|
||||||
|
|
||||||
|
def domain(host):
|
||||||
|
result = dict(whois(host))
|
||||||
|
if type(result["creation_date"]) is list:
|
||||||
|
result["creation_date"] = result["creation_date"][0].strftime("%d-%m-%Y")
|
||||||
|
else:
|
||||||
|
result["creation_date"] = result["creation_date"].strftime("%d-%m-%Y")
|
||||||
|
if type(result["domain_name"]) is list:
|
||||||
|
result["domain_name"] = result["domain_name"][0]
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def spf_dmarc(domain):
|
||||||
|
spf = ""
|
||||||
|
dmarc = ""
|
||||||
|
result_spf = check_spf(domain)
|
||||||
|
if result_spf["valid"]:
|
||||||
|
spf = result_spf["record"]
|
||||||
|
result_dmarc = check_dmarc(domain)
|
||||||
|
if result_dmarc["valid"]:
|
||||||
|
dmarc = result_dmarc["record"]
|
||||||
|
return spf, dmarc
|
||||||
|
|
||||||
|
|
||||||
|
def process_input(user):
|
||||||
|
results = []
|
||||||
|
sanitized = sanitize(user)
|
||||||
|
hosts, errors = extract(sanitized)
|
||||||
|
# for host in hosts:
|
||||||
|
# host_analyzed = Host(host).lookup()
|
||||||
|
# results.append(host_analyzed)
|
||||||
|
# return results
|
||||||
|
return hosts, errors
|
||||||
|
|
||||||
|
|
||||||
|
def process_file(file_content):
|
||||||
|
hosts = []
|
||||||
|
for host in file_content:
|
||||||
|
result = Host(host)
|
||||||
|
hosts.append(result)
|
||||||
|
return hosts
|
111
src/virustotal_api.py
Normal file
111
src/virustotal_api.py
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
import json
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from constants import URL, DOMAIN, IPV4, IPV6, domain_lookup
|
||||||
|
|
||||||
|
|
||||||
|
def environment():
|
||||||
|
load_dotenv()
|
||||||
|
api_key = os.getenv("VT_API")
|
||||||
|
return api_key
|
||||||
|
|
||||||
|
|
||||||
|
def analysis_object(api_key, host):
|
||||||
|
url = "https://www.virustotal.com/api/v3/urls"
|
||||||
|
payload = {"url": ""}
|
||||||
|
payload.update({"url": host})
|
||||||
|
headers = {
|
||||||
|
"accept": "application/json",
|
||||||
|
"content-type": "application/x-www-form-urlencoded",
|
||||||
|
"x-apikey": api_key,
|
||||||
|
}
|
||||||
|
response = requests.post(url, data=payload, headers=headers)
|
||||||
|
response_dict = json.loads(response.text)
|
||||||
|
response_id = response_dict["data"]["id"]
|
||||||
|
return response_id
|
||||||
|
|
||||||
|
|
||||||
|
def analyse_domain(api_key, host):
|
||||||
|
url = "https://www.virustotal.com/api/v3/domains/" + host
|
||||||
|
headers = {
|
||||||
|
"accept": "application/json",
|
||||||
|
"content-type": "application/x-www-form-urlencoded",
|
||||||
|
"x-apikey": api_key,
|
||||||
|
}
|
||||||
|
analysis_json = requests.get(url, headers=headers)
|
||||||
|
response_dict = json.loads(analysis_json.text)
|
||||||
|
return response_dict, analysis_json
|
||||||
|
|
||||||
|
|
||||||
|
def analyse_URL(api_key, response_id):
|
||||||
|
url = "https://www.virustotal.com/api/v3/analyses/{}".format(response_id)
|
||||||
|
headers = {"accept": "application/json", "x-apikey": api_key}
|
||||||
|
analysis_json = requests.get(url, headers=headers)
|
||||||
|
analysis_dict = json.loads(analysis_json.text)
|
||||||
|
return analysis_dict, analysis_json
|
||||||
|
|
||||||
|
|
||||||
|
# This returns a differently shaped JSON and therefore dict:
|
||||||
|
# analysis_dict keys 'data', 'meta'
|
||||||
|
# analysis_dict['data'] keys 'id', 'type', 'links', 'attributes'
|
||||||
|
# analysis_dict['data']['attributes'] keys 'stats', (numbers) 'results', (all the AV engine results) 'date', (Linux epoch timestamp) 'status'
|
||||||
|
|
||||||
|
|
||||||
|
def analyse_IP(api_key, host):
|
||||||
|
analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host)
|
||||||
|
headers = {"accept": "application/json", "x-apikey": api_key}
|
||||||
|
analysis_json = requests.get(analysis_url, headers=headers)
|
||||||
|
response_dict = json.loads(analysis_json.text)
|
||||||
|
# Implement this: https://docs.virustotal.com/reference/ip-info
|
||||||
|
return response_dict, analysis_json
|
||||||
|
|
||||||
|
|
||||||
|
def analyse(host, host_type):
|
||||||
|
api_key = environment()
|
||||||
|
if host_type == URL:
|
||||||
|
response_id = analysis_object(api_key, host)
|
||||||
|
result, analysis_json = analyse_URL(api_key, response_id)
|
||||||
|
elif host_type == DOMAIN:
|
||||||
|
result, analysis_json = analyse_domain(api_key, host)
|
||||||
|
elif host_type == IPV4 or IPV6:
|
||||||
|
result, analysis_json = analyse_IP(api_key, host)
|
||||||
|
return result, analysis_json
|
||||||
|
|
||||||
|
|
||||||
|
def analyse2(host, host_type):
|
||||||
|
api_key = environment()
|
||||||
|
if host_type == "url":
|
||||||
|
response_id = analysis_object(api_key, host)
|
||||||
|
result, analysis_json = analyse_URL(api_key, response_id)
|
||||||
|
elif host_type == "domain" or host_type == "email address":
|
||||||
|
result, analysis_json = analyse_domain(api_key, host)
|
||||||
|
elif host_type == "ip":
|
||||||
|
result, analysis_json = analyse_IP(api_key, host)
|
||||||
|
if host_type == "url":
|
||||||
|
vt_stats = result["data"]["attributes"]["stats"]
|
||||||
|
vt_results = result["data"]["attributes"]["results"]
|
||||||
|
last_update = result["data"]["attributes"]["date"]
|
||||||
|
elif host_type == "domain" or host_type == "email address" or host_type == "ip":
|
||||||
|
vt_stats = result["data"]["attributes"]["last_analysis_stats"]
|
||||||
|
vt_results = result["data"]["attributes"]["last_analysis_results"]
|
||||||
|
last_update = result["data"]["attributes"]["last_analysis_date"]
|
||||||
|
|
||||||
|
summary = dict.fromkeys(["total", "score", "vendors", "last_update"])
|
||||||
|
total = 0
|
||||||
|
vendors = []
|
||||||
|
for key, value in vt_stats.items():
|
||||||
|
total += value
|
||||||
|
for key, value in vt_results.items():
|
||||||
|
if value["category"] == "malicious":
|
||||||
|
vendors.append(key)
|
||||||
|
|
||||||
|
summary["total"] = total
|
||||||
|
summary["score"] = vt_stats["malicious"]
|
||||||
|
summary["vendors"] = vendors
|
||||||
|
summary["last_update"] = time.strftime(
|
||||||
|
"%d-%m-%Y",
|
||||||
|
time.gmtime(last_update),
|
||||||
|
)
|
||||||
|
return summary, analysis_json
|
Loading…
x
Reference in New Issue
Block a user