/usr/lib/python3/dist-packages/tomahawk/rsync.py is in python3-tomahawk 0.7.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | # -*- coding: utf-8 -*-
import argparse
import getpass
import os
import signal
import sys
import time
from tomahawk.base import BaseContext, BaseMain, BaseExecutor
from tomahawk.color import (
create_coloring_object
)
from tomahawk.constants import (
DEFAULT_RSYNC_OUTPUT_FORMAT,
DEFAULT_RSYNC_OPTIONS,
)
from tomahawk.expect import CommandWithExpect
from tomahawk.utils import (
shutdown_by_signal,
check_required_command
)
class RsyncContext(BaseContext):
"""
"""
def __init__(self, source, destination, options, out, err):
super(RsyncContext, self).__init__(options, out, err)
self.source = source
self.destination = destination
class RsyncMain(BaseMain):
"""
Main class for tomahawk-rsync
"""
def __init__(self, file):
super(RsyncMain, self).__init__(file)
self.log.debug("options = " + str(self.options))
self.log.debug(
"source = %s, destination = %s" % \
(str(self.options.source), str(self.options.destination))
)
def do_run(self):
self.context = RsyncContext(
self.options.source,
self.options.destination,
self.options.__dict__,
sys.stdout,
sys.stderr
)
check_required_command('rsync')
hosts = self.check_hosts()
rsync_command = 'rsync %s %s %s' % (
self.context.options['rsync_options'],
self.context.source,
self.context.destination
)
color = create_coloring_object(sys.stdout)
# prompt when production environment
self.confirm_execution_on_production(
'Rsync command "%s" will be executed %s hosts. Are you sure? [yes/NO]: '
% (color.green(rsync_command), color.green(len(hosts)))
)
executor = RsyncExecutor(self.context, self.log, hosts)
return executor.execute(self.context.source, self.context.destination)
@classmethod
def create_argument_parser(cls, file):
parser = argparse.ArgumentParser(
prog = os.path.basename(file),
description = "A simple rsync executor for many hosts.",
conflict_handler = 'resolve'
)
parser.add_argument(
'source', metavar='source', help='source',
)
parser.add_argument(
'destination', metavar='destination', help='destination',
)
parser.add_argument(
'-u', '--rsync-user', help='rsync user.'
)
parser.add_argument(
'-o', '--rsync-options', default=DEFAULT_RSYNC_OPTIONS,
help='rsync options. (default: "-avz")'
)
parser.add_argument(
'-m', '--mirror-mode',
help='"push" or "pull". "pull" means copy files remote -> local (default: "push")'
)
parser.add_argument(
'-F', '--output-format', default=DEFAULT_RSYNC_OUTPUT_FORMAT,
help="rsync command output format. (default: '%s')" % (DEFAULT_RSYNC_OUTPUT_FORMAT.replace('%', '%%').replace('\n', '\\n'))
)
# parser.add_argument(
# '-a', '--append-host-suffix', action='store_true', default=True,
# help='Append host name to destination file/dir (only when "--mirror-mode=pull").'
# )
cls.add_common_arguments(parser)
return parser
def _rsync(command, login_password, timeout, expect_delay, debug_enabled):
"""
Execute rsync
"""
# Trap SIGINT(Ctrl-C) to quit executing a command
signal.signal(signal.SIGINT, shutdown_by_signal)
try:
return CommandWithExpect(
command, [], login_password, None,
timeout, expect_delay, debug_enabled
).execute()
except:
from traceback import print_tb
print("""%s: %s""" % (sys.exc_info()[0], sys.exc_info()[1]))
print_tb(sys.exc_info()[2])
raise
class RsyncExecutor(BaseExecutor):
"""
Execute rsync.
Args:
source -- source file/dir
destination -- destination file/dir
Returns: when rsync succeeds, return 0. When errors, return 1
"""
def execute(self, source, destination):
if source is None:
raise RuntimeError('1st argument "source" must not be None')
if destination is None:
raise RuntimeError('2nd argument "destination" must not be None')
options = self.context.options
rsync_user = options.get('rsync_user') or ''
rsync_options = options.get('rsync_options') or DEFAULT_RSYNC_OPTIONS
mirror_mode = options.get('mirror_mode') or 'push'
if mirror_mode not in ('push', 'pull'):
raise RuntimeError('Invalid mirror_mode: ' + mirror_mode)
rsync_template = ''
if mirror_mode == 'push':
if rsync_user:
rsync_template = 'rsync %s %s %s@%%s:%s' % (
rsync_options, source,
rsync_user or '[user]', destination)
else:
rsync_template = 'rsync %s %s %%s:%s' % (
rsync_options, source, destination)
else:
if rsync_user:
rsync_template = 'rsync %s %s@%%s:%s %%s' % (
rsync_options, rsync_user, source)
else:
rsync_template = 'rsync %s %%s:%s %%s' % (
rsync_options, source)
async_results = []
for host in self.hosts:
c = None
if mirror_mode == 'push':
c = rsync_template % (host)
else: # pull
dest = destination
if os.path.exists(destination):
if os.path.isdir(destination):
# if destination is a directory, gets a source filename and appends a host suffix
file_name = os.path.basename(source)
if not destination.endswith('/'):
dest += '/'
dest += '%s__%s' % (host, file_name)
else:
# if destination is a file, simply appends a host suffix
dest = host + '__' + dest
else:
# if file doesn't exist
source_name = os.path.basename(source)
if source.endswith('/'):
os.path.basename(source[0:len(source)-1])
dest += host + '__' + source_name
c = rsync_template % (host, dest)
self.log.debug('command = "%s"' % (c))
async_result = self.process_pool.apply_async(
_rsync,
( c, self.login_password, options['timeout'], options['expect_delay'], options['debug'] )
)
async_results.append({ 'host': host, 'command': c, 'async_result': async_result })
if options['delay'] != 0:
time.sleep(options['delay'])
###########################
# callbacks
###########################
def create_output(color, output_format_template, command, host, exit_status, command_output):
c = command
if exit_status == 0:
c = color.green(command)
return output_format_template.safe_substitute({
'host': host,
'command': c,
'output': command_output,
})
def create_timeout_message(color, output, timeout):
output += 'rsync timed out after %d seconds' % (options['timeout'])
return output
def create_timeout_raise_error_message(color, command, host, timeout):
return '"%s" timed out on host "%s" after %d seconds.' % (c, host, timeout)
def create_failure_message(color, output, exit_status):
output += 'rsync failed ! (status = %d)' % exit_status
return output
def create_failure_raise_error_message(color, command, host):
return '"%s" failed on host "%s"' % (command, host)
def create_failure_last_message(color, command, hosts):
rsync = None
if mirror_mode == 'push':
rsync = rsync_template % ('REMOTE_HOST')
else:
rsync = rsync_template % ('REMOTE_HOST', 'LOCAL')
return '"%s" failed on following hosts\n%s' % (rsync, hosts)
# Call BaseExectuor#process_async_results with callbacks
return self.process_async_results(
async_results,
create_output,
create_timeout_message,
create_timeout_raise_error_message,
create_failure_message,
create_failure_raise_error_message,
create_failure_last_message
)
|