This post presents the Python source code of the system, which consists of five scripts:
1. main.py
2. reporting.py
3. search_by_mac.py
4. search_by_snmp.py
5. utils.py
main.py
"""
Libraries used:
os - provides a way of using operating system dependent functionality.
subprocess - spawn new processes, connect to their input/output/error pipes, and obtain their return codes
sys - module provides access to some variables used or maintained by the interpreter and to functions
typing - support for types Any, Union, Tuple, Callable, TypeVar, and Generic
reporting - script written to generate all reports
search_by_mac - compare responding device by mac address
search_by_snmp - ping devices and SNMP poll them
utils - helping classes and functions, used in other modules
"""
import os
import subprocess
import sys
from typing import List
from reporting import Reporting
from search_by_mac import search_by_mac
from search_by_snmp import search_by_snmp
from utils import args_to_ips, read_settings, transfer_cfgs, delete_files_in_dir
def main(args: List[str]) -> None:
    """
    Drive the whole pipeline: discovery, config generation and reporting.

    The working directories are created (and "cfgprep" emptied) first. The
    requested addresses are then examined by SNMP, the addresses SNMP could
    not classify are looked up by MAC, reports are generated for every
    created config and the prepared configs are finally moved into the
    config directory.
    :param args: Enumeration of single IPs (like 192.168.0.1) and subnetworks (like 192.168.0.0/24)
    """
    settings = read_settings("settings.ini")
    cfg_dir = settings["cfg-directory"]
    cfgprep_dir = os.path.join(cfg_dir, "cfgprep")

    # Directories must exist before any of the later steps writes into them.
    os.makedirs(cfg_dir, exist_ok=True)
    os.makedirs(cfgprep_dir, exist_ok=True)
    os.makedirs(settings["mrtg-output"], exist_ok=True)
    # Start from a clean preparation directory.
    delete_files_in_dir(cfgprep_dir)

    requested_ips = args_to_ips(args)
    snmp_unavailable_ips, created_snmp_cfgs = search_by_snmp(requested_ips, settings)
    created_mac_cfgs = search_by_mac(snmp_unavailable_ips, settings)

    def run_mrtg(params):
        # MRTG is executed as a child process; its exit code is passed back.
        return subprocess.call(params)

    reporting = Reporting(settings, run_mrtg)
    reporting.make_reports(created_snmp_cfgs + created_mac_cfgs)

    transfer_cfgs(cfg_dir)
if __name__ == "__main__":
    # Everything after the script name is an IP address or a network.
    cli_addresses = sys.argv[1:]
    if cli_addresses:
        main(cli_addresses)
    else:
        print("Not enough arguments. You must pass IP and network addresses")
reporting.py
import os
import re
import time
from itertools import groupby
from typing import List, Dict, Iterable, Tuple, Pattern, Callable
from utils import Cfg, DeviceTypeCfg, MacCfg, template_file
# Length of one day in seconds (60 s * 60 min * 24 h).
DAY_IN_SECONDS = 60 * 60 * 24
# A month is approximated as 30 days.
MONTH_IN_SECONDS = DAY_IN_SECONDS * 30
class DeviceGroup(object):
    """
    One record of the DeviceGroupByMac or DeviceGroupByType file.
    """

    def __init__(self, name: str, group_elements: List[str]):
        """
        :param name: Group name.
        :param group_elements: Members of the group: either MAC masks or device types.
        """
        self.name = name
        self.group_elements = group_elements
        # Compiled once so that match() only has to run the pattern.
        self.regexp = self.group_elements_to_regexp()

    def match(self, field: str) -> bool:
        """
        Tell whether a field belongs to the group.
        :param field: Either MAC address or device type.
        :return: True when the upper-cased field matches one of the group
                 elements, anchored at the start of the field.
        """
        return self.regexp.match(field.upper()) is not None

    def elements_str(self) -> str:
        """
        :return: Group elements joined into one string with ',' as delimiter.
        """
        return ",".join(self.group_elements)

    def group_elements_to_regexp(self) -> Pattern:
        """
        Build one regexp with an alternative per group element.

        Every element is upper-cased and each 'X' in it becomes a
        hexadecimal-digit character class.
        :return: Compiled regexp for the whole group.
        """
        alternatives = [element.upper().replace('X', '[0-9A-F]')
                        for element in self.group_elements]
        return re.compile("|".join(alternatives))
