netbox-agent/netbox_agent/inventory.py

563 lines
19 KiB
Python

import json
import logging
import re
import sys
import traceback
import pynetbox
from netbox_agent.config import config
from netbox_agent.config import netbox_instance as nb
from netbox_agent.lshw import LSHW
from netbox_agent.misc import get_vendor, is_tool
from netbox_agent.raid.hp import HPRaid
from netbox_agent.raid.omreport import OmreportRaid
from netbox_agent.raid.storcli import StorcliRaid
INVENTORY_TAG = {
"cpu": {"name": "hw:cpu", "slug": "hw-cpu"},
"gpu": {"name": "hw:gpu", "slug": "hw-gpu"},
"disk": {"name": "hw:disk", "slug": "hw-disk"},
"interface": {"name": "hw:interface", "slug": "hw-interface"},
"memory": {"name": "hw:memory", "slug": "hw-memory"},
"motherboard": {"name": "hw:motherboard", "slug": "hw-motherboard"},
"raid_card": {"name": "hw:raid_card", "slug": "hw-raid-card"},
}
class Inventory:
"""
Better Inventory items coming, see:
- https://github.com/netbox-community/netbox/issues/3087
- https://github.com/netbox-community/netbox/issues/3333
This class implements for:
* memory
* cpu
* raid cards
* disks
* gpus
methods that:
* get local item
* get netbox item
* create netbox item
* update netbox item
Known issues:
- no scan of non-raid devices
- no scan of NVMe devices
"""
def __init__(self, server, update_expansion=False):
self.create_netbox_tags()
self.server = server
self.update_expansion = update_expansion
netbox_server = self.server.get_netbox_server(update_expansion)
self.device_id = netbox_server.id if netbox_server else None
self.raid = None
self.disks = []
self.lshw = LSHW()
def create_netbox_tags(self):
ret = []
for key, tag in INVENTORY_TAG.items():
nb_tag = nb.extras.tags.get(name=tag["name"])
if not nb_tag:
nb_tag = nb.extras.tags.create(
name=tag["name"],
slug=tag["slug"],
comments=tag["name"],
)
ret.append(nb_tag)
return ret
def find_or_create_manufacturer(self, name):
if name is None:
return None
manufacturer = nb.dcim.manufacturers.get(
name=name,
)
if not manufacturer:
logging.info("Creating missing manufacturer {name}".format(name=name))
manufacturer = nb.dcim.manufacturers.create(
name=name,
slug=re.sub("[^A-Za-z0-9]+", "-", name).lower(),
)
logging.info("Creating missing manufacturer {name}".format(name=name))
return manufacturer
def get_netbox_inventory(self, device_id, tag):
try:
items = nb.dcim.inventory_items.filter(device_id=device_id, tag=tag)
except pynetbox.core.query.RequestError:
logging.info("Tag {tag} is missing, returning empty array.".format(tag=tag))
items = []
return list(items)
def create_netbox_inventory_item(
self, device_id, tags, vendor, name, serial, description
):
manufacturer = self.find_or_create_manufacturer(vendor)
_ = nb.dcim.inventory_items.create(
device=device_id,
manufacturer=manufacturer.id,
discovered=True,
tags=tags,
name="{}".format(name),
serial="{}".format(serial),
description=description,
)
logging.info(
"Creating inventory item {} {}/{} {} ".format(
vendor, name, serial, description
)
)
def get_hw_motherboards(self):
motherboards = []
m = {}
m["serial"] = self.lshw.motherboard_serial
m["vendor"] = self.lshw.vendor
m["name"] = "{} {}".format(self.lshw.vendor, self.lshw.motherboard)
m["description"] = "{} Motherboard".format(self.lshw.motherboard)
motherboards.append(m)
return motherboards
def do_netbox_motherboard(self):
motherboards = self.get_hw_motherboards()
nb_motherboards = self.get_netbox_inventory(
device_id=self.device_id, tag=INVENTORY_TAG["motherboard"]["slug"]
)
for nb_motherboard in nb_motherboards:
if nb_motherboard.serial not in [x["serial"] for x in motherboards]:
logging.info(
"Deleting unknown motherboard {motherboard}/{serial}".format(
motherboard=self.lshw.motherboard,
serial=nb_motherboard.serial,
)
)
nb_motherboard.delete()
# create interfaces that are not in netbox
for motherboard in motherboards:
if motherboard.get("serial") not in [x.serial for x in nb_motherboards]:
self.create_netbox_inventory_item(
device_id=self.device_id,
tags=[{"name": INVENTORY_TAG["motherboard"]["name"]}],
vendor="{}".format(motherboard.get("vendor", "N/A")),
serial="{}".format(motherboard.get("serial", "No SN")),
name="{}".format(motherboard.get("name")),
description="{}".format(motherboard.get("description")),
)
def create_netbox_interface(self, iface):
manufacturer = self.find_or_create_manufacturer(iface["vendor"])
_ = nb.dcim.inventory_items.create(
device=self.device_id,
manufacturer=manufacturer.id,
discovered=True,
tags=[{"name": INVENTORY_TAG["interface"]["name"]}],
name="{}".format(iface["product"]),
serial="{}".format(iface["serial"]),
description="{} {}".format(iface["description"], iface["name"]),
)
def do_netbox_interfaces(self):
nb_interfaces = self.get_netbox_inventory(
device_id=self.device_id, tag=INVENTORY_TAG["interface"]["slug"]
)
interfaces = self.lshw.interfaces
# delete interfaces that are in netbox but not locally
# use the serial_number has the comparison element
for nb_interface in nb_interfaces:
if nb_interface.serial not in [x["serial"] for x in interfaces]:
logging.info(
"Deleting unknown interface {serial}".format(
serial=nb_interface.serial,
)
)
nb_interface.delete()
# create interfaces that are not in netbox
for iface in interfaces:
if iface.get("serial") not in [x.serial for x in nb_interfaces]:
self.create_netbox_interface(iface)
def create_netbox_cpus(self):
for cpu in self.lshw.get_hw_linux("cpu"):
manufacturer = self.find_or_create_manufacturer(cpu["vendor"])
_ = nb.dcim.inventory_items.create(
device=self.device_id,
manufacturer=manufacturer.id,
discovered=True,
tags=[{"name": INVENTORY_TAG["cpu"]["name"]}],
name=cpu["product"],
description="CPU {}".format(cpu["location"]),
# asset_tag=cpu['location']
)
logging.info("Creating CPU model {}".format(cpu["product"]))
def do_netbox_cpus(self):
cpus = self.lshw.get_hw_linux("cpu")
nb_cpus = self.get_netbox_inventory(
device_id=self.device_id,
tag=INVENTORY_TAG["cpu"]["slug"],
)
if not len(nb_cpus) or len(nb_cpus) and len(cpus) != len(nb_cpus):
for x in nb_cpus:
x.delete()
self.create_netbox_cpus()
def get_raid_cards(self, filter_cards=False):
raid_class = None
if self.server.manufacturer in ("Dell", "Huawei"):
if is_tool("omreport"):
raid_class = OmreportRaid
if is_tool("storcli"):
raid_class = StorcliRaid
elif self.server.manufacturer in ("HP", "HPE"):
if is_tool("ssacli"):
raid_class = HPRaid
if not raid_class:
return []
self.raid = raid_class()
if (
filter_cards
and config.expansion_as_device
and self.server.own_expansion_slot()
):
return [
c
for c in self.raid.get_controllers()
if c.is_external() is self.update_expansion
]
else:
return self.raid.get_controllers()
def create_netbox_raid_card(self, raid_card):
manufacturer = self.find_or_create_manufacturer(raid_card.get_manufacturer())
name = raid_card.get_product_name()
serial = raid_card.get_serial_number()
nb_raid_card = nb.dcim.inventory_items.create(
device=self.device_id,
discovered=True,
manufacturer=manufacturer.id if manufacturer else None,
tags=[{"name": INVENTORY_TAG["raid_card"]["name"]}],
name="{}".format(name),
serial="{}".format(serial),
description="RAID Card",
)
logging.info(
"Creating RAID Card {name} (SN: {serial})".format(
name=name,
serial=serial,
)
)
return nb_raid_card
def do_netbox_raid_cards(self):
"""
Update raid cards in netbobx
Since we only push:
* Name
* Manufacturer
* Serial
We only need to handle destroy and new cards
"""
nb_raid_cards = self.get_netbox_inventory(
device_id=self.device_id, tag=[INVENTORY_TAG["raid_card"]["slug"]]
)
raid_cards = self.get_raid_cards(filter_cards=True)
# delete cards that are in netbox but not locally
# use the serial_number has the comparison element
for nb_raid_card in nb_raid_cards:
if nb_raid_card.serial not in [x.get_serial_number() for x in raid_cards]:
logging.info(
"Deleting unknown locally RAID Card {serial}".format(
serial=nb_raid_card.serial,
)
)
nb_raid_card.delete()
# create card that are not in netbox
for raid_card in raid_cards:
if raid_card.get_serial_number() not in [x.serial for x in nb_raid_cards]:
self.create_netbox_raid_card(raid_card)
def is_virtual_disk(self, disk, raid_devices):
disk_type = disk.get("type")
logicalname = disk.get("logicalname")
description = disk.get("description")
size = disk.get("size")
product = disk.get("product")
if (
logicalname in raid_devices
or disk_type is None
or product is None
or description is None
):
return True
non_raid_disks = [
"MR9361-8i",
]
if (
logicalname in raid_devices
or product in non_raid_disks
or "virtual" in product.lower()
or "logical" in product.lower()
or "volume" in description.lower()
or "dvd-ram" in description.lower()
or description == "SCSI Enclosure"
or (size is None and logicalname is None)
):
return True
return False
def get_hw_disks(self):
disks = []
for raid_card in self.get_raid_cards(filter_cards=True):
disks.extend(raid_card.get_physical_disks())
raid_devices = [
d.get("custom_fields", {}).get("vd_device")
for d in disks
if d.get("custom_fields", {}).get("vd_device")
]
for disk in self.lshw.get_hw_linux("storage"):
if self.is_virtual_disk(disk, raid_devices):
continue
size = round(int(disk.get("size", 0)) / 1073741824, 1)
d = {
"name": "",
"Size": "{} GB".format(size),
"logicalname": disk.get("logicalname"),
"description": disk.get("description"),
"SN": disk.get("serial"),
"Model": disk.get("product"),
"Type": disk.get("type"),
}
if disk.get("vendor"):
d["Vendor"] = disk["vendor"]
else:
d["Vendor"] = get_vendor(disk["product"])
disks.append(d)
# remove duplicate serials
seen = set()
uniq = [x for x in disks if x["SN"] not in seen and not seen.add(x["SN"])]
return uniq
def create_netbox_disk(self, disk):
manufacturer = None
if "Vendor" in disk:
manufacturer = self.find_or_create_manufacturer(disk["Vendor"])
name = "{} ({})".format(disk["Model"], disk["Size"])
description = disk["Type"]
sn = disk.get("SN", "unknown")
parms = {
"device": self.device_id,
"discovered": True,
"tags": [{"name": INVENTORY_TAG["disk"]["name"]}],
"name": name,
"serial": sn,
"part_id": disk["Model"],
"description": description,
"manufacturer": getattr(manufacturer, "id", None),
}
if config.process_virtual_drives:
parms["custom_fields"] = disk.get("custom_fields", {})
_ = nb.dcim.inventory_items.create(**parms)
logging.info(
"Creating Disk {model} {serial}".format(
model=disk["Model"],
serial=sn,
)
)
def dump_disks_map(self, disks):
disk_map = [d["custom_fields"] for d in disks if "custom_fields" in d]
if config.dump_disks_map == "-":
f = sys.stdout
else:
f = open(config.dump_disks_map, "w")
f.write(json.dumps(disk_map, separators=(",", ":"), indent=4, sort_keys=True))
if config.dump_disks_map != "-":
f.close()
def do_netbox_disks(self):
nb_disks = self.get_netbox_inventory(
device_id=self.device_id, tag=INVENTORY_TAG["disk"]["slug"]
)
disks = self.get_hw_disks()
if config.dump_disks_map:
try:
self.dump_disks_map(disks)
except Exception as e:
logging.error("Failed to dump disks map: {}".format(e))
logging.debug(traceback.format_exc())
disk_serials = [d["SN"] for d in disks if "SN" in d]
# delete disks that are in netbox but not locally
# use the serial_number has the comparison element
for nb_disk in nb_disks:
if nb_disk.serial not in disk_serials or config.force_disk_refresh:
logging.info(
"Deleting unknown locally Disk {serial}".format(
serial=nb_disk.serial,
)
)
nb_disk.delete()
if config.force_disk_refresh:
nb_disks = self.get_netbox_inventory(
device_id=self.device_id, tag=INVENTORY_TAG["disk"]["slug"]
)
# create disks that are not in netbox
for disk in disks:
if disk.get("SN") not in [d.serial for d in nb_disks]:
self.create_netbox_disk(disk)
def create_netbox_memory(self, memory):
manufacturer = self.find_or_create_manufacturer(memory["vendor"])
name = "Slot {} ({}GB)".format(memory["slot"], memory["size"])
nb_memory = nb.dcim.inventory_items.create(
device=self.device_id,
discovered=True,
manufacturer=manufacturer.id,
tags=[{"name": INVENTORY_TAG["memory"]["name"]}],
name=name,
part_id=memory["product"],
serial=memory["serial"],
description=memory["description"],
)
logging.info(
"Creating Memory {location} {type} {size}GB".format(
location=memory["slot"],
type=memory["product"],
size=memory["size"],
)
)
return nb_memory
def do_netbox_memories(self):
memories = self.lshw.memories
nb_memories = self.get_netbox_inventory(
device_id=self.device_id, tag=INVENTORY_TAG["memory"]["slug"]
)
for nb_memory in nb_memories:
if nb_memory.serial not in [x["serial"] for x in memories]:
logging.info(
"Deleting unknown locally Memory {serial}".format(
serial=nb_memory.serial,
)
)
nb_memory.delete()
for memory in memories:
if memory.get("serial") not in [x.serial for x in nb_memories]:
self.create_netbox_memory(memory)
def create_netbox_gpus(self, gpus):
for gpu in gpus:
if "product" in gpu and len(gpu["product"]) > 50:
gpu["product"] = gpu["product"][:48] + ".."
manufacturer = self.find_or_create_manufacturer(gpu["vendor"])
_ = nb.dcim.inventory_items.create(
device=self.device_id,
manufacturer=manufacturer.id,
discovered=True,
tags=[{"name": INVENTORY_TAG["gpu"]["name"]}],
name=gpu["product"],
description=gpu["description"],
)
logging.info("Creating GPU model {}".format(gpu["product"]))
def is_external_gpu(self, gpu):
is_3d_gpu = gpu["description"].startswith("3D")
return (
self.server.is_blade()
and self.server.own_gpu_expansion_slot()
and is_3d_gpu
)
def do_netbox_gpus(self):
gpus = []
gpu_models = {}
for gpu in self.lshw.get_hw_linux("gpu"):
# Filters GPU if an expansion bay is detected:
# The internal (VGA) GPU only goes into the blade inventory,
# the external (3D) GPU goes into the expansion blade.
if (
config.expansion_as_device
and self.update_expansion ^ self.is_external_gpu(gpu)
):
continue
gpus.append(gpu)
gpu_models.setdefault(gpu["product"], 0)
gpu_models[gpu["product"]] += 1
nb_gpus = self.get_netbox_inventory(
device_id=self.device_id,
tag=INVENTORY_TAG["gpu"]["slug"],
)
nb_gpu_models = {}
for gpu in nb_gpus:
nb_gpu_models.setdefault(str(gpu), 0)
nb_gpu_models[str(gpu)] += 1
up_to_date = set(gpu_models) == set(nb_gpu_models)
if not gpus or not up_to_date:
for x in nb_gpus:
x.delete()
if gpus and not up_to_date:
self.create_netbox_gpus(gpus)
def create_or_update(self):
if config.inventory is None or config.update_inventory is None:
return False
if self.update_expansion is False:
self.do_netbox_cpus()
self.do_netbox_memories()
self.do_netbox_interfaces()
self.do_netbox_motherboard()
self.do_netbox_gpus()
self.do_netbox_disks()
self.do_netbox_raid_cards()
return True