/usr/share/julia/base/managers.jl is in julia-common 0.4.7-6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# This file is a part of Julia. License is MIT: http://julialang.org/license
# Built-in SSH and Local Managers
# Cluster manager that launches and controls workers over SSH.
# (`immutable` is the pre-1.0 spelling of `struct` — this is Julia 0.4 code.)
immutable SSHManager <: ClusterManager
    # Map of host spec => requested worker count (an Int, or :auto).
    machines::Dict
    function SSHManager(machines)
        # machines => array of machine elements
        # machine => address or (address, cnt)
        # address => string of form `[user@]host[:port] bind_addr[:bind_port]`
        # cnt => :auto or number
        # :auto launches NUM_CORES number of workers at address
        # number launches the specified number of workers at address
        mhist = Dict()
        for m in machines
            if isa(m, Tuple)
                host=m[1]
                cnt=m[2]
            else
                # A bare address means one worker on that host.
                host=m
                cnt=1
            end
            # Accumulate counts for hosts listed more than once; a symbolic
            # count (:auto) replaces any numeric total already recorded.
            current_cnt = get(mhist, host, 0)
            if isa(cnt, Number)
                mhist[host] = isa(current_cnt, Number) ? current_cnt + Int(cnt) : Int(cnt)
            else
                mhist[host] = cnt
            end
        end
        new(mhist)
    end
end
# Validate keyword arguments forwarded to the manager-specific addprocs.
# Only :dir, :exename, :exeflags and :topology may be passed through; any
# other keyword raises an ArgumentError.
function check_addprocs_args(kwargs)
    valid_kwargs = [:dir, :exename, :exeflags, :topology]
    for kw in kwargs
        kw[1] in valid_kwargs || throw(ArgumentError("Invalid keyword argument $(kw[1])"))
    end
end
# SSHManager
# start and connect to processes via SSH, optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
# to be mutually reachable without a tunnel, as is often the case in a cluster.
# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config
# A machine is either a <hostname> or a tuple of (<hostname>, count)
# Launch workers over SSH on the given machine specifications.
# Validates the pass-through keywords, then delegates to the
# ClusterManager-based addprocs with an SSHManager built from `machines`.
function addprocs(machines::AbstractVector; tunnel=false, sshflags=``, max_parallel=10, kwargs...)
    check_addprocs_args(kwargs)
    manager = SSHManager(machines)
    return addprocs(manager; tunnel=tunnel, sshflags=sshflags, max_parallel=max_parallel, kwargs...)
end
# Launch one worker on each unique host in parallel; additional workers per
# host are launched later during setup. Blocks until every per-host launch
# task has completed, then notifies `launch_ntfy` one final time.
function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition)
    # Launch one worker on each unique host in parallel. Additional workers are launched later.
    # Wait for all launches to complete.
    # (`cell(n)` is the Julia 0.4 way to allocate an uninitialized Vector{Any}.)
    launch_tasks = cell(length(manager.machines))

    for (i,(machine, cnt)) in enumerate(manager.machines)
        # `let` captures per-iteration copies so each scheduled task sees its
        # own machine/cnt rather than the loop variables' final values.
        let machine=machine, cnt=cnt
            launch_tasks[i] = @schedule try
                    launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy)
                catch e
                    # Report but do not abort the other launches.
                    print(STDERR, "exception launching on machine $(machine) : $(e)\n")
                end
        end
    end

    for t in launch_tasks
        wait(t)
    end

    # Final wake-up so any consumer blocked on the condition sees completion.
    notify(launch_ntfy)
end
# Textual representation of an SSHManager, including its host=>count map.
# (Trailing newline included, matching the original println-based output.)
function show(io::IO, manager::SSHManager)
    println(io, "SSHManager(machines=", manager.machines, ")")
