347 lines
12 KiB
Diff
347 lines
12 KiB
Diff
From b41691d0e546795bda994d94091b8e0a03ab96d6 Mon Sep 17 00:00:00 2001
|
|
From: Joseph Sutton <josephsutton@catalyst.net.nz>
|
|
Date: Tue, 7 Jun 2022 17:35:35 +1200
|
|
Subject: [PATCH 02/15] CVE-2022-32743 tests/py_credentials: Add tests for
|
|
setting dNSHostName with LogonGetDomainInfo()
|
|
|
|
Test that the value is properly validated, and that it can be set
|
|
regardless of rights on the account.
|
|
|
|
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14833
|
|
|
|
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
|
|
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
|
|
---
|
|
python/samba/tests/py_credentials.py | 281 +++++++++++++++++++++++++++-
|
|
selftest/knownfail.d/netlogon-dns-host-name | 2 +
|
|
2 files changed, 281 insertions(+), 2 deletions(-)
|
|
create mode 100644 selftest/knownfail.d/netlogon-dns-host-name
|
|
|
|
diff --git a/python/samba/tests/py_credentials.py b/python/samba/tests/py_credentials.py
|
|
index ecb8271..0c442b8 100644
|
|
--- a/python/samba/tests/py_credentials.py
|
|
+++ b/python/samba/tests/py_credentials.py
|
|
@@ -18,6 +18,8 @@
|
|
from samba.tests import TestCase, delete_force
|
|
import os
|
|
|
|
+import ldb
|
|
+
|
|
import samba
|
|
from samba.auth import system_session
|
|
from samba.credentials import (
|
|
@@ -25,7 +27,7 @@ from samba.credentials import (
|
|
CLI_CRED_NTLMv2_AUTH,
|
|
CLI_CRED_NTLM_AUTH,
|
|
DONT_USE_KERBEROS)
|
|
-from samba.dcerpc import netlogon, ntlmssp, srvsvc
|
|
+from samba.dcerpc import lsa, netlogon, ntlmssp, security, srvsvc
|
|
from samba.dcerpc.netlogon import (
|
|
netr_Authenticator,
|
|
netr_WorkstationInformation,
|
|
@@ -36,10 +38,11 @@ from samba.dsdb import (
|
|
UF_WORKSTATION_TRUST_ACCOUNT,
|
|
UF_PASSWD_NOTREQD,
|
|
UF_NORMAL_ACCOUNT)
|
|
-from samba.ndr import ndr_pack
|
|
+from samba.ndr import ndr_pack, ndr_unpack
|
|
from samba.samdb import SamDB
|
|
from samba import NTSTATUSError, ntstatus
|
|
from samba.common import get_string
|
|
+from samba.sd_utils import SDUtils
|
|
|
|
import ctypes
|
|
|
|
@@ -105,6 +108,280 @@ class PyCredentialsTests(TestCase):
|
|
(authenticator, subsequent) = self.get_authenticator(c)
|
|
self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
|
|
|
|
+ # Test using LogonGetDomainInfo to update dNSHostName to an allowed value.
|
|
+ def test_set_dns_hostname_valid(self):
|
|
+ c = self.get_netlogon_connection()
|
|
+ authenticator, subsequent = self.get_authenticator(c)
|
|
+
|
|
+ domain_hostname = self.ldb.domain_dns_name()
|
|
+
|
|
+ new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
|
|
+ new_dns_hostname = new_dns_hostname.encode('utf-8')
|
|
+
|
|
+ query = netr_WorkstationInformation()
|
|
+ query.os_name = lsa.String('some OS')
|
|
+ query.dns_hostname = new_dns_hostname
|
|
+
|
|
+ c.netr_LogonGetDomainInfo(
|
|
+ server_name=self.server,
|
|
+ computer_name=self.user_creds.get_workstation(),
|
|
+ credential=authenticator,
|
|
+ return_authenticator=subsequent,
|
|
+ level=1,
|
|
+ query=query)
|
|
+
|
|
+ # Check the result.
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['dNSHostName'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ got_dns_hostname = res[0].get('dNSHostName', idx=0)
|
|
+ self.assertEqual(new_dns_hostname, got_dns_hostname)
|
|
+
|
|
+ # Test using LogonGetDomainInfo to update dNSHostName to an allowed value,
|
|
+ # when we are denied the right to do so.
|
|
+ def test_set_dns_hostname_valid_denied(self):
|
|
+ c = self.get_netlogon_connection()
|
|
+ authenticator, subsequent = self.get_authenticator(c)
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['objectSid'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ machine_sid = ndr_unpack(security.dom_sid,
|
|
+ res[0].get('objectSid', idx=0))
|
|
+
|
|
+ sd_utils = SDUtils(self.ldb)
|
|
+
|
|
+ # Deny Validated Write and Write Property.
|
|
+ mod = (f'(OD;;SWWP;{security.GUID_DRS_DNS_HOST_NAME};;'
|
|
+ f'{machine_sid})')
|
|
+ sd_utils.dacl_add_ace(self.machine_dn, mod)
|
|
+
|
|
+ domain_hostname = self.ldb.domain_dns_name()
|
|
+
|
|
+ new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
|
|
+ new_dns_hostname = new_dns_hostname.encode('utf-8')
|
|
+
|
|
+ query = netr_WorkstationInformation()
|
|
+ query.os_name = lsa.String('some OS')
|
|
+ query.dns_hostname = new_dns_hostname
|
|
+
|
|
+ c.netr_LogonGetDomainInfo(
|
|
+ server_name=self.server,
|
|
+ computer_name=self.user_creds.get_workstation(),
|
|
+ credential=authenticator,
|
|
+ return_authenticator=subsequent,
|
|
+ level=1,
|
|
+ query=query)
|
|
+
|
|
+ # Check the result.
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['dNSHostName'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ got_dns_hostname = res[0].get('dNSHostName', idx=0)
|
|
+ self.assertEqual(new_dns_hostname, got_dns_hostname)
|
|
+
|
|
+ # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
|
|
+ # invalid value, even with Validated Write.
|
|
+ def test_set_dns_hostname_invalid_validated_write(self):
|
|
+ c = self.get_netlogon_connection()
|
|
+ authenticator, subsequent = self.get_authenticator(c)
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['objectSid'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ machine_sid = ndr_unpack(security.dom_sid,
|
|
+ res[0].get('objectSid', idx=0))
|
|
+
|
|
+ sd_utils = SDUtils(self.ldb)
|
|
+
|
|
+ # Grant Validated Write.
|
|
+ mod = (f'(OA;;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
|
|
+ f'{machine_sid})')
|
|
+ sd_utils.dacl_add_ace(self.machine_dn, mod)
|
|
+
|
|
+ new_dns_hostname = b'invalid'
|
|
+
|
|
+ query = netr_WorkstationInformation()
|
|
+ query.os_name = lsa.String('some OS')
|
|
+ query.dns_hostname = new_dns_hostname
|
|
+
|
|
+ c.netr_LogonGetDomainInfo(
|
|
+ server_name=self.server,
|
|
+ computer_name=self.user_creds.get_workstation(),
|
|
+ credential=authenticator,
|
|
+ return_authenticator=subsequent,
|
|
+ level=1,
|
|
+ query=query)
|
|
+
|
|
+ # Check the result.
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['dNSHostName'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ got_dns_hostname = res[0].get('dNSHostName', idx=0)
|
|
+ self.assertIsNone(got_dns_hostname)
|
|
+
|
|
+ # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
|
|
+ # invalid value, even with Write Property.
|
|
+ def test_set_dns_hostname_invalid_write_property(self):
|
|
+ c = self.get_netlogon_connection()
|
|
+ authenticator, subsequent = self.get_authenticator(c)
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['objectSid'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ machine_sid = ndr_unpack(security.dom_sid,
|
|
+ res[0].get('objectSid', idx=0))
|
|
+
|
|
+ sd_utils = SDUtils(self.ldb)
|
|
+
|
|
+ # Grant Write Property.
|
|
+ mod = (f'(OA;;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
|
|
+ f'{machine_sid})')
|
|
+ sd_utils.dacl_add_ace(self.machine_dn, mod)
|
|
+
|
|
+ new_dns_hostname = b'invalid'
|
|
+
|
|
+ query = netr_WorkstationInformation()
|
|
+ query.os_name = lsa.String('some OS')
|
|
+ query.dns_hostname = new_dns_hostname
|
|
+
|
|
+ c.netr_LogonGetDomainInfo(
|
|
+ server_name=self.server,
|
|
+ computer_name=self.user_creds.get_workstation(),
|
|
+ credential=authenticator,
|
|
+ return_authenticator=subsequent,
|
|
+ level=1,
|
|
+ query=query)
|
|
+
|
|
+ # Check the result.
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['dNSHostName'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ got_dns_hostname = res[0].get('dNSHostName', idx=0)
|
|
+ self.assertIsNone(got_dns_hostname)
|
|
+
|
|
+ # Show we can't use LogonGetDomainInfo to set the dNSHostName to just the
|
|
+ # machine name.
|
|
+ def test_set_dns_hostname_to_machine_name(self):
|
|
+ c = self.get_netlogon_connection()
|
|
+ authenticator, subsequent = self.get_authenticator(c)
|
|
+
|
|
+ new_dns_hostname = self.machine_name.encode('utf-8')
|
|
+
|
|
+ query = netr_WorkstationInformation()
|
|
+ query.os_name = lsa.String('some OS')
|
|
+ query.dns_hostname = new_dns_hostname
|
|
+
|
|
+ c.netr_LogonGetDomainInfo(
|
|
+ server_name=self.server,
|
|
+ computer_name=self.user_creds.get_workstation(),
|
|
+ credential=authenticator,
|
|
+ return_authenticator=subsequent,
|
|
+ level=1,
|
|
+ query=query)
|
|
+
|
|
+ # Check the result.
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['dNSHostName'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ got_dns_hostname = res[0].get('dNSHostName', idx=0)
|
|
+ self.assertIsNone(got_dns_hostname)
|
|
+
|
|
+ # Show we can't use LogonGetDomainInfo to set dNSHostName with an invalid
|
|
+ # suffix.
|
|
+ def test_set_dns_hostname_invalid_suffix(self):
|
|
+ c = self.get_netlogon_connection()
|
|
+ authenticator, subsequent = self.get_authenticator(c)
|
|
+
|
|
+ domain_hostname = self.ldb.domain_dns_name()
|
|
+
|
|
+ new_dns_hostname = f'{self.machine_name}.foo.{domain_hostname}'
|
|
+ new_dns_hostname = new_dns_hostname.encode('utf-8')
|
|
+
|
|
+ query = netr_WorkstationInformation()
|
|
+ query.os_name = lsa.String('some OS')
|
|
+ query.dns_hostname = new_dns_hostname
|
|
+
|
|
+ c.netr_LogonGetDomainInfo(
|
|
+ server_name=self.server,
|
|
+ computer_name=self.user_creds.get_workstation(),
|
|
+ credential=authenticator,
|
|
+ return_authenticator=subsequent,
|
|
+ level=1,
|
|
+ query=query)
|
|
+
|
|
+ # Check the result.
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['dNSHostName'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ got_dns_hostname = res[0].get('dNSHostName', idx=0)
|
|
+ self.assertIsNone(got_dns_hostname)
|
|
+
|
|
+ # Test that setting the HANDLES_SPN_UPDATE flag inhibits the dNSHostName
|
|
+ # update, but other attributes are still updated.
|
|
+ def test_set_dns_hostname_with_flag(self):
|
|
+ c = self.get_netlogon_connection()
|
|
+ authenticator, subsequent = self.get_authenticator(c)
|
|
+
|
|
+ domain_hostname = self.ldb.domain_dns_name()
|
|
+
|
|
+ new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
|
|
+ new_dns_hostname = new_dns_hostname.encode('utf-8')
|
|
+
|
|
+ operating_system = 'some OS'
|
|
+
|
|
+ query = netr_WorkstationInformation()
|
|
+ query.os_name = lsa.String(operating_system)
|
|
+
|
|
+ query.dns_hostname = new_dns_hostname
|
|
+ query.workstation_flags = netlogon.NETR_WS_FLAG_HANDLES_SPN_UPDATE
|
|
+
|
|
+ c.netr_LogonGetDomainInfo(
|
|
+ server_name=self.server,
|
|
+ computer_name=self.user_creds.get_workstation(),
|
|
+ credential=authenticator,
|
|
+ return_authenticator=subsequent,
|
|
+ level=1,
|
|
+ query=query)
|
|
+
|
|
+ # Check the result.
|
|
+
|
|
+ res = self.ldb.search(self.machine_dn,
|
|
+ scope=ldb.SCOPE_BASE,
|
|
+ attrs=['dNSHostName',
|
|
+ 'operatingSystem'])
|
|
+ self.assertEqual(1, len(res))
|
|
+
|
|
+ got_dns_hostname = res[0].get('dNSHostName', idx=0)
|
|
+ self.assertIsNone(got_dns_hostname)
|
|
+
|
|
+ got_os = res[0].get('operatingSystem', idx=0)
|
|
+ self.assertEqual(operating_system.encode('utf-8'), got_os)
|
|
+
|
|
def test_SamLogonEx(self):
|
|
c = self.get_netlogon_connection()
|
|
|
|
diff --git a/selftest/knownfail.d/netlogon-dns-host-name b/selftest/knownfail.d/netlogon-dns-host-name
|
|
new file mode 100644
|
|
index 0000000..2d0a0ec
|
|
--- /dev/null
|
|
+++ b/selftest/knownfail.d/netlogon-dns-host-name
|
|
@@ -0,0 +1,2 @@
|
|
+^samba.tests.py_credentials.samba.tests.py_credentials.PyCredentialsTests.test_set_dns_hostname_invalid_suffix\(
|
|
+^samba.tests.py_credentials.samba.tests.py_credentials.PyCredentialsTests.test_set_dns_hostname_with_flag\(
|
|
--
|
|
1.8.3.1
|
|
|