rewrote the lookup logic to handle incorrect user input, still have to output that in the templates
This commit is contained in:
parent
bd33372cf2
commit
5171b1cfa9
16
app.py
16
app.py
@ -1,9 +1,5 @@
|
||||
import secrets
|
||||
|
||||
# TODO
|
||||
# FIX
|
||||
# WARNING
|
||||
|
||||
from flask import Flask, flash, redirect, render_template, request, session, url_for
|
||||
from markupsafe import escape
|
||||
|
||||
@ -23,16 +19,16 @@ def index():
|
||||
@app.route("/lookup", methods=["GET", "POST"])
|
||||
def lookup():
|
||||
hosts = []
|
||||
results = []
|
||||
if request.method == "GET":
|
||||
return render_template("lookup.html")
|
||||
elif request.method == "POST" and "host" in request.form:
|
||||
user_input = escape(request.form.get("host").strip())
|
||||
if len(user_input) >= 1:
|
||||
hosts = host_lookup.process_input(user_input)
|
||||
return render_template("results.html", hosts=hosts)
|
||||
else:
|
||||
flash("YA DONE FUCKED UP", "error")
|
||||
return render_template("lookup.html")
|
||||
hosts, errors = host_lookup.process_input(user_input)
|
||||
for host in hosts:
|
||||
result = host_lookup.Lookedup(host)
|
||||
results.append(result)
|
||||
return render_template("results.html", hosts=results, errors=errors)
|
||||
elif request.method == "POST" and "file" in request.files:
|
||||
file = request.files["file"]
|
||||
extracted = upload.extract(file)
|
||||
|
139
host_lookup.py
Normal file
139
host_lookup.py
Normal file
@ -0,0 +1,139 @@
|
||||
# TODO: make this module only have wrapper functions, no direct lookups or imports. Move all the direct lookup functions (emailsec) into (a) separate module(s).
|
||||
|
||||
import re
|
||||
from ipaddress import ip_address
|
||||
from checkdmarc.dmarc import check_dmarc
|
||||
from checkdmarc.spf import check_spf
|
||||
import validators
|
||||
from ipwhois import IPWhois
|
||||
from whois import whois
|
||||
|
||||
# from constants import DOMAIN, EMAIL, IPV4, IPV6, URL
|
||||
import abuseipdb_api
|
||||
import virustotal_api
|
||||
import socket
|
||||
from urllib.parse import urlparse
|
||||
import host_data
|
||||
import tldextract
|
||||
|
||||
|
||||
class Lookedup(object):
|
||||
def __init__(self, host):
|
||||
self.host = host
|
||||
self.host_type = determine(self.host)
|
||||
self = self.specific()
|
||||
|
||||
def url_lookup(self):
|
||||
self.domain = urlparse(self.host).netloc
|
||||
self.ip_address = socket.gethostbyname(self.domain)
|
||||
self.email_security = spf_dmarc(self.domain)
|
||||
return self
|
||||
|
||||
def ip_lookup(self):
|
||||
pass
|
||||
|
||||
def domain_lookup(self):
|
||||
self.ip_address = socket.gethostbyname(self.host)
|
||||
self.metadata = domain(self.host)
|
||||
self.domain = self.metadata["domain_name"]
|
||||
self.email_security = spf_dmarc(self.domain)
|
||||
self.vt, self.vt_dict = virustotal_api.analyse(self.host, self.host_type)
|
||||
self.abuseipdb = abuseipdb_api.analyse(self.ip_address)
|
||||
return self
|
||||
|
||||
def email_lookup(self):
|
||||
pass
|
||||
|
||||
def no_host(self):
|
||||
return None
|
||||
|
||||
def specific(self):
|
||||
if self.host_type == "url":
|
||||
return self.url_lookup()
|
||||
elif self.host_type == "domain":
|
||||
return self.domain_lookup()
|
||||
|
||||
|
||||
def sanitize(user_input):
|
||||
sanitized = []
|
||||
if user_input.strip() != "":
|
||||
sanitized = re.split("; |, | |\n", user_input)
|
||||
return sanitized
|
||||
|
||||
|
||||
def determine(host):
|
||||
host_type = ""
|
||||
if validators.url(host):
|
||||
host_type = "url"
|
||||
elif validators.domain(host):
|
||||
host_type = "domain"
|
||||
elif validators.ip_address.ipv4(host):
|
||||
host_type = "ip"
|
||||
elif validators.ip_address.ipv6(host):
|
||||
host_type = "ip"
|
||||
elif validators.email(host):
|
||||
host_type = "email address"
|
||||
else:
|
||||
host_type = "no host"
|
||||
return host_type
|
||||
|
||||
|
||||
# def extract(user_input):
|
||||
# hosts = []
|
||||
# errors = []
|
||||
# for item in user_input:
|
||||
# if validators.url(item):
|
||||
# hosts.append(item)
|
||||
# elif validators.domain(item):
|
||||
# hosts.append(item)
|
||||
# elif validators.ip_address.ipv4(item):
|
||||
# hosts.append(item)
|
||||
# elif validators.ip_address.ipv6(item):
|
||||
# hosts.append(item)
|
||||
# elif validators.email(item):
|
||||
# hosts.append(item)
|
||||
# else:
|
||||
# errors.append(item)
|
||||
# return hosts, errors
|
||||
|
||||
|
||||
def domain(host):
|
||||
result = dict(whois(host))
|
||||
if type(result["creation_date"]) is list:
|
||||
result["creation_date"] = result["creation_date"][0].strftime("%d-%m-%Y")
|
||||
else:
|
||||
result["creation_date"] = result["creation_date"].strftime("%d-%m-%Y")
|
||||
if type(result["domain_name"]) is list:
|
||||
result["domain_name"] = result["domain_name"][0]
|
||||
return result
|
||||
|
||||
|
||||
def spf_dmarc(domain):
|
||||
spf = ""
|
||||
dmarc = ""
|
||||
result_spf = check_spf(domain)
|
||||
if result_spf["valid"]:
|
||||
spf = result_spf["record"]
|
||||
result_dmarc = check_dmarc(domain)
|
||||
if result_dmarc["valid"]:
|
||||
dmarc = result_dmarc["record"]
|
||||
return spf, dmarc
|
||||
|
||||
|
||||
def process_input(user):
|
||||
results = []
|
||||
sanitized = sanitize(user)
|
||||
hosts, errors = extract(sanitized)
|
||||
# for host in hosts:
|
||||
# host_analyzed = Host(host).lookup()
|
||||
# results.append(host_analyzed)
|
||||
# return results
|
||||
return hosts, errors
|
||||
|
||||
|
||||
def process_file(file_content):
|
||||
hosts = []
|
||||
for host in file_content:
|
||||
result = Host(host)
|
||||
hosts.append(result)
|
||||
return hosts
|
Loading…
x
Reference in New Issue
Block a user