refactor underway, will consolidate these components
This commit is contained in:
parent
79d3323804
commit
fc5e53eee1
|
@ -1,41 +0,0 @@
|
|||
from base64 import decode
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import requests_cache
|
||||
from dotenv import load_dotenv
|
||||
from pprint import pprint
|
||||
|
||||
|
||||
class API_error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def environment():
|
||||
requests_cache.install_cache(expire_after=360, allowable_methods=("POST"))
|
||||
load_dotenv()
|
||||
api_key = os.getenv("ABUSEIPDB_API")
|
||||
return api_key
|
||||
|
||||
|
||||
def lookup(api_key, host):
|
||||
url = "https://api.abuseipdb.com/api/v2/check"
|
||||
payload = {"ipAddress": "", "maxAgeInDays": "90"}
|
||||
payload.update({"ipAddress": host})
|
||||
headers = {"Accept": "application/json", "Key": api_key}
|
||||
response = requests.request(
|
||||
method="GET", url=url, params=payload, headers=headers, verify=False
|
||||
) # TODO: remove SSL verify=False and add signed certificate if possible.
|
||||
# Figure out how caching functions here: https://requests-cache.readthedocs.io/en/stable/examples.html
|
||||
print(requests_cache.get_cache())
|
||||
print("Cached:")
|
||||
print("\n".join(requests_cache.get_cache().urls()))
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def analyse(host):
|
||||
api_key = environment()
|
||||
result = lookup(api_key, host)
|
||||
decoded_result = json.loads(result.text)
|
||||
return decoded_result
|
|
@ -1,30 +0,0 @@
|
|||
from ipaddress import ip_address
|
||||
from whois import whois
|
||||
from ipwhois import IPWhois
|
||||
import validators
|
||||
from constants import URL, DOMAIN, IPV4, IPV6
|
||||
|
||||
|
||||
def check(host):
|
||||
if validators.url(host):
|
||||
host_type = URL
|
||||
elif validators.domain(host):
|
||||
host_type = DOMAIN
|
||||
elif validators.ip_address.ipv4(host):
|
||||
host_type = IPV4
|
||||
elif validators.ip_address.ipv6(host):
|
||||
host_type = IPV6
|
||||
return host_type
|
||||
|
||||
|
||||
# def lookup(host_type):
|
||||
def lookup(host):
|
||||
result = dict(whois(host))
|
||||
return result
|
||||
|
||||
|
||||
# result = whois(host_type[1])
|
||||
# return result, host_type[0]
|
||||
# obj = IPWhois(host_type[1])
|
||||
# res = obj.lookup_rdap()
|
||||
# return res, host_type[0]
|
|
@ -1,3 +0,0 @@
|
|||
# Try to get historical telemetry like this page shows: https://otx.alienvault.com/indicator/ip/8.8.8.8
|
||||
# Apparently this API does not provide this information :( f.e. the below curl request does not provide information about historical OTX telemetry.
|
||||
# curl https://otx.alienvault.com/api/v1/indicators/url/http://www.freputation.com/spreputation_san_ponso/slides/IMG_0068.html/general -H "X-OTX-API-KEY: ec672963e435bb7a09c494534b79a6a7a273a5bde5ea560874cccd72e2bc76fc"
|
|
@ -1,9 +0,0 @@
|
|||
# This module should extract any and all URIs (IPs or URLs) from copy and pasted text.
|
||||
|
||||
def parse(text):
|
||||
split_text = text.split()
|
||||
for URI in split_text:
|
||||
print(URI)
|
||||
|
||||
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
from checkdmarc.dmarc import check_dmarc
|
||||
from checkdmarc.spf import check_spf
|
||||
import validators
|
||||
|
||||
|
||||
def lookup(host: str) -> tuple:
|
||||
result_dmarc = check_dmarc(host)
|
||||
result_spf = check_spf(host)
|
||||
return (result_dmarc, result_spf)
|
|
@ -1,24 +0,0 @@
|
|||
import vt
|
||||
import os
|
||||
import requests
|
||||
import virustotal_python
|
||||
from dotenv import load_dotenv
|
||||
from pprint import pprint
|
||||
from base64 import urlsafe_b64encode
|
||||
|
||||
# todo: implement my own API request module to then try and cache the response (see -> https://realpython.com/caching-external-api-requests/#requests-cache)
|
||||
|
||||
def vt_lookup(URL):
|
||||
load_dotenv()
|
||||
api_key = os.getenv("VT_API")
|
||||
with virustotal_python.Virustotal(api_key) as vtotal:
|
||||
try:
|
||||
resp = vtotal.request("urls", data={"url": URL}, method="POST")
|
||||
print(resp)
|
||||
# Safe encode URL in base64 format
|
||||
# https://developers.virustotal.com/reference/url
|
||||
url_id = urlsafe_b64encode(URL.encode()).decode().strip("=")
|
||||
report = vtotal.request(f"urls/{url_id}")
|
||||
return report.data
|
||||
except virustotal_python.VirustotalError as err:
|
||||
print(f"Failed to send URL: {URL} for analysis and get the report: {err}")
|
|
@ -1,77 +0,0 @@
|
|||
import json
|
||||
import os
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
from pprint import pprint
|
||||
from constants import URL, DOMAIN, IPV4, IPV6
|
||||
|
||||
# Would be nice to define some constants, f.e. for the various API urls, the headers, etc.
|
||||
|
||||
|
||||
def environment():
|
||||
load_dotenv()
|
||||
api_key = os.getenv("VT_API")
|
||||
return api_key
|
||||
|
||||
|
||||
# Unfortunately this works for actual URLs, not domains. See: https://docs.virustotal.com/reference/domain-info
|
||||
# This also doesn't work for IPv6 addresses, where the response_dict does not have a 'data' key. So I would have to revamp this module and make separate functions called based on host type (URL, IPv4 and -6, domain).
|
||||
|
||||
|
||||
def analysis_object(api_key, host):
|
||||
url = "https://www.virustotal.com/api/v3/urls"
|
||||
payload = {"url": ""}
|
||||
payload.update({"url": host})
|
||||
headers = {
|
||||
"accept": "application/json",
|
||||
"content-type": "application/x-www-form-urlencoded",
|
||||
"x-apikey": api_key,
|
||||
}
|
||||
response = requests.post(url, data=payload, headers=headers)
|
||||
response_dict = json.loads(response.text)
|
||||
response_id = response_dict["data"]["id"]
|
||||
return response_id
|
||||
|
||||
|
||||
def analyse_domain(api_key, host):
|
||||
url = "https://www.virustotal.com/api/v3/domains/" + host
|
||||
headers = {
|
||||
"accept": "application/json",
|
||||
"content-type": "application/x-www-form-urlencoded",
|
||||
"x-apikey": api_key,
|
||||
}
|
||||
analysis_response = requests.get(url, headers=headers)
|
||||
response_dict = json.loads(analysis_response.text)
|
||||
# Probably still need to turn the requests.get into a json like below
|
||||
return response_dict
|
||||
|
||||
|
||||
def analyse_URL(api_key, response_id):
|
||||
analysis_url = "https://www.virustotal.com/api/v3/analyses/{}".format(response_id)
|
||||
headers = {"accept": "application/json", "x-apikey": api_key}
|
||||
analysis_response = requests.get(analysis_url, headers=headers)
|
||||
analysis_dict = json.loads(analysis_response.text)
|
||||
# return analysis_response.text
|
||||
return analysis_dict
|
||||
|
||||
|
||||
def analyse_IP(api_key, host):
|
||||
analysis_url = "https://www.virustotal.com/api/v3/ip_addresses/{}".format(host)
|
||||
headers = {"accept": "application/json", "x-apikey": api_key}
|
||||
analysis_response = requests.get(analysis_url, headers=headers)
|
||||
analysis_dict = json.loads(analysis_response.text)
|
||||
# Implement this: https://docs.virustotal.com/reference/ip-info
|
||||
return analysis_dict
|
||||
|
||||
|
||||
def analyse(host, host_type):
|
||||
api_key = environment()
|
||||
if host_type == URL:
|
||||
response_id = analysis_object(api_key, host)
|
||||
result = analyse_URL(api_key, response_id)
|
||||
elif host_type == DOMAIN:
|
||||
result = analyse_domain(api_key, host)
|
||||
# elif for IPv4 and IPv6.
|
||||
elif host_type == IPV4 or IPV6:
|
||||
result = analyse_IP(api_key, host)
|
||||
return result
|
Loading…
Reference in New Issue
Block a user