fix: Change the implementation of retry from 307 redirect to Retry middleware

This commit is contained in:
AhhhLiu 2023-11-15 22:20:33 +08:00
parent 58bb3ab6f6
commit 3a443f0e47
6 changed files with 123 additions and 16 deletions

View File

@ -83,6 +83,7 @@ var PreConsumedQuota = 500
var ApproximateTokenEnabled = false var ApproximateTokenEnabled = false
var RetryTimes = 0 var RetryTimes = 0
var RetryInterval = 0 // unit is millisecond
var RootUserEmail = "" var RootUserEmail = ""
var IsMasterNode = os.Getenv("NODE_TYPE") != "slave" var IsMasterNode = os.Getenv("NODE_TYPE") != "slave"

View File

@ -1,10 +1,11 @@
package controller package controller
import ( import (
"encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"one-api/common" "one-api/common"
"strconv"
"strings" "strings"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -197,13 +198,26 @@ func Relay(c *gin.Context) {
} }
if err != nil { if err != nil {
requestId := c.GetString(common.RequestIdKey) requestId := c.GetString(common.RequestIdKey)
retryTimesStr := c.Query("retry") go func() {
retryTimes, _ := strconv.Atoi(retryTimesStr) defer func() {
if retryTimesStr == "" { if r := recover(); r != nil {
retryTimes = common.RetryTimes //ignore
} }
}()
channelId := c.GetInt("channel_id")
common.LogError(c.Request.Context(), fmt.Sprintf("relay error (channel #%d): %s", channelId, err.Message))
// https://platform.openai.com/docs/guides/error-codes/api-errors
if shouldDisableChannel(&err.OpenAIError, err.StatusCode) {
channelId := c.GetInt("channel_id")
channelName := c.GetString("channel_name")
disableChannel(channelId, channelName, err.Message)
}
}()
retryTimes := c.GetInt("retry")
if retryTimes > 0 { if retryTimes > 0 {
c.Redirect(http.StatusTemporaryRedirect, fmt.Sprintf("%s?retry=%d", c.Request.URL.Path, retryTimes-1)) openaiErr, _ := json.Marshal(err.OpenAIError)
_ = c.Error(errors.New(string(openaiErr)))
return
} else { } else {
if err.StatusCode == http.StatusTooManyRequests { if err.StatusCode == http.StatusTooManyRequests {
err.OpenAIError.Message = "当前分组上游负载已饱和,请稍后再试" err.OpenAIError.Message = "当前分组上游负载已饱和,请稍后再试"
@ -213,14 +227,6 @@ func Relay(c *gin.Context) {
"error": err.OpenAIError, "error": err.OpenAIError,
}) })
} }
channelId := c.GetInt("channel_id")
common.LogError(c.Request.Context(), fmt.Sprintf("relay error (channel #%d): %s", channelId, err.Message))
// https://platform.openai.com/docs/guides/error-codes/api-errors
if shouldDisableChannel(&err.OpenAIError, err.StatusCode) {
channelId := c.GetInt("channel_id")
channelName := c.GetString("channel_name")
disableChannel(channelId, channelName, err.Message)
}
} }
} }

80
middleware/retry.go Normal file
View File

