client.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package client
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "log"
  8. "strings"
  9. "time"
  10. pb "git.mokkon.com/sainw/fho_forward_proto.git"
  11. grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/codes"
  14. "google.golang.org/grpc/credentials"
  15. "google.golang.org/grpc/metadata"
  16. "google.golang.org/grpc/status"
  17. "google.golang.org/protobuf/types/known/timestamppb"
  18. )
  19. type FhoClient struct {
  20. id string
  21. key string
  22. dataService pb.DataServiceClient
  23. }
  24. func NewClient(address string, id string, key string) (*FhoClient, error) {
  25. opts := []grpc_retry.CallOption{
  26. grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
  27. }
  28. addresses := strings.Split(address, ":")
  29. host := addresses[0]
  30. port := ""
  31. if len(addresses) > 1 {
  32. port = addresses[1]
  33. }
  34. options := make([]grpc.DialOption, 0)
  35. options = append(options, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...)))
  36. if port == "443" {
  37. options = append(options, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, host)))
  38. } else {
  39. options = append(options, grpc.WithInsecure())
  40. }
  41. conn, err := grpc.Dial(address, options...)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return &FhoClient{id: id, key: key, dataService: pb.NewDataServiceClient(conn)}, nil
  46. }
  47. func (f *FhoClient) query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
  48. md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
  49. ctx := metadata.NewOutgoingContext(context.Background(), md)
  50. r, err := f.dataService.GetData(ctx, &pb.DataRequest{Object: object, UpdatedAfter: timestamppb.New(updatedAfter)})
  51. if err != nil {
  52. return nil, err
  53. }
  54. if r.ReplyStatus == pb.DataReply_ERROR {
  55. return nil, errors.New(r.ErrorMsg)
  56. }
  57. data := r.GetData()
  58. rows := make([]map[string]interface{}, 0)
  59. err = json.Unmarshal([]byte(data), &rows)
  60. if err != nil {
  61. return nil, err
  62. }
  63. return rows, nil
  64. }
  65. type Snapshot struct {
  66. client *FhoClient
  67. object string
  68. updatedAfter time.Time
  69. stream pb.DataService_StreamDataClient
  70. isConnected bool
  71. }
  72. func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) {
  73. s := &Snapshot{object: object, updatedAfter: updatedAfter, client: c}
  74. err := s.assignStream()
  75. if err != nil {
  76. return nil, err
  77. }
  78. return s, nil
  79. }
  80. func (s *Snapshot) assignStream() error {
  81. md := metadata.Pairs("id", s.client.id, "key", s.client.key, "object", s.object)
  82. ctx := metadata.NewOutgoingContext(context.Background(), md)
  83. stream, err := s.client.dataService.StreamData(ctx, &pb.DataRequest{Object: s.object, UpdatedAfter: timestamppb.New(s.updatedAfter)}, grpc_retry.WithMax(30))
  84. if err != nil {
  85. return err
  86. }
  87. s.stream = stream
  88. return nil
  89. }
  90. func (s *Snapshot) Next() (*[]map[string]interface{}, error) {
  91. start:
  92. r, err := s.stream.Recv()
  93. if err == io.EOF {
  94. return nil, err
  95. }
  96. if err != nil {
  97. log.Println("stream error:", err.Error())
  98. statusCode, ok := status.FromError(err)
  99. if ok {
  100. if statusCode.Code() == codes.Unauthenticated {
  101. return nil, err
  102. } else {
  103. log.Println("Retrying stream")
  104. for {
  105. time.Sleep(time.Second * 3)
  106. err := s.assignStream()
  107. if err == nil {
  108. goto start
  109. }
  110. }
  111. }
  112. }
  113. }
  114. if r.ReplyStatus == pb.DataReply_ERROR {
  115. return nil, errors.New(r.ErrorMsg)
  116. }
  117. data := r.GetData()
  118. rec := make([]map[string]interface{}, 0)
  119. err = json.Unmarshal([]byte(data), &rec)
  120. if err != nil {
  121. return nil, err
  122. }
  123. s.updatedAfter = r.LatestUpdatedTime.AsTime()
  124. return &rec, nil
  125. }
  126. func Heartbeat(c *FhoClient, fn func(isConnected bool)) error {
  127. s, err := c.QuerySnapshot("heartbeat", time.Now())
  128. if err != nil {
  129. log.Println("listen heartbeat error:", err.Error())
  130. return err
  131. }
  132. isInit := false
  133. for {
  134. start:
  135. _, err := s.stream.Recv()
  136. if err != nil {
  137. if s.isConnected || !isInit {
  138. fn(false)
  139. s.isConnected = false
  140. isInit = true
  141. }
  142. statusCode, ok := status.FromError(err)
  143. if ok {
  144. if statusCode.Code() == codes.Unauthenticated {
  145. return err
  146. }
  147. }
  148. log.Println("Retrying connection")
  149. for {
  150. time.Sleep(time.Second * 3)
  151. err := s.assignStream()
  152. if err == nil {
  153. goto start
  154. }
  155. }
  156. } else {
  157. if !s.isConnected || !isInit {
  158. fn(true)
  159. s.isConnected = true
  160. isInit = true
  161. }
  162. }
  163. }
  164. }
  165. func (f *FhoClient) update(object string, record interface{}) error {
  166. md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
  167. ctx := metadata.NewOutgoingContext(context.Background(), md)
  168. bytes, err := json.Marshal(record)
  169. if err != nil {
  170. return err
  171. }
  172. r, err := f.dataService.UpdateData(ctx, &pb.DataRequest{Object: object, Data: string(bytes)})
  173. if err != nil {
  174. return err
  175. }
  176. if r.ReplyStatus == pb.DataReply_ERROR {
  177. return errors.New(r.ErrorMsg)
  178. }
  179. return nil
  180. }