class Fluent::TimeSlicedOutput

Attributes

localtime[RW]
time_slicer[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::BufferedOutput.new
# File lib/fluent/output.rb, line 488
# Runs the superclass initializer, then makes local time the default
# basis for slice keys (configure may switch this off).
def initialize
  super()
  # Default to local time; configure flips this when 'utc' is set.
  @localtime = true
  # TODO: introduce an @ignore_old flag (would default to false).
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::BufferedOutput#configure
# File lib/fluent/output.rb, line 507
# Applies the time-slicing related settings found in +conf+.
#
# After delegating to the superclass this method:
# * resolves @localtime from the 'utc' / 'localtime' keys ('utc' is
#   checked first),
# * records and validates 'timezone' when it is present,
# * builds @time_slicer, a callable mapping a numeric time to a slice key
#   rendered with @time_slice_format,
# * resets the slice-key cache used by emit, and
# * installs @enqueue_buffer_proc, whose behaviour depends on whether
#   @flush_interval was already set.
def configure(conf)
  super

  if conf['utc']
    @localtime = false
  elsif conf['localtime']
    @localtime = true
  end

  zone = conf['timezone']
  if zone
    @timezone = zone
    Fluent::Timezone.validate!(zone)
  end

  # An explicit timezone takes priority, then local time, then UTC.
  @time_slicer =
    if @timezone
      Timezone.formatter(@timezone, @time_slice_format)
    elsif @localtime
      Proc.new { |time| Time.at(time).strftime(@time_slice_format) }
    else
      Proc.new { |time| Time.at(time).utc.strftime(@time_slice_format) }
    end

  @time_slice_cache_interval = time_slice_cache_interval
  # Start with an empty slice-key cache.
  @before_tc = nil
  @before_key = nil

  if @flush_interval
    # With an explicit flush_interval every chunk is enqueued on each run,
    # so time_slice_wait has no effect.
    $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}" if conf['time_slice_wait']
    @enqueue_buffer_proc = Proc.new do
      @buffer.keys.each { |slice_key| @buffer.push(slice_key) }
    end
  else
    @flush_interval = [60, @time_slice_cache_interval].min
    @enqueue_buffer_proc = Proc.new do
      # Only slices strictly older than the slice of (now - time_slice_wait)
      # are enqueued.
      threshold = @time_slicer.call(Engine.now - @time_slice_wait)
      @buffer.keys.each do |slice_key|
        @buffer.push(slice_key) if slice_key < threshold
      end
    end
  end
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/output.rb, line 560
# Groups the events of +es+ by time-slice key, formats them, and hands each
# group to the buffer.
#
# tag   - tag passed through to format and to error events.
# es    - event stream; each yields a (time, record) pair.
# chain - passed unchanged to @buffer.emit.
#
# An event whose slice key cannot be computed is reported through
# @router.emit_error_event and skipped. submit_flush is called for every
# key for which @buffer.emit returns a truthy value.
def emit(tag, es, chain)
  @emit_count += 1
  formatted_data = {}
  es.each {|time,record|
    begin
      # Events inside the same cache window share a slice key, so the
      # slicer is only invoked when the window changes.
      tc = time / @time_slice_cache_interval
      if @before_tc == tc
        key = @before_key
      else
        # Compute the key before touching the cache: if the slicer raises,
        # the cache must not claim this window with a stale (or nil) key,
        # otherwise later events in the window would be filed under it.
        key = @time_slicer.call(time)
        @before_tc = tc
        @before_key = key
      end
    rescue => e
      @router.emit_error_event(tag, Engine.now, {'time' => time, 'record' => record}, e)
      next
    end

    (formatted_data[key] ||= '') << format(tag, time, record)
  }
  formatted_data.each { |key, data|
    submit_flush if @buffer.emit(key, data, chain)
  }
end
enqueue_buffer(force = false) click to toggle source
# File lib/fluent/output.rb, line 588
# Moves buffered slices onto the buffer's queue.
#
# force - when truthy, every key currently in @buffer is pushed;
#         otherwise the decision is delegated to @enqueue_buffer_proc
#         (installed by configure).
def enqueue_buffer(force = false)
  return @enqueue_buffer_proc.call unless force

  @buffer.keys.each { |slice_key| @buffer.push(slice_key) }
end

Private Instance Methods

time_slice_cache_interval() click to toggle source

# Placeholder formatter; does nothing and returns nil.
#
# emit calls format(tag, time, record) with three arguments, so the stub
# accepts an optional third argument instead of raising ArgumentError.
# The earlier two-argument call form is still accepted.
# NOTE(review): presumably concrete outputs override this - confirm.
def format(tag, time, record = nil)
end

# File lib/fluent/output.rb, line 602
# Chooses how many seconds a computed slice key may be reused.
#
# The slicer output for time 0 is compared with its output at the last
# second of the first minute, hour and day. The first probe that yields a
# different key selects the interval: 1, 30 or 1800 seconds respectively.
# If none differs, 43200 seconds (24*60*30) is returned.
def time_slice_cache_interval
  probes = [
    [60 - 1,           1],
    [60 * 60 - 1,      30],
    [24 * 60 * 60 - 1, 60 * 30]
  ]
  probes.each do |last_second, interval|
    return interval if @time_slicer.call(0) != @time_slicer.call(last_second)
  end
  24 * 60 * 30
end