Python Scripts

This post presents the Python source code of the system, which consists of five scripts:

1. main.py
2. reporting.py
3. search_by_mac.py
4. search_by_snmp.py
5. utils.py

main.py

"""
Libraries used:
    os - provides a portable way of using operating-system-dependent functionality.
    subprocess - spawns new processes, connects to their input/output/error pipes, and obtains their return codes.
    sys - provides access to some variables used or maintained by the interpreter and to functions that interact with the interpreter.
    typing - support for type hints such as Any, Union, Tuple, Callable, TypeVar, and Generic.
    reporting - script that generates all reports.
    search_by_mac - matches responding devices by MAC address.
    search_by_snmp - pings devices and polls them over SNMP.
    utils - helper classes and functions used by the other modules.
"""
import os
import subprocess
import sys
from typing import List

from reporting import Reporting
from search_by_mac import search_by_mac
from search_by_snmp import search_by_snmp
from utils import args_to_ips, read_settings, transfer_cfgs, delete_files_in_dir


def main(args: List[str]) -> None:
    """
    Run the whole pipeline: settings, device lookup, reports, config transfer.

    Settings are read from ``settings.ini``; the config directory, its
    ``cfgprep`` subdirectory and the MRTG output directory are created when
    missing. Addresses that the SNMP lookup reports as unavailable are handed
    to the MAC lookup, and the configs from both lookups feed the reports.

    :param args: Enumeration of single IPs (like 192.168.0.1) and subnetworks (like 192.168.0.0/24)
    """
    config = read_settings("settings.ini")
    base_dir = config["cfg-directory"]
    prep_dir = os.path.join(base_dir, "cfgprep")

    os.makedirs(base_dir, exist_ok=True)
    os.makedirs(prep_dir, exist_ok=True)
    os.makedirs(config["mrtg-output"], exist_ok=True)

    delete_files_in_dir(prep_dir)

    requested_ips = args_to_ips(args)
    unresolved_ips, snmp_cfgs = search_by_snmp(requested_ips, config)
    mac_cfgs = search_by_mac(unresolved_ips, config)

    # MRTG is launched through subprocess.call with the command list built by Reporting.
    report_builder = Reporting(config, subprocess.call)
    report_builder.make_reports(snmp_cfgs + mac_cfgs)

    transfer_cfgs(base_dir)


if __name__ == "__main__":
    # Everything after the script name is treated as an IP or network address.
    cli_addresses = sys.argv[1:]
    if cli_addresses:
        main(cli_addresses)
    else:
        print("Not enough arguments. You must pass IP and network addresses")

reporting.py

import os
import re
import time
from itertools import groupby
from typing import List, Dict, Iterable, Tuple, Pattern, Callable

from utils import Cfg, DeviceTypeCfg, MacCfg, template_file

# Number of seconds in one day (24 hours).
DAY_IN_SECONDS = 24 * 60 * 60
# A "month" is approximated here as 30 days.
MONTH_IN_SECONDS = 30 * DAY_IN_SECONDS


class DeviceGroup(object):
    """
    One record of a DeviceGroupByMac or DeviceGroupByType file: a group name plus its elements.
    """

    def __init__(self, name: str, group_elements: List[str]):
        """
        :param name: Group name.
        :param group_elements: Group elements. It can be either MAC masks or devices types.
        """

        self.name = name
        self.group_elements = group_elements
        # Compiled once here so that match() can be called repeatedly.
        self.regexp = self.group_elements_to_regexp()

    def match(self, field: str) -> bool:
        """
        Tell whether a field belongs to the group.

        The field is upper-cased and matched from its start against the group regexp.

        :param field: Either MAC address or device type.
        :return: True when the field matches one of the group elements.
        """

        return self.regexp.match(field.upper()) is not None

    def elements_str(self) -> str:
        """
        :return: The group elements joined into one string with ',' between them.
        """

        return ",".join(self.group_elements)

    def group_elements_to_regexp(self) -> Pattern:
        """
        Build the regexp that recognises any element of the group.

        Each element is upper-cased, every 'X' becomes a single hexadecimal digit
        class, and the elements are combined as alternatives.

        :return: Compiled regexp for the whole group.
        """

        alternatives = (element.upper().replace('X', '[0-9A-F]')
                        for element in self.group_elements)
        return re.compile("|".join(alternatives))


