package main import ( "context" "flag" "fmt" "github.com/bytedance/sonic" "net/http" "os" "path/filepath" "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) type Deployment struct { Name string `json:"name"` Namespace string `json:"namespace"` Replicas int32 `json:"replicas"` ReadyReplicas int32 `json:"ready_replicas"` Status string `json:"status"` //CreatedAt string `json:"created_at"` } // Cache 缓存 type Cache struct { Deployments *[]Deployment `json:"deployments,omitempty"` LastUpdated time.Time `json:"-"` } var cachedData *Cache var config *rest.Config var err error var clientset *kubernetes.Clientset var namespaceList []string var refreshing = false func main() { // 获取环境变量中的命名空间列表 namespaces := os.Getenv("NAMESPACES") if namespaces == "" { fmt.Println("Environment variable NAMESPACES is not set") return } namespaceList = strings.Split(namespaces, ",") // 检查是否提供了 kubeconfig 文件路径 home := filepath.Join("~", ".kube", "config") kubeconfig := flag.String("kubeconfig", home, "absolute path to the kubeconfig file") flag.Parse() // 判断是否在集群内部运行 if _, err := os.Stat("/var/run/secrets/kubernetes.io/serviceaccount/token"); err == nil { // 在集群内部运行,使用集群内部配置 config, err = rest.InClusterConfig() if err != nil { fmt.Printf("Error creating in-cluster config: %v\n", err) return } } else { // 使用本地 kubeconfig 文件 config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { fmt.Printf("Error building kubeconfig: %v\n", err) return } } // 创建 Kubernetes 客户端 clientset, err = kubernetes.NewForConfig(config) if err != nil { fmt.Printf("Error creating Kubernetes client: %v\n", err) return } startWeb() } func startWeb() { // 创建一个新的 HTTP 服务器 http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { // 判断是否正在刷新 if refreshing { time.Sleep(1 * time.Second) return } refreshing = true defer func() { refreshing = false }() if cachedData == nil || time.Since(cachedData.LastUpdated) >= 2*time.Second { d := getNS() // 缓存数据 cachedData = &Cache{ Deployments: d, LastUpdated: time.Now(), } } // 使用 JSON 格式返回数据 // header w.Header().Set("Content-Type", "application/json") // body text, err := sonic.Marshal(&cachedData) if err != nil { fmt.Printf("Error encoding JSON: %v\n", err) return } _, err = w.Write(text) if err != nil { fmt.Printf("Error writing JSON: %v\n", err) return } }) // 启动 HTTP 服务器 err := http.ListenAndServe(":8080", nil) if err != nil { fmt.Printf("Error starting HTTP server: %v\n", err) return } } func getNS() *[]Deployment { var d []Deployment // 遍历所有命名空间 for _, namespace := range namespaceList { deploymentsClient := clientset.AppsV1().Deployments(namespace) deployments, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{}) if err != nil { fmt.Printf("Error listing deployments in namespace %s: %v\n", namespace, err) continue } // 获取状态 progressing for _, deployment := range deployments.Items { // get type var status string // 只有达到最低 available replicas 才能进入 available if deployment.Status.AvailableReplicas >= deployment.Status.Replicas { status = "Available" } else { status = "Progressing" } //fmt.Println("AvailableReplicas", deployment.Status.AvailableReplicas) //fmt.Println("UnavailableReplicas", deployment.Status.UnavailableReplicas) d = append(d, Deployment{ Name: deployment.Name, Namespace: deployment.Namespace, Replicas: *deployment.Spec.Replicas, Status: status, ReadyReplicas: deployment.Status.ReadyReplicas, }) } } return &d }