Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions installer/conf/td-agent-bit.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
Flush 5
Log_Level info
Parsers_File /etc/td-agent-bit/parsers.conf
Log_File /var/log/fluent-bit.log
Log_File /var/opt/microsoft/docker-cimprov/log/fluent-bit.log

[INPUT]
Name tail
Tag oms.container.log.*
Path /var/log/containers/*.log
DB /var/log/fblogs.db
DB /var/opt/microsoft/docker-cimprov/state/fblogs.db
Parser docker
Mem_Buf_Limit 30m
Path_Key filepath
Expand Down
105 changes: 51 additions & 54 deletions source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const DataType = "CONTAINER_LOG_BLOB"

// IPName for Container Log
const IPName = "Containers"
const containerInventoryPath = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory"
const defaultContainerInventoryRefreshInterval = 60
const defaultKubeSystemContainersRefreshInterval = 300

Expand All @@ -51,6 +50,9 @@ var (

// DataUpdateMutex read and write mutex access to the container id set
DataUpdateMutex = &sync.Mutex{}

// ClientSet for querying KubeAPIs
ClientSet *kubernetes.Clientset
)

var (
Expand All @@ -61,27 +63,6 @@ var (
Log = FLBLogger.Printf
)

// ContainerInventory represents the container info
type ContainerInventory struct {
ElementName string `json:"ElementName"`
CreatedTime string `json:"CreatedTime"`
State string `json:"State"`
ExitCode int `json:"ExitCode"`
StartedTime string `json:"StartedTime"`
FinishedTime string `json:"FinishedTime"`
ImageID string `json:"ImageId"`
Image string `json:"Image"`
Repository string `json:"Repository"`
ImageTag string `json:"ImageTag"`
ComposeGroup string `json:"ComposeGroup"`
ContainerHostname string `json:"ContainerHostname"`
Computer string `json:"Computer"`
Command string `json:"Command"`
EnvironmentVar string `json:"EnvironmentVar"`
Ports string `json:"Ports"`
Links string `json:"Links"`
}

// DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin
type DataItem struct {
LogEntry string `json:"LogEntry"`
Expand All @@ -108,29 +89,25 @@ func populateMaps() {

_imageIDMap := make(map[string]string)
_nameIDMap := make(map[string]string)
files, err := ioutil.ReadDir(containerInventoryPath)

pods, err := ClientSet.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
Log("error when reading container inventory %s\n", err.Error())
Log("Error getting pods %s\n", err.Error())
}

for _, file := range files {
fullPath := fmt.Sprintf("%s/%s", containerInventoryPath, file.Name())
fileContent, err := ioutil.ReadFile(fullPath)
if err != nil {
Log("Error reading file content %s", fullPath)
Log(err.Error())
}
var containerInventory ContainerInventory
unmarshallErr := json.Unmarshal(fileContent, &containerInventory)

if unmarshallErr != nil {
Log("Unmarshall error when reading file %s %s \n", fullPath, unmarshallErr.Error())
for _, pod := range pods.Items {
for _, status := range pod.Status.ContainerStatuses {
lastSlashIndex := strings.LastIndex(status.ContainerID, "/")
containerID := status.ContainerID[lastSlashIndex+1 : len(status.ContainerID)]
image := status.Image
name := fmt.Sprintf("%s/%s", pod.UID, status.Name)
if containerID != "" {
_imageIDMap[containerID] = image
_nameIDMap[containerID] = name
}
}

_imageIDMap[file.Name()] = containerInventory.Image
_nameIDMap[file.Name()] = containerInventory.ElementName
}

Log("Locking to update image and name maps")
DataUpdateMutex.Lock()
ImageIDMap = _imageIDMap
Expand Down Expand Up @@ -164,7 +141,7 @@ func createLogger() *log.Logger {
logger.SetOutput(&lumberjack.Logger{
Filename: path,
MaxSize: 10, //megabytes
MaxBackups: 3,
MaxBackups: 1,
MaxAge: 28, //days
Compress: true, // false by default
})
Expand Down Expand Up @@ -222,17 +199,8 @@ func updateKubeSystemContainerIDs() {
}

Log("Kube System Log Collection is DISABLED. Collecting containerIds to drop their records")
config, err := rest.InClusterConfig()
if err != nil {
Log("Error getting config %s\n", err.Error())
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
Log("Error getting clientset %s", err.Error())
}

pods, err := clientset.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
pods, err := ClientSet.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
if err != nil {
Log("Error getting pods %s\n", err.Error())
}
Expand Down Expand Up @@ -278,8 +246,27 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
}

stringMap["Id"] = containerID
stringMap["Image"] = ImageIDMap[containerID]
stringMap["Name"] = NameIDMap[containerID]

if val, ok := ImageIDMap[containerID]; ok {
stringMap["Image"] = val
} else {
Log("ContainerId %s not present in Map ", containerID)
Log("CurrentMap Snapshot \n")
for k, v := range ImageIDMap {
Log("%s ==> %s", k, v)
}
}

if val, ok := NameIDMap[containerID]; ok {
stringMap["Name"] = val
} else {
Log("ContainerId %s not present in Map ", containerID)
Log("CurrentMap Snapshot \n")
for k, v := range NameIDMap {
Log("%s ==> %s", k, v)
}
}

stringMap["Computer"] = Computer
mapstructure.Decode(stringMap, &dataItem)
dataItems = append(dataItems, dataItem)
Expand Down Expand Up @@ -334,8 +321,8 @@ func getContainerIDFromFilePath(filepath string) string {
return filepath[start+1 : end]
}

// ReadConfig reads and populates plugin configuration
func ReadConfig(pluginConfPath string) map[string]string {
// InitializeConfig reads and populates plugin configuration
func InitializeConfig(pluginConfPath string) map[string]string {

pluginConf, err := ReadConfiguration(pluginConfPath)
omsadminConf, err := ReadConfiguration(pluginConf["omsadmin_conf_path"])
Expand All @@ -355,5 +342,15 @@ func ReadConfig(pluginConfPath string) map[string]string {
OMSEndpoint = omsadminConf["OMS_ENDPOINT"]
Log("OMSEndpoint %s", OMSEndpoint)

config, err := rest.InClusterConfig()
if err != nil {
Log("Error getting config %s\n", err.Error())
}

ClientSet, err = kubernetes.NewForConfig(config)
if err != nil {
Log("Error getting clientset %s", err.Error())
}

return pluginConf
}
2 changes: 1 addition & 1 deletion source/code/go/src/plugins/out_oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {
Log("Initializing out_oms go plugin for fluentbit")
PluginConfiguration = ReadConfig("/etc/opt/microsoft/docker-cimprov/out_oms.conf")
PluginConfiguration = InitializeConfig("/etc/opt/microsoft/docker-cimprov/out_oms.conf")
CreateHTTPClient()
updateContainersData()
return output.FLB_OK
Expand Down