Browse Source

add client

sainw 3 years ago
parent
commit
50cc870a1a
8 changed files with 163 additions and 81 deletions
  1. 1 0
      .gitignore
  2. 7 7
      client/client.go
  3. 40 0
      client/listener.go
  4. 45 0
      client/model.go
  5. 20 0
      client/query.go
  6. 24 1
      client/util.go
  7. 25 72
      example/main.go
  8. 1 1
      go.mod

+ 1 - 0
.gitignore

@@ -1,3 +1,4 @@
 fho_proto/*.go
 fho_forward_sample/main
 example/example
+example/__debug_bin

+ 7 - 7
client/client.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"io"
+	"log"
 	"time"
 
 	pb "git.mokkon.com/sainw/fho_forward/proto"
@@ -35,12 +36,9 @@ func NewClient(address string, id string, key string) (*FhoClient, error) {
 	return &FhoClient{id: id, key: key, dataService: pb.NewDataServiceClient(conn)}, nil
 }
 
-func (f *FhoClient) Query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+func (f *FhoClient) query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
 	md := metadata.Pairs("id", f.id, "key", f.key)
-	ctx = metadata.NewOutgoingContext(ctx, md)
-
-	defer cancel()
+	ctx := metadata.NewOutgoingContext(context.Background(), md)
 
 	r, err := f.dataService.GetData(ctx, &pb.DataRequest{Object: object, UpdatedAfter: timestamppb.New(updatedAfter)})
 	if err != nil {
@@ -86,18 +84,20 @@ func (s *Snapshot) assignStream() error {
 	return nil
 }
 
-func (s *Snapshot) Next() ([]map[string]interface{}, error) {
+func (s *Snapshot) Next() (*[]map[string]interface{}, error) {
 start:
 	r, err := s.stream.Recv()
 	if err == io.EOF {
 		return nil, err
 	}
 	if err != nil {
+		log.Println("stream error:", err.Error())
 		statusCode, ok := status.FromError(err)
 		if ok {
 			if statusCode.Code() == codes.Unauthenticated {
 				return nil, err
 			} else if statusCode.Code() == codes.Unavailable {
+				log.Println("Retrying stream")
 				for {
 					time.Sleep(time.Second * 1)
 					err := s.assignStream()
@@ -120,5 +120,5 @@ start:
 		return nil, err
 	}
 	s.updatedAfter = r.LatestUpdatedTime.AsTime()
-	return rec, nil
+	return &rec, nil
 }

+ 40 - 0
client/listener.go

@@ -0,0 +1,40 @@
+package client
+
+import (
+	"io"
+	"log"
+	"time"
+)
+
+func listen(c *FhoClient, object string, lastUpdatedTime time.Time, fn func(*[]map[string]interface{})) error {
+	snap, err := c.QuerySnapshot(object, lastUpdatedTime)
+	if err != nil {
+		log.Println("listen error:", err.Error())
+		return err
+	}
+	for {
+		data, err := snap.Next()
+		if err == io.EOF {
+			log.Println("Error EOF")
+			break
+		}
+		if err != nil {
+			log.Println("Error:", err.Error())
+			break
+		}
+		fn(data)
+	}
+	return nil
+}
+
+func ListenOnProducts(c *FhoClient, lastUpdatedTime time.Time, fn func([]*Product)) error {
+	listen(c, "products", lastUpdatedTime, func(data *[]map[string]interface{}) {
+		var products []*Product
+		for _, rec := range *data {
+			_product := ProductFromMap(rec)
+			products = append(products, _product)
+		}
+		fn(products)
+	})
+	return nil
+}

+ 45 - 0
client/model.go

@@ -0,0 +1,45 @@
+package client
+
+import (
+	"time"
+)
+
+type Model struct {
+	ID                string
+	IsCreated         bool
+	IsDeleted         bool
+	IsUpdated         bool
+	LatestUpdatedTime *time.Time
+}
+
+func modelFromMap(m map[string]interface{}) *Model {
+	createdAtStr := ToString(m["created_at"])
+	updatedAtStr := ToString(m["updated_at"])
+	deletedAtStr := ToString(m["deleted_at"])
+	updatedAt, _ := ToTime(updatedAtStr)
+
+	isCreated := false
+	isUpdated := false
+	isDeleted := false
+	if createdAtStr == updatedAtStr {
+		isCreated = true
+	} else if deletedAtStr != "" {
+		isDeleted = true
+	} else {
+		isUpdated = true
+	}
+	return &Model{ID: ToString(m["id"]), LatestUpdatedTime: updatedAt,
+		IsCreated: isCreated, IsUpdated: isUpdated, IsDeleted: isDeleted}
+}
+
+type Product struct {
+	Model
+	Name  string
+	Color string
+}
+
+func ProductFromMap(m map[string]interface{}) *Product {
+	return &Product{Model: *modelFromMap(m),
+		Name:  ToString(m["name"]),
+		Color: ToString(m["color"])}
+}

+ 20 - 0
client/query.go

@@ -0,0 +1,20 @@
+package client
+
+import (
+	"fmt"
+	"time"
+)
+
+func QueryProducts(client *FhoClient, lastUpdatedTime time.Time) ([]*Product, error) {
+	data, err := client.query("products", lastUpdatedTime)
+	if err != nil {
+		fmt.Println("query error:", err.Error())
+		return nil, err
+	}
+	var products []*Product
+	for _, rec := range data {
+		_product := ProductFromMap(rec)
+		products = append(products, _product)
+	}
+	return products, nil
+}

+ 24 - 1
client/util.go

@@ -1,6 +1,9 @@
 package client
 
-import "fmt"
+import (
+	"fmt"
+	"time"
+)
 
 func ToStrings(data interface{}) []string {
 	var i []string
@@ -44,3 +47,23 @@ func ToFloat32(data interface{}) float32 {
 	}
 	return 0
 }
+
+const (
+	GRPC_DATETIME_FROM_SQL_FORMAT  = "2006-01-02T15:04:05.999999999-07:00"
+	GRPC_DATETIME_FROM_SQL_FORMAT2 = "2006-01-02T15:04:05.999999999Z"
+)
+
+func ToTime(data interface{}) (*time.Time, error) {
+	if v, ok := data.(time.Time); ok {
+		return &v, nil
+	}
+	str := ToString(data)
+	dateTime, err := time.Parse(GRPC_DATETIME_FROM_SQL_FORMAT, str)
+	if err != nil {
+		dateTime, err = time.Parse(GRPC_DATETIME_FROM_SQL_FORMAT2, str)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &dateTime, nil
+}

+ 25 - 72
example/main.go

@@ -2,23 +2,17 @@ package main
 
 import (
 	"fmt"
-	"io"
+	"log"
 	"time"
 
 	fho "git.mokkon.com/sainw/fho_forward/client"
 )
 
 const (
-	// address = "fhogrpc-test.seinmaungengineering.com:7091"
-	address = "localhost:7090"
+	address = "fhogrpc-test.seinmaungengineering.com:7091"
+	// address = "localhost:7090"
 )
 
-type Product struct {
-	ID    string
-	Name  string
-	Color string
-}
-
 type RegionProduct struct {
 	ProductID string
 	Name      string
@@ -26,12 +20,6 @@ type RegionProduct struct {
 	Variant   float32
 }
 
-func ProductFromMap(m map[string]interface{}) *Product {
-	return &Product{ID: fho.ToString(m["id"]),
-		Name:  fho.ToString(m["name"]),
-		Color: fho.ToString(m["color"])}
-}
-
 func RegionProductFromMap(m map[string]interface{}) *RegionProduct {
 	return &RegionProduct{ProductID: fho.ToString(m["product_id"]),
 		Name:    fho.ToString(m["name"]),
@@ -39,75 +27,40 @@ func RegionProductFromMap(m map[string]interface{}) *RegionProduct {
 		Variant: fho.ToFloat32(m["variant"])}
 }
 func main() {
-	// c, err := fho.NewClient(address, "da910506-4bdc-4dc2-9481-58a4e3e3c8bf", "1adf4633-7e9e-4bd0-b512-7b087d9c9719")
-	client, err := fho.NewClient(address, "bd38e9ac-210a-4e85-865c-401270c07220", "8c95f7fa-3071-427d-a27a-6f96e1d9590e")
+	client, err := fho.NewClient(address, "da910506-4bdc-4dc2-9481-58a4e3e3c8bf", "1adf4633-7e9e-4bd0-b512-7b087d9c9719")
+	// client, err := fho.NewClient(address, "bd38e9ac-210a-4e85-865c-401270c07220", "8c95f7fa-3071-427d-a27a-6f96e1d9590e")
 	if err != nil {
 		panic(err)
 	}
-	go listen(client)
-	// query(client)
-	// for {
-	// 	time.Sleep(time.Second * 5)
-	// 	// query(client)
-	// }
-	select {}
-}
-
-func listen(client *fho.FhoClient) error {
-	snap, err := client.QuerySnapshot("products", time.Now().Add(time.Hour*-114))
+	go listenProducts(client)
+	products, err := fho.QueryProducts(client, time.Now().Add(time.Hour*-114))
 	if err != nil {
-		fmt.Println("listen error:", err.Error())
-		return err
+		log.Println("Error query:", err.Error())
 	}
-	for {
-		data, err := snap.Next()
-		if err == io.EOF {
-			print("Error EOF")
-			break
-		}
-		if err != nil {
-			print("Error:", err.Error())
-			break
-		}
-		for _, rec := range data {
-			_product := ProductFromMap(rec)
-			fmt.Printf("Listener========================\n")
-			printProduct(_product)
-		}
+	for _, product := range products {
+		fmt.Println("Query=====>")
+		printProduct(product)
 	}
-	return nil
+	select {}
 }
 
-func query(client *fho.FhoClient) error {
-	data, err := client.Query("region_product_variants", time.Now().Add(time.Hour*-114))
-	if err != nil {
-		fmt.Println("query error:", err.Error())
-		return err
-	}
-	for _, rec := range data {
-		_product := RegionProductFromMap(rec)
-		printRegionProduct(_product)
-	}
-	return nil
+func listenProducts(client *fho.FhoClient) error {
+	err := fho.ListenOnProducts(client, time.Now().Add(time.Hour*-114), func(products []*fho.Product) {
+		for _, p := range products {
+			fmt.Println("Listening=====>")
+			printProduct(p)
+		}
+	})
+	return err
 }
 
-func printProduct(p *Product) {
+func printProduct(p *fho.Product) {
 	fmt.Printf("ID : %v\n", p.ID)
 	fmt.Printf("Name : %v\n", p.Name)
 	fmt.Printf("Color : %v\n", p.Color)
-}
 
-func printRegionProduct(p *RegionProduct) {
-	fmt.Printf("Product ID : %v\n", p.ProductID)
-	fmt.Printf("Name : %v\n", p.Name)
-	fmt.Printf("Color : %v\n", p.Color)
-	fmt.Printf("Variant : %v\n", p.Variant)
-}
-func dumpMap(m []map[string]interface{}) {
-	for _, r := range m {
-		for k, v := range r {
-			fmt.Printf("%v : %v\n", k, v)
-		}
-		fmt.Printf("-----------------\n")
-	}
+	fmt.Printf("IsCreated : %v\n", p.IsCreated)
+	fmt.Printf("IsUpdated : %v\n", p.IsUpdated)
+	fmt.Printf("IsDeleted : %v\n", p.IsDeleted)
+	fmt.Printf("LatestUpdatedTime : %v\n", p.LatestUpdatedTime)
 }

+ 1 - 1
go.mod

@@ -1,4 +1,4 @@
-module git.mokkon.com/sainw/fho_forward
+module git.mokkon.com/sainw/fho_forward.git
 
 go 1.17