From 87a9cf8ddb77f789a805b433ca4ff92556f7d8a0 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Thu, 16 Aug 2018 11:58:10 -0700 Subject: [PATCH 1/2] Remove Log Processing from fluentd configuration --- installer/conf/container.conf | 32 -- .../code/plugin/containerlogtailfilereader.rb | 396 ------------------ source/code/plugin/filter_container_log.rb | 42 -- 3 files changed, 470 deletions(-) delete mode 100644 source/code/plugin/containerlogtailfilereader.rb delete mode 100644 source/code/plugin/filter_container_log.rb diff --git a/installer/conf/container.conf b/installer/conf/container.conf index a20fdbe5a..9eaed9b47 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -50,18 +50,6 @@ ] -# Container log -# Example line which matches the format: -# {"log"=>"Test 9th January\n", "stream"=>"stdout", "time"=>"2018-01-09T23:14:39.273429353Z", "ContainerID"=>"ee1ec26aa974af81b21fff24cef8ec78bf7ac1558b5de6f1eb1a5b28ecd6d559", "Image"=>"ubuntu", "Name"=>"determined_wilson", "SourceSystem"=>"Containers"} -# NOTE: The LogEntryTimeStamp is just being appended in the begining of the LogEntry field. This is the actual time the log was generated and the TimeGenerated field in Kusto is different - - type containerlog_sudo_tail - pos_file /var/opt/microsoft/docker-cimprov/state/ContainerLogFile.pos.log - tag oms.container.log - format /\"log\"=>\"(?.*)", \"stream\"=>\"(?.*)", \"time\"=>\"(?.*)", \"ContainerID\"=>\"(?.*)", \"Image\"=>\"(?.*)", \"Name\"=>\"(?.*)", \"SourceSystem\"=>\"(?.*)"}/ - run_interval 60s - - # Container host inventory type omi @@ -95,11 +83,6 @@ type filter_container -# Seperate filter for container log - - type filter_container_log - - type out_oms_api log_level debug @@ -152,21 +135,6 @@ max_retry_wait 9m - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_log*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 15s - max_retry_wait 9m - - type out_oms log_level info diff --git a/source/code/plugin/containerlogtailfilereader.rb b/source/code/plugin/containerlogtailfilereader.rb deleted file mode 100644 index 2d55b1d73..000000000 --- a/source/code/plugin/containerlogtailfilereader.rb +++ /dev/null @@ -1,396 +0,0 @@ - -require 'optparse' -require 'json' -require 'logger' -require_relative 'omslog' -require 'fluent/filter' - -module ContainerLogTailscript - - class ContainerLogNewTail - def initialize(paths) - @paths = paths - @tails = {} - @pos_file = $options[:pos_file] - @read_from_head = $options[:read_from_head] - @pf = nil - @pf_file = nil - - @log = Logger.new(STDERR) - @log.formatter = proc do |severity, time, progname, msg| - "#{severity} #{msg}\n" - end - end - - attr_reader :paths - - def start - start_watchers(@paths) unless @paths.empty? - end - - def shutdown - @pf_file.close if @pf_file - end - - def setup_watcher(path, pe) - tw = TailWatcher.new(path, pe, @read_from_head, @log, &method(:receive_lines)) - tw.on_notify - tw - end - - def start_watchers(paths) - if @pos_file - @pf_file = File.open(@pos_file, File::RDWR|File::CREAT) - @pf_file.sync = true - @pf = PositionFile.parse(@pf_file) - end - paths.each { |path| - pe = nil - if @pf - pe = @pf[path] #pe is FilePositionEntry instance - if pe.read_inode.zero? - begin - pe.update(File::Stat.new(path).ino, 0) - rescue Errno::ENOENT - @log.warn "#{path} not found. Continuing without tailing it." - end - end - end - - @tails[path] = setup_watcher(path, pe) - } - end - - def receive_lines(lines, tail_watcher) - unless lines.empty? - puts lines - end - return true - end - - class TailWatcher - def initialize(path, pe, read_from_head, log, &receive_lines) - @path = path - @pe = pe || MemoryPositionEntry.new - @read_from_head = read_from_head - @log = log - @receive_lines = receive_lines - @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate)) - @io_handler = nil - @containerIDFilePath = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/" - end - - attr_reader :path - - def wrap_receive_lines(lines) - newLines = [] - containerID = @path.split('/').last.chomp('-json.log') - containerInspectInformation = @containerIDFilePath + containerID - tempContainerInfo = {} - begin - File.open(containerInspectInformation) { |f| tempContainerInfo = JSON.parse(f.readline)} - lines.each { |line| - unless line.empty? - newLine = {} - newLine = JSON.parse(line) - newLine["ContainerID"] = containerID - newLine["Image"] = tempContainerInfo["Image"] - newLine["Name"] = tempContainerInfo["ElementName"] - newLine["SourceSystem"] = "Containers" - newLines.push(newLine) - end - } - rescue Exception => e - #File doesn't exist or error in reading the data - @log.error "Caught exception when opening file -> #{e}" - end - @receive_lines.call(newLines, self) - end - - def on_notify - @rotate_handler.on_notify if @rotate_handler - return unless @io_handler - @io_handler.on_notify - end - - def on_rotate(io) - if io - # first time - stat = io.stat - fsize = stat.size - inode = stat.ino - - last_inode = @pe.read_inode - if @read_from_head - pos = 0 - @pe.update(inode, pos) - elsif inode == last_inode - # rotated file has the same inode number as the pos_file. - # seek to the saved position - pos = @pe.read_pos - elsif last_inode != 0 - # read data from the head of the rotated file. - pos = 0 - @pe.update(inode, pos) - else - # this is the first MemoryPositionEntry for the first time fluentd started. - # seeks to the end of the file to know where to start tailing - pos = fsize - @pe.update(inode, pos) - end - io.seek(pos) - @io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines)) - else - @io_handler = NullIOHandler.new - end - end - - class IOHandler - def initialize(io, pe, log, &receive_lines) - @log = log - @io = io - @pe = pe - @log = log - @read_lines_limit = 100 - @receive_lines = receive_lines - @buffer = ''.force_encoding('ASCII-8BIT') - @iobuf = ''.force_encoding('ASCII-8BIT') - @lines = [] - end - - attr_reader :io - - def on_notify - begin - read_more = false - if @lines.empty? - begin - while true - if @buffer.empty? - @io.readpartial(512, @buffer) - else - @buffer << @io.readpartial(512, @iobuf) - end - while line = @buffer.slice!(/.*?\n/m) - @lines << line - end - if @lines.size >= @read_lines_limit - # not to use too much memory in case the file is very large - read_more = true - break - end - end - rescue EOFError - end - end - - unless @lines.empty? - if @receive_lines.call(@lines) - @pe.update_pos(@io.pos - @buffer.bytesize) - @lines.clear - else - read_more = false - end - end - end while read_more - - rescue - @log.error "#{$!.to_s}" - close - end - - def close - @io.close unless @io.closed? - end - end - - class NullIOHandler - def initialize - end - - def io - end - - def on_notify - end - - def close - end - end - - class RotateHandler - def initialize(path, log, &on_rotate) - @path = path - @inode = nil - @fsize = -1 # first - @on_rotate = on_rotate - @log = log - end - - def on_notify - begin - stat = File.stat(@path) #returns a File::Stat object for the file named @path - inode = stat.ino - fsize = stat.size - rescue Errno::ENOENT - # moved or deleted - inode = nil - fsize = 0 - end - - begin - if @inode != inode || fsize < @fsize - # rotated or truncated - begin - io = File.open(@path) - rescue Errno::ENOENT - end - @on_rotate.call(io) - end - @inode = inode - @fsize = fsize - end - - rescue - @log.error "#{$!.to_s}" - end - end - end - - - class PositionFile - UNWATCHED_POSITION = 0xffffffffffffffff - - def initialize(file, map, last_pos) - @file = file - @map = map - @last_pos = last_pos - end - - def [](path) - if m = @map[path] - return m - end - - @file.pos = @last_pos - @file.write path - @file.write "\t" - seek = @file.pos - @file.write "0000000000000000\t0000000000000000\n" - @last_pos = @file.pos - - @map[path] = FilePositionEntry.new(@file, seek) - end - - def self.parse(file) - compact(file) - - map = {} - file.pos = 0 - file.each_line {|line| - m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) - next unless m - path = m[1] - seek = file.pos - line.bytesize + path.bytesize + 1 - map[path] = FilePositionEntry.new(file, seek) - } - new(file, map, file.pos) - end - - # Clean up unwatched file entries - def self.compact(file) - file.pos = 0 - existent_entries = file.each_line.map { |line| - m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) - next unless m - path = m[1] - pos = m[2].to_i(16) - ino = m[3].to_i(16) - # 32bit inode converted to 64bit at this phase - pos == UNWATCHED_POSITION ? nil : ("%s\t%016x\t%016x\n" % [path, pos, ino]) - }.compact - - file.pos = 0 - file.truncate(0) - file.write(existent_entries.join) - end - end - - # pos inode - # ffffffffffffffff\tffffffffffffffff\n - class FilePositionEntry - POS_SIZE = 16 - INO_OFFSET = 17 - INO_SIZE = 16 - LN_OFFSET = 33 - SIZE = 34 - - def initialize(file, seek) - @file = file - @seek = seek - end - - def update(ino, pos) - @file.pos = @seek - @file.write "%016x\t%016x" % [pos, ino] - end - - def update_pos(pos) - @file.pos = @seek - @file.write "%016x" % pos - end - - def read_inode - @file.pos = @seek + INO_OFFSET - raw = @file.read(INO_SIZE) - raw ? raw.to_i(16) : 0 - end - - def read_pos - @file.pos = @seek - raw = @file.read(POS_SIZE) - raw ? raw.to_i(16) : 0 - end - end - - class MemoryPositionEntry - def initialize - @pos = 0 - @inode = 0 - end - - def update(ino, pos) - @inode = ino - @pos = pos - end - - def update_pos(pos) - @pos = pos - end - - def read_pos - @pos - end - - def read_inode - @inode - end - end - end -end - -if __FILE__ == $0 - $options = {:read_from_head => false} - OptionParser.new do |opts| - opts.on("-p", "--posfile [POSFILE]") do |p| - $options[:pos_file] = p - end - opts.on("-h", "--[no-]readfromhead") do |h| - $options[:read_from_head] = h - end - end.parse! - a = ContainerLogTailscript::ContainerLogNewTail.new(ARGV) - a.start - a.shutdown -end - diff --git a/source/code/plugin/filter_container_log.rb b/source/code/plugin/filter_container_log.rb deleted file mode 100644 index 21e146a35..000000000 --- a/source/code/plugin/filter_container_log.rb +++ /dev/null @@ -1,42 +0,0 @@ -# frozen_string_literal: true - -require 'fluent/filter' - -module Fluent - require 'logger' - class PassThruFilter < Filter - Fluent::Plugin.register_filter('filter_container_log', self) - - def configure(conf) - super - end - - def start - super - @hostname = OMS::Common.get_hostname or "Unknown host" - end - - def shutdown - super - end - - def filter(tag, time, record) - begin - #Try to force utf-8 encoding on the string so that all characters can flow through to - #$log.info "before : #{record['LogEntry']}" - record['LogEntry'].force_encoding('UTF-8') - rescue - $log.error "Failed to convert record['LogEntry'] : '#{record['LogEntry']}' to UTF-8 using force_encoding." - $log.error "Current string encoding for record['LogEntry'] is #{record['LogEntry'].encoding}" - end - - record['Computer'] = @hostname - wrapper = { - "DataType"=>"CONTAINER_LOG_BLOB", - "IPName"=>"Containers", - "DataItems"=>[record.each{|k,v| record[k]=v}] - } - wrapper - end - end -end From 308be41fe87202ee6e289cc9c952a24910eed133 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Thu, 16 Aug 2018 12:01:14 -0700 Subject: [PATCH 2/2] Remove plugin references from base_container.data --- installer/datafiles/base_container.data | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index c49a8d1d0..ec0728c01 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -23,14 +23,11 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/filter_docker_log.rb; source/code/plugin/filter_docker_log.rb; 644; root; root /opt/microsoft/omsagent/plugin/filter_container.rb; source/code/plugin/filter_container.rb; 644; root; root -/opt/microsoft/omsagent/plugin/filter_container_log.rb; source/code/plugin/filter_container_log.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_podinventory.rb; source/code/plugin/in_kube_podinventory.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_events.rb; source/code/plugin/in_kube_events.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_logs.rb; source/code/plugin/in_kube_logs.rb; 644; root; root /opt/microsoft/omsagent/plugin/KubernetesApiClient.rb; source/code/plugin/KubernetesApiClient.rb; 644; root; root -/opt/microsoft/omsagent/plugin/in_containerlog_sudo_tail.rb; source/code/plugin/in_containerlog_sudo_tail.rb; 644; root; root -/opt/microsoft/omsagent/plugin/containerlogtailfilereader.rb; source/code/plugin/containerlogtailfilereader.rb; 744; root; root /etc/opt/microsoft/docker-cimprov/container.conf; installer/conf/container.conf; 644; root; root @@ -88,15 +85,6 @@ WriteInstallInfo() { } WriteInstallInfo -#Setup sudo permission for containerlogtailfilereader -if [ -z $(cat /etc/sudoers.d/omsagent | grep /containerlogtailfilereader.rb) ] -then - chmod +w /etc/sudoers.d/omsagent - echo "#run containerlogtailfilereader.rb for docker-provider" >> /etc/sudoers.d/omsagent - echo "omsagent ALL=(ALL) NOPASSWD: /opt/microsoft/omsagent/ruby/bin/ruby /opt/microsoft/omsagent/plugin/containerlogtailfilereader.rb *" >> /etc/sudoers.d/omsagent - chmod 440 /etc/sudoers.d/omsagent -fi - # Get the state file in place with proper permissions touch /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt chmod 644 /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt