Most operators accept a list of functional options that change the Observable's behaviour.
As an example:

    observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
        return i.(int) * 10, nil
    }, rxgo.WithContext(ctx),
        rxgo.WithCPUPool(),
        rxgo.WithBufferedChannel(1))

Configure the capacity of the output channel.

    rxgo.WithBufferedChannel(1) // Create a buffered channel with a capacity of 1

Pass a context. The Observable listens to the context's done signal and closes itself when it fires.

    rxgo.WithContext(ctx)

- Lazy (default): consume when an Observer starts to subscribe.

      rxgo.WithObservationStrategy(rxgo.Lazy)

- Eager: consume when the Observable is created.

      rxgo.WithObservationStrategy(rxgo.Eager)

- StopOnError (default): stop processing if the Observable produces an error.

      rxgo.WithErrorStrategy(rxgo.StopOnError)

- ContinueOnError: continue processing items if the Observable produces an error.

      rxgo.WithErrorStrategy(rxgo.ContinueOnError)

This strategy is propagated to the parent Observable(s).
Convert the operator into a parallel operator and specify the number of concurrent goroutines.

    rxgo.WithPool(8) // Creates a pool of 8 goroutines

Convert the operator into a parallel operator that uses runtime.NumCPU() concurrent goroutines.

    rxgo.WithCPUPool()

Force an Observable to produce items sequentially.

    rxgo.Serialize()

This option should be used in coordination with rxgo.WithPool(n) or rxgo.WithCPUPool().
Create a Connectable Observable.

    rxgo.WithPublishStrategy()

This option is propagated to the parent Observable(s).