class Reporting(object):
    """
    Contain all data and logic for producing reporting.
    """

    def __init__(self, settings: Dict[str, str], invoke_mrtg: Callable[[List[str]], int]) -> None:
        """
        :param settings: Settings.
        :param invoke_mrtg: Lambda for MRTG invocation
        """

        self.settings = settings
        self.cfg_dir = os.path.join(self.settings["cfg-directory"], "cfgprep")
        self.invoke_mrtg = invoke_mrtg

    def call_mrtg(self, cfg_names: Iterable[str]) -> None:
        """
        Invoke MRTG for each config.

        :param settings: Script settings.
        :param cfg_names: Names of configs with ".cfg" extension.
        :return: Names of generated MRTG logs.
        """

        for cfg_name in cfg_names:
            self.invoke_mrtg(["sudo", "env", "LANG=C", self.settings["mrtg-executable"],
                              os.path.join(self.cfg_dir, cfg_name)])

    @staticmethod
    def extract_first_three_ip_octets(cfg: Cfg) -> str:
        """
        :param cfg: Config information.
        :return: First three octets from IP of config.
        """

        return ".".join(cfg.ip.split(".")[:-1])

    def reports_by_subnets(self, cfgs: List[Cfg]) -> List[Tuple[str, int]]:
        """
        Write one group config per subnet, i.e. per set of configs sharing the
        IP prefix returned by ``extract_first_three_ip_octets``.

        :param cfgs: Information about created configs.
        :return: List of (group config filename, number of single configs in the group).
        """

        subnet_of = self.extract_first_three_ip_octets
        created = []

        # groupby only merges neighbouring items, hence the sort on the same key.
        for subnet, members in groupby(sorted(cfgs, key=subnet_of), subnet_of):
            group_filename = "grp2_all{}.x.cfg".format(subnet)

            pieces = []
            for member in members:
                if isinstance(member, DeviceTypeCfg):
                    pieces.append(os.path.join(self.cfg_dir, str(member)))
                elif isinstance(member, MacCfg):
                    ini_name = "{}{}.ini".format(member.ip, member.mac)
                    pieces.append("`more  {}`".format(os.path.join(self.cfg_dir, ini_name)))
                else:
                    # Any other config kind contributes no text but is still counted.
                    pieces.append("")
            member_count = len(pieces)

            with open(os.path.join(self.cfg_dir, group_filename), "w") as group_cfg:
                group_cfg.write(template_file("ReportBySubnet.template",
                                              {"name": os.path.splitext(group_filename)[0],
                                               "subnet": subnet + ".x",
                                               "configs": " + ".join(pieces),
                                               "configs_count": member_count,
                                               "max_bytes": 40 * member_count}))

            created.append((group_filename, member_count))

        return created

    @staticmethod
    def parse_mrtg_log(mrtg_log_path: str) -> Tuple[int, int]:
        """
        Parse log, generated by MRTG.

        :param mrtg_log_path: Path to MRTG log.
        :return: First two digits, representing power, from second log row.
        """

        with open(mrtg_log_path, encoding="utf-8", mode="r") as log_file:
            # Skip first line
            log_file.readline()
            line_tokens = log_file.readline().split()
            return int(line_tokens[1]), int(line_tokens[2])

    def generate_ini_from_logs(self, cfg_count_pairs: Iterable[Tuple[str, int]]) -> List[str]:
        """
        Generate ini files, based on MRTG logs.

        :param cfg_count_pairs: Pairs of group config, that was used for MRTG invocation,
                                and count of devices in group config.
        :return: Names of generated ini files.
        """

        ini_filenames = []

        for cfg_filename, device_count in cfg_count_pairs:
            cfg_name = os.path.splitext(cfg_filename)[0]

            log_name = os.path.join(self.settings["mrtg-output"], cfg_name.lower() + ".log")
            first_power, second_power = self.parse_mrtg_log(log_name)

            ini_filename = cfg_name + ".ini"
            ini_filenames.append(ini_filename)
            with open(os.path.join(self.cfg_dir, ini_filename), encoding="utf-8", mode="w") as ini_file:
                ini_file.write("\n".join([str(first_power), str(second_power), cfg_name, str(device_count)]))

        return ini_filenames

    @staticmethod
    def group_to_regexp(patterns: List[str]) -> Pattern[str]:
        """
        Created regexp for group elements.

        :param patterns: Patterns of elements from group.
        :return: Regexp for group.
        """

        regexp = ""
        first = True
        for pattern in patterns:
            if not first:
                regexp += "|"
            else:
                first = False

            regexp += pattern

        return re.compile(regexp)

    def report_all_devices(self, ini_filenames: List[str]) -> str:
        """
        Write the single config that aggregates every given ini file.

        :param ini_filenames: Filenames of ini files of devices.
        :return: Filename of the created all-devices config.
        """

        includes = " + ".join("`more {}`".format(os.path.join(self.cfg_dir, ini_filename))
                              for ini_filename in ini_filenames)
        ini_count = len(ini_filenames)

        cfg_name = "grp1_alldevices.cfg"
        with open(os.path.join(self.cfg_dir, cfg_name), "w") as all_cfg:
            all_cfg.write(template_file("ReportByAllDevices.template",
                                        {"name": "grp1_alldevices",
                                         "configs": includes,
                                         "configs_count": ini_count,
                                         "max_bytes": 40 * ini_count}))

        return cfg_name

    def report_by_building(self, ini_filenames: List[str]) -> List[str]:
        """
        Create reports for devices, grouped by the records of the ``BuildingGroup`` file.

        Every non-blank line of ``BuildingGroup`` is ``name,pattern[,pattern...]``.
        The ini filenames that match one of the patterns (from their start) form
        the group; groups without any match produce no report.

        :param ini_filenames: Filenames of ini files of devices.
        :return: Filenames of created BuildingGroup reports.
        """

        results = []
        with open("BuildingGroup", "r") as file:
            for line in file:
                line = line.strip()
                if not line:
                    # A blank line would give an empty pattern, which matches every
                    # ini file and would produce a bogus "grp3_.cfg" report.
                    continue

                building, *patterns = line.split(",")
                regexp = self.group_to_regexp(patterns)

                group_filenames = [name for name in ini_filenames if regexp.match(name)]
                if not group_filenames:
                    continue

                configs_str = " + ".join("`more  {}`".format(os.path.join(self.cfg_dir, name))
                                         for name in group_filenames)

                report_name = "grp3_" + building
                building_cfg_name = report_name + ".cfg"
                results.append(building_cfg_name)
                with open(os.path.join(self.cfg_dir, building_cfg_name), "w") as result_cfg:
                    result_cfg.write(template_file("ReportByBuilding.template",
                                                   {"name": report_name,
                                                    "configs": configs_str,
                                                    "configs_count": len(group_filenames),
                                                    "max_bytes": 40 * len(group_filenames)}))

        return results

    @staticmethod
    def read_device_group(file_path: str) -> List[DeviceGroup]:
        """
        Read a device group file into ``DeviceGroup`` objects.

        Every non-blank line is ``name,element[,element...]``. Blank lines are
        skipped, because they would otherwise become a nameless group whose
        empty regexp matches everything.

        :param file_path: Path to file with group description.
        :return: One ``DeviceGroup`` per non-blank line, in file order.
        """

        groups = []
        with open(file_path, "r") as file:
            for line in file:
                line = line.strip()
                if not line:
                    continue

                name, *elements = line.split(",")
                groups.append(DeviceGroup(name, elements))

        return groups

    def report_by_type(self, cfgs: List[Cfg]) -> List[Tuple[str, int, int]]:
        """
        Write one report per device type listed in ``DeviceGroupByType`` that has
        at least one matching ``DeviceTypeCfg``.

        :param cfgs: Information about created configs.
        :return: List of (created config name, number of configs of that type, sum of their power).
        """

        created = []
        for device_group in self.read_device_group("DeviceGroupByType"):
            for device_type in device_group.group_elements:
                matching = [cfg for cfg in cfgs
                            if isinstance(cfg, DeviceTypeCfg) and cfg.device_type == device_type]
                if not matching:
                    continue

                power_sum = 0
                targets = []
                for device_cfg in matching:
                    power_sum += int(device_cfg.power)
                    targets.append("{0}&{0}:{1}@{2}".format(device_cfg.snmp_mib,
                                                            self.settings["snmp-password"],
                                                            device_cfg.ip))

                name = "grp4_{}{}".format(device_group.name, device_type)
                cfg_name = name + ".cfg"
                with open(os.path.join(self.cfg_dir, cfg_name), "w") as result_cfg:
                    result_cfg.write(template_file("ReportByType.template",
                                                   {"name": name,
                                                    "device_type": device_type,
                                                    "snmp_mibs": " + ".join(targets),
                                                    "configs_count": len(matching),
                                                    "power_sum": power_sum}))
                created.append((cfg_name, len(matching), power_sum))

        return created

    def report_by_group(self, device_type_triples: List[Tuple[str, int, int]]) -> List[str]:
        """
        Write one "By Group" report per ``DeviceGroupByType`` group that has
        at least one matching per-type ini file.

        :param device_type_triples: (config name, config count, power sum) of the per-type reports.
        :return: Filenames of the created by-group configs.
        """

        created = []
        for device_group in self.read_device_group("DeviceGroupByType"):
            matched_inis = []
            total_power = 0

            for device_type in device_group.group_elements:
                for type_cfg_name, _, power_sum in device_type_triples:
                    ini_filename = os.path.splitext(type_cfg_name)[0] + ".ini"

                    # NOTE(review): this is a substring test, so a device type that is
                    # contained in another ini name also matches - confirm this is intended.
                    if device_type not in ini_filename:
                        continue

                    total_power += power_sum
                    matched_inis.append(ini_filename)

            if not matched_inis:
                continue

            configs_str = " + ".join("`more {}`".format(os.path.join(self.cfg_dir, ini_filename))
                                     for ini_filename in matched_inis)

            report_name = "grp5_{}".format(device_group.name)
            cfg_name = report_name + ".cfg"
            created.append(cfg_name)
            with open(os.path.join(self.cfg_dir, cfg_name), "w") as result_cfg:
                result_cfg.write(template_file("ReportByGroup.template",
                                               {"name": report_name,
                                                "device_type": device_group.elements_str(),
                                                "device_group": device_group.name,
                                                "configs": configs_str,
                                                "configs_count": len(matched_inis),
                                                "power_sum": total_power}))

        return created

    def report_group_by_mac(self, cfgs: List[Cfg]) -> List[str]:
        """
        Write one report per ``DeviceGroupByMac`` group that contains at least
        one ``MacCfg`` whose MAC matches the group.

        :param cfgs: Information about created configs.
        :return: Filenames of the created by-MAC group configs.
        """

        created = []
        for device_group in self.read_device_group("DeviceGroupByMac"):
            members = [cfg for cfg in cfgs
                       if isinstance(cfg, MacCfg) and device_group.match(cfg.mac)]
            if not members:
                continue

            configs_str = " + ".join(
                "`more {}`".format(os.path.join(self.cfg_dir,
                                                "{}{}.ini".format(member.ip, member.mac)))
                for member in members)

            cfg_name = "grp6_{}byMAC.cfg".format(device_group.name)
            created.append(cfg_name)
            with open(os.path.join(self.cfg_dir, cfg_name), "w") as result_cfg:
                result_cfg.write(template_file("ReportByGroupByMAC.template",
                                               {"name": "grp6_{}".format(device_group.name),
                                                "configs_count": len(members),
                                                "device_group": device_group.name,
                                                "configs": configs_str}))

        return created

    def report_all_assets_summary(self) -> str:
        """
        Write the all-assets summary ini and config.

        The summary counts the regular files in the MRTG output directory that
        were modified within the last day and within the last 30 days.

        :return: Filename of all assets summary report.
        """

        now = time.time()
        output_dir = self.settings["mrtg-output"]

        # Age in seconds of every regular file in the MRTG output directory.
        ages = []
        for entry in os.listdir(output_dir):
            entry_path = os.path.join(output_dir, entry)
            if os.path.isfile(entry_path):
                ages.append(now - os.path.getmtime(entry_path))

        updated_in_last_day = sum(1 for age in ages if age < DAY_IN_SECONDS)
        updated_in_last_month = sum(1 for age in ages if age < MONTH_IN_SECONDS)

        summary_ini_path = os.path.join(self.cfg_dir, "AllAssetsSummary.ini")
        with open(summary_ini_path, "w") as summary_ini:
            summary_ini.write("\n".join([str(updated_in_last_month),
                                         str(updated_in_last_day),
                                         "Responding in the last 30 days",
                                         "Responding in the last 24 hours"]))

        all_assets_cfg = "AllAssetsSummary.cfg"
        with open(os.path.join(self.cfg_dir, all_assets_cfg), "w") as summary_cfg:
            summary_cfg.write(template_file("ReportByAllAssetsSummary.template",
                                            {"configs": "`more {}`".format(summary_ini_path),
                                             "updated_in_last_month": updated_in_last_month}))

        return all_assets_cfg

    def make_reports(self, cfgs: List[Cfg]) -> None:
        """
        Build every report and run MRTG on each generated config.

        The order matters: the ini files produced from the subnet reports feed
        the all-devices and by-building reports, and the per-type reports feed
        the by-group reports.

        :param cfgs: Created single device configs.
        """

        # Individual reporting
        self.call_mrtg(map(str, cfgs))

        # Group 2 - by Subnet
        subnet_pairs = self.reports_by_subnets(cfgs)
        self.call_mrtg(cfg_name for cfg_name, _ in subnet_pairs)
        subnet_inis = self.generate_ini_from_logs(subnet_pairs)

        # Group 1 - All devices
        self.call_mrtg([self.report_all_devices(subnet_inis)])

        # Group 3 - By Building
        self.call_mrtg(self.report_by_building(subnet_inis))

        # Group 4 - By Type
        type_triples = self.report_by_type(cfgs)
        self.call_mrtg(cfg_name for cfg_name, _, _ in type_triples)
        self.generate_ini_from_logs((cfg_name, count) for cfg_name, count, _ in type_triples)

        # Group 5 - By Group
        self.call_mrtg(self.report_by_group(type_triples))

        # Group 6 - By GroupByMac
        self.call_mrtg(self.report_group_by_mac(cfgs))

        # All assets summary report
        self.call_mrtg([self.report_all_assets_summary()])

