package fho_client import ( "context" "encoding/json" "errors" "io" "time" pb "git.mokkon.com/sainw/fho_proto" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) type FhoClient struct { id string key string dataService pb.DataServiceClient } func NewClient(address string, id string, key string) (*FhoClient, error) { opts := []grpc_retry.CallOption{ grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)), } conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...))) if err != nil { return nil, err } return &FhoClient{id: id, key: key, dataService: pb.NewDataServiceClient(conn)}, nil } func (f *FhoClient) Query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) md := metadata.Pairs("id", f.id, "key", f.key) ctx = metadata.NewOutgoingContext(ctx, md) defer cancel() r, err := f.dataService.GetData(ctx, &pb.DataRequest{Object: object, UpdatedAfter: timestamppb.New(updatedAfter)}) if err != nil { return nil, err } if r.ReplyStatus == pb.DataReply_ERROR { return nil, errors.New(r.ErrorMsg) } data := r.GetData() rows := make([]map[string]interface{}, 0) err = json.Unmarshal([]byte(data), &rows) if err != nil { return nil, err } return rows, nil } type Snapshot struct { client *FhoClient object string updatedAfter time.Time stream pb.DataService_StreamDataClient } func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) { s := &Snapshot{object: object, updatedAfter: updatedAfter, client: c} err := s.assignStream() if err != nil { return nil, err } return s, nil } func (s *Snapshot) assignStream() error { md := metadata.Pairs("id", s.client.id, "key", s.client.key) ctx := metadata.NewOutgoingContext(context.Background(), md) stream, err := s.client.dataService.StreamData(ctx, &pb.DataRequest{Object: s.object, UpdatedAfter: timestamppb.New(s.updatedAfter)}, grpc_retry.WithMax(30)) if err != nil { return err } s.stream = stream return nil } func (s *Snapshot) Next() ([]map[string]interface{}, error) { start: r, err := s.stream.Recv() if err == io.EOF { return nil, err } if err != nil { statusCode, ok := status.FromError(err) if ok { if statusCode.Code() == codes.Unauthenticated { return nil, err } else if statusCode.Code() == codes.Unavailable { for { time.Sleep(time.Second * 1) err := s.assignStream() if err == nil { goto start } } } else { return nil, err } } } if r.ReplyStatus == pb.DataReply_ERROR { return nil, errors.New(r.ErrorMsg) } data := r.GetData() rec := make([]map[string]interface{}, 0) err = json.Unmarshal([]byte(data), &rec) if err != nil { return nil, err } s.updatedAfter = r.LatestUpdatedTime.AsTime() return rec, nil }