# Test cases for dscp policy # Copyright (c) 2021, Jouni Malinen # Copyright (c) 2021, The Linux Foundation # # This software may be distributed under the terms of the BSD license. # See README for more details. import struct import time import sys import socket import hostapd from wpasupplicant import WpaSupplicant from utils import * def register_dscp_req(hapd): type = 0x00d0 match = "7e506f9a1a" if "OK" not in hapd.request("REGISTER_FRAME %04x %s" % (type, match)): raise Exception("Could not register frame reception for Vendor specific protected type") def send_dscp_req(hapd, da, oui_subtype, dialog_token, req_control, qos_ie, truncate=False): type = 0 subtype = 13 category = 126 oui_type = 0x506f9a1a if truncate: req = struct.pack('>BLBB', category, oui_type, oui_subtype, dialog_token) else: req = struct.pack('>BLBBB', category, oui_type, oui_subtype, dialog_token, req_control) if qos_ie: req += qos_ie msg = {} msg['fc'] = 0x00d0 msg['sa'] = hapd.own_addr() msg['da'] = da msg['bssid'] = hapd.own_addr() msg['type'] = type msg['subtype'] = subtype msg['payload'] = req hapd.mgmt_tx(msg) ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5) if ev is None or "stype=13 ok=1" not in ev: raise Exception("No DSCP Policy Request sent") def prepare_qos_ie(policy_id, req_type, dscp, start_port=0, end_port=0, frame_classifier=None, frame_class_len=0, domain_name=None): qos_elem_oui_type = 0x229a6f50 qos_elem_id = 221 if policy_id: qos_attr = struct.pack('BBBBB', 2, 3, policy_id, req_type, dscp) qos_attr_len = 5 else: qos_attr = 0 qos_attr_len = 0 if start_port and end_port: port_range_attr = struct.pack('>BBHH', 1, 4, start_port, end_port) if qos_attr: qos_attr += port_range_attr else: qos_attr = port_range_attr qos_attr_len += 6 if frame_classifier and frame_class_len: tclas_attr = struct.pack('>BB%ds' % (len(frame_classifier),), 3, len(frame_classifier), frame_classifier) if qos_attr: qos_attr += tclas_attr else: qos_attr = tclas_attr qos_attr_len += 2 + len(frame_classifier) if domain_name: s = bytes(domain_name, 'utf-8') domain_name_attr = struct.pack('>BB%ds' % (len(s),), 4, len(s), s) if qos_attr: qos_attr += domain_name_attr else: qos_attr = domain_name_attr qos_attr_len += 2 + len(s) qos_attr_len += 4 qos_ie = struct.pack('CTRL-EVENT-DSCP-POLICY request_start") validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY remove policy_id=1") validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end") # Request Type: Reserved dialog_token += 1 qos_ie = prepare_qos_ie(1, 2, 36) send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start") validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1") validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end") def test_dscp_response(dev, apdev): """DSCP Policy Response""" # Positive tests # AP with DSCP Capabilities params = {"ssid": "dscp", "ext_capa": 6*"00" + "40", "assocresp_elements": "dd06506f9a230101", "vendor_elements": "dd06506f9a230101"} hapd = ap_sta_connectivity(dev, apdev, params) da = dev[0].own_addr() # Sending solicited DSCP response after receiving DSCP request dialog_token = 1 domain_name = "example.com" ipv4_src_addr = socket.inet_pton(socket.AF_INET, "192.168.0.1") ipv4_dest_addr = socket.inet_pton(socket.AF_INET, "192.168.0.2") frame_classifier_start = [4,91,4] frame_classifier_end = [12,34,12,34,0,17,0] frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end) frame_len = len(frame_classifier) qos_ie = prepare_qos_ie(1, 0, 22, 0, 0, frame_classifier, frame_len, domain_name) ie = prepare_qos_ie(4, 0, 32, 12345, 23456, 0, 0, domain_name) qos_ie += ie send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) ev = dev[0].wait_event(["CTRL-EVENT-DSCP-POLICY"], timeout=5) if ev is None: raise Exception("DSCP event not reported") if "request_start" not in ev: raise Exception("Unexpected DSCP event: " + ev) cmd = "DSCP_RESP solicited policy_id=1 status=0 policy_id=4 status=0" if "OK" not in dev[0].request(cmd): raise Exception("Sending DSCP Response failed") response = b'\x7e\x50\x6f\x9a\x1a\x02\x01\x00\x02\x01\x00\x04\x00' handle_dscp_response(hapd, response) # Unsolicited DSCP Response without status duples cmd = "DSCP_RESP reset more" if "OK" not in dev[0].request(cmd): raise Exception("Sending DSCP Response failed") response = b'\x7e\x50\x6f\x9a\x1a\x02\x00\x03\x00' handle_dscp_response(hapd, response) # Unsolicited DSCP Response with one status duple cmd = "DSCP_RESP policy_id=2 status=0" if "OK" not in dev[0].request(cmd): raise Exception("Sending DSCP Response failed") response = b'\x7e\x50\x6f\x9a\x1a\x02\x00\x00\x01\x02\x00' handle_dscp_response(hapd, response) # Negative tests # Send solicited DSCP Response without prior DSCP request cmd = "DSCP_RESP solicited policy_id=1 status=0 policy_id=5 status=0" if "FAIL" not in dev[0].request(cmd): raise Exception("Able to send invalid DSCP response") def test_dscp_unsolicited_req_at_assoc(dev, apdev): """DSCP Policy and unsolicited request at association""" params = {"ssid": "dscp", "ext_capa": 6*"00" + "40", "assocresp_elements": "dd06506f9a230103", "vendor_elements": "dd06506f9a230103"} hapd = ap_sta_connectivity(dev, apdev, params) da = dev[0].own_addr() dialog_token = 1 qos_ie = prepare_qos_ie(1, 0, 36, 12345, 23456) send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start") validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY add policy_id=1 dscp=36 ip_version=0 start_port=12345 end_port=23456") validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end") cmd = "DSCP_QUERY wildcard" if "OK" not in dev[0].request(cmd): raise Exception("Sending DSCP Query failed") def test_dscp_missing_unsolicited_req_at_assoc(dev, apdev): """DSCP Policy and missing unsolicited request at association""" params = {"ssid": "dscp", "ext_capa": 6*"00" + "40", "assocresp_elements": "dd06506f9a230103", "vendor_elements": "dd06506f9a230103"} hapd = ap_sta_connectivity(dev, apdev, params) da = dev[0].own_addr() cmd = "DSCP_QUERY wildcard" if "FAIL" not in dev[0].request(cmd): raise Exception("DSCP_QUERY accepted during wait for unsolicited requesdt") time.sleep(5) validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_wait end") cmd = "DSCP_QUERY wildcard" if "OK" not in dev[0].request(cmd): raise Exception("Sending DSCP Query failed")