Commit 87ec68c8 by Bogdan Ungureanu

Adding net/cluster library

parent 7adc37e2
Go libraries
============
net/cluster - simple network cluster
cluster
=======
A simple Go library for creating a network cluster with smart peer discovery.
package cluster
import (
"crypto/tls"
"errors"
"log"
"net"
"os"
"strings"
"sync"
"time"
)
// ToDo :
// better testing
// ClusterApplication is the callback interface an application registers with
// a Cluster (through New or RegisterCallback) to be notified of cluster
// events.
//
// OnNodeJoin and OnNodeExit are invoked while the cluster's peer lock is held
// for writing, so implementations must not call back into Cluster methods
// that take that lock (for example GetPeers, GetPeerById or Broadcast).
type ClusterApplication interface {
	// OnNodeJoin is called after a new peer has been added to the peer list.
	OnNodeJoin(p Peer)
	// OnMessage is called for a message from a known peer whose type is not
	// one of the built-in message types.
	OnMessage(m *Message)
	// OnBroadcast is called for a broadcast message from a known peer.
	OnBroadcast(m *BroadcastMessage)
	// OnHeartbeat is called for a heartbeat message from a known peer.
	OnHeartbeat(p Peer)
	// OnNodeExit is called after a peer has been removed from the peer list.
	OnNodeExit(p Peer)
}
// Cluster is a single node of the network cluster together with everything it
// knows about its peers. Instances are created with New.
type Cluster struct {
	tcp                net.Listener         // listener accepting peer connections (TLS-wrapped when tls is set)
	tls                *tls.Config          // optional TLS configuration; nil means plain TCP
	self               *Peer                // identity of the local node
	peers              map[int64]Peer       // known remote peers keyed by Peer.Id; guarded by peerslock
	applications       []ClusterApplication // registered callbacks; appended under peerslock
	heartbeats         map[int64]time.Time  // time of the last message received per sender id; guarded by peerslock
	heartbeatFrequency int                  // heartbeat period in seconds; guarded by optlock
	networkTimeout     int                  // dial/read/write timeout in seconds
	optlock            *sync.RWMutex        // guards the option fields (logger, log level, heartbeat frequency)
	peerslock          *sync.RWMutex        // guards peers and heartbeats
	waitgroup          *sync.WaitGroup      // tracks the receive and heartbeat routines for Shutdown
	message            chan *message        // decoded incoming messages, from the accept routine to the receive routine
	receiveQuit        chan bool            // tells the receive routine to stop
	heartbeatQuit      chan bool            // tells the heartbeat routine to stop
	logLevel           int                  // lowest level that is written to log; guarded by optlock
	log                *log.Logger          // destination logger; guarded by optlock
}
// New creates a cluster node called name in the given domain, listening on
// laddr, and registers app for cluster event callbacks.
//
// When tlsconfig is non-nil the listener is wrapped with TLS. The node starts
// with the default heartbeat frequency and network timeout and logs at debug
// level to standard output. The accept, receive and heartbeat routines have
// been started by the time New returns. An error is returned when the peer
// identity cannot be created or the address cannot be listened on.
func New(app ClusterApplication, name string, domain, laddr string, tlsconfig *tls.Config) (*Cluster, error) {
	// Build the local identity first; this also resolves laddr.
	me, err := NewPeer(name, domain, laddr)
	if err != nil {
		return nil, err
	}

	node := &Cluster{
		self:               me,
		tls:                tlsconfig,
		peers:              make(map[int64]Peer),
		heartbeats:         make(map[int64]time.Time),
		message:            make(chan *message),
		receiveQuit:        make(chan bool),
		heartbeatQuit:      make(chan bool),
		heartbeatFrequency: DefaultHeartbeatFrequency,
		networkTimeout:     DefaultNetworkTimeout,
		optlock:            new(sync.RWMutex),
		peerslock:          new(sync.RWMutex),
		waitgroup:          new(sync.WaitGroup),
		logLevel:           LogLevelDebug,
		log:                log.New(os.Stdout, "", log.LstdFlags),
	}
	node.RegisterCallback(app)

	bind := node.self.StringAddr()
	listener, err := net.Listen("tcp", bind)
	if err != nil {
		return nil, err
	}
	node.tcp = listener

	// Optionally upgrade the plain listener to TLS.
	announce := "Listening on %s"
	if node.tls != nil {
		node.tcp = tls.NewListener(node.tcp, node.tls)
		announce = "Listening TLS on %s"
	}
	node.debug(announce, bind)

	go node.acceptRoutine()
	go node.receiveRoutine()
	go node.heartbeatRoutine()

	return node, nil
}
// SetLogger replaces the log.Logger the Cluster writes to. A nil logger is
// rejected with an error and the current logger is kept.
func (c *Cluster) SetLogger(l *log.Logger) error {
	c.optlock.Lock()
	defer c.optlock.Unlock()

	if l != nil {
		c.log = l
		return nil
	}
	return errors.New("Invalid Logger interface")
}
// SetLogLevel selects the level of logging written to the Logger. Levels
// outside the range LogLevelDebug..LogLevelError are rejected with an error
// and the current level is kept.
func (c *Cluster) SetLogLevel(level int) error {
	c.optlock.Lock()
	defer c.optlock.Unlock()

	if level >= LogLevelDebug && level <= LogLevelError {
		c.logLevel = level
		return nil
	}
	return errors.New("Invalid log level")
}
// SetHeartbeatFrequency sets how often, in seconds, this node sends
// heartbeats to test the health of the other nodes in the Cluster. Values
// below 1 are rejected with an error and the current frequency is kept.
func (c *Cluster) SetHeartbeatFrequency(freq int) error {
	c.optlock.Lock()
	defer c.optlock.Unlock()

	if freq >= 1 {
		c.heartbeatFrequency = freq
		return nil
	}
	return errors.New("Heartbeat frequency should be positive and greater then 0 seconds")
}
// GetHeartbeatFrequency returns the heartbeat frequency in seconds.
func (c *Cluster) GetHeartbeatFrequency() int {
	c.optlock.RLock()
	seconds := c.heartbeatFrequency
	c.optlock.RUnlock()
	return seconds
}
// GetLogLevel returns the current log level.
//
// Only a read lock is taken, like the other getters, so concurrent readers
// (including the logging helpers) do not serialize on this call.
func (c *Cluster) GetLogLevel() int {
	c.optlock.RLock()
	defer c.optlock.RUnlock()
	return c.logLevel
}
// GetID returns the id of the local node.
func (c *Cluster) GetID() int64 {
	c.optlock.RLock()
	id := c.self.Id
	c.optlock.RUnlock()
	return id
}
// StringName returns the name of the local node.
func (c *Cluster) StringName() string {
	c.optlock.RLock()
	name := c.self.Name
	c.optlock.RUnlock()
	return name
}
// StringAddr returns the network address of the local node as a string.
func (c *Cluster) StringAddr() string {
	c.optlock.RLock()
	addr := c.self.StringAddr()
	c.optlock.RUnlock()
	return addr
}
// RegisterCallback adds app to the list of applications that are notified of
// cluster events.
func (c *Cluster) RegisterCallback(app ClusterApplication) {
	c.peerslock.Lock()
	c.applications = append(c.applications, app)
	c.peerslock.Unlock()
}
// Connect contacts the node listening on address by sending it a Join message
// that carries this node's identity. It returns an error when address cannot
// be resolved or the message cannot be sent.
//
// The peer lock is deliberately not taken: Connect reads no peer state, and
// holding a read lock across the dial would stall every writer of the peer
// list until the dial completes or times out.
func (c *Cluster) Connect(address string) error {
	addr, err := net.ResolveTCPAddr("tcp", address)
	if err != nil {
		c.err("Connect() error %s, %s", address, err)
		return err
	}
	return c.Send(Peer{TCPAddr: addr}, MsgJoin, c.self)
}
// Send delivers one message of the given Type to the peer `to`.
//
// A new connection is dialed for every message and closed once the message
// has been written. data may be a []byte or string (sent as is), nil (empty
// payload) or any other value, which is gob-encoded by the message encoder.
// When the cluster has a TLS configuration the connection is upgraded to TLS
// before anything is written.
//
// The configured network timeout bounds the dial, the TLS handshake and the
// write separately; a timeout of zero means "no timeout" for all three.
func (c *Cluster) Send(to Peer, Type int8, data interface{}) error {
	// Snapshot the configuration and release the lock before any network
	// I/O. Holding the read lock for the whole call would block option
	// writers for the duration of the dial, and the logging helpers take the
	// same lock again, which sync.RWMutex forbids (recursive read locking).
	c.optlock.RLock()
	networkTimeout := time.Duration(c.networkTimeout) * time.Second
	tlsConfig := c.tls
	selfID := c.self.Id
	c.optlock.RUnlock()

	raddr := to.StringAddr()
	conn, err := net.DialTimeout("tcp", raddr, networkTimeout)
	if err != nil {
		c.warn("Send: Unable to dial connection to %s (%s)", raddr, err)
		return err
	}
	defer conn.Close()

	if tlsConfig != nil {
		cfg := tlsConfig
		// If no ServerName is set, infer it from the host we are connecting
		// to. This is done on a per-connection clone so that the shared
		// configuration is never mutated and is not pinned to the first host
		// that happened to be dialed.
		if cfg.ServerName == "" {
			host := raddr
			if colonPos := strings.LastIndex(raddr, ":"); colonPos != -1 {
				host = raddr[:colonPos]
			}
			// IPv6 literals are printed as "[addr]:port"; drop the brackets.
			host = strings.TrimSuffix(strings.TrimPrefix(host, "["), "]")

			cfg = tlsConfig.Clone()
			cfg.ServerName = host
		}

		// Bound the handshake so an unresponsive peer cannot block forever.
		if networkTimeout > 0 {
			if err := conn.SetDeadline(time.Now().Add(networkTimeout)); err != nil {
				return err
			}
		}
		tlsconn := tls.Client(conn, cfg)
		if err := tlsconn.Handshake(); err != nil {
			// The deferred Close releases the connection.
			c.warn("Unable to handshake TSL connection with %s", raddr)
			return err
		}
		conn = tlsconn
	}

	if networkTimeout > 0 {
		if err := conn.SetWriteDeadline(time.Now().Add(networkTimeout)); err != nil {
			return err
		}
	}
	return newEncoder(conn).Encode(selfID, to.Id, Type, data)
}
// Broadcast sends message as a MsgBroadcast to every known peer. Delivery is
// best effort: a failure for one peer is logged at debug level and the
// remaining peers are still tried. The peer list is read-locked for the whole
// call.
func (c *Cluster) Broadcast(message interface{}) {
	c.peerslock.RLock()
	defer c.peerslock.RUnlock()

	for _, target := range c.peers {
		err := c.Send(target, MsgBroadcast, message)
		if err == nil {
			continue
		}
		c.debug("Broadcast to %s@%s failed: %s", target.StringID(), target.StringAddr(), err)
	}
}
// Disconnect tells peer p that this node is leaving it (MsgLeave) and then
// forgets p locally, including its heartbeat history.
//
// Sending the Leave message is best effort: a failure is logged and the peer
// is removed from the local peer list regardless.
func (c *Cluster) Disconnect(p Peer) {
	if err := c.Send(p, MsgLeave, nil); err != nil {
		c.debug("Unable to send Leave to %s@%s: %s",
			p.StringID(),
			p.StringAddr(),
			err,
		)
	}
	c.onNodeExit(p, true)
}
// Shutdown stops the receive and heartbeat routines, closes the listener and
// sends a Leave message to every known peer. Peers that were notified
// successfully are removed from the peer list and the heartbeat history;
// peers that could not be reached are kept and the failure is logged.
//
// Calling Shutdown on a nil *Cluster is a no-op.
func (c *Cluster) Shutdown() {
	// ToDo : Fix non existing cluster
	if c == nil {
		return
	}
	c.debug("Shutting down cluster ...")

	// Ask both background routines to stop and wait until they have.
	c.receiveQuit <- true
	c.heartbeatQuit <- true
	c.waitgroup.Wait()

	c.peerslock.Lock()
	defer c.peerslock.Unlock()

	c.tcp.Close()

	for _, remote := range c.peers {
		err := c.Send(remote, MsgLeave, nil)
		if err != nil {
			c.debug("Unable to send Goodbye to %s@%s failed: %s", remote.StringID(), remote.StringAddr(), err)
			continue
		}
		// Unlearn remotes
		delete(c.peers, remote.Id)
		delete(c.heartbeats, remote.Id)
	}
	c.debug("Closing DONE")
}
// GetPeers returns a snapshot of all known peers, in no particular order.
func (c *Cluster) GetPeers() []Peer {
	c.peerslock.RLock()
	defer c.peerslock.RUnlock()

	snapshot := make([]Peer, len(c.peers))
	next := 0
	for _, known := range c.peers {
		snapshot[next] = known
		next++
	}
	return snapshot
}
// GetPeerById looks up a known peer by its id. The boolean result reports
// whether the peer is known.
func (c *Cluster) GetPeerById(peerid int64) (Peer, bool) {
	c.peerslock.RLock()
	found, ok := c.peers[peerid]
	c.peerslock.RUnlock()
	return found, ok
}
// onNodeJoin handles a peer that wants to join the cluster.
//
// The local node itself and peers that are already known are ignored. For a
// new peer the join is answered with our own Join message, the peer is added
// to the peer list, the registered applications are notified and finally the
// list of known peers from the joining peer's domain is sent to it. The peer
// lock is held for writing during the whole call, including the network
// sends and the application callbacks.
//
// The error of a failed send is returned. If only the peer-list send fails,
// the peer has already been added.
func (c *Cluster) onNodeJoin(peer Peer) error {
	c.peerslock.Lock()
	defer c.peerslock.Unlock()

	isSelf := peer.equal(c.self) || peer.Id == c.self.Id
	if isSelf {
		c.debug("Ignoring peer '%s@%s'. (Self)", peer.StringID(), peer.StringAddr())
		return nil
	}

	_, alreadyKnown := c.peers[peer.Id]
	if alreadyKnown {
		c.debug("Ignoring peer %s@%s. (Known)", peer.StringID(), peer.StringAddr())
		return nil
	}

	// Respond to join
	err := c.Send(peer, MsgJoin, c.self)
	if err != nil {
		return err
	}
	c.peers[peer.Id] = peer

	// onNodeJoin notification
	for _, callback := range c.applications {
		callback.OnNodeJoin(peer)
	}

	// Send peer list to new remote, restricted to peers from its own domain.
	var sameDomain []Peer
	for _, candidate := range c.peers {
		if candidate.Domain != peer.Domain {
			continue
		}
		sameDomain = append(sameDomain, candidate)
	}
	err = c.Send(peer, MsgPeers, sameDomain)
	if err != nil {
		return err
	}

	c.debug("Learned new %s@%s", peer.StringID(), peer.StringAddr())
	return nil
}
// onNodeExit removes peer from the peer list and notifies the registered
// applications. Unknown peers are ignored. When wipeHistory is true the
// peer's heartbeat record is removed as well. The peer lock is held for
// writing while the application callbacks run.
func (c *Cluster) onNodeExit(peer Peer, wipeHistory bool) {
	c.peerslock.Lock()
	defer c.peerslock.Unlock()

	_, known := c.peers[peer.Id]
	if !known {
		return
	}

	delete(c.peers, peer.Id)
	if wipeHistory {
		delete(c.heartbeats, peer.Id)
	}

	// Send Leave notification
	for _, callback := range c.applications {
		callback.OnNodeExit(peer)
	}

	c.debug("Unlearned remote: %s@%s", peer.StringID(), peer.StringAddr())
}
package cluster
import (
"crypto/tls"
"fmt"
"log"
"os"
"sync"
"testing"
"time"
)
// rsaCertPEM is the PEM-encoded certificate that TestCluster pairs with
// rsaKeyPEM (via tls.X509KeyPair) to build its TLS configuration. It is a
// test fixture only.
var rsaCertPEM = `-----BEGIN CERTIFICATE-----
MIIDxzCCAq+gAwIBAgIJAPWnIg4IdFtuMA0GCSqGSIb3DQEBCwUAMHoxCzAJBgNV
BAYTAlJPMRAwDgYDVQQIDAdSb21hbmlhMQ8wDQYDVQQHDAZHYWxhdGkxFTATBgNV
BAoMDENsdXN0ZXItVGVzdDEQMA4GA1UEAwwHY2x1c3RlcjEfMB0GCSqGSIb3DQEJ
ARYQdGVjaEBjbHVzdGVyLm5ldDAeFw0xNDA2MDYxNDEyMjlaFw0xNTA2MDYxNDEy
MjlaMHoxCzAJBgNVBAYTAlJPMRAwDgYDVQQIDAdSb21hbmlhMQ8wDQYDVQQHDAZH
YWxhdGkxFTATBgNVBAoMDENsdXN0ZXItVGVzdDEQMA4GA1UEAwwHY2x1c3RlcjEf
MB0GCSqGSIb3DQEJARYQdGVjaEBjbHVzdGVyLm5ldDCCASIwDQYJKoZIhvcNAQEB
BQADggEPADCCAQoCggEBAKE8pij/D9Y2Y6qw6FlaA17Eym+FpezKcxf13s6BvVRA
QZAqQYCYjbLgbh0aTcNFowu68xyL6ozV+f4MwnNxquApzcPAyxh7W8y3AAp+oEzP
ngfjRKxHesvHpPSCLOvDeyt/soDxshvT2dJZxEeRIpRSypsg6EpBcR/oYC5gWH0G
8nMAUGwm4+t6j7YBXRYeO94++Y8N1IMffok7x22f68miTgLscl0lcsT2W7/k3J6s
lQQge2i88z7dUZ+yQHr3VbtpcMxR3hzvJE0iDomtz0UxgkdJEZ5jxp4N9+dr2XpN
NEUWG73FniH7ACeLeJRp6YCzeybnOwYbwsiDbICAK9sCAwEAAaNQME4wHQYDVR0O
BBYEFCkPxwp9co/mRUXmTOZ7qwxPIzZdMB8GA1UdIwQYMBaAFCkPxwp9co/mRUXm
TOZ7qwxPIzZdMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAAXtyODe
I6/nAFBStxMWqo4qHND/mr5T8UZMrsjgc+bRaufAStGVOLOXKrG3Q1u+7FLureOP
WgPxZmtTzZ+jWEtzGZ/weeZyjFrYLAI+7JlkPfPg8DGeYC3JXxudikmPmUvgh6lX
XmYMhl1Uah2+cxQjQRueQ8pf4pHxrlICXjYbG4PdGKGZ0duYSay24p9QsEVj+R1E
C6XRf3co8u9ej7US4P5Nxib2b1rgEX7sb3SuQRzMXzTX2pjD58PpA19APFePn8ZG
Hdw4zVHcQDrLQbarHqrjmg0dFTeG8YZEk1zIktoZutRfgTS3pAjm4KR4WXZXkbO6
JL1ZlAZrc0bbWbQ=
-----END CERTIFICATE-----
`
// rsaKeyPEM is the PEM-encoded private key that TestCluster pairs with
// rsaCertPEM (via tls.X509KeyPair). It is a test fixture only and must never
// be used to protect real traffic.
var rsaKeyPEM = `-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQChPKYo/w/WNmOq
sOhZWgNexMpvhaXsynMX9d7Ogb1UQEGQKkGAmI2y4G4dGk3DRaMLuvMci+qM1fn+
DMJzcargKc3DwMsYe1vMtwAKfqBMz54H40SsR3rLx6T0gizrw3srf7KA8bIb09nS
WcRHkSKUUsqbIOhKQXEf6GAuYFh9BvJzAFBsJuPreo+2AV0WHjvePvmPDdSDH36J
O8dtn+vJok4C7HJdJXLE9lu/5NyerJUEIHtovPM+3VGfskB691W7aXDMUd4c7yRN
Ig6Jrc9FMYJHSRGeY8aeDffna9l6TTRFFhu9xZ4h+wAni3iUaemAs3sm5zsGG8LI
g2yAgCvbAgMBAAECggEBAIf3HeNaWx931Aofjn+yoT7sUg1DNY19lxqwcofP7jk6
yk7o4z2u52bdTN3rzYGKmpiMuO/sqQjEdECgv+UcCE1p0yNs05c8h0gVVcqDx2ee
eJQMOV/SEyH+pAKwN3NE7/vZfkywBFLlDj0NB7CWGgPb4RB56oibsOqISXsyMcyT
lRsViW7Hk3KUpIlz/jgFNInEjTeLsvx5L2GHN6fXxquGEqsss4IZXQ9fCApi6xmc
xWcMklB5RN6O/6hPL4Ib4AmK8OSxOF0+odnlyDQs4k07euW5apyKpKwa/c00g0lb
cmuGvwhHIig5S5uAZ3Vzq5oPLeuh05CU0TbIqECPxVECgYEA1R1+casygkCc9Xao
bvWlQHuAJxKxo/bQ/DXNn4l2qR84ZfYsWS+GhIBN/cKZgHLllsy427DAKGY6/lmt
CDltsz5cE42eCL+wehP2dhp3iGqXFAig5af0w4me+cMExuKBOYkS7MGTKq9DqFkG
Fec1Utln0njaP4+pvCNIeepLYCkCgYEAwa6suHBp/1C2e5rWBmUf9j574ckbc6nh
9UoyDBVuooho4u9HSDb6f/ZTiQe3kjxjmpnp8M487IhW9Tzmd50+o2da3l60vIm5
M2ZNxFRgiythiqrKSCnwLRFGpFAqnEu2tZNNvJQLmmTrWFvgxSway8bxpURNg04y
6H8Bd1TZnGMCgYEAiwaHkx2MgJ4oBpVWwbPsDsnCSzsNuZnssWtj7XxedWuRfip+
udugFFYjCrTlMH9DuQFqYp7GbFRsjbrwfxn/r1ux82uCOdDbDnhxYpBXhB2M2xvZ
4peTu+/OTr7jId9nT6JVPy/0knbtWyhgKO/AwIBlE0+ViLtujfYydJ3ceCkCgYEA
srg2FPNWPAwEd0ZHHBuQRK8frRbfx/kI0kkmqVPVhREOh+l4A0EIIa/xIU8Hq18i
IfTIlDYarcCZTS5nFBT7SdkDVpJZgGgthypttC6P75uWJFi406IvR8bbQp/e0d5j
uGU2pD6P/mYFbMFLRWYPS95F+NRwGiu8eiFH/w9CxjUCgYBa01kBP/feQtJtsZ6A
59VtGzTFTpbLNTtej7uTxPOuoOAg9a9c3XnOzAL733H49mfPFvdHOFJqLuFMSiKB
XAuwYxd9hct7rW8IJQPzs/YdZIwoEGW1TEOUtUcpLDrzUiOpO8r1mj4eYeUbLGTn
peMptpBttAcUzESn1Kd5oTTT7Q==
-----END PRIVATE KEY-----
`
// TestApplication is a ClusterApplication whose callbacks all do nothing; the
// tests use it wherever a cluster needs an application.
type TestApplication struct{}

