# File fetch_ethercodes.py of Package arpwatch-ethercodes

#! /usr/bin/env python3
"""
Synopsis:
Fetch and generate ethercodes data for arpwatch

Usage: {appname} [-hVvfkt][-T sec][-O ouifile][-o outfile][-p spec]
       -h, --help           this message
       -V, --version        print version and exit
       -v, --verbose        verbose mode (cumulative)
       -f, --force          force operation
       -k, --keep           keep existing {ouifile}
       -t, --timestamp      print timestamp
       -T, --deltat sec     tolerance in timestamp comparison
                            (default: {deltat} sec.)
       -O, --ouifile file   IEEE.org host
                            (default: {ouifile})
       -o, --outfile file   arpwatch ethercodes
                            (default: {outfile})
       -p, --patch spec     patch specfile with updated timestamp

Description:
Fetch current IEEE MA-L Assignments file (oui.csv) from IEEE.org,
and generate ethercodes.dat for arpwatch consumption.

Fetch oui.csv only, if the timestamp is newer (unless --force is given).
Similar, generate ethercodes.dat only, if the timestamps don't match
(again, unless --force is given). Use option --keep to (re)generate
ethercodes.dat from an existing oui.csv.

Patch replaces a pattern NNNNNNNN_NNNNNN with the current timestamp
in the given file.

Notes:
The timestamps of oui.csv fluctuate in a 2 seconds range(!). Therefore
compensate the fluctuation by taking a deltat tolerance factor into
account.

Copyright:
(c)2018 by {author} {email}

License:
{license}
"""
#
#

# package metadata; copied into the gpar class below, from where the
# usage text picks up author, email and license
__version__ = '0.3'
__author__ = 'Hans-Peter Jansen'
__email__ = '<hpj@urpla.net>'
__license__ = 'MIT'


import os
import re
import csv
import sys
import time
import getopt
import traceback
import email.utils
import urllib.error
import urllib.parse
import urllib.request

# avoid encoding issues with print() in locale-less environments
# NOTE(review): the interpreter evaluates PYTHONIOENCODING at startup, so
# this assignment presumably affects child processes only - confirm, whether
# the standard streams of this process need to be reconfigured instead
os.environ["PYTHONIOENCODING"] = "utf-8"

class gpar:
    """ namespace of the global settings (never instantiated)

        The attributes double as values for the {name} placeholders in
        messages and in the usage text.
    """
    # location and name of the script, as given on invocation
    appdir = os.path.dirname(sys.argv[0])
    appname = os.path.basename(sys.argv[0])
    if appdir == '.':
        appdir = os.getcwd()
    pid = os.getpid()

    # package metadata
    version = __version__
    author = __author__
    email = __email__
    license = __license__

    # command line controlled settings and their defaults
    loglevel = 0        # verbosity, raised by each -v
    force = False       # -f
    keep = False        # -k
    timestamp = False   # -t
    deltat = 2.5        # -T, tolerance in seconds
    ouifile = 'http://standards-oui.ieee.org/oui/oui.csv'   # -O
    outfile = 'ethercodes.dat'                              # -o
    patchfile = None    # -p


def vout(lvl, msg):
    """ print msg to stdout, if lvl does not exceed the global loglevel

        lvl: verbosity level of the message
        msg: message text; {name} placeholders are substituted with the
             attributes of gpar of the same name

        Many callers pass text, that contains external data already
        (file names, header values, csv rows). If such a text cannot be
        used as a format string, it is printed unchanged.
    """
    if lvl > gpar.loglevel:
        return
    try:
        text = msg.format(**gpar.__dict__)
    except (LookupError, ValueError, AttributeError):
        # stray braces or unknown placeholders: print the text literally
        text = msg
    print(text, file = sys.stdout, flush = True)

def stderr(*msg):
    """ print all arguments to stderr, and flush the stream """
    print(*msg, file = sys.stderr, flush = True)

def exit(ret = 0, msg = None, usage = False):
    """ leave the process with exit status ret

        msg:   optional message, printed to stderr with the application
               name as prefix
        usage: print the usage text (the module docstring) to stderr
    """
    if msg:
        stderr('%s: %s' % (gpar.appname, msg))
    if usage:
        helptext = __doc__.format(**gpar.__dict__)
        stderr(helptext)
    sys.exit(ret)


