/usr/lib/python2.7/dist-packages/linkcheck/plugins/viruscheck.py is in linkchecker 9.3-5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2014 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Check page content for virus infection with clamav.
"""
import os
import socket
from . import _ContentPlugin
from .. import log, LOG_PLUGIN
from ..socketutil import create_socket
class VirusCheck(_ContentPlugin):
    """Checks the page content for virus infections with clamav.
    A local clamav daemon must be installed."""

    def __init__(self, config):
        """Initialize clamav configuration.

        The clamd configuration file is taken from the ``clamavconf``
        plugin option when it is set; otherwise the platform default
        from canonical_clamav_conf() is used.

        @param config: plugin configuration; presumably the dict built
          by read_config() -- TODO confirm against the plugin loader
        """
        super(VirusCheck, self).__init__(config)
        filename = None
        if config:
            # Honour the option collected by read_config().
            filename = config.get("clamavconf")
        if not filename:
            filename = canonical_clamav_conf()
        self.clamav_conf = get_clamav_conf(filename)
        if not self.clamav_conf:
            log.warn(LOG_PLUGIN, "clamav daemon not found for VirusCheck plugin")

    def applies_to(self, url_data):
        """Check for clamav and extern.

        @return: True only if a clamav configuration was loaded and
          url_data.extern[0] is false
        @rtype: bool
        """
        return bool(self.clamav_conf) and not url_data.extern[0]

    def check(self, url_data):
        """Scan the URL content with clamav and record the outcome.

        Every infection and every scan error is added to url_data as a
        warning; when there is neither, an info message is added.
        """
        data = url_data.get_content()
        infected, errors = scan(data, self.clamav_conf)
        if infected or errors:
            for msg in infected:
                url_data.add_warning(u"Virus scan infection: %s" % msg)
            for msg in errors:
                url_data.add_warning(u"Virus scan error: %s" % msg)
        else:
            url_data.add_info("No viruses in data found.")

    @classmethod
    def read_config(cls, configparser):
        """Read configuration file options.

        @return: dict with the single key ``clamavconf``; its value is
          None when the option is missing from the plugin's section
        @rtype: dict
        """
        config = dict()
        section = cls.__name__
        option = "clamavconf"
        if configparser.has_option(section, option):
            value = configparser.get(section, option)
        else:
            value = None
        config[option] = value
        return config
class ClamavError(Exception):
    """Error raised for clamav configuration or clamd protocol problems."""
class ClamdScanner(object):
    """Virus scanner using a clamd daemon process.

    Two sockets are used: ``sock`` is the control connection returned
    by the configuration object, ``wsock`` is the data connection the
    content to scan is written to.
    """

    def __init__(self, clamav_conf):
        """Initialize clamd daemon process sockets.

        @param clamav_conf: object whose new_connection() returns a
          tuple (connected socket, host)
        @raise: whatever new_connection() or new_scansock() raises; the
          control socket is closed before the error propagates
        """
        self.infected = []
        self.errors = []
        self.sock, self.host = clamav_conf.new_connection()
        try:
            self.sock_rcvbuf = \
                self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
        except Exception:
            # Nobody else holds a reference to the socket yet.
            self.sock.close()
            raise
        # new_scansock() closes self.sock itself on any failure.
        self.wsock = self.new_scansock()

    def new_scansock(self):
        """Return a connected socket for sending scan data to it.

        Sends ``STREAM`` on the control socket and reads up to 60
        replies looking for ``PORT <number>``; the data socket is then
        connected to that port on self.host.

        @raise ClamavError: if no port was announced
        @raise socket.error: on communication problems
        @raise ValueError: if the announced port is not a number

        On every failure both the control socket and a partially set up
        data socket are closed before the exception propagates, so a
        failed constructor does not leak file descriptors.
        """
        wsock = None
        try:
            self.sock.sendall("STREAM")
            port = None
            for dummy in range(60):
                data = self.sock.recv(self.sock_rcvbuf)
                if not data:
                    # Peer closed the connection; further reads would
                    # only return empty data again.
                    break
                i = data.find("PORT")
                if i != -1:
                    # Skip "PORT" plus the separating blank.
                    port = int(data[i+5:])
                    break
            if port is None:
                raise ClamavError(_("clamd is not ready for stream scanning"))
            sockinfo = get_sockinfo(self.host, port=port)
            wsock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
            wsock.connect(sockinfo[0][4])
        except Exception:
            if wsock is not None:
                wsock.close()
            self.sock.close()
            raise
        return wsock

    def scan(self, data):
        """Scan given data for viruses.

        The data is only sent here; results are collected by close().
        """
        self.wsock.sendall(data)

    def close(self):
        """Get results and close clamd daemon sockets.

        Closes the data socket, then reads the control socket until it
        returns no more data. Chunks containing ``FOUND\\n`` are stored
        in self.infected, chunks containing ``ERROR\\n`` in self.errors.
        The control socket is closed even if reading fails.
        """
        try:
            self.wsock.close()
            data = self.sock.recv(self.sock_rcvbuf)
            while data:
                if "FOUND\n" in data:
                    self.infected.append(data)
                if "ERROR\n" in data:
                    self.errors.append(data)
                data = self.sock.recv(self.sock_rcvbuf)
        finally:
            self.sock.close()
def canonical_clamav_conf():
    """Return the default clamd.conf location for the running platform.

    The choice is keyed on os.name; unknown platforms get the bare
    file name ``clamd.conf``.
    """
    defaults = {
        'posix': "/etc/clamav/clamd.conf",
        'nt': r"c:\clamav-devel\etc\clamd.conf",
    }
    return defaults.get(os.name, "clamd.conf")
def get_clamav_conf(filename):
    """Load the clamav configuration stored in the given file.

    @return: a ClamavConfig for the file, or None (after logging a
      warning) when filename is not an existing regular file
    """
    if not os.path.isfile(filename):
        log.warn(LOG_PLUGIN, "No ClamAV config file found at %r.", filename)
        return None
    return ClamavConfig(filename)
def get_sockinfo(host, port=None):
    """Resolve host and port to IPv4 stream socket addresses.

    @return: the list produced by socket.getaddrinfo(), restricted to
      AF_INET / SOCK_STREAM entries
    """
    return socket.getaddrinfo(
        host, port, socket.AF_INET, socket.SOCK_STREAM)
class ClamavConfig(dict):
    """Dictionary of clamd.conf options that can also open a connection
    to the configured clamd daemon."""

    def __init__(self, filename):
        """Read the given configuration file and validate it.

        @raise ClamavError: if ScannerDaemonOutputFormat is set, or if
          both TCPSocket and LocalSocket are set
        """
        super(ClamavConfig, self).__init__()
        self.parseconf(filename)
        if self.get('ScannerDaemonOutputFormat'):
            raise ClamavError(_("ScannerDaemonOutputFormat must be disabled"))
        has_tcp = self.get('TCPSocket')
        if has_tcp and self.get('LocalSocket'):
            raise ClamavError(_("only one of TCPSocket and LocalSocket must be enabled"))

    def parseconf(self, filename):
        """Fill the dictionary from the options found in filename.

        Blank lines and lines starting with ``#`` are skipped. The first
        whitespace-separated word of a line is the option name, the rest
        of the line its value; an option without a value is stored as
        True.
        """
        with open(filename) as conf_file:
            for raw in conf_file:
                entry = raw.strip()
                if entry == "" or entry.startswith("#"):
                    continue
                fields = entry.split(None, 1)
                name = fields[0]
                self[name] = fields[1] if len(fields) > 1 else True

    def new_connection(self):
        """Connect to clamd for stream scanning.

        LocalSocket takes precedence over TCPSocket. For TCP the host
        comes from the TCPAddr option and defaults to ``localhost``.

        @return: tuple (connected socket, host)
        @raise ClamavError: if neither socket option is set
        """
        if self.get('LocalSocket'):
            return self.create_local_socket(), 'localhost'
        if self.get('TCPSocket'):
            tcp_host = self.get('TCPAddr', 'localhost')
            return self.create_tcp_socket(tcp_host), tcp_host
        raise ClamavError(_("one of TCPSocket or LocalSocket must be enabled"))

    def create_local_socket(self):
        """Return a unix stream socket connected to the LocalSocket path.

        The socket is closed again if connecting fails.
        """
        unix_sock = create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        path = self['LocalSocket']
        try:
            unix_sock.connect(path)
        except socket.error:
            unix_sock.close()
            raise
        else:
            return unix_sock

    def create_tcp_socket(self, host):
        """Return a TCP socket connected to host on the TCPSocket port.

        The socket is closed again if connecting fails.
        """
        addresses = get_sockinfo(host, port=int(self['TCPSocket']))
        tcp_sock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Use the socket address of the first resolved entry.
            tcp_sock.connect(addresses[0][4])
        except socket.error:
            tcp_sock.close()
            raise
        else:
            return tcp_sock
def scan(data, clamconf):
    """Send data to a clamd daemon and collect the scan results.

    A socket error while setting up the scanner is reported as a single
    error message instead of being raised. The scanner is always closed
    once it has been created.

    @return (infection msgs, errors)
    @rtype ([], [])
    """
    try:
        clamd = ClamdScanner(clamconf)
    except socket.error:
        return ([], [_("Could not connect to ClamAV daemon.")])
    try:
        clamd.scan(data)
    finally:
        # Closing the scanner is what collects the results.
        clamd.close()
    return clamd.infected, clamd.errors
|