/usr/lib/ruby/vendor_ruby/faraday/adapter/em_http.rb is in ruby-faraday 0.9.2-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 | module Faraday
class Adapter
# EventMachine adapter is useful for either asynchronous requests
# when in EM reactor loop or for making parallel requests in
# synchronous code.
class EMHttp < Faraday::Adapter
# Builders that translate a Faraday request env into the two option
# hashes this adapter hands to em-http: connection-level options
# (proxy, timeouts, local bind, SSL) and request-level options
# (body, headers, compression).
module Options
  # Assembles the connection-level options for one request.
  #
  # @param env [#[]] request env; reads env[:request], env[:url], env[:ssl]
  # @return [Hash] options hash filled in by the configure_* helpers
  def connection_config(env)
    options = {}
    configure_proxy(options, env)
    configure_timeout(options, env)
    configure_socket(options, env)
    configure_ssl(options, env)
    options
  end

  # Assembles the request-level options (body and headers).
  #
  # @param env [#[]] request env; reads env[:body], env[:request_headers],
  #   env[:method]
  # @return [Hash] hash with :body and :head entries
  def request_config(env)
    options = {
      :body => read_body(env),
      :head => env[:request_headers],
      # :keepalive => true,
      # :file => 'path/to/file', # stream data off disk
    }
    configure_compression(options, env)
    options
  end

  # Returns the request body, draining it first when it is IO-like
  # (i.e. responds to #read).
  def read_body(env)
    body = env[:body]
    body.respond_to?(:read) ? body.read : body
  end

  # Copies proxy host/port (and credentials, when present) into
  # options[:proxy]. Does nothing when no proxy is configured.
  def configure_proxy(options, env)
    proxy = request_options(env)[:proxy]
    return unless proxy

    settings = {
      :host => proxy[:uri].host,
      :port => proxy[:uri].port
    }
    # Only pass credentials along when at least one is set, so an
    # anonymous proxy is not handed an empty [nil, nil] pair.
    credentials = [proxy[:user], proxy[:password]]
    settings[:authorization] = credentials if credentials.any?
    options[:proxy] = settings
  end

  # Copies the local bind address into options[:bind] when one is
  # configured on the request.
  def configure_socket(options, env)
    bind = request_options(env)[:bind]
    return unless bind

    options[:bind] = {
      :host => bind[:host],
      :port => bind[:port]
    }
  end

  # Sets options[:ssl] for https URLs when env[:ssl] is present.
  # Peer verification defaults to true unless env[:ssl][:verify] says
  # otherwise.
  def configure_ssl(options, env)
    return unless env[:url].scheme == 'https' && env[:ssl]

    options[:ssl] = {
      :cert_chain_file => env[:ssl][:ca_file],
      :verify_peer => env[:ssl].fetch(:verify, true)
    }
  end

  # Uses :timeout for both the connect and inactivity timeouts, then lets
  # :open_timeout override the connect timeout when it is given.
  def configure_timeout(options, env)
    timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
    options[:connect_timeout] = options[:inactivity_timeout] = timeout
    options[:connect_timeout] = open_timeout if open_timeout
  end

  # Adds an accept-encoding header to GET requests that do not already
  # carry one. Note that options[:head] is the env's own header object,
  # so the header is added to it in place.
  def configure_compression(options, env)
    return unless env[:method] == :get && !options[:head].key?('accept-encoding')

    options[:head]['accept-encoding'] = 'gzip, compressed'
  end

  # Returns the per-request options stored under env[:request].
  def request_options(env)
    env[:request]
  end
end
# Mix the option builders (connection_config, request_config, ...) into
# the adapter so the request methods below can call them directly.
include Options
# Declare the library this adapter needs.
# NOTE(review): `dependency` is defined outside this file; presumably it
# feeds the `loaded?` check used at the bottom of the file -- confirm.
dependency 'em-http'
# Flag the adapter as supporting parallel requests (see Manager below).
self.supports_parallel = true
# Builds the object that coordinates a batch of parallel requests.
#
# @param options [Object, nil] accepted but not used by this adapter
# @return [Manager] a fresh, empty manager
def self.setup_parallel_manager(options = nil)
  Manager.new
end
# Adapter entry point: run the parent class's #call, issue the HTTP
# request for this env, then hand the env on to the next app.
#
# @param env [#[]] the request env
# @return whatever @app.call returns
def call(env)
  super
  perform_request(env)
  @app.call(env)
end
# Issues the request described by env in one of three modes:
#
# * parallel   - a parallel manager is set on the env: queue the request
#                on it and finish the response from the success callback.
# * async      - the EM reactor is already running: fire the request and
#                return; the response is finished from the callback.
# * blocking   - no reactor: start one, wait for the request to complete,
#                stop it, and raise if the request failed.
#
# Proxy CONNECT failures are re-raised as Error::ConnectionFailed and
# OpenSSL SSL errors as Faraday::SSLError; anything else propagates as is.
def perform_request(env)
  if parallel?(env)
    env[:parallel_manager].add {
      perform_single_request(env).
        callback { env[:response].finish(env) }
    }
  elsif EventMachine.reactor_running?
    # EM is running: instruct upstream that this is an async request
    env[:parallel_manager] = true
    perform_single_request(env).
      callback { env[:response].finish(env) }.
      errback {
        # TODO: no way to communicate the error in async mode
        raise NotImplementedError
      }
  else
    failure = nil
    # start EM, block until request is completed
    EventMachine.run do
      perform_single_request(env).
        callback { EventMachine.stop }.
        errback { |client|
          failure = error_message(client)
          EventMachine.stop
        }
    end
    raise_error(failure) if failure
  end
