package client

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"time"

	pb "git.mokkon.com/sainw/fho_forward/proto"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// FhoClient is a handle on the remote data service.
//
// id and key are attached as the "id" and "key" outgoing gRPC metadata
// entries on every call; dataService is the generated DataService client
// used to issue those calls.
type FhoClient struct {
	id, key     string
	dataService pb.DataServiceClient
}

// NewClient dials address and returns an FhoClient that sends id and key
// with its requests.
//
// The connection is opened with grpc.WithInsecure and a stream interceptor
// from grpc_retry configured with a linear 100ms backoff. Any error from
// grpc.Dial is returned unchanged.
func NewClient(address string, id string, key string) (*FhoClient, error) {
	backoff := grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond))
	retryStreams := grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(backoff))

	conn, err := grpc.Dial(address, grpc.WithInsecure(), retryStreams)
	if err != nil {
		return nil, err
	}

	client := &FhoClient{
		id:          id,
		key:         key,
		dataService: pb.NewDataServiceClient(conn),
	}
	return client, nil
}

// Query performs a single GetData call for object, passing updatedAfter in
// the request, and returns the reply payload decoded from JSON into a slice
// of generic maps.
//
// The call carries the client's id and key as outgoing metadata and is
// bounded by a one-second timeout. An error is returned when the RPC fails,
// when the reply status is DataReply_ERROR (the reply's ErrorMsg becomes the
// error text), or when the payload is not valid JSON for the target type.
func (c *FhoClient) Query(object string, updatedAfter time.Time) ([]map[string]interface{}, error) {
	withAuth := metadata.NewOutgoingContext(
		context.Background(),
		metadata.Pairs("id", c.id, "key", c.key),
	)
	ctx, cancel := context.WithTimeout(withAuth, time.Second)
	defer cancel()

	req := &pb.DataRequest{
		Object:       object,
		UpdatedAfter: timestamppb.New(updatedAfter),
	}
	reply, err := c.dataService.GetData(ctx, req)
	switch {
	case err != nil:
		return nil, err
	case reply.ReplyStatus == pb.DataReply_ERROR:
		return nil, errors.New(reply.ErrorMsg)
	}

	rows := make([]map[string]interface{}, 0)
	if err := json.Unmarshal([]byte(reply.GetData()), &rows); err != nil {
		return nil, err
	}
	return rows, nil
}

// Snapshot reads batches of records for one object from a StreamData
// server stream.
//
// client supplies the credentials and the service stub, object and
// updatedAfter are sent in the stream request, and stream is the currently
// open server stream (set by assignStream). Next overwrites updatedAfter
// with the LatestUpdatedTime of each reply it decodes, so a stream opened
// later starts from that time.
type Snapshot struct {
	client       *FhoClient
	object       string
	updatedAfter time.Time
	stream       pb.DataService_StreamDataClient
}

// QuerySnapshot builds a Snapshot for object starting at updatedAfter and
// opens its stream right away. If the stream cannot be opened, the error is
// returned and no Snapshot is handed back.
func (c *FhoClient) QuerySnapshot(object string, updatedAfter time.Time) (*Snapshot, error) {
	snap := &Snapshot{client: c, object: object, updatedAfter: updatedAfter}
	if err := snap.assignStream(); err != nil {
		return nil, err
	}
	return snap, nil
}

// assignStream opens a new StreamData stream for the snapshot's object and
// current updatedAfter value and stores it in s.stream.
//
// The request carries the client's id and key as outgoing metadata and is
// issued with grpc_retry.WithMax(30). On failure s.stream is left untouched
// and the error is returned.
func (s *Snapshot) assignStream() error {
	ctx := metadata.NewOutgoingContext(
		context.Background(),
		metadata.Pairs("id", s.client.id, "key", s.client.key),
	)
	req := &pb.DataRequest{
		Object:       s.object,
		UpdatedAfter: timestamppb.New(s.updatedAfter),
	}

	stream, err := s.client.dataService.StreamData(ctx, req, grpc_retry.WithMax(30))
	if err == nil {
		s.stream = stream
	}
	return err
}

// Next blocks until the next reply arrives on the stream and returns its
// payload decoded from JSON into a slice of generic maps.
//
// Error handling:
//   - io.EOF is returned as-is when the server closes the stream.
//   - A gRPC status of codes.Unavailable triggers a reconnect: the stream is
//     re-opened once per second until assignStream succeeds, then receiving
//     resumes. The new stream is requested with the updatedAfter value left
//     by the last successfully decoded reply.
//   - Every other receive error, including errors that carry no gRPC
//     status, is returned to the caller.
//   - A reply whose status is DataReply_ERROR is reported as an error built
//     from its ErrorMsg; a payload that fails to decode returns the JSON
//     error.
//
// After a successful decode, s.updatedAfter is set to the reply's
// LatestUpdatedTime.
func (s *Snapshot) Next() ([]map[string]interface{}, error) {
	for {
		r, err := s.stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil, err
			}
			// Only Unavailable is treated as recoverable. Errors without a
			// gRPC status (ok == false) must be returned here: r is nil on
			// error, so falling through would dereference a nil reply.
			st, ok := status.FromError(err)
			if !ok || st.Code() != codes.Unavailable {
				return nil, err
			}
			// Keep trying to re-open the stream, pausing one second before
			// each attempt, then go back to receiving.
			for {
				time.Sleep(time.Second)
				if err := s.assignStream(); err == nil {
					break
				}
			}
			continue
		}

		if r.ReplyStatus == pb.DataReply_ERROR {
			return nil, errors.New(r.ErrorMsg)
		}
		rec := make([]map[string]interface{}, 0)
		if err := json.Unmarshal([]byte(r.GetData()), &rec); err != nil {
			return nil, err
		}
		// Advance the resume point only after the payload decoded cleanly.
		s.updatedAfter = r.LatestUpdatedTime.AsTime()
		return rec, nil
	}
}