search_by_mac.py

import csv
import datetime
import os
from typing import Dict, List, Optional

from netaddr import IPSet

from utils import MacCfg, wildcard_to_regexp, normalize_mac, template_file


class MacDevice(object):
    """
    One record of the MACtoDevice file: a MAC mask and the device data attached to it.
    """

    def __init__(self, mac: str, device_type: str, site: str, power_use: str, comment: str):
        """
        :param mac: MAC address mask.
        :param device_type: Device type.
        :param site: Site.
        :param power_use: Power use.
        :param comment: Comment.
        """

        normalized_mask = normalize_mac(mac)
        self.mac = normalized_mask
        # Built once so that match() can be called for many addresses.
        self.mac_regexp = wildcard_to_regexp(normalized_mask)
        self.device_type, self.site = device_type, site
        self.power_use, self.comment = power_use, comment

    def match(self, normalized_mac: str) -> bool:
        """
        Check a MAC address against this record's mask.

        :param normalized_mac: MAC for check.
        :return: True when the regexp built from the mask matches the given MAC.
        """

        found = self.mac_regexp.match(normalized_mac)
        return bool(found)


def read_ip_to_mac(file_path: str) -> Dict[str, str]:
    """
    Read IPtoMAC into object representation.

    The file is comma-separated: the first column is the IP address, the second
    the MAC address. Blank lines are skipped; when an IP appears more than once
    the last record wins.

    :param file_path: Path to IPtoMAC file.
    :return: Dictionary mapping IP to normalized MAC.
    """

    # newline="" is how the csv module expects its input files to be opened.
    with open(file_path, "r", newline="") as file:
        result = {}
        for row in csv.reader(file, delimiter=','):
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            result[row[0]] = normalize_mac(row[1])

        return result


