forked from DGNum/liminix
ebc5d6a3e0
* move shell script to a runCommand * multicast needs special options to run on loopback (nix-build sandbox disables non-local network interfaces)
87 lines
2.3 KiB
Python
87 lines
2.3 KiB
Python
# Forge packets for testing liminix and send them via the QEMU UDP
# multicast socket interface.
|
|
|
|
# Multicast group and UDP port this script binds to, joins, and sends the
# forged frame to.
# NOTE(review): presumably these must match the mcast= endpoint QEMU is
# started with -- confirm against the test harness.
MCAST_GRP = "230.0.0.1"
MCAST_PORT = 1235

# Value handed to the IP_MULTICAST_TTL socket option further down.
MULTICAST_TTL = 2

# Seconds: used both as the socket timeout and as the overall deadline for
# seeing a DHCP offer.
TIMEOUT = 10
|
|
|
|
from warnings import filterwarnings
|
|
filterwarnings("ignore")
|
|
|
|
import random
|
|
import binascii
|
|
import socket
|
|
import time
|
|
import json
|
|
|
|
from builtins import bytes, bytearray
|
|
|
|
from scapy.all import Ether, IP, UDP, BOOTP, DHCP, sendp, send, raw
|
|
|
|
class JSONEncoderWithBytes(json.JSONEncoder):
    """JSON encoder that also serializes ``bytes`` and ``bytearray`` values.

    Byte strings are emitted as text. Valid UTF-8 is decoded as-is; any byte
    that is not valid UTF-8 is rendered as a ``\\xNN`` escape instead of
    raising ``UnicodeDecodeError``, so one binary option value cannot abort
    the whole dump.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        Raises:
            TypeError: (from the base class) if *obj* is neither bytes-like
                nor natively serializable.
        """
        if isinstance(obj, (bytes, bytearray)):
            # backslashreplace keeps undecodable bytes visible in the output
            # rather than failing on them.
            return obj.decode('utf-8', errors='backslashreplace')
        return super().default(obj)
|
|
|
|
|
|
def dhcp_option(pkt, label):
    """Look up a DHCP option by name in a scapy packet.

    Args:
        pkt: a scapy packet; it may or may not contain a DHCP layer.
        label: option name as scapy spells it, e.g. ``'message-type'``.

    Returns:
        The option's value for a ``(name, value)`` entry, a list of values
        for a multi-valued ``(name, v1, v2, ...)`` entry, or ``None`` when
        the packet has no DHCP layer or no option with that name.
    """
    if not pkt.haslayer(DHCP):
        return None
    for option in pkt[DHCP].options:
        # The options list mixes tuples with bare markers such as "end" and
        # "pad"; blindly unpacking those into two names raises ValueError.
        if not isinstance(option, tuple) or len(option) < 2:
            continue
        name, *values = option
        if name == label:
            return values[0] if len(values) == 1 else values
    return None
|
|
|
|
def is_dhcp_offer(pkt):
    """Return True when *pkt* carries a DHCP 'message-type' option equal to 2.

    Message type 2 is DHCPOFFER. Packets without a DHCP layer, or without a
    message-type option, yield False.
    """
    dhcp_offer_type = 2
    message_type = dhcp_option(pkt, 'message-type')
    return message_type == dhcp_offer_type
|
|
|
|
|
|
|
|
def mac_to_bytes(mac_addr: str) -> bytes:
    """Convert a MAC address string to its 6-byte big-endian representation.

    Accepts ``:`` or ``-`` separated octets (``"01:02:03:04:05:06"``), with
    or without zero padding of each octet, as well as twelve bare hex digits
    (``"010203040506"``).

    Raises:
        ValueError: if the string does not describe exactly six octets in
            the range 0x00-0xff.
    """
    text = mac_addr.strip().replace("-", ":")
    if ":" in text:
        octets = text.split(":")
    elif len(text) == 12:
        # Bare hex digits, two per octet.
        octets = [text[i:i + 2] for i in range(0, len(text), 2)]
    else:
        octets = []

    if len(octets) != 6:
        raise ValueError(
            "expected 6 octets in MAC address, got {!r}".format(mac_addr))

    try:
        # Parse octet by octet so that "1:2:..." is not read as the single
        # number 0x12...; bytes() rejects anything outside 0..255.
        return bytes(int(octet, 16) for octet in octets)
    except ValueError as err:
        raise ValueError(
            "invalid MAC address {!r}".format(mac_addr)) from err
|
|
|
|
|
|
# Hardware address the forged client announces in the BOOTP chaddr field.
client_mac = "01:02:03:04:05:06"

# Assemble the DHCP discover layer by layer: an Ethernet broadcast carrying
# an IP broadcast from 0.0.0.0, UDP from port 68 to port 67, a BOOTP header
# with a random transaction id, and the DHCP options.
_ether_layer = Ether(dst="ff:ff:ff:ff:ff:ff")
_ip_layer = IP(src="0.0.0.0", dst="255.255.255.255")
_udp_layer = UDP(sport=68, dport=67)
_bootp_layer = BOOTP(
    chaddr=mac_to_bytes(client_mac),
    xid=random.randint(1, 2**32-1),
)
_dhcp_layer = DHCP(options=[("message-type", "discover"), "end"])

discover = _ether_layer / _ip_layer / _udp_layer / _bootp_layer / _dhcp_layer

# Serialized frame, ready to be written to the UDP socket.
payload = raw(discover)
|
|
|
|
# Send the forged discover to the multicast group and wait for a DHCP offer
# to come back on the same socket. On success the offer's options are
# printed as one JSON object and the script exits 0; if no offer is seen
# within TIMEOUT seconds it exits 1.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
with sock:  # make sure the socket is closed on every exit path
    sock.settimeout(TIMEOUT)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, MULTICAST_TTL)

    sock.bind((MCAST_GRP, MCAST_PORT))
    # Pin both the outgoing interface and the group membership to loopback:
    # the nix-build sandbox disables non-local network interfaces.
    sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF,
                    socket.inet_aton('127.0.0.1'))
    sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP,
                    socket.inet_aton(MCAST_GRP) + socket.inet_aton('127.0.0.1'))

    endtime = time.time() + TIMEOUT
    sock.sendto(payload, (MCAST_GRP, MCAST_PORT))

    while True:
        # Only ever block for the time left until the deadline, so the whole
        # wait is bounded by TIMEOUT rather than by TIMEOUT per recvfrom().
        remaining = endtime - time.time()
        if remaining <= 0:
            break
        sock.settimeout(remaining)

        try:
            data, _sender = sock.recvfrom(1024)
        except socket.error as e:
            print('recv exception: ', e)
            continue

        reply = Ether(data)
        if not is_dhcp_offer(reply):
            continue

        # Collect the options into a dict. Bare markers ("end", "pad") are
        # skipped; multi-valued options (name, v1, v2, ...) become lists,
        # since dict() on the raw tuples would reject anything that is not
        # exactly a pair.
        opts = {}
        for option in reply[DHCP].options:
            if not isinstance(option, tuple) or len(option) < 2:
                continue
            name, *values = option
            opts[name] = values[0] if len(values) == 1 else values

        print(json.dumps(opts, cls=JSONEncoderWithBytes))
        raise SystemExit(0)

# No offer arrived before the deadline.
raise SystemExit(1)
|