From 752639560fd2a6a5c655f66e3ed6a04159faf8db Mon Sep 17 00:00:00 2001 From: JustSong Date: Fri, 15 Mar 2024 00:30:15 +0800 Subject: [PATCH] feat: able to use separated database for table logs --- README.md | 39 +++++++++++++++++++------------------ main.go | 16 +++++++++++++++- model/log.go | 24 +++++++++++------------ model/main.go | 53 ++++++++++++++++++++++++++++++--------------------- 4 files changed, 78 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index 0ba659c4..d0b6b70c 100644 --- a/README.md +++ b/README.md @@ -349,38 +349,39 @@ graph LR + `SQL_MAX_OPEN_CONNS`:最大打开连接数,默认为 `1000`。 + 如果报错 `Error 1040: Too many connections`,请适当减小该值。 + `SQL_CONN_MAX_LIFETIME`:连接的最大生命周期,默认为 `60`,单位分钟。 -4. `FRONTEND_BASE_URL`:设置之后将重定向页面请求到指定的地址,仅限从服务器设置。 +4. `LOG_SQL_DSN`:设置之后将为 `logs` 表使用独立的数据库,请使用 MySQL 或 PostgreSQL。 +5. `FRONTEND_BASE_URL`:设置之后将重定向页面请求到指定的地址,仅限从服务器设置。 + 例子:`FRONTEND_BASE_URL=https://openai.justsong.cn` -5. `MEMORY_CACHE_ENABLED`:启用内存缓存,会导致用户额度的更新存在一定的延迟,可选值为 `true` 和 `false`,未设置则默认为 `false`。 +6. `MEMORY_CACHE_ENABLED`:启用内存缓存,会导致用户额度的更新存在一定的延迟,可选值为 `true` 和 `false`,未设置则默认为 `false`。 + 例子:`MEMORY_CACHE_ENABLED=true` -6. `SYNC_FREQUENCY`:在启用缓存的情况下与数据库同步配置的频率,单位为秒,默认为 `600` 秒。 +7. `SYNC_FREQUENCY`:在启用缓存的情况下与数据库同步配置的频率,单位为秒,默认为 `600` 秒。 + 例子:`SYNC_FREQUENCY=60` -7. `NODE_TYPE`:设置之后将指定节点类型,可选值为 `master` 和 `slave`,未设置则默认为 `master`。 +8. `NODE_TYPE`:设置之后将指定节点类型,可选值为 `master` 和 `slave`,未设置则默认为 `master`。 + 例子:`NODE_TYPE=slave` -8. `CHANNEL_UPDATE_FREQUENCY`:设置之后将定期更新渠道余额,单位为分钟,未设置则不进行更新。 +9. `CHANNEL_UPDATE_FREQUENCY`:设置之后将定期更新渠道余额,单位为分钟,未设置则不进行更新。 + 例子:`CHANNEL_UPDATE_FREQUENCY=1440` -9. `CHANNEL_TEST_FREQUENCY`:设置之后将定期检查渠道,单位为分钟,未设置则不进行检查。 - + 例子:`CHANNEL_TEST_FREQUENCY=1440` -10. `POLLING_INTERVAL`:批量更新渠道余额以及测试可用性时的请求间隔,单位为秒,默认无间隔。 +10. `CHANNEL_TEST_FREQUENCY`:设置之后将定期检查渠道,单位为分钟,未设置则不进行检查。 + + 例子:`CHANNEL_TEST_FREQUENCY=1440` +11. `POLLING_INTERVAL`:批量更新渠道余额以及测试可用性时的请求间隔,单位为秒,默认无间隔。 + 例子:`POLLING_INTERVAL=5` -11. `BATCH_UPDATE_ENABLED`:启用数据库批量更新聚合,会导致用户额度的更新存在一定的延迟可选值为 `true` 和 `false`,未设置则默认为 `false`。 +12. `BATCH_UPDATE_ENABLED`:启用数据库批量更新聚合,会导致用户额度的更新存在一定的延迟可选值为 `true` 和 `false`,未设置则默认为 `false`。 + 例子:`BATCH_UPDATE_ENABLED=true` + 如果你遇到了数据库连接数过多的问题,可以尝试启用该选项。 -12. `BATCH_UPDATE_INTERVAL=5`:批量更新聚合的时间间隔,单位为秒,默认为 `5`。 +13. `BATCH_UPDATE_INTERVAL=5`:批量更新聚合的时间间隔,单位为秒,默认为 `5`。 + 例子:`BATCH_UPDATE_INTERVAL=5` -13. 请求频率限制: +14. 请求频率限制: + `GLOBAL_API_RATE_LIMIT`:全局 API 速率限制(除中继请求外),单 ip 三分钟内的最大请求数,默认为 `180`。 + `GLOBAL_WEB_RATE_LIMIT`:全局 Web 速率限制,单 ip 三分钟内的最大请求数,默认为 `60`。 -14. 编码器缓存设置: +15. 编码器缓存设置: + `TIKTOKEN_CACHE_DIR`:默认程序启动时会联网下载一些通用的词元的编码,如:`gpt-3.5-turbo`,在一些网络环境不稳定,或者离线情况,可能会导致启动有问题,可以配置此目录缓存数据,可迁移到离线环境。 + `DATA_GYM_CACHE_DIR`:目前该配置作用与 `TIKTOKEN_CACHE_DIR` 一致,但是优先级没有它高。 -15. `RELAY_TIMEOUT`:中继超时设置,单位为秒,默认不设置超时时间。 -16. `SQLITE_BUSY_TIMEOUT`:SQLite 锁等待超时设置,单位为毫秒,默认 `3000`。 -17. `GEMINI_SAFETY_SETTING`:Gemini 的安全设置,默认 `BLOCK_NONE`。 -18. `THEME`:系统的主题设置,默认为 `default`,具体可选值参考[此处](./web/README.md)。 -19. `ENABLE_METRIC`:是否根据请求成功率禁用渠道,默认不开启,可选值为 `true` 和 `false`。 -20. `METRIC_QUEUE_SIZE`:请求成功率统计队列大小,默认为 `10`。 -21. `METRIC_SUCCESS_RATE_THRESHOLD`:请求成功率阈值,默认为 `0.8`。 +16. `RELAY_TIMEOUT`:中继超时设置,单位为秒,默认不设置超时时间。 +17. `SQLITE_BUSY_TIMEOUT`:SQLite 锁等待超时设置,单位为毫秒,默认 `3000`。 +18. `GEMINI_SAFETY_SETTING`:Gemini 的安全设置,默认 `BLOCK_NONE`。 +19. `THEME`:系统的主题设置,默认为 `default`,具体可选值参考[此处](./web/README.md)。 +20. `ENABLE_METRIC`:是否根据请求成功率禁用渠道,默认不开启,可选值为 `true` 和 `false`。 +21. `METRIC_QUEUE_SIZE`:请求成功率统计队列大小,默认为 `10`。 +22. `METRIC_SUCCESS_RATE_THRESHOLD`:请求成功率阈值,默认为 `0.8`。 ### 命令行参数 1. `--port `: 指定服务器监听的端口号,默认为 `3000`。 diff --git a/main.go b/main.go index 83d7e7ed..b20c6daf 100644 --- a/main.go +++ b/main.go @@ -30,11 +30,25 @@ func main() { if config.DebugEnabled { logger.SysLog("running in debug mode") } + var err error // Initialize SQL Database - err := model.InitDB() + model.DB, err = model.InitDB("SQL_DSN") if err != nil { logger.FatalLog("failed to initialize database: " + err.Error()) } + if os.Getenv("LOG_SQL_DSN") != "" { + logger.SysLog("using secondary database for table logs") + model.LOG_DB, err = model.InitDB("LOG_SQL_DSN") + if err != nil { + logger.FatalLog("failed to initialize secondary database: " + err.Error()) + } + } else { + model.LOG_DB = model.DB + } + err = model.CreateRootAccountIfNeed() + if err != nil { + logger.FatalLog("database init error: " + err.Error()) + } defer func() { err := model.CloseDB() if err != nil { diff --git a/model/log.go b/model/log.go index 85c0ba90..4409f73e 100644 --- a/model/log.go +++ b/model/log.go @@ -45,7 +45,7 @@ func RecordLog(userId int, logType int, content string) { Type: logType, Content: content, } - err := DB.Create(log).Error + err := LOG_DB.Create(log).Error if err != nil { logger.SysError("failed to record log: " + err.Error()) } @@ -69,7 +69,7 @@ func RecordConsumeLog(ctx context.Context, userId int, channelId int, promptToke Quota: int(quota), ChannelId: channelId, } - err := DB.Create(log).Error + err := LOG_DB.Create(log).Error if err != nil { logger.Error(ctx, "failed to record log: "+err.Error()) } @@ -78,9 +78,9 @@ func RecordConsumeLog(ctx context.Context, userId int, channelId int, promptToke func GetAllLogs(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string, startIdx int, num int, channel int) (logs []*Log, err error) { var tx *gorm.DB if logType == LogTypeUnknown { - tx = DB + tx = LOG_DB } else { - tx = DB.Where("type = ?", logType) + tx = LOG_DB.Where("type = ?", logType) } if modelName != "" { tx = tx.Where("model_name = ?", modelName) @@ -107,9 +107,9 @@ func GetAllLogs(logType int, startTimestamp int64, endTimestamp int64, modelName func GetUserLogs(userId int, logType int, startTimestamp int64, endTimestamp int64, modelName string, tokenName string, startIdx int, num int) (logs []*Log, err error) { var tx *gorm.DB if logType == LogTypeUnknown { - tx = DB.Where("user_id = ?", userId) + tx = LOG_DB.Where("user_id = ?", userId) } else { - tx = DB.Where("user_id = ? and type = ?", userId, logType) + tx = LOG_DB.Where("user_id = ? and type = ?", userId, logType) } if modelName != "" { tx = tx.Where("model_name = ?", modelName) @@ -128,17 +128,17 @@ func GetUserLogs(userId int, logType int, startTimestamp int64, endTimestamp int } func SearchAllLogs(keyword string) (logs []*Log, err error) { - err = DB.Where("type = ? or content LIKE ?", keyword, keyword+"%").Order("id desc").Limit(config.MaxRecentItems).Find(&logs).Error + err = LOG_DB.Where("type = ? or content LIKE ?", keyword, keyword+"%").Order("id desc").Limit(config.MaxRecentItems).Find(&logs).Error return logs, err } func SearchUserLogs(userId int, keyword string) (logs []*Log, err error) { - err = DB.Where("user_id = ? and type = ?", userId, keyword).Order("id desc").Limit(config.MaxRecentItems).Omit("id").Find(&logs).Error + err = LOG_DB.Where("user_id = ? and type = ?", userId, keyword).Order("id desc").Limit(config.MaxRecentItems).Omit("id").Find(&logs).Error return logs, err } func SumUsedQuota(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string, channel int) (quota int64) { - tx := DB.Table("logs").Select("ifnull(sum(quota),0)") + tx := LOG_DB.Table("logs").Select("ifnull(sum(quota),0)") if username != "" { tx = tx.Where("username = ?", username) } @@ -162,7 +162,7 @@ func SumUsedQuota(logType int, startTimestamp int64, endTimestamp int64, modelNa } func SumUsedToken(logType int, startTimestamp int64, endTimestamp int64, modelName string, username string, tokenName string) (token int) { - tx := DB.Table("logs").Select("ifnull(sum(prompt_tokens),0) + ifnull(sum(completion_tokens),0)") + tx := LOG_DB.Table("logs").Select("ifnull(sum(prompt_tokens),0) + ifnull(sum(completion_tokens),0)") if username != "" { tx = tx.Where("username = ?", username) } @@ -183,7 +183,7 @@ func SumUsedToken(logType int, startTimestamp int64, endTimestamp int64, modelNa } func DeleteOldLog(targetTimestamp int64) (int64, error) { - result := DB.Where("created_at < ?", targetTimestamp).Delete(&Log{}) + result := LOG_DB.Where("created_at < ?", targetTimestamp).Delete(&Log{}) return result.RowsAffected, result.Error } @@ -207,7 +207,7 @@ func SearchLogsByDayAndModel(userId, start, end int) (LogStatistics []*LogStatis groupSelect = "strftime('%Y-%m-%d', datetime(created_at, 'unixepoch')) as day" } - err = DB.Raw(` + err = LOG_DB.Raw(` SELECT `+groupSelect+`, model_name, count(1) as request_count, sum(quota) as quota, diff --git a/model/main.go b/model/main.go index 05150fd9..ca7a35b2 100644 --- a/model/main.go +++ b/model/main.go @@ -17,8 +17,9 @@ import ( ) var DB *gorm.DB +var LOG_DB *gorm.DB -func createRootAccountIfNeed() error { +func CreateRootAccountIfNeed() error { var user User //if user.Status != util.UserStatusEnabled { if err := DB.First(&user).Error; err != nil { @@ -41,9 +42,9 @@ func createRootAccountIfNeed() error { return nil } -func chooseDB() (*gorm.DB, error) { - if os.Getenv("SQL_DSN") != "" { - dsn := os.Getenv("SQL_DSN") +func chooseDB(envName string) (*gorm.DB, error) { + if os.Getenv(envName) != "" { + dsn := os.Getenv(envName) if strings.HasPrefix(dsn, "postgres://") { // Use PostgreSQL logger.SysLog("using PostgreSQL as database") @@ -71,23 +72,22 @@ func chooseDB() (*gorm.DB, error) { }) } -func InitDB() (err error) { - db, err := chooseDB() +func InitDB(envName string) (db *gorm.DB, err error) { + db, err = chooseDB(envName) if err == nil { if config.DebugSQLEnabled { db = db.Debug() } - DB = db - sqlDB, err := DB.DB() + sqlDB, err := db.DB() if err != nil { - return err + return nil, err } sqlDB.SetMaxIdleConns(env.Int("SQL_MAX_IDLE_CONNS", 100)) sqlDB.SetMaxOpenConns(env.Int("SQL_MAX_OPEN_CONNS", 1000)) sqlDB.SetConnMaxLifetime(time.Second * time.Duration(env.Int("SQL_MAX_LIFETIME", 60))) if !config.IsMasterNode { - return nil + return db, err } if common.UsingMySQL { _, _ = sqlDB.Exec("DROP INDEX idx_channels_key ON channels;") // TODO: delete this line when most users have upgraded @@ -95,46 +95,55 @@ func InitDB() (err error) { logger.SysLog("database migration started") err = db.AutoMigrate(&Channel{}) if err != nil { - return err + return nil, err } err = db.AutoMigrate(&Token{}) if err != nil { - return err + return nil, err } err = db.AutoMigrate(&User{}) if err != nil { - return err + return nil, err } err = db.AutoMigrate(&Option{}) if err != nil { - return err + return nil, err } err = db.AutoMigrate(&Redemption{}) if err != nil { - return err + return nil, err } err = db.AutoMigrate(&Ability{}) if err != nil { - return err + return nil, err } err = db.AutoMigrate(&Log{}) if err != nil { - return err + return nil, err } logger.SysLog("database migrated") - err = createRootAccountIfNeed() - return err + return db, err } else { logger.FatalLog(err) } - return err + return db, err } -func CloseDB() error { - sqlDB, err := DB.DB() +func closeDB(db *gorm.DB) error { + sqlDB, err := db.DB() if err != nil { return err } err = sqlDB.Close() return err } + +func CloseDB() error { + if LOG_DB != DB { + err := closeDB(LOG_DB) + if err != nil { + return err + } + } + return closeDB(DB) +}