feat: Move the module to src, start to type
This commit is contained in:
parent
13b84b4da1
commit
d0b6e1375f
30 changed files with 37 additions and 17 deletions
|
@ -31,8 +31,9 @@ def run(config):
|
|||
)
|
||||
server = VirtualMachine(dmi=dmi)
|
||||
else:
|
||||
manufacturer = dmidecode.get_by_type(dmi, "Chassis")[0].get("Manufacturer")
|
||||
try:
|
||||
chassis = dmidecode.get_by_type(dmi, "Chassis") or []
|
||||
manufacturer = chassis[0].get("Manufacturer")
|
||||
server = MANUFACTURERS[manufacturer](dmi=dmi)
|
||||
except KeyError:
|
||||
server = GenericHost(dmi=dmi)
|
|
@ -3,8 +3,10 @@ import logging
|
|||
import re
|
||||
import sys
|
||||
import traceback
|
||||
from typing import overload
|
||||
|
||||
import pynetbox
|
||||
from pynetbox.core.api import Record
|
||||
from pynetbox.core.query import RequestError
|
||||
|
||||
from netbox_agent.config import config
|
||||
from netbox_agent.config import netbox_instance as nb
|
||||
|
@ -74,6 +76,12 @@ class Inventory:
|
|||
ret.append(nb_tag)
|
||||
return ret
|
||||
|
||||
@overload
|
||||
def find_or_create_manufacturer(self, name: None) -> None: ...
|
||||
|
||||
@overload
|
||||
def find_or_create_manufacturer(self, name: str) -> Record: ...
|
||||
|
||||
def find_or_create_manufacturer(self, name):
|
||||
if name is None:
|
||||
return None
|
||||
|
@ -90,12 +98,15 @@ class Inventory:
|
|||
|
||||
logging.info("Creating missing manufacturer {name}".format(name=name))
|
||||
|
||||
# We only create
|
||||
assert not isinstance(manufacturer, list)
|
||||
|
||||
return manufacturer
|
||||
|
||||
def get_netbox_inventory(self, device_id, tag):
|
||||
try:
|
||||
items = nb.dcim.inventory_items.filter(device_id=device_id, tag=tag)
|
||||
except pynetbox.core.query.RequestError:
|
||||
except RequestError:
|
||||
logging.info("Tag {tag} is missing, returning empty array.".format(tag=tag))
|
||||
items = []
|
||||
|
|
@ -55,7 +55,7 @@ class LSHW:
|
|||
if j["class"] == "bridge":
|
||||
self.walk_bridge(j)
|
||||
|
||||
def get_hw_linux(self, hwclass):
|
||||
def get_hw_linux(self, hwclass) -> list:
|
||||
if hwclass == "cpu":
|
||||
return self.cpus
|
||||
if hwclass == "gpu":
|
||||
|
@ -66,6 +66,7 @@ class LSHW:
|
|||
return self.disks
|
||||
if hwclass == "memory":
|
||||
return self.memories
|
||||
raise KeyError
|
||||
|
||||
def find_network(self, obj):
|
||||
# Some interfaces do not have device (logical) name (eth0, for
|
|
@ -30,15 +30,9 @@ def get_device_type(type):
|
|||
def get_device_platform(device_platform):
|
||||
if device_platform is None:
|
||||
try:
|
||||
# Python 3.8+ moved linux_distribution() to distro
|
||||
try:
|
||||
import distro
|
||||
import distro
|
||||
|
||||
linux_distribution = " ".join(distro.linux_distribution())
|
||||
except ImportError:
|
||||
import platform
|
||||
|
||||
linux_distribution = " ".join(platform.linux_distribution())
|
||||
linux_distribution = " ".join(distro.linux_distribution())
|
||||
|
||||
if not linux_distribution:
|
||||
return None
|
|
@ -5,6 +5,8 @@ from itertools import chain, islice
|
|||
|
||||
import netifaces
|
||||
from netaddr import IPAddress
|
||||
from pynetbox.core.api import Record
|
||||
from pynetbox.core.app import App
|
||||
|
||||
from netbox_agent.config import config
|
||||
from netbox_agent.config import netbox_instance as nb
|
||||
|
@ -13,7 +15,9 @@ from netbox_agent.ipmi import IPMI
|
|||
from netbox_agent.lldp import LLDP
|
||||
|
||||
|
||||
class Network(object):
|
||||
class Network:
|
||||
nb_net: App
|
||||
|
||||
def __init__(self, server, *args, **kwargs):
|
||||
self.nics = []
|
||||
|
||||
|
@ -39,7 +43,7 @@ class Network(object):
|
|||
for choice in ipam_c[_choice_type]:
|
||||
self.ipam_choices[key][choice["display_name"]] = choice["value"]
|
||||
|
||||
def get_network_type():
|
||||
def get_network_type(self):
|
||||
return NotImplementedError
|
||||
|
||||
def scan(self):
|
||||
|
@ -57,8 +61,12 @@ class Network(object):
|
|||
)
|
||||
continue
|
||||
|
||||
ip_addr = netifaces.ifaddresses(interface).get(netifaces.AF_INET, [])
|
||||
ip6_addr = netifaces.ifaddresses(interface).get(netifaces.AF_INET6, [])
|
||||
ip_addr = netifaces.ifaddresses(interface).get(
|
||||
netifaces.InterfaceType.AF_INET, []
|
||||
)
|
||||
ip6_addr = netifaces.ifaddresses(interface).get(
|
||||
netifaces.InterfaceType.AF_INET6, []
|
||||
)
|
||||
if config.network.ignore_ips:
|
||||
for i, ip in enumerate(ip_addr):
|
||||
if re.match(config.network.ignore_ips, ip["addr"]):
|
||||
|
@ -204,7 +212,7 @@ class Network(object):
|
|||
|
||||
return self.dcim_choices["interface:type"]["Other"]
|
||||
|
||||
def get_or_create_vlan(self, vlan_id):
|
||||
def get_or_create_vlan(self, vlan_id) -> Record:
|
||||
# FIXME: we may need to specify the datacenter
|
||||
# since users may have same vlan id in multiple dc
|
||||
vlan = nb.ipam.vlans.get(
|
||||
|
@ -215,6 +223,9 @@ class Network(object):
|
|||
name="VLAN {}".format(vlan_id),
|
||||
vid=vlan_id,
|
||||
)
|
||||
|
||||
assert isinstance(vlan, Record)
|
||||
|
||||
return vlan
|
||||
|
||||
def reset_vlan_on_interface(self, nic, interface):
|
||||
|
@ -229,6 +240,8 @@ class Network(object):
|
|||
# The object returned by pynetbox's save isn't always working (since pynetbox 6)
|
||||
interface = self.nb_net.interfaces.get(id=interface.id)
|
||||
|
||||
assert interface is not None
|
||||
|
||||
# Handle the case were the local interface isn't an interface vlan as reported by Netbox
|
||||
# and that LLDP doesn't report a vlan-id
|
||||
if (
|
Loading…
Reference in a new issue