end
# Start a worker on `machine` via a single ssh invocation and record a
# WorkerConfig (with count=cnt) in `launched` for the connection phase.
function launch_on_machine(manager::SSHManager, machine, cnt, params, launched, launch_ntfy::Condition)
    dir = params[:dir]
    exename = params[:exename]
    exeflags = params[:exeflags]

    # machine could be of the format [user@]host[:port] bind_addr[:bind_port]
    machine_bind = split(machine)
    if length(machine_bind) > 1
        # Second whitespace-separated token is the address the worker binds to.
        exeflags = `--bind-to $(machine_bind[2]) $exeflags`
    end
    exeflags = `$exeflags --worker`

    machine_def = machine_bind[1]
    machine_def = split(machine_def, ':')
    # An explicit :port in the host spec becomes ssh's -p option.
    portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : ``
    sshflags = `$(params[:sshflags]) $portopt`

    host = machine_def[1]

    # Build up the ssh command
    # Propagate the worker timeout to the remote side when set locally.
    tval = haskey(ENV, "JULIA_WORKER_TIMEOUT") ? `export JULIA_WORKER_TIMEOUT=$(ENV["JULIA_WORKER_TIMEOUT"]);` : ``
    cmd = `cd $dir && $tval $exename $exeflags` # launch julia
    cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under
    cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch

    # launch (detached so the remote process is not tied to our tty)
    io, pobj = open(detach(cmd), "r")

    wconfig = WorkerConfig()
    wconfig.io = io
    wconfig.host = host
    wconfig.tunnel = params[:tunnel]
    wconfig.sshflags = sshflags
    wconfig.exeflags = exeflags
    wconfig.exename = exename
    wconfig.count = cnt
    wconfig.max_parallel = params[:max_parallel]

    push!(launched, wconfig)
    # Wake the cluster setup loop so it can start connecting to this worker.
    notify(launch_ntfy)
end
# Control operations for SSH workers. Only :interrupt is handled here:
# deliver SIGINT (kill -2) to the remote julia process over ssh.
function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symbol)
    if op == :interrupt
        # config.ospid is a Nullable (Julia 0.4); default 0 means the worker
        # has not yet reported its OS pid back to the master.
        ospid = get(config.ospid, 0)
        if ospid > 0
            host = get(config.host)
            sshflags = get(config.sshflags)
            if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`)
                println("Error sending a Ctrl-C to julia worker $id on $host")
            end
        else
            # This state can happen immediately after an addprocs
            println("Worker $id cannot be presently interrupted.")
        end
    end
end
# Hand out candidate local ports for SSH tunnels: 9201, 9202, ... up to
# 32001, then wrap around to 9201. State lives in the let-bound counter.
let port_counter = 9201
    global next_tunnel_port
    function next_tunnel_port()
        current = port_counter
        # Advance for the next caller, wrapping once the counter passes 32000.
        port_counter = port_counter > 32000 ? 9201 : port_counter + 1
        return current
    end
end
# Establish an SSH tunnel to a remote worker.
# Returns a local port P such that localhost:P forwards to bind_addr:port on
# `host`, tried over up to `ntries` candidate local ports.
function ssh_tunnel(user, host, bind_addr, port, sshflags)
    localp = next_tunnel_port()
    ntries = cnt = 100
    # `sleep 60` keeps the forwarding alive long enough for the master to
    # connect; ExitOnForwardFailure makes a busy local port fail fast so we
    # can retry with the next candidate.
    while !success(detach(`ssh -T -a -x -o ExitOnForwardFailure=yes -f $sshflags $(user)@$host -L $localp:$bind_addr:$(Int(port)) sleep 60`)) && cnt > 0
        localp = next_tunnel_port()
        cnt -= 1
    end

    # BUG FIX: the message previously interpolated `cnt` (always 0 here);
    # report the actual number of attempts made.
    (cnt == 0) && error("Unable to create SSH tunnel after $ntries tries. No free port?")
    localp
end
# LocalManager
# Cluster manager that launches workers as processes on the local machine.
immutable LocalManager <: ClusterManager
    # Number of local workers to launch.
    np::Integer
end
# With no positional argument, launch one local worker per CPU core.
function addprocs(; kwargs...)
    return addprocs(Sys.CPU_CORES; kwargs...)
end
# Launch `np` local worker processes after validating the pass-through
# keyword arguments.
function addprocs(np::Integer; kwargs...)
    check_addprocs_args(kwargs)
    return addprocs(LocalManager(np); kwargs...)
end
# Textual representation of a LocalManager. The worker count is deliberately
# not shown, matching the original output exactly.
function show(io::IO, manager::LocalManager)
    println(io, "LocalManager()")
end
# Launch `manager.np` local worker processes, each bound to this node's
# address, and record a WorkerConfig per worker in `launched`.
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    dir = params[:dir]
    exename = params[:exename]
    exeflags = params[:exeflags]

    for i in 1:manager.np
        # Detach so each worker runs in its own process group, with its
        # working directory set via setenv's dir option.
        io, pobj = open(detach(
            setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker`, dir=dir)), "r")
        wconfig = WorkerConfig()
        wconfig.process = pobj
        wconfig.io = io
        push!(launched, wconfig)
    end

    # Signal the cluster setup loop that all launches have been recorded.
    notify(c)