// OnNodeJoin ignores the joined peer.
func (*TestApplication) OnNodeJoin(Peer) {}

// OnHeartbeat ignores the heartbeat.
func (*TestApplication) OnHeartbeat(Peer) {}

// OnMessage ignores the message.
func (*TestApplication) OnMessage(*Message) {}

// OnBroadcast ignores the broadcast.
func (*TestApplication) OnBroadcast(*BroadcastMessage) {}

// OnNodeExit ignores the departed peer.
func (*TestApplication) OnNodeExit(Peer) {}
// Node names used by TestCluster.
var (
	node1name = "node1"
	node2name = "node2"
	node3name = "node3"
)
// TestSetLogLevel checks that an out-of-range level is rejected and that a
// valid level is stored and reported by GetLogLevel.
func TestSetLogLevel(t *testing.T) {
	c := &Cluster{
		optlock:  new(sync.RWMutex),
		logLevel: LogLevelDebug,
	}

	if err := c.SetLogLevel(-1); err == nil {
		t.Fatalf("Invalid Log Level accepted")
	}

	c.SetLogLevel(LogLevelWarn)
	if got := c.GetLogLevel(); got != LogLevelWarn {
		t.Fatalf("Invalid LogLevel different than we setup")
	}
}
// TestHeartbeatFrequency checks that non-positive frequencies are rejected
// without changing the stored value, and that a valid frequency is stored and
// reported by GetHeartbeatFrequency.
func TestHeartbeatFrequency(t *testing.T) {
	const initial = 10
	cluster := &Cluster{
		heartbeatFrequency: initial,
		optlock:            new(sync.RWMutex),
	}

	for _, invalid := range []int{0, -10} {
		if err := cluster.SetHeartbeatFrequency(invalid); err == nil {
			t.Fatalf("Invalid Heartbeat Frequency accepted")
		}
		if got := cluster.GetHeartbeatFrequency(); got != initial {
			t.Fatalf("Rejected Heartbeat Frequency %d changed the value to %d", invalid, got)
		}
	}

	if err := cluster.SetHeartbeatFrequency(2); err != nil {
		t.Fatalf("Valid Heartbeat Frequency rejected: %s", err)
	}
	// Compare against the value that was set; comparing the field with its
	// own getter can never fail.
	if got := cluster.GetHeartbeatFrequency(); got != 2 {
		t.Fatalf("Invalid Heartbeat Frequency different than we setup")
	}
}
// TestID checks the identity accessors of a Cluster (name, address, id) and
// that two peers with different addresses do not compare equal.
func TestID(t *testing.T) {
	// Generate new PeerId
	peer_name := "test"
	peer_addr := "127.0.0.1:10123"
	selfpeer, err := NewPeer(peer_name, "testing", peer_addr)
	if err != nil {
		// Nothing below can run without a peer.
		t.Fatal(err)
	}
	cluster := &Cluster{
		self:    selfpeer,
		optlock: new(sync.RWMutex),
	}
	if cluster.StringName() != peer_name {
		t.Fatalf("Invalid peer name expected %s got %s", peer_name, cluster.StringName())
	}
	if cluster.StringAddr() != peer_addr {
		t.Fatalf("Invalid peer address expected %s got %s ", peer_addr, cluster.StringAddr())
	}
	if cluster.GetID() == int64(0) {
		t.Fatal("Expected to retrieve a peer id")
	}
	if cluster.GetID() < int64(0) {
		t.Fatal("Peer id should be positive value")
	}
	if cluster.self.StringID() != fmt.Sprintf("%d", cluster.GetID()) {
		t.Fatal("Invalid peer string id")
	}

	otherpeer, err := NewPeer(peer_name, "testing", "127.0.0.1:1234")
	if err != nil {
		t.Fatal(err)
	}
	if selfpeer.StringAddr() == otherpeer.StringAddr() {
		t.Fatalf("Address should be differennt")
	}
	if selfpeer.equal(otherpeer) {
		t.Fatalf("Peer should be different")
	}
}
// TestSetLogger checks that a nil logger is rejected and a real one accepted.
func TestSetLogger(t *testing.T) {
	c := &Cluster{
		optlock:            new(sync.RWMutex),
		heartbeatFrequency: 10,
	}

	if err := c.SetLogger(nil); err == nil {
		t.Fatalf("Invalid logger accepted")
	}

	stdout := log.New(os.Stdout, "", log.LstdFlags)
	if err := c.SetLogger(stdout); err != nil {
		t.Fatalf("Logger not accepted")
	}
}
func TestNodeJoinExit(t *testing.T) {
var peer *Peer
// Generate new PeerId
selfpeer, err := NewPeer("test", "testing", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
cluster := &Cluster{
self: selfpeer,
heartbeatFrequency: 10,
optlock: new(sync.RWMutex),
peerslock: new(sync.RWMutex),
peers: make(map[int64]Peer),
logLevel: LogLevelDebug,
log: log.New(os.Stdout, "", log.LstdFlags),
}
peer, _ = NewPeer("test2", "local", "127.0.0.1:20")
cluster.onNodeJoin(*peer)
cluster.onNodeJoin(*selfpeer)
cluster.Broadcast("test message for all")
cluster.peers[peer.Id] = *peer
cluster.onNodeExit(*peer, true)
cluster.Disconnect(*peer)
}
// TestCluster is an integration test that starts two TLS nodes and one plain
// node on fixed local ports (5001-5003), connects them and checks that the
// two TLS nodes learn each other.
func TestCluster(t *testing.T) {
	cert, err := tls.X509KeyPair([]byte(rsaCertPEM), []byte(rsaKeyPEM))
	if err != nil {
		t.Fatalf("loadkeys failed: %s", err)
	}
	tlsconfig := tls.Config{
		Certificates:       []tls.Certificate{cert},
		InsecureSkipVerify: true,
	}

	// Every started node is shut down exactly once, also when the test
	// fails half way through.
	node1, err := New(&TestApplication{}, node1name, "local", "0.0.0.0:5001", &tlsconfig)
	if err != nil {
		t.Fatal(err)
	}
	defer node1.Shutdown()

	node2, err := New(&TestApplication{}, node2name, "local", "0.0.0.0:5002", &tlsconfig)
	if err != nil {
		t.Fatal(err)
	}
	defer node2.Shutdown()

	// Port 5001 is already taken by node1, so this must fail.
	if dup, err := New(&TestApplication{}, node3name, "local", "127.0.0.1:5001", nil); err == nil {
		dup.Shutdown()
		t.Fatal("Complete New() TestCase")
	}

	node3, err := New(&TestApplication{}, node3name, "local", "0.0.0.0:5003", nil)
	if err != nil {
		t.Fatal(err)
	}
	defer node3.Shutdown()

	if err := node1.Connect(node2.StringAddr()); err != nil {
		t.Fatalf("TLS node1 could not connect to TLS node2: %s", err)
	}
	// These two mix TLS and plain nodes; their outcome is not asserted.
	node2.Connect(node3.StringAddr())
	node3.Connect(node1.StringAddr())

	err = node1.Connect(node3.StringAddr())
	if err == nil {
		t.Fatalf("Should not be able to connect TLS to nonTLS")
	}
	err = node1.Connect("127.0.0.1")
	if err == nil {
		t.Fatalf("Invalid address in connect")
	}

	// Wait for peers to join
	time.Sleep(3 * time.Second)
	node1.Broadcast([]byte("TSL Broadcast"))

	// Add bogus node (100% broacast test)
	node1.peerslock.Lock()
	bogus, err := NewPeer("test", "local", "127.0.0.1:5010")
	if err != nil {
		node1.peerslock.Unlock()
		t.Fatal(err)
	}
	node1.peers[bogus.Id] = *bogus
	node1.peerslock.Unlock()
	node1.Broadcast([]byte("Bogus peeer TSL Broadcast"))

	// Look peers up by id: GetPeers returns them in random map order and
	// node1 also holds the bogus peer, so indexing element 0 is unreliable.
	if _, known := node1.GetPeerById(node2.GetID()); !known {
		t.Fatalf("node1 did not learn node2")
	}
	if _, known := node2.GetPeerById(node1.GetID()); !known {
		t.Fatalf("node2 did not learn node1")
	}
}
package cluster
// Built-in message types. Every other int8 value is treated as an
// application-defined type and delivered through ClusterApplication.OnMessage.
const (
	// MsgJoin announces a node that wants to join (or answers such a join).
	MsgJoin = int8(iota)
	// MsgPeers carries a list of known peers.
	MsgPeers
	// MsgLeave announces that the sender leaves the cluster.
	MsgLeave
	// MsgHeartbeat is the periodic liveness message.
	MsgHeartbeat
	// MsgBroadcast carries an application broadcast payload.
	MsgBroadcast
)
// Log levels, ordered from most to least verbose. A message is written when
// the configured level is less than or equal to the message's level.
const (
	// LogLevelDebug writes debug, warning and error messages.
	LogLevelDebug = iota
	// LogLevelWarn writes warning and error messages.
	LogLevelWarn
	// LogLevelError writes error messages only.
	LogLevelError
)
// Defaults applied by New; both values are in seconds.
const (
	// DefaultHeartbeatFrequency is the default period between heartbeats.
	DefaultHeartbeatFrequency = int(10)
	// DefaultNetworkTimeout is the default dial/read/write timeout.
	DefaultNetworkTimeout = int(5)
)
package cluster
import (
"bytes"
"encoding/gob"
"io"
"net"
"time"
)
// Message is an application-level message handed to
// ClusterApplication.OnMessage.
type Message struct {
	From    Peer   // known peer the message was received from
	Type    int8   // message type as sent by the peer
	Payload []byte // message body exactly as received
}
// BroadcastMessage is a broadcast handed to ClusterApplication.OnBroadcast.
type BroadcastMessage struct {
	From    Peer   // known peer the broadcast was received from
	Payload []byte // broadcast body exactly as received
}
// message is the internal wire format exchanged between nodes. The exported
// fields are gob-encoded; remote is local bookkeeping only.
type message struct {
	Id      int64  // Message ID
	From    int64  // Source Peer.Id
	To      int64  // Destination Peer.Id
	Type    int8   // Message type
	Payload []byte // raw bytes, or the gob encoding of the sent value

	remote *net.TCPAddr // address the message was received from (not transmitted)
}

// messageEncoder writes messages to an underlying writer.
type messageEncoder struct {
	msg *message // most recently encoded message
	enc *gob.Encoder
}

// messageDecoder reads messages from an underlying reader.
type messageDecoder struct {
	msg *message // most recently decoded message
	dec *gob.Decoder
}

// Decode gob-decodes the message payload into v, which must be a pointer to
// a value compatible with what the sender encoded.
func (m *message) Decode(v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(m.Payload)).Decode(v)
}

