package cmd

import (
	"context"
	"fmt"
	"strconv"
	"strings"

	"github.com/bytedance/sonic"
	"github.com/spf13/cobra"
	"leafdev.top/Ecosystem/recommender/internal/base"
	"leafdev.top/Ecosystem/recommender/internal/schema"
)

// init registers the worker sub-command on the root command.
func init() {
	RootCmd.AddCommand(workerCmd)
}

// workerCmd is the "worker" sub-command. It builds the application via
// CreateApp (panicking if that fails) and then hands control to runWorker.
var workerCmd = &cobra.Command{
	Use:   "worker",
	Short: "Receive chunk data",
	Long:  `Receive chunk data`,
	Run: func(cmd *cobra.Command, args []string) {
		app, err := CreateApp()
		if err != nil {
			panic(err)
		}

		runWorker(app)
	},
}

// runWorker consumes messages from the topic and consumer group configured
// under app.Config.Kafka and processes them one at a time. It loops forever
// and never returns.
//
// For every message it:
//  1. decodes the payload into a schema.ProcessPostResult,
//  2. parses the post id and loads the matching post,
//  3. binds every non-blank keyword to the post as a tag,
//  4. if every bind succeeded, saves the post embedding and marks the post
//     as processed.
//
// Any failure is logged and the loop moves on to the next message.
func runWorker(app *base.Application) {
	app.Logger.Sugar.Info("start worker")

	var r = app.Service.Stream.Consumer(app.Config.Kafka.Topic, app.Config.Kafka.GroupId)

	for {
		fmt.Println("Loop!")

		var ctx = context.Background()

		msg, err := r.ReadMessage(ctx)
		if err != nil {
			app.Logger.Sugar.Error(err)
			continue
		}
		//fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,value=%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value))

		var data = &schema.ProcessPostResult{}
		if err := sonic.Unmarshal(msg.Value, data); err != nil {
			app.Logger.Sugar.Error(err)
			continue
		}

		// A malformed post id must stop processing of this message here;
		// otherwise the lookup below would silently run with id 0.
		postID, err := strconv.ParseUint(data.PostId, 10, 64)
		if err != nil {
			app.Logger.Sugar.Error(fmt.Errorf("parse post id %q: %w", data.PostId, err))
			continue
		}

		postEntity, err := app.Service.Post.GetPostById(ctx, schema.EntityId(postID))
		if err != nil {
			app.Logger.Sugar.Error(err)
			continue
		}

		app.Logger.Sugar.Infof("receive post %s keywords", postEntity.Title)

		// processError remembers the most recent bind failure so the
		// post-processing steps below are skipped if any keyword failed.
		var processError error
		for _, k := range data.Keywords {
			// Skip keywords that are empty or consist only of spaces.
			// NOTE(review): k itself is passed on untrimmed — confirm whether
			// BindTag is expected to normalize surrounding spaces.
			if strings.Trim(k, " ") == "" {
				continue
			}

			app.Logger.Sugar.Infof("bind post %s with tag %s", postEntity.Title, k)
			err = app.Service.Post.BindTag(ctx, postEntity, k)
			if err != nil {
				processError = err
				app.Logger.Sugar.Error(err)
				continue
			}
		}

		if processError == nil {
			err = app.Service.Post.SavePostEmbedding(ctx, postEntity)
			if err != nil {
				app.Logger.Sugar.Error(err)
			}

			// NOTE(review): the post is marked as processed even when saving
			// the embedding failed above — confirm this is intentional.
			err = app.Service.Post.MarkAsProcessed(ctx, postEntity)
			if err != nil {
				app.Logger.Sugar.Error(err)
			}
		} else {
			app.Logger.Sugar.Error(processError)
		}
	}
}