def read_mac_devices(file_path: str) -> List[MacDevice]:
    """
    Read MACtoDevice into object representation.

    The file is comma-separated with the columns MAC mask, device type, site,
    power use and comment. Blank lines are skipped.

    :param file_path: Path to MACtoDevice file.
    :return: List of MacDevice records, in file order.
    """

    # newline="" is how the csv module expects its input files to be opened.
    with open(file_path, "r", newline="") as file:
        result = []
        for row in csv.reader(file, delimiter=','):
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            result.append(MacDevice(mac=normalize_mac(row[0]),
                                    device_type=row[1],
                                    site=row[2],
                                    power_use=row[3],
                                    comment=row[4]))
        return result


def find_mac_device(devices: List[MacDevice], normalized_mac: str) -> Optional[MacDevice]:
    """
    Return the first MacDevice record whose mask matches the given MAC.

    :param devices: MacDevice records, checked in order.
    :param normalized_mac: MAC in normal form.
    :return: First matching MacDevice record, or None when none matches.
    """

    matching = (candidate for candidate in devices if candidate.match(normalized_mac))
    return next(matching, None)


def search_by_mac(ips: IPSet, settings: Dict[str, str]) -> List[MacCfg]:
    """
    Look the given IP addresses up in the IPtoMAC and MACtoDevice files.

    For every IP with a known MAC and a matching MACtoDevice record an ini and
    a cfg file are written from their templates. IPs without a MAC are logged to
    ``NoMacList``; IPs whose MAC matches no record are logged to ``NoMacMatchList``.

    :param ips: IP addresses of devices.
    :param settings: Settings; ``cfg-directory`` is read.
    :return: List of created MAC configs.
    """

    mac_by_ip = read_ip_to_mac("IPtoMAC")
    devices = read_mac_devices("MACtoDevice")
    cfg_dir = os.path.join(settings["cfg-directory"], "cfgprep")

    created_cfgs = []
    now = datetime.datetime.now().strftime("%d%m%Y %H%M%S")
    # newline="" leaves line endings to the csv writer; without it an extra "\r"
    # is written on platforms that translate "\n" to "\r\n".
    with open(os.path.join(cfg_dir, "NoMacList"), "w", newline="") as no_mac_file, \
            open(os.path.join(cfg_dir, "NoMacMatchList"), "w", newline="") as no_mac_match_file:
        no_mac_csvwriter = csv.writer(no_mac_file, delimiter=',')
        no_mac_match_csvwriter = csv.writer(no_mac_match_file, delimiter=',')

        for ip in ips:
            mac = mac_by_ip.get(str(ip))
            if not mac:
                # TODO: where to get MAC?
                no_mac_csvwriter.writerow([ip, mac, now])
                continue

            device = find_mac_device(devices, mac)
            if not device:
                no_mac_match_csvwriter.writerow([ip, mac, now])
                continue

            # Values shared by the ini and the cfg template.
            common_values = {'mac_address': mac,
                             'ip_address': ip,
                             'site': device.site,
                             'location': None,
                             'device_type': device.device_type,
                             'power_use': device.power_use,
                             'comment': device.comment}

            ini_path = os.path.join(cfg_dir, "{}{}.ini".format(ip, mac))
            with open(ini_path, "w") as ini:
                ini.write(template_file("IPaddressMACaddress.ini.template", dict(common_values)))

            # The cfg template additionally receives the path of the ini file.
            with open(os.path.join(cfg_dir, "{}{}.cfg".format(ip, mac)), "w") as cfg:
                cfg.write(template_file("IPaddressMACaddress.cfg.template",
                                        {'path': ini_path, **common_values}))

            created_cfgs.append(MacCfg(str(ip), mac))

    return created_cfgs

