make dialer context aware and avoid redundant session create's

This commit is contained in:
idk
2019-02-27 20:25:36 -05:00
parent 2e7a301855
commit d7af8dafa9
7 changed files with 174 additions and 315 deletions

View File

@ -9,28 +9,31 @@ import (
// Accept creates a new Client and accepts a connection on it // Accept creates a new Client and accepts a connection on it
func (c *Client) Accept() (net.Conn, error) { func (c *Client) Accept() (net.Conn, error) {
id, newAddr, err := c.CreateStreamSession("") var err error
var id int32
id = c.NewID()
c.destination, err = c.CreateStreamSession(id, "")
if err != nil { if err != nil {
return nil, err return nil, err
} }
fmt.Println("NewAddr:", newAddr) fmt.Println("destination:", c.destination)
newC, err := NewDefaultClient() c, err = c.NewClient()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if c.debug { if c.debug {
newC.SamConn = debug.WrapConn(newC.SamConn) c.SamConn = debug.WrapConn(c.SamConn)
} }
resp, err := newC.StreamAccept(id) resp, err := c.StreamAccept(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fmt.Println("Accept Resp:", resp) fmt.Println("Accept Resp:", resp)
return newC.SamConn, nil return c.SamConn, nil
} }

View File

@ -3,6 +3,8 @@ package goSam
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"math"
"math/rand"
"net" "net"
"github.com/cryptix/go/debug" "github.com/cryptix/go/debug"
@ -15,8 +17,10 @@ type Client struct {
SamConn net.Conn SamConn net.Conn
rd *bufio.Reader rd *bufio.Reader
//id int32
sigType string sigType string
destination string
inLength uint inLength uint
inVariance int inVariance int
@ -41,6 +45,9 @@ type Client struct {
compression bool compression bool
debug bool debug bool
//NEVER, EVER modify lastaddr yourself.
lastaddr string
id int32
} }
var SAMsigTypes = []string{ var SAMsigTypes = []string{
@ -61,6 +68,10 @@ func NewClient(addr string) (*Client, error) {
return NewClientFromOptions(SetAddr(addr)) return NewClientFromOptions(SetAddr(addr))
} }
func (c *Client) NewID() int32 {
return rand.Int31n(math.MaxInt32)
}
// NewClientFromOptions creates a new client, connecting to a specified port // NewClientFromOptions creates a new client, connecting to a specified port
func NewClientFromOptions(opts ...func(*Client) error) (*Client, error) { func NewClientFromOptions(opts ...func(*Client) error) (*Client, error) {
var c Client var c Client
@ -68,21 +79,22 @@ func NewClientFromOptions(opts ...func(*Client) error) (*Client, error) {
c.port = "7656" c.port = "7656"
c.inLength = 3 c.inLength = 3
c.inVariance = 0 c.inVariance = 0
c.inQuantity = 4 c.inQuantity = 1
c.inBackups = 2 c.inBackups = 1
c.outLength = 3 c.outLength = 3
c.outVariance = 0 c.outVariance = 0
c.outQuantity = 4 c.outQuantity = 1
c.outBackups = 2 c.outBackups = 1
c.dontPublishLease = true c.dontPublishLease = true
c.encryptLease = false c.encryptLease = false
c.reduceIdle = false c.reduceIdle = false
c.reduceIdleTime = 300000 c.reduceIdleTime = 300000
c.reduceIdleQuantity = 4 c.reduceIdleQuantity = 1
c.closeIdle = true c.closeIdle = true
c.closeIdleTime = 600000 c.closeIdleTime = 600000
c.debug = false c.debug = false
c.sigType = "" c.sigType = SAMsigTypes[4]
c.id = 0
for _, o := range opts { for _, o := range opts {
if err := o(&c); err != nil { if err := o(&c); err != nil {
return nil, err return nil, err
@ -142,3 +154,29 @@ func (c *Client) Close() error {
c.rd = nil c.rd = nil
return c.SamConn.Close() return c.SamConn.Close()
} }
func (c *Client) NewClient() (*Client, error) {
return NewClientFromOptions(
SetHost(c.host),
SetPort(c.port),
SetDebug(c.debug),
SetInLength(c.inLength),
SetOutLength(c.outLength),
SetInVariance(c.inVariance),
SetOutVariance(c.outVariance),
SetInQuantity(c.inQuantity),
SetOutQuantity(c.outQuantity),
SetInBackups(c.inBackups),
SetOutBackups(c.outBackups),
SetUnpublished(c.dontPublishLease),
SetEncrypt(c.encryptLease),
SetReduceIdle(c.reduceIdle),
SetReduceIdleTime(c.reduceIdleTime),
SetReduceIdleQuantity(c.reduceIdleQuantity),
SetCloseIdle(c.closeIdle),
SetCloseIdleTime(c.closeIdleTime),
SetCompression(c.compression),
setlastaddr(c.lastaddr),
setid(c.id),
)
}

66
dial.go
View File

@ -1,10 +1,45 @@
package goSam package goSam
import ( import (
"context"
"fmt"
"net" "net"
"strings" "strings"
) )
// DialContext implements the net.DialContext function and can be used for http.Transport
func (c *Client) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
errCh := make(chan error, 1)
connCh := make(chan net.Conn, 1)
go func() {
if conn, err := c.Dial(network, addr); err != nil {
errCh <- err
} else if ctx.Err() != nil {
conn.Close()
} else {
connCh <- conn
}
}()
select {
case err := <-errCh:
return nil, err
case conn := <-connCh:
return conn, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (c Client) dialCheck(addr string) (int32, bool) {
fmt.Println("DIAL CHECK")
if c.lastaddr != addr {
fmt.Println("Preparing to dial new address.")
return c.NewID(), true
}
fmt.Println("No new address to dial.")
return c.NewID(), false
}
// Dial implements the net.Dial function and can be used for http.Transport // Dial implements the net.Dial function and can be used for http.Transport
func (c *Client) Dial(network, addr string) (net.Conn, error) { func (c *Client) Dial(network, addr string) (net.Conn, error) {
portIdx := strings.Index(addr, ":") portIdx := strings.Index(addr, ":")
@ -16,20 +51,33 @@ func (c *Client) Dial(network, addr string) (net.Conn, error) {
return nil, err return nil, err
} }
id, _, err := c.CreateStreamSession("") var id int32
var test bool
if id, test = c.dialCheck(addr); test == true {
c.destination, err = c.CreateStreamSession(id, "")
if err != nil {
return nil, err
}
c.lastaddr = addr
newC, err := c.NewClient()
if err != nil {
return nil, err
}
err = newC.StreamConnect(id, addr)
if err != nil {
return nil, err
}
return newC.SamConn, nil
}
c, err = c.NewClient()
if err != nil { if err != nil {
return nil, err return nil, err
} }
newC, err := NewClient(c.samaddr()) err = c.StreamConnect(c.id, addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return c.SamConn, nil
err = newC.StreamConnect(id, addr)
if err != nil {
return nil, err
}
return newC.SamConn, nil
} }

View File

@ -62,6 +62,20 @@ func SetHost(s string) func(*Client) error {
} }
} }
func setlastaddr(s string) func(*Client) error {
return func(c *Client) error {
c.lastaddr = s
return nil
}
}
func setid(s int32) func(*Client) error {
return func(c *Client) error {
c.id = s
return nil
}
}
//SetPort sets the port of the client's SAM bridge using a string //SetPort sets the port of the client's SAM bridge using a string
func SetPort(s string) func(*Client) error { func SetPort(s string) func(*Client) error {
return func(c *Client) error { return func(c *Client) error {

View File

@ -98,23 +98,26 @@ func TestOptionAddrMixedSlice(t *testing.T) {
} }
func TestOptionHost(t *testing.T) { func TestOptionHost(t *testing.T) {
client, err := NewClientFromOptions(SetHost("127.0.0.1"), SetDebug(true)) client, err := NewClientFromOptions(
if err != nil { SetHost("127.0.0.1"),
t.Fatalf("NewClientFromOptions() Error: %q\n", err) SetPort("7656"),
} SetInLength(3),
if result, err := client.validCreate(); err != nil { SetOutLength(3),
t.Fatalf(err.Error()) SetInVariance(1),
} else { SetOutVariance(1),
t.Log(result) SetInQuantity(6),
} SetOutQuantity(6),
client.CreateStreamSession("") SetInBackups(2),
if err := client.Close(); err != nil { SetOutBackups(2),
t.Fatalf("client.Close() Error: %q\n", err) SetEncrypt(true),
} SetDebug(true),
} SetUnpublished(true),
SetReduceIdle(true),
func TestOptionPort(t *testing.T) { SetReduceIdleTime(300001),
client, err := NewClientFromOptions(SetPort("7656"), SetDebug(true)) SetReduceIdleQuantity(4),
SetCloseIdle(true),
SetCloseIdleTime(300001),
)
if err != nil { if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err) t.Fatalf("NewClientFromOptions() Error: %q\n", err)
} }
@ -130,272 +133,26 @@ func TestOptionPort(t *testing.T) {
} }
func TestOptionPortInt(t *testing.T) { func TestOptionPortInt(t *testing.T) {
client, err := NewClientFromOptions(SetPortInt(7656), SetDebug(true)) client, err := NewClientFromOptions(
if err != nil { SetHost("127.0.0.1"),
t.Fatalf("NewClientFromOptions() Error: %q\n", err) SetPortInt(7656),
} SetInLength(3),
if result, err := client.validCreate(); err != nil { SetOutLength(3),
t.Fatalf(err.Error()) SetInVariance(1),
} else { SetOutVariance(1),
t.Log(result) SetInQuantity(6),
} SetOutQuantity(6),
client.CreateStreamSession("") SetInBackups(2),
if err := client.Close(); err != nil { SetOutBackups(2),
t.Fatalf("client.Close() Error: %q\n", err) SetEncrypt(true),
} SetDebug(true),
} SetUnpublished(true),
SetReduceIdle(true),
func TestOptionDebug(t *testing.T) { SetReduceIdleTime(300001),
client, err := NewClientFromOptions(SetDebug(true)) SetReduceIdleQuantity(4),
if err != nil { SetCloseIdle(true),
t.Fatalf("NewClientFromOptions() Error: %q\n", err) SetCloseIdleTime(300001),
} )
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionInLength(t *testing.T) {
client, err := NewClientFromOptions(SetInLength(3), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.inlength()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionOutLength(t *testing.T) {
client, err := NewClientFromOptions(SetInLength(3), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.outlength()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionInVariance(t *testing.T) {
client, err := NewClientFromOptions(SetInVariance(1), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.invariance()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionOutVariance(t *testing.T) {
client, err := NewClientFromOptions(SetOutVariance(1), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.outvariance()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionInQuantity(t *testing.T) {
client, err := NewClientFromOptions(SetInQuantity(6), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.inquantity()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionOutQuantity(t *testing.T) {
client, err := NewClientFromOptions(SetOutQuantity(6), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.outquantity()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionInBackups(t *testing.T) {
client, err := NewClientFromOptions(SetInBackups(5), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.inbackups()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionOutBackups(t *testing.T) {
client, err := NewClientFromOptions(SetOutBackups(5), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
client.outbackups()
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionEncryptLease(t *testing.T) {
client, err := NewClientFromOptions(SetEncrypt(true), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionUnpublishedLease(t *testing.T) {
client, err := NewClientFromOptions(SetUnpublished(true), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionReduceIdle(t *testing.T) {
client, err := NewClientFromOptions(SetReduceIdle(true), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionReduceIdleTime(t *testing.T) {
client, err := NewClientFromOptions(SetReduceIdleTime(300001), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionReduceIdleCount(t *testing.T) {
client, err := NewClientFromOptions(SetReduceIdleQuantity(4), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionCloseIdle(t *testing.T) {
client, err := NewClientFromOptions(SetCloseIdle(true), SetDebug(true))
if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err)
}
if result, err := client.validCreate(); err != nil {
t.Fatalf(err.Error())
} else {
t.Log(result)
}
client.CreateStreamSession("")
if err := client.Close(); err != nil {
t.Fatalf("client.Close() Error: %q\n", err)
}
}
func TestOptionCloseIdleTime(t *testing.T) {
client, err := NewClientFromOptions(SetCloseIdleTime(300001), SetDebug(true))
if err != nil { if err != nil {
t.Fatalf("NewClientFromOptions() Error: %q\n", err) t.Fatalf("NewClientFromOptions() Error: %q\n", err)
} }

View File

@ -57,7 +57,7 @@ func parseReply(line string) (*Reply, error) {
} }
} }
r.Pairs[kvPair[0]] = kvPair[1] r.Pairs[kvPair[0]] = kvPair[len(kvPair)-1]
} }
return r, nil return r, nil

View File

@ -2,7 +2,7 @@ package goSam
import ( import (
"fmt" "fmt"
"math" // "math"
"math/rand" "math/rand"
"time" "time"
) )
@ -13,32 +13,31 @@ func init() {
// CreateStreamSession creates a new STREAM Session. // CreateStreamSession creates a new STREAM Session.
// Returns the Id for the new Client. // Returns the Id for the new Client.
func (c *Client) CreateStreamSession(dest string) (int32, string, error) { func (c *Client) CreateStreamSession(id int32, dest string) (string, error) {
if dest == "" { if dest == "" {
dest = "TRANSIENT" dest = "TRANSIENT"
} }
c.id = id
id := rand.Int31n(math.MaxInt32)
r, err := c.sendCmd( r, err := c.sendCmd(
"SESSION CREATE STYLE=STREAM ID=%d DESTINATION=%s %s %s\n", "SESSION CREATE STYLE=STREAM ID=%d DESTINATION=%s %s %s\n",
id, c.id,
dest, dest,
c.sigtype(), c.sigtype(),
c.allOptions(), c.allOptions(),
) )
if err != nil { if err != nil {
return -1, "", err return "", err
} }
// TODO: move check into sendCmd() // TODO: move check into sendCmd()
if r.Topic != "SESSION" || r.Type != "STATUS" { if r.Topic != "SESSION" || r.Type != "STATUS" {
return -1, "", fmt.Errorf("Unknown Reply: %+v\n", r) return "", fmt.Errorf("Unknown Reply: %+v\n", r)
} }
result := r.Pairs["RESULT"] result := r.Pairs["RESULT"]
if result != "OK" { if result != "OK" {
return -1, "", ReplyError{ResultKeyNotFound, r} return "", ReplyError{ResultKeyNotFound, r}
} }
return id, r.Pairs["DESTINATION"], nil return r.Pairs["DESTINATION"], nil
} }