refactoring in flask-soc-site-v2
This commit is contained in:
parent
079da00a53
commit
79d3323804
107
app.py
107
app.py
|
@ -1,114 +1,43 @@
|
|||
# from dotenv import load_dotenv
|
||||
import secrets
|
||||
import socket
|
||||
import uuid
|
||||
from logging.config import dictConfig
|
||||
from pprint import pprint
|
||||
from urllib.parse import urlparse
|
||||
|
||||
# TODO
|
||||
# FIX
|
||||
# WARNING
|
||||
|
||||
from flask import Flask, flash, redirect, render_template, request, session, url_for
|
||||
from markupsafe import escape
|
||||
|
||||
# from io import StringIO
|
||||
from validators import domain, email, ipv4, ipv6, url
|
||||
import host_lookup
|
||||
import upload
|
||||
|
||||
from constants import *
|
||||
from host_lookup import abuseipdb, metadata, spf_dmarc, virustotal_api_test
|
||||
from upload import csv_parse
|
||||
|
||||
# import csv
|
||||
|
||||
dictConfig(LOGCONF)
|
||||
|
||||
# put this in a .flaskenv file: https://dev.to/kubona_my/dealing-with-environment-variables-in-flask-o1
|
||||
app = Flask(__name__)
|
||||
generate_secret = secrets.token_urlsafe(16)
|
||||
app.secret_key = generate_secret
|
||||
# app.debug = True
|
||||
|
||||
|
||||
class Info(object):
|
||||
def __init__(self, host):
|
||||
self.host = host
|
||||
self.ip_address = None
|
||||
self.host_type = metadata.check(self.host)
|
||||
self.metadata = metadata.lookup(self.host)
|
||||
self.emailsec = ()
|
||||
self.vt = {}
|
||||
self.abuseipdb = {}
|
||||
|
||||
def lookup(host):
|
||||
result = Info(host)
|
||||
if result.host_type == DOMAIN:
|
||||
result.ip_address = socket.gethostbyname(host)
|
||||
result.emailsec = spf_dmarc.lookup(host)
|
||||
result.vt = virustotal_api_test.analyse(result.host, result.host_type)
|
||||
result.abuseipdb = abuseipdb.analyse(result.ip_address)
|
||||
print("[DEBUGGING]")
|
||||
pprint(result.emailsec)
|
||||
elif result.host_type == URL:
|
||||
result.domain = urlparse(host).netloc
|
||||
result.ip_address = socket.gethostbyname(result.domain)
|
||||
result.vt = virustotal_api_test.analyse(result.host, result.host_type)
|
||||
result.abuseipdb = abuseipdb.analyse(result.ip_address)
|
||||
elif result.host_type == IPV4 or IPV6:
|
||||
result.vt = virustotal_api_test.analyse(result.host, result.host_type)
|
||||
result.abuseipdb = abuseipdb.analyse(host)
|
||||
return result
|
||||
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
# logging example taken from https://betterstack.com/community/guides/logging/how-to-start-logging-with-flask/
|
||||
session["ctx"] = {"request_id": str(uuid.uuid4())}
|
||||
app.logger.info("A user visited the home page >>> %s", session["ctx"])
|
||||
|
||||
return redirect(url_for("lookup"))
|
||||
|
||||
|
||||
# refactor to handle form requests better: https://www.digitalocean.com/community/tutorials/how-to-use-web-forms-in-a-flask-application
|
||||
|
||||
|
||||
@app.route("/lookup", methods=["GET", "POST"])
|
||||
def lookup():
|
||||
host = ""
|
||||
host = escape(request.form.get("host"))
|
||||
session["ctx"] = {"request_id": str(uuid.uuid4())}
|
||||
# figure out how to start a session, maybe with a variable?
|
||||
# variable = session.get('something')
|
||||
hosts = []
|
||||
if request.method == "GET":
|
||||
return render_template("lookup_options.html")
|
||||
return render_template("lookup.html")
|
||||
elif request.method == "POST" and "host" in request.form:
|
||||
host = ""
|
||||
host = escape(request.form.get("host"))
|
||||
session["ctx"] = {"request_id": str(uuid.uuid4())}
|
||||
app.logger.info(
|
||||
"A user submitted a host to look up. | host: %s >>> %s",
|
||||
host,
|
||||
session["ctx"],
|
||||
)
|
||||
if not host:
|
||||
flash("Try again", "error")
|
||||
return render_template("lookup_options.html")
|
||||
elif host:
|
||||
result = Info.lookup(host)
|
||||
return render_template(
|
||||
"lookup_options.html",
|
||||
host=result.host,
|
||||
host_type=result.host_type,
|
||||
result=result,
|
||||
)
|
||||
user_input = escape(request.form.get("host").strip())
|
||||
if len(user_input) >= 1:
|
||||
hosts = host_lookup.process_input(user_input)
|
||||
return render_template("results.html", hosts=hosts)
|
||||
else:
|
||||
flash("YA DONE FUCKED UP", "error")
|
||||
return render_template("lookup.html")
|
||||
elif request.method == "POST" and "file" in request.files:
|
||||
file = request.files["file"]
|
||||
extracted = csv_parse.extract(file)
|
||||
results = []
|
||||
for host in extracted:
|
||||
results.append(Info.lookup(host))
|
||||
print(results)
|
||||
return render_template("lookup_options.html")
|
||||
else:
|
||||
flash("No file!", "error")
|
||||
return render_template("lookup_options.html")
|
||||
extracted = upload.extract(file)
|
||||
hosts = host_lookup.process_file(extracted)
|
||||
return render_template("results.html", hosts=hosts)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Reference in New Issue
Block a user