# File lib/fluent/process.rb, line 37 def initialize require 'drb' DRb.start_service(create_drb_uri, Broker.new) @parent_uri = DRb.uri end
# File lib/fluent/process.rb, line 43 def fork(forward_interval, delegate_object) ipr, ipw = IO.pipe # child Engine.emit_stream -> parent Engine.emit_stream opr, opw = IO.pipe # parent target.emit -> child target.emit pid = Process.fork if pid # parent process ipw.close opr.close forward_thread = process_parent(ipr, opw, pid, forward_interval, delegate_object) return pid, forward_thread end # child process ipr.close opw.close forward_thread = process_child(ipw, opr, forward_interval, delegate_object) return nil, forward_thread end
# File lib/fluent/process.rb, line 75 def create_drb_uri "drbunix:" # TODO end
# File lib/fluent/process.rb, line 168 def input_forward_main(ipr, pid) read_event_stream(ipr) {|tag,es| # FIXME error handling begin Engine.emit_stream(tag, es) rescue $log.warn "failed to emit", error: $!.to_s, pid: Process.pid $log.warn_backtrace end } rescue $log.error "error on input process forwarding thread", error: $!.to_s, pid: Process.pid $log.error_backtrace raise end
# File lib/fluent/process.rb, line 216 def new_forwarder(w, interval) if interval < 0.2 # TODO interval Forwarder.new(w) else DelayedForwarder.new(w, interval) end end
def override_delegate_methods(target, child_uri)
remote = DRbObject.new_with_uri(child_uri) delegate_methods(target).each {|name| target.define_singleton_method(name) do |*args,&block| remote.send(name, *args, &block) end }
end
def delegate_methods(target)
target.methods - Object.public_instance_methods
end
# File lib/fluent/process.rb, line 152 def output_forward_main(opr, target) read_event_stream(opr) {|tag,es| # FIXME error handling begin target.emit(tag, es, NullOutputChain.instance) rescue $log.warn "failed to emit", error: $!.to_s, pid: Process.pid $log.warn_backtrace end } rescue $log.error "error on output process forwarding thread", error: $!.to_s, pid: Process.pid $log.error_backtrace raise end
# File lib/fluent/process.rb, line 80 def process_child(ipw, opr, forward_interval, delegate_object) DRb.start_service(create_drb_uri, delegate_object) child_uri = DRb.uri send_header(ipw, child_uri) # override target.emit_stream to write event stream to the pipe fwd = new_forwarder(ipw, forward_interval) Engine.define_singleton_method(:emit_stream) do |tag,es| fwd.emit(tag, es) end # read event stream from the pipe and forward to target.emit forward_thread = Thread.new(opr, delegate_object, &method(:output_forward_main)) # override global methods to call parent process override_shared_methods(@parent_uri) return forward_thread end
# File lib/fluent/process.rb, line 118 def process_parent(ipr, opw, pid, forward_interval, delegate_object) child_uri = read_header(ipr) # read event stream from the pipe and forward to Engine.emit_stream forward_thread = Thread.new(ipr, pid, &method(:input_forward_main)) # note: don't override methods in parent process # because another process may fork after overriding #override_delegate_methods(delegate_object, child_uri) # return forwarder for DetachProcessMixin to # override target.emit and write event stream to the pipe fwd = new_forwarder(opw, forward_interval) # note: override emit method on DetachProcessMixin forward_thread.define_singleton_method(:forwarder) do fwd end return forward_thread end
# File lib/fluent/process.rb, line 184 def read_event_stream(r, &block) u = Fluent::Engine.msgpack_factory.unpacker(r) begin #buf = '' #map = {} #while true # r.readpartial(64*1024, buf) # u.feed_each(buf) {|tag,ms| # if msbuf = map[tag] # msbuf << ms # else # map[tag] = ms # end # } # unless map.empty? # map.each_pair {|tag,ms| # es = MessagePackEventStream.new(ms) # block.call(tag, es) # } # map.clear # end #end u.each {|tag,ms| es = MessagePackEventStream.new(ms) block.call(tag, es) } rescue EOFError ensure r.close end end
# File lib/fluent/process.rb, line 64 def read_header(ipr) sz = ipr.read(4).unpack('N')[0] ipr.read(sz) end
# File lib/fluent/process.rb, line 69 def send_header(ipw, data) ipw.write [data.bytesize].pack('N') ipw.write data ipw.flush end