class Reporting(object):
    """
    Generate the grouped MRTG configs for the created device configs and
    invoke MRTG for them.

    Every file produced here is written into the "cfgprep" subdirectory of
    the configured "cfg-directory".
    """

    def __init__(self, settings: Dict[str, str], invoke_mrtg: Callable[[List[str]], int]) -> None:
        """
        :param settings: Settings.
        :param invoke_mrtg: Callable that receives the MRTG command line as a list of arguments.
        """
        self.settings = settings
        self.cfg_dir = os.path.join(self.settings["cfg-directory"], "cfgprep")
        self.invoke_mrtg = invoke_mrtg

    @staticmethod
    def _join_targets(targets: Iterable[str]) -> str:
        """
        :param targets: Single target expressions.
        :return: Expressions joined with the " + " separator used in the generated configs.
        """
        return " + ".join(targets)

    def _ini_target(self, ini_filename: str) -> str:
        """
        :param ini_filename: Name of an ini file inside the config directory.
        :return: Backtick "more <path>" expression referring to that ini file.
        """
        return "`more {}`".format(os.path.join(self.cfg_dir, ini_filename))

    def call_mrtg(self, cfg_names: Iterable[str]) -> None:
        """
        Invoke MRTG once for each config, in iteration order.

        The value returned by the invocation callable is not inspected.
        :param cfg_names: Names of configs with ".cfg" extension, relative to the config directory.
        """
        for cfg_name in cfg_names:
            self.invoke_mrtg(["sudo", "env", "LANG=C", self.settings["mrtg-executable"],
                              os.path.join(self.cfg_dir, cfg_name)])

    @staticmethod
    def extract_first_three_ip_octets(cfg: Cfg) -> str:
        """
        :param cfg: Config information.
        :return: IP of the config without its last dot-separated part.
        """
        return ".".join(cfg.ip.split(".")[:-1])

    def reports_by_subnets(self, cfgs: List[Cfg]) -> List[Tuple[str, int]]:
        """
        Create reports, grouped by subnets.
        :param cfgs: Information about created configs.
        :return: List of created subnet group configs with count of single configs in each.
        """
        subnet_pairs = []
        # groupby only merges adjacent items, so sort by the same key first.
        ordered_cfgs = sorted(cfgs, key=self.extract_first_three_ip_octets)
        for first_three_octets, cfg_group in groupby(ordered_cfgs, self.extract_first_three_ip_octets):
            members = list(cfg_group)
            cfg_count = len(members)
            filename = "grp2_all{}.x.cfg".format(first_three_octets)
            targets = []
            for cfg in members:
                if isinstance(cfg, DeviceTypeCfg):
                    targets.append(os.path.join(self.cfg_dir, str(cfg)))
                elif isinstance(cfg, MacCfg):
                    targets.append(self._ini_target("{}{}.ini".format(cfg.ip, cfg.mac)))
            with open(os.path.join(self.cfg_dir, filename), "w") as result_cfg:
                result_cfg.write(template_file("ReportBySubnet.template",
                                               {"name": os.path.splitext(filename)[0],
                                                "subnet": first_three_octets + ".x",
                                                "configs": self._join_targets(targets),
                                                "configs_count": cfg_count,
                                                "max_bytes": 40 * cfg_count}))
            subnet_pairs.append((filename, cfg_count))
        return subnet_pairs

    @staticmethod
    def parse_mrtg_log(mrtg_log_path: str) -> Tuple[int, int]:
        """
        Parse log, generated by MRTG.
        :param mrtg_log_path: Path to MRTG log.
        :return: Second and third whitespace-separated fields of the second log line, as integers.
        """
        with open(mrtg_log_path, encoding="utf-8", mode="r") as log_file:
            # The first line is skipped; the values are taken from the second one.
            log_file.readline()
            line_tokens = log_file.readline().split()
        return int(line_tokens[1]), int(line_tokens[2])

    def generate_ini_from_logs(self, cfg_count_pairs: Iterable[Tuple[str, int]]) -> List[str]:
        """
        Generate ini files, based on MRTG logs.

        The log of a config is looked up in "mrtg-output" under the
        lower-cased config name with the ".log" extension.
        :param cfg_count_pairs: Pairs of group config, that was used for MRTG invocation,
                                and count of devices in group config.
        :return: Names of generated ini files.
        """
        ini_filenames = []
        for cfg_filename, device_count in cfg_count_pairs:
            cfg_name = os.path.splitext(cfg_filename)[0]
            log_name = os.path.join(self.settings["mrtg-output"], cfg_name.lower() + ".log")
            first_power, second_power = self.parse_mrtg_log(log_name)
            ini_filename = cfg_name + ".ini"
            ini_filenames.append(ini_filename)
            with open(os.path.join(self.cfg_dir, ini_filename), encoding="utf-8", mode="w") as ini_file:
                ini_file.write("\n".join([str(first_power), str(second_power), cfg_name, str(device_count)]))
        return ini_filenames

    @staticmethod
    def group_to_regexp(patterns: List[str]) -> Pattern[str]:
        """
        Create regexp for group elements.
        :param patterns: Patterns of elements from group.
        :return: Regexp with one alternative per pattern.
        """
        return re.compile("|".join(patterns))

    def report_all_devices(self, ini_filenames: List[str]) -> str:
        """
        Create report for all devices.
        :param ini_filenames: Filenames of ini files of devices.
        :return: Filename of created all device report.
        """
        configs_str = self._join_targets(self._ini_target(name) for name in ini_filenames)
        cfg_name = "grp1_alldevices.cfg"
        with open(os.path.join(self.cfg_dir, cfg_name), "w") as all_cfg:
            all_cfg.write(template_file("ReportByAllDevices.template",
                                        {"name": "grp1_alldevices",
                                         "configs": configs_str,
                                         "configs_count": len(ini_filenames),
                                         "max_bytes": 40 * len(ini_filenames)}))
        return cfg_name

    def report_by_building(self, ini_filenames: List[str]) -> List[str]:
        """
        Create reports for devices, grouped by BuildingGroup.

        Each non-blank line of the "BuildingGroup" file is
        "<building>,<pattern>,<pattern>,...". Buildings without any
        matching ini file get no report.
        :param ini_filenames: Filenames of ini files of devices.
        :return: Filenames of created BuildingGroup reports.
        """
        results = []
        with open("BuildingGroup", "r") as file:
            for line in file:
                line = line.strip()
                if not line:
                    # A blank line would give an empty regexp that matches
                    # every ini file and produce a nameless report.
                    continue
                building, *patterns = line.split(",")
                regexp = self.group_to_regexp(patterns)
                group_filenames = [name for name in ini_filenames if regexp.match(name)]
                if not group_filenames:
                    continue
                building_cfg_name = "grp3_{}.cfg".format(building)
                results.append(building_cfg_name)
                configs_str = self._join_targets(self._ini_target(name) for name in group_filenames)
                with open(os.path.join(self.cfg_dir, building_cfg_name), "w") as result_cfg:
                    result_cfg.write(template_file("ReportByBuilding.template",
                                                   {"name": "grp3_" + building,
                                                    "configs": configs_str,
                                                    "configs_count": len(group_filenames),
                                                    "max_bytes": 40 * len(group_filenames)}))
        return results

    @staticmethod
    def read_device_group(file_path: str) -> List[DeviceGroup]:
        """
        Read device group file to object representation.

        Each non-blank line is "<group name>,<element>,<element>,...".
        :param file_path: Path to file with group description.
        :return: Object representation of device group.
        """
        result = []
        with open(file_path, "r") as file:
            for line in file:
                line = line.strip()
                if not line:
                    # A blank line would become an unnamed group whose empty
                    # regexp matches every field.
                    continue
                tokens = line.split(",")
                result.append(DeviceGroup(tokens[0], tokens[1:]))
        return result

    def report_by_type(self, cfgs: List[Cfg]) -> List[Tuple[str, int, int]]:
        """
        Create reports for devices, grouped by device type.
        :param cfgs: Information about created configs.
        :return: List of created config name, count configs in group and sum of device power.
        """
        device_type_triples = []
        for device_group in self.read_device_group("DeviceGroupByType"):
            for device_type in device_group.group_elements:
                match_cfgs = [cfg for cfg in cfgs
                              if isinstance(cfg, DeviceTypeCfg) and cfg.device_type == device_type]
                if not match_cfgs:
                    continue
                power_sum = sum(int(cfg.power) for cfg in match_cfgs)
                snmp_mib_str = self._join_targets(
                    "{0}&{0}:{1}@{2}".format(cfg.snmp_mib, self.settings["snmp-password"], cfg.ip)
                    for cfg in match_cfgs)
                name = "grp4_{}{}".format(device_group.name, device_type)
                cfg_name = name + ".cfg"
                with open(os.path.join(self.cfg_dir, cfg_name), "w") as result_cfg:
                    result_cfg.write(template_file("ReportByType.template",
                                                   {"name": name,
                                                    "device_type": device_type,
                                                    "snmp_mibs": snmp_mib_str,
                                                    "configs_count": len(match_cfgs),
                                                    "power_sum": power_sum}))
                device_type_triples.append((cfg_name, len(match_cfgs), power_sum))
        return device_type_triples

    def report_by_group(self, device_type_triples: List[Tuple[str, int, int]]) -> List[str]:
        """
        Create "By Group" reports.

        A "by type" config belongs to a group when one of the group's device
        types occurs in the name of its ini file.
        :param device_type_triples: Config name, config count and power sum of each "by type" report.
        :return: Filenames of created "By Group" reports.
        """
        results = []
        for device_group in self.read_device_group("DeviceGroupByType"):
            match_inis = []
            all_sum = 0
            for device_type in device_group.group_elements:
                for type_cfg_name, _cfg_count, power_sum in device_type_triples:
                    ini_filename = os.path.splitext(type_cfg_name)[0] + ".ini"
                    if device_type in ini_filename:
                        all_sum += power_sum
                        match_inis.append(ini_filename)
            if not match_inis:
                continue
            cfg_name = "grp5_{}.cfg".format(device_group.name)
            results.append(cfg_name)
            configs_str = self._join_targets(self._ini_target(name) for name in match_inis)
            with open(os.path.join(self.cfg_dir, cfg_name), "w") as result_cfg:
                result_cfg.write(template_file("ReportByGroup.template",
                                               {"name": "grp5_{}".format(device_group.name),
                                                "device_type": device_group.elements_str(),
                                                "device_group": device_group.name,
                                                "configs": configs_str,
                                                "configs_count": len(match_inis),
                                                "power_sum": all_sum}))
        return results

    def report_group_by_mac(self, cfgs: List[Cfg]) -> List[str]:
        """
        Create reports for devices, grouped by MAC.
        :param cfgs: Information about created configs.
        :return: List of created reports grouped by MAC.
        """
        results = []
        for device_group in self.read_device_group("DeviceGroupByMac"):
            match_cfgs = [cfg for cfg in cfgs
                          if isinstance(cfg, MacCfg) and device_group.match(cfg.mac)]
            if not match_cfgs:
                continue
            configs_str = self._join_targets(
                self._ini_target("{}{}.ini".format(cfg.ip, cfg.mac)) for cfg in match_cfgs)
            cfg_name = "grp6_{}byMAC.cfg".format(device_group.name)
            results.append(cfg_name)
            with open(os.path.join(self.cfg_dir, cfg_name), "w") as result_cfg:
                result_cfg.write(template_file("ReportByGroupByMAC.template",
                                               {"name": "grp6_{}".format(device_group.name),
                                                "configs_count": len(match_cfgs),
                                                "device_group": device_group.name,
                                                "configs": configs_str}))
        return results

    def report_all_assets_summary(self) -> str:
        """
        Create all assets summary report.

        Counts the files in "mrtg-output" that were modified during the last
        day and during the last 30 days; a file modified during the last day
        is counted in both numbers.
        :return: Filename of all assets summary report.
        """
        now = time.time()
        updated_in_last_day = 0
        updated_in_last_month = 0
        output_dir = self.settings["mrtg-output"]
        for file_name in os.listdir(output_dir):
            file_path = os.path.join(output_dir, file_name)
            if not os.path.isfile(file_path):
                continue
            age = now - os.path.getmtime(file_path)
            if age < DAY_IN_SECONDS:
                updated_in_last_day += 1
            if age < MONTH_IN_SECONDS:
                updated_in_last_month += 1
        summary_ini_path = os.path.join(self.cfg_dir, "AllAssetsSummary.ini")
        with open(summary_ini_path, "w") as file:
            file.write("\n".join([str(updated_in_last_month),
                                  str(updated_in_last_day),
                                  "Responding in the last 30 days",
                                  "Responding in the last 24 hours"]))
        all_assets_cfg = "AllAssetsSummary.cfg"
        with open(os.path.join(self.cfg_dir, all_assets_cfg), "w") as file:
            file.write(template_file("ReportByAllAssetsSummary.template",
                                     {"configs": "`more {}`".format(summary_ini_path),
                                      "updated_in_last_month": updated_in_last_month}))
        return all_assets_cfg

    def make_reports(self, cfgs: List[Cfg]) -> None:
        """
        Create reports for generated single configs.

        MRTG is invoked after each group of configs is written because later
        groups are built from ini files derived from the logs of earlier ones.
        :param cfgs: Created single device configs.
        """
        # Individual reporting
        self.call_mrtg(str(cfg) for cfg in cfgs)
        # Group 2 - by Subnet
        subnets_pairs = self.reports_by_subnets(cfgs)
        self.call_mrtg(pair[0] for pair in subnets_pairs)
        subnet_ini_filenames = self.generate_ini_from_logs(subnets_pairs)
        # Group 1 - All devices
        all_devices_cfg = self.report_all_devices(subnet_ini_filenames)
        self.call_mrtg([all_devices_cfg])
        # Group 3 - By Building
        building_cfgs = self.report_by_building(subnet_ini_filenames)
        self.call_mrtg(building_cfgs)
        # Group 4 - By Type
        device_type_triples = self.report_by_type(cfgs)
        self.call_mrtg(triple[0] for triple in device_type_triples)
        self.generate_ini_from_logs((triple[0], triple[1]) for triple in device_type_triples)
        # Group 5 - By Group
        type_group_cfgs = self.report_by_group(device_type_triples)
        self.call_mrtg(type_group_cfgs)
        # Group 6 - By GroupByMac
        mac_group_cfgs = self.report_group_by_mac(cfgs)
        self.call_mrtg(mac_group_cfgs)
        # All assets summary report
        all_assets_cfg = self.report_all_assets_summary()
        self.call_mrtg([all_assets_cfg])
