netbox-agent/netbox_agent/lldp.py

72 lines
2.6 KiB
Python
Raw Normal View History

import logging
2021-07-09 11:10:43 +02:00
import subprocess
from netbox_agent.misc import is_tool
2019-08-26 11:05:41 +02:00
2024-10-21 12:55:54 +02:00
class LLDP:
    """Parse ``lldpctl -f keyvalue`` output into a nested dict of neighbor data.

    After construction:

    * ``self.output`` holds the raw key/value text that was parsed.
    * ``self.data`` holds the parsed tree, e.g.
      ``{"lldp": {"eth0": {"chassis": {...}, "port": {...}, "vlan": {...}}}}``,
      or ``{}`` when nothing usable was found.
    """

    def __init__(self, output=None):
        """Collect LLDP data, either from *output* or by running ``lldpctl``.

        Args:
            output: Pre-captured ``lldpctl -f keyvalue`` text. When falsy
                (``None`` or empty), ``lldpctl -f keyvalue`` is executed and
                its output is used instead.
        """
        if not is_tool("lldpctl"):
            logging.debug("lldpd package seems to be missing or daemon not running.")
        if output:
            self.output = output
        else:
            self.output = subprocess.getoutput("lldpctl -f keyvalue")
        self.data = self.parse()

    def parse(self):
        """Turn ``self.output`` into a nested dictionary.

        Every ``lldp.<interface>.<key...>=<value>`` line becomes a leaf in a
        tree keyed by the dotted path components. VLAN-related lines are not
        stored as leaves; they are gathered per interface and attached as
        ``tree["lldp"][<interface>]["vlan"]``, a dict mapping each VLAN id to
        its attributes (``{"pvid": True}`` for the port VLAN id).

        Lines without ``=``, or whose key is not of the form
        ``lldp.<interface>.<key...>``, are ignored so that stray text in the
        captured output cannot abort parsing.

        Returns:
            dict: The parsed tree, or ``{}`` when no LLDP entry was found.
        """
        output_dict = {}
        vlans = {}
        # Most recent VLAN id seen per interface, so that a "pvid" line only
        # ever flags a VLAN belonging to the same interface.
        last_vid = {}

        for entry in self.output.splitlines():
            if "=" not in entry:
                continue
            path, value = entry.strip().split("=", 1)
            split_path = path.split(".")
            if len(split_path) < 3 or split_path[0] != "lldp":
                # Not an "lldp.<interface>.<key>" entry.
                continue

            interface = split_path[1]
            path_components, final = split_path[:-1], split_path[-1]
            # Only the part below the interface decides whether a key is
            # VLAN-related; the interface name itself may contain "vlan".
            sub_path = ".".join(split_path[2:])

            interface_vlans = vlans.setdefault(interface, {})

            # Walk/create the intermediate dictionaries for this key.
            current_dict = output_dict
            for path_component in path_components:
                if not isinstance(current_dict.get(path_component), dict):
                    current_dict[path_component] = {}
                current_dict = current_dict[path_component]

            if final == "vlan-id":
                # lldp.eth0.vlan.vlan-id=296
                last_vid[interface] = value
                interface_vlans.setdefault(value, {})
            elif final == "vlan":
                # lldp.eth0.vlan=vlan-296
                vid = value.replace("vlan-", "")
                last_vid[interface] = vid
                interface_vlans.setdefault(vid, {})
            elif final == "pvid":
                # lldp.eth0.vlan.pvid=yes -- applies to the VLAN announced
                # just before it; ignored when no VLAN is known yet.
                vid = last_vid.get(interface)
                if vid is not None:
                    interface_vlans[vid]["pvid"] = True

            if "vlan" not in sub_path:
                current_dict[final] = value

        # Every interface in ``vlans`` was created in the tree above, so the
        # direct indexing here cannot fail.
        for interface, vlan in vlans.items():
            output_dict["lldp"][interface]["vlan"] = vlan

        if not output_dict:
            logging.debug("No LLDP output, please check your network config.")
        return output_dict

    def _neighbor(self, interface):
        """Return the parsed subtree for *interface*, or ``None`` if absent."""
        return self.data.get("lldp", {}).get(interface)

    def get_switch_ip(self, interface):
        """Return the neighbor's management IP for *interface*, or ``None``.

        ``None`` is returned when there is no LLDP data for the interface or
        when the neighbor advertised no ``chassis.mgmt-ip``.
        """
        # lldp.eth0.chassis.mgmt-ip=100.66.7.222
        neighbor = self._neighbor(interface)
        if neighbor is None:
            return None
        return neighbor.get("chassis", {}).get("mgmt-ip")

    def get_switch_port(self, interface):
        """Return the neighbor port name for *interface*, or ``None``.

        ``port.ifname`` is preferred when present and non-empty; otherwise
        ``port.descr`` is returned (``None`` when that is missing too).
        """
        # lldp.eth0.port.descr=GigabitEthernet1/0/1
        neighbor = self._neighbor(interface)
        if neighbor is None:
            return None
        port = neighbor.get("port", {})
        if port.get("ifname"):
            return port["ifname"]
        return port.get("descr")

    def get_switch_vlan(self, interface):
        """Return the VLAN mapping for *interface*, or ``None``.

        The mapping is keyed by VLAN id; see :meth:`parse` for its shape.
        """
        # lldp.eth0.vlan.vlan-id=296
        neighbor = self._neighbor(interface)
        if neighbor is None:
            return None
        return neighbor.get("vlan")