package client import ( "context" "encoding/json" "errors" "io" "log" "strings" "time" pb "git.mokkon.com/sainw/fho_forward_proto.git" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) type FhoClient struct { id string key string dataService pb.DataServiceClient } func NewClient(address string, id string, key string) (*FhoClient, error) { opts := []grpc_retry.CallOption{ grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)), } addresses := strings.Split(address, ":") host := addresses[0] port := "" if len(addresses) > 1 { port = addresses[1] } options := make([]grpc.DialOption, 0) options = append(options, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...))) if port == "443" { options = append(options, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, host))) } else { options = append(options, grpc.WithInsecure()) } conn, err := grpc.Dial(address, options...) if err != nil { return nil, err } return &FhoClient{id: id, key: key, dataService: pb.NewDataServiceClient(conn)}, nil } func (f *FhoClient) query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) { md := metadata.Pairs("id", f.id, "key", f.key, "object", object) ctx := metadata.NewOutgoingContext(context.Background(), md) r, err := f.dataService.GetData(ctx, &pb.DataRequest{Object: object, UpdatedAfter: timestamppb.New(updatedAfter)}) if err != nil { return nil, err } if r.ReplyStatus == pb.DataReply_ERROR { return nil, errors.New(r.ErrorMsg) } data := r.GetData() rows := make([]map[string]interface{}, 0) err = json.Unmarshal([]byte(data), &rows) if err != nil { return nil, err } return rows, nil } type Snapshot struct { client *FhoClient object string updatedAfter time.Time stream pb.DataService_StreamDataClient } func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) { s := &Snapshot{object: object, updatedAfter: updatedAfter, client: c} err := s.assignStream() if err != nil { return nil, err } return s, nil } func (s *Snapshot) assignStream() error { md := metadata.Pairs("id", s.client.id, "key", s.client.key, "object", s.object) ctx := metadata.NewOutgoingContext(context.Background(), md) stream, err := s.client.dataService.StreamData(ctx, &pb.DataRequest{Object: s.object, UpdatedAfter: timestamppb.New(s.updatedAfter)}, grpc_retry.WithMax(30)) if err != nil { return err } s.stream = stream return nil } func (s *Snapshot) Next() (*[]map[string]interface{}, error) { start: r, err := s.stream.Recv() if err == io.EOF { return nil, err } if err != nil { log.Println("stream error:", err.Error()) statusCode, ok := status.FromError(err) if ok { if statusCode.Code() == codes.Unauthenticated { return nil, err } else if statusCode.Code() == codes.Unavailable { log.Println("Retrying stream") for { time.Sleep(time.Second * 1) err := s.assignStream() if err == nil { goto start } } } else { return nil, err } } } if r.ReplyStatus == pb.DataReply_ERROR { return nil, errors.New(r.ErrorMsg) } data := r.GetData() rec := make([]map[string]interface{}, 0) err = json.Unmarshal([]byte(data), &rec) if err != nil { return nil, err } s.updatedAfter = r.LatestUpdatedTime.AsTime() return &rec, nil } func (f *FhoClient) update(object string, record interface{}) error { md := metadata.Pairs("id", f.id, "key", f.key, "object", object) ctx := metadata.NewOutgoingContext(context.Background(), md) bytes, err := json.Marshal(record) if err != nil { return err } r, err := f.dataService.UpdateData(ctx, &pb.DataRequest{Object: object, Data: string(bytes)}) if err != nil { return err } if r.ReplyStatus == pb.DataReply_ERROR { return errors.New(r.ErrorMsg) } return nil }