search_by_mac.py
import csv
import datetime
import os
from typing import Dict, List, Optional
from netaddr import IPSet
from utils import MacCfg, wildcard_to_regexp, normalize_mac, template_file
class MacDevice(object):
    """
    One row of the MACtoDevice file.
    """

    def __init__(self, mac: str, device_type: str, site: str, power_use: str, comment: str):
        """
        :param mac: MAC address mask.
        :param device_type: Device type.
        :param site: Site.
        :param power_use: Power use.
        :param comment: Comment.
        """
        normalized_mask = normalize_mac(mac)
        self.mac = normalized_mask
        # The mask is compiled once; match() reuses the compiled pattern.
        self.mac_regexp = wildcard_to_regexp(normalized_mask)
        self.device_type = device_type
        self.site = site
        self.power_use = power_use
        self.comment = comment

    def match(self, normalized_mac: str) -> bool:
        """
        Check a MAC against the mask of this record.
        :param normalized_mac: MAC for check, already normalized.
        :return: True when the MAC matches the mask, anchored at its start.
        """
        return self.mac_regexp.match(normalized_mac) is not None
def read_ip_to_mac(file_path: str) -> Dict[str, str]:
    """
    Read IPtoMAC into object representation.

    Each non-blank line is "<ip>,<mac>"; the MAC is stored in normalized
    form. When an IP occurs more than once, the last line wins.
    :param file_path: Path to IPtoMAC file.
    :return: Dictionary with IP to MAC entries.
    """
    result = {}
    # newline="" is what the csv module asks for when it is given a file object.
    with open(file_path, "r", newline="") as file:
        for row in csv.reader(file, delimiter=','):
            if not row:
                # The csv reader yields an empty list for a blank line.
                continue
            result[row[0]] = normalize_mac(row[1])
    return result
