|
@@ -76,6 +76,7 @@ type Snapshot struct {
|
|
|
object string
|
|
|
updatedAfter time.Time
|
|
|
stream pb.DataService_StreamDataClient
|
|
|
+ isConnected bool
|
|
|
}
|
|
|
|
|
|
func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) {
|
|
@@ -136,6 +137,46 @@ start:
|
|
|
return &rec, nil
|
|
|
}
|
|
|
|
|
|
+func Heartbeat(c *FhoClient, fn func(isConnected bool)) error {
|
|
|
+ s, err := c.QuerySnapshot("heartbeat", time.Now())
|
|
|
+ if err != nil {
|
|
|
+ log.Println("listen heartbeat error:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ isInit := false
|
|
|
+ for {
|
|
|
+ start:
|
|
|
+ _, err := s.stream.Recv()
|
|
|
+ if err != nil {
|
|
|
+ if s.isConnected || !isInit {
|
|
|
+ fn(false)
|
|
|
+ s.isConnected = false
|
|
|
+ isInit = true
|
|
|
+ }
|
|
|
+ statusCode, ok := status.FromError(err)
|
|
|
+ if ok {
|
|
|
+ if statusCode.Code() == codes.Unauthenticated {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("Retrying connection")
|
|
|
+ for {
|
|
|
+ time.Sleep(time.Second * 3)
|
|
|
+ err := s.assignStream()
|
|
|
+ if err == nil {
|
|
|
+ goto start
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if !s.isConnected || !isInit {
|
|
|
+ fn(true)
|
|
|
+ s.isConnected = true
|
|
|
+ isInit = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (f *FhoClient) update(object string, record interface{}) error {
|
|
|
md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
|
|
|
ctx := metadata.NewOutgoingContext(context.Background(), md)
|