flask-soc-site/flask_soc_site/src/virustotal_api.py

113 lines
4.0 KiB
Python
Raw Normal View History

import json
import time
import os
import requests
from dotenv import load_dotenv
from .constants import URL, DOMAIN, IPV4, IPV6, domain_lookup
def environment():
load_dotenv()
api_key = os.getenv("VT_API")
return api_key
def analysis_object(api_key, host):
url = "https://www.virustotal.com/api/v3/urls"
payload = {"url": ""}
payload.update({"url": host})
headers = {
"accept": "application/json",
"content-type": "application/x-www-form-urlencoded",
"x-apikey": api_key,
}
response = requests.post(url, data=payload, headers=headers)
response_dict = json.loads(response.text)
response_id = response_dict["data"]["id"]
return response_id
def analyse_domain(api_key, host):
url = "https://www.virustotal.com/api/v3/domains/" + host
headers = {
"accept": "application/json",
"content-type": "application/x-www-form-urlencoded",
"x-apikey": api_key,
}
analysis_json = requests.get(url, headers=headers)
response_dict = json.loads(analysis_json.text)
return response_dict, analysis_json
def analyse_URL(api_key, response_id):
url = "https://www.virustotal.com/api/v3/analyses/{}".format(response_id)
headers = {"accept": "application/json", "x-apikey": api_key}
analysis_json = requests.get(url, headers=headers)
analysis_dict = json.loads(analysis_json.text)
return analysis_dict, analysis_json
# This returns a differently shaped JSON and therefore dict:
# analysis_dict keys 'data', 'meta'
# analysis_dict['data'] keys 'id', 'type', 'links', 'attributes'
# analysis_dict['data']['attributes'] keys 'stats', (numbers) 'results', (all the AV engine results) 'date', (Linux epoch timestamp) 'status'
def analyse_IP(api_key, host):
analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host)
headers = {"accept": "application/json", "x-apikey": api_key}
analysis_json = requests.get(analysis_url, headers=headers)
response_dict = json.loads(analysis_json.text)
# Implement this: https://docs.virustotal.com/reference/ip-info
return response_dict, analysis_json
def analyse(host, host_type):
api_key = environment()
if host_type == URL:
response_id = analysis_object(api_key, host)
result, analysis_json = analyse_URL(api_key, response_id)
elif host_type == DOMAIN:
result, analysis_json = analyse_domain(api_key, host)
elif host_type == IPV4 or IPV6:
result, analysis_json = analyse_IP(api_key, host)
return result, analysis_json
def analyse2(host, host_type):
api_key = environment()
if host_type == "url":
response_id = analysis_object(api_key, host)
result, analysis_json = analyse_URL(api_key, response_id)
elif host_type == "domain" or host_type == "email address":
result, analysis_json = analyse_domain(api_key, host)
elif host_type == "ip":
result, analysis_json = analyse_IP(api_key, host)
if host_type == "url":
vt_stats = result["data"]["attributes"]["stats"]
vt_results = result["data"]["attributes"]["results"]
last_update = result["data"]["attributes"]["date"]
elif host_type == "domain" or host_type == "email address" or host_type == "ip":
vt_stats = result["data"]["attributes"]["last_analysis_stats"]
vt_results = result["data"]["attributes"]["last_analysis_results"]
last_update = result["data"]["attributes"]["last_analysis_date"]
summary = dict.fromkeys(["total", "score", "vendors", "last_update"])
total = 0
vendors = []
for key, value in vt_stats.items():
total += value
for key, value in vt_results.items():
if value["category"] == "malicious":
vendors.append(key)
summary["total"] = total
summary["score"] = vt_stats["malicious"]
summary["vendors"] = vendors
summary["last_update"] = time.strftime(
"%d-%m-%Y",
time.gmtime(last_update),
)
2025-06-10 21:04:43 +02:00
# return summary, analysis_json
return summary