def read_mac_devices(file_path: str) -> List[MacDevice]:
    """
    Read MACtoDevice into object representation.

    Each non-blank line is "<mac mask>,<device type>,<site>,<power use>,<comment>".
    :param file_path: Path to MACtoDevice file.
    :return: List of MacDevice records, in file order.
    """
    result = []
    # newline="" is what the csv module asks for when it is given a file object.
    with open(file_path, "r", newline="") as file:
        for row in csv.reader(file, delimiter=','):
            if not row:
                # The csv reader yields an empty list for a blank line.
                continue
            result.append(MacDevice(mac=normalize_mac(row[0]),
                                    device_type=row[1],
                                    site=row[2],
                                    power_use=row[3],
                                    comment=row[4]))
    return result
def find_mac_device(devices: List[MacDevice], normalized_mac: str) -> Optional[MacDevice]:
    """
    Look up the first MacDevice record whose mask accepts the MAC.
    :param devices: MacDevice records, searched in order.
    :param normalized_mac: MAC in normal form.
    :return: First matching MacDevice record or None.
    """
    candidates = (device for device in devices if device.match(normalized_mac))
    return next(candidates, None)
def search_by_mac(ips: IPSet, settings: Dict[str, str]) -> List[MacCfg]:
    """
    Look the given IP addresses up in the IPtoMAC and MACtoDevice files.

    For every IP with a known MAC and a matching MACtoDevice record an ini
    and a cfg file are written into "cfgprep". IPs without a MAC are logged
    to "NoMacList", MACs without a matching record to "NoMacMatchList".
    :param ips: IP addresses of devices.
    :param settings: Settings.
    :return: List of created MAC configs.
    """
    mac_by_ip = read_ip_to_mac("IPtoMAC")
    devices = read_mac_devices("MACtoDevice")
    cfg_dir = os.path.join(settings["cfg-directory"], "cfgprep")
    now = datetime.datetime.now().strftime("%d%m%Y %H%M%S")
    created_cfgs = []

    with open(os.path.join(cfg_dir, "NoMacList"), "w") as no_mac_file, \
            open(os.path.join(cfg_dir, "NoMacMatchList"), "w") as no_mac_match_file:
        no_mac_csvwriter = csv.writer(no_mac_file, delimiter=',')
        no_mac_match_csvwriter = csv.writer(no_mac_match_file, delimiter=',')

        for ip in ips:
            mac = mac_by_ip.get(str(ip))
            if not mac:
                # TODO: where to get MAC?
                no_mac_csvwriter.writerow([ip, mac, now])
                continue

            device = find_mac_device(devices, mac)
            if not device:
                no_mac_match_csvwriter.writerow([ip, mac, now])
                continue

            # Values shared by the ini and the cfg template.
            common_params = {'mac_address': mac,
                             'ip_address': ip,
                             'site': device.site,
                             'location': None,
                             'device_type': device.device_type,
                             'power_use': device.power_use,
                             'comment': device.comment}

            ini_path = os.path.join(cfg_dir, "{}{}.ini".format(ip, mac))
            with open(ini_path, "w") as ini:
                ini.write(template_file("IPaddressMACaddress.ini.template", common_params))

            # The cfg template additionally receives the path of the ini file.
            cfg_params = dict(common_params, path=ini_path)
            with open(os.path.join(cfg_dir, "{}{}.cfg".format(ip, mac)), "w") as cfg:
                cfg.write(template_file("IPaddressMACaddress.cfg.template", cfg_params))

            created_cfgs.append(MacCfg(str(ip), mac))
    return created_cfgs
