Skip to content

Simplify handler map #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 28 additions & 42 deletions client/handler_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type HandlerMap struct {
}

type waiter struct {
ready chan<- struct{} // Closed when semaphore acquired.
ready chan<- ResponseHandler // Closed when semaphore acquired.
}

func NewHandlerMap() *HandlerMap {
Expand Down Expand Up @@ -44,6 +44,7 @@ func (m *HandlerMap) Put(key string, value ResponseHandler) {
}
w := next.Value.(waiter)
waiters.Remove(next)
w.ready <- value
close(w.ready)
}
delete(m.waitersMap, key)
Expand All @@ -66,50 +67,35 @@ func (m *HandlerMap) Get(key string, timeoutMs int) (value ResponseHandler, ok b
return
}

// let's remember the current time
curTime := time.Now()
maxTime := curTime.Add(time.Duration(timeoutMs) * time.Millisecond)

for time.Now().Before(maxTime) && !ok {
value, ok = m.innerMap[key]
if !ok {
nsLeft := maxTime.Sub(time.Now()).Nanoseconds()
ctx, _ := context.WithTimeout(context.Background(), time.Duration(nsLeft)*time.Nanosecond)

waiters, wok := m.waitersMap[key]
if !wok {
waiters = &list.List{}
m.waitersMap[key] = waiters
}
ready := make(chan struct{})
w := waiter{ready: ready}
elem := waiters.PushBack(w)
m.mu.Unlock() // unlock before we start waiting on stuff

select {
case <-ctx.Done():
m.mu.Lock()
select {
case <-ready:
// in case we got signalled during cancellation
continue
default:
// we got timeout, let's remove
waiters.Remove(elem)
if waiters.Len() == 0 {
delete(m.waitersMap, key)
}
}
m.mu.Unlock()
return

case <-ready:
m.mu.Lock() // going back to the loop, gotta lock
continue
ctx, _ := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
waiters, wok := m.waitersMap[key]
if !wok {
waiters = &list.List{}
m.waitersMap[key] = waiters
}
ready := make(chan ResponseHandler)
w := waiter{ready: ready}
elem := waiters.PushBack(w)
m.mu.Unlock() // unlock before waiting

select {
case <-ctx.Done():
m.mu.Lock()
// check if the response arrived when it timed out
select {
case value = <-ready:
ok = true
default:
// got timeout, let's remove waiter
waiters.Remove(elem)
if waiters.Len() == 0 {
delete(m.waitersMap, key)
}
}
m.mu.Unlock()
case value = <-ready:
ok = true
}

m.mu.Unlock()
return
}
141 changes: 134 additions & 7 deletions client/handler_map_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package client

import (
"fmt"
"github.com/stretchr/testify/assert"
"math"
"testing"
"time"
)

const (
testKey = "test_key"
timeoutMs = 200
marginErrorPct = 10
testKey = "test_key"
timeoutMs = 200
marginErrorMs = 10
numHandlers = 20
)

func getMsSince(startTime time.Time) int {
Expand Down Expand Up @@ -60,9 +62,9 @@ func TestHandlerMapDelayedPutRetrieve(t *testing.T) {
myHandler(nil)
actualResponseMs := getMsSince(startTime)
var comp assert.Comparison = func() (success bool) {
return math.Abs(float64(actualResponseMs-expectedResponseMs))/float64(expectedResponseMs) < float64(marginErrorPct)/100
return math.Abs(float64(actualResponseMs-expectedResponseMs)) < marginErrorMs
}
assert.Condition(t, comp, "Response did not arrive within %d%% margin, expected time %d ms", marginErrorPct, expectedResponseMs)
assert.Condition(t, comp, "Response did not arrive within %d ms margin, expected time %d ms, actual time %d ms", marginErrorMs, expectedResponseMs, actualResponseMs)

}

Expand All @@ -87,9 +89,9 @@ func TestHandlerMapTimeoutPutTooLate(t *testing.T) {
actualTimeoutMs := getMsSince(startTime)
t.Logf("test: timed out waiting for key at %d ms after start\n", actualTimeoutMs)
var comp assert.Comparison = func() (success bool) {
return math.Abs(float64(actualTimeoutMs-timeoutMs))/timeoutMs < float64(marginErrorPct)/100
return math.Abs(float64(actualTimeoutMs-timeoutMs)) < marginErrorMs
}
assert.Condition(t, comp, "Timeout did not occur within %d%% margin, expected timeout ms: %d", marginErrorPct, timeoutMs)
assert.Condition(t, comp, "Timeout did not occur within %d ms margin, expected timeout ms: %d, actual time %d ms", marginErrorMs, timeoutMs, actualTimeoutMs)
// wait till producer has added the element
time.Sleep(3 * timeoutMs * time.Millisecond)
counts, waiters := handler_map.GetCounts()
Expand All @@ -98,3 +100,128 @@ func TestHandlerMapTimeoutPutTooLate(t *testing.T) {
}

}

func TestMixedMultiPutGet(t *testing.T) {

handler_map := NewHandlerMap()
startTime := time.Now()
delayedResponseTimeMs := timeoutMs / 2
timedoutResponseTimeMs := timeoutMs + marginErrorMs

getHandler := func(key string) ResponseHandler {
return func(r *Response) {
assert.Equal(t, key, r.Handle, fmt.Sprintf("Handler Key Mismatch, expected: %s, actual: %s", key, r.Handle))
t.Logf("test: got a response [%s] at time %d ms after start\n", r.Handle, getMsSince(startTime))
}
}

go func() {

for i := 0; i < numHandlers; i++ {
key := fmt.Sprintf("%s-%d", testKey, i)
handler_map.Put(key, getHandler(key))
}

// at this point the Get would be waiting for the response.
counts, _ := handler_map.GetCounts()
assert.Equal(t, numHandlers, counts, "Map Elements")

time.Sleep(time.Duration(delayedResponseTimeMs) * time.Millisecond)

// by now we have some non-delayed elements, and also have waiters for the delayed elements
counts, waiters := handler_map.GetCounts()
assert.Equal(t, numHandlers, counts, "Map Elements") // elements for non-delayed ones
assert.Equal(t, numHandlers*2, waiters, "Map Elements") // for delayed ones

for i := 0; i < numHandlers; i++ {
key := fmt.Sprintf("%s-%d-delayed", testKey, i)
handler_map.Put(key, getHandler(key))
}

// at this point the Get would be waiting for the response.
counts, _ = handler_map.GetCounts()
assert.Equal(t, numHandlers*2, counts, "Map Elements")

time.Sleep(time.Duration(timedoutResponseTimeMs) * time.Millisecond)
for i := 0; i < numHandlers; i++ {
key := fmt.Sprintf("%s-%d-toolate", testKey, i)
handler_map.Put(key, getHandler(key))
}

}()

completion := make(chan bool)
getData := func(expectedTimeMs int, key string) {
myHandler, ok := handler_map.Get(key, timeoutMs)

if !ok {
t.Errorf("Failed to get test key at time %d ms after start\n", getMsSince(startTime))
} else {
myHandler(&Response{Handle: key})
actualResponseMs := getMsSince(startTime)
var comp assert.Comparison = func() (success bool) {
return math.Abs(float64(actualResponseMs-expectedTimeMs)) < marginErrorMs
}
assert.Condition(t, comp, "Response did not arrive within %d ms margin, expected time %d ms, actual time: %d ms", marginErrorMs, expectedTimeMs, actualResponseMs)
}
completion <- true
}

getTimeouts := func(expectedTimeMs int, key string) {
_, ok := handler_map.Get(key, timeoutMs)

if ok {
t.Errorf("Should have gotten a timeout on key %s but received ok %d ms after start\n", key, getMsSince(startTime))
} else {
t.Logf("test: got the expected timeout on key %s %d ms after start\n", key, getMsSince(startTime))
actualResponseMs := getMsSince(startTime)
var comp assert.Comparison = func() (success bool) {
return math.Abs(float64(actualResponseMs-expectedTimeMs)) < marginErrorMs
}
assert.Condition(t, comp, "Timeout did not occur within %d ms margin, expected time %d ms, actual time: %d ms", marginErrorMs, expectedTimeMs, actualResponseMs)
}
completion <- true
}

for i := 0; i < numHandlers; i++ {
go getData(0, fmt.Sprintf("%s-%d", testKey, i))
go getData(delayedResponseTimeMs, fmt.Sprintf("%s-%d-delayed", testKey, i))
// second set of receivers
go getData(delayedResponseTimeMs, fmt.Sprintf("%s-%d-delayed", testKey, i))
go getTimeouts(timeoutMs, fmt.Sprintf("%s-%d-timedout", testKey, i))
}

// wait for completion of non-timed out response (immediate + delayed ones)
for i := 0; i < numHandlers*3; i++ {
_ = <-completion
}

counts, waiters := handler_map.GetCounts()
assert.Equal(t, numHandlers*2, counts, "Map Elements")
assert.Equal(t, numHandlers, waiters, "Waiter groups") // the "timedout" keys are waiting for timeout

for i := 0; i < numHandlers; i++ {
handler_map.Delete(fmt.Sprintf("%s-%d", testKey, i))
handler_map.Delete(fmt.Sprintf("%s-%d-delayed", testKey, i))
}

counts, waiters = handler_map.GetCounts()
assert.Equal(t, 0, counts, "Map Elements")
assert.Equal(t, numHandlers, waiters, "Waiter groups")

// wait for timed out responses
for i := 0; i < numHandlers; i++ {
_ = <-completion
}

// delete the timed out keys
for i := 0; i < numHandlers; i++ {
handler_map.Delete(fmt.Sprintf("%s-%d-timedout", testKey, i))
}

// at last should have no elements and no waiters
counts, waiters = handler_map.GetCounts()
assert.Equal(t, 0, counts, "Map Elements")
assert.Equal(t, 0, waiters, "Waiter groups")

}