diff --git a/asynchclient.go b/asynchclient.go index 074336e..9f34237 100644 --- a/asynchclient.go +++ b/asynchclient.go @@ -877,13 +877,13 @@ func (c *asyncClient) Hset(arg0 string, arg1 string, arg2 []byte) (stat FutureBo } // Redis HGETALL command. -func (c *asyncClient) Hgetall(arg0 string) (result FutureBytes, err Error) { +func (c *asyncClient) Hgetall(arg0 string) (result FutureBytesArray, err Error) { arg0bytes := []byte(arg0) var resp *PendingResponse resp, err = c.conn.QueueRequest(&HGETALL, [][]byte{arg0bytes}) if err == nil { - result = resp.future.(FutureBytes) + result = resp.future.(FutureBytesArray) } return result, err diff --git a/connection.go b/connection.go index 2fbb9dc..c7c5823 100644 --- a/connection.go +++ b/connection.go @@ -21,12 +21,13 @@ import ( "log" "net" // "os" + "sync" "time" ) const ( TCP = "tcp" - UNIX = "unix" + UNIX = "unix" LOCALHOST = "127.0.0.1" // ns1MSec = 1000000 //// ns1Sec = ns1MSec * 1000 @@ -148,6 +149,7 @@ type connHdl struct { spec *ConnectionSpec conn net.Conn // may want to change this to TCPConn reader *bufio.Reader + m sync.Mutex } func (chdl *connHdl) String() string { @@ -165,19 +167,19 @@ func newConnHdl(spec *ConnectionSpec) (hdl *connHdl, err Error) { return nil, NewError(SYSTEM_ERR, fmt.Sprintf("%s(): failed to allocate connHdl", here)) } - var mode, addr string - if (spec.port == 0) { - mode = UNIX - addr = spec.host - } else { - mode = TCP - addr = fmt.Sprintf("%s:%d", spec.host, spec.port) - _, e := net.ResolveTCPAddr(TCP, addr) - if e != nil { - msg := fmt.Sprintf("%s(): failed to resolve remote address %s", here, addr) - return nil, NewErrorWithCause(SYSTEM_ERR, msg, e) - } - } + var mode, addr string + if spec.port == 0 { + mode = UNIX + addr = spec.host + } else { + mode = TCP + addr = fmt.Sprintf("%s:%d", spec.host, spec.port) + _, e := net.ResolveTCPAddr(TCP, addr) + if e != nil { + msg := fmt.Sprintf("%s(): failed to resolve remote address %s", here, addr) + return nil, NewErrorWithCause(SYSTEM_ERR, msg, e) + } + } conn, e := net.Dial(mode, addr) switch { @@ -195,6 +197,7 @@ func newConnHdl(spec *ConnectionSpec) (hdl *connHdl, err Error) { hdl.reader = bufio.NewReaderSize(conn, bufsize) log.Printf(" Connected to %s", hdl) } + hdl.m = sync.Mutex{} return hdl, nil } @@ -204,12 +207,12 @@ func configureConn(conn net.Conn, spec *ConnectionSpec) { // but we absolutely need to be able to use timeouts. // conn.SetReadTimeout(spec.rTimeout); // conn.SetWriteTimeout(spec.wTimeout); - if tcp, ok := conn.(*net.TCPConn); ok { - tcp.SetLinger(spec.lingerspec) - tcp.SetKeepAlive(spec.keepalive) - tcp.SetReadBuffer(spec.rBufSize) - tcp.SetWriteBuffer(spec.wBufSize) - } + if tcp, ok := conn.(*net.TCPConn); ok { + tcp.SetLinger(spec.lingerspec) + tcp.SetKeepAlive(spec.keepalive) + tcp.SetReadBuffer(spec.rBufSize) + tcp.SetWriteBuffer(spec.wBufSize) + } } // onConnect event handler will issue AUTH/SELECT on new connection @@ -259,15 +262,62 @@ type SyncConnection interface { Close() error } +type connPool struct { + active int + count int + pool []*connHdl // Connection pool + conn chan *connHdl // To handle concurrency +} + +func (p *connPool) Close() error { + var err error + + for i := 0; i < p.count; i++ { + err = p.pool[i].Close() + // XXX handle err + } + + return err +} + +func (p *connPool) AddConnection(conn *connHdl) { + // Put the connection in the pool, and add it to the channel + p.pool[p.active] = conn + p.active = p.active + 1 + p.conn <- conn +} + +func (p *connPool) ServiceRequest(cmd *Command, args [][]byte) (Response, Error) { + // Wait for a connection to become available + var conn = <-p.conn + res, err := conn.ServiceRequest(cmd, args) + + // Return our connection to the pool + p.conn <- conn + return res, err +} + +func newConnPool(count int) *connPool { + return &connPool{0, count, make([]*connHdl, count), make(chan *connHdl, count)} +} + // Creates a new SyncConnection using the provided ConnectionSpec func NewSyncConnection(spec *ConnectionSpec) (c SyncConnection, err Error) { - connHdl, e := newConnHdl(spec) - if e != nil { - return nil, e - } + var e Error + + // XXX In order to allow backwards compatability + // the pool size is hardcoded for now. + pool := newConnPool(4) + for i := 0; i < 4; i++ { + connHdl, e := newConnHdl(spec) + if e != nil { + return nil, e + } - e = connHdl.onConnect() - return connHdl, e + e = connHdl.onConnect() + pool.AddConnection(connHdl) + } + return pool, e } // Implementation of SyncConnection.ServiceRequest. diff --git a/redis.go b/redis.go index 3562e2a..9a9d4e4 100644 --- a/redis.go +++ b/redis.go @@ -509,6 +509,15 @@ type AsyncClient interface { // Redis ZRANGEBYSCORE command. Zrangebyscore(key string, arg1 float64, arg2 float64) (result FutureBytesArray, err Error) + // Redis HGET command. + Hget(key string, hashkey string) (result FutureBytes, err Error) + + // Redis HSET command. + Hset(key string, hashkey string, arg1 []byte) (stat FutureBool, err Error) + + // Redis HGETALL command. + Hgetall(key string) (result FutureBytesArray, err Error) + // Redis FLUSHDB command. Flushdb() (status FutureBool, err Error) diff --git a/synchclient.go b/synchclient.go index 12f66e7..6a7967d 100644 --- a/synchclient.go +++ b/synchclient.go @@ -51,11 +51,10 @@ func NewSynchClient() (c Client, err Error) { // func NewSynchClientWithSpec(spec *ConnectionSpec) (c Client, err Error) { _c := new(syncClient) - _c.conn, err = NewSyncConnection(spec) + _c.conn, err = NewSyncConnection(spec) if err != nil { - return nil, withError (err) - } -// _c.conn = conn + return nil, withError (err) + } return _c, nil }