search_by_snmp.py
import csv
import datetime
import os
import platform
import subprocess
from typing import List, Optional, Dict, Tuple
from multiping import multi_ping, multiping
from netaddr import IPSet
from pysnmp.hlapi import *
from utils import DeviceTypeCfg, template_file
# The file-level import only names ``multi_ping`` and ``multiping``, while the
# patch below needs the ``MultiPing`` class itself, so it is imported here.
import functools

from multiping import MultiPing

# Original implementation, kept so that the wrapper can delegate to it.
old_send = MultiPing._send_ping


@functools.wraps(old_send)
def send_ping_exc(self, dest_addr, payload):
    """
    Decorate MultiPing._send_ping with more precise error message.

    Any exception raised while sending is re-raised as ValueError that names
    the destination address; the original exception is kept as its cause.
    :param self: MultiPing object.
    :param dest_addr: Address for ping.
    :param payload: The payload must be specified as a packed byte string.
                    Note that its length has to be divisible by 2 for this to work correctly.
    :raises ValueError: When the wrapped method raises any exception.
    """
    try:
        return old_send(self, dest_addr, payload)
    except Exception as exc:
        err_msg = "Destination address: {}".format(dest_addr)
        raise ValueError(err_msg) from exc


# Install the wrapper for every MultiPing instance.
MultiPing._send_ping = send_ping_exc
class SNMPDevice(object):
    """
    One row of the SNMPtoDevice file.
    """

    def __init__(self, ip: str, hostname: str, site: str, location: str, device_type: str,
                 power_oid: Optional[str], reference_power: Optional[str], comment: str):
        """
        Store the columns of the row unchanged.
        :param ip: IP address.
        :param hostname: Device hostname.
        :param site: Device site.
        :param location: Location of device.
        :param device_type: Device type.
        :param power_oid: Optional power oid for device.
        :param reference_power: Optional reference power of device.
        :param comment: Comment for device.
        """
        # Identification of the device.
        self.ip, self.hostname = ip, hostname
        # Where the device is installed.
        self.site, self.location = site, location
        self.device_type = device_type
        # How the power use is obtained.
        self.power_oid, self.reference_power = power_oid, reference_power
        self.comment = comment
