Skip to content

AsyncClient interface #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: go1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions asynchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,13 +877,13 @@ func (c *asyncClient) Hset(arg0 string, arg1 string, arg2 []byte) (stat FutureBo
}

// Redis HGETALL command.
func (c *asyncClient) Hgetall(arg0 string) (result FutureBytes, err Error) {
func (c *asyncClient) Hgetall(arg0 string) (result FutureBytesArray, err Error) {
arg0bytes := []byte(arg0)

var resp *PendingResponse
resp, err = c.conn.QueueRequest(&HGETALL, [][]byte{arg0bytes})
if err == nil {
result = resp.future.(FutureBytes)
result = resp.future.(FutureBytesArray)
}
return result, err

Expand Down
102 changes: 76 additions & 26 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"log"
"net"
// "os"
"sync"
"time"
)

const (
TCP = "tcp"
UNIX = "unix"
UNIX = "unix"
LOCALHOST = "127.0.0.1"
// ns1MSec = 1000000
//// ns1Sec = ns1MSec * 1000
Expand Down Expand Up @@ -148,6 +149,7 @@ type connHdl struct {
spec *ConnectionSpec
conn net.Conn // may want to change this to TCPConn
reader *bufio.Reader
m sync.Mutex
}

func (chdl *connHdl) String() string {
Expand All @@ -165,19 +167,19 @@ func newConnHdl(spec *ConnectionSpec) (hdl *connHdl, err Error) {
return nil, NewError(SYSTEM_ERR, fmt.Sprintf("%s(): failed to allocate connHdl", here))
}

var mode, addr string
if (spec.port == 0) {
mode = UNIX
addr = spec.host
} else {
mode = TCP
addr = fmt.Sprintf("%s:%d", spec.host, spec.port)
_, e := net.ResolveTCPAddr(TCP, addr)
if e != nil {
msg := fmt.Sprintf("%s(): failed to resolve remote address %s", here, addr)
return nil, NewErrorWithCause(SYSTEM_ERR, msg, e)
}
}
var mode, addr string
if spec.port == 0 {
mode = UNIX
addr = spec.host
} else {
mode = TCP
addr = fmt.Sprintf("%s:%d", spec.host, spec.port)
_, e := net.ResolveTCPAddr(TCP, addr)
if e != nil {
msg := fmt.Sprintf("%s(): failed to resolve remote address %s", here, addr)
return nil, NewErrorWithCause(SYSTEM_ERR, msg, e)
}
}

conn, e := net.Dial(mode, addr)
switch {
Expand All @@ -195,6 +197,7 @@ func newConnHdl(spec *ConnectionSpec) (hdl *connHdl, err Error) {
hdl.reader = bufio.NewReaderSize(conn, bufsize)
log.Printf("<INFO> Connected to %s", hdl)
}
hdl.m = sync.Mutex{}
return hdl, nil
}

Expand All @@ -204,12 +207,12 @@ func configureConn(conn net.Conn, spec *ConnectionSpec) {
// but we absolutely need to be able to use timeouts.
// conn.SetReadTimeout(spec.rTimeout);
// conn.SetWriteTimeout(spec.wTimeout);
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetLinger(spec.lingerspec)
tcp.SetKeepAlive(spec.keepalive)
tcp.SetReadBuffer(spec.rBufSize)
tcp.SetWriteBuffer(spec.wBufSize)
}
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetLinger(spec.lingerspec)
tcp.SetKeepAlive(spec.keepalive)
tcp.SetReadBuffer(spec.rBufSize)
tcp.SetWriteBuffer(spec.wBufSize)
}
}

// onConnect event handler will issue AUTH/SELECT on new connection
Expand Down Expand Up @@ -259,15 +262,62 @@ type SyncConnection interface {
Close() error
}

type connPool struct {
active int
count int
pool []*connHdl // Connection pool
conn chan *connHdl // To handle concurrency
}

func (p *connPool) Close() error {
var err error

for i := 0; i < p.count; i++ {
err = p.pool[i].Close()
// XXX handle err
}

return err
}

func (p *connPool) AddConnection(conn *connHdl) {
// Put the connection in the pool, and add it to the channel
p.pool[p.active] = conn
p.active = p.active + 1
p.conn <- conn
}

func (p *connPool) ServiceRequest(cmd *Command, args [][]byte) (Response, Error) {
// Wait for a connection to become available
var conn = <-p.conn
res, err := conn.ServiceRequest(cmd, args)

// Return our connection to the pool
p.conn <- conn
return res, err
}

func newConnPool(count int) *connPool {
return &connPool{0, count, make([]*connHdl, count), make(chan *connHdl, count)}
}

// Creates a new SyncConnection using the provided ConnectionSpec
func NewSyncConnection(spec *ConnectionSpec) (c SyncConnection, err Error) {
connHdl, e := newConnHdl(spec)
if e != nil {
return nil, e
}
var e Error

// XXX In order to allow backwards compatability
// the pool size is hardcoded for now.
pool := newConnPool(4)
for i := 0; i < 4; i++ {
connHdl, e := newConnHdl(spec)
if e != nil {
return nil, e
}

e = connHdl.onConnect()
return connHdl, e
e = connHdl.onConnect()
pool.AddConnection(connHdl)
}
return pool, e
}

// Implementation of SyncConnection.ServiceRequest.
Expand Down
9 changes: 9 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,15 @@ type AsyncClient interface {
// Redis ZRANGEBYSCORE command.
Zrangebyscore(key string, arg1 float64, arg2 float64) (result FutureBytesArray, err Error)

// Redis HGET command.
Hget(key string, hashkey string) (result FutureBytes, err Error)

// Redis HSET command.
Hset(key string, hashkey string, arg1 []byte) (stat FutureBool, err Error)

// Redis HGETALL command.
Hgetall(key string) (result FutureBytesArray, err Error)

// Redis FLUSHDB command.
Flushdb() (status FutureBool, err Error)

Expand Down
7 changes: 3 additions & 4 deletions synchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ func NewSynchClient() (c Client, err Error) {
//
func NewSynchClientWithSpec(spec *ConnectionSpec) (c Client, err Error) {
_c := new(syncClient)
_c.conn, err = NewSyncConnection(spec)
_c.conn, err = NewSyncConnection(spec)
if err != nil {
return nil, withError (err)
}
// _c.conn = conn
return nil, withError (err)
}
return _c, nil
}

Expand Down