fluentbit-log-collect/main.go
2024-05-14 15:13:45 +08:00

135 lines
3.3 KiB
Go

package main
import (
	"crypto/subtle"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/gin-gonic/gin"
	"leafdev.top/ivampiresp/msgpack-fluentbit-log-collect/config"
)
// Router is the package-level gin engine: InitGin assigns it, Controller
// registers the route on it, and main serves requests with it.
var Router *gin.Engine
// LogStruct is one log record as decoded from the JSON array posted to the
// collector. Each field is filled from the JSON key named in its struct tag.
type LogStruct struct {
// Date is decoded from "date" but is not read anywhere in the visible code.
// NOTE(review): presumably the record timestamp as epoch seconds — confirm
// against the sender.
Date float64 `json:"date"`
// Time selects the year/month/day/hour directory of the target file and is
// printed in front of every written line.
Time time.Time `json:"time"`
// Stream becomes the suffix of the log file name.
// NOTE(review): presumably "stdout" or "stderr" — confirm.
Stream string `json:"stream"`
// P is decoded from "_p"; it is not read anywhere in the visible code.
P string `json:"_p"`
// Log is the message text appended to the log file.
Log string `json:"log"`
// Kubernetes describes the container the record came from. Process reads
// only NamespaceName and ContainerName; the other fields are decoded but
// unused in the visible code.
Kubernetes struct {
PodName string `json:"pod_name"`
NamespaceName string `json:"namespace_name"`
PodID string `json:"pod_id"`
Labels map[string]interface{} `json:"labels"`
Annotations map[string]interface{} `json:"annotations"`
Host string `json:"host"`
ContainerName string `json:"container_name"`
DockerID string `json:"docker_id"`
ContainerImage string `json:"container_image"`
} `json:"kubernetes"`
}
// init prepares the process before main runs. The call order matters:
// config.InitConfig runs first (presumably populating the config values read
// elsewhere in this file — confirm in the config package), InitGin then
// assigns Router, and Controller finally registers the route on that Router.
func init() {
config.InitConfig()
InitGin()
Controller()
}
// main serves HTTP on the address held in config.Gin.Addr and blocks until the
// server stops. An error returned by Router.Run terminates the process.
func main() {
	// log.Fatal exits the process itself, so no statement after it can run.
	if err := Router.Run(config.Gin.Addr); err != nil {
		log.Fatal(err)
	}
}
// InitGin switches gin to release mode and stores a newly built default engine
// in the package-level Router.
func InitGin() {
	// The mode is selected before the engine is constructed.
	gin.SetMode(gin.ReleaseMode)

	engine := gin.Default()
	Router = engine
}
// Controller registers the collector's single route, POST /, on Router.
//
// The handler expects the shared secret in the Authorization header and a JSON
// array of log records in the request body. It sets status 401 when the secret
// does not match, 400 when the body cannot be decoded into []LogStruct and 500
// when a record cannot be written. On success no status is set explicitly and
// no body is written.
func Controller() {
	Router.POST("/", func(c *gin.Context) {
		c.Header("Content-Type", "application/json")

		if !ValidatePassword(c.GetHeader("Authorization")) {
			c.Status(http.StatusUnauthorized)
			return
		}

		// ShouldBindJSON only reports the decoding error and leaves the
		// response to this handler. BindJSON is the "must bind" variant: it
		// aborts the request with its own 400 before returning, which made
		// the status set here redundant.
		var records []LogStruct
		if err := c.ShouldBindJSON(&records); err != nil {
			c.Status(http.StatusBadRequest)
			fmt.Println(err)
			return
		}

		// Append every record to its log file. Processing stops at the first
		// failure; records handled before that point have already been
		// written.
		for _, record := range records {
			if err := Process(record); err != nil {
				c.Status(http.StatusInternalServerError)
				fmt.Println(err)
				return
			}
		}
	})
}
// ValidatePassword reports whether pass equals the shared secret stored in
// config.Auth.Pass.
//
// In this file pass is taken straight from a request header, so the comparison
// runs in constant time: a plain == can return at the first differing byte,
// which lets a remote caller recover the secret from response timing.
// ConstantTimeCompare still returns immediately when the lengths differ, so
// only the secret's length can be learned.
//
// NOTE(review): as before, an empty config.Auth.Pass matches an empty header.
func ValidatePassword(pass string) bool {
	return subtle.ConstantTimeCompare([]byte(pass), []byte(config.Auth.Pass)) == 1
}
// Process appends one record to the log file selected by the record's time,
// namespace, container name and stream, creating the directory when needed.
//
// The appended line has the form "[<time>] <log>\n". The returned error is
// non-nil when the directory cannot be created or when the file cannot be
// opened, written or closed.
func Process(logStruct LogStruct) (err error) {
	ts := logStruct.Time
	meta := logStruct.Kubernetes

	dir := GetPath(false, ts.Year(), ts.Month(), ts.Day(), ts.Hour(), meta.NamespaceName, logStruct.Stream, meta.ContainerName)
	fileName := GetPath(true, ts.Year(), ts.Month(), ts.Day(), ts.Hour(), meta.NamespaceName, logStruct.Stream, meta.ContainerName)

	// MkdirAll returns nil when the directory already exists, so no Stat is
	// needed beforehand. Calling it unconditionally also reports failures
	// other than "not exist", which the previous Stat check ignored.
	if mkErr := os.MkdirAll(dir, os.ModePerm); mkErr != nil {
		return fmt.Errorf("create log directory %q: %w", dir, mkErr)
	}

	file, openErr := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("open log file %q: %w", fileName, openErr)
	}
	defer func() {
		// A failed Close may mean the appended line was never persisted, so
		// it is returned unless an earlier error is already on its way out.
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("close log file %q: %w", fileName, closeErr)
		}
	}()

	// Fprintf formats the whole line first and hands it to the file in a
	// single Write call.
	if _, writeErr := fmt.Fprintf(file, "[%s] %s\n", ts, logStruct.Log); writeErr != nil {
		return fmt.Errorf("write log file %q: %w", fileName, writeErr)
	}
	return nil
}
// GetPath builds the location of a record's log file below config.Dir.Dir.
//
// With includeFileName false it returns the directory
//
//	<dir>/<year>-<month>/<day>/<namespace>/<hour>/
//
// and with includeFileName true that directory followed by
//
//	<containerName>-<stream>.log
//
// Month, day and hour are written as plain integers without zero padding.
//
// In this file namespace, stream and containerName come from the decoded
// request body. They are passed through sanitizePathSegment so that a crafted
// value such as "../../x" cannot move the result outside config.Dir.Dir.
// Values that contain no path separator or NUL byte and are not "." or ".."
// are used unchanged.
func GetPath(includeFileName bool, year int, month time.Month, day int, hour int, namespace string, stream string, containerName string) string {
	dir := fmt.Sprintf("%s/%d-%d/%d/%s/%d/", config.Dir.Dir, year, int(month), day, sanitizePathSegment(namespace), hour)
	if !includeFileName {
		return dir
	}
	return dir + sanitizePathSegment(containerName) + "-" + sanitizePathSegment(stream) + ".log"
}

// sanitizePathSegment returns s in a form that stays within a single path
// segment: the names "." and ".." become "_" and "__", and every '/', '\\' or
// NUL byte is replaced by '_'. Any other input is returned unchanged.
func sanitizePathSegment(s string) string {
	switch s {
	case ".":
		return "_"
	case "..":
		return "__"
	}

	out := []byte(s)
	for i, c := range out {
		if c == '/' || c == '\\' || c == 0 {
			out[i] = '_'
		}
	}
	return string(out)
}