// newDecoder creates a message decoder that reads from reader.
func newDecoder(reader io.Reader) *messageDecoder {
	return &messageDecoder{
		msg: new(message),
		dec: gob.NewDecoder(reader),
	}
}

// Decode reads the next message from the underlying reader.
//
// Every call decodes into a fresh message. gob does not transmit zero-valued
// fields, so decoding into a reused struct would silently keep fields (for
// example the Payload) of the previously decoded message, and all returned
// pointers would alias the same value.
func (m *messageDecoder) Decode() (*message, error) {
	msg := new(message)
	if err := m.dec.Decode(msg); err != nil {
		return nil, err
	}
	m.msg = msg
	return msg, nil
}

// newEncoder creates a message encoder that writes to writer.
func newEncoder(writer io.Writer) *messageEncoder {
	return &messageEncoder{
		msg: new(message),
		enc: gob.NewEncoder(writer),
	}
}

// Encode builds a message from the given header fields and data and writes
// it to the underlying writer.
//
// data becomes the payload: a []byte or string is used as is, nil yields an
// empty payload, and any other value is gob-encoded. The message Id is the
// current time in nanoseconds.
func (m *messageEncoder) Encode(from int64, to int64, mtype int8, data interface{}) error {
	var payload []byte
	switch d := data.(type) {
	case nil:
		// No payload.
	case []byte:
		payload = d
	case string:
		payload = []byte(d)
	default:
		b := new(bytes.Buffer)
		if err := gob.NewEncoder(b).Encode(data); err != nil {
			return err
		}
		payload = b.Bytes()
	}

	m.msg = &message{
		Id:      time.Now().UnixNano(),
		From:    from,
		To:      to,
		Type:    mtype,
		Payload: payload,
	}
	return m.enc.Encode(m.msg)
}
package cluster
import (
"bytes"
"testing"
)
// Msg1 is the sample payload type used by the encode/decode tests.
type Msg1 struct {
	Field1 string
	Field2 string
	field3 string // unexported, so encoding/gob does not transmit it
}
// Header values shared by the encoder tests.
var (
	from  int64 = 1000 // sender id
	to    int64 = 1001 // recipient id
	mtype int8  = 1    // message type
)
// TestEncodeTypes checks that the encoder accepts []byte, string and nil
// payloads and reports an error for a value gob cannot encode (a struct
// without exported fields).
func TestEncodeTypes(t *testing.T) {
	type msg2 struct{ field1 string }

	buf := new(bytes.Buffer)
	enc := newEncoder(buf)

	for _, data := range []interface{}{[]byte("adasd"), "adasd", nil} {
		if err := enc.Encode(from, to, mtype, data); err != nil {
			t.Fatalf("Error encoding message %s", err)
		}
	}

	if err := enc.Encode(from, to, mtype, &msg2{}); err == nil {
		t.Fatalf("Gob encoding did not return an error for a struct without exported fields")
	}
}
// TestEncodeDecode round-trips a struct payload through the encoder and
// decoder and checks both the message header and the decoded exported fields.
func TestEncodeDecode(t *testing.T) {
	buf := new(bytes.Buffer)
	enc := newEncoder(buf)
	data := &Msg1{
		Field1: "test1",
		Field2: "test2",
		field3: "test3",
	}
	if err := enc.Encode(from, to, mtype, data); err != nil {
		t.Fatalf("Error encoding message %s", err)
	}

	dec := newDecoder(buf)
	msg, err := dec.Decode()
	if err != nil {
		t.Fatalf("Error decoding message %s", err)
	}
	if msg.From != from || msg.To != to || msg.Type != mtype {
		t.Fatalf("Unexpected message header from=%d to=%d type=%d", msg.From, msg.To, msg.Type)
	}

	var recv Msg1
	if err := msg.Decode(&recv); err != nil {
		t.Fatalf("Error decoding payload %s", err)
	}
	if recv.Field1 != data.Field1 || recv.Field2 != data.Field2 {
		t.Fatalf("Decoded payload differs: got %+v want %+v", recv, *data)
	}
}
// TestDecodeBogus checks that garbage on the wire is rejected by the message
// decoder, and that decoding an empty payload into a struct fails.
func TestDecodeBogus(t *testing.T) {
	buf := new(bytes.Buffer)
	dec := newDecoder(buf)
	buf.Write([]byte("bogus data"))
	if _, err := dec.Decode(); err == nil {
		t.Fatalf("Decoded did not return an error")
	}

	//Bogus struc decode
	type bogusStruct struct {
		field1 string
	}
	buf = new(bytes.Buffer)
	enc := newEncoder(buf)
	if err := enc.Encode(from, to, mtype, nil); err != nil {
		t.Fatalf("Error encoding message %s", err)
	}

	dec = newDecoder(buf)
	msg, err := dec.Decode()
	if err != nil {
		// Without this check a failure would surface as a nil dereference.
		t.Fatalf("Error decoding message %s", err)
	}

	var decval bogusStruct
	if err := msg.Decode(&decval); err == nil {
		t.Fatalf("Decoded did not return an error")
	}
}
package cluster
import (
"fmt"
"math/rand"
"net"
"time"
)
// Peer identifies a node of the cluster.
type Peer struct {
	Id      int64        // node id (randomly generated by NewPeer)
	Name    string       // node name
	Domain  string       // domain the node belongs to; peer lists are shared per domain
	TCPAddr *net.TCPAddr // address the node listens on
}