search_by_snmp.py

import csv
import datetime
import os
import platform
import subprocess
from typing import List, Optional, Dict, Tuple

from multiping import multi_ping, multiping
from netaddr import IPSet
from pysnmp.hlapi import *

from utils import DeviceTypeCfg, template_file

# Keep a reference to the original method so the wrapper below can delegate to it.
# NOTE(review): the import above names `multiping`, not `MultiPing` - confirm that
# `MultiPing` is really in scope at this point.
old_send = MultiPing._send_ping


def send_ping_exc(self, dest_addr, payload):
    """
    Replacement for MultiPing._send_ping that reports which address failed.

    Calls the original method; any exception it raises is re-raised as a
    ValueError whose message names the destination address, with the original
    exception chained as its cause.

    :param self: MultiPing object.
    :param dest_addr: Address for ping.
    :param payload: The payload must be specified as a packed byte string.
                    Note that its length has to be divisible by 2 for this to work correctly.
    :raises ValueError: When the original method raises any exception.
    """

    try:
        old_send(self, dest_addr, payload)
    except Exception as exc:
        err_msg = "Destination address: {}".format(dest_addr)
        raise ValueError(err_msg) from exc


# Install the wrapper in place of the original method (module-level monkey patch).
MultiPing._send_ping = send_ping_exc


class SNMPDevice(object):
    """
    One record of the SNMPtoDevice file, kept as plain attributes.
    """

    def __init__(self, ip: str, hostname: str, site: str, location: str, device_type: str,
                 power_oid: Optional[str], reference_power: Optional[str], comment: str):
        """
        Store every field unchanged under the attribute of the same name.

        :param ip: IP address.
        :param hostname: Device hostname.
        :param site: Device site.
        :param location: Location of device.
        :param device_type: Device type.
        :param power_oid: Optional power oid for device.
        :param reference_power: Optional reference power of device.
        :param comment: Comment for device.
        """

        # Identity of the device.
        self.ip, self.hostname = ip, hostname
        # Where it is installed.
        self.site, self.location = site, location
        # What it is and how its power is obtained.
        self.device_type, self.power_oid = device_type, power_oid
        self.reference_power, self.comment = reference_power, comment


