recommender/cmd/worker.go
2024-11-10 03:49:53 +08:00

99 lines
2.1 KiB
Go

package cmd
import (
"context"
"fmt"
"github.com/bytedance/sonic"
"leafdev.top/Ecosystem/recommender/internal/base"
"leafdev.top/Ecosystem/recommender/internal/schema"
"strconv"
"strings"
"github.com/spf13/cobra"
)
func init() {
RootCmd.AddCommand(workerCmd)
}
var workerCmd = &cobra.Command{
Use: "worker",
Short: "Receive chunk data",
Long: `Receive chunk data`,
Run: func(cmd *cobra.Command, args []string) {
app, err := CreateApp()
if err != nil {
panic(err)
}
runWorker(app)
},
}
func runWorker(app *base.Application) {
app.Logger.Sugar.Info("start worker")
var r = app.Service.Stream.Consumer(app.Config.Kafka.Topic, app.Config.Kafka.GroupId)
for {
fmt.Println("Loop!")
var ctx = context.Background()
msg, err := r.ReadMessage(ctx)
if err != nil {
app.Logger.Sugar.Error(err)
continue
}
//fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,value=%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value))
var data = &schema.ProcessPostResult{}
if err := sonic.Unmarshal(msg.Value, data); err != nil {
app.Logger.Sugar.Error(err)
continue
}
postId, err := strconv.ParseUint(data.PostId, 10, 64)
postEntity, err := app.Service.Post.GetPostById(ctx, schema.EntityId(postId))
if err != nil {
app.Logger.Sugar.Error(err)
continue
}
app.Logger.Sugar.Infof("receive post %s keywords", postEntity.Title)
var processError error
for _, k := range data.Keywords {
// 清除 k 的空格
if strings.Trim(k, " ") == "" {
continue
}
app.Logger.Sugar.Infof("bind post %s with tag %s", postEntity.Title, k)
err = app.Service.Post.BindTag(ctx, postEntity, k)
if err != nil {
processError = err
app.Logger.Sugar.Error(err)
continue
}
}
if processError == nil {
err = app.Service.Post.SavePostEmbedding(ctx, postEntity)
if err != nil {
app.Logger.Sugar.Error(err)
}
err = app.Service.Post.MarkAsProcessed(ctx, postEntity)
if err != nil {
app.Logger.Sugar.Error(err)
}
} else {
app.Logger.Sugar.Error(processError)
}
}
}