/usr/share/doc/python-twisted-names/howto/listings/names/override_server.py is in python-twisted-names 17.9.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | # Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
An example demonstrating how to create a custom DNS server.
The server will calculate the responses to A queries where the name begins with
the word "workstation".
Other queries will be handled by a fallback resolver.
eg
python doc/names/howto/listings/names/override_server.py
$ dig -p 10053 @localhost workstation1.example.com A +short
172.0.2.1
"""
from twisted.internet import reactor, defer
from twisted.names import client, dns, error, server
class DynamicResolver(object):
"""
A resolver which calculates the answers to certain queries based on the
query type and name.
"""
_pattern = 'workstation'
_network = '172.0.2'
def _dynamicResponseRequired(self, query):
"""
Check the query to determine if a dynamic response is required.
"""
if query.type == dns.A:
labels = query.name.name.split('.')
if labels[0].startswith(self._pattern):
return True
return False
def _doDynamicResponse(self, query):
"""
Calculate the response to a query.
"""
name = query.name.name
labels = name.split('.')
parts = labels[0].split(self._pattern)
lastOctet = int(parts[1])
answer = dns.RRHeader(
name=name,
payload=dns.Record_A(address=b'%s.%s' % (self._network, lastOctet)))
answers = [answer]
authority = []
additional = []
return answers, authority, additional
def query(self, query, timeout=None):
"""
Check if the query should be answered dynamically, otherwise dispatch to
the fallback resolver.
"""
if self._dynamicResponseRequired(query):
return defer.succeed(self._doDynamicResponse(query))
else:
return defer.fail(error.DomainError())
def main():
"""
Run the server.
"""
factory = server.DNSServerFactory(
clients=[DynamicResolver(), client.Resolver(resolv='/etc/resolv.conf')]
)
protocol = dns.DNSDatagramProtocol(controller=factory)
reactor.listenUDP(10053, protocol)
reactor.listenTCP(10053, factory)
reactor.run()
if __name__ == '__main__':
raise SystemExit(main())
|