diff --git a/ratelimiter/leakybucket.go b/ratelimiter/leakybucket.go new file mode 100644 index 0000000..358b69e --- /dev/null +++ b/ratelimiter/leakybucket.go @@ -0,0 +1,97 @@ +// Package ratelimiter implements the Leaky Bucket ratelimiting algorithm with memcached and in-memory backends. +package ratelimiter + +import ( + "time" +) + +type LeakyBucket struct { + Size uint16 + Fill float64 + LeakInterval time.Duration // time.Duration for 1 unit of size to leak + Lastupdate time.Time + Now func() time.Time +} + +func NewLeakyBucket(size uint16, leakInterval time.Duration) *LeakyBucket { + bucket := LeakyBucket{ + Size: size, + Fill: 0, + LeakInterval: leakInterval, + Now: time.Now, + Lastupdate: time.Now(), + } + + return &bucket +} + +func (b *LeakyBucket) updateFill() { + now := b.Now() + if b.Fill > 0 { + elapsed := now.Sub(b.Lastupdate) + + b.Fill -= float64(elapsed) / float64(b.LeakInterval) + if b.Fill < 0 { + b.Fill = 0 + } + } + b.Lastupdate = now +} + +func (b *LeakyBucket) Pour(amount uint16) bool { + b.updateFill() + + var newfill float64 = b.Fill + float64(amount) + + if newfill > float64(b.Size) { + return false + } + + b.Fill = newfill + + return true +} + +// The time at which this bucket will be completely drained +func (b *LeakyBucket) DrainedAt() time.Time { + return b.Lastupdate.Add(time.Duration(b.Fill * float64(b.LeakInterval))) +} + +// The duration until this bucket is completely drained +func (b *LeakyBucket) TimeToDrain() time.Duration { + return b.DrainedAt().Sub(b.Now()) +} + +func (b *LeakyBucket) TimeSinceLastUpdate() time.Duration { + return b.Now().Sub(b.Lastupdate) +} + +type LeakyBucketSer struct { + Size uint16 + Fill float64 + LeakInterval time.Duration // time.Duration for 1 unit of size to leak + Lastupdate time.Time +} + +func (b *LeakyBucket) Serialise() *LeakyBucketSer { + bucket := LeakyBucketSer{ + Size: b.Size, + Fill: b.Fill, + LeakInterval: b.LeakInterval, + Lastupdate: b.Lastupdate, + } + + return &bucket +} + +func (b *LeakyBucketSer) DeSerialise() *LeakyBucket { + bucket := LeakyBucket{ + Size: b.Size, + Fill: b.Fill, + LeakInterval: b.LeakInterval, + Lastupdate: b.Lastupdate, + Now: time.Now, + } + + return &bucket +} diff --git a/ratelimiter/leakybucket_test.go b/ratelimiter/leakybucket_test.go new file mode 100644 index 0000000..b43dddb --- /dev/null +++ b/ratelimiter/leakybucket_test.go @@ -0,0 +1,73 @@ +package ratelimiter + +import ( + "testing" + "time" +) + +func TestPour(t *testing.T) { + bucket := NewLeakyBucket(60, time.Second) + bucket.Lastupdate = time.Unix(0, 0) + + bucket.Now = func() time.Time { return time.Unix(1, 0) } + + if bucket.Pour(61) { + t.Error("Expected false") + } + + if !bucket.Pour(10) { + t.Error("Expected true") + } + + if !bucket.Pour(49) { + t.Error("Expected true") + } + + if bucket.Pour(2) { + t.Error("Expected false") + } + + bucket.Now = func() time.Time { return time.Unix(61, 0) } + if !bucket.Pour(60) { + t.Error("Expected true") + } + + if bucket.Pour(1) { + t.Error("Expected false") + } + + bucket.Now = func() time.Time { return time.Unix(70, 0) } + + if !bucket.Pour(1) { + t.Error("Expected true") + } + +} + +func TestTimeSinceLastUpdate(t *testing.T) { + bucket := NewLeakyBucket(60, time.Second) + bucket.Now = func() time.Time { return time.Unix(1, 0) } + bucket.Pour(1) + bucket.Now = func() time.Time { return time.Unix(2, 0) } + + sinceLast := bucket.TimeSinceLastUpdate() + if sinceLast != time.Second*1 { + t.Errorf("Expected time since last update to be less than 1 second, got %d", sinceLast) + } +} + +func TestTimeToDrain(t *testing.T) { + bucket := NewLeakyBucket(60, time.Second) + bucket.Now = func() time.Time { return time.Unix(1, 0) } + bucket.Pour(10) + + if bucket.TimeToDrain() != time.Second*10 { + t.Error("Time to drain should be 10 seconds") + } + + bucket.Now = func() time.Time { return time.Unix(2, 0) } + + if bucket.TimeToDrain() != time.Second*9 { + t.Error("Time to drain should be 9 seconds") + } +} diff --git a/ratelimiter/memory.go b/ratelimiter/memory.go new file mode 100644 index 0000000..8f6a578 --- /dev/null +++ b/ratelimiter/memory.go @@ -0,0 +1,58 @@ +package ratelimiter + +import ( + "errors" + "time" +) + +const GC_SIZE int = 100 + +type Memory struct { + store map[string]LeakyBucket + lastGCCollected time.Time +} + +func NewMemory() *Memory { + m := new(Memory) + m.store = make(map[string]LeakyBucket) + m.lastGCCollected = time.Now() + return m +} + +func (m *Memory) GetBucketFor(key string) (*LeakyBucket, error) { + + bucket, ok := m.store[key] + if !ok { + return nil, errors.New("miss") + } + + return &bucket, nil +} + +func (m *Memory) SetBucketFor(key string, bucket LeakyBucket) error { + + if len(m.store) > GC_SIZE { + m.GarbageCollect() + } + + m.store[key] = bucket + + return nil +} + +func (m *Memory) GarbageCollect() { + now := time.Now() + + // rate limit GC to once per minute + if now.Add(60*time.Second).Unix() > m.lastGCCollected.Unix() { + + for key, bucket := range m.store { + // if the bucket is drained, then GC + if bucket.DrainedAt().Unix() > now.Unix() { + delete(m.store, key) + } + } + + m.lastGCCollected = now + } +} diff --git a/ratelimiter/storage.go b/ratelimiter/storage.go new file mode 100644 index 0000000..89b2fe8 --- /dev/null +++ b/ratelimiter/storage.go @@ -0,0 +1,6 @@ +package ratelimiter + +type Storage interface { + GetBucketFor(string) (*LeakyBucket, error) + SetBucketFor(string, LeakyBucket) error +}