// StringID returns the peer id formatted as a decimal string.
func (p *Peer) StringID() string {
	return fmt.Sprintf("%d", p.Id)
}

// StringAddr returns the peer address formatted as a string.
func (p *Peer) StringAddr() string {
	return p.TCPAddr.String()
}

// equal reports whether p and p2 have the same network address (IP, port and
// zone). Ids, names and domains are not compared.
//
// A peer without an address never equals anything. Peers decoded from the
// network can arrive without one, and comparing them must not panic.
func (p *Peer) equal(p2 *Peer) bool {
	if p == nil || p2 == nil || p.TCPAddr == nil || p2.TCPAddr == nil {
		return false
	}
	return p.TCPAddr.IP.Equal(p2.TCPAddr.IP) &&
		p.TCPAddr.Port == p2.TCPAddr.Port &&
		p.TCPAddr.Zone == p2.TCPAddr.Zone
}
// NewPeer creates a Peer with a random non-negative id, the given name and
// domain, and the TCP address resolved from laddr. It returns an error when
// laddr cannot be resolved.
//
// NOTE(review): the global math/rand source is re-seeded from the clock on
// every call, so two calls that observe the same clock value would produce
// the same id.
func NewPeer(name string, domain string, laddr string) (*Peer, error) {
	rand.Seed(time.Now().UTC().UnixNano())
	id := rand.Int63()

	addr, err := net.ResolveTCPAddr("tcp", laddr)
	if err != nil {
		return nil, err
	}

	return &Peer{
		Id:      id,
		Name:    name,
		Domain:  domain,
		TCPAddr: addr,
	}, nil
}
// acceptRoutine accepts peer connections until Accept fails (which is what
// happens once the listener is closed). Each connection is handled in its own
// goroutine, which reads a single message and forwards it to the receive
// routine through c.message.
func (c *Cluster) acceptRoutine() {
	defer c.tcp.Close()
	for {
		conn, err := c.tcp.Accept()
		if err != nil {
			// Listener closed or broken: exit this routine.
			return
		}
		// Receive and decode routine
		go func(conn net.Conn) {
			if msg := c.readMessage(conn); msg != nil {
				c.message <- msg
			}
		}(conn)
	}
}

