package client

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"log"
	"strings"
	"time"

	pb "git.mokkon.com/sainw/fho_forward_proto.git"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// FhoClient is a client for the remote data service. It holds the id/key pair
// that every call attaches to its outgoing gRPC metadata, together with the
// generated service stub used to issue the calls.
type FhoClient struct {
	// id is sent as the "id" metadata value on each request.
	id          string
	// key is sent as the "key" metadata value on each request.
	key         string
	// dataService is the gRPC stub created from the dialed connection.
	dataService pb.DataServiceClient
}

// NewClient dials the data service at address and returns a client that
// attaches id and key to the metadata of every call it makes.
//
// address is split on ":". When the second segment is exactly "443" the
// connection uses TLS with the first segment as the server-name override;
// any other address is dialed without transport security. Streaming calls
// are routed through a retry interceptor with a linear 100ms backoff.
//
// On success a goroutine is started that runs Heartbeat against the new
// client with a no-op callback. The dial error, if any, is returned as is.
func NewClient(address string, id string, key string) (*FhoClient, error) {
	// The first ":"-separated segment is the host, the second (if present)
	// is the port; further segments are ignored.
	segments := strings.Split(address, ":")
	host, port := segments[0], ""
	if len(segments) > 1 {
		port = segments[1]
	}

	retryOpts := []grpc_retry.CallOption{
		grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)),
	}
	dialOpts := []grpc.DialOption{
		grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)),
	}

	// Only the conventional HTTPS port gets transport credentials.
	switch port {
	case "443":
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, host)))
	default:
		dialOpts = append(dialOpts, grpc.WithInsecure())
	}

	conn, err := grpc.Dial(address, dialOpts...)
	if err != nil {
		return nil, err
	}

	client := &FhoClient{
		id:          id,
		key:         key,
		dataService: pb.NewDataServiceClient(conn),
	}

	// Fire-and-forget heartbeat; its return value is discarded.
	go Heartbeat(client, func(bool) {})

	return client, nil
}

// query issues a single GetData call for object, passing updatedAfter in the
// request, and decodes the reply's data field as a JSON array of objects.
//
// It returns the transport error from the call, an error built from the
// reply's ErrorMsg when the reply status is DataReply_ERROR, or the JSON
// decoding error. On success the decoded rows are returned.
func (f *FhoClient) query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
	// Credentials and the target object travel as outgoing metadata.
	ctx := metadata.NewOutgoingContext(
		context.Background(),
		metadata.Pairs("id", f.id, "key", f.key, "object", object),
	)
	req := &pb.DataRequest{
		Object:       object,
		UpdatedAfter: timestamppb.New(updatedAfter),
	}

	reply, err := f.dataService.GetData(ctx, req)
	if err != nil {
		return nil, err
	}
	if reply.ReplyStatus == pb.DataReply_ERROR {
		return nil, errors.New(reply.ErrorMsg)
	}

	result := make([]map[string]interface{}, 0)
	if err := json.Unmarshal([]byte(reply.GetData()), &result); err != nil {
		return nil, err
	}
	return result, nil
}

// Snapshot is a streaming subscription to one object of the data service,
// bound to the FhoClient that opened it.
type Snapshot struct {
	// client supplies the credentials and the service stub used to open
	// the stream.
	client       *FhoClient
	// object is the name sent in the request and in the "object" metadata.
	object       string
	// updatedAfter is sent as the request's UpdatedAfter each time the
	// stream is opened; Next overwrites it with the LatestUpdatedTime of
	// each reply it decodes.
	updatedAfter time.Time
	// stream is the currently open server stream; assignStream replaces it.
	stream       pb.DataService_StreamDataClient
	// isConnected records the last connection state reported by Heartbeat.
	isConnected  bool
}

// QuerySnapshot opens a server stream for object, starting from
// updatedAfter, and returns a Snapshot wrapping it. If the stream cannot be
// opened the error is returned and no Snapshot is produced.
func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) {
	snap := &Snapshot{
		client:       c,
		object:       object,
		updatedAfter: updatedAfter,
	}
	if err := snap.assignStream(); err != nil {
		return nil, err
	}
	return snap, nil
}

// assignStream opens a new StreamData call for the snapshot's object, using
// the snapshot's current updatedAfter value, and stores the resulting stream
// on the snapshot. The call is made with a retry limit of 30. When opening
// fails the error is returned and the previously stored stream is left
// untouched.
func (s *Snapshot) assignStream() error {
	// Credentials and the target object travel as outgoing metadata.
	ctx := metadata.NewOutgoingContext(
		context.Background(),
		metadata.Pairs("id", s.client.id, "key", s.client.key, "object", s.object),
	)
	req := &pb.DataRequest{
		Object:       s.object,
		UpdatedAfter: timestamppb.New(s.updatedAfter),
	}

	opened, err := s.client.dataService.StreamData(ctx, req, grpc_retry.WithMax(30))
	if err != nil {
		return err
	}
	s.stream = opened
	return nil
}

