One pattern I’ve used in the past is real-time configuration for services using NATS KV. A KV watcher monitors the config value(s) and updates a running service in real time. You could think of this as service-level feature flags. I had thought about writing a small Go package for this, but it’s not enough to warrant a package, so I figured I’d just write a short post about it instead.
Setup
First we need to set up something to watch the configs we care about. Let’s create a struct to hold our watcher, the functions we need, and our watch interval. I’m going to use an options struct to keep things clean later. We also define a WatchFunc type that takes the key/value returned from the watcher and returns an error. The Name field here is the top-level config subject. Since NATS KV watchers accept wildcards, this can be a higher-level subject if desired, something like services.serviceA.>.
type WatchFunc func(key string, val []byte) error

type ConfigWatcherOpts struct {
	Bucket        string               // KV bucket that holds our configs
	Name          string               // top-level subject to watch; wildcards allowed
	Keys          map[string]WatchFunc // filter subject -> handler
	Conn          *nats.Conn
	WatchInterval int64
}

type ConfigWatcher struct {
	kv            jetstream.KeyValue
	watcher       jetstream.KeyWatcher
	keys          map[string]WatchFunc
	watchInterval int64
}
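The rest of this assumes the configs KV bucket already exists. If you’re following along, a minimal, hypothetical setup program might look something like this; the bucket and key names match what we use later, and the NATS CLI works just as well:

package main

import (
	"context"
	"log"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	// Assumes a local NATS server running with JetStream enabled.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.TODO()

	// Create the bucket our watcher will read from.
	kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "configs"})
	if err != nil {
		log.Fatal(err)
	}

	// Seed initial values so the watcher has something to replay on startup.
	if _, err := kv.Put(ctx, "services.logger.log_level", []byte("info")); err != nil {
		log.Fatal(err)
	}
	if _, err := kv.Put(ctx, "services.foo", []byte("initial")); err != nil {
		log.Fatal(err)
	}
}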
Builder
Let’s set up a function to create our watcher and map everything we need onto our ConfigWatcher struct. I’m using the IgnoreDeletes functional option here because I don’t necessarily care about a key being deleted, but there could be use cases where a service should terminate if the key holding the config value disappears; I’ll sketch that variant after the Watch method below.
func New(opts ConfigWatcherOpts) (ConfigWatcher, error) {
	ctx := context.TODO()

	js, err := jetstream.New(opts.Conn)
	if err != nil {
		return ConfigWatcher{}, err
	}

	kv, err := js.KeyValue(ctx, opts.Bucket)
	if err != nil {
		return ConfigWatcher{}, err
	}

	// IgnoreDeletes means delete/purge markers are never delivered to us.
	w, err := kv.Watch(ctx, opts.Name, jetstream.IgnoreDeletes())
	if err != nil {
		return ConfigWatcher{}, err
	}

	return ConfigWatcher{
		kv:            kv,
		watcher:       w,
		keys:          opts.Keys,
		watchInterval: opts.WatchInterval,
	}, nil
}
Watcher
Now that we have our builder, let’s add a method to watch our configs. This doesn’t do much more than call Updates() on our watcher and range over the entries. Since watchers accept NATS wildcard characters, this is where we filter down to the subjects we care about, something like services.serviceA.logger.level. If our keys map contains the entry’s subject, the key and value are passed to that WatchFunc, which can then do something with them.
We also take in a channel for errors so we can report when one of our WatchFuncs encounters an error.
func (c *ConfigWatcher) Watch(errChan chan<- error) error {
	// Updates delivers the current values first, then live changes.
	for val := range c.watcher.Updates() {
		if val == nil {
			// A nil entry marks the end of the initial replay.
			continue
		}
		wf, ok := c.keys[val.Key()]
		if !ok {
			continue
		}
		if err := wf(val.Key(), val.Value()); err != nil {
			errChan <- err
		}
	}
	// Once the updates channel closes, pause before returning.
	time.Sleep(time.Duration(c.watchInterval) * time.Second)
	return nil
}
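As mentioned earlier, the builder ignores deletes. If you instead wanted the service to notice a watched key disappearing, a variant of this method could branch on the entry’s operation. This is just a sketch, and it assumes the watcher was created without jetstream.IgnoreDeletes():

// Sketch: a Watch variant that surfaces deleted keys as errors.
// Assumes kv.Watch was called without jetstream.IgnoreDeletes().
func (c *ConfigWatcher) watchWithDeletes(errChan chan<- error) {
	for val := range c.watcher.Updates() {
		if val == nil {
			continue
		}
		if val.Operation() == jetstream.KeyValueDelete || val.Operation() == jetstream.KeyValuePurge {
			// A config key we depend on disappeared; surface it so the
			// caller can decide whether to terminate.
			errChan <- fmt.Errorf("config key %q was deleted", val.Key())
			continue
		}
		wf, ok := c.keys[val.Key()]
		if !ok {
			continue
		}
		if err := wf(val.Key(), val.Value()); err != nil {
			errChan <- err
		}
	}
}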
WatchFuncs
Let’s put together our watcher functions. There are two examples here, and both return an anonymous function that matches the WatchFunc type. One uses a logger that I wrote directly; the other uses an interface, just to make things a bit easier if we decide to refactor things into their own packages later on.
The LogLevel function watches for changes to our log level key and applies them to our logger in real time. FooFunc is an example of setting a value on a struct in real time.
type ConfigSetter interface {
	Set(string, []byte) error
}

func LogLevel(logger *logr.Logger) configs.WatchFunc {
	return func(key string, value []byte) error {
		switch string(value) {
		case "info":
			logger.Level = logr.InfoLevel
		case "error":
			logger.Level = logr.ErrorLevel
		case "debug":
			logger.Level = logr.DebugLevel
		}
		logger.Infof("set log level to %s", string(value))
		return nil
	}
}

func FooFunc(c ConfigSetter) configs.WatchFunc {
	return func(key string, value []byte) error {
		return c.Set(key, value)
	}
}
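Since values come through as raw bytes, a WatchFunc isn’t limited to plain strings either. As a hypothetical example (HTTPConfig and HTTPConfigFunc are made up for illustration), a handler could decode a JSON payload into a config struct:

// HTTPConfig is a hypothetical structured config stored as JSON in the KV.
type HTTPConfig struct {
	TimeoutSeconds int `json:"timeout_seconds"`
	Retries        int `json:"retries"`
}

func HTTPConfigFunc(cfg *HTTPConfig) configs.WatchFunc {
	return func(key string, value []byte) error {
		// A malformed payload surfaces on the watcher's error channel.
		// In a real service you'd likely guard cfg with a mutex, since
		// the watcher goroutine writes it while the service reads it.
		return json.Unmarshal(value, cfg)
	}
}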
Example type
Here is our example type that we can pass to FooFunc. For this example, we aren’t doing anything other than storing the value.
type Foo struct {
	Flag string
}

func (f *Foo) Set(key string, value []byte) error {
	f.Flag = string(value)
	return nil
}
Main
Let’s finally build our main function.
We first build our logger and instantiate our Foo object. We then set up our NATS connection so we can pass it to our New function. Next we define the keys we want to watch and the WatchFunc that goes with each key; these are the lower-level filter subjects for this service. We then call New with our options to build our config watcher. Once the watcher is built, we create a channel for any errors encountered by the watcher functions. Since the watcher runs in its own goroutine, these errors are passed over the channel to another goroutine to handle. Finally, we run an infinite loop to demonstrate the real-time changes.
func main() {
	logger := logr.NewLogger()
	foo := Foo{
		Flag: "initial",
	}

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}

	// Map each filter subject to the WatchFunc that handles it.
	keys := map[string]configs.WatchFunc{
		"services.logger.log_level": LogLevel(logger),
		"services.foo":              FooFunc(&foo),
	}

	ff, err := configs.New(configs.ConfigWatcherOpts{
		Bucket: "configs",
		Name:   "services.>",
		Keys:   keys,
		Conn:   nc,
	})
	if err != nil {
		log.Fatal(err)
	}

	errChan := make(chan error)
	go ff.Watch(errChan)
	fmt.Println("service started")

	// Exit if any WatchFunc reports an error.
	go func() {
		err := <-errChan
		if err != nil {
			os.Exit(1)
		}
	}()

	for {
		logger.Info("info")
		logger.Debug("debug")
		logger.Infof("%+v\n", foo)
		time.Sleep(2 * time.Second)
	}
}
Demo
As the service runs, you can see the initial values are set for our log level and our Foo flag. As we update the KVs on the right side, the watcher receives each change, updates our logger to use the debug logging level, and then updates the value set in the Foo struct.
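If you want to trigger the changes from code instead of the NATS CLI, a small sketch like this does the trick (same bucket and keys as above, assuming nc is a connected *nats.Conn):

// Sketch: flip the watched values to trigger the WatchFuncs.
ctx := context.TODO()

js, err := jetstream.New(nc)
if err != nil {
	log.Fatal(err)
}

kv, err := js.KeyValue(ctx, "configs")
if err != nil {
	log.Fatal(err)
}

// Each Put is delivered to the watcher, which routes it to the right WatchFunc.
kv.Put(ctx, "services.logger.log_level", []byte("debug"))
kv.Put(ctx, "services.foo", []byte("updated"))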

Example Repo
All of the code here is in this example repo.