python-waitress/cve-2022-24761.diff
2022-04-19 01:07:38 +00:00

393 lines
15 KiB
Diff

diff -Nru waitress-2.0.0/src/waitress/parser.py waitress-2.0.0.fixed/src/waitress/parser.py
--- waitress-2.0.0/src/waitress/parser.py 2021-03-08 07:24:23.000000000 +0000
+++ waitress-2.0.0.fixed/src/waitress/parser.py 2022-04-18 08:04:12.173729937 +0000
@@ -23,6 +23,7 @@
from waitress.buffers import OverflowableBuffer
from waitress.receiver import ChunkedReceiver, FixedStreamReceiver
+from waitress.rfc7230 import HEADER_FIELD_RE, ONLY_DIGIT_RE
from waitress.utilities import (
BadRequest,
RequestEntityTooLarge,
@@ -31,8 +32,6 @@
find_double_newline,
)
-from .rfc7230 import HEADER_FIELD
-
def unquote_bytes_to_wsgi(bytestring):
return unquote_to_bytes(bytestring).decode("latin-1")
@@ -221,7 +220,7 @@
headers = self.headers
for line in lines:
- header = HEADER_FIELD.match(line)
+ header = HEADER_FIELD_RE.match(line)
if not header:
raise ParsingError("Invalid header")
@@ -314,11 +313,12 @@
self.connection_close = True
if not self.chunked:
- try:
- cl = int(headers.get("CONTENT_LENGTH", 0))
- except ValueError:
+ cl = headers.get("CONTENT_LENGTH", "0")
+
+ if not ONLY_DIGIT_RE.match(cl.encode("latin-1")):
raise ParsingError("Content-Length is invalid")
+ cl = int(cl)
self.content_length = cl
if cl > 0:
diff -Nru waitress-2.0.0/src/waitress/receiver.py waitress-2.0.0.fixed/src/waitress/receiver.py
--- waitress-2.0.0/src/waitress/receiver.py 2021-03-08 07:24:23.000000000 +0000
+++ waitress-2.0.0.fixed/src/waitress/receiver.py 2022-04-18 08:04:12.173729937 +0000
@@ -14,6 +14,7 @@
"""Data Chunk Receiver
"""
+from waitress.rfc7230 import CHUNK_EXT_RE, ONLY_HEXDIG_RE
from waitress.utilities import BadRequest, find_double_newline
@@ -110,6 +111,7 @@
s = b""
else:
self.chunk_end = b""
+
if pos == 0:
# Chop off the terminating CR LF from the chunk
s = s[2:]
@@ -133,20 +135,32 @@
line = s[:pos]
s = s[pos + 2 :]
self.control_line = b""
- line = line.strip()
if line:
# Begin a new chunk.
semi = line.find(b";")
if semi >= 0:
- # discard extension info.
+ extinfo = line[semi:]
+ valid_ext_info = CHUNK_EXT_RE.match(extinfo)
+
+ if not valid_ext_info:
+ self.error = BadRequest("Invalid chunk extension")
+ self.all_chunks_received = True
+
+ break
+
line = line[:semi]
- try:
- sz = int(line.strip(), 16) # hexadecimal
- except ValueError: # garbage in input
- self.error = BadRequest("garbage in chunked encoding input")
- sz = 0
+
+ if not ONLY_HEXDIG_RE.match(line):
+ self.error = BadRequest("Invalid chunk size")
+ self.all_chunks_received = True
+
+ break
+
+ # Can not fail due to matching against the regular
+ # expression above
+ sz = int(line, 16) # hexadecimal
if sz > 0:
# Start a new chunk.
diff -Nru waitress-2.0.0/src/waitress/rfc7230.py waitress-2.0.0.fixed/src/waitress/rfc7230.py
--- waitress-2.0.0/src/waitress/rfc7230.py 2021-03-08 07:24:23.000000000 +0000
+++ waitress-2.0.0.fixed/src/waitress/rfc7230.py 2022-04-18 08:04:12.173729937 +0000
@@ -5,6 +5,9 @@
import re
+HEXDIG = "[0-9a-fA-F]"
+DIGIT = "[0-9]"
+
WS = "[ \t]"
OWS = WS + "{0,}?"
RWS = WS + "{1,}?"
@@ -25,6 +28,12 @@
# ; visible (printing) characters
VCHAR = r"\x21-\x7e"
+# The '\\' between \x5b and \x5d is needed to escape \x5d (']')
+QDTEXT = "[\t \x21\x23-\x5b\\\x5d-\x7e" + OBS_TEXT + "]"
+
+QUOTED_PAIR = r"\\" + "([\t " + VCHAR + OBS_TEXT + "])"
+QUOTED_STRING = '"(?:(?:' + QDTEXT + ")|(?:" + QUOTED_PAIR + '))*"'
+
# header-field = field-name ":" OWS field-value OWS
# field-name = token
# field-value = *( field-content / obs-fold )
@@ -43,8 +52,24 @@
# Which allows the field value here to just see if there is even a value in the first place
FIELD_VALUE = "(?:" + FIELD_CONTENT + ")?"
-HEADER_FIELD = re.compile(
+# chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
+# chunk-ext-name = token
+# chunk-ext-val = token / quoted-string
+
+CHUNK_EXT_NAME = TOKEN
+CHUNK_EXT_VAL = "(?:" + TOKEN + ")|(?:" + QUOTED_STRING + ")"
+CHUNK_EXT = (
+ "(?:;(?P<extension>" + CHUNK_EXT_NAME + ")(?:=(?P<value>" + CHUNK_EXT_VAL + "))?)*"
+)
+
+# Pre-compiled regular expressions for use elsewhere
+ONLY_HEXDIG_RE = re.compile(("^" + HEXDIG + "+$").encode("latin-1"))
+ONLY_DIGIT_RE = re.compile(("^" + DIGIT + "+$").encode("latin-1"))
+HEADER_FIELD_RE = re.compile(
(
"^(?P<name>" + TOKEN + "):" + OWS + "(?P<value>" + FIELD_VALUE + ")" + OWS + "$"
).encode("latin-1")
)
+QUOTED_PAIR_RE = re.compile(QUOTED_PAIR)
+QUOTED_STRING_RE = re.compile(QUOTED_STRING)
+CHUNK_EXT_RE = re.compile(("^" + CHUNK_EXT + "$").encode("latin-1"))
diff -Nru waitress-2.0.0/src/waitress/utilities.py waitress-2.0.0.fixed/src/waitress/utilities.py
--- waitress-2.0.0/src/waitress/utilities.py 2021-03-08 07:24:23.000000000 +0000
+++ waitress-2.0.0.fixed/src/waitress/utilities.py 2022-04-18 08:04:12.173729937 +0000
@@ -22,7 +22,7 @@
import stat
import time
-from .rfc7230 import OBS_TEXT, VCHAR
+from .rfc7230 import QUOTED_PAIR_RE, QUOTED_STRING_RE
logger = logging.getLogger("waitress")
queue_logger = logging.getLogger("waitress.queue")
@@ -216,32 +216,10 @@
return retval
-# RFC 5234 Appendix B.1 "Core Rules":
-# VCHAR = %x21-7E
-# ; visible (printing) characters
-vchar_re = VCHAR
-
-# RFC 7230 Section 3.2.6 "Field Value Components":
-# quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE
-# qdtext = HTAB / SP /%x21 / %x23-5B / %x5D-7E / obs-text
-# obs-text = %x80-FF
-# quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text )
-obs_text_re = OBS_TEXT
-
-# The '\\' between \x5b and \x5d is needed to escape \x5d (']')
-qdtext_re = "[\t \x21\x23-\x5b\\\x5d-\x7e" + obs_text_re + "]"
-
-quoted_pair_re = r"\\" + "([\t " + vchar_re + obs_text_re + "])"
-quoted_string_re = '"(?:(?:' + qdtext_re + ")|(?:" + quoted_pair_re + '))*"'
-
-quoted_string = re.compile(quoted_string_re)
-quoted_pair = re.compile(quoted_pair_re)
-
-
def undquote(value):
if value.startswith('"') and value.endswith('"'):
# So it claims to be DQUOTE'ed, let's validate that
- matches = quoted_string.match(value)
+ matches = QUOTED_STRING_RE.match(value)
if matches and matches.end() == len(value):
# Remove the DQUOTE's from the value
@@ -249,7 +227,7 @@
# Remove all backslashes that are followed by a valid vchar or
# obs-text
- value = quoted_pair.sub(r"\1", value)
+ value = QUOTED_PAIR_RE.sub(r"\1", value)
return value
elif not value.startswith('"') and not value.endswith('"'):
diff -Nru waitress-2.0.0/tests/test_functional.py waitress-2.0.0.fixed/tests/test_functional.py
--- waitress-2.0.0/tests/test_functional.py 2021-03-08 07:24:23.000000000 +0000
+++ waitress-2.0.0.fixed/tests/test_functional.py 2022-04-18 08:04:12.173729937 +0000
@@ -312,7 +312,7 @@
self.assertFalse("transfer-encoding" in headers)
def test_chunking_request_with_content(self):
- control_line = b"20;\r\n" # 20 hex = 32 dec
+ control_line = b"20\r\n" # 20 hex = 32 dec
s = b"This string has 32 characters.\r\n"
expected = s * 12
header = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
@@ -332,7 +332,7 @@
self.assertFalse("transfer-encoding" in headers)
def test_broken_chunked_encoding(self):
- control_line = b"20;\r\n" # 20 hex = 32 dec
+ control_line = b"20\r\n" # 20 hex = 32 dec
s = b"This string has 32 characters.\r\n"
to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
to_send += control_line + s + b"\r\n"
@@ -355,8 +355,52 @@
self.send_check_error(to_send)
self.assertRaises(ConnectionClosed, read_http, fp)
+ def test_broken_chunked_encoding_invalid_hex(self):
+ control_line = b"0x20\r\n" # 20 hex = 32 dec
+ s = b"This string has 32 characters.\r\n"
+ to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
+ to_send += control_line + s + b"\r\n"
+ self.connect()
+ self.sock.send(to_send)
+ with self.sock.makefile("rb", 0) as fp:
+ line, headers, response_body = read_http(fp)
+ self.assertline(line, "400", "Bad Request", "HTTP/1.1")
+ cl = int(headers["content-length"])
+ self.assertEqual(cl, len(response_body))
+ self.assertIn(b"Invalid chunk size", response_body)
+ self.assertEqual(
+ sorted(headers.keys()),
+ ["connection", "content-length", "content-type", "date", "server"],
+ )
+ self.assertEqual(headers["content-type"], "text/plain")
+ # connection has been closed
+ self.send_check_error(to_send)
+ self.assertRaises(ConnectionClosed, read_http, fp)
+
+ def test_broken_chunked_encoding_invalid_extension(self):
+ control_line = b"20;invalid=\r\n" # 20 hex = 32 dec
+ s = b"This string has 32 characters.\r\n"
+ to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
+ to_send += control_line + s + b"\r\n"
+ self.connect()
+ self.sock.send(to_send)
+ with self.sock.makefile("rb", 0) as fp:
+ line, headers, response_body = read_http(fp)
+ self.assertline(line, "400", "Bad Request", "HTTP/1.1")
+ cl = int(headers["content-length"])
+ self.assertEqual(cl, len(response_body))
+ self.assertIn(b"Invalid chunk extension", response_body)
+ self.assertEqual(
+ sorted(headers.keys()),
+ ["connection", "content-length", "content-type", "date", "server"],
+ )
+ self.assertEqual(headers["content-type"], "text/plain")
+ # connection has been closed
+ self.send_check_error(to_send)
+ self.assertRaises(ConnectionClosed, read_http, fp)
+
def test_broken_chunked_encoding_missing_chunk_end(self):
- control_line = b"20;\r\n" # 20 hex = 32 dec
+ control_line = b"20\r\n" # 20 hex = 32 dec
s = b"This string has 32 characters.\r\n"
to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
to_send += control_line + s
diff -Nru waitress-2.0.0/tests/test_parser.py waitress-2.0.0.fixed/tests/test_parser.py
--- waitress-2.0.0/tests/test_parser.py 2021-03-08 07:24:23.000000000 +0000
+++ waitress-2.0.0.fixed/tests/test_parser.py 2022-04-18 08:04:12.173729937 +0000
@@ -155,7 +155,7 @@
b"Transfer-Encoding: chunked\r\n"
b"X-Foo: 1\r\n"
b"\r\n"
- b"1d;\r\n"
+ b"1d\r\n"
b"This string has 29 characters\r\n"
b"0\r\n\r\n"
)
@@ -188,6 +188,26 @@
try:
self.parser.parse_header(data)
+ except ParsingError as e:
+ self.assertIn("Content-Length is invalid", e.args[0])
+ else: # pragma: nocover
+ self.assertTrue(False)
+
+ def test_parse_header_bad_content_length_plus(self):
+ data = b"GET /foobar HTTP/8.4\r\ncontent-length: +10\r\n"
+
+ try:
+ self.parser.parse_header(data)
+ except ParsingError as e:
+ self.assertIn("Content-Length is invalid", e.args[0])
+ else: # pragma: nocover
+ self.assertTrue(False)
+
+ def test_parse_header_bad_content_length_minus(self):
+ data = b"GET /foobar HTTP/8.4\r\ncontent-length: -10\r\n"
+
+ try:
+ self.parser.parse_header(data)
except ParsingError as e:
self.assertIn("Content-Length is invalid", e.args[0])
else: # pragma: nocover
diff -Nru waitress-2.0.0/tests/test_receiver.py waitress-2.0.0.fixed/tests/test_receiver.py
--- waitress-2.0.0/tests/test_receiver.py 2021-03-08 07:24:23.000000000 +0000
+++ waitress-2.0.0.fixed/tests/test_receiver.py 2022-04-18 08:04:12.173729937 +0000
@@ -1,5 +1,7 @@
import unittest
+import pytest
+
class TestFixedStreamReceiver(unittest.TestCase):
def _makeOne(self, cl, buf):
@@ -226,6 +228,55 @@
self.assertEqual(inst.error, None)
+class TestChunkedReceiverParametrized:
+ def _makeOne(self, buf):
+ from waitress.receiver import ChunkedReceiver
+
+ return ChunkedReceiver(buf)
+
+ @pytest.mark.parametrize(
+ "invalid_extension", [b"\n", b"invalid=", b"\r", b"invalid = true"]
+ )
+ def test_received_invalid_extensions(self, invalid_extension):
+ from waitress.utilities import BadRequest
+
+ buf = DummyBuffer()
+ inst = self._makeOne(buf)
+ data = b"4;" + invalid_extension + b"\r\ntest\r\n"
+ result = inst.received(data)
+ assert result == len(data)
+ assert inst.error.__class__ == BadRequest
+ assert inst.error.body == "Invalid chunk extension"
+
+ @pytest.mark.parametrize(
+ "valid_extension", [b"test", b"valid=true", b"valid=true;other=true"]
+ )
+ def test_received_valid_extensions(self, valid_extension):
+ # While waitress may ignore extensions in Chunked Encoding, we do want
+ # to make sure that we don't fail when we do encounter one that is
+ # valid
+ buf = DummyBuffer()
+ inst = self._makeOne(buf)
+ data = b"4;" + valid_extension + b"\r\ntest\r\n"
+ result = inst.received(data)
+ assert result == len(data)
+ assert inst.error == None
+
+ @pytest.mark.parametrize(
+ "invalid_size", [b"0x04", b"+0x04", b"x04", b"+04", b" 04", b" 0x04"]
+ )
+ def test_received_invalid_size(self, invalid_size):
+ from waitress.utilities import BadRequest
+
+ buf = DummyBuffer()
+ inst = self._makeOne(buf)
+ data = invalid_size + b"\r\ntest\r\n"
+ result = inst.received(data)
+ assert result == len(data)
+ assert inst.error.__class__ == BadRequest
+ assert inst.error.body == "Invalid chunk size"
+
+
class DummyBuffer:
def __init__(self, data=None):
if data is None: