项目基本功能

This commit is contained in:
ivamp 2024-11-10 03:49:53 +08:00
parent d6fabedb23
commit 3e1c44839b
38 changed files with 945 additions and 246 deletions

218
cmd/create-collection.go Normal file
View File

@ -0,0 +1,218 @@
package cmd
import (
"context"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/spf13/cobra"
"leafdev.top/Ecosystem/recommender/internal/base"
)
func init() {
RootCmd.AddCommand(createCollectionCmd)
createCollectionCmd.Flags().String("dim", "768", "模型的维度")
// 将 dim 参数标记为必填
err := createCollectionCmd.MarkFlagRequired("dim")
if err != nil {
panic(err)
}
}
var createCollectionCmd = &cobra.Command{
Use: "create-collection",
Run: func(cmd *cobra.Command, args []string) {
app, err := CreateApp()
if err != nil {
panic(err)
return
}
// 获取 flag
dim, err := cmd.Flags().GetString("dim")
if err != nil {
panic(err)
}
createMilvusTagCollection(app, dim)
createMilvusUserSummaryCollection(app, dim)
createMilvusPostCollection(app, dim)
},
}
func createMilvusTagCollection(app *base.Application, dim string) {
var ctx = context.Background()
var field = []*entity.Field{
{
Name: "tag_id",
PrimaryKey: true,
AutoID: false,
DataType: entity.FieldTypeInt64,
},
{
Name: "vector",
PrimaryKey: false,
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": dim,
},
},
}
var schema = &entity.Schema{
CollectionName: app.Config.Milvus.TagCollection,
Description: "",
AutoID: true,
Fields: field,
EnableDynamicField: true,
}
err := app.Milvus.CreateCollection(ctx, schema, 2)
if err != nil {
panic(err)
}
index := entity.NewGenericIndex("idx_tag_id", entity.Inverted, map[string]string{})
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.TagCollection, "tag_id", index, false)
if err != nil {
panic(err)
}
index, err = entity.NewIndexAUTOINDEX(entity.L2)
if err != nil {
panic(err)
}
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.TagCollection, "vector", index, false)
if err != nil {
panic(err)
}
}
func createMilvusUserSummaryCollection(app *base.Application, dim string) {
var ctx = context.Background()
var field = []*entity.Field{
{
Name: "external_user_id",
PrimaryKey: true,
AutoID: false,
DataType: entity.FieldTypeInt64,
},
{
Name: "application_id",
PrimaryKey: false,
AutoID: false,
DataType: entity.FieldTypeInt64,
},
{
Name: "vector",
PrimaryKey: false,
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": dim,
},
},
}
var schema = &entity.Schema{
CollectionName: app.Config.Milvus.UserSummaryCollection,
Description: "",
AutoID: true,
Fields: field,
EnableDynamicField: true,
}
err := app.Milvus.CreateCollection(ctx, schema, 2)
if err != nil {
panic(err)
}
index := entity.NewGenericIndex("idx_external_user_id", entity.Inverted, map[string]string{})
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.UserSummaryCollection, "external_user_id", index, false)
if err != nil {
panic(err)
}
index = entity.NewGenericIndex("idx_application_id", entity.Inverted, map[string]string{})
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.UserSummaryCollection, "application_id", index, false)
if err != nil {
panic(err)
}
index, err = entity.NewIndexAUTOINDEX(entity.L2)
if err != nil {
panic(err)
}
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.UserSummaryCollection, "vector", index, false)
if err != nil {
panic(err)
}
}
func createMilvusPostCollection(app *base.Application, dim string) {
var ctx = context.Background()
var field = []*entity.Field{
{
Name: "post_id",
PrimaryKey: true,
AutoID: false,
DataType: entity.FieldTypeInt64,
},
{
Name: "category_id",
PrimaryKey: false,
AutoID: false,
DataType: entity.FieldTypeInt64,
},
{
Name: "application_id",
PrimaryKey: false,
AutoID: false,
DataType: entity.FieldTypeInt64,
},
{
Name: "vector",
PrimaryKey: false,
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": dim,
},
},
}
var schema = &entity.Schema{
CollectionName: app.Config.Milvus.PostCollection,
Description: "",
AutoID: true,
Fields: field,
EnableDynamicField: true,
}
err := app.Milvus.CreateCollection(ctx, schema, 2)
if err != nil {
panic(err)
}
index := entity.NewGenericIndex("idx_post_id", entity.Inverted, map[string]string{})
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.PostCollection, "post_id", index, false)
if err != nil {
panic(err)
}
index = entity.NewGenericIndex("idx_category_id", entity.Inverted, map[string]string{})
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.PostCollection, "category_id", index, false)
if err != nil {
panic(err)
}
index = entity.NewGenericIndex("idx_application_id", entity.Inverted, map[string]string{})
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.PostCollection, "application_id", index, false)
if err != nil {
panic(err)
}
index, err = entity.NewIndexAUTOINDEX(entity.L2)
if err != nil {
panic(err)
}
err = app.Milvus.CreateIndex(ctx, app.Config.Milvus.PostCollection, "vector", index, false)
if err != nil {
panic(err)
}
}

View File

@ -1,7 +1,9 @@
package cmd package cmd
import ( import (
"context"
"leafdev.top/Ecosystem/recommender/internal/base" "leafdev.top/Ecosystem/recommender/internal/base"
"sync"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -25,15 +27,22 @@ var scheduleCmd = &cobra.Command{
} }
func runSchedule(app *base.Application) { func runSchedule(app *base.Application) {
// var wg sync.WaitGroup
// var ctx = context.Background() var wg sync.WaitGroup
// wg.Add(1) var ctx = context.Background()
// // 启动一个定时器
// go func() {
// }() wg.Add(1)
// 启动一个定时器
go func() {
// wg.Wait() // defer cancel()
// run embedding
ctx.Done()
defer wg.Done()
}()
wg.Wait()
} }

View File

@ -7,6 +7,7 @@ import (
"leafdev.top/Ecosystem/recommender/internal/base" "leafdev.top/Ecosystem/recommender/internal/base"
"leafdev.top/Ecosystem/recommender/internal/base/conf" "leafdev.top/Ecosystem/recommender/internal/base/conf"
"leafdev.top/Ecosystem/recommender/internal/base/logger" "leafdev.top/Ecosystem/recommender/internal/base/logger"
"leafdev.top/Ecosystem/recommender/internal/base/milvus"
"leafdev.top/Ecosystem/recommender/internal/base/orm" "leafdev.top/Ecosystem/recommender/internal/base/orm"
"leafdev.top/Ecosystem/recommender/internal/base/redis" "leafdev.top/Ecosystem/recommender/internal/base/redis"
"leafdev.top/Ecosystem/recommender/internal/base/s3" "leafdev.top/Ecosystem/recommender/internal/base/s3"
@ -28,6 +29,7 @@ var ProviderSet = wire.NewSet(
redis.NewRedis, redis.NewRedis,
s3.NewS3, s3.NewS3,
batch.NewBatch, batch.NewBatch,
milvus.NewService,
service.Provider, service.Provider,
handler.ProviderSet, handler.ProviderSet,
router.ProviderSetRouter, router.ProviderSetRouter,

View File

@ -11,6 +11,7 @@ import (
"leafdev.top/Ecosystem/recommender/internal/base" "leafdev.top/Ecosystem/recommender/internal/base"
"leafdev.top/Ecosystem/recommender/internal/base/conf" "leafdev.top/Ecosystem/recommender/internal/base/conf"
"leafdev.top/Ecosystem/recommender/internal/base/logger" "leafdev.top/Ecosystem/recommender/internal/base/logger"
"leafdev.top/Ecosystem/recommender/internal/base/milvus"
"leafdev.top/Ecosystem/recommender/internal/base/orm" "leafdev.top/Ecosystem/recommender/internal/base/orm"
"leafdev.top/Ecosystem/recommender/internal/base/redis" "leafdev.top/Ecosystem/recommender/internal/base/redis"
"leafdev.top/Ecosystem/recommender/internal/base/s3" "leafdev.top/Ecosystem/recommender/internal/base/s3"
@ -30,6 +31,7 @@ import (
"leafdev.top/Ecosystem/recommender/internal/service/application" "leafdev.top/Ecosystem/recommender/internal/service/application"
"leafdev.top/Ecosystem/recommender/internal/service/auth" "leafdev.top/Ecosystem/recommender/internal/service/auth"
"leafdev.top/Ecosystem/recommender/internal/service/category" "leafdev.top/Ecosystem/recommender/internal/service/category"
"leafdev.top/Ecosystem/recommender/internal/service/embedding"
"leafdev.top/Ecosystem/recommender/internal/service/jwks" "leafdev.top/Ecosystem/recommender/internal/service/jwks"
"leafdev.top/Ecosystem/recommender/internal/service/post" "leafdev.top/Ecosystem/recommender/internal/service/post"
"leafdev.top/Ecosystem/recommender/internal/service/stream" "leafdev.top/Ecosystem/recommender/internal/service/stream"
@ -49,13 +51,15 @@ func CreateApp() (*base.Application, error) {
applicationController := v1.NewApplicationController(authService, applicationService) applicationController := v1.NewApplicationController(authService, applicationService)
application_v1ApplicationController := application_v1.NewApplicationController(authService, applicationService) application_v1ApplicationController := application_v1.NewApplicationController(authService, applicationService)
streamService := stream.NewService(config) streamService := stream.NewService(config)
postService := post.NewService(query, config, streamService) client := milvus.NewService(config, loggerLogger)
redisRedis := redis.NewRedis(config)
embeddingService := embedding.NewService(config, loggerLogger, query, redisRedis)
postService := post.NewService(query, config, streamService, client, embeddingService)
categoryService := category.NewService(query) categoryService := category.NewService(query)
postController := application_v1.NewPostController(authService, applicationService, postService, categoryService) postController := application_v1.NewPostController(authService, applicationService, postService, categoryService)
categoryController := application_v1.NewCategoryController(authService, applicationService, postService, categoryService) categoryController := application_v1.NewCategoryController(authService, applicationService, postService, categoryService)
userService := user.NewService(query, postService, loggerLogger) userService := user.NewService(query, postService, loggerLogger, client, embeddingService, config)
redisRedis := redis.NewRedis(config) userController := application_v1.NewUserController(authService, applicationService, userService, postService, loggerLogger, redisRedis, categoryService)
userController := application_v1.NewUserController(authService, applicationService, userService, postService, loggerLogger, redisRedis)
handlers := http.NewHandler(applicationController, application_v1ApplicationController, postController, categoryController, userController) handlers := http.NewHandler(applicationController, application_v1ApplicationController, postController, categoryController, userController)
api := router.NewApiRoute(handlers) api := router.NewApiRoute(handlers)
swaggerRouter := router.NewSwaggerRoute() swaggerRouter := router.NewSwaggerRoute()
@ -71,13 +75,13 @@ func CreateApp() (*base.Application, error) {
grpcInterceptor := grpc.NewInterceptor(interceptorAuth, interceptorLogger) grpcInterceptor := grpc.NewInterceptor(interceptorAuth, interceptorLogger)
grpcHandlers := grpc.NewHandler(documentService, grpcInterceptor) grpcHandlers := grpc.NewHandler(documentService, grpcInterceptor)
handlerHandler := handler.NewHandler(grpcHandlers, handlers) handlerHandler := handler.NewHandler(grpcHandlers, handlers)
serviceService := service.NewService(loggerLogger, jwksJWKS, streamService, authService, applicationService, postService, categoryService, userService) serviceService := service.NewService(loggerLogger, jwksJWKS, streamService, authService, applicationService, postService, categoryService, userService, embeddingService)
batchBatch := batch.NewBatch(loggerLogger) batchBatch := batch.NewBatch(loggerLogger)
s3S3 := s3.NewS3(config) s3S3 := s3.NewS3(config)
baseApplication := base.NewApplication(config, httpServer, handlerHandler, loggerLogger, serviceService, httpMiddleware, redisRedis, batchBatch, s3S3, db, query) baseApplication := base.NewApplication(config, httpServer, handlerHandler, loggerLogger, serviceService, httpMiddleware, redisRedis, batchBatch, s3S3, db, query, client)
return baseApplication, nil return baseApplication, nil
} }
// wire.go: // wire.go:
var ProviderSet = wire.NewSet(conf.ProviderConfig, logger.NewZapLogger, orm.NewGORM, dao.NewQuery, redis.NewRedis, s3.NewS3, batch.NewBatch, service.Provider, handler.ProviderSet, router.ProviderSetRouter, server.NewHTTPServer, base.NewApplication) var ProviderSet = wire.NewSet(conf.ProviderConfig, logger.NewZapLogger, orm.NewGORM, dao.NewQuery, redis.NewRedis, s3.NewS3, batch.NewBatch, milvus.NewService, service.Provider, handler.ProviderSet, router.ProviderSetRouter, server.NewHTTPServer, base.NewApplication)

View File

@ -81,6 +81,11 @@ func runWorker(app *base.Application) {
} }
if processError == nil { if processError == nil {
err = app.Service.Post.SavePostEmbedding(ctx, postEntity)
if err != nil {
app.Logger.Sugar.Error(err)
}
err = app.Service.Post.MarkAsProcessed(ctx, postEntity) err = app.Service.Post.MarkAsProcessed(ctx, postEntity)
if err != nil { if err != nil {
app.Logger.Sugar.Error(err) app.Logger.Sugar.Error(err)

View File

@ -44,10 +44,12 @@ s3:
milvus: milvus:
host: 127.0.0.1 host: 127.0.0.1
port: 19530 port: 19530
db_name: library db_name: recommender
# 由于 Milvus 不支持新增列, 如果更换了 Embedding Model,建议新建一个 Collection # 由于 Milvus 不支持新增列, 如果更换了 Embedding Model,建议新建一个 Collection
# 或者可以扩展张量 # 或者可以扩展张量
document_collection: documents user_summary_collection: user_summaries
tag_collection: tags
post_collection: posts
user: user:
password: password:

View File

@ -718,34 +718,16 @@ const docTemplate = `{
"summary": "Dislike", "summary": "Dislike",
"parameters": [ "parameters": [
{ {
"description": "UserLikePost", "description": "UserDislikePost",
"name": "UserLikePost", "name": "UserDislikePost",
"in": "body", "in": "body",
"required": true, "required": true,
"schema": { "schema": {
"$ref": "#/definitions/request.UserLikePost" "$ref": "#/definitions/request.UserDislikePost"
} }
} }
], ],
"responses": { "responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/response.ResponseBody"
},
{
"type": "object",
"properties": {
"data": {
"$ref": "#/definitions/entity.Category"
}
}
}
]
}
},
"400": { "400": {
"description": "Bad Request", "description": "Bad Request",
"schema": { "schema": {
@ -784,6 +766,45 @@ const docTemplate = `{
} }
} }
], ],
"responses": {
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/response.ResponseBody"
}
}
}
}
},
"/applications/v1/users/_suggest": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "推荐资源",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"application_api"
],
"summary": "Suggest",
"parameters": [
{
"description": "UserSuggestsRequest",
"name": "UserSuggestsRequest",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/request.UserSuggestsRequest"
}
}
],
"responses": { "responses": {
"200": { "200": {
"description": "OK", "description": "OK",
@ -796,7 +817,10 @@ const docTemplate = `{
"type": "object", "type": "object",
"properties": { "properties": {
"data": { "data": {
"$ref": "#/definitions/entity.Category" "type": "array",
"items": {
"$ref": "#/definitions/entity.Post"
}
} }
} }
} }
@ -909,6 +933,9 @@ const docTemplate = `{
}, },
"updated_at": { "updated_at": {
"type": "string" "type": "string"
},
"vectorized": {
"type": "boolean"
} }
} }
}, },
@ -1014,6 +1041,17 @@ const docTemplate = `{
} }
} }
}, },
"request.UserDislikePost": {
"type": "object",
"properties": {
"external_user_id": {
"type": "string"
},
"post_id": {
"type": "integer"
}
}
},
"request.UserLikePost": { "request.UserLikePost": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -1025,6 +1063,21 @@ const docTemplate = `{
} }
} }
}, },
"request.UserSuggestsRequest": {
"type": "object",
"required": [
"category_id",
"external_user_id"
],
"properties": {
"category_id": {
"type": "integer"
},
"external_user_id": {
"type": "string"
}
}
},
"response.ResponseBody": { "response.ResponseBody": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@ -709,34 +709,16 @@
"summary": "Dislike", "summary": "Dislike",
"parameters": [ "parameters": [
{ {
"description": "UserLikePost", "description": "UserDislikePost",
"name": "UserLikePost", "name": "UserDislikePost",
"in": "body", "in": "body",
"required": true, "required": true,
"schema": { "schema": {
"$ref": "#/definitions/request.UserLikePost" "$ref": "#/definitions/request.UserDislikePost"
} }
} }
], ],
"responses": { "responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/response.ResponseBody"
},
{
"type": "object",
"properties": {
"data": {
"$ref": "#/definitions/entity.Category"
}
}
}
]
}
},
"400": { "400": {
"description": "Bad Request", "description": "Bad Request",
"schema": { "schema": {
@ -775,6 +757,45 @@
} }
} }
], ],
"responses": {
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/response.ResponseBody"
}
}
}
}
},
"/applications/v1/users/_suggest": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "推荐资源",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"application_api"
],
"summary": "Suggest",
"parameters": [
{
"description": "UserSuggestsRequest",
"name": "UserSuggestsRequest",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/request.UserSuggestsRequest"
}
}
],
"responses": { "responses": {
"200": { "200": {
"description": "OK", "description": "OK",
@ -787,7 +808,10 @@
"type": "object", "type": "object",
"properties": { "properties": {
"data": { "data": {
"$ref": "#/definitions/entity.Category" "type": "array",
"items": {
"$ref": "#/definitions/entity.Post"
}
} }
} }
} }
@ -900,6 +924,9 @@
}, },
"updated_at": { "updated_at": {
"type": "string" "type": "string"
},
"vectorized": {
"type": "boolean"
} }
} }
}, },
@ -1005,6 +1032,17 @@
} }
} }
}, },
"request.UserDislikePost": {
"type": "object",
"properties": {
"external_user_id": {
"type": "string"
},
"post_id": {
"type": "integer"
}
}
},
"request.UserLikePost": { "request.UserLikePost": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -1016,6 +1054,21 @@
} }
} }
}, },
"request.UserSuggestsRequest": {
"type": "object",
"required": [
"category_id",
"external_user_id"
],
"properties": {
"category_id": {
"type": "integer"
},
"external_user_id": {
"type": "string"
}
}
},
"response.ResponseBody": { "response.ResponseBody": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@ -62,6 +62,8 @@ definitions:
type: string type: string
updated_at: updated_at:
type: string type: string
vectorized:
type: boolean
type: object type: object
page.PagedResult-entity_Category: page.PagedResult-entity_Category:
properties: properties:
@ -133,6 +135,13 @@ definitions:
- target_id - target_id
- title - title
type: object type: object
request.UserDislikePost:
properties:
external_user_id:
type: string
post_id:
type: integer
type: object
request.UserLikePost: request.UserLikePost:
properties: properties:
external_user_id: external_user_id:
@ -140,6 +149,16 @@ definitions:
post_id: post_id:
type: integer type: integer
type: object type: object
request.UserSuggestsRequest:
properties:
category_id:
type: integer
external_user_id:
type: string
required:
- category_id
- external_user_id
type: object
response.ResponseBody: response.ResponseBody:
properties: properties:
data: {} data: {}
@ -554,24 +573,15 @@ paths:
- application/json - application/json
description: 从用户的标签喜好中移除内容 description: 从用户的标签喜好中移除内容
parameters: parameters:
- description: UserLikePost - description: UserDislikePost
in: body in: body
name: UserLikePost name: UserDislikePost
required: true required: true
schema: schema:
$ref: '#/definitions/request.UserLikePost' $ref: '#/definitions/request.UserDislikePost'
produces: produces:
- application/json - application/json
responses: responses:
"200":
description: OK
schema:
allOf:
- $ref: '#/definitions/response.ResponseBody'
- properties:
data:
$ref: '#/definitions/entity.Category'
type: object
"400": "400":
description: Bad Request description: Bad Request
schema: schema:
@ -596,15 +606,6 @@ paths:
produces: produces:
- application/json - application/json
responses: responses:
"200":
description: OK
schema:
allOf:
- $ref: '#/definitions/response.ResponseBody'
- properties:
data:
$ref: '#/definitions/entity.Category'
type: object
"400": "400":
description: Bad Request description: Bad Request
schema: schema:
@ -614,6 +615,41 @@ paths:
summary: Like summary: Like
tags: tags:
- application_api - application_api
/applications/v1/users/_suggest:
post:
consumes:
- application/json
description: 推荐资源
parameters:
- description: UserSuggestsRequest
in: body
name: UserSuggestsRequest
required: true
schema:
$ref: '#/definitions/request.UserSuggestsRequest'
produces:
- application/json
responses:
"200":
description: OK
schema:
allOf:
- $ref: '#/definitions/response.ResponseBody'
- properties:
data:
items:
$ref: '#/definitions/entity.Post'
type: array
type: object
"400":
description: Bad Request
schema:
$ref: '#/definitions/response.ResponseBody'
security:
- ApiKeyAuth: []
summary: Suggest
tags:
- application_api
securityDefinitions: securityDefinitions:
ApiKeyAuth: ApiKeyAuth:
in: header in: header

View File

@ -1,6 +1,7 @@
package base package base
import ( import (
"github.com/milvus-io/milvus-sdk-go/v2/client"
"gorm.io/gorm" "gorm.io/gorm"
"leafdev.top/Ecosystem/recommender/internal/base/conf" "leafdev.top/Ecosystem/recommender/internal/base/conf"
"leafdev.top/Ecosystem/recommender/internal/base/logger" "leafdev.top/Ecosystem/recommender/internal/base/logger"
@ -26,6 +27,7 @@ type Application struct {
Redis *redis.Redis Redis *redis.Redis
Batch *batch.Batch Batch *batch.Batch
S3 *s3.S3 S3 *s3.S3
Milvus client.Client
} }
func NewApplication( func NewApplication(
@ -40,6 +42,7 @@ func NewApplication(
S3 *s3.S3, S3 *s3.S3,
GORM *gorm.DB, GORM *gorm.DB,
DAO *dao.Query, DAO *dao.Query,
Milvus client.Client,
) *Application { ) *Application {
return &Application{ return &Application{
Config: config, Config: config,
@ -53,5 +56,6 @@ func NewApplication(
S3: S3, S3: S3,
GORM: GORM, GORM: GORM,
DAO: DAO, DAO: DAO,
Milvus: Milvus,
} }
} }

View File

@ -103,7 +103,9 @@ type Milvus struct {
Host string `yaml:"host" mapstructure:"host"` Host string `yaml:"host" mapstructure:"host"`
Port int `yaml:"port" mapstructure:"port"` Port int `yaml:"port" mapstructure:"port"`
DBName string `yaml:"db_name" mapstructure:"db_name"` DBName string `yaml:"db_name" mapstructure:"db_name"`
DocumentCollection string `yaml:"document_collection" mapstructure:"document_collection"`
User string `yaml:"user" mapstructure:"user"` User string `yaml:"user" mapstructure:"user"`
Password string `yaml:"password" mapstructure:"password"` Password string `yaml:"password" mapstructure:"password"`
UserSummaryCollection string `yaml:"user_summary_collection" mapstructure:"user_summary_collection"`
TagCollection string `yaml:"tag_collection" mapstructure:"tag_collection"`
PostCollection string `yaml:"post_collection" mapstructure:"post_collection"`
} }

View File

@ -3,13 +3,13 @@ package milvus
import ( import (
"context" "context"
"github.com/milvus-io/milvus-sdk-go/v2/client" "github.com/milvus-io/milvus-sdk-go/v2/client"
"leafdev.top/Leaf/leaf-library/internal/base/conf" "leafdev.top/Ecosystem/recommender/internal/base/conf"
"leafdev.top/Leaf/leaf-library/internal/base/logger" "leafdev.top/Ecosystem/recommender/internal/base/logger"
"strconv" "strconv"
) )
func NewMilvus(config *conf.Config, logger *logger.Logger) client.Client { func NewService(config *conf.Config, logger *logger.Logger) client.Client {
var address = config.Milvus.Host + ":" + strconv.Itoa(config.Milvus.Port) var address = config.Milvus.Host + ":" + strconv.Itoa(config.Milvus.Port)
logger.Sugar.Infof("Waiting for milvus, address=%s, dbname=%s", address, config.Milvus.DBName) logger.Sugar.Infof("Waiting for milvus, address=%s, dbname=%s", address, config.Milvus.DBName)

View File

@ -58,11 +58,6 @@ func newPostTag(db *gorm.DB, opts ...gen.DOOption) postTag {
db: db.Session(&gorm.Session{}), db: db.Session(&gorm.Session{}),
RelationField: field.NewRelation("Tag", "entity.Tag"), RelationField: field.NewRelation("Tag", "entity.Tag"),
Application: struct {
field.RelationField
}{
RelationField: field.NewRelation("Tag.Application", "entity.Application"),
},
} }
_postTag.fillFieldMap() _postTag.fillFieldMap()
@ -217,10 +212,6 @@ type postTagBelongsToTag struct {
db *gorm.DB db *gorm.DB
field.RelationField field.RelationField
Application struct {
field.RelationField
}
} }
func (a postTagBelongsToTag) Where(conds ...field.Expr) *postTagBelongsToTag { func (a postTagBelongsToTag) Where(conds ...field.Expr) *postTagBelongsToTag {

View File

@ -36,6 +36,7 @@ func newPost(db *gorm.DB, opts ...gen.DOOption) post {
_post.ApplicationId = field.NewUint(tableName, "application_id") _post.ApplicationId = field.NewUint(tableName, "application_id")
_post.CategoryId = field.NewUint(tableName, "category_id") _post.CategoryId = field.NewUint(tableName, "category_id")
_post.Processed = field.NewBool(tableName, "processed") _post.Processed = field.NewBool(tableName, "processed")
_post.Vectorized = field.NewBool(tableName, "vectorized")
_post.Application = postBelongsToApplication{ _post.Application = postBelongsToApplication{
db: db.Session(&gorm.Session{}), db: db.Session(&gorm.Session{}),
@ -71,6 +72,7 @@ type post struct {
ApplicationId field.Uint ApplicationId field.Uint
CategoryId field.Uint CategoryId field.Uint
Processed field.Bool Processed field.Bool
Vectorized field.Bool
Application postBelongsToApplication Application postBelongsToApplication
Category postBelongsToCategory Category postBelongsToCategory
@ -99,6 +101,7 @@ func (p *post) updateTableName(table string) *post {
p.ApplicationId = field.NewUint(table, "application_id") p.ApplicationId = field.NewUint(table, "application_id")
p.CategoryId = field.NewUint(table, "category_id") p.CategoryId = field.NewUint(table, "category_id")
p.Processed = field.NewBool(table, "processed") p.Processed = field.NewBool(table, "processed")
p.Vectorized = field.NewBool(table, "vectorized")
p.fillFieldMap() p.fillFieldMap()
@ -115,7 +118,7 @@ func (p *post) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
} }
func (p *post) fillFieldMap() { func (p *post) fillFieldMap() {
p.fieldMap = make(map[string]field.Expr, 11) p.fieldMap = make(map[string]field.Expr, 12)
p.fieldMap["id"] = p.Id p.fieldMap["id"] = p.Id
p.fieldMap["created_at"] = p.CreatedAt p.fieldMap["created_at"] = p.CreatedAt
p.fieldMap["updated_at"] = p.UpdatedAt p.fieldMap["updated_at"] = p.UpdatedAt
@ -125,6 +128,7 @@ func (p *post) fillFieldMap() {
p.fieldMap["application_id"] = p.ApplicationId p.fieldMap["application_id"] = p.ApplicationId
p.fieldMap["category_id"] = p.CategoryId p.fieldMap["category_id"] = p.CategoryId
p.fieldMap["processed"] = p.Processed p.fieldMap["processed"] = p.Processed
p.fieldMap["vectorized"] = p.Vectorized
} }

View File

@ -35,11 +35,6 @@ func newTagMapping(db *gorm.DB, opts ...gen.DOOption) tagMapping {
db: db.Session(&gorm.Session{}), db: db.Session(&gorm.Session{}),
RelationField: field.NewRelation("Tag", "entity.Tag"), RelationField: field.NewRelation("Tag", "entity.Tag"),
Application: struct {
field.RelationField
}{
RelationField: field.NewRelation("Tag.Application", "entity.Application"),
},
} }
_tagMapping.Application = tagMappingBelongsToApplication{ _tagMapping.Application = tagMappingBelongsToApplication{
@ -122,10 +117,6 @@ type tagMappingBelongsToTag struct {
db *gorm.DB db *gorm.DB
field.RelationField field.RelationField
Application struct {
field.RelationField
}
} }
func (a tagMappingBelongsToTag) Where(conds ...field.Expr) *tagMappingBelongsToTag { func (a tagMappingBelongsToTag) Where(conds ...field.Expr) *tagMappingBelongsToTag {

View File

@ -29,12 +29,7 @@ func newTag(db *gorm.DB, opts ...gen.DOOption) tag {
_tag.ALL = field.NewAsterisk(tableName) _tag.ALL = field.NewAsterisk(tableName)
_tag.Id = field.NewUint(tableName, "id") _tag.Id = field.NewUint(tableName, "id")
_tag.Name = field.NewString(tableName, "name") _tag.Name = field.NewString(tableName, "name")
_tag.ApplicationId = field.NewUint(tableName, "application_id") _tag.Vectorized = field.NewBool(tableName, "vectorized")
_tag.Application = tagBelongsToApplication{
db: db.Session(&gorm.Session{}),
RelationField: field.NewRelation("Application", "entity.Application"),
}
_tag.fillFieldMap() _tag.fillFieldMap()
@ -47,8 +42,7 @@ type tag struct {
ALL field.Asterisk ALL field.Asterisk
Id field.Uint Id field.Uint
Name field.String Name field.String
ApplicationId field.Uint Vectorized field.Bool
Application tagBelongsToApplication
fieldMap map[string]field.Expr fieldMap map[string]field.Expr
} }
@ -67,7 +61,7 @@ func (t *tag) updateTableName(table string) *tag {
t.ALL = field.NewAsterisk(table) t.ALL = field.NewAsterisk(table)
t.Id = field.NewUint(table, "id") t.Id = field.NewUint(table, "id")
t.Name = field.NewString(table, "name") t.Name = field.NewString(table, "name")
t.ApplicationId = field.NewUint(table, "application_id") t.Vectorized = field.NewBool(table, "vectorized")
t.fillFieldMap() t.fillFieldMap()
@ -84,11 +78,10 @@ func (t *tag) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
} }
func (t *tag) fillFieldMap() { func (t *tag) fillFieldMap() {
t.fieldMap = make(map[string]field.Expr, 4) t.fieldMap = make(map[string]field.Expr, 3)
t.fieldMap["id"] = t.Id t.fieldMap["id"] = t.Id
t.fieldMap["name"] = t.Name t.fieldMap["name"] = t.Name
t.fieldMap["application_id"] = t.ApplicationId t.fieldMap["vectorized"] = t.Vectorized
} }
func (t tag) clone(db *gorm.DB) tag { func (t tag) clone(db *gorm.DB) tag {
@ -101,77 +94,6 @@ func (t tag) replaceDB(db *gorm.DB) tag {
return t return t
} }
type tagBelongsToApplication struct {
db *gorm.DB
field.RelationField
}
func (a tagBelongsToApplication) Where(conds ...field.Expr) *tagBelongsToApplication {
if len(conds) == 0 {
return &a
}
exprs := make([]clause.Expression, 0, len(conds))
for _, cond := range conds {
exprs = append(exprs, cond.BeCond().(clause.Expression))
}
a.db = a.db.Clauses(clause.Where{Exprs: exprs})
return &a
}
func (a tagBelongsToApplication) WithContext(ctx context.Context) *tagBelongsToApplication {
a.db = a.db.WithContext(ctx)
return &a
}
func (a tagBelongsToApplication) Session(session *gorm.Session) *tagBelongsToApplication {
a.db = a.db.Session(session)
return &a
}
func (a tagBelongsToApplication) Model(m *entity.Tag) *tagBelongsToApplicationTx {
return &tagBelongsToApplicationTx{a.db.Model(m).Association(a.Name())}
}
type tagBelongsToApplicationTx struct{ tx *gorm.Association }
func (a tagBelongsToApplicationTx) Find() (result *entity.Application, err error) {
return result, a.tx.Find(&result)
}
func (a tagBelongsToApplicationTx) Append(values ...*entity.Application) (err error) {
targetValues := make([]interface{}, len(values))
for i, v := range values {
targetValues[i] = v
}
return a.tx.Append(targetValues...)
}
func (a tagBelongsToApplicationTx) Replace(values ...*entity.Application) (err error) {
targetValues := make([]interface{}, len(values))
for i, v := range values {
targetValues[i] = v
}
return a.tx.Replace(targetValues...)
}
func (a tagBelongsToApplicationTx) Delete(values ...*entity.Application) (err error) {
targetValues := make([]interface{}, len(values))
for i, v := range values {
targetValues[i] = v
}
return a.tx.Delete(targetValues...)
}
func (a tagBelongsToApplicationTx) Clear() error {
return a.tx.Clear()
}
func (a tagBelongsToApplicationTx) Count() int64 {
return a.tx.Count()
}
type tagDo struct{ gen.DO } type tagDo struct{ gen.DO }
type ITagDo interface { type ITagDo interface {

View File

@ -35,11 +35,6 @@ func newUserTagScore(db *gorm.DB, opts ...gen.DOOption) userTagScore {
db: db.Session(&gorm.Session{}), db: db.Session(&gorm.Session{}),
RelationField: field.NewRelation("Tag", "entity.Tag"), RelationField: field.NewRelation("Tag", "entity.Tag"),
Application: struct {
field.RelationField
}{
RelationField: field.NewRelation("Tag.Application", "entity.Application"),
},
} }
_userTagScore.Application = userTagScoreBelongsToApplication{ _userTagScore.Application = userTagScoreBelongsToApplication{
@ -122,10 +117,6 @@ type userTagScoreBelongsToTag struct {
db *gorm.DB db *gorm.DB
field.RelationField field.RelationField
Application struct {
field.RelationField
}
} }
func (a userTagScoreBelongsToTag) Where(conds ...field.Expr) *userTagScoreBelongsToTag { func (a userTagScoreBelongsToTag) Where(conds ...field.Expr) *userTagScoreBelongsToTag {

View File

@ -13,6 +13,7 @@ type Post struct {
Category *Category `json:"category"` Category *Category `json:"category"`
CategoryId *schema.EntityId `json:"category_id"` CategoryId *schema.EntityId `json:"category_id"`
Processed bool `json:"processed"` Processed bool `json:"processed"`
Vectorized bool `json:"vectorized"`
} }
func (u *Post) TableName() string { func (u *Post) TableName() string {

View File

@ -5,8 +5,9 @@ import "leafdev.top/Ecosystem/recommender/internal/schema"
type Tag struct { type Tag struct {
Id schema.EntityId `gorm:"primarykey" json:"id"` Id schema.EntityId `gorm:"primarykey" json:"id"`
Name string `json:"name"` Name string `json:"name"`
Application *Application Vectorized bool `json:"vectorized"`
ApplicationId schema.EntityId `json:"application_id"` //Application *Application
//ApplicationId schema.EntityId `json:"application_id"`
} }
func (u *Tag) TableName() string { func (u *Tag) TableName() string {

View File

@ -10,6 +10,7 @@ import (
"leafdev.top/Ecosystem/recommender/internal/service/auth" "leafdev.top/Ecosystem/recommender/internal/service/auth"
"leafdev.top/Ecosystem/recommender/internal/service/category" "leafdev.top/Ecosystem/recommender/internal/service/category"
"leafdev.top/Ecosystem/recommender/internal/service/post" "leafdev.top/Ecosystem/recommender/internal/service/post"
"leafdev.top/Ecosystem/recommender/pkg/consts"
"net/http" "net/http"
) )
@ -94,6 +95,17 @@ func (pc *PostController) Save(c *gin.Context) {
return return
} }
exists, err := pc.postService.TargetIdExists(c, app, postSaveRequest.TargetId)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusBadRequest).Send()
return
}
if exists {
response.Ctx(c).Status(http.StatusConflict).Error(consts.ErrPostTargetIdExists).Send()
return
}
var postEntity = &entity.Post{ var postEntity = &entity.Post{
Title: postSaveRequest.Title, Title: postSaveRequest.Title,
Content: postSaveRequest.Content, Content: postSaveRequest.Content,
@ -103,7 +115,7 @@ func (pc *PostController) Save(c *gin.Context) {
Processed: false, Processed: false,
} }
err = pc.postService.CreatePost(c, postEntity) err = pc.postService.CreatePost(c, app, postEntity)
response.Ctx(c).Error(err).Data(postEntity).Send() response.Ctx(c).Error(err).Data(postEntity).Send()
return return

View File

@ -8,6 +8,7 @@ import (
"leafdev.top/Ecosystem/recommender/internal/handler/http/response" "leafdev.top/Ecosystem/recommender/internal/handler/http/response"
"leafdev.top/Ecosystem/recommender/internal/service/application" "leafdev.top/Ecosystem/recommender/internal/service/application"
"leafdev.top/Ecosystem/recommender/internal/service/auth" "leafdev.top/Ecosystem/recommender/internal/service/auth"
"leafdev.top/Ecosystem/recommender/internal/service/category"
"leafdev.top/Ecosystem/recommender/internal/service/post" "leafdev.top/Ecosystem/recommender/internal/service/post"
"leafdev.top/Ecosystem/recommender/internal/service/user" "leafdev.top/Ecosystem/recommender/internal/service/user"
"leafdev.top/Ecosystem/recommender/pkg/consts" "leafdev.top/Ecosystem/recommender/pkg/consts"
@ -26,6 +27,7 @@ type UserController struct {
postService *post.Service postService *post.Service
logger *logger.Logger logger *logger.Logger
redis *redis.Redis redis *redis.Redis
categoryService *category.Service
} }
func NewUserController( func NewUserController(
@ -35,7 +37,7 @@ func NewUserController(
postService *post.Service, postService *post.Service,
logger *logger.Logger, logger *logger.Logger,
redis *redis.Redis, redis *redis.Redis,
categoryService *category.Service,
) *UserController { ) *UserController {
return &UserController{ return &UserController{
authService: authService, authService: authService,
@ -44,6 +46,7 @@ func NewUserController(
postService: postService, postService: postService,
logger: logger, logger: logger,
redis: redis, redis: redis,
categoryService: categoryService,
} }
} }
@ -55,7 +58,6 @@ func NewUserController(
// @Produce json // @Produce json
// @Security ApiKeyAuth // @Security ApiKeyAuth
// @Param UserLikePost body request.UserLikePost true "UserLikePost" // @Param UserLikePost body request.UserLikePost true "UserLikePost"
// @Success 200 {object} response.ResponseBody{data=entity.Category}
// @Failure 400 {object} response.ResponseBody // @Failure 400 {object} response.ResponseBody
// @Router /applications/v1/users/_like [post] // @Router /applications/v1/users/_like [post]
func (uc *UserController) Like(c *gin.Context) { func (uc *UserController) Like(c *gin.Context) {
@ -90,6 +92,11 @@ func (uc *UserController) Like(c *gin.Context) {
return return
} }
if !postEntity.Vectorized || !postEntity.Processed {
response.Ctx(c).Status(http.StatusBadRequest).Error(consts.ErrPostNotReady).Send()
return
}
// 检测是否有 // 检测是否有
var cacheKey = uc.redis.Prefix(TaskProcessing + ":" + userLikePostRequest.PostId.String()) var cacheKey = uc.redis.Prefix(TaskProcessing + ":" + userLikePostRequest.PostId.String())
// if exists // if exists
@ -130,8 +137,7 @@ func (uc *UserController) Like(c *gin.Context) {
// @Accept json // @Accept json
// @Produce json // @Produce json
// @Security ApiKeyAuth // @Security ApiKeyAuth
// @Param UserLikePost body request.UserLikePost true "UserLikePost" // @Param UserDislikePost body request.UserDislikePost true "UserDislikePost"
// @Success 200 {object} response.ResponseBody{data=entity.Category}
// @Failure 400 {object} response.ResponseBody // @Failure 400 {object} response.ResponseBody
// @Router /applications/v1/users/_dislike [post] // @Router /applications/v1/users/_dislike [post]
func (uc *UserController) Dislike(c *gin.Context) { func (uc *UserController) Dislike(c *gin.Context) {
@ -166,6 +172,11 @@ func (uc *UserController) Dislike(c *gin.Context) {
return return
} }
if !postEntity.Vectorized || !postEntity.Processed {
response.Ctx(c).Status(http.StatusBadRequest).Error(consts.ErrPostNotReady).Send()
return
}
// 检测是否有 // 检测是否有
var cacheKey = uc.redis.Prefix(TaskProcessing + ":" + userDislikePostRequest.PostId.String()) var cacheKey = uc.redis.Prefix(TaskProcessing + ":" + userDislikePostRequest.PostId.String())
exists, err := uc.redis.Client.Exists(c, cacheKey).Result() exists, err := uc.redis.Client.Exists(c, cacheKey).Result()
@ -197,3 +208,80 @@ func (uc *UserController) Dislike(c *gin.Context) {
response.Ctx(c).Status(http.StatusNoContent).Send() response.Ctx(c).Status(http.StatusNoContent).Send()
return return
} }
// Suggest godoc
// @Summary Suggest
// @Description 推荐资源
// @Tags application_api
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param UserSuggestsRequest body request.UserSuggestsRequest true "UserSuggestsRequest"
// @Success 200 {object} response.ResponseBody{data=[]entity.Post}
// @Failure 400 {object} response.ResponseBody
// @Router /applications/v1/users/_suggest [post]
func (uc *UserController) Suggest(c *gin.Context) {
app, err := uc.authService.GetApplication(c)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusBadRequest).Send()
return
}
var userSuggestsRequest = &request.UserSuggestsRequest{}
if err := c.ShouldBindJSON(userSuggestsRequest); err != nil {
response.Ctx(c).Error(err).Status(http.StatusBadRequest).Send()
return
}
exists, err := uc.userService.UserExists(c, userSuggestsRequest.ExternalUserId, app)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusInternalServerError).Send()
return
}
if !exists {
response.Ctx(c).Status(http.StatusNotFound).Send()
return
}
externalUserEntity, err := uc.userService.GetExternalUser(c, userSuggestsRequest.ExternalUserId, app)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusInternalServerError).Send()
return
}
// category
exists, err = uc.categoryService.CategoryExists(c, userSuggestsRequest.CategoryId, app)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusInternalServerError).Send()
return
}
if !exists {
response.Ctx(c).Status(http.StatusNotFound).Error(consts.ErrCategoryNotExists).Send()
return
}
categoryEntity, err := uc.categoryService.GetCategoryById(c, userSuggestsRequest.CategoryId)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusInternalServerError).Send()
return
}
// 建议文章
err = uc.userService.SummaryUser(c, externalUserEntity, app)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusInternalServerError).Error(err).Send()
return
}
// 建议文章
posts, err := uc.userService.SuggestPosts(c, externalUserEntity, categoryEntity)
if err != nil {
response.Ctx(c).Error(err).Status(http.StatusInternalServerError).Send()
return
}
response.Ctx(c).Data(posts).Send()
}

View File

@ -19,3 +19,8 @@ type UserDislikePost struct {
PostId schema.EntityId `json:"post_id" uri:"post_id"` PostId schema.EntityId `json:"post_id" uri:"post_id"`
ExternalUserId string `json:"external_user_id"` ExternalUserId string `json:"external_user_id"`
} }
type UserSuggestsRequest struct {
ExternalUserId string `json:"external_user_id" binding:"required"`
CategoryId schema.EntityId `json:"category_id" binding:"required"`
}

View File

@ -53,9 +53,11 @@ CREATE TABLE `tags`
( (
id serial NOT NULL, id serial NOT NULL,
name varchar(255) NOT NULL, name varchar(255) NOT NULL,
application_id bigint unsigned NOT NULL, vectorized bool DEFAULT FALSE,
# application_id bigint unsigned NOT NULL,
primary key (id), primary key (id),
foreign key (application_id) references applications (id) on delete cascade index (vectorized)
# foreign key (application_id) references applications (id) on delete cascade
); );
CREATE TABLE `tag_mappings` CREATE TABLE `tag_mappings`
@ -79,10 +81,11 @@ CREATE TABLE `posts`
content LONGTEXT NOT NULL, content LONGTEXT NOT NULL,
application_id bigint unsigned NOT NULL, application_id bigint unsigned NOT NULL,
processed BOOLEAN DEFAULT FALSE, processed BOOLEAN DEFAULT FALSE,
vectorized bool DEFAULT FALSE,
category_id bigint unsigned, category_id bigint unsigned,
created_at timestamp DEFAULT CURRENT_TIMESTAMP, created_at timestamp DEFAULT CURRENT_TIMESTAMP,
updated_at timestamp DEFAULT CURRENT_TIMESTAMP, updated_at timestamp DEFAULT CURRENT_TIMESTAMP,
index (target_id, processed, application_id), index (target_id, processed, vectorized, application_id),
primary key (id), primary key (id),
foreign key (application_id) references applications (id) on delete cascade, foreign key (application_id) references applications (id) on delete cascade,
foreign key (category_id) references categories (id) foreign key (category_id) references categories (id)
@ -138,6 +141,6 @@ DROP TABLE IF EXISTS `user_likes`;
DROP TABLE IF EXISTS `posts`; DROP TABLE IF EXISTS `posts`;
DROP TABLE IF EXISTS `tags`; DROP TABLE IF EXISTS `tags`;
DROP TABLE IF EXISTS `categories`; DROP TABLE IF EXISTS `categories`;
DROP TABLE IF EXISTS `external_users`;
DROP TABLE IF EXISTS `application_tokens`; DROP TABLE IF EXISTS `application_tokens`;
DROP TABLE IF EXISTS `applications`; DROP TABLE IF EXISTS `applications`;
DROP TABLE IF EXISTS `external_users`;

View File

@ -48,4 +48,5 @@ func (a *Api) InitApplicationApi(r *gin.RouterGroup) {
r.POST("/users/_like", a.h.ApplicationUserApi.Like) r.POST("/users/_like", a.h.ApplicationUserApi.Like)
r.POST("/users/_dislike", a.h.ApplicationUserApi.Dislike) r.POST("/users/_dislike", a.h.ApplicationUserApi.Dislike)
r.POST("/users/_suggest", a.h.ApplicationUserApi.Suggest)
} }

View File

@ -40,3 +40,12 @@ func (s *Service) DeleteCategory(c context.Context, category *entity.Category) e
func (s *Service) GetCategoryById(c context.Context, categoryId schema.EntityId) (*entity.Category, error) { func (s *Service) GetCategoryById(c context.Context, categoryId schema.EntityId) (*entity.Category, error) {
return s.dao.WithContext(c).Category.Where(s.dao.Category.Id.Eq(categoryId.Uint())).First() return s.dao.WithContext(c).Category.Where(s.dao.Category.Id.Eq(categoryId.Uint())).First()
} }
func (s *Service) CategoryExists(c context.Context, categoryId schema.EntityId, applicationEntity *entity.Application) (bool, error) {
count, err := s.dao.WithContext(c).Category.
Where(s.dao.Category.Id.Eq(categoryId.Uint())).
Where(s.dao.Category.ApplicationId.Eq(applicationEntity.Id.Uint())).
Count()
return count > 0, err
}

View File

@ -2,10 +2,10 @@ package embedding
import ( import (
"github.com/tmc/langchaingo/llms/openai" "github.com/tmc/langchaingo/llms/openai"
"leafdev.top/Leaf/leaf-library/internal/base/conf" "leafdev.top/Ecosystem/recommender/internal/base/conf"
"leafdev.top/Leaf/leaf-library/internal/base/logger" "leafdev.top/Ecosystem/recommender/internal/base/logger"
"leafdev.top/Leaf/leaf-library/internal/base/redis" "leafdev.top/Ecosystem/recommender/internal/base/redis"
"leafdev.top/Leaf/leaf-library/internal/dao" "leafdev.top/Ecosystem/recommender/internal/dao"
) )
type Service struct { type Service struct {
@ -19,7 +19,7 @@ type Service struct {
func NewService(config *conf.Config, logger *logger.Logger, dao *dao.Query, redis *redis.Redis) *Service { func NewService(config *conf.Config, logger *logger.Logger, dao *dao.Query, redis *redis.Redis) *Service {
llm, err := openai.New( llm, err := openai.New(
openai.WithToken(config.OpenAI.ApiKey), openai.WithToken(config.OpenAI.ApiKey),
openai.WithBaseURL(config.OpenAI.InternalBaseUrl), openai.WithBaseURL(config.OpenAI.BaseUrl),
openai.WithEmbeddingModel(config.OpenAI.EmbeddingModel), openai.WithEmbeddingModel(config.OpenAI.EmbeddingModel),
) )

View File

@ -5,6 +5,7 @@ import (
"github.com/iVampireSP/pkg/page" "github.com/iVampireSP/pkg/page"
"leafdev.top/Ecosystem/recommender/internal/entity" "leafdev.top/Ecosystem/recommender/internal/entity"
"leafdev.top/Ecosystem/recommender/internal/schema" "leafdev.top/Ecosystem/recommender/internal/schema"
"leafdev.top/Ecosystem/recommender/pkg/consts"
) )
func (s *Service) ListPosts(c context.Context, pagedResult *page.PagedResult[*entity.Post], category *entity.Category, application *entity.Application) error { func (s *Service) ListPosts(c context.Context, pagedResult *page.PagedResult[*entity.Post], category *entity.Category, application *entity.Application) error {
@ -29,8 +30,23 @@ func (s *Service) ListPosts(c context.Context, pagedResult *page.PagedResult[*en
return nil return nil
} }
func (s *Service) CreatePost(c context.Context, post *entity.Post) error { func (s *Service) TargetIdExists(c context.Context, application *entity.Application, targetId string) (bool, error) {
err := s.dao.WithContext(c).Post.Create(post) count, err := s.dao.WithContext(c).Post.Where(s.dao.Post.TargetId.Eq(targetId)).
Where(s.dao.Post.ApplicationId.Eq(application.Id.Uint())).
Count()
return count > 0, err
}
func (s *Service) CreatePost(c context.Context, applicationEntity *entity.Application, post *entity.Post) error {
// 检测 target_id 是否存在
exists, err := s.TargetIdExists(c, applicationEntity, post.TargetId)
if exists {
return consts.ErrPostTargetIdExists
}
post.ApplicationId = applicationEntity.Id
err = s.dao.WithContext(c).Post.Create(post)
if err != nil { if err != nil {
return err return err

View File

@ -1,8 +1,10 @@
package post package post
import ( import (
"github.com/milvus-io/milvus-sdk-go/v2/client"
"leafdev.top/Ecosystem/recommender/internal/base/conf" "leafdev.top/Ecosystem/recommender/internal/base/conf"
"leafdev.top/Ecosystem/recommender/internal/dao" "leafdev.top/Ecosystem/recommender/internal/dao"
"leafdev.top/Ecosystem/recommender/internal/service/embedding"
"leafdev.top/Ecosystem/recommender/internal/service/stream" "leafdev.top/Ecosystem/recommender/internal/service/stream"
) )
@ -10,12 +12,16 @@ type Service struct {
dao *dao.Query dao *dao.Query
config *conf.Config config *conf.Config
stream *stream.Service stream *stream.Service
milvus client.Client
embedding *embedding.Service
} }
func NewService(dao *dao.Query, config *conf.Config, stream *stream.Service) *Service { func NewService(dao *dao.Query, config *conf.Config, stream *stream.Service, milvus client.Client, embedding *embedding.Service) *Service {
return &Service{ return &Service{
dao: dao, dao: dao,
config: config, config: config,
stream: stream, stream: stream,
milvus: milvus,
embedding: embedding,
} }
} }

View File

@ -2,12 +2,14 @@ package post
import ( import (
"context" "context"
entity2 "github.com/milvus-io/milvus-sdk-go/v2/entity"
"leafdev.top/Ecosystem/recommender/internal/entity" "leafdev.top/Ecosystem/recommender/internal/entity"
) )
func (s *Service) GetTag(c context.Context, name string, applicationEntity *entity.Application) (*entity.Tag, error) { func (s *Service) GetTag(c context.Context, name string) (*entity.Tag, error) {
var tmq = s.dao.WithContext(c).TagMapping.Where(s.dao.TagMapping.Name.Eq(name)). var tmq = s.dao.WithContext(c).TagMapping.Where(s.dao.TagMapping.Name.Eq(name))
Where(s.dao.TagMapping.ApplicationId.Eq(applicationEntity.Id.Uint())) //Where(s.dao.TagMapping.ApplicationId.Eq(applicationEntity.Id.Uint()))
tmqCount, err := tmq.Count() tmqCount, err := tmq.Count()
if err != nil { if err != nil {
return nil, err return nil, err
@ -23,13 +25,23 @@ func (s *Service) GetTag(c context.Context, name string, applicationEntity *enti
return r.Tag, nil return r.Tag, nil
} }
return s.dao.WithContext(c).Tag.Where(s.dao.Tag.Name.Eq(name)). t, err := s.dao.WithContext(c).Tag.Where(s.dao.Tag.Name.Eq(name)).
Where(s.dao.Tag.ApplicationId.Eq(applicationEntity.Id.Uint())). //Where(s.dao.Tag.ApplicationId.Eq(applicationEntity.Id.Uint())).
FirstOrCreate() FirstOrCreate()
if err != nil {
return nil, err
}
if !t.Vectorized {
err = s.SaveTagEmbedding(c, t)
}
return t, err
} }
func (s *Service) HasBindTag(c context.Context, post *entity.Post, tagName string) (bool, error) { func (s *Service) HasBindTag(c context.Context, post *entity.Post, tagName string) (bool, error) {
tag, err := s.GetTag(c, tagName, post.Application) tag, err := s.GetTag(c, tagName)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -44,7 +56,7 @@ func (s *Service) HasBindTag(c context.Context, post *entity.Post, tagName strin
} }
func (s *Service) BindTag(c context.Context, post *entity.Post, tagName string) error { func (s *Service) BindTag(c context.Context, post *entity.Post, tagName string) error {
tag, err := s.GetTag(c, tagName, post.Application) tag, err := s.GetTag(c, tagName)
if err != nil { if err != nil {
return err return err
} }
@ -69,3 +81,64 @@ func (s *Service) MarkAsProcessed(c context.Context, post *entity.Post) error {
return err return err
} }
func (s *Service) SaveTagEmbedding(c context.Context, tag *entity.Tag) error {
emb, err := s.embedding.TextEmbedding(c, []string{tag.Name})
if err != nil {
return err
}
var entityCols = []entity2.Column{
entity2.NewColumnFloatVector("vector", s.config.OpenAI.EmbeddingDim, emb),
entity2.NewColumnInt64("tag_id", []int64{int64(tag.Id)}),
}
_, err = s.milvus.Upsert(c, s.config.Milvus.TagCollection, "", entityCols...)
if err != nil {
return err
}
_, err = s.dao.WithContext(c).Tag.Where(s.dao.Tag.Id.Eq(tag.Id.Uint())).Update(s.dao.Tag.Vectorized, true)
return err
}
func (s *Service) SavePostEmbedding(c context.Context, post *entity.Post) error {
tags, err := s.GetPostTags(c, post)
if err != nil {
return err
}
var tagString = ""
for _, tag := range tags {
tagString += tag.Name + " "
}
// 裁剪 > s.config.OpenAI.EmbeddingDim
if len(tagString) > s.config.OpenAI.EmbeddingDim {
tagString = tagString[:s.config.OpenAI.EmbeddingDim]
}
emb, err := s.embedding.TextEmbedding(c, []string{tagString})
if err != nil {
return err
}
var entityCols = []entity2.Column{
entity2.NewColumnFloatVector("vector", s.config.OpenAI.EmbeddingDim, emb),
entity2.NewColumnInt64("post_id", []int64{int64(post.Id)}),
entity2.NewColumnInt64("category_id", []int64{int64(post.CategoryId.Uint())}),
entity2.NewColumnInt64("application_id", []int64{int64(post.ApplicationId.Uint())}),
}
_, err = s.milvus.Upsert(c, s.config.Milvus.PostCollection, "", entityCols...)
if err != nil {
return err
}
_, err = s.dao.WithContext(c).Post.Where(s.dao.Post.Id.Eq(post.Id.Uint())).Update(s.dao.Post.Vectorized, true)
return err
}

View File

@ -5,6 +5,7 @@ import (
"leafdev.top/Ecosystem/recommender/internal/service/application" "leafdev.top/Ecosystem/recommender/internal/service/application"
"leafdev.top/Ecosystem/recommender/internal/service/auth" "leafdev.top/Ecosystem/recommender/internal/service/auth"
"leafdev.top/Ecosystem/recommender/internal/service/category" "leafdev.top/Ecosystem/recommender/internal/service/category"
"leafdev.top/Ecosystem/recommender/internal/service/embedding"
"leafdev.top/Ecosystem/recommender/internal/service/jwks" "leafdev.top/Ecosystem/recommender/internal/service/jwks"
"leafdev.top/Ecosystem/recommender/internal/service/post" "leafdev.top/Ecosystem/recommender/internal/service/post"
"leafdev.top/Ecosystem/recommender/internal/service/stream" "leafdev.top/Ecosystem/recommender/internal/service/stream"
@ -22,10 +23,12 @@ type Service struct {
Post *post.Service Post *post.Service
Category *category.Service Category *category.Service
User *user.Service User *user.Service
Embedding *embedding.Service
} }
var Provider = wire.NewSet( var Provider = wire.NewSet(
jwks.NewJWKS, jwks.NewJWKS,
embedding.NewService,
stream.NewService, stream.NewService,
auth.NewAuthService, auth.NewAuthService,
application.NewService, application.NewService,
@ -44,6 +47,7 @@ func NewService(
post *post.Service, post *post.Service,
category *category.Service, category *category.Service,
user *user.Service, user *user.Service,
embedding *embedding.Service,
) *Service { ) *Service {
return &Service{ return &Service{
logger, logger,
@ -54,5 +58,6 @@ func NewService(
post, post,
category, category,
user, user,
embedding,
} }
} }

View File

@ -5,6 +5,20 @@ import (
"leafdev.top/Ecosystem/recommender/internal/entity" "leafdev.top/Ecosystem/recommender/internal/entity"
) )
func (s *Service) GetExternalUser(c context.Context, externalUserId string, applicationEntity *entity.Application) (*entity.ExternalUser, error) {
iu, err := s.dao.WithContext(c).ExternalUser.
Where(s.dao.ExternalUser.ApplicationId.Eq(applicationEntity.Id.Uint())).
Where(s.dao.ExternalUser.ExternalId.Eq(externalUserId)).First()
return iu, err
}
func (s *Service) UserExists(c context.Context, externalUserId string, applicationEntity *entity.Application) (bool, error) {
iu, err := s.dao.WithContext(c).ExternalUser.
Where(s.dao.ExternalUser.ApplicationId.Eq(applicationEntity.Id.Uint())).
Where(s.dao.ExternalUser.ExternalId.Eq(externalUserId)).Count()
return iu > 0, err
}
func (s *Service) GetOrCreateExternalUser(c context.Context, externalUserId string, applicationEntity *entity.Application) (*entity.ExternalUser, error) { func (s *Service) GetOrCreateExternalUser(c context.Context, externalUserId string, applicationEntity *entity.Application) (*entity.ExternalUser, error) {
//Where(s.dao.UserTagScore.ExternalUserId.Eq(externalUserEntity.Id.Uint())). //Where(s.dao.UserTagScore.ExternalUserId.Eq(externalUserEntity.Id.Uint())).
count, err := s.dao.WithContext(c).ExternalUser. count, err := s.dao.WithContext(c).ExternalUser.

View File

@ -4,13 +4,9 @@ import (
"context" "context"
"leafdev.top/Ecosystem/recommender/internal/entity" "leafdev.top/Ecosystem/recommender/internal/entity"
"leafdev.top/Ecosystem/recommender/internal/schema" "leafdev.top/Ecosystem/recommender/internal/schema"
"time" "leafdev.top/Ecosystem/recommender/pkg/consts"
) )
const TaskProcessing = "user_likes"
var LockTTL = time.Minute * 10
func (s *Service) HasLiked(c context.Context, externalUserEntity *entity.ExternalUser, applicationEntity *entity.Application, postEntity *entity.Post) (bool, error) { func (s *Service) HasLiked(c context.Context, externalUserEntity *entity.ExternalUser, applicationEntity *entity.Application, postEntity *entity.Post) (bool, error) {
count, err := s.dao.WithContext(c).UserLike.Where(s.dao.UserLike.ExternalUserId.Eq(externalUserEntity.Id.Uint())). count, err := s.dao.WithContext(c).UserLike.Where(s.dao.UserLike.ExternalUserId.Eq(externalUserEntity.Id.Uint())).
Where(s.dao.UserLike.PostId.Eq(postEntity.Id.Uint())). Where(s.dao.UserLike.PostId.Eq(postEntity.Id.Uint())).
@ -68,6 +64,10 @@ func (s *Service) LikePost(c context.Context, externalUserEntity *entity.Externa
// } // }
//}(lock, c) //}(lock, c)
if !postEntity.Vectorized || !postEntity.Processed {
return consts.ErrPostNotReady
}
// get tags // get tags
postTags, err := s.postService.GetPostTags(c, postEntity) postTags, err := s.postService.GetPostTags(c, postEntity)
if err != nil { if err != nil {
@ -111,6 +111,10 @@ func (s *Service) DislikePost(c context.Context, externalUserEntity *entity.Exte
// } // }
//}(lock, c) //}(lock, c)
if !postEntity.Vectorized || !postEntity.Processed {
return consts.ErrPostNotReady
}
// get tags // get tags
postTags, err := s.postService.GetPostTags(c, postEntity) postTags, err := s.postService.GetPostTags(c, postEntity)
if err != nil { if err != nil {

View File

@ -1,8 +1,11 @@
package user package user
import ( import (
"github.com/milvus-io/milvus-sdk-go/v2/client"
"leafdev.top/Ecosystem/recommender/internal/base/conf"
"leafdev.top/Ecosystem/recommender/internal/base/logger" "leafdev.top/Ecosystem/recommender/internal/base/logger"
"leafdev.top/Ecosystem/recommender/internal/dao" "leafdev.top/Ecosystem/recommender/internal/dao"
"leafdev.top/Ecosystem/recommender/internal/service/embedding"
"leafdev.top/Ecosystem/recommender/internal/service/post" "leafdev.top/Ecosystem/recommender/internal/service/post"
) )
@ -10,16 +13,25 @@ type Service struct {
dao *dao.Query dao *dao.Query
postService *post.Service postService *post.Service
logger *logger.Logger logger *logger.Logger
milvus client.Client
embedding *embedding.Service
config *conf.Config
} }
func NewService( func NewService(
dao *dao.Query, dao *dao.Query,
postService *post.Service, postService *post.Service,
logger *logger.Logger, logger *logger.Logger,
milvus client.Client,
embedding *embedding.Service,
config *conf.Config,
) *Service { ) *Service {
return &Service{ return &Service{
dao: dao, dao: dao,
postService: postService, postService: postService,
logger: logger, logger: logger,
milvus: milvus,
embedding: embedding,
config: config,
} }
} }

View File

@ -0,0 +1,70 @@
package user
import (
"context"
"fmt"
"github.com/milvus-io/milvus-sdk-go/v2/client"
entity2 "github.com/milvus-io/milvus-sdk-go/v2/entity"
"leafdev.top/Ecosystem/recommender/internal/entity"
)
func (s *Service) SuggestPosts(c context.Context, externalUserEntity *entity.ExternalUser, categoryEntity *entity.Category) ([]*entity.Post, error) {
emb, err := s.embedding.TextEmbedding(c, []string{externalUserEntity.Summary})
if err != nil {
return nil, err
}
var filter = fmt.Sprintf("application_id == %d && category_id == %s", externalUserEntity.ApplicationId, categoryEntity.Id)
sp, err := entity2.NewIndexAUTOINDEXSearchParam(1)
if err != nil {
return nil, err
}
vector := entity2.FloatVector(emb[0])
postResults, err := s.milvus.Search(c, s.config.Milvus.PostCollection,
[]string{},
filter,
[]string{"post_id", "category_id"},
[]entity2.Vector{vector},
"vector",
entity2.L2,
3,
sp, client.WithLimit(7))
var ids []uint
for _, res := range postResults {
// 没找到,直接返回空的
if res.ResultCount == 0 {
return make([]*entity.Post, 0), nil
}
var blockIdColumn *entity2.ColumnInt64
for _, field := range res.Fields {
if field.Name() == "post_id" {
c, ok := field.(*entity2.ColumnInt64)
if ok {
blockIdColumn = c
}
}
}
// 没有记录
if blockIdColumn == nil {
return make([]*entity.Post, 0), nil
//return nil, fmt.Errorf("block_id column not found")
}
for i := 0; i < res.ResultCount; i++ {
id, err := blockIdColumn.ValueByIdx(i)
if err != nil {
return nil, err
}
ids = append(ids, uint(id))
}
}
posts, err := s.dao.Post.Where(s.dao.Post.Where(s.dao.Post.Id.In(ids...))).Find()
return posts, err
}

View File

@ -0,0 +1,59 @@
package user
import (
"context"
entity2 "github.com/milvus-io/milvus-sdk-go/v2/entity"
"leafdev.top/Ecosystem/recommender/internal/entity"
"leafdev.top/Ecosystem/recommender/pkg/consts"
)
func (s *Service) SummaryUser(c context.Context, externalUserEntity *entity.ExternalUser, applicationEntity *entity.Application) error {
tags, err := s.GetHighScoreTags(c, externalUserEntity, applicationEntity)
if err != nil {
return err
}
if len(tags) == 0 {
return consts.ErrExternalUserDoesNotLikeAnyPost
}
var tagString = ""
for _, tag := range tags {
tagString += tag.Name + " "
}
// 裁剪 > s.config.OpenAI.EmbeddingDim
if len(tagString) > s.config.OpenAI.EmbeddingDim {
tagString = tagString[:s.config.OpenAI.EmbeddingDim]
}
if externalUserEntity.Summary != tagString {
_, err = s.dao.ExternalUser.Where(s.dao.ExternalUser.Id.Eq(externalUserEntity.Id.Uint())).Update(
s.dao.ExternalUser.Summary, tagString,
)
if err != nil {
return err
}
emb, err := s.embedding.TextEmbedding(c, []string{tagString})
if err != nil {
return err
}
var entityCols = []entity2.Column{
entity2.NewColumnFloatVector("vector", s.config.OpenAI.EmbeddingDim, emb),
entity2.NewColumnInt64("external_user_id", []int64{int64(externalUserEntity.Id)}),
entity2.NewColumnInt64("application_id", []int64{int64(applicationEntity.Id)}),
}
_, err = s.milvus.Upsert(c, s.config.Milvus.UserSummaryCollection, "", entityCols...)
if err != nil {
return err
}
}
return err
}

View File

@ -112,3 +112,25 @@ func (s *Service) RemoveTags(c context.Context, externalUserEntity *entity.Exter
return nil return nil
} }
func (s *Service) GetHighScoreTags(c context.Context, externalUserEntity *entity.ExternalUser, applicationEntity *entity.Application) ([]*entity.Tag, error) {
tagScores, err := s.dao.WithContext(c).UserTagScore.
Preload(s.dao.UserTagScore.Tag).
Where(s.dao.UserTagScore.ApplicationId.Eq(applicationEntity.Id.Uint())).
Where(s.dao.UserTagScore.ExternalUserId.Eq(externalUserEntity.Id.Uint())).
Order(s.dao.UserTagScore.Score.Desc()).
//Where(s.dao.UserTagScore.Score.Gt(3)).
Limit(30).Find()
if err != nil {
return nil, err
}
var tags []*entity.Tag
for _, tagScore := range tagScores {
tags = append(tags, tagScore.Tag)
}
return tags, nil
}

8
pkg/consts/category.go Normal file
View File

@ -0,0 +1,8 @@
package consts
import "errors"
var (
ErrCategoryExists = errors.New("category already exists")
ErrCategoryNotExists = errors.New("category does not exist")
)

View File

@ -3,5 +3,8 @@ package consts
import "errors" import "errors"
var ( var (
ErrPostNotReady = errors.New("post not ready")
ErrAnotherOperationInProgress = errors.New("another operation in progress") ErrAnotherOperationInProgress = errors.New("another operation in progress")
ErrExternalUserDoesNotLikeAnyPost = errors.New("external user does not like any post")
ErrPostTargetIdExists = errors.New("post target id exists")
) )