Commit 3908131e authored by Nong Hoang Tu's avatar Nong Hoang Tu
Browse files

Restore project patches

parent 038c1ba8
Pipeline #3409 canceled with stages
! ANY INCOMPLETE REPORT WILL BE CLOSED RIGHT AWAY !
## Steps to Reproduce (for bugs)
1.
2.
3.
4.
## Your Environment
* RouterSploit Version used:
* Operating System and version:
* Python Version: ( `python3 --version` )
* Python Environment: ( `python3 -m pip freeze` )
## Current Behavior
* If describing a bug, tell us what happens instead of the expected behavior
* If suggesting a change/improvement, tell us how it works right now
## Expected Behavior
* If you're describing a bug, tell us what should happen
* If you're suggesting a change/improvement, tell us how it should work
## Status
**READY/IN DEVELOPMENT/HOLD**
## Description
Describe what is changed by your Pull Request. If this PR is related to the open issue (bug/feature/new module) please attach issue number.
## Verification
Provide steps to test or reproduce the PR.
1. Start `./rsf.py`
2. `use exploits/routers/dlink/dsl_2750b_rce`
3. `set target 192.168.1.1`
4. `run`
5. ...
## Checklist
- [ ] Write module/feature
- [ ] Write tests ([Example](https://github.com/threat9/routersploit/blob/master/tests/exploits/routers/dlink/test_dsl_2750b_rce.py))
- [ ] Document how it works ([Example](https://github.com/threat9/routersploit/blob/master/docs/modules/exploits/routers/dlink/dsl_2750b_rce.md))
......@@ -30,7 +30,7 @@ class Device(ScanEntry):
def _update(self, resp):
ScanEntry._update(self, resp)
if self.addrType is "random":
if self.addrType == "random":
self.vendor = "None (Random MAC address)"
else:
self.vendor = lookup_vendor(self.addr)
......
class RoutersploitException(Exception):
def __init__(self, msg: str=""):
def __init__(self, msg: str = ""):
super(RoutersploitException, self).__init__(msg)
......
......@@ -42,14 +42,14 @@ class ExploitOptionsAggregator(type):
else:
attrs["exploit_attributes"] = {k: v for d in base_exploit_attributes for k, v in iteritems(d)}
for key, value in iteritems(attrs):
for key, value in list(iteritems(attrs)):
if isinstance(value, Option):
value.label = key
attrs["exploit_attributes"].update({key: [value.display_value, value.description]})
elif key == "__info__":
attrs["_{}{}".format(name, key)] = value
del attrs[key]
elif key in attrs["exploit_attributes"]: # removing exploit_attribtue that was overwritten
elif key in attrs["exploit_attributes"]: # removing exploit_attribute that was overwritten
del attrs["exploit_attributes"][key] # in the child and is not an Option() instance
return super(ExploitOptionsAggregator, cls).__new__(cls, name, bases, attrs)
......@@ -86,8 +86,8 @@ class Exploit(BaseExploit):
def run_threads(self, threads_number: int, target_function: any, *args, **kwargs) -> None:
""" Run function across specified number of threads
:param int thread_number: number of threads that should be executed
:param func target_function: function that should be executed accross specified number of threads
:param int threads_number: number of threads that should be executed
:param func target_function: function that should be executed across specified number of threads
:param any args: args passed to target_function
:param any kwargs: kwargs passed to target function
:return None
......@@ -111,7 +111,7 @@ class Exploit(BaseExploit):
start = time.time()
try:
while thread.isAlive():
while thread.is_alive():
thread.join(1)
except KeyboardInterrupt:
......@@ -172,7 +172,7 @@ def multi(fn):
class DummyFile(object):
""" Mocking file object. Optimilization for the "mute" decorator. """
""" Mocking file object. Optimization for the "mute" decorator. """
def write(self, x):
pass
......
......@@ -9,7 +9,8 @@ from weakref import WeakKeyDictionary
try:
import queue
except ImportError: # Python 3.x
import queue as queue
import Queue as queue
printer_queue = queue.Queue()
thread_output_stream = WeakKeyDictionary()
......@@ -133,7 +134,7 @@ def print_table(headers, *args, **kwargs) -> None:
for idx, element in enumerate(arg):
content_line = "".join((
content_line,
"{:<{}}".format(str(element), fill[idx]) # https://github.com/threat9/routersploit/pull/736 by enty8080
"{:<{}}".format(element, fill[idx])
))
print_info(content_line)
......
......@@ -31,14 +31,16 @@ def shell(exploit, architecture="", method="", payloads=None, **params):
path = "routersploit/modules/payloads/{}/".format(architecture)
# get all payloads for given architecture
all_payloads = [f.split(".")[0] for f in listdir(path) if isfile(join(path, f)) and f.endswith(".py") and f != "__init__.py"]
all_payloads = [
f.split(".")[0] for f in listdir(path) if isfile(join(path, f)) and f.endswith(".py") and f != "__init__.py"
]
payload_path = path.replace("/", ".")
for p in all_payloads:
module = getattr(importlib.import_module("{}{}".format(payload_path, p)), 'Payload')
# if method/arch is cmd then filter out payloads
if method is "cmd":
if method == "cmd":
if getattr(module, "cmd") in payloads:
available_payloads[p] = module
else:
......@@ -56,7 +58,7 @@ def shell(exploit, architecture="", method="", payloads=None, **params):
if payload is None:
cmd_str = "\001\033[4m\002cmd\001\033[0m\002 > "
else:
cmd_str = "\001\033[4m\002cmd\001\033[0m\002 (\033[94m{}\033[0m) > ".format(payload._Payload__info__["name"])
cmd_str = f"\001\033[4m\002cmd\001\033[0m\002 (\033[94m{payload._Payload__info__['name']}\033[0m) > "
cmd = input(cmd_str)
......@@ -72,7 +74,8 @@ def shell(exploit, architecture="", method="", payloads=None, **params):
headers = ("Payload", "Name", "Description")
data = []
for p in available_payloads.keys():
data.append((p, available_payloads[p]._Payload__info__["name"], available_payloads[p]._Payload__info__["description"]))
data.append((p, available_payloads[p]._Payload__info__["name"],
available_payloads[p]._Payload__info__["description"]))
print_table(headers, *data)
......
......@@ -15,7 +15,7 @@ MODULES_DIR = rsf_modules.__path__[0]
WORDLISTS_DIR = wordlists.__path__[0]
def random_text(length: int, alph: str=string.ascii_letters + string.digits) -> str:
def random_text(length: int, alph: str = string.ascii_letters + string.digits) -> str:
""" Generates random string text
:param int length: length of text to generate
......@@ -32,11 +32,10 @@ def is_ipv4(address: str) -> bool:
:param str address: IP address to check
:return bool: True if address is valid IPv4 address, False otherwise
"""
import ipaddress
regexp = "^(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$"
if re.match(regexp, address):
if type(ipaddress.ip_address(address.strip())).__name__ == "IPv4Address":
return True
return False
......@@ -47,11 +46,10 @@ def is_ipv6(address: str) -> bool:
:return bool: True if address is valid IPv6 address, False otherwise
"""
regexp = "^(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9A-Fa-f]{1,4}:){5}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,4}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}|(?:(?:[0-9A-Fa-f]{1,4}:){,6}[0-9A-Fa-f]{1,4})?::)%.*$"
import ipaddress
if re.match(regexp, address):
if type(ipaddress.ip_address(address.strip())).__name__ == "IPv6Address":
return True
return False
......@@ -79,7 +77,7 @@ def convert_port(port: int) -> bytes:
return bytes.fromhex(res)
def index_modules(modules_directory: str=MODULES_DIR) -> list:
def index_modules(modules_directory: str = MODULES_DIR) -> list:
""" Returns list of all exploits modules
:param str modules_directory: path to modules directory
......@@ -123,7 +121,7 @@ def import_exploit(path: str):
)
def iter_modules(modules_directory: str=MODULES_DIR) -> list:
def iter_modules(modules_directory: str = MODULES_DIR) -> list:
""" Iterates over valid modules
:param str modules_directory: path to modules directory
......@@ -207,7 +205,9 @@ def stop_after(space_number):
except Exception as err:
print_info(err)
return wrapped_function(self, *args, **kwargs)
return _wrapper
return _outer_wrapper
......@@ -231,7 +231,7 @@ def lookup_vendor(addr: str) -> str:
if addr.startswith(mac):
return name
return None
return ""
class Version(object):
......@@ -262,7 +262,7 @@ class Version(object):
return self.value != other.value
def __gt__(self, other):
"""Override the defualt x>y"""
"""Override the default x>y"""
if self._compare_versions(self.value, other.value) > 0:
return True
return False
......@@ -275,7 +275,7 @@ class Version(object):
@staticmethod
def _compare_versions(version1, version2):
""" Version comparision
""" Version comparison
:param Version version1:
:param Version version2:
......@@ -285,12 +285,12 @@ class Version(object):
if version1 > version2 then 1
"""
arr1 = re.sub("\D", ".", str(version1)).split(".")
arr2 = re.sub("\D", ".", str(version2)).split(".")
arr1 = re.sub("\\D", ".", str(version1)).split(".")
arr2 = re.sub("\\D", ".", str(version2)).split(".")
i = 0
while(i < len(arr1)):
while i < len(arr1):
if int(arr2[i]) > int(arr1[i]):
return -1
......@@ -302,7 +302,7 @@ class Version(object):
return 0
def detect_file_content(content: str, f: str="/etc/passwd") -> bool:
def detect_file_content(content: str, f: str = "/etc/passwd") -> bool:
""" Detect specific file content in content
:param str content: file content that should be analyzed
......
......@@ -7,14 +7,13 @@ from routersploit.core.exploit.option import OptBool
from routersploit.core.exploit.printer import print_error
from routersploit.core.exploit.printer import print_success
FTP_TIMEOUT = 8.0
class FTPCli(object):
""" FTP Client provides methods to handle communication with FTP server """
def __init__(self, ftp_target: str, ftp_port: int, ssl: bool=False, verbosity: bool=False) -> None:
def __init__(self, ftp_target: str, ftp_port: int, ssl: bool = False, verbosity: bool = False) -> None:
""" FTP client constructor
:param str ftp_target: target FTP server ip address
......@@ -35,7 +34,7 @@ class FTPCli(object):
else:
self.ftp_client = ftplib.FTP()
def connect(self, retries: int=1) -> bool:
def connect(self, retries: int = 1) -> bool:
""" Connect to FTP server
:param int retries: number of retry attempts
......@@ -63,10 +62,14 @@ class FTPCli(object):
try:
self.ftp_client.login(username, password)
print_success(self.peer, "FTP Authentication Successful - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
print_success(self.peer,
"FTP Authentication Successful - Username: '{}' Password: '{}'".format(username, password),
verbose=self.verbosity)
return True
except Exception as err:
print_error(self.peer, "FTP Authentication Failed - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
print_error(self.peer,
"FTP Authentication Failed - Username: '{}' Password: '{}'".format(username, password),
verbose=self.verbosity)
self.ftp_client.close()
return False
......@@ -122,7 +125,7 @@ class FTPClient(Exploit):
ssl = OptBool(False, "SSL enabled: true/false")
verbosity = OptBool(True, "Enable verbose output: true/false")
def ftp_create(self, target: str=None, port: int=None) -> FTPCli:
def ftp_create(self, target: str = None, port: int = None) -> FTPCli:
""" Create FTP client
:param str target: target FTP server ip address
......
......@@ -7,7 +7,6 @@ from routersploit.core.exploit.exploit import Protocol
from routersploit.core.exploit.option import OptBool
from routersploit.core.exploit.printer import print_error
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
HTTP_TIMEOUT = 30.0
......@@ -21,7 +20,7 @@ class HTTPClient(Exploit):
verbosity = OptBool(True, "Verbosity enabled: true/false")
ssl = OptBool(False, "SSL enabled: true/false")
def http_request(self, method: str, path: str, session: requests=requests, **kwargs) -> requests.Response:
def http_request(self, method: str, path: str, session: requests = requests, **kwargs) -> requests.Response:
""" Requests HTTP resource
:param str method: method that should be issued e.g. GET, POST
......@@ -57,7 +56,7 @@ class HTTPClient(Exploit):
return None
def get_target_url(self, path: str="") -> str:
def get_target_url(self, path: str = "") -> str:
""" Get target URL
:param str path: path to http server resource
......
......@@ -6,14 +6,13 @@ from routersploit.core.exploit.option import OptBool
from routersploit.core.exploit.printer import print_success
from routersploit.core.exploit.printer import print_error
SNMP_TIMEOUT = 15.0
class SNMPCli(object):
""" SNMP Client provides methods to handle communication with SNMP server """
def __init__(self, snmp_target: str, snmp_port: int, verbosity: bool=False) -> None:
def __init__(self, snmp_target: str, snmp_port: int, verbosity: bool = False) -> None:
""" SNMP client constructor
:param str snmp_target: target SNMP server ip address
......@@ -28,7 +27,7 @@ class SNMPCli(object):
self.peer = "{}:{}".format(self.snmp_target, snmp_port)
def get(self, community_string: str, oid: str, version: int=1, retries: int=0) -> bytes:
def get(self, community_string: str, oid: str, version: int = 1, retries: int = 0) -> bytes:
""" Get OID from SNMP server
:param str community_string: SNMP server communit string
......@@ -51,9 +50,11 @@ class SNMPCli(object):
return None
if errorIndication or errorStatus:
print_error(self.peer, "SNMP invalid community string: '{}'".format(community_string), verbose=self.verbosity)
print_error(self.peer, "SNMP invalid community string: '{}'".format(community_string),
verbose=self.verbosity)
else:
print_success(self.peer, "SNMP valid community string found: '{}'".format(community_string), verbose=self.verbosity)
print_success(self.peer, "SNMP valid community string found: '{}'".format(community_string),
verbose=self.verbosity)
return varBinds
return None
......@@ -66,7 +67,7 @@ class SNMPClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def snmp_create(self, target: str=None, port: int=None) -> SNMPCli:
def snmp_create(self, target: str = None, port: int = None) -> SNMPCli:
""" Create SNMP client
:param str target: target SNMP server ip address
......
......@@ -13,7 +13,6 @@ from routersploit.core.exploit.printer import print_success
from routersploit.core.exploit.printer import print_error
from routersploit.core.exploit.utils import random_text
SSH_TIMEOUT = 8.0
......@@ -38,7 +37,7 @@ class SSHCli(object):
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def login(self, username: str, password: str, retries: int=1) -> bool:
def login(self, username: str, password: str, retries: int = 1) -> bool:
""" Login to SSH server
:param str username: SSH account username
......@@ -49,22 +48,28 @@ class SSHCli(object):
for _ in range(retries):
try:
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, banner_timeout=SSH_TIMEOUT, username=username, password=password, look_for_keys=False)
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, banner_timeout=SSH_TIMEOUT,
username=username, password=password, look_for_keys=False)
except paramiko.AuthenticationException:
print_error(self.peer, "SSH Authentication Failed - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
print_error(self.peer,
"SSH Authentication Failed - Username: '{}' Password: '{}'".format(username, password),
verbose=self.verbosity)
self.ssh_client.close()
break
except Exception as err:
print_error(self.peer, "SSH Error while authenticating", err, verbose=self.verbosity)
else:
print_success(self.peer, "SSH Authentication Successful - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
print_success(self.peer,
"SSH Authentication Successful - Username: '{}' Password: '{}'".format(username,
password),
verbose=self.verbosity)
return True
self.ssh_client.close()
return False
def login_pkey(self, username: str, priv_key: str, retries: int=1) -> bool:
def login_pkey(self, username: str, priv_key: str, retries: int = 1) -> bool:
""" Login to SSH server with private key
:param str username: SSH account username
......@@ -82,13 +87,19 @@ class SSHCli(object):
for _ in range(retries):
try:
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, banner_timeout=SSH_TIMEOUT, username=username, pkey=priv_key, look_for_keys=False)
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, banner_timeout=SSH_TIMEOUT,
username=username, pkey=priv_key, look_for_keys=False)
except paramiko.AuthenticationException:
print_error(self.peer, "SSH Authentication Failed - Username: '{}' auth with private key".format(username), verbose=self.verbosity)
print_error(self.peer,
"SSH Authentication Failed - Username: '{}' auth with private key".format(username),
verbose=self.verbosity)
except Exception as err:
print_error(self.peer, "SSH Error while authenticated by using private key", err, verbose=self.verbosity)
print_error(self.peer, "SSH Error while authenticated by using private key", err,
verbose=self.verbosity)
else:
print_success(self.peer, "SSH Authentication Successful - Username: '{}' with private key".format(username), verbose=self.verbosity)
print_success(self.peer,
"SSH Authentication Successful - Username: '{}' with private key".format(username),
verbose=self.verbosity)
return True
self.ssh_client.close()
......@@ -102,7 +113,8 @@ class SSHCli(object):
"""
try:
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, username="root", password=random_text(12), look_for_keys=False)
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, username="root",
password=random_text(12), look_for_keys=False)
except paramiko.AuthenticationException:
self.ssh_client.close()
return True
......@@ -159,7 +171,8 @@ class SSHCli(object):
return fp_content.getvalue()
except Exception as err:
print_error(self.peer, "SSH Error while retrieving file content from the server", err, verbose=self.verbosity)
print_error(self.peer, "SSH Error while retrieving file content from the server", err,
verbose=self.verbosity)
return None
......@@ -301,7 +314,7 @@ class SSHClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def ssh_create(self, target: str=None, port: int=None) -> SSHCli:
def ssh_create(self, target: str = None, port: int = None) -> SSHCli:
""" Create SSH client
:param str target: target SSH server ip address
......
......@@ -8,14 +8,13 @@ from routersploit.core.exploit.printer import print_error
from routersploit.core.exploit.utils import is_ipv4
from routersploit.core.exploit.utils import is_ipv6
TCP_SOCKET_TIMEOUT = 8.0
class TCPCli(object):
""" TCP Client provides methods to handle communication with TCP server """
def __init__(self, tcp_target: str, tcp_port: int, verbosity: bool=False) -> None:
def __init__(self, tcp_target: str, tcp_port: int, verbosity: bool = False) -> None:
""" TCP client constructor
:param str tcp_target: target TCP server ip address
......@@ -131,7 +130,7 @@ class TCPClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def tcp_create(self, target: str=None, port: int=None) -> TCPCli:
def tcp_create(self, target: str = None, port: int = None) -> TCPCli:
""" Creates TCP client
:param str target: target TCP server ip address
......
......@@ -6,7 +6,6 @@ from routersploit.core.exploit.option import OptBool
from routersploit.core.exploit.printer import print_success
from routersploit.core.exploit.printer import print_error
TELNET_TIMEOUT = 30.0
......@@ -44,7 +43,7 @@ class TelnetCli(object):
return False
def login(self, username: str, password: str, retries: int=1) -> bool:
def login(self, username: str, password: str, retries: int = 1) -> bool:
""" Login to Telnet server
:param str username: Telnet account username
......@@ -66,11 +65,18 @@ class TelnetCli(object):
(i, obj, res) = self.telnet_client.expect([b"Incorrect", b"incorrect"], 5)
if i == -1 and any([x in res for x in [b"#", b"$", b">"]]) or len(res) > 500: # big banner e.g. mikrotik
print_success(self.peer, "Telnet Authentication Successful - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
if i == -1 and any([x in res for x in [b"#", b"$", b">"]]) or len(
res) > 500: # big banner e.g. mikrotik
print_success(self.peer,
"Telnet Authentication Successful - Username: '{}' Password: '{}'".format(username,
password),
verbose=self.verbosity)
return True
else:
print_error(self.peer, "Telnet Authentication Failed - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
print_error(self.peer,
"Telnet Authentication Failed - Username: '{}' Password: '{}'".format(username,
password),
verbose=self.verbosity)
break
except Exception as err:
print_error(self.peer, "Telnet Error while authenticating to the server", err, verbose=self.verbosity)
......@@ -154,7 +160,7 @@ class TelnetClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def telnet_create(self, target: str=None, port: int=None) -> TelnetCli:
def telnet_create(self, target: str = None, port: int = None) -> TelnetCli:
""" Create Telnet client
:param str target: target Telnet ip address
......
......@@ -7,14 +7,13 @@ from routersploit.core.exploit.printer import print_error
from routersploit.core.exploit.utils import is_ipv4
from routersploit.core.exploit.utils import is_ipv6
UDP_SOCKET_TIMEOUT = 8.0
class UDPCli(object):
""" UDP Client provides methods to handle communication with UDP server """
def __init__(self, udp_target: str, udp_port: int, verbosity: bool=False) -> None:
def __init__(self, udp_target: str, udp_port: int, verbosity: bool = False) -> None:
""" UDP client constructor
:param str udp_target: target UDP server ip address
......@@ -91,7 +90,7 @@ class UDPClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def udp_create(self, target: str=None, port: int=None) -> UDPCli:
def udp_create(self, target: str = None, port: int = None) -> UDPCli:
""" Create UDP client
:param str target: target UDP server ip address
......