def read_snmp_devices(file_path: str) -> List[SNMPDevice]:
    """
    Read SNMPtoDevice file to object records.

    Each non-blank line is "<ip>,<hostname>,<site>,<location>,<device type>,
    <power oid>,<reference power>,<comment>".
    :param file_path: Path to SNMPtoDevice file.
    :return: SNMPtoDevice object records, in file order.
    """
    result = []
    # newline="" is what the csv module asks for when it is given a file object.
    with open(file_path, "r", newline="") as file:
        for row in csv.reader(file, delimiter=','):
            if not row:
                # The csv reader yields an empty list for a blank line.
                continue
            result.append(SNMPDevice(ip=row[0],
                                     hostname=row[1],
                                     site=row[2],
                                     location=row[3],
                                     device_type=row[4],
                                     power_oid=row[5],
                                     reference_power=row[6],
                                     comment=row[7]))
    return result
def find_snmp_device_by_ip(devices: List[SNMPDevice], ip: str) -> Optional[SNMPDevice]:
    """
    Look up the first SNMPDevice record with the given IP address.
    :param devices: SNMPDevice records, searched in order.
    :param ip: IP address, compared as an exact string.
    :return: Found device or None.
    """
    return next((device for device in devices if device.ip == ip), None)
def find_snmp_device_by_hostname(devices: List[SNMPDevice], hostname: str) -> Optional[SNMPDevice]:
    """
    Look up the first SNMPDevice record with the given hostname.
    :param devices: SNMPDevice records, searched in order.
    :param hostname: Hostname, compared case-insensitively.
    :return: Found device or None.
    """
    same_name = (device for device in devices
                 if device.hostname.lower() == hostname.lower())
    return next(same_name, None)
def find_snmp_device_by_type(devices: List[SNMPDevice], device_type: str) -> Optional[SNMPDevice]:
    """
    Find device in SNMPDevice records by device type.

    The comparison is case-insensitive. A missing device type (None or an
    empty string, e.g. when the SNMP request for the type failed) never
    matches any record.
    :param devices: SNMPDevice records, searched in order.
    :param device_type: Device type.
    :return: Found device or None.
    """
    if not device_type:
        # Nothing to compare with; calling .lower() on None would raise.
        return None
    wanted_type = device_type.lower()
    for device in devices:
        if device.device_type.lower() == wanted_type:
            return device
    return None
def search_alive_by_ping(ips: IPSet, settings: Dict[str, str]) -> IPSet:
    """
    Ping IP addresses to determine aliveness.

    When the "use-system-ping" setting is "false" (in any letter case) all
    hosts are pinged through multi_ping; otherwise the system "ping" command
    is run once per host and a zero exit code means the host is alive.
    :param ips: Set of IP addresses to ping.
    :param settings: Settings.
    :return: Set of alive IP addresses.
    """
    timeout = int(settings["ping-timeout"])
    retries = int(settings["ping-retries"])
    hosts = [str(ip) for ip in ips]

    use_system_ping = settings["use-system-ping"].lower() != "false"
    if not use_system_ping:
        responses, _no_responses = multi_ping(hosts, timeout, retries)
        return IPSet(responses.keys())

    def ping_command(host):
        # The packet-count option differs between Windows and other systems.
        count_option = '-n' if platform.system().lower() == 'windows' else '-c'
        return ['ping', count_option, str(retries + 1), '-W', str(timeout), host]

    alive_hosts = [host for host in hosts if subprocess.call(ping_command(host)) == 0]
    return IPSet(alive_hosts)
def snmp_get(oid: str, host: str, community="public", port=161, timeout=0.5, retries=1) -> Optional[str]:
    """
    Send one SNMP GET request and return the received value as text.
    :param oid: OID for requesting.
    :param host: Host for sending request.
    :param community: SNMP community.
    :param port: SNMP port.
    :param timeout: Timeout for SNMP request.
    :param retries: Count of SNMP request retries.
    :return: Pretty-printed value of the first variable binding, or None when
             the request reported an error indication or an error status.
    """
    target = UdpTransportTarget((host, port), timeout, retries)
    requested_object = ObjectType(ObjectIdentity(oid))
    responses = getCmd(SnmpEngine(), CommunityData(community), target,
                       ContextData(), requested_object)
    # Only the first response of the generator is consumed.
    error_indication, error_status, _error_index, var_binds = next(responses)
    request_failed = error_indication or error_status
    if request_failed:
        return None
    return var_binds[0][1].prettyPrint()