def cmp_ts(t1, t2):
    """ check, whether two timestamps are to be considered equal

        return True, if the distance of t1 and t2 is below the global
        tolerance gpar.deltat, and False otherwise
    """
    distance = abs(t1 - t2)
    return distance < gpar.deltat


# NOTE: hex keeps its historic name, although it shadows the builtin of the
# same name within this module
hex = lambda x: int(x, 16)
hexstr = lambda h: format(h, 'x')

def code_key(val):
    """ return the colon formatted code key, if val is an exact 24 bit
        (six digit) hexadecimal string, and None otherwise
        000000 -> 0:0:0
        ABCDEF -> ab:cd:ef
    """
    # int(x, 16) alone is too lenient (it accepts signs and whitespace), and
    # raises ValueError on anything else, therefore validate val up front
    if not re.match(r'[0-9A-Fa-f]{6}\Z', val):
        return None
    return ':'.join(hexstr(hex(val[pos:pos + 2])) for pos in (0, 2, 4))


def append_qemu(content):
    """ return content (bytes in oui.csv format), extended with the record
        of the QEMU/KVM address prefix 52:54:00

        If content is not empty and lacks a trailing line terminator, one
        is inserted, in order to keep the new record on a line of its own.
        The argument is never modified in place.
    """
    record = b'MA-L,525400,QEMU/KVM,Cyberspace\x0d\x0a'
    if content and not content.endswith((b'\x0d', b'\x0a')):
        return content + b'\x0d\x0a' + record
    return content + record

def fetch_infile(infile):
    """ check the remote oui.csv (gpar.ouifile), and fetch it, if the local
        copy is missing or outdated, or if gpar.force is set

        infile: path of the local copy

        The fetched data is extended with the QEMU/KVM record, grouped and
        sorted by OUI, and written to infile with CRLF line endings. The
        mtime of infile is set to the remote modification time.

        return the remote modification date as time tuple (the result of
        email.utils.parsedate())
        raise ValueError, if the response lacks a usable Last-Modified
        header
    """
    # check oui.csv parameter
    vout(1, 'check {ouifile}')
    # the context manager makes sure, that the connection is closed
    with urllib.request.urlopen(gpar.ouifile) as req:
        header = req.info()
        # header values may contain braces, which need to be escaped, since
        # vout() treats its argument as a format string
        vout(3, 'header info: {}'.format(
                str(header).replace('{', '{{').replace('}', '}}')))
        length = header['Content-Length']
        # the size is unknown, if the server omits the header
        ouisize = None if length is None else int(length)
        vout(1, 'oui file size: {}'.format(ouisize))
        lastmod = header['Last-Modified']
        vout(1, 'oui file date: {}'.format(lastmod))
        ouidate = email.utils.parsedate(lastmod) if lastmod else None
        if ouidate is None:
            raise ValueError('no usable Last-Modified header from {}'.format(
                    gpar.ouifile))
        # NOTE: parsedate() ignores the time zone of the header value, and
        # mktime() treats the tuple as local time, hence ouitime is only
        # meaningful in relation to the mtimes, that are set from it
        ouitime = time.mktime(ouidate)
        vout(3, 'parsed oui file date: {} ({})'.format(
                time.asctime(ouidate), ouitime))

        # the local copy carries the additional QEMU/KVM record, therefore
        # it is expected to exceed the remote size by the record length
        expsize = None
        if ouisize is not None:
            expsize = ouisize + len(append_qemu(b''))

        # check, if local oui.csv is outdated
        fetchoui = False
        if gpar.force:
            fetchoui = True
        elif not os.path.exists(infile):
            vout(1, 'no local file {infile} found')
            fetchoui = True
        elif expsize is not None and os.path.getsize(infile) != expsize:
            vout(1, 'local file size differs: {} vs. {} expected'.format(
                    os.path.getsize(infile), expsize))
            fetchoui = True
        elif not cmp_ts(os.stat(infile).st_mtime, ouitime):
            vout(3, str(os.stat(infile).st_mtime))
            vout(3, str(ouitime))
            mtime = time.localtime(os.stat(infile).st_mtime)
            otime = time.localtime(ouitime)
            vout(1, 'local file date differs: {} vs. {} remote'.format(
                    time.asctime(mtime), time.asctime(otime)))
            fetchoui = True

        # fetch oui.csv
        content = None
        if fetchoui:
            vout(1, 'fetch {ouifile}')
            content = req.read()

    if fetchoui:
        # split at CR: all lines but the first start with the LF of the
        # preceding line terminator, which is removed below
        lines = append_qemu(content).split(b'\r')
        heading = lines.pop(0).replace(b'\n', b'', 1)

        # group the records by OUI, while preserving their relative order
        sort_dict = {}
        for line in lines:
            stripped = line.replace(b'\n', b'', 1)
            if not stripped:
                continue
            items = stripped.split(b',', 3)
            if len(items) < 2:
                # no OUI column available, the record cannot be sorted in
                vout(1, 'skip malformed record')
                continue
            OUI = items[1].decode('ascii').lower()
            sort_dict.setdefault(OUI, []).append(stripped)

        with open(infile, 'wb') as f:
            CRLF = b'\r\n'
            f.write(heading + CRLF)
            for OUI in sorted(sort_dict):
                for item in sort_dict[OUI]:
                    f.write(item + CRLF)

        # mark the local copy with the remote modification time
        os.utime(infile, (ouitime, ouitime))

    return ouidate


