以下文章來源于李文周 ,作者李文周
限流
限流又稱為流量控制(流控),通常是指限制到達系統的并發請求數。
我們生活中也會經常遇到限流的場景,比如:某景區限制每日進入景區的游客數量為 8 萬人;沙河地鐵站早高峰通過站外排隊逐一放行的方式限制同一時間進入車站的旅客數量等。
限流雖然會影響部分用戶的使用體驗,但是卻能在一定程度上保障系統的穩定性,不至于崩潰(大家都沒了用戶體驗)。
而互聯網上類似需要限流的業務場景也有很多,比如電商系統的秒殺、微博上突發熱點新聞、雙十一購物節、12306 搶票等等。這些場景下的用戶請求量通常會激增,遠遠超過平時正常的請求量,此時如果不加任何限制很容易就會將后端服務打垮,影響服務的穩定性。
此外,一些廠商公開的 API 服務通常也會限制用戶的請求次數,比如百度地圖開放平臺等會根據用戶的付費情況來限制用戶的請求數等。

常用的限流策略
漏桶
漏桶法限流很好理解,假設我們有一個水桶按固定的速率向下方滴落一滴水,無論有多少請求,請求的速率有多大,都按照固定的速率流出,對應到系統中就是按照固定的速率處理請求。

漏桶算法原理
漏桶法的關鍵點在于漏桶始終按照固定的速率運行,但是它并不能很好的處理有大量突發請求的場景,畢竟在某些場景下我們可能需要提高系統的處理效率,而不是一味的按照固定速率處理請求。
關于漏桶的實現,uber 團隊有一個開源的https://github.com/uber-go/ratelimit實現。使用方法也比較簡單,Take() 方法會返回漏桶下一次滴水的時間。
import (
"fmt"
"time"
"go.uber.org/ratelimit"
)func main() { rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
// Output:
// 0 0
// 1 10ms
// 2 10ms
// 3 10ms
// 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
}
它的源碼實現也比較簡單,這里大致說一下關鍵的地方,有興趣的同學可以自己去看一下完整的源碼。
限制器是一個接口類型,其要求實現一個Take()方法:
type Limiter interface {
// Take方法應該阻塞已確保滿足 RPS
Take() time.Time
}
實現限制器接口的結構體定義如下,這里可以重點留意下maxSlack字段,它在后面的Take()方法中的處理。
type limiter struct {
sync.Mutex // 鎖 last time.Time // 上一次的時刻
sleepFor time.Duration // 需要等待的時間
perRequest time.Duration // 每次的時間間隔
maxSlack time.Duration // 最大的富余量
clock Clock // 時鐘
}
limiter結構體實現Limiter接口的Take()方法內容如下:
// Take 會阻塞確保兩次請求之間的時間走完
// Take 調用平均數為 time.Second/rate.
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// 如果是第一次請求就直接放行
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor 根據 perRequest 和上一次請求的時刻計算應該sleep的時間
// 由于每次請求間隔的時間可能會超過perRequest, 所以這個數字可能為負數,并在多個請求之間累加
t.sleepFor += t.perRequest - now.Sub(t.last)
// 我們不應該讓sleepFor負的太多,因為這意味著一個服務在短時間內慢了很多隨后會得到更高的RPS。
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
// 如果 sleepFor 是正值那么就 sleep
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor) t.last = now.Add(t.sleepFor) t.sleepFor = 0
} else {
t.last = now } return t.last
}
上面的代碼根據記錄每次請求的間隔時間和上一次請求的時刻來計算當次請求需要阻塞的時間——sleepFor,這里需要留意的是sleepFor的值可能為負,在經過間隔時間長的兩次訪問之后會導致隨后大量的請求被放行,所以代碼中針對這個場景有專門的優化處理。maxSlack默認值可以通過創建限制器的New函數看到。
func New(rate int, opts ...Option) Limiter {
l := &limiter{ perRequest: time.Second / time.Duration(rate), maxSlack: -10 * time.Second / time.Duration(rate),
} for _, opt := range opts {
opt(l) } if l.clock == nil {
l.clock = clock.New() } return l
}
令牌桶
令牌桶其實和漏桶的原理類似,令牌桶按固定的速率往桶里放入令牌,并且只要能從桶里取出令牌就能通過,令牌桶支持突發流量的快速處理。

令牌桶原理
對于從桶里取不到令牌的場景,我們可以選擇等待也可以直接拒絕并返回。
對于令牌桶的 Go 語言實現,大家可以參照https://github.com/juju/ratelimit。
這個庫支持多種令牌桶模式,并且使用起來也比較簡單。
創建令牌桶的方法:
// 創建指定填充速率和容量大小的令牌桶
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
// 創建指定填充速率、容量大小和每次填充的令牌數的令牌桶
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
// 創建填充速度為指定速率和容量大小的令牌桶
// NewBucketWithRate(0.1, 200) 表示每秒填充20個令牌
func NewBucketWithRate(rate float64, capacity int64) *Bucket
取出令牌的方法:
// 非阻塞的取token
func (tb *Bucket) Take(count int64) time.Duration
func (tb *Bucket) TakeAvailable(count int64) int64// 最多等maxWait時間取tokenfunc (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
// 阻塞的取tokenfunc (tb *Bucket) Wait(count int64)func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
雖說是令牌桶,但是我們沒有必要真的去生成令牌放到桶里,我們只需要每次來取令牌的時候計算一下,當前是否有足夠的令牌可以使用就可以了,具體的計算公式如下。
當前令牌數 = 上一次剩余的令牌數 + (本次取令牌的時刻-上一次取令牌的時刻)/放置令牌的時間間隔 * 每次放置的令牌數
github.com/juju/ratelimit這個庫中關于令牌數計算的具體實現如下:
func (tb *Bucket) adjustavailableTokens(tick int64) {
if tb.availableTokens >= tb.capacity {
return
} tb.availableTokens += (tick - tb.latestTick) * tb.quantum if tb.availableTokens > tb.capacity {
tb.availableTokens = tb.capacity } tb.latestTick = tick return
}
獲取令牌的TakeAvailable函數關鍵部分的源碼如下:
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
if count <= 0 {
return 0
} tb.adjustavailableTokens(tb.currentTick(now)) if tb.availableTokens <= 0 {
return 0
} if count > tb.availableTokens {
count = tb.availableTokens
} tb.availableTokens -= count
return count
}
大家從代碼中也可以看到其實令牌桶的實現并沒有很復雜。
gin 框架中使用限流中間件
在 gin 框架構建的項目中,我們可以將限流組件定義成中間件。
這里使用令牌桶作為限流策略,編寫一個限流中間件如下:
func RateLimitMiddleware(fillInterval time.Duration, cap int64) func(c *gin.Context) {
bucket := ratelimit.NewBucket(fillInterval, cap)
return func(c *gin.Context) {
// 如果取不到令牌就返回響應 if bucket.TakeAvailable(1) == 0 {
c.String(http.StatusOK, "rate limit...")
c.Abort()
return } c.Next()
}}
對于該限流中間件的注冊位置,我們可以按照不同的限流策略將其添加到不同的地方,例如:
- 如果要對全站限流就可以添加成全局的中間件。
- 如果是某一組路由需要限流,那么就只需添加到對應的路由組即可。