rescue EventMachine::Connectify::CONNECTError => err
  unless err.message.include?("Proxy Authentication Required")
    raise Error::ConnectionFailed, err
  end
  raise Error::ConnectionFailed, %{407 "Proxy Authentication Required "}
rescue => err
  # OpenSSL may not be loaded at all, hence the defined? guard
  raise Faraday::SSLError, err if defined?(OpenSSL) && OpenSSL::SSL::SSLError === err
  raise
end
# TODO: reuse the connection to support pipelining
#
# Opens an em-http request for env[:url] and starts it with the env's
# method and request options. On success the status, body and headers of
# the client are stored through save_response (header names are converted
# to symbols).
#
# @param env [#[]] the request env
# @return the value of the final #callback call, which callers chain
#   further callbacks/errbacks onto
def perform_single_request(env)
  connection = EventMachine::HttpRequest.new(env[:url], connection_config(env))
  pending = connection.setup_request(env[:method], request_config(env))
  pending.callback do |client|
    status = client.response_header.status
    save_response(env, status, client.response) do |resp_headers|
      client.response_header.each { |name, value| resp_headers[name.to_sym] = value }
    end
  end
end
# Picks the error reported by a failed client, falling back to a generic
# message when the client has none.
#
# @param client [#error] the failed request client
# @return the client's error, or "request failed"
def error_message(client)
  reported = client.error
  reported || "request failed"
end
# Raises the Faraday error that matches a failure reported by the client.
#
# * Errno::ETIMEDOUT              -> TimeoutError, "request timed out"
# * Errno::ECONNREFUSED           -> ConnectionFailed, "connection refused"
# * "connection closed by server" -> ConnectionFailed, message unchanged
# * anything else                 -> ClientError, message unchanged
#
# @param msg the error value reported for the request (compared with ==)
# @raise [Faraday::Error::ClientError] or one of the subclasses above
def raise_error(msg)
  error_class, text =
    if msg == Errno::ETIMEDOUT
      [Faraday::Error::TimeoutError, "request timed out"]
    elsif msg == Errno::ECONNREFUSED
      [Faraday::Error::ConnectionFailed, "connection refused"]
    elsif msg == "connection closed by server"
      [Faraday::Error::ConnectionFailed, msg]
    else
      [Faraday::Error::ClientError, msg]
    end
  raise error_class, text
end
# Tells whether a parallel manager has been set on the env.
#
# @param env [#[]] the request env
# @return [Boolean] true when env[:parallel_manager] is truthy
def parallel?(env)
  env[:parallel_manager] ? true : false
end
# The parallel manager is designed to start an EventMachine loop
# and block until all registered requests have been completed.
#
# Requests are registered with #add as blocks that, when called, start a
# request and return its client (an object answering #callback, #errback
# and #error). #run starts them all inside a reactor and stops the reactor
# once every registered request has either succeeded or failed.
class Manager
  def initialize
    reset
  end

  # Returns the manager to its pristine, not-running state.
  def reset
    @registered_procs = []
    @num_registered = 0
    @num_succeeded = 0
    @errors = []
    @running = false
  end

  # @return [Boolean] whether #run is currently in progress
  def running?
    @running
  end

  # Registers a request. While the manager is running the request is
  # started immediately; otherwise it is queued until #run.
  #
  # The block is captured explicitly: a block-less Proc.new no longer
  # picks up the caller's block on Ruby 3.0+ (it raises ArgumentError).
  #
  # @yieldreturn the request client
  # @return [Integer] number of requests registered so far
  # @raise [ArgumentError] when called without a block
  def add(&block)
    raise ArgumentError, "no block given" unless block

    if running?
      perform_request(&block)
    else
      @registered_procs << block
    end
    @num_registered += 1
  end

  # Runs every queued request inside an EventMachine loop and blocks until
  # all of them have completed. Does nothing when no request was added.
  # The manager is always reset afterwards, even on failure.
  #
  # @raise [Faraday::Error::ClientError] with the first recorded error
  #   when at least one request failed
  def run
    return unless @num_registered > 0

    @running = true
    EventMachine.run do
      @registered_procs.each do |proc|
        perform_request(&proc)
      end
    end
    unless @errors.empty?
      # a recorded error may itself be nil, hence the fallback text
      raise Faraday::Error::ClientError, @errors.first || "connection failed"
    end
  ensure
    reset
  end

  # Starts one request (by yielding) and hooks the bookkeeping onto its
  # success and failure callbacks.
  def perform_request
    client = yield
    client.callback { @num_succeeded += 1; check_finished }
    client.errback { @errors << client.error; check_finished }
  end

  # Stops the reactor once every registered request is accounted for.
  def check_finished
    if @num_succeeded + @errors.size == @num_registered
      EventMachine.stop
    end
  end
end
end
end
end
# Once the adapter reports itself as loaded, try to pull in OpenSSL and,
# if that works, the adapter's SSL patch. A missing OpenSSL only produces
# a warning.
if Faraday::Adapter::EMHttp.loaded?
  begin
    require 'openssl'
  rescue LoadError
    warn "Warning: no such file to load -- openssl. Make sure it is installed if you want HTTPS support"
  else
    # openssl loaded fine, so the SSL patch can be applied
    require 'faraday/adapter/em_http_ssl_patch'
  end
end
|