def read_snmp_devices(file_path: str) -> List[SNMPDevice]:
    """
    Read SNMPtoDevice file to object records.

    The file is comma-separated with the columns IP, hostname, site, location,
    device type, power OID, reference power and comment. Blank lines are skipped.

    :param file_path: Path to SNMPtoDevice file.
    :return: SNMPtoDevice object records, in file order.
    """

    # newline="" is how the csv module expects its input files to be opened.
    with open(file_path, "r", newline="") as file:
        result = []
        for row in csv.reader(file, delimiter=','):
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            result.append(SNMPDevice(ip=row[0],
                                     hostname=row[1],
                                     site=row[2],
                                     location=row[3],
                                     device_type=row[4],
                                     power_oid=row[5],
                                     reference_power=row[6],
                                     comment=row[7]))

        return result


def find_snmp_device_by_ip(devices: List[SNMPDevice], ip: str) -> Optional[SNMPDevice]:
    """
    Return the first SNMPDevice record with exactly the given IP address.

    :param devices: SNMPDevice records, checked in order.
    :param ip: IP address.
    :return: First record whose ``ip`` equals the argument, or None.
    """

    return next((candidate for candidate in devices if candidate.ip == ip), None)


def find_snmp_device_by_hostname(devices: List[SNMPDevice], hostname: str) -> Optional[SNMPDevice]:
    """
    Return the first SNMPDevice record with the given hostname, ignoring letter case.

    :param devices: SNMPDevice records, checked in order.
    :param hostname: Hostname.
    :return: First record whose lower-cased hostname equals the lower-cased argument, or None.
    """

    same_name = (candidate for candidate in devices
                 if candidate.hostname.lower() == hostname.lower())
    return next(same_name, None)


def find_snmp_device_by_type(devices: List[SNMPDevice], device_type: Optional[str]) -> Optional[SNMPDevice]:
    """
    Find device in SNMPDevice records by device type, ignoring letter case.

    The device type usually comes from an SNMP request, which yields None when
    the request fails; in that case nothing can match and None is returned
    instead of failing on ``None.lower()``.

    :param devices: SNMPDevice records, checked in order.
    :param device_type: Device type, or None when it could not be determined.
    :return: First record with that device type, or None.
    """

    if device_type is None:
        return None

    wanted = device_type.lower()
    return next((device for device in devices if device.device_type.lower() == wanted), None)


def search_alive_by_ping(ips: IPSet, settings: Dict[str, str]) -> IPSet:
    """
    Ping IP addresses to determine aliveness.

    When ``use-system-ping`` is "false" (any letter case) the addresses are
    pinged through ``multi_ping``; otherwise the system ``ping`` command is run
    once per host and a zero exit code counts as alive.

    :param ips: Set of IP addresses to ping.
    :param settings: Settings; ``ping-timeout``, ``ping-retries`` and ``use-system-ping`` are read.
    :return: Set of alive IP addresses.
    """

    timeout = int(settings["ping-timeout"])
    retries = int(settings["ping-retries"])
    hosts = [str(ip) for ip in ips]

    if settings["use-system-ping"].lower() == "false":
        available, _ = multi_ping(hosts, timeout, retries)
        return IPSet(available.keys())

    # The option names differ between platforms, so they are chosen once up front.
    attempts = str(retries + 1)
    if platform.system().lower() == 'windows':
        # Windows ping has no "-W"; its timeout option is "-w" and takes milliseconds.
        # The setting is treated as seconds, as the other two ping paths do.
        options = ['-n', attempts, '-w', str(timeout * 1000)]
    else:
        options = ['-c', attempts, '-W', str(timeout)]

    available = [host for host in hosts
                 if subprocess.call(['ping'] + options + [host]) == 0]

    return IPSet(available)


def snmp_get(oid: str, host: str, community="public", port=161, timeout=0.5, retries=1) -> Optional[str]:
    """
    Request a single OID from a host over SNMP.

    :param oid: OID for requesting.
    :param host: Host for sending request.
    :param community: SNMP community.
    :param port: SNMP port.
    :param timeout: Timeout for SNMP request.
    :param retries: Count of SNMP request retries.
    :return: Pretty-printed value of the first returned variable binding,
             or None when the response carries an error indication or error status.
    """

    engine = SnmpEngine()
    credentials = CommunityData(community)
    target = UdpTransportTarget((host, port), timeout, retries)
    context = ContextData()
    requested = ObjectType(ObjectIdentity(oid))

    responses = getCmd(engine, credentials, target, context, requested)

    # Only the first response is consumed.
    error_indication, error_status, _, var_binds = next(responses)
    if error_indication or error_status:
        return None

    return var_binds[0][1].prettyPrint()


