1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| package register
import ( "context" "fmt" "log" "time"
uuid "github.com/satori/go.uuid" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints" )
var client *clientv3.Client
const ( prefix = "service" )
func init() { var err error client, err = clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } }
func Register(ctx context.Context, serviceName, addr string) error { log.Println("Try register to etcd ...") lease := clientv3.NewLease(client) cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3) defer cancel() leaseResp, err := lease.Grant(cancelCtx, 3) if err != nil { return err }
leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID) if err != nil { return err }
em, err := endpoints.NewManager(client, prefix) if err != nil { return err }
cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", prefix, serviceName, uuid.NewV4().String()), endpoints.Endpoint{ Addr: addr, }, clientv3.WithLease(leaseResp.ID)); err != nil { return err } log.Println("Register etcd success")
del := func() { log.Println("Register close")
cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() em.DeleteEndpoint(cancelCtx, serviceName)
lease.Close() } keepRegister(ctx, leaseChannel, del, serviceName, addr)
return nil }
func keepRegister(ctx context.Context, leaseChannel <-chan *clientv3.LeaseKeepAliveResponse, cleanFunc func(), serviceName, addr string) { go func() { failedCount := 0 for { select { case resp := <-leaseChannel: if resp != nil { } else { log.Println("keep alive failed.") failedCount++ for failedCount > 3 { cleanFunc() if err := Register(ctx, serviceName, addr); err != nil { time.Sleep(time.Second) continue } return } continue } case <-ctx.Done(): cleanFunc() client.Close() return } } }() }
|