// Next blocks until the stream delivers the next reply and returns its data
// decoded as a JSON array of objects.
//
// Return values:
//   - io.EOF when the stream has ended.
//   - The receive error when it carries the gRPC code Unauthenticated, or
//     when it carries no gRPC status at all.
//   - An error built from the reply's ErrorMsg when the reply status is
//     DataReply_ERROR.
//   - The JSON decoding error when the reply data cannot be decoded.
//
// Any other gRPC status error is logged and the stream is re-opened, retrying
// every three seconds until that succeeds, after which receiving resumes.
// Each successfully decoded reply advances the snapshot's updatedAfter to the
// reply's LatestUpdatedTime, so a re-opened stream requests data from there.
func (s *Snapshot) Next() (*[]map[string]interface{}, error) {
	for {
		r, err := s.stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil, err
		}
		if err != nil {
			log.Println("stream error:", err.Error())
			st, ok := status.FromError(err)
			// An error without a gRPC status must be surfaced here: there
			// is no reply to inspect, so continuing past this point would
			// dereference a nil reply.
			if !ok || st.Code() == codes.Unauthenticated {
				return nil, err
			}

			log.Println("Retrying stream")
			// Keep trying to re-open the stream until it succeeds.
			for {
				time.Sleep(time.Second * 3)
				if s.assignStream() == nil {
					break
				}
			}
			continue
		}

		if r.ReplyStatus == pb.DataReply_ERROR {
			return nil, errors.New(r.ErrorMsg)
		}

		rec := make([]map[string]interface{}, 0)
		if err := json.Unmarshal([]byte(r.GetData()), &rec); err != nil {
			return nil, err
		}
		// Remember how far we got so a re-opened stream resumes from here.
		s.updatedAfter = r.LatestUpdatedTime.AsTime()
		return &rec, nil
	}
}

// Heartbeat opens a snapshot stream on the "heartbeat" object, starting from
// the current time, and reports connectivity changes through fn.
//
// fn is called with true on the first successful receive and whenever a
// receive succeeds after the state was disconnected. It is called with false
// on the first failed receive (if nothing was reported yet) and whenever a
// receive fails after the state was connected.
//
// After a failed receive the stream is re-opened, retrying every three
// seconds until that succeeds. The function returns only when the initial
// stream cannot be opened or when a receive fails with the gRPC code
// Unauthenticated; in both cases that error is returned.
func Heartbeat(c *FhoClient, fn func(isConnected bool)) error {
	s, err := c.QuerySnapshot("heartbeat", time.Now())
	if err != nil {
		log.Println("listen heartbeat error:", err.Error())
		return err
	}

	// reported becomes true once fn has been invoked at least once, so the
	// very first outcome is always delivered regardless of s.isConnected.
	reported := false
	for {
		_, recvErr := s.stream.Recv()
		if recvErr == nil {
			if !reported || !s.isConnected {
				fn(true)
				s.isConnected = true
				reported = true
			}
			continue
		}

		log.Println("stream error:", recvErr)
		if !reported || s.isConnected {
			fn(false)
			s.isConnected = false
			reported = true
		}

		// Bad credentials will not fix themselves; give up.
		if st, ok := status.FromError(recvErr); ok && st.Code() == codes.Unauthenticated {
			return recvErr
		}

		log.Println("Retrying connection")
		// Keep trying to re-open the stream until it succeeds, then go back
		// to receiving.
		for {
			time.Sleep(time.Second * 3)
			if s.assignStream() == nil {
				break
			}
		}
	}
}

// update serializes record to JSON and sends it to the data service in an
// UpdateData call for object.
//
// It returns the JSON encoding error, the transport error from the call, or
// an error built from the reply's ErrorMsg when the reply status is
// DataReply_ERROR. It returns nil on success.
func (f *FhoClient) update(object string, record interface{}) error {
	payload, err := json.Marshal(record)
	if err != nil {
		return err
	}

	// Credentials and the target object travel as outgoing metadata.
	ctx := metadata.NewOutgoingContext(
		context.Background(),
		metadata.Pairs("id", f.id, "key", f.key, "object", object),
	)
	req := &pb.DataRequest{Object: object, Data: string(payload)}

	reply, err := f.dataService.UpdateData(ctx, req)
	switch {
	case err != nil:
		return err
	case reply.ReplyStatus == pb.DataReply_ERROR:
		return errors.New(reply.ErrorMsg)
	}
	return nil
}