client.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package client
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "log"
  8. "strings"
  9. "time"
  10. pb "git.mokkon.com/sainw/fho_forward_proto.git"
  11. grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/codes"
  14. "google.golang.org/grpc/credentials"
  15. "google.golang.org/grpc/metadata"
  16. "google.golang.org/grpc/status"
  17. "google.golang.org/protobuf/types/known/timestamppb"
  18. )
  19. type FhoClient struct {
  20. id string
  21. key string
  22. dataService pb.DataServiceClient
  23. }
  24. func NewClient(address string, id string, key string) (*FhoClient, error) {
  25. opts := []grpc_retry.CallOption{
  26. grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
  27. }
  28. addresses := strings.Split(address, ":")
  29. host := addresses[0]
  30. port := ""
  31. if len(addresses) > 1 {
  32. port = addresses[1]
  33. }
  34. options := make([]grpc.DialOption, 0)
  35. options = append(options, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...)))
  36. if port == "443" {
  37. options = append(options, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, host)))
  38. } else {
  39. options = append(options, grpc.WithInsecure())
  40. }
  41. conn, err := grpc.Dial(address, options...)
  42. if err != nil {
  43. return nil, err
  44. }
  45. client := &FhoClient{id: id, key: key, dataService: pb.NewDataServiceClient(conn)}
  46. go func(c *FhoClient) {
  47. Heartbeat(c, func(isConnected bool) {})
  48. }(client)
  49. return client, nil
  50. }
  51. func (f *FhoClient) query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
  52. md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
  53. ctx := metadata.NewOutgoingContext(context.Background(), md)
  54. r, err := f.dataService.GetData(ctx, &pb.DataRequest{Object: object, UpdatedAfter: timestamppb.New(updatedAfter)})
  55. if err != nil {
  56. return nil, err
  57. }
  58. if r.ReplyStatus == pb.DataReply_ERROR {
  59. return nil, errors.New(r.ErrorMsg)
  60. }
  61. data := r.GetData()
  62. rows := make([]map[string]interface{}, 0)
  63. err = json.Unmarshal([]byte(data), &rows)
  64. if err != nil {
  65. return nil, err
  66. }
  67. return rows, nil
  68. }
  69. type Snapshot struct {
  70. client *FhoClient
  71. object string
  72. updatedAfter time.Time
  73. stream pb.DataService_StreamDataClient
  74. isConnected bool
  75. }
  76. func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) {
  77. s := &Snapshot{object: object, updatedAfter: updatedAfter, client: c}
  78. err := s.assignStream()
  79. if err != nil {
  80. return nil, err
  81. }
  82. return s, nil
  83. }
  84. func (s *Snapshot) assignStream() error {
  85. md := metadata.Pairs("id", s.client.id, "key", s.client.key, "object", s.object)
  86. ctx := metadata.NewOutgoingContext(context.Background(), md)
  87. stream, err := s.client.dataService.StreamData(ctx, &pb.DataRequest{Object: s.object, UpdatedAfter: timestamppb.New(s.updatedAfter)}, grpc_retry.WithMax(30))
  88. if err != nil {
  89. return err
  90. }
  91. s.stream = stream
  92. return nil
  93. }
  94. func (s *Snapshot) Next() (*[]map[string]interface{}, error) {
  95. start:
  96. r, err := s.stream.Recv()
  97. if err == io.EOF {
  98. return nil, err
  99. }
  100. if err != nil {
  101. log.Println("stream error:", err.Error())
  102. statusCode, ok := status.FromError(err)
  103. if ok {
  104. if statusCode.Code() == codes.Unauthenticated {
  105. return nil, err
  106. } else {
  107. log.Println("Retrying stream")
  108. for {
  109. time.Sleep(time.Second * 3)
  110. err := s.assignStream()
  111. if err == nil {
  112. goto start
  113. }
  114. }
  115. }
  116. }
  117. }
  118. if r.ReplyStatus == pb.DataReply_ERROR {
  119. return nil, errors.New(r.ErrorMsg)
  120. }
  121. data := r.GetData()
  122. rec := make([]map[string]interface{}, 0)
  123. err = json.Unmarshal([]byte(data), &rec)
  124. if err != nil {
  125. return nil, err
  126. }
  127. s.updatedAfter = r.LatestUpdatedTime.AsTime()
  128. return &rec, nil
  129. }
  130. func Heartbeat(c *FhoClient, fn func(isConnected bool)) error {
  131. s, err := c.QuerySnapshot("heartbeat", time.Now())
  132. if err != nil {
  133. log.Println("listen heartbeat error:", err.Error())
  134. return err
  135. }
  136. isInit := false
  137. for {
  138. start:
  139. _, err := s.stream.Recv()
  140. if err != nil {
  141. log.Println("stream error:", err)
  142. if s.isConnected || !isInit {
  143. fn(false)
  144. s.isConnected = false
  145. isInit = true
  146. }
  147. statusCode, ok := status.FromError(err)
  148. if ok {
  149. if statusCode.Code() == codes.Unauthenticated {
  150. return err
  151. }
  152. }
  153. log.Println("Retrying connection")
  154. for {
  155. time.Sleep(time.Second * 3)
  156. err := s.assignStream()
  157. if err == nil {
  158. goto start
  159. }
  160. }
  161. } else {
  162. if !s.isConnected || !isInit {
  163. fn(true)
  164. s.isConnected = true
  165. isInit = true
  166. }
  167. }
  168. }
  169. }
  170. func (f *FhoClient) update(object string, record interface{}) error {
  171. md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
  172. ctx := metadata.NewOutgoingContext(context.Background(), md)
  173. bytes, err := json.Marshal(record)
  174. if err != nil {
  175. return err
  176. }
  177. r, err := f.dataService.UpdateData(ctx, &pb.DataRequest{Object: object, Data: string(bytes)})
  178. if err != nil {
  179. return err
  180. }
  181. if r.ReplyStatus == pb.DataReply_ERROR {
  182. return errors.New(r.ErrorMsg)
  183. }
  184. return nil
  185. }