end
# Control operations for local workers. Only :interrupt is handled here:
# send SIGINT (signal 2) directly to the worker's process object.
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    if op == :interrupt
        # config.process is a Nullable (Julia 0.4); get() unwraps it.
        kill(get(config.process), 2)
    end
end
# DefaultClusterManager for the default TCP transport - used by both SSHManager and LocalManager
# Stateless placeholder manager used to dispatch the default TCP transport
# shared by SSHManager and LocalManager.
immutable DefaultClusterManager <: ClusterManager
end
# Per-host semaphores limiting concurrent SSH tunnel setups to each host
# (sized from the worker's max_parallel setting; see connect below).
const tunnel_hosts_map = Dict{AbstractString, Semaphore}()
# Default connect for TCP-based transports: establish the socket to worker
# `pid` (directly, or through an SSH tunnel when config.tunnel is set) and
# return it as the (read_stream, write_stream) pair.
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
    if !isnull(config.connect_at)
        # this is a worker-to-worker setup call.
        return connect_w2w(pid, config)
    end

    # master connecting to workers
    if !isnull(config.io)
        # Freshly launched worker: it writes its listen address and port to
        # its stdout, which we still hold as config.io.
        (bind_addr, port) = read_worker_host_port(get(config.io))
        pubhost=get(config.host, bind_addr)
        config.host = pubhost
        config.port = port
    else
        # Reconnect path: address details were recorded earlier.
        pubhost=get(config.host)
        port=get(config.port)
        bind_addr=get(config.bind_addr, pubhost)
    end

    tunnel = get(config.tunnel, false)

    # Split an optional user@ prefix off the public host name.
    s = split(pubhost,'@')
    user = ""
    if length(s) > 1
        user = s[1]
        pubhost = s[2]
    else
        if haskey(ENV, "USER")
            user = ENV["USER"]
        elseif tunnel
            error("USER must be specified either in the environment or as part of the hostname when tunnel option is used")
        end
    end

    if tunnel
        # Throttle concurrent tunnel setups per host with a semaphore sized
        # by the worker's max_parallel setting.
        if !haskey(tunnel_hosts_map, pubhost)
            tunnel_hosts_map[pubhost] = Semaphore(get(config.max_parallel, typemax(Int)))
        end
        sem = tunnel_hosts_map[pubhost]

        sshflags = get(config.sshflags)
        acquire(sem)
        try
            (s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags)
        finally
            release(sem)
        end
    else
        (s, bind_addr) = connect_to_worker(bind_addr, port)
    end

    config.bind_addr = bind_addr

    # write out a subset of the connect_at required for further worker-worker connection setups
    config.connect_at = (bind_addr, port)

    if !isnull(config.io)
        # Forward the worker's stdout/stderr tagged with its pid.
        let pid = pid
            redirect_worker_output(pid, get(config.io))
        end
    end

    # Same socket serves as both the read and write stream.
    (s, s)
