listener.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package client
  2. import (
  3. "io"
  4. "log"
  5. "time"
  6. )
  7. func listen(c *FhoClient, object string, lastUpdatedTime time.Time, fn func(*[]map[string]interface{}) error) error {
  8. snap, err := c.QuerySnapshot(object, lastUpdatedTime)
  9. if err != nil {
  10. log.Println("listen error:", err.Error())
  11. return err
  12. }
  13. for {
  14. data, err := snap.Next()
  15. if err == io.EOF {
  16. log.Println("Error EOF")
  17. break
  18. }
  19. if err != nil {
  20. log.Println("Error:", err.Error())
  21. break
  22. }
  23. return fn(data)
  24. }
  25. return nil
  26. }
  27. func ListenOnProducts(c *FhoClient, lastUpdatedTime time.Time, fn func([]*Product) error) error {
  28. listen(c, "products", lastUpdatedTime, func(data *[]map[string]interface{}) error {
  29. var products []*Product
  30. for _, rec := range *data {
  31. _product := ProductFromMap(rec)
  32. products = append(products, _product)
  33. }
  34. return fn(products)
  35. })
  36. return nil
  37. }
  38. func ListenOnPrice(c *FhoClient, lastUpdatedTime time.Time, fn func([]*PriceSchedule) error) error {
  39. err := listen(c, "station_price_updates", lastUpdatedTime, func(data *[]map[string]interface{}) error {
  40. var prices []*PriceSchedule
  41. pricesMap := make(map[string]*PriceSchedule)
  42. for _, rec := range *data {
  43. _price, err := PriceScheduleFromMap(rec)
  44. if err != nil {
  45. return err
  46. }
  47. if _, ok := pricesMap[_price.ID]; ok {
  48. _price = pricesMap[_price.ID]
  49. } else {
  50. pricesMap[_price.ID] = _price
  51. prices = append(prices, _price)
  52. }
  53. _product := PriceScheduleProductFromMap(rec)
  54. _price.PriceScheduleProducts = append(_price.PriceScheduleProducts, _product)
  55. }
  56. return fn(prices)
  57. })
  58. return err
  59. }