@ -0,0 +1,80 @@
package middleware
import (
"bytes"
"github.com/gin-gonic/gin"
"io"
"net/http"
"one-api/common"
"strconv"
"time"
)
// RetryHandler returns a middleware that re-runs the rest of the handler chain
// when a downstream handler reports a failure through c.Error.
//
// group must be the router group the middleware is registered on, and the
// returned handler must be the very next handler added to it with group.Use:
// its position inside group.Handlers is derived from the number of handlers
// present at the moment RetryHandler is called.
//
// The number of retries is taken from the "retry" query parameter; a missing,
// malformed, negative or too large value falls back to common.RetryTimes.
// Before every attempt the number of remaining retries is stored in the
// context under the key "retry". Between attempts the middleware waits
// common.RetryInterval milliseconds.
func RetryHandler(group *gin.RouterGroup) gin.HandlerFunc {
	// Index of the first group-level handler that follows this middleware:
	// once registered, the middleware itself sits at len(group.Handlers).
	next := len(group.Handlers) + 1

	return func(c *gin.Context) {
		// Back up the request so that every retry starts from the original
		// input. ContentLength is -1 when the length is unknown (for example
		// with chunked transfer encoding); such bodies must be buffered too,
		// otherwise a retry would be sent with an already drained body.
		hasBody := c.Request.Body != nil && c.Request.ContentLength != 0
		backupHeader := c.Request.Header.Clone()
		var backupBody []byte
		if hasBody {
			body, err := io.ReadAll(c.Request.Body)
			if err != nil {
				abortWithMessage(c, http.StatusBadRequest, "Invalid request")
				return
			}
			_ = c.Request.Body.Close()
			backupBody = body
			c.Request.Body = io.NopCloser(bytes.NewReader(backupBody))
		}

		// Handlers to run on a retry: the group middlewares registered after
		// this one, followed by the route's final handler (c.Handler() returns
		// the last handler of the current chain). They are copied into a fresh
		// slice: appending directly to group.Handlers[next:] could write into
		// the spare capacity of the backing array shared by every request of
		// the group, which is a data race between concurrent requests.
		tail := group.Handlers[next:]
		retryChain := make([]gin.HandlerFunc, 0, len(tail)+1)
		retryChain = append(retryChain, tail...)
		retryChain = append(retryChain, c.Handler())

		// Resolve the retry budget; a negative configured value means "no
		// retries".
		limit := common.RetryTimes
		if limit < 0 {
			limit = 0
		}
		maxRetry := limit
		if n, err := strconv.Atoi(c.Query("retry")); err == nil && n >= 0 && n <= limit {
			maxRetry = n
		}
		retryDelay := time.Duration(common.RetryInterval) * time.Millisecond

		for remaining := maxRetry; remaining >= 0; remaining-- {
			c.Set("retry", remaining)
			if remaining == maxRetry {
				// First attempt: let gin drive the rest of the chain. Calling
				// the handlers directly here would run them twice, because
				// handlers may call c.Next() themselves.
				c.Next()
			} else {
				// Retry: restore the request headers and body, then invoke
				// the downstream handlers by hand.
				c.Request.Header = backupHeader.Clone()
				if hasBody {
					c.Request.Body = io.NopCloser(bytes.NewReader(backupBody))
				}
				for _, handler := range retryChain {
					handler(c)
					// The context is already in the aborted state during a
					// retry, so a handler that rejects the request can only be
					// detected through the response it wrote. Stop there
					// instead of running the remaining handlers.
					if c.Writer.Written() {
						break
					}
				}
			}

			// No error reported: the attempt is final.
			if len(c.Errors) == 0 {
				return
			}

			// Move the context index to the abort index so that c.Next()
			// calls made inside handlers during a retry do not execute the
			// downstream chain a second time.
			c.Abort()
			// Clear the errors so the next attempt starts from a clean state.
			c.Errors = c.Errors[:0]
			// Only wait when another attempt follows.
			if remaining > 0 {
				time.Sleep(retryDelay)
			}
		}
	}
}

View File

@ -205,6 +205,8 @@ func updateOptionMap(key string, value string) (err error) {
common.PreConsumedQuota, _ = strconv.Atoi(value) common.PreConsumedQuota, _ = strconv.Atoi(value)
case "RetryTimes": case "RetryTimes":
common.RetryTimes, _ = strconv.Atoi(value) common.RetryTimes, _ = strconv.Atoi(value)
case "RetryInterval":
common.RetryInterval, _ = strconv.Atoi(value)
case "ModelRatio": case "ModelRatio":
err = common.UpdateModelRatioByJSONString(value) err = common.UpdateModelRatioByJSONString(value)
case "GroupRatio": case "GroupRatio":

View File

@ -17,6 +17,7 @@ func SetRelayRouter(router *gin.Engine) {
modelsRouter.GET("/:model", controller.RetrieveModel) modelsRouter.GET("/:model", controller.RetrieveModel)
} }
relayV1Router := router.Group("/v1") relayV1Router := router.Group("/v1")
relayV1Router.Use(middleware.RetryHandler(relayV1Router))
relayV1Router.Use(middleware.TokenAuth(), middleware.Distribute()) relayV1Router.Use(middleware.TokenAuth(), middleware.Distribute())
{ {
relayV1Router.POST("/completions", controller.Relay) relayV1Router.POST("/completions", controller.Relay)

View File

@ -21,7 +21,8 @@ const OperationSetting = () => {
DisplayInCurrencyEnabled: '', DisplayInCurrencyEnabled: '',
DisplayTokenStatEnabled: '', DisplayTokenStatEnabled: '',
ApproximateTokenEnabled: '', ApproximateTokenEnabled: '',
RetryTimes: 0 RetryTimes: 0,
RetryInterval: 0,
}); });
const [originInputs, setOriginInputs] = useState({}); const [originInputs, setOriginInputs] = useState({});
let [loading, setLoading] = useState(false); let [loading, setLoading] = useState(false);
@ -128,6 +129,9 @@ const OperationSetting = () => {
if (originInputs['RetryTimes'] !== inputs.RetryTimes) { if (originInputs['RetryTimes'] !== inputs.RetryTimes) {
await updateOption('RetryTimes', inputs.RetryTimes); await updateOption('RetryTimes', inputs.RetryTimes);
} }
if (originInputs['RetryInterval'] !== inputs.RetryInterval) {
await updateOption('RetryInterval', inputs.RetryInterval);
}
break; break;
} }
}; };
@ -190,6 +194,19 @@ const OperationSetting = () => {
value={inputs.RetryTimes} value={inputs.RetryTimes}
placeholder='失败重试次数' placeholder='失败重试次数'
/> />
<Form.Input
label='失败重试间隔(毫秒)'
name='RetryInterval'
type={'number'}
step='1'
min='0'
onChange={handleInputChange}
autoComplete='new-password'
value={inputs.RetryInterval}
placeholder='失败重试间隔'
/>
</Form.Group> </Form.Group>
<Form.Group inline> <Form.Group inline>
<Form.Checkbox <Form.Checkbox