mirror of
https://github.com/Cronocide/wifipumpkin3.git
synced 2025-01-22 11:18:55 +00:00
added allow sniffkin3 capture POST requests on session
This commit is contained in:
parent
04f45ea44f
commit
95fdcc81b4
@ -7,6 +7,7 @@ All notable changes to this project will be documented in this file.
|
||||
## [1.0.0] - 2020-04-18
|
||||
|
||||
### Added
|
||||
- added allow sniffkin3 capture POST requests on session [mh4x0f]
|
||||
- added more terminal colors on function setcolor [mh4x0f]
|
||||
- added notification when client joined the AP [mh4x0f]
|
||||
- added check python version when install tool [mh4x0f]
|
||||
|
@ -3,4 +3,4 @@
|
||||
import unittest
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
unittest.main(module="tests", verbosity=2)
|
||||
unittest.main(module="tests", verbosity=2)
|
||||
|
47
tests/test_decoded_packet_dict.py
Normal file
47
tests/test_decoded_packet_dict.py
Normal file
@ -0,0 +1,47 @@
|
||||
from wifipumpkin3.core.common.platforms import decoded
|
||||
import unittest
|
||||
|
||||
result = {
|
||||
"IP": {
|
||||
"version": 4,
|
||||
"src": "10.0.0.21",
|
||||
"dst": "216.58.202.227",
|
||||
"ihl": 5,
|
||||
"tos": 0,
|
||||
},
|
||||
"Headers": {
|
||||
"Connection": "Keep-Alive",
|
||||
"Method": "GET",
|
||||
"Path": "/generate_204",
|
||||
"Http-Version": "HTTP/1.1",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestConfigPumpkinProxy(unittest.TestCase):
|
||||
def test_decoded_data(self):
|
||||
global result
|
||||
data = {
|
||||
"IP": {
|
||||
"version": 4,
|
||||
"src": "10.0.0.21".encode(),
|
||||
"dst": "216.58.202.227".encode(),
|
||||
"ihl": 5,
|
||||
"tos": 0,
|
||||
},
|
||||
"Headers": {
|
||||
"Connection": "Keep-Alive".encode(),
|
||||
"Method": "GET".encode(),
|
||||
"Path": "/generate_204".encode(),
|
||||
"Http-Version": "HTTP/1.1".encode(),
|
||||
},
|
||||
}
|
||||
# decode byte array to str ascii
|
||||
with decoded(data) as data_decoded:
|
||||
self.data_decoded = data_decoded
|
||||
|
||||
self.assertEqual(result, self.data_decoded)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
@ -236,17 +236,30 @@ class decoded(object):
|
||||
:py:attr:`raw_content` has the encoded content.
|
||||
"""
|
||||
|
||||
def __init__(self, message): # pragma no cover
|
||||
warnings.warn(
|
||||
"decoded() is deprecated, you can now directly use .content instead. "
|
||||
".raw_content has the encoded content.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
_data_decoded = None
|
||||
|
||||
def __enter__(self): # pragma no cover
|
||||
pass
|
||||
@property
|
||||
def data_decoded(self):
|
||||
return self._data_decoded
|
||||
|
||||
def __exit__(self, type, value, tb): # pragma no cover
|
||||
def __init__(self, data):
|
||||
self._data_decoded = self.converter(data)
|
||||
|
||||
def converter(self, data):
|
||||
# https://stackoverflow.com/questions/33137741/fastest-way-to-convert-a-dicts-keys-values-from-bytes-to-str-in-python3
|
||||
if isinstance(data, bytes):
|
||||
return data.decode("ascii")
|
||||
if isinstance(data, dict):
|
||||
return dict(map(self.converter, data.items()))
|
||||
if isinstance(data, tuple):
|
||||
return map(self.converter, data)
|
||||
return data
|
||||
|
||||
def __enter__(self):
|
||||
# return data decoded using with as data
|
||||
return self.data_decoded
|
||||
|
||||
def __exit__(self, type, value, tb):
|
||||
pass
|
||||
|
||||
|
||||
|
@ -81,12 +81,49 @@ class Sniffkin3(MitmMode):
|
||||
self.reactor.setObjectName(self.ID)
|
||||
self.reactor._ProcssOutput.connect(self.LogOutput)
|
||||
|
||||
def setDataColor(self, data=dict):
|
||||
""" set color in data log keys"""
|
||||
if list(dict(data).keys())[0] == "urlsCap":
|
||||
data["urlsCap"]["Headers"]["Method"] = setcolor(
|
||||
data["urlsCap"]["Headers"]["Method"], color="orange_bg"
|
||||
)
|
||||
data["urlsCap"]["Headers"]["Host"] = setcolor(
|
||||
data["urlsCap"]["Headers"]["Host"], color="yellow"
|
||||
)
|
||||
if list(dict(data).keys())[0] == "POSTCreds":
|
||||
data["POSTCreds"]["Data"]["Payload"] = setcolor(
|
||||
data["POSTCreds"]["Data"]["Payload"], color="blue"
|
||||
)
|
||||
data["POSTCreds"]["Data"]["User"] = setcolor(
|
||||
data["POSTCreds"]["Data"]["User"], color="red"
|
||||
)
|
||||
data["POSTCreds"]["Packets"]["Headers"]["Method"] = setcolor(
|
||||
data["POSTCreds"]["Packets"]["Headers"]["Method"], color="purple_bg"
|
||||
)
|
||||
data["POSTCreds"]["Data"]["Pass"] = setcolor(
|
||||
data["POSTCreds"]["Data"]["Pass"], color="red"
|
||||
)
|
||||
return data
|
||||
|
||||
def LogOutput(self, data):
|
||||
if self.conf.get("accesspoint", "status_ap", format=bool):
|
||||
self.logger.addExtra("packet", data) # packet info save json
|
||||
data = self.setDataColor(data)
|
||||
if list(dict(data).keys())[0] == "urlsCap":
|
||||
|
||||
return self.logger.info(
|
||||
"[ {0[src]} > {0[dst]} ] {1[Method]} {1[Host]}{1[Path]}".format(
|
||||
data["urlsCap"]["IP"], data["urlsCap"]["Headers"]
|
||||
)
|
||||
)
|
||||
self.logger.info(
|
||||
"[ {0[src]} > {0[dst]} ] {1[Method]} {1[Host]}{1[Path]}".format(
|
||||
data["urlsCap"]["IP"], data["urlsCap"]["Headers"]
|
||||
"[ {0[src]} > {0[dst]} ] {1[Method]} {1[Host]}{1[Path]} \n \
|
||||
payload: {2[Payload]}\n \
|
||||
Username: {2[User]}\n \
|
||||
Password: {2[Pass]}\n".format(
|
||||
data["POSTCreds"]["Packets"]["IP"],
|
||||
data["POSTCreds"]["Packets"]["Headers"],
|
||||
data["POSTCreds"]["Data"],
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -2,6 +2,7 @@ from scapy.all import *
|
||||
from scapy_http import http
|
||||
from wifipumpkin3.plugins.analyzers.default import PSniffer
|
||||
import re
|
||||
from wifipumpkin3.core.common.platforms import decoded
|
||||
|
||||
# This file is part of the wifipumpkin3 Open Source Project.
|
||||
# wifipumpkin3 is licensed under the Apache 2.0.
|
||||
@ -53,18 +54,20 @@ class MonitorCreds(PSniffer):
|
||||
username = re.findall(user_regex, str(payload, "utf-8"))
|
||||
password = re.findall(pw_regex, str(payload, "utf-8"))
|
||||
if not username == [] and not password == []:
|
||||
self.output.emit(
|
||||
{
|
||||
"POSTCreds": {
|
||||
data = {
|
||||
"POSTCreds": {
|
||||
"Url": str(url),
|
||||
"Destination": "{}/{}".format(sport, dport),
|
||||
"Packets": pkt,
|
||||
"Data": {
|
||||
"User": username[0][1],
|
||||
"Pass": password[0][1],
|
||||
"Url": str(url),
|
||||
"Destination": "{}/{}".format(sport, dport),
|
||||
"Packets": pkt,
|
||||
"Payload": payload,
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
with decoded(data) as data_decoded:
|
||||
self.output.emit(data_decoded)
|
||||
|
||||
def get_http_POST(self, load):
|
||||
dict_head = {}
|
||||
@ -103,9 +106,9 @@ class MonitorCreds(PSniffer):
|
||||
self.src_ip_port,
|
||||
{"IP": ip_layer.fields, "Headers": http_layer.fields},
|
||||
)
|
||||
self.output.emit(
|
||||
{"urlsCap": {"IP": ip_layer.fields, "Headers": http_layer.fields}}
|
||||
)
|
||||
data = {"urlsCap": {"IP": ip_layer.fields, "Headers": http_layer.fields}}
|
||||
with decoded(data) as data_decoded:
|
||||
self.output.emit(data_decoded)
|
||||
except:
|
||||
pass
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user