作者:楊奇龍
本文來源:原創投稿
*愛可生開源社區出品,原創內容未經授權不得隨意使用,轉載請聯系小編并注明來源。
一、前言
作為熱點頻出的電商系統,經常遇到高并發,熱點秒殺的場景。我們在開發設計高并發海量業務請求的系統時,通常利用三板斧:緩存、降級和限流來保障系統穩定性。
緩存:使業務數據更靠近數據的使用者,提升程序訪問數據速度和增大系統QPS容量。
降級:當系統容量達到一定健康閾值時根據當前業務情況及流量對一些服務和頁面有策略的降級,以此釋放服務器資源以保證核心任務的正常運行。
限流:通過對并發訪問/請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統穩定可用,一旦達到限制速率則可以拒絕服務、排隊或等待、降級等處理。
本文聊聊限流的常用算法,并且通過案例測試驗證令牌桶算法。
二、限流算法
目前程序開發過程常用的限流算法有兩個:漏桶算法和令牌桶算法。
漏桶算法
漏桶算法的原理比較簡單,請求進入到漏桶中,漏桶以一定的速率漏水。當請求過多時,水直接溢出。可以看出,漏桶算法可以強制限制數據的傳輸速度。
如圖所示,把請求比作是水滴,水先滴到桶里,通過漏洞并以限定的速度出水,當水來得過猛而出水不夠快時就會導致水直接溢出,即拒絕服務。

圖片來自網絡
漏桶的出水速度是恒定的,那么意味著如果瞬時大流量的話,將有大部分請求被丟棄掉(也就是所謂的溢出)。
令牌桶算法
令牌桶算法的原理是系統以一定速率向桶中放入令牌,如果有請求時,請求會從桶中取出令牌,如果能取到令牌,則可以繼續完成請求,否則等待或者拒絕服務。這種算法可以應對突發程度的請求,因此比漏桶算法好。

圖片來自網絡
漏桶算法和令牌桶算法的選擇
兩者的主要區別漏桶算法能夠強行限制處理數據的速率,不論系統是否空閑。而令牌桶算法能夠在限制數據的平均處理速率的同時還允許某種程度的突發流量。
如何理解上面的含義呢?
漏桶算法,比如系統吞吐量是 120/s,業務請求 130/s,使用漏斗限流 100/s,起到限流的作用,多余的請求將產生等待或者丟棄。
對于令牌桶算法,每秒產生 100 個令牌,系統容量 200 個令牌。正常情況下,業務請求 100/s 時,請求能被正常被處理。當有突發流量過來比如 200 個請求時,因為系統容量有 200 個令牌可以同一時刻處理掉這 200 個請求。如果是漏桶算法,則只能處理 100 個請求,其他的請求等待或者被丟棄。
三、代碼實現
本案例使用 Python 基于令牌桶算法進行測試。
# encoding: utf-8"""author: [email protected]: 2020/9/9 10:43 PMfunc: """import timeimport multiprocessing
TEST = { # 測試 {'test1': 20} 每秒產生的令牌數量 'all': { 'test1': {'test1': 20}, 'test2': {'test2': 50}, 'test3': {'test3': 80}, 'test4': {'test4': 100},#表示突發100個請求 'test5': {'test5': 200},#表示突發200個請求 'test6': {'test6': 20}, }}
class TokenBucket(object): # rate是令牌發放速度,capacity是桶的大小 def __init__(self, rate, capacity): self._rate = rate self._capacity = capacity self._current_amount = 0 self._last_consume_time = int(time.time())
# token_amount是發送數據需要的令牌數 def consume(self, token_amount): time.sleep(1) # 計算從上次發送到這次發送,新發放的令牌數量 increment = (int(time.time()) - self._last_consume_time) * self._rate # 令牌數量不能超過桶的容量 self._current_amount = min( increment + self._current_amount, self._capacity) # 如果沒有足夠的令牌,則不能發送數據 if token_amount > self._current_amount: return False self._last_consume_time = int(time.time()) self._current_amount -= token_amount return True
def job(): i = 100 while i>1: for result in result_dict.values(): key = tuple(result.keys())[0] rate = tuple(result.values())[0] i = i-1 if i <= 0: break
if not limiter.consume(rate): print(key + ' 限流') else: print(key + ' 正常')
def run(): threads = [multiprocessing.Process(target=job) for i in range(3)] for thread in threads: thread.start()
if __name__ == '__main__': result_dict = TEST["all"] RATE = 30 CAPACITY = 120 limiter = TokenBucket(RATE, CAPACITY) run()
這段測試代碼比較簡單,大家可以結合自己的業務場景在編寫工具時看看哪里可以使用到限流算法。
四、參考文章
https://github.com/titan-web/rate-limit
https://www.jianshu.com/p/c6b20845561a
https://www.simpleApples.com/2018/03/20/implementation-of-the-token-bucket-algorithm-in-python/