99 lines
2.1 KiB
Go
99 lines
2.1 KiB
Go
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/bytedance/sonic"
|
|
"leafdev.top/Ecosystem/recommender/internal/base"
|
|
"leafdev.top/Ecosystem/recommender/internal/schema"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
func init() {
|
|
RootCmd.AddCommand(workerCmd)
|
|
}
|
|
|
|
var workerCmd = &cobra.Command{
|
|
Use: "worker",
|
|
Short: "Receive chunk data",
|
|
Long: `Receive chunk data`,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
app, err := CreateApp()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
runWorker(app)
|
|
},
|
|
}
|
|
|
|
func runWorker(app *base.Application) {
|
|
app.Logger.Sugar.Info("start worker")
|
|
var r = app.Service.Stream.Consumer(app.Config.Kafka.Topic, app.Config.Kafka.GroupId)
|
|
|
|
for {
|
|
fmt.Println("Loop!")
|
|
var ctx = context.Background()
|
|
|
|
msg, err := r.ReadMessage(ctx)
|
|
if err != nil {
|
|
app.Logger.Sugar.Error(err)
|
|
continue
|
|
}
|
|
|
|
//fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,value=%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value))
|
|
|
|
var data = &schema.ProcessPostResult{}
|
|
|
|
if err := sonic.Unmarshal(msg.Value, data); err != nil {
|
|
app.Logger.Sugar.Error(err)
|
|
continue
|
|
}
|
|
|
|
postId, err := strconv.ParseUint(data.PostId, 10, 64)
|
|
|
|
postEntity, err := app.Service.Post.GetPostById(ctx, schema.EntityId(postId))
|
|
if err != nil {
|
|
app.Logger.Sugar.Error(err)
|
|
continue
|
|
}
|
|
|
|
app.Logger.Sugar.Infof("receive post %s keywords", postEntity.Title)
|
|
|
|
var processError error
|
|
|
|
for _, k := range data.Keywords {
|
|
// 清除 k 的空格
|
|
if strings.Trim(k, " ") == "" {
|
|
continue
|
|
}
|
|
|
|
app.Logger.Sugar.Infof("bind post %s with tag %s", postEntity.Title, k)
|
|
err = app.Service.Post.BindTag(ctx, postEntity, k)
|
|
if err != nil {
|
|
processError = err
|
|
app.Logger.Sugar.Error(err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
if processError == nil {
|
|
err = app.Service.Post.SavePostEmbedding(ctx, postEntity)
|
|
if err != nil {
|
|
app.Logger.Sugar.Error(err)
|
|
}
|
|
|
|
err = app.Service.Post.MarkAsProcessed(ctx, postEntity)
|
|
if err != nil {
|
|
app.Logger.Sugar.Error(err)
|
|
}
|
|
} else {
|
|
app.Logger.Sugar.Error(processError)
|
|
}
|
|
|
|
}
|
|
}
|