def search_by_snmp(ips: IPSet, settings: Dict[str, str]) -> Tuple[IPSet, List[DeviceTypeCfg]]:
    """
    Examine devices of provided IP addresses by SNMP requests.

    Only addresses that answer to ping are examined. A device is identified
    in SNMPtoDevice by its SNMP system name, then by its IP and finally by
    its SNMP system type. A config is created for identified devices that
    have a power OID; every other pinged address is returned as unavailable
    by SNMP.
    :param ips: IP addresses to examine.
    :param settings: Settings.
    :return: IP addresses, unavailable by SNMP, and list of generated SNMP configs.
    """
    def poll(oid: str, host: str) -> Optional[str]:
        # Every request uses the same connection parameters from settings.
        return snmp_get(oid, host, settings["snmp-password"],
                        int(settings["snmp-port"]), float(settings["snmp-timeout"]),
                        int(settings["snmp-retries"]))

    alive_ips = search_alive_by_ping(ips, settings)
    cfg_dir = os.path.join(settings["cfg-directory"], "cfgprep")
    snmp_devices = read_snmp_devices("SNMPtoDevice")
    snmp_unavailable_ips = IPSet()
    created_cfgs = []
    now = datetime.datetime.now().strftime("%d%m%Y %H%M%S")

    with open(os.path.join(cfg_dir, "NotInMibList"), "w") as not_in_mib_file:
        not_in_mib_csvwriter = csv.writer(not_in_mib_file, delimiter=',')

        for alive_ip in alive_ips:
            ip = str(alive_ip)

            system_name = poll(settings["snmp-system-name-oid"], ip)
            if not system_name:
                print("Can't get system name for IP: {}".format(ip))
                snmp_unavailable_ips.add(ip)
                continue

            # Identification order: hostname, then IP, then device type.
            snmp_device = (find_snmp_device_by_hostname(snmp_devices, system_name)
                           or find_snmp_device_by_ip(snmp_devices, ip))
            if not snmp_device:
                print("Can't find snmp device by hostname or IP: {}".format(ip))
                system_type = poll(settings["snmp-system-type-oid"], ip)
                snmp_device = find_snmp_device_by_type(snmp_devices, system_type)
            if not snmp_device:
                print("Can't find snmp by type for IP: {}".format(ip))
                not_in_mib_csvwriter.writerow([ip, system_name, now])
                # TODO: snmp available
                snmp_unavailable_ips.add(ip)
                continue

            if not snmp_device.power_oid:
                print("Using reference power for snmp device with IP: {}".format(ip))
                # NOTE(review): the reference power is not used; the device is
                # recorded in NotInMibList and left to the MAC search instead.
                not_in_mib_csvwriter.writerow([ip, system_name, now])
                # TODO: snmp available
                snmp_unavailable_ips.add(ip)
                continue

            # TODO: what if no power
            power = poll(snmp_device.power_oid, ip)

            cfg_name = "{}{}{}.cfg".format(ip, system_name, snmp_device.device_type)
            with open(os.path.join(cfg_dir, cfg_name), "w") as cfg:
                cfg.write(template_file("IPaddressSystemNameDeviceType.cfg.template",
                                        {'system_name': system_name,
                                         'ip_address': ip,
                                         'site': snmp_device.site,
                                         'location': snmp_device.location,
                                         'snmp_mib': snmp_device.power_oid,
                                         'device_type': snmp_device.device_type,
                                         'snmp_community': settings["snmp-password"],
                                         'power_use': power}))
            created_cfgs.append(DeviceTypeCfg(str(ip), system_name, snmp_device.device_type,
                                              snmp_device.power_oid, settings["snmp-password"], power))
    return snmp_unavailable_ips, created_cfgs
utils.py
import itertools
import os
import re
from abc import ABC
from configparser import ConfigParser
from typing import List, Dict, Pattern
from netaddr import IPSet, IPNetwork
class Cfg(ABC):
    """
    Common base of the config descriptions created for a device.
    """

    def __init__(self, ip: str) -> None:
        """
        Remember the address of the device the config belongs to.
        :param ip: Config device IP address.
        """
        self.ip = ip
class MacCfg(Cfg):
    """
    Config of a device that did not respond to SNMP and was identified through MACtoDevice.
    """

    def __init__(self, ip: str, mac: str) -> None:
        """
        :param ip: Config device IP address.
        :param mac: Config device MAC address.
        """
        super().__init__(ip)
        self.mac = mac

    def __str__(self) -> str:
        """
        :return: Name of created config file: IP followed by MAC and the ".cfg" extension.
        """
        return "{ip}{mac}.cfg".format(ip=self.ip, mac=self.mac)
class DeviceTypeCfg(Cfg):
    """
    Config of a device that responded to SNMP and was identified through SNMPtoDevice.
    """

    def __init__(self, ip: str, system_name: str, device_type: str, snmp_mib: str,
                 snmp_community: str, power: str) -> None:
        """
        :param ip: Config device IP address.
        :param system_name: Config device determined system name.
        :param device_type: Config device determined device type.
        :param snmp_mib: Config device SNMP MIB from SNMPtoDevice.
        :param snmp_community: Config device SNMP community.
        :param power: Config device power, received by SNMP.
        """
        super().__init__(ip)
        # Values that form the config file name together with the IP.
        self.system_name, self.device_type = system_name, device_type
        # Values needed to poll the device.
        self.snmp_mib, self.snmp_community = snmp_mib, snmp_community
        self.power = power

    def __str__(self) -> str:
        """
        :return: Name of created config file: IP, system name and device type
                 followed by the ".cfg" extension.
        """
        return "{ip}{name}{kind}.cfg".format(ip=self.ip, name=self.system_name, kind=self.device_type)