end
# Worker-to-worker connection using the (host, port) pair the master
# recorded in config.connect_at. Returns the (read, write) stream pair.
function connect_w2w(pid::Int, config::WorkerConfig)
    (rhost, rport) = get(config.connect_at)
    config.host = rhost
    config.port = rport
    # config.environ carries locality hints set up during cluster init.
    if get(get(config.environ), :self_is_local, false) && get(get(config.environ), :r_is_local, false)
        # If on localhost, use the loopback address - this addresses
        # the special case of system suspend wherein the local ip
        # may be changed upon system awake.
        (s, bind_addr) = connect_to_worker("127.0.0.1", rport)
    else
        (s, bind_addr)= connect_to_worker(rhost, rport)
    end
    (s,s)
end
# Local client port to reuse across outgoing connections (0 until first
# discovered via getsockname below).
const client_port = Ref{Cushort}(0)

# Create a TCP socket for an outgoing connection, attempting to bind it to
# the same local port as previous connections (via SO_REUSEPORT on Linux) so
# large clusters don't exhaust the ephemeral port range. Falls back to an
# ordinarily bound socket when port reuse is unsupported or fails.
function socket_reuse_port()
    s = TCPSocket()
    client_host = Ref{Cuint}(0)
    # Bind to client_port (0 on first call => kernel picks an ephemeral port).
    ccall(:jl_tcp_bind, Int32,
            (Ptr{Void}, UInt16, UInt32, Cuint),
            s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))

    # TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
    # early access to a socket fd, i.e., before a bind call.

    @linux_only begin
        try
            rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void}, ), s.handle)
            if rc > 0  # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
                return s
            elseif rc < 0
                throw(SystemError("setsockopt() SO_REUSEPORT : "))
            end
            # Record the port actually bound so subsequent sockets reuse it.
            ccall(:jl_tcp_getsockname_v4, Int32,
                    (Ptr{Void}, Ref{Cuint}, Ref{Cushort}),
                    s.handle, client_host, client_port) < 0 && throw(SystemError("getsockname() : "))
        catch e
            # This is an issue only on systems with lots of client connections, hence delay the warning....
            nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
            # provide a clean new socket
            return TCPSocket()
        end
    end
    return s
end
# Direct (untunnelled) connection to a worker. Returns (socket, bind_addr)
# where bind_addr is the stringified IP later shared for worker-to-worker
# connection setup.
function connect_to_worker(host::AbstractString, port::Integer)
    # Connect to the loopback port if requested host has the same ipaddress as self.
    s = socket_reuse_port()
    if host == string(LPROC.bind_addr)
        s = connect(s, "127.0.0.1", UInt16(port))
    else
        s = connect(s, host, UInt16(port))
    end

    # Avoid calling getaddrinfo if possible - involves a DNS lookup
    # host may be a stringified ipv4 / ipv6 address or a dns name
    if host == "localhost"
        bind_addr = "127.0.0.1"
    else
        try
            # parseip succeeds only for literal IP addresses (Julia 0.4 API).
            bind_addr = string(parseip(host))
        catch
            # Not a literal address; resolve the name via DNS.
            bind_addr = string(getaddrinfo(host))
        end
    end
    (s, bind_addr)
end
# Tunnelled variant: set up an SSH tunnel to `host` as `tunnel_user` and
# connect through its local end. Returns (socket, bind_addr).
function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags)
    local_port = ssh_tunnel(tunnel_user, host, bind_addr, UInt16(port), sshflags)
    sock = connect("localhost", local_port)
    return (sock, bind_addr)
end
# Default worker termination for any ClusterManager: ask the remote worker
# process to exit; no manager-local resources need cleaning up here.
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
    remote_do(pid, exit) # For TCP based transports this will result in a close of the socket
                         # at our end, which will result in a cleanup of the worker.
    nothing
end
|