This file is indexed.

/usr/sbin/maas-dhcp-helper is in maas-rack-controller 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o755.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#! /usr/bin/python3
# -*- mode: python -*-
# Copyright 2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Utility script used by maas-dhcpd and maas-dhcpd6.

`notify` is invoked by maas-dhcpd and maas-dhcpd6, through the `execute`
command in isc-dhcp-server's `on` event handlers, when a lease is committed,
released, or expired. It sends a JSON-encoded message over `dhcpd.sock` to
the running rackd process.

`clean` is called by maas-dhcpd and maas-dhcpd6 when either service starts
to remove any host entries from the passed leases file. This ensures that
when maas-dhcpd and maas-dhcpd6 start, only the host entries from
dhcpd.conf or dhcpd6.conf are used.
"""

import argparse


def notify_add_arguments(parser):
    """Register the options understood by the `notify` command.

    Every option is a plain value-storing flag; the table below lists them
    in the order they are added to the parser.

    :param parser: An instance of :class:`ArgumentParser`.
    """
    option_table = (
        ("--action", dict(
            required=True,
            choices=['commit', 'expiry', 'release'],
            help="Action taken by DHCP server for the lease.")),
        ("--mac", dict(
            required=True,
            help="MAC address for lease.")),
        ("--ip-family", dict(
            required=True,
            choices=['ipv4', 'ipv6'],
            help="IP address family for lease.")),
        ("--ip", dict(
            required=True,
            help="IP address for lease.")),
        ("--lease-time", dict(
            type=int,
            required=False,
            help="Length of time before the lease expires.")),
        ("--hostname", dict(
            required=False,
            help="Hostname of the machine for the lease.")),
        # The socket path is overridable but hidden from the help output.
        ("--socket", dict(
            required=False,
            default="/var/lib/maas/dhcpd.sock",
            help=argparse.SUPPRESS)),
    )
    for flag, options in option_table:
        parser.add_argument(flag, action="store", **options)


def notify(args):
    """Write DHCP action notification to the `dhcpd.sock`.

    Builds a JSON object from the parsed arguments and sends it as a single
    datagram over the UNIX socket at `args.socket`.

    :param args: Parsed arguments providing `action`, `mac`, `ip_family`,
        `ip`, `lease_time`, `hostname` and `socket`.
    :raises OSError: If the socket cannot be connected to or written to.
    """
    # Perform the imports for this command. This speeds up the operation of
    # each command.
    import json
    import socket
    import time

    notify_packet = {
        "action": args.action,
        "mac": args.mac,
        "ip_family": args.ip_family,
        "ip": args.ip,
        "timestamp": int(time.time()),
    }

    # Lease time is required by the commit action and hostname is optional.
    if args.action == "commit":
        notify_packet["lease_time"] = args.lease_time
        hostname = args.hostname
        # Skip a missing, empty or whitespace-only hostname.
        if hostname and not hostname.isspace():
            notify_packet["hostname"] = hostname

    # Connect to the socket and send the packet over as JSON. The connect
    # happens inside the `with` block so the socket is closed even when the
    # connection attempt fails (e.g. rackd is not listening).
    payload = json.dumps(notify_packet).encode("utf-8")
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as conn:
        conn.connect(args.socket)
        conn.send(payload)


def clean_add_arguments(parser):
    """Register the single positional argument of the `clean` command.

    :param parser: An instance of :class:`ArgumentParser`.
    """
    description = "Leases file to remove host entries from."
    parser.add_argument("leases_file", action="store", help=description)


def clean(args):
    """Remove all host entries from leases file.

    The file named by `args.leases_file` is read in full, every
    `host <name> { ... }` declaration is dropped, and the result is written
    back to the same path.

    :param args: Parsed arguments providing `leases_file`.
    """
    # Perform the imports for this command. This speeds up the operation of
    # each command.
    import re

    # Only match `host` at the start of a line (optionally indented). An
    # unanchored match would also fire on the text "host " inside a lease
    # value, such as a client-supplied hostname, and then swallow everything
    # up to the end of the next block, corrupting the leases file.
    host_entry = re.compile(
        r"^[ \t]*host [^{]+\{[^}]+\}\n", flags=re.MULTILINE)

    with open(args.leases_file, "r", encoding="utf-8") as fp:
        content = fp.read()
    cleaned_content = host_entry.sub("", content)
    with open(args.leases_file, "w", encoding="utf-8") as fp:
        fp.write(cleaned_content)


def main():
    """Parse the command line and dispatch to the selected command."""
    top_parser = argparse.ArgumentParser()
    commands = top_parser.add_subparsers(title="command")
    # A command must always be given; its name is stored as `args.command`.
    commands.dest = "command"
    commands.required = True

    # Each entry: command name, help text, handler, argument initialiser.
    command_table = (
        ("notify", "Write DHCP action notification to the `dhcpd.sock`.",
         notify, notify_add_arguments),
        ("clean", "Remove all host entries from leases file.",
         clean, clean_add_arguments),
    )
    for name, summary, handler, add_arguments in command_table:
        command_parser = commands.add_parser(name, help=summary)
        command_parser.set_defaults(handler=handler)
        add_arguments(command_parser)

    # Run the parser, then hand the parsed arguments to the chosen handler.
    parsed = top_parser.parse_args()
    parsed.handler(parsed)


# Only run the command-line interface when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()