started consolidating with temporary functions (analyse2), still need to work email address lookup out

This commit is contained in:
Joost Agterhoek 2025-02-21 21:39:40 +01:00
parent c33c7a57f7
commit 9711d760dd
2 changed files with 54 additions and 31 deletions

View File

@ -22,6 +22,7 @@ class Lookedup(object):
self.host = host self.host = host
self.host_type = determine(self.host) self.host_type = determine(self.host)
self = self.specific() self = self.specific()
# TODO: consolidate all below functions if possible
def url_lookup(self): def url_lookup(self):
self.domain = urlparse(self.host).netloc self.domain = urlparse(self.host).netloc
@ -30,28 +31,39 @@ class Lookedup(object):
return self return self
def ip_lookup(self): def ip_lookup(self):
pass self.metadata = domain(self.host)
self.domain = self.metadata["domain_name"]
self.email_security = spf_dmarc(self.domain)
self.vt, self.vt_dict = virustotal_api.analyse2(self.host, self.host_type)
self.abuseipdb = abuseipdb_api.analyse(self.host)
return self
def domain_lookup(self): def domain_lookup(self):
self.ip_address = socket.gethostbyname(self.host) self.ip_address = socket.gethostbyname(self.host)
self.metadata = domain(self.host) self.metadata = domain(self.host)
self.domain = self.metadata["domain_name"] self.domain = self.metadata["domain_name"]
self.email_security = spf_dmarc(self.domain) self.email_security = spf_dmarc(self.domain)
self.vt, self.vt_dict = virustotal_api.analyse(self.host, self.host_type) self.vt, self.vt_dict = virustotal_api.analyse2(self.host, self.host_type)
self.abuseipdb = abuseipdb_api.analyse(self.ip_address) self.abuseipdb = abuseipdb_api.analyse(self.ip_address)
return self return self
def email_lookup(self): def email_lookup(self):
pass self.domain = self.host.split("@")[1]
self.ip_address = socket.gethostbyname(self.domain)
def no_host(self): self.email_security = spf_dmarc(self.domain)
return None self.vt, self.vt_dict = virustotal_api.analyse2(self.domain, self.host_type)
self.abuseipdb = abuseipdb_api.analyse(self.ip_address)
return self
def specific(self): def specific(self):
if self.host_type == "url": if self.host_type == "url":
return self.url_lookup() return self.url_lookup()
elif self.host_type == "domain": elif self.host_type == "domain":
return self.domain_lookup() return self.domain_lookup()
elif self.host_type == "ip":
return self.ip_lookup()
elif self.host_type == "email address":
return self.email_lookup()
def sanitize(user_input): def sanitize(user_input):

View File

@ -34,31 +34,9 @@ def analyse_domain(api_key, host):
"content-type": "application/x-www-form-urlencoded", "content-type": "application/x-www-form-urlencoded",
"x-apikey": api_key, "x-apikey": api_key,
} }
vendors = []
analysis_json = requests.get(url, headers=headers) analysis_json = requests.get(url, headers=headers)
response_dict = json.loads(analysis_json.text) response_dict = json.loads(analysis_json.text)
return response_dict, analysis_json
virustotal_stats = response_dict["data"]["attributes"]["last_analysis_stats"]
virustotal_results = response_dict["data"]["attributes"]["last_analysis_results"]
last_update = response_dict["data"]["attributes"]["last_update_date"]
domain_lookup = dict.fromkeys(["total", "score", "vendors", "last_update"])
total = 0
vendors = []
for key, value in virustotal_stats.items():
total += value
for key, value in virustotal_results.items():
if value["category"] == "malicious":
vendors.append(key)
domain_lookup["total"] = total
domain_lookup["score"] = virustotal_stats["malicious"]
domain_lookup["vendors"] = vendors
domain_lookup["last_update"] = time.strftime(
"%d-%m-%Y",
time.gmtime(last_update),
)
return domain_lookup, response_dict
def analyse_URL(api_key, response_id): def analyse_URL(api_key, response_id):
@ -73,9 +51,9 @@ def analyse_IP(api_key, host):
analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host) analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host)
headers = {"accept": "application/json", "x-apikey": api_key} headers = {"accept": "application/json", "x-apikey": api_key}
analysis_json = requests.get(analysis_url, headers=headers) analysis_json = requests.get(analysis_url, headers=headers)
analysis_dict = json.loads(analysis_json.text) response_dict = json.loads(analysis_json.text)
# Implement this: https://docs.virustotal.com/reference/ip-info # Implement this: https://docs.virustotal.com/reference/ip-info
return analysis_dict, analysis_json return response_dict, analysis_json
def analyse(host, host_type): def analyse(host, host_type):
@ -88,3 +66,36 @@ def analyse(host, host_type):
elif host_type == IPV4 or IPV6: elif host_type == IPV4 or IPV6:
result, analysis_json = analyse_IP(api_key, host) result, analysis_json = analyse_IP(api_key, host)
return result, analysis_json return result, analysis_json
def analyse2(host, host_type):
api_key = environment()
if host_type == "url":
response_id = analysis_object(api_key, host)
result, analysis_json = analyse_URL(api_key, response_id)
elif host_type == "domain" or "email_address":
result, analysis_json = analyse_domain(api_key, host)
elif host_type == "IPv4" or "IPv6":
result, analysis_json = analyse_IP(api_key, host)
vt_stats = result["data"]["attributes"]["last_analysis_stats"]
vt_results = result["data"]["attributes"]["last_analysis_results"]
last_update = result["data"]["attributes"]["last_analysis_date"]
summary = dict.fromkeys(["total", "score", "vendors", "last_update"])
total = 0
vendors = []
for key, value in vt_stats.items():
total += value
for key, value in vt_results.items():
if value["category"] == "malicious":
vendors.append(key)
summary["total"] = total
summary["score"] = vt_stats["malicious"]
summary["vendors"] = vendors
summary["last_update"] = time.strftime(
"%d-%m-%Y",
time.gmtime(last_update),
)
return summary, analysis_json