def search_by_snmp(ips: IPSet, settings: Dict[str, str]) -> Tuple[IPSet, List[DeviceTypeCfg]]:
    """
    Examine devices of provided IP addresses by SNMP requests.

    The addresses are pinged first; every alive address is then processed as follows:

    1. The system name is requested by SNMP. Without an answer the address is reported as
       unavailable by SNMP.
    2. A matching SNMPtoDevice record is searched by hostname, then by IP and finally by the
       system type requested from the device.
    3. If no record is found, or the found record has no power OID, a row is written to the
       "NotInMibList" file and the address is reported as unavailable by SNMP.
    4. Otherwise the power value is requested by SNMP and a config file is generated from the
       template in the "cfgprep" directory.

    :param ips: IP addresses to examine.
    :param settings: Settings.
    :return: IP addresses, unavailable by SNMP, and list of generated SNMP configs.
    """

    def query(oid: str, host: str) -> Optional[str]:
        """Send one SNMP request using the connection parameters from settings."""
        return snmp_get(oid, host, settings["snmp-password"], int(settings["snmp-port"]),
                        float(settings["snmp-timeout"]), int(settings["snmp-retries"]))

    alive_ips = search_alive_by_ping(ips, settings)
    cfg_dir = os.path.join(settings["cfg-directory"], "cfgprep")

    snmp_devices = read_snmp_devices("SNMPtoDevice")

    snmp_unavailable_ips = IPSet()
    created_cfgs = []
    now = datetime.datetime.now().strftime("%d%m%Y %H%M%S")
    # The csv module requires files to be opened with newline=""; without it rows end with
    # "\r\r\n" on platforms that translate "\n" on write.
    with open(os.path.join(cfg_dir, "NotInMibList"), "w", newline="") as not_in_mib_file:
        not_in_mib_csvwriter = csv.writer(not_in_mib_file, delimiter=',')

        for ip in alive_ips:
            ip = str(ip)
            system_name = query(settings["snmp-system-name-oid"], ip)
            if not system_name:
                print("Can't get system name for IP: {}".format(ip))
                snmp_unavailable_ips.add(ip)
                continue

            snmp_device = find_snmp_device_by_hostname(snmp_devices, system_name)
            if not snmp_device:
                snmp_device = find_snmp_device_by_ip(snmp_devices, ip)

            if not snmp_device:
                print("Can't find snmp device by hostname or IP: {}".format(ip))
                system_type = query(settings["snmp-system-type-oid"], ip)
                # snmp_get returns None on failure, and the lookup by type lowercases its
                # argument, so it must only be called with an actual value.
                if system_type is not None:
                    snmp_device = find_snmp_device_by_type(snmp_devices, system_type)

            if not snmp_device:
                print("Can't find snmp by type for IP: {}".format(ip))
                not_in_mib_csvwriter.writerow([ip, system_name, now])
                # TODO: snmp available
                snmp_unavailable_ips.add(ip)
                continue

            if snmp_device.power_oid:
                # TODO: what if no power
                power = query(snmp_device.power_oid, ip)
            else:
                print("Using reference power for snmp device with IP: {}".format(ip))
                # NOTE(review): the reference power is read but not used, because the device is
                # skipped by the `continue` below.
                power = snmp_device.reference_power
                # From last edits
                not_in_mib_csvwriter.writerow([ip, system_name, now])
                # TODO: snmp available
                snmp_unavailable_ips.add(ip)
                continue

            cfg_name = "{}{}{}.cfg".format(ip, system_name, snmp_device.device_type)
            with open(os.path.join(cfg_dir, cfg_name), "w") as cfg:
                cfg.write(template_file("IPaddressSystemNameDeviceType.cfg.template",
                                        {'system_name': system_name,
                                         'ip_address': ip,
                                         'site': snmp_device.site,
                                         'location': snmp_device.location,
                                         'snmp_mib': snmp_device.power_oid,
                                         'device_type': snmp_device.device_type,
                                         'snmp_community': settings["snmp-password"],
                                         'power_use': power}))

            created_cfgs.append(DeviceTypeCfg(ip, system_name, snmp_device.device_type,
                                              snmp_device.power_oid, settings["snmp-password"], power))

    return snmp_unavailable_ips, created_cfgs

utils.py

import itertools
import os
import re
from abc import ABC
from configparser import ConfigParser
from typing import List, Dict, Pattern

from netaddr import IPSet, IPNetwork


class Cfg(ABC):
    """
    Common base of the descriptions of configs generated for devices.

    It stores only the IP address of the device the config belongs to.
    """

    def __init__(self, ip: str) -> None:
        """
        Remember the address of the described device.

        :param ip: IP address of the device the config was created for.
        """

        self.ip = ip


class MacCfg(Cfg):
    """
    Describe a created config for a device that did not respond to SNMP and was matched
    through MACtoDevice.
    """

    def __init__(self, ip: str, mac: str) -> None:
        """
        Remember the addresses of the described device.

        :param ip: IP address of the device.
        :param mac: MAC address of the device.
        """

        super().__init__(ip)

        self.mac = mac

    def __str__(self) -> str:
        """
        Build the file name of the created config: IP and MAC written back to back,
        followed by the ".cfg" extension.

        :return: Name of created config file.
        """

        name_parts = (self.ip, self.mac)
        return "{}{}.cfg".format(*name_parts)


class DeviceTypeCfg(Cfg):
    """
    Describe a created config for a device that responded to SNMP and was matched through
    SNMPtoDevice.
    """

    def __init__(self, ip: str, system_name: str, device_type: str, snmp_mib: str,
                 snmp_community: str, power: str) -> None:
        """
        Remember everything that was determined about the device.

        :param ip: IP address of the device.
        :param system_name: Determined system name of the device.
        :param device_type: Determined type of the device.
        :param snmp_mib: SNMP MIB of the device.
        :param snmp_community: SNMP community of the device.
        :param power: Power of the device.
        """

        super().__init__(ip)

        self.system_name = system_name
        self.device_type = device_type
        self.snmp_mib = snmp_mib
        self.snmp_community = snmp_community
        self.power = power

    def __str__(self) -> str:
        """
        Build the file name of the created config: IP, system name and device type written
        back to back, followed by the ".cfg" extension.

        :return: Name of created config file.
        """

        name_parts = (self.ip, self.system_name, self.device_type)
        return "{}{}{}.cfg".format(*name_parts)