// readMessage reads one message from conn, records the remote address on it
// and closes conn. It returns nil, after logging the reason, when no message
// could be read.
//
// The connection is closed on every path: Send writes exactly one message per
// connection, so nothing more will arrive on it, and leaving it open would
// leak one socket per received message.
func (c *Cluster) readMessage(conn net.Conn) *message {
	defer conn.Close()

	c.optlock.RLock()
	networkTimeout := time.Duration(c.networkTimeout) * time.Second
	c.optlock.RUnlock()

	if err := conn.SetReadDeadline(time.Now().Add(networkTimeout)); err != nil {
		c.warn("Timeout receiving from '(%s, %s)': %s", conn.LocalAddr(), conn.RemoteAddr(), err)
		return nil
	}

	msg, err := newDecoder(conn).Decode()
	if err != nil {
		c.err("Could not decode message from '(%s, %s)': %s", conn.LocalAddr(), conn.RemoteAddr(), err)
		return nil
	}

	remoteaddr, err := net.ResolveTCPAddr("tcp", conn.RemoteAddr().String())
	if err != nil {
		// Log the address that failed to resolve; the result is nil here.
		c.err("Error resolving remote host address %s, %s", conn.RemoteAddr(), err)
		return nil
	}
	msg.remote = remoteaddr
	return msg
}
// heartbeatRoutine periodically checks every known peer until it is told to
// stop through c.heartbeatQuit. On each pass, with the peer list read-locked:
//
//   - a peer we never heard from is contacted again with Connect,
//   - a peer silent for more than twice the heartbeat interval is dropped
//     (its heartbeat history is kept),
//   - every other peer is sent a heartbeat.
//
// The routine registers itself with c.waitgroup so that Shutdown can wait
// for it.
func (c *Cluster) heartbeatRoutine() {
	c.waitgroup.Add(1)
	defer c.waitgroup.Done()

	sweep := func() {
		c.peerslock.RLock()
		defer c.peerslock.RUnlock()

		for _, remote := range c.peers {
			lastSeen, heard := c.heartbeats[remote.Id]
			switch {
			case !heard:
				c.debug("Never heard from '%s'", remote.StringID())
				go c.Connect(remote.TCPAddr.String())
			case time.Since(lastSeen) > c.getHealthyInterval()*2:
				c.warn("Remote '%s' hasn't responded in %s.",
					remote.StringID(),
					c.getHealthyInterval()*2,
				)
				// Do an ungraceful disconnect since we might want to try
				// reconnecting later.
				go c.onNodeExit(remote, false)
			default:
				// A failed heartbeat is not acted upon here; the peer is
				// dropped by the silence check above once it stays quiet.
				c.Send(remote, MsgHeartbeat, nil)
			}
		}
	}

	for {
		select {
		case <-time.After(c.getHealthyInterval()):
			sweep()
		case <-c.heartbeatQuit:
			c.debug("Shutting down hearbeat ...")
			return
		}
	}
}
// receiveRoutine processes the messages delivered by the accept routine until
// it is told to stop through c.receiveQuit.
//
// Every message refreshes the sender's heartbeat timestamp. Join and Peers
// messages feed peer discovery; heartbeat, leave, broadcast and all other
// message types are only acted upon when the sender is a known peer.
//
// A malformed message is logged and skipped. It must never terminate the
// routine: without it no further message would be processed and Shutdown
// would block forever on the quit channel.
func (c *Cluster) receiveRoutine() {
	c.waitgroup.Add(1)
	defer c.waitgroup.Done()
	for {
		select {
		case <-c.receiveQuit:
			c.debug("Shutting down receive thread ...")
			return
		case msg := <-c.message:
			c.peerslock.Lock()
			c.heartbeats[msg.From] = time.Now()
			c.peerslock.Unlock()

			switch msg.Type {
			// Node joins the cluster
			case MsgJoin:
				var peer Peer
				if err := msg.Decode(&peer); err != nil {
					c.warn("Unable to decode Join message from %s",
						msg.remote.IP)
					continue
				}
				// The peer comes from the network; it may lack an address.
				if peer.TCPAddr == nil {
					c.warn("Ignoring Join message without an address from %s",
						msg.remote.IP)
					continue
				}
				// Rewrite remote Addr with what we see in connection when the
				// peer advertises no usable IP (empty, 0.0.0.0 or ::).
				if ip := peer.TCPAddr.IP; len(ip) == 0 || ip.IsUnspecified() {
					peer.TCPAddr.IP = msg.remote.IP
				}
				c.debug("Join received from %s@%s ",
					peer.StringID(),
					peer.StringAddr(),
				)
				if err := c.onNodeJoin(peer); err != nil {
					c.debug("Join of %s@%s failed: %s",
						peer.StringID(),
						peer.StringAddr(),
						err,
					)
				}

			// Peer list sent by a node we joined
			case MsgPeers:
				var peerlist []Peer
				if err := msg.Decode(&peerlist); err != nil {
					c.warn("Unable to decode Peers message from %s",
						msg.remote.IP)
					continue
				}
				rpeer, known := c.GetPeerById(msg.From)
				if !known {
					c.warn("Peers list received unknown peer %s ",
						msg.remote.IP.String(),
					)
					continue
				}
				c.debug("Peers list received from peer %s@%s ",
					rpeer.StringID(),
					rpeer.StringAddr(),
				)
				for _, peer := range peerlist {
					if peer.Id == c.self.Id {
						continue
					}
					if peer.TCPAddr == nil {
						c.warn("Ignoring peer %s without an address from %s@%s",
							peer.StringID(),
							rpeer.StringID(),
							rpeer.StringAddr(),
						)
						continue
					}
					c.debug("Learning new peer %s@%s from %s@%s",
						peer.StringID(),
						peer.StringAddr(),
						rpeer.StringID(),
						rpeer.StringAddr(),
					)
					if err := c.onNodeJoin(peer); err != nil {
						c.debug("Join of %s@%s failed: %s",
							peer.StringID(),
							peer.StringAddr(),
							err,
						)
					}
				}

			case MsgHeartbeat:
				if peer, known := c.GetPeerById(msg.From); known {
					c.debug("Heartbeat message received from %s@%s",
						peer.StringID(),
						peer.StringAddr(),
					)
					// Send Heartbeat notification
					for _, app := range c.applications {
						app.OnHeartbeat(peer)
					}
				}

			// Node Leaves the cluster
			case MsgLeave:
				if peer, known := c.GetPeerById(msg.From); known {
					c.debug("Remove message from %s@%s",
						peer.StringID(),
						peer.StringAddr(),
					)
					c.onNodeExit(peer, true)
				}

			case MsgBroadcast:
				if peer, known := c.GetPeerById(msg.From); known {
					c.debug("Broadcast message received from %s@%s",
						peer.StringID(),
						peer.StringAddr(),
					)
					for _, app := range c.applications {
						app.OnBroadcast(
							&BroadcastMessage{
								From:    peer,
								Payload: msg.Payload,
							})
					}
				}

			// Application-defined message types
			default:
				if peer, known := c.GetPeerById(msg.From); known {
					c.debug("General message received from %s@%s",
						peer.StringID(),
						peer.StringAddr(),
					)
					for _, app := range c.applications {
						app.OnMessage(
							&Message{
								From:    peer,
								Type:    msg.Type,
								Payload: msg.Payload,
							})
					}
				}
			}
		}
	}
}
// getPeersByDomain returns the known peers whose Domain equals domain, in no
// particular order. The result is nil when no peer matches.
func (c *Cluster) getPeersByDomain(domain string) []Peer {
	c.peerslock.RLock()
	defer c.peerslock.RUnlock()

	var matching []Peer
	for _, candidate := range c.peers {
		if candidate.Domain != domain {
			continue
		}
		matching = append(matching, candidate)
	}
	return matching
}
// getHealthyInterval returns the configured heartbeat frequency as a
// time.Duration (the frequency is stored in seconds).
func (c *Cluster) getHealthyInterval() time.Duration {
	c.optlock.RLock()
	seconds := c.heartbeatFrequency
	c.optlock.RUnlock()
	return time.Duration(seconds) * time.Second
}
// debug writes a printf-style message at debug level, prefixed with the
// node's name and id. It is a no-op unless the log level is LogLevelDebug.
func (c *Cluster) debug(format string, v ...interface{}) {
	c.optlock.RLock()
	defer c.optlock.RUnlock()

	if c.logLevel > LogLevelDebug {
		return
	}
	text := fmt.Sprintf(format, v...)
	c.log.Printf("%s[%d] <debug> %s", c.self.Name, c.self.Id, text)
}
// warn writes a printf-style message at warning level, prefixed with the
// node's name and id. It is a no-op when the log level is above LogLevelWarn.
func (c *Cluster) warn(format string, v ...interface{}) {
	c.optlock.RLock()
	defer c.optlock.RUnlock()

	if c.logLevel > LogLevelWarn {
		return
	}
	text := fmt.Sprintf(format, v...)
	c.log.Printf("%s[%d] <warning> %s", c.self.Name, c.self.Id, text)
}
// err writes a printf-style message at error level, prefixed with the node's
// name and id. It is a no-op when the log level is above LogLevelError.
func (c *Cluster) err(format string, v ...interface{}) {
	c.optlock.RLock()
	defer c.optlock.RUnlock()

	if c.logLevel > LogLevelError {
		return
	}
	text := fmt.Sprintf(format, v...)
	c.log.Printf("%s[%d] <error> %s", c.self.Name, c.self.Id, text)
}
package cluster
import (
"log"
"os"
"sync"
"testing"
"time"
)
// TestHealtyInterval checks that the heartbeat frequency, stored in seconds,
// is reported as the matching time.Duration.
func TestHealtyInterval(t *testing.T) {
	const seconds = 10
	c := &Cluster{
		optlock:            new(sync.RWMutex),
		heartbeatFrequency: seconds,
	}

	want := time.Duration(seconds) * time.Second
	if got := c.getHealthyInterval(); got != want {
		t.Fatalf("Invalid Heartbeat HealtyInterval")
	}
}
// TestNewPeer checks that NewPeer rejects addresses that cannot be resolved.
func TestNewPeer(t *testing.T) {
	cases := []struct {
		laddr   string
		failure string
	}{
		{"127.0.0.256:0", "Invalid address accepted (1)"},
		{"127.0.0.256:60000", "Invalid address accepted (2)"},
	}
	for _, tc := range cases {
		if _, err := NewPeer("test", "testing", tc.laddr); err == nil {
			t.Fatalf(tc.failure)
		}
	}
}
// TestAcceptRoutine is a placeholder for a test of the accept routine. It is
// reported as skipped so the missing coverage stays visible instead of
// showing up as a passing test.
func TestAcceptRoutine(t *testing.T) {
	t.Skip("acceptRoutine is not covered by a unit test yet")
}
func TestPerrsByDomain(t *testing.T) {
var peer *Peer
// Generate new PeerId
selfpeer, err := NewPeer("test", "testing", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
cluster := &Cluster{
self: selfpeer,
peers: make(map[int64]Peer),
peerslock: new(sync.RWMutex),
}
peer, _ = NewPeer("test1", "test", "127.0.0.1:10")
cluster.peers[peer.Id] = *peer
peer, _ = NewPeer("test2", "test", "127.0.0.1:20")
cluster.peers[peer.Id] = *peer
peer, _ = NewPeer("test2", "local", "127.0.0.1:20")
cluster.peers[peer.Id] = *peer
peers := cluster.getPeersByDomain("test")
if len(peers) != 2 {
t.Fatalf("We expect to find 2 peers in domain test")
}
peers = cluster.getPeersByDomain("local")
if len(peers) != 1 {
t.Fatalf("We expect to find 1 peer in domain local")
}
peers = cluster.GetPeers()
if len(peers) != 3 {
t.Fatalf("We expect to find 3 peers in total")
}
}
// TestDebuging exercises the three logging helpers on a cluster that logs at
// debug level to standard output. It only checks that they do not panic.
func TestDebuging(t *testing.T) {
	// Generate new PeerId
	me, err := NewPeer("test", "testing", "127.0.0.1:0")
	if err != nil {
		t.Error(err)
	}

	c := &Cluster{
		self:     me,
		log:      log.New(os.Stdout, "", log.LstdFlags),
		logLevel: LogLevelDebug,
		optlock:  new(sync.RWMutex),
	}

	c.debug("Debug message")
	c.warn("Warn message")
	c.err("Error message")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment