123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package client
- import (
- "context"
- "encoding/json"
- "errors"
- "io"
- "log"
- "strings"
- "time"
- pb "git.mokkon.com/sainw/fho_forward_proto.git"
- grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
- "google.golang.org/protobuf/types/known/timestamppb"
- )
- type FhoClient struct {
- id string
- key string
- dataService pb.DataServiceClient
- }
- func NewClient(address string, id string, key string) (*FhoClient, error) {
- opts := []grpc_retry.CallOption{
- grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
- }
- addresses := strings.Split(address, ":")
- host := addresses[0]
- port := ""
- if len(addresses) > 1 {
- port = addresses[1]
- }
- options := make([]grpc.DialOption, 0)
- options = append(options, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...)))
- if port == "443" {
- options = append(options, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, host)))
- } else {
- options = append(options, grpc.WithInsecure())
- }
- conn, err := grpc.Dial(address, options...)
- if err != nil {
- return nil, err
- }
- client := &FhoClient{id: id, key: key, dataService: pb.NewDataServiceClient(conn)}
- go func(c *FhoClient) {
- Heartbeat(c, func(isConnected bool) {})
- }(client)
- return client, nil
- }
- func (f *FhoClient) query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
- md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
- ctx := metadata.NewOutgoingContext(context.Background(), md)
- r, err := f.dataService.GetData(ctx, &pb.DataRequest{Object: object, UpdatedAfter: timestamppb.New(updatedAfter)})
- if err != nil {
- return nil, err
- }
- if r.ReplyStatus == pb.DataReply_ERROR {
- return nil, errors.New(r.ErrorMsg)
- }
- data := r.GetData()
- rows := make([]map[string]interface{}, 0)
- err = json.Unmarshal([]byte(data), &rows)
- if err != nil {
- return nil, err
- }
- return rows, nil
- }
- type Snapshot struct {
- client *FhoClient
- object string
- updatedAfter time.Time
- stream pb.DataService_StreamDataClient
- isConnected bool
- }
- func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) {
- s := &Snapshot{object: object, updatedAfter: updatedAfter, client: c}
- err := s.assignStream()
- if err != nil {
- return nil, err
- }
- return s, nil
- }
- func (s *Snapshot) assignStream() error {
- md := metadata.Pairs("id", s.client.id, "key", s.client.key, "object", s.object)
- ctx := metadata.NewOutgoingContext(context.Background(), md)
- stream, err := s.client.dataService.StreamData(ctx, &pb.DataRequest{Object: s.object, UpdatedAfter: timestamppb.New(s.updatedAfter)}, grpc_retry.WithMax(30))
- if err != nil {
- return err
- }
- s.stream = stream
- return nil
- }
- func (s *Snapshot) Next() (*[]map[string]interface{}, error) {
- start:
- r, err := s.stream.Recv()
- if err == io.EOF {
- return nil, err
- }
- if err != nil {
- log.Println("stream error:", err.Error())
- statusCode, ok := status.FromError(err)
- if ok {
- if statusCode.Code() == codes.Unauthenticated {
- return nil, err
- } else {
- log.Println("Retrying stream")
- for {
- time.Sleep(time.Second * 3)
- err := s.assignStream()
- if err == nil {
- goto start
- }
- }
- }
- }
- }
- if r.ReplyStatus == pb.DataReply_ERROR {
- return nil, errors.New(r.ErrorMsg)
- }
- data := r.GetData()
- rec := make([]map[string]interface{}, 0)
- err = json.Unmarshal([]byte(data), &rec)
- if err != nil {
- return nil, err
- }
- s.updatedAfter = r.LatestUpdatedTime.AsTime()
- return &rec, nil
- }
- func Heartbeat(c *FhoClient, fn func(isConnected bool)) error {
- s, err := c.QuerySnapshot("heartbeat", time.Now())
- if err != nil {
- log.Println("listen heartbeat error:", err.Error())
- return err
- }
- isInit := false
- for {
- start:
- _, err := s.stream.Recv()
- if err != nil {
- log.Println("stream error:", err)
- if s.isConnected || !isInit {
- fn(false)
- s.isConnected = false
- isInit = true
- }
- statusCode, ok := status.FromError(err)
- if ok {
- if statusCode.Code() == codes.Unauthenticated {
- return err
- }
- }
- log.Println("Retrying connection")
- for {
- time.Sleep(time.Second * 3)
- err := s.assignStream()
- if err == nil {
- goto start
- }
- }
- } else {
- if !s.isConnected || !isInit {
- fn(true)
- s.isConnected = true
- isInit = true
- }
- }
- }
- }
- func (f *FhoClient) update(object string, record interface{}) error {
- md := metadata.Pairs("id", f.id, "key", f.key, "object", object)
- ctx := metadata.NewOutgoingContext(context.Background(), md)
- bytes, err := json.Marshal(record)
- if err != nil {
- return err
- }
- r, err := f.dataService.UpdateData(ctx, &pb.DataRequest{Object: object, Data: string(bytes)})
- if err != nil {
- return err
- }
- if r.ReplyStatus == pb.DataReply_ERROR {
- return errors.New(r.ErrorMsg)
- }
- return nil
- }
|