def args_to_ips(args: List[str]) -> IPSet:
    """
    Collect single IP addresses and the hosts of subnetworks into one IPSet.

    An argument containing "/" is treated as a subnetwork, everything else as a single address.

    :param args: IPs and subnetworks in string representation.
    :return: IPSet with all collected addresses.
    """

    collected = IPSet()

    for item in args:
        if "/" in item:
            network = IPNetwork(item)
            for host in network.iter_hosts():
                # Addresses equal to the network or broadcast address are never scanned.
                if not (host == network.network or host == network.broadcast):
                    collected.add(host)
        else:
            collected.add(item)

    return collected


def read_settings(file_path: str) -> Dict[str, str]:
    """
    Load the options of a section-less ini file into a plain dictionary.

    :param file_path: Path to settings file.
    :return: Option names mapped to their raw (non-interpolated) string values.
    """

    parser = ConfigParser(interpolation=None)

    with open(file_path) as settings_file:
        # ConfigParser refuses options outside of a section, so an artificial section header
        # is fed to it in front of the real file content.
        lines = itertools.chain(["[default.section]"], settings_file)
        parser.read_file(lines, source=file_path)

    section = parser["default.section"]
    return {option: value for option, value in section.items()}


def wildcard_to_regexp(mac_wildcard: str) -> Pattern:
    """
    Compile a MAC wildcard into a regular expression.

    The wildcard is upper-cased and every 'X' in it (so 'x' as well) becomes a placeholder
    for exactly one hexadecimal digit (0-9, A-F). All other characters are passed to the
    regular expression as they are.

    :param mac_wildcard: MAC wildcard.
    :return: Regexp for MAC, based on wildcard.
    """

    hex_digit = '[0-9A-F]'
    fixed_parts = mac_wildcard.upper().split('X')
    return re.compile(hex_digit.join(fixed_parts))


def normalize_mac(mac: str) -> str:
    """
    Reduce a MAC address to its canonical form: surrounding whitespace and the separators
    '-', ':' and '.' are dropped and the rest is upper-cased.

    :param mac: MAC with possible special symbols.
    :return: Normalized MAC.
    """

    drop_separators = str.maketrans('', '', '-:.')
    trimmed = mac.strip()
    return trimmed.translate(drop_separators).upper()


def read_template_str(template_filename: str) -> str:
    """
    Load the whole content of a template file.

    :param template_filename: Path to template file.
    :return: Content of the template file as a single string.
    """

    with open(template_filename) as template:
        content = template.read()

    return content


def template_file(template_filename: str, params: Dict[str, str]) -> str:
    """
    Render a template file: its content is read and the "{name}" placeholders are filled
    with str.format from the provided params.

    :param template_filename: Path to template file.
    :param params: Values for the placeholders, keyed by placeholder name.
    :return: Rendered template.
    """

    return read_template_str(template_filename).format(**params)


def delete_files_in_dir(dir_name: str) -> None:
    """
    Remove the files located directly in a directory; its subdirectories and their content
    are left untouched.

    :param dir_name: Path to cleanable directory.
    """

    entry_paths = (os.path.join(dir_name, entry) for entry in os.listdir(dir_name))

    for entry_path in entry_paths:
        if not os.path.isfile(entry_path):
            continue

        os.remove(entry_path)


def move_cfg_with_replacement(old_path: str, new_path: str, cfg_dir: str) -> None:
    """
    Copy a config line by line to its new path, replacing every occurrence of the "cfgprep"
    subdirectory path of cfg_dir with cfg_dir itself, and remove the old file afterwards.

    :param old_path: Old path of config.
    :param new_path: New path of config.
    :param cfg_dir: Destination directory for configs.
    """

    with open(old_path, mode="r", encoding="utf-8") as source_file:
        with open(new_path, mode="w", encoding="utf-8") as destination_file:
            prep_dir = os.path.join(cfg_dir, "cfgprep")
            destination_file.writelines(line.replace(prep_dir, cfg_dir) for line in source_file)

    # The source is removed only after both files have been closed.
    os.remove(old_path)


def move_cfgs(source_dir: str, destination_dir: str) -> None:
    """
    Move every file located directly in the source directory into the destination directory,
    rewriting the paths inside the files on the way. Subdirectories are skipped.

    :param source_dir: Source directory.
    :param destination_dir: Destination directory.
    """

    for entry in os.listdir(source_dir):
        candidate = os.path.join(source_dir, entry)
        if not os.path.isfile(candidate):
            continue

        target = os.path.join(destination_dir, entry)
        move_cfg_with_replacement(candidate, target, destination_dir)


def transfer_cfgs(cfg_dir: str) -> None:
    """
    Clean the config directory from the configs left from the previous run, and then move the
    content of its "cfgprep" subdirectory into it.

    :param cfg_dir: Configs directory.
    """

    prepared_dir = os.path.join(cfg_dir, "cfgprep")

    delete_files_in_dir(cfg_dir)
    move_cfgs(prepared_dir, cfg_dir)