def _escape_braces(text):
    """ protect braces in text from the format string handling of vout() """
    return text.replace('{', '{{').replace('}', '}}')


def parse_csv(infile):
    """ parse the csv file infile (utf-8 encoded, oui.csv layout)

        The second column is expected to hold the six digit hexadecimal
        assignment, the third column the organization name. Rows with
        fewer columns, or without a valid assignment (e.g. the heading),
        are ignored.

        return a dict, that maps the code keys (see code_key()) to the
        organization names; the first name wins for duplicate keys
    """
    vout(1, 'parse {infile}')
    codes = {}
    with open(infile, newline = '', encoding = 'utf-8') as f:
        for row in csv.reader(f):
            vout(3, _escape_braces(str(row)))
            # blank or truncated lines carry no usable assignment
            if len(row) < 3:
                continue
            # generate
            code = code_key(row[1])
            if not code:
                continue
            vendor = row[2]
            if code in codes and codes[code] != vendor:
                vout(1, 'value {} exists already: "{}", "{}"'.format(
                        code, _escape_braces(codes[code]),
                        _escape_braces(vendor)))
            else:
                codes[code] = vendor
    return codes


def patch_file(patchfile, timestamp):
    """ replace all sequences of eight digits in patchfile with timestamp

        patchfile: path of the (utf-8 encoded) file to patch in place
        timestamp: replacement text, inserted literally

        The file is rewritten only, if the replacement changes its content.
    """
    vout(1, 'patch {}'.format(patchfile))
    # newline = '' disables any newline translation, the line endings of
    # the file are preserved as they are
    with open(patchfile, 'r+', encoding = 'utf-8', newline = '') as f:
        content = f.read()
        # a callable replacement prevents the interpretation of backslash
        # sequences in timestamp
        patched, count = re.subn(r'\d{8}', lambda match: timestamp, content)
        if count and content != patched:
            f.seek(0)
            f.write(patched)
            # drop stale data, if the new content is shorter than the old
            f.truncate()
            vout(1, '{} occurances replaced in {}'.format(count, patchfile))
        else:
            vout(1, '{} unchanged'.format(patchfile))


