listener.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package client
  2. import (
  3. "io"
  4. "log"
  5. "time"
  6. )
  7. func listen(c *FhoClient, object string, lastUpdatedTime time.Time, fn func(*[]map[string]interface{}) error) error {
  8. snap, err := c.QuerySnapshot(object, lastUpdatedTime)
  9. if err != nil {
  10. log.Println("listen error:", err.Error())
  11. return err
  12. }
  13. for {
  14. data, err := snap.Next()
  15. if err == io.EOF {
  16. log.Println("Error EOF")
  17. break
  18. }
  19. if err != nil {
  20. log.Println("Error:", err.Error())
  21. break
  22. }
  23. err = fn(data)
  24. if err != nil {
  25. return err
  26. }
  27. }
  28. return nil
  29. }
  30. func ListenOnProducts(c *FhoClient, lastUpdatedTime time.Time, fn func([]*Product) error) error {
  31. listen(c, "products", lastUpdatedTime, func(data *[]map[string]interface{}) error {
  32. var products []*Product
  33. for _, rec := range *data {
  34. _product := ProductFromMap(rec)
  35. products = append(products, _product)
  36. }
  37. return fn(products)
  38. })
  39. return nil
  40. }
  41. func ListenOnPrice(c *FhoClient, lastUpdatedTime time.Time, fn func([]*PriceSchedule) error) error {
  42. err := listen(c, "station_price_updates", lastUpdatedTime, func(data *[]map[string]interface{}) error {
  43. var prices []*PriceSchedule
  44. pricesMap := make(map[string]*PriceSchedule)
  45. for _, rec := range *data {
  46. _price, err := PriceScheduleFromMap(rec)
  47. if err != nil {
  48. return err
  49. }
  50. if _, ok := pricesMap[_price.ID]; ok {
  51. _price = pricesMap[_price.ID]
  52. } else {
  53. pricesMap[_price.ID] = _price
  54. prices = append(prices, _price)
  55. }
  56. _product := PriceScheduleProductFromMap(rec)
  57. _price.PriceScheduleProducts = append(_price.PriceScheduleProducts, _product)
  58. }
  59. return fn(prices)
  60. })
  61. return err
  62. }