This file is indexed.

/usr/lib/python2.7/dist-packages/linkcheck/plugins/viruscheck.py is in linkchecker 9.3-5.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2014 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Check page content for virus infection with clamav.
"""
import os
import socket
from . import _ContentPlugin
from .. import log, LOG_PLUGIN
from ..socketutil import create_socket


class VirusCheck(_ContentPlugin):
    """Checks the page content for virus infections with clamav.
    A local clamav daemon must be installed."""

    def __init__(self, config):
        """Initialize clamav configuration."""
        super(VirusCheck, self).__init__(config)
        # XXX read config
        self.clamav_conf = get_clamav_conf(canonical_clamav_conf())
        if not self.clamav_conf:
            log.warn(LOG_PLUGIN, "clamav daemon not found for VirusCheck plugin")

    def applies_to(self, url_data):
        """Check for clamav and extern."""
        return self.clamav_conf and not url_data.extern[0]

    def check(self, url_data):
        """Try to ask GeoIP database for country info."""
        data = url_data.get_content()
        infected, errors = scan(data, self.clamav_conf)
        if infected or errors:
            for msg in infected:
                url_data.add_warning(u"Virus scan infection: %s" % msg)
            for msg in errors:
                url_data.add_warning(u"Virus scan error: %s" % msg)
        else:
            url_data.add_info("No viruses in data found.")

    @classmethod
    def read_config(cls, configparser):
        """Read configuration file options."""
        config = dict()
        section = cls.__name__
        option = "clamavconf"
        if configparser.has_option(section, option):
            value = configparser.get(section, option)
        else:
            value = None
        config[option] = value
        return config


class ClamavError (Exception):
    """Raised on clamav errors."""
    pass


class ClamdScanner (object):
    """Virus scanner using a clamd daemon process."""

    def __init__ (self, clamav_conf):
        """Initialize clamd daemon process sockets."""
        self.infected = []
        self.errors = []
        self.sock, self.host = clamav_conf.new_connection()
        self.sock_rcvbuf = \
             self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
        self.wsock = self.new_scansock()

    def new_scansock (self):
        """Return a connected socket for sending scan data to it."""
        port = None
        try:
            self.sock.sendall("STREAM")
            port = None
            for dummy in range(60):
                data = self.sock.recv(self.sock_rcvbuf)
                i = data.find("PORT")
                if i != -1:
                    port = int(data[i+5:])
                    break
        except socket.error:
            self.sock.close()
            raise
        if port is None:
            raise ClamavError(_("clamd is not ready for stream scanning"))
        sockinfo = get_sockinfo(self.host, port=port)
        wsock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            wsock.connect(sockinfo[0][4])
        except socket.error:
            wsock.close()
            raise
        return wsock

    def scan (self, data):
        """Scan given data for viruses."""
        self.wsock.sendall(data)

    def close (self):
        """Get results and close clamd daemon sockets."""
        self.wsock.close()
        data = self.sock.recv(self.sock_rcvbuf)
        while data:
            if "FOUND\n" in data:
                self.infected.append(data)
            if "ERROR\n" in data:
                self.errors.append(data)
            data = self.sock.recv(self.sock_rcvbuf)
        self.sock.close()


def canonical_clamav_conf ():
    """Default clamav configs for various platforms."""
    if os.name == 'posix':
        clamavconf = "/etc/clamav/clamd.conf"
    elif os.name == 'nt':
        clamavconf = r"c:\clamav-devel\etc\clamd.conf"
    else:
        clamavconf = "clamd.conf"
    return clamavconf


def get_clamav_conf(filename):
    """Initialize clamav configuration."""
    if os.path.isfile(filename):
        return ClamavConfig(filename)
    log.warn(LOG_PLUGIN, "No ClamAV config file found at %r.", filename)


def get_sockinfo (host, port=None):
    """Return socket.getaddrinfo for given host and port."""
    family, socktype = socket.AF_INET, socket.SOCK_STREAM
    return socket.getaddrinfo(host, port, family, socktype)


class ClamavConfig (dict):
    """Clamav configuration wrapper, with clamd connection method."""

    def __init__ (self, filename):
        """Parse clamav configuration file."""
        super(ClamavConfig, self).__init__()
        self.parseconf(filename)
        if self.get('ScannerDaemonOutputFormat'):
            raise ClamavError(_("ScannerDaemonOutputFormat must be disabled"))
        if self.get('TCPSocket') and self.get('LocalSocket'):
            raise ClamavError(_("only one of TCPSocket and LocalSocket must be enabled"))

    def parseconf (self, filename):
        """Parse clamav configuration from given file."""
        with open(filename) as fd:
            # yet another config format, sigh
            for line in fd:
                line = line.strip()
                if not line or line.startswith("#"):
                    # ignore empty lines and comments
                    continue
                split = line.split(None, 1)
                if len(split) == 1:
                    self[split[0]] = True
                else:
                    self[split[0]] = split[1]

    def new_connection (self):
        """Connect to clamd for stream scanning.

        @return: tuple (connected socket, host)
        """
        if self.get('LocalSocket'):
            host = 'localhost'
            sock = self.create_local_socket()
        elif self.get('TCPSocket'):
            host = self.get('TCPAddr', 'localhost')
            sock = self.create_tcp_socket(host)
        else:
            raise ClamavError(_("one of TCPSocket or LocalSocket must be enabled"))
        return sock, host

    def create_local_socket (self):
        """Create local socket, connect to it and return socket object."""
        sock = create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        addr = self['LocalSocket']
        try:
            sock.connect(addr)
        except socket.error:
            sock.close()
            raise
        return sock

    def create_tcp_socket (self, host):
        """Create tcp socket, connect to it and return socket object."""
        port = int(self['TCPSocket'])
        sockinfo = get_sockinfo(host, port=port)
        sock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(sockinfo[0][4])
        except socket.error:
            sock.close()
            raise
        return sock


def scan (data, clamconf):
    """Scan data for viruses.
    @return (infection msgs, errors)
    @rtype ([], [])
    """
    try:
        scanner = ClamdScanner(clamconf)
    except socket.error:
        errmsg = _("Could not connect to ClamAV daemon.")
        return ([], [errmsg])
    try:
        scanner.scan(data)
    finally:
        scanner.close()
    return scanner.infected, scanner.errors