diff --git a/app/config/sink.go b/app/config/sink.go index e14f74484f..4b305af4ba 100644 --- a/app/config/sink.go +++ b/app/config/sink.go @@ -176,5 +176,5 @@ func ConfigureSink(v *viper.Viper) { ConfigureKafkaConfiguration(v, "sink") // Override Kafka configuration defaults - v.SetDefault("sink.kafka.consumerGroupId", "openmeter-sink-worker") + v.SetDefault("sink.kafka.consumerGroupID", "openmeter-sink-worker") } diff --git a/app/config/viper.go b/app/config/viper.go index e8c4f9f60e..327c4ab8ca 100644 --- a/app/config/viper.go +++ b/app/config/viper.go @@ -11,6 +11,7 @@ func DecodeHook() mapstructure.DecodeHookFunc { mapstructure.TextUnmarshallerHookFunc(), mapstructure.StringToTimeDurationHookFunc(), mapstructure.StringToSliceHookFunc(","), + mapstructure.StringToBasicTypeHookFunc(), ) } diff --git a/pkg/redis/client.go b/pkg/redis/client.go index a0f3b8ee24..04f21c91c9 100644 --- a/pkg/redis/client.go +++ b/pkg/redis/client.go @@ -3,6 +3,7 @@ package redis import ( "crypto/tls" "fmt" + "strings" "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" @@ -57,9 +58,19 @@ func NewClient(o Options, opts ...Option) (*redis.Client, error) { // Initialize Redis Client var client *redis.Client if o.Sentinel.Enabled { + // Address may be a comma-separated list of sentinel nodes. + var sentinelAddrs []string + for _, a := range strings.Split(o.Address, ",") { + if trimmed := strings.TrimSpace(a); trimmed != "" { + sentinelAddrs = append(sentinelAddrs, trimmed) + } + } + if len(sentinelAddrs) == 0 { + return nil, fmt.Errorf("sentinel mode enabled but no valid sentinel addresses provided") + } client = redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: o.Sentinel.MasterName, - SentinelAddrs: []string{o.Address}, + SentinelAddrs: sentinelAddrs, DB: o.Database, Username: o.Username, Password: o.Password, diff --git a/tools/migrate/fs.go b/tools/migrate/fs.go index b7a7a4c65a..177573b448 100644 --- a/tools/migrate/fs.go +++ b/tools/migrate/fs.go @@ -4,7 +4,7 @@ import ( "bufio" "bytes" "io/fs" - "path/filepath" + "path" "strings" ) @@ -29,15 +29,15 @@ func (s *SourceWrapper) Open(name string) (fs.File, error) { return s.fsys.Open(name) } -func (s *SourceWrapper) ReadDir(path string) ([]fs.DirEntry, error) { - entries, err := fs.ReadDir(s.fsys, path) +func (s *SourceWrapper) ReadDir(dir string) ([]fs.DirEntry, error) { + entries, err := fs.ReadDir(s.fsys, dir) if err != nil { return nil, err } results := make([]fs.DirEntry, 0, len(entries)) for _, entry := range entries { - filePath := filepath.Join(path, entry.Name()) + filePath := path.Join(dir, entry.Name()) if entry.IsDir() { r, err := s.ReadDir(filePath)