client.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package client
  import (
  	"context"
  	"encoding/json"
  	"errors"
  	"io"
  	"log"
  	"net"
  	"strings"
  	"time"

  	pb "git.mokkon.com/sainw/fho_forward_proto.git"
  	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
  	"google.golang.org/grpc"
  	"google.golang.org/grpc/codes"
  	"google.golang.org/grpc/credentials"
  	"google.golang.org/grpc/metadata"
  	"google.golang.org/grpc/status"
  	"google.golang.org/protobuf/types/known/timestamppb"
  )
  19. type FhoClient struct {
  20. id string
  21. key string
  22. dataService pb.DataServiceClient
  23. }
  24. func NewClient(address string, id string, key string) (*FhoClient, error) {
  25. opts := []grpc_retry.CallOption{
  26. grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
  27. }
  28. addresses := strings.Split(address, ":")
  29. host := addresses[0]
  30. port := ""
  31. if len(addresses) > 1 {
  32. port = addresses[1]
  33. }
  34. options := make([]grpc.DialOption, 0)
  35. options = append(options, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...)))
  36. if port == "443" {
  37. options = append(options, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, host)))
  38. } else {
  39. options = append(options, grpc.WithInsecure())
  40. }
  41. conn, err := grpc.Dial(address, options...)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return &FhoClient{id: id, key: key, dataService: pb.NewDataServiceClient(conn)}, nil
  46. }
  47. func (f *FhoClient) query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
  48. md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
  49. ctx := metadata.NewOutgoingContext(context.Background(), md)
  50. r, err := f.dataService.GetData(ctx, &pb.DataRequest{Object: object, UpdatedAfter: timestamppb.New(updatedAfter)})
  51. if err != nil {
  52. return nil, err
  53. }
  54. if r.ReplyStatus == pb.DataReply_ERROR {
  55. return nil, errors.New(r.ErrorMsg)
  56. }
  57. data := r.GetData()
  58. rows := make([]map[string]interface{}, 0)
  59. err = json.Unmarshal([]byte(data), &rows)
  60. if err != nil {
  61. return nil, err
  62. }
  63. return rows, nil
  64. }
  65. type Snapshot struct {
  66. client *FhoClient
  67. object string
  68. updatedAfter time.Time
  69. stream pb.DataService_StreamDataClient
  70. }
  71. func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) {
  72. s := &Snapshot{object: object, updatedAfter: updatedAfter, client: c}
  73. err := s.assignStream()
  74. if err != nil {
  75. return nil, err
  76. }
  77. return s, nil
  78. }
  79. func (s *Snapshot) assignStream() error {
  80. md := metadata.Pairs("id", s.client.id, "key", s.client.key, "object", s.object)
  81. ctx := metadata.NewOutgoingContext(context.Background(), md)
  82. stream, err := s.client.dataService.StreamData(ctx, &pb.DataRequest{Object: s.object, UpdatedAfter: timestamppb.New(s.updatedAfter)}, grpc_retry.WithMax(30))
  83. if err != nil {
  84. return err
  85. }
  86. s.stream = stream
  87. return nil
  88. }
  89. func (s *Snapshot) Next() (*[]map[string]interface{}, error) {
  90. start:
  91. r, err := s.stream.Recv()
  92. if err == io.EOF {
  93. return nil, err
  94. }
  95. if err != nil {
  96. log.Println("stream error:", err.Error())
  97. statusCode, ok := status.FromError(err)
  98. if ok {
  99. if statusCode.Code() == codes.Unauthenticated {
  100. return nil, err
  101. } else if statusCode.Code() == codes.Unavailable {
  102. log.Println("Retrying stream")
  103. for {
  104. time.Sleep(time.Second * 1)
  105. err := s.assignStream()
  106. if err == nil {
  107. goto start
  108. }
  109. }
  110. } else {
  111. return nil, err
  112. }
  113. }
  114. }
  115. if r.ReplyStatus == pb.DataReply_ERROR {
  116. return nil, errors.New(r.ErrorMsg)
  117. }
  118. data := r.GetData()
  119. rec := make([]map[string]interface{}, 0)
  120. err = json.Unmarshal([]byte(data), &rec)
  121. if err != nil {
  122. return nil, err
  123. }
  124. s.updatedAfter = r.LatestUpdatedTime.AsTime()
  125. return &rec, nil
  126. }
  127. func (f *FhoClient) update(object string, record interface{}) error {
  128. md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
  129. ctx := metadata.NewOutgoingContext(context.Background(), md)
  130. bytes, err := json.Marshal(record)
  131. if err != nil {
  132. return err
  133. }
  134. r, err := f.dataService.UpdateData(ctx, &pb.DataRequest{Object: object, Data: string(bytes)})
  135. if err != nil {
  136. return err
  137. }
  138. if r.ReplyStatus == pb.DataReply_ERROR {
  139. return errors.New(r.ErrorMsg)
  140. }
  141. return nil
  142. }