def fetch_ethercodes():
    """ update the local oui.csv, and generate the ethercodes file from it

        Controlled by the settings in gpar:
        - keep: skip the download, and regenerate from the existing oui.csv
        - force: regenerate, even if the timestamps match
        - timestamp: print the date of oui.csv as YYYYMMDD
        - patchfile: patch this file with the date of oui.csv

        The mtime of the generated file is set to that of oui.csv.

        return 0
    """
    # extract file argument from URL
    gpar.infile = os.path.basename(urllib.parse.urlparse(gpar.ouifile).path)
    # default: oui.csv
    infile = gpar.infile
    # default: ethercodes.dat
    outfile = gpar.outfile
    if gpar.keep:
        if os.path.exists(infile):
            # force generation of ethercodes.dat from existing oui.csv
            gpar.force = True
            ouidate = time.localtime(os.stat(infile).st_mtime)
        else:
            # exit() prints the message as is, hence format it here
            exit(1, 'keep option selected, but no {}'.format(infile))
    else:
        ouidate = fetch_infile(infile)
    ouitime = time.mktime(ouidate)

    # check, if ethercodes.dat is outdated
    gencodes = False
    if gpar.force:
        gencodes = True
    elif not os.path.exists(outfile):
        vout(1, 'no local file {} found'.format(outfile))
        gencodes = True
    elif not cmp_ts(os.stat(outfile).st_mtime, ouitime):
        vout(3, str(os.stat(outfile).st_mtime))
        vout(3, str(ouitime))
        mtime = time.localtime(os.stat(outfile).st_mtime)
        otime = time.localtime(ouitime)
        vout(2, 'local file date differs: {} vs. {} remote'.format(
                time.asctime(mtime), time.asctime(otime)))
        gencodes = True

    # generate ethercodes.dat
    if gencodes:
        codes = parse_csv(infile)
        # published in gpar for the {nrcodes} placeholder below
        gpar.nrcodes = len(codes)
        vout(1, 'generate {outfile} with {nrcodes} entries')
        with open(outfile, 'w', newline = '', encoding = 'utf-8') as f:
            for key in sorted(codes):
                f.write('%s\t%s\n' % (key, codes[key]))
        # a matching mtime marks the file as up to date for the next run
        os.utime(outfile, (ouitime, ouitime))
        vout(1, 'successful')
    else:
        vout(1, 'code file {outfile} up to date already')

    timestamp = time.strftime('%Y%m%d', ouidate)
    if gpar.timestamp:
        vout(0, 'timestamp: {}'.format(timestamp))

    if gpar.patchfile is not None:
        patch_file(gpar.patchfile, timestamp)

    return 0


def main(argv = None):
    """ command line entry point

        argv: list of command line arguments (default: sys.argv[1:])

        Transfer the options to gpar, and run fetch_ethercodes().
        Invalid options terminate the process with exit status 1, an
        unexpected exception with exit status 2.

        return the result of fetch_ethercodes()
    """
    if argv is None:
        argv = sys.argv[1:]

    # yeah, oldschool, I know...
    try:
        # long options, that take a value, need a trailing '='
        optlist, args = getopt.getopt(argv, 'hVvfktT:O:o:p:',
            ('help', 'version', 'verbose', 'force', 'keep', 'timestamp',
             'deltat=', 'ouifile=', 'outfile=', 'patch=')
        )
    except getopt.error as msg:
        exit(1, msg, True)

    for opt, par in optlist:
        if opt in ('-h', '--help'):
            exit(usage = True)
        elif opt in ('-V', '--version'):
            exit(msg = 'version %s' % gpar.version)
        elif opt in ('-v', '--verbose'):
            gpar.loglevel += 1
        elif opt in ('-f', '--force'):
            gpar.force = True
        elif opt in ('-k', '--keep'):
            gpar.keep = True
        elif opt in ('-t', '--timestamp'):
            gpar.timestamp = True
        elif opt in ('-T', '--deltat'):
            # the tolerance is compared with time differences, and needs
            # to be numeric therefore
            try:
                gpar.deltat = float(par)
            except ValueError:
                exit(1, 'invalid deltat value: %s' % par, True)
        elif opt in ('-O', '--ouifile'):
            gpar.ouifile = par
        elif opt in ('-o', '--outfile'):
            gpar.outfile = par
        elif opt in ('-p', '--patch'):
            gpar.patchfile = par

    try:
        return fetch_ethercodes()
    except Exception:
        exit(2, 'unexpected exception occured:\n%s' % traceback.format_exc())


# script entry point: the return value of main() is handed to sys.exit()
# as exit status of the process
if __name__ == '__main__':
    sys.exit(main())
# vim: ts=8 shiftwidth=8 expandtab
# openSUSE Build Service is sponsored by