def args_to_ips(args: List[str]) -> IPSet:
    """
    Collect single IPs and the hosts of subnetworks into one IPSet.

    An argument containing "/" is treated as a network; its host addresses
    are added, except the ones equal to the network or broadcast address.
    Any other argument is added as it is.
    :param args: IPs in string representation.
    :return: IPSet.
    """
    result = IPSet()
    for arg in args:
        if "/" in arg:
            net = IPNetwork(arg)
            for ip in net.iter_hosts():
                is_special = ip == net.network or ip == net.broadcast
                if not is_special:
                    result.add(ip)
        else:
            result.add(arg)
    return result
def read_settings(file_path: str) -> Dict[str, str]:
    """
    Read a settings file that consists of "key = value" lines without any section.
    :param file_path: Path to settings file.
    :return: Settings from file.
    """
    section = "default.section"
    config = ConfigParser(interpolation=None)
    with open(file_path) as file:
        content = file.read()
    # ConfigParser needs a section, so an artificial header is put in front
    # of the file content before parsing.
    config.read_string("[{}]\n".format(section) + content, source=file_path)
    return dict(config[section])
def wildcard_to_regexp(mac_wildcard: str) -> Pattern:
    """
    Build regexp from MAC wildcard.

    The wildcard is upper-cased and every 'X' in it stands for one
    hexadecimal digit (0-9, A-F).
    :param mac_wildcard: MAC wildcard.
    :return: Regexp for MAC, based on wildcard.
    """
    fixed_parts = mac_wildcard.upper().split('X')
    # Re-joining the pieces around each 'X' puts the digit class in its place.
    return re.compile('[0-9A-F]'.join(fixed_parts))
def normalize_mac(mac: str) -> str:
    """
    Bring a MAC to canonical form: surrounding whitespace stripped, the
    separators '-', ':' and '.' removed, letters upper-cased.
    :param mac: MAC with possible special symbols.
    :return: Normalized MAC.
    """
    without_separators = str.maketrans("", "", "-:.")
    return mac.strip().translate(without_separators).upper()
def read_template_str(template_filename: str) -> str:
    """
    Load the whole content of a template file.
    :param template_filename: Path to template file.
    :return: Template as a string.
    """
    with open(template_filename, "r") as template:
        content = template.read()
    return content
def template_file(template_filename: str, params: Dict[str, str]) -> str:
    """
    Fill a template file with params.

    The template uses str.format placeholders that are named after the keys of params.
    :param template_filename: Path to template file.
    :param params: Params for populating template.
    :return: Result of templating.
    """
    return read_template_str(template_filename).format(**params)
def delete_files_in_dir(dir_name: str) -> None:
    """
    Remove the files that lie directly in a directory; subdirectories and their content are kept.
    :param dir_name: Path to cleanable directory.
    """
    entry_paths = [os.path.join(dir_name, entry) for entry in os.listdir(dir_name)]
    for entry_path in entry_paths:
        if not os.path.isfile(entry_path):
            continue
        os.unlink(entry_path)
def move_cfg_with_replacement(old_path: str, new_path: str, cfg_dir: str) -> None:
    """
    Move a config and rewrite the paths inside it.

    Every occurrence of the "cfgprep" subdirectory path of cfg_dir in the
    content is replaced by cfg_dir itself; the source file is removed afterwards.
    :param old_path: Old path of config.
    :param new_path: New path of config.
    :param cfg_dir: Destination directory for configs.
    """
    prep_dir = os.path.join(cfg_dir, "cfgprep")
    with open(old_path, encoding="utf-8", mode="r") as source_file, \
            open(new_path, encoding="utf-8", mode="w") as destination_file:
        destination_file.writelines(line.replace(prep_dir, cfg_dir) for line in source_file)
    # Both files are closed at this point, the source can be removed.
    os.unlink(old_path)
def move_cfgs(source_dir: str, destination_dir: str) -> None:
    """
    Move every file that lies directly in the source directory into the
    destination directory, rewriting the paths inside the files.
    :param source_dir: Source directory.
    :param destination_dir: Destination directory.
    """
    for file_name in os.listdir(source_dir):
        source_path = os.path.join(source_dir, file_name)
        if not os.path.isfile(source_path):
            # Subdirectories are left where they are.
            continue
        destination_path = os.path.join(destination_dir, file_name)
        move_cfg_with_replacement(source_path, destination_path, destination_dir)
def transfer_cfgs(cfg_dir: str) -> None:
    """
    Replace the configs of the previous run with the freshly prepared ones.

    The files left in the config directory are deleted first, then the
    content of its "cfgprep" subdirectory is moved into it.
    :param cfg_dir: Configs directory.
    """
    prepared_dir = os.path.join(cfg_dir, "cfgprep")
    delete_files_in_dir(cfg_dir)
    move_cfgs(prepared_dir, cfg_dir)