From 46b91d63c98c09ca4efab38809946334ccbb46f5 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 1 Jul 2026 15:51:52 +0200 Subject: [PATCH 1/6] feat(billing): add collected invoice line limit --- app/common/billing.go | 8 ++-- app/config/billing.go | 11 ++++- app/config/billing_test.go | 29 ++++++++++++ cmd/billing-worker/wire_gen.go | 2 +- cmd/jobs/internal/wire_gen.go | 2 +- config.example.yaml | 4 ++ openmeter/billing/invoice.go | 7 +++ .../service/gatheringinvoicependinglines.go | 20 +++++++++ .../gatheringinvoicependinglines_test.go | 45 +++++++++++++++++++ openmeter/billing/worker/collect/collect.go | 18 +++++--- .../subscriptionsync/service/service.go | 1 + .../worker/subscriptionsync/service/sync.go | 1 + 12 files changed, 135 insertions(+), 13 deletions(-) create mode 100644 app/config/billing_test.go diff --git a/app/common/billing.go b/app/common/billing.go index 678140f8cd..7d280df5a8 100644 --- a/app/common/billing.go +++ b/app/common/billing.go @@ -212,7 +212,7 @@ func NewBillingRegistry( if err != nil { return BillingRegistry{}, err } - subscriptionSyncService, err := NewBillingSubscriptionSyncService(logger, subscriptionServices, billingRegistry, subscriptionSyncAdapter, tracer, creditsConfig, featureGate) + subscriptionSyncService, err := NewBillingSubscriptionSyncService(logger, subscriptionServices, billingRegistry, subscriptionSyncAdapter, tracer, creditsConfig, fsConfig, featureGate) if err != nil { return BillingRegistry{}, err } @@ -271,6 +271,7 @@ func NewBillingCollector(logger *slog.Logger, billingRegistry BillingRegistry, f BillingService: billingRegistry.Billing, Logger: logger, LockedNamespaces: fs.NamespaceLockdown, + MaxLinesPerInvoice: fs.MaxLinesPerCollectedInvoice, }) } @@ -289,14 +290,15 @@ func NewBillingSubscriptionSyncAdapter(db *entdb.Client) (subscriptionsync.Adapt }) } -func NewBillingSubscriptionSyncService(logger *slog.Logger, subsServices SubscriptionServiceWithWorkflow, billingRegistry BillingRegistry, subscriptionSyncAdapter subscriptionsync.Adapter, tracer trace.Tracer, creditsConfig config.CreditsConfiguration, featureGate *featuregate.FeatureGateChecker) (subscriptionsync.Service, error) { +func NewBillingSubscriptionSyncService(logger *slog.Logger, subsServices SubscriptionServiceWithWorkflow, billingRegistry BillingRegistry, subscriptionSyncAdapter subscriptionsync.Adapter, tracer trace.Tracer, creditsConfig config.CreditsConfiguration, billingFsConfig config.BillingFeatureSwitchesConfiguration, featureGate *featuregate.FeatureGateChecker) (subscriptionsync.Service, error) { return subscriptionsyncservice.New(subscriptionsyncservice.Config{ SubscriptionService: subsServices.Service, BillingService: billingRegistry.Billing, ChargesService: billingRegistry.ChargesServiceOrNil(), SubscriptionSyncAdapter: subscriptionSyncAdapter, FeatureFlags: subscriptionsyncservice.FeatureFlags{ - EnableCreditThenInvoice: creditsConfig.EnableCreditThenInvoice, + EnableCreditThenInvoice: creditsConfig.EnableCreditThenInvoice, + MaxLinesPerCollectedInvoice: billingFsConfig.MaxLinesPerCollectedInvoice, }, Logger: logger, Tracer: tracer, diff --git a/app/config/billing.go b/app/config/billing.go index 008e5feaf8..2cd4bac923 100644 --- a/app/config/billing.go +++ b/app/config/billing.go @@ -35,10 +35,18 @@ func (c BillingConfiguration) Validate() error { type BillingFeatureSwitchesConfiguration struct { NamespaceLockdown []string + // MaxLinesPerCollectedInvoice is the maximum number of lines that can be collected for a single invoice, 0 means no limit. + MaxLinesPerCollectedInvoice int } func (c BillingFeatureSwitchesConfiguration) Validate() error { - return nil + var errs []error + + if c.MaxLinesPerCollectedInvoice < 0 { + errs = append(errs, errors.New("maxLinesPerCollectedInvoice must not be negative")) + } + + return errors.Join(errs...) } func ConfigureBilling(v *viper.Viper, flags *pflag.FlagSet) { @@ -50,4 +58,5 @@ func ConfigureBilling(v *viper.Viper, flags *pflag.FlagSet) { _ = v.BindPFlag("billing.advancementStrategy", flags.Lookup("billing-advancement-strategy")) v.SetDefault("billing.advancementStrategy", billing.ForegroundAdvancementStrategy) v.SetDefault("billing.maxParallelQuantitySnapshots", 4) + v.SetDefault("billing.featureSwitches.maxLinesPerCollectedInvoice", 0) } diff --git a/app/config/billing_test.go b/app/config/billing_test.go new file mode 100644 index 0000000000..3dc76f084c --- /dev/null +++ b/app/config/billing_test.go @@ -0,0 +1,29 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBillingFeatureSwitchesConfigurationValidate(t *testing.T) { + t.Run("zero max collected invoice lines is valid", func(t *testing.T) { + require.NoError(t, BillingFeatureSwitchesConfiguration{ + MaxLinesPerCollectedInvoice: 0, + }.Validate()) + }) + + t.Run("positive max collected invoice lines is valid", func(t *testing.T) { + require.NoError(t, BillingFeatureSwitchesConfiguration{ + MaxLinesPerCollectedInvoice: 10, + }.Validate()) + }) + + t.Run("negative max collected invoice lines is invalid", func(t *testing.T) { + err := BillingFeatureSwitchesConfiguration{ + MaxLinesPerCollectedInvoice: -1, + }.Validate() + + require.ErrorContains(t, err, "maxLinesPerCollectedInvoice must not be negative") + }) +} diff --git a/cmd/billing-worker/wire_gen.go b/cmd/billing-worker/wire_gen.go index 801af8541c..dcd11d4390 100644 --- a/cmd/billing-worker/wire_gen.go +++ b/cmd/billing-worker/wire_gen.go @@ -374,7 +374,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - subscriptionsyncService, err := common.NewBillingSubscriptionSyncService(logger, subscriptionServiceWithWorkflow, billingRegistry, subscriptionsyncAdapter, tracer, creditsConfiguration, featureGateChecker) + subscriptionsyncService, err := common.NewBillingSubscriptionSyncService(logger, subscriptionServiceWithWorkflow, billingRegistry, subscriptionsyncAdapter, tracer, creditsConfiguration, billingFeatureSwitchesConfiguration, featureGateChecker) if err != nil { cleanup7() cleanup6() diff --git a/cmd/jobs/internal/wire_gen.go b/cmd/jobs/internal/wire_gen.go index 96e178bc04..c5b19bf0af 100644 --- a/cmd/jobs/internal/wire_gen.go +++ b/cmd/jobs/internal/wire_gen.go @@ -449,7 +449,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - subscriptionsyncService, err := common.NewBillingSubscriptionSyncService(logger, subscriptionServiceWithWorkflow, billingRegistry, subscriptionsyncAdapter, tracer, creditsConfiguration, featureGateChecker) + subscriptionsyncService, err := common.NewBillingSubscriptionSyncService(logger, subscriptionServiceWithWorkflow, billingRegistry, subscriptionsyncAdapter, tracer, creditsConfiguration, billingFeatureSwitchesConfiguration, featureGateChecker) if err != nil { cleanup7() cleanup6() diff --git a/config.example.yaml b/config.example.yaml index a3dfc1349a..fe31f5469d 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -96,6 +96,10 @@ server: billing: # for production deployments it's recommended to use queued for server only # advancementStrategy: foreground + # featureSwitches: + # # 0 collects all eligible pending invoice lines. Positive values cap each + # # collected standard invoice to the earliest N lines by service period start. + # maxLinesPerCollectedInvoice: 0 credits: enabled: true diff --git a/openmeter/billing/invoice.go b/openmeter/billing/invoice.go index 6b86df3cfe..537869ae4a 100644 --- a/openmeter/billing/invoice.go +++ b/openmeter/billing/invoice.go @@ -481,6 +481,7 @@ func (i InvoicePendingLinesInput) Validate() error { type InvoicePendingLinesOptions struct { BypassCollectionAlignment bool + MaxLinesPerInvoice int // PartialInvoiceLinesEnabled overrides the billing profile's progressive billing setting // for this invocation: @@ -508,6 +509,12 @@ func WithBypassCollectionAlignment() InvoicePendingLinesOption { } } +func WithMaxLinesPerInvoice(maxLines int) InvoicePendingLinesOption { + return func(o *InvoicePendingLinesOptions) { + o.MaxLinesPerInvoice = maxLines + } +} + func WithPartialInvoiceLinesDisabled() InvoicePendingLinesOption { return func(o *InvoicePendingLinesOptions) { o.PartialInvoiceLinesEnabled = lo.ToPtr(false) diff --git a/openmeter/billing/service/gatheringinvoicependinglines.go b/openmeter/billing/service/gatheringinvoicependinglines.go index 1b4894228e..31ce99a872 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines.go +++ b/openmeter/billing/service/gatheringinvoicependinglines.go @@ -17,6 +17,7 @@ import ( "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/pkg/clock" + "github.com/openmeterio/openmeter/pkg/cmpx" "github.com/openmeterio/openmeter/pkg/currencyx" "github.com/openmeterio/openmeter/pkg/slicesx" "github.com/openmeterio/openmeter/pkg/timeutil" @@ -137,6 +138,10 @@ func (s *Service) prepareBillableLines(ctx context.Context, input billing.Prepar return nil, fmt.Errorf("resolving collection asOf: %w", err) } + if options.MaxLinesPerInvoice < 0 { + return nil, errors.New("max lines per invoice must not be negative") + } + // let's fetch the existing gathering invoices for the customer existingGatheringInvoices, err := s.adapter.ListGatheringInvoices(ctx, billing.ListGatheringInvoicesInput{ Namespaces: []string{input.Customer.Namespace}, @@ -204,6 +209,8 @@ func (s *Service) prepareBillableLines(ctx context.Context, input billing.Prepar continue } + inScopeLines = limitGatheringLinesForInvoice(inScopeLines, options.MaxLinesPerInvoice) + // Step 1: Let's make sure we have lines properly split on the gathering invoice. // Invariant: the gathering invoice is updated to contain the new lines if any were split. prepareResults, err := s.prepareLinesToBill(ctx, prepareLinesToBillInput{ @@ -425,6 +432,19 @@ func (s *Service) gatherInScopeLines(ctx context.Context, in gatherInScopeLineIn return res, nil } +func limitGatheringLinesForInvoice(lines []gatheringLineWithBillablePeriod, maxLines int) []gatheringLineWithBillablePeriod { + if maxLines <= 0 || len(lines) <= maxLines { + return lines + } + + out := slices.Clone(lines) + slices.SortFunc(out, func(a, b gatheringLineWithBillablePeriod) int { + return cmpx.Compare(a.Line.ServicePeriod.From, b.Line.ServicePeriod.From) + }) + + return out[:maxLines] +} + type hasInvoicableLinesInput struct { Invoice billing.GatheringInvoice AsOf time.Time diff --git a/openmeter/billing/service/gatheringinvoicependinglines_test.go b/openmeter/billing/service/gatheringinvoicependinglines_test.go index 898f774c7c..55337071de 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines_test.go +++ b/openmeter/billing/service/gatheringinvoicependinglines_test.go @@ -8,8 +8,44 @@ import ( "github.com/openmeterio/openmeter/openmeter/billing" "github.com/openmeterio/openmeter/pkg/datetime" + "github.com/openmeterio/openmeter/pkg/timeutil" ) +func TestLimitGatheringLinesForInvoice(t *testing.T) { + line := func(id, from, to string) gatheringLineWithBillablePeriod { + gatheringLine := billing.GatheringLine{} + gatheringLine.ID = id + gatheringLine.ServicePeriod = timeutil.ClosedPeriod{ + From: mustTime(t, from), + To: mustTime(t, to), + } + + return gatheringLineWithBillablePeriod{ + Line: gatheringLine, + BillablePeriod: gatheringLine.ServicePeriod, + } + } + + lines := []gatheringLineWithBillablePeriod{ + line("later", "2025-03-01T00:00:00Z", "2025-04-01T00:00:00Z"), + line("tie-b", "2025-01-01T00:00:00Z", "2025-02-01T00:00:00Z"), + line("earliest", "2024-12-01T00:00:00Z", "2025-01-01T00:00:00Z"), + line("tie-a", "2025-01-01T00:00:00Z", "2025-02-01T00:00:00Z"), + } + + t.Run("zero keeps all lines without reordering", func(t *testing.T) { + got := limitGatheringLinesForInvoice(lines, 0) + + require.Equal(t, []gatheringLineWithBillablePeriod(lines), got) + }) + + t.Run("positive limit keeps earliest service periods", func(t *testing.T) { + got := limitGatheringLinesForInvoice(lines, 3) + + require.Equal(t, []string{"earliest", "tie-a", "tie-b"}, gatheringLineIDsForLimitTest(got)) + }) +} + func TestResolvePendingLineCollectionCutoff(t *testing.T) { asOf := mustTime(t, "2025-06-15T12:00:00Z") anchor := mustTime(t, "2025-06-01T00:00:00Z") @@ -157,3 +193,12 @@ func mustTime(t *testing.T, value string) time.Time { return parsed } + +func gatheringLineIDsForLimitTest(lines []gatheringLineWithBillablePeriod) []string { + ids := make([]string, 0, len(lines)) + for _, line := range lines { + ids = append(ids, line.Line.ID) + } + + return ids +} diff --git a/openmeter/billing/worker/collect/collect.go b/openmeter/billing/worker/collect/collect.go index a237b84eff..6bb5e564fb 100644 --- a/openmeter/billing/worker/collect/collect.go +++ b/openmeter/billing/worker/collect/collect.go @@ -17,9 +17,10 @@ import ( ) type InvoiceCollector struct { - gatheringInvoices billing.GatheringInvoiceService - billingService billing.Service - lockedNamespaces []string + gatheringInvoices billing.GatheringInvoiceService + billingService billing.Service + lockedNamespaces []string + maxLinesPerInvoice int logger *slog.Logger } @@ -111,6 +112,7 @@ func (a *InvoiceCollector) CollectCustomerInvoice(ctx context.Context, params Co }, // We want to make sure that system collection does not use progressive billing. billing.WithPartialInvoiceLinesDisabled(), + billing.WithMaxLinesPerInvoice(a.maxLinesPerInvoice), ) if err != nil { if errors.Is(err, billing.ErrNamespaceLocked) { @@ -218,6 +220,7 @@ type Config struct { BillingService billing.Service Logger *slog.Logger LockedNamespaces []string + MaxLinesPerInvoice int } func NewInvoiceCollector(config Config) (*InvoiceCollector, error) { @@ -234,9 +237,10 @@ func NewInvoiceCollector(config Config) (*InvoiceCollector, error) { } return &InvoiceCollector{ - gatheringInvoices: config.GatheringInvoiceService, - billingService: config.BillingService, - logger: config.Logger, - lockedNamespaces: config.LockedNamespaces, + gatheringInvoices: config.GatheringInvoiceService, + billingService: config.BillingService, + logger: config.Logger, + lockedNamespaces: config.LockedNamespaces, + maxLinesPerInvoice: config.MaxLinesPerInvoice, }, nil } diff --git a/openmeter/billing/worker/subscriptionsync/service/service.go b/openmeter/billing/worker/subscriptionsync/service/service.go index 7ecd3e45a0..a5ebf4b32a 100644 --- a/openmeter/billing/worker/subscriptionsync/service/service.go +++ b/openmeter/billing/worker/subscriptionsync/service/service.go @@ -22,6 +22,7 @@ type FeatureFlags struct { EnableFlatFeeInAdvanceProrating bool EnableFlatFeeInArrearsProrating bool EnableCreditThenInvoice bool + MaxLinesPerCollectedInvoice int } type Config struct { diff --git a/openmeter/billing/worker/subscriptionsync/service/sync.go b/openmeter/billing/worker/subscriptionsync/service/sync.go index 7261f9b054..26ca7bf457 100644 --- a/openmeter/billing/worker/subscriptionsync/service/sync.go +++ b/openmeter/billing/worker/subscriptionsync/service/sync.go @@ -39,6 +39,7 @@ func (s *Service) invoicePendingLines(ctx context.Context, customer customer.Cus Customer: customer, }, billing.WithPartialInvoiceLinesDisabled(), + billing.WithMaxLinesPerInvoice(s.featureFlags.MaxLinesPerCollectedInvoice), ) if err != nil { if errors.Is(err, billing.ErrInvoiceCreateNoLines) { From 5f404e559dc8472bb97d4c105ca9815818c27dc4 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 1 Jul 2026 15:58:59 +0200 Subject: [PATCH 2/6] fix(billing): address collected invoice limit review --- .../service/gatheringinvoicependinglines.go | 10 +- .../worker/subscriptionsync/service/sync.go | 32 +++--- .../service/sync_unit_test.go | 101 ++++++++++++++++++ 3 files changed, 128 insertions(+), 15 deletions(-) create mode 100644 openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go diff --git a/openmeter/billing/service/gatheringinvoicependinglines.go b/openmeter/billing/service/gatheringinvoicependinglines.go index 31ce99a872..8213993293 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines.go +++ b/openmeter/billing/service/gatheringinvoicependinglines.go @@ -439,7 +439,15 @@ func limitGatheringLinesForInvoice(lines []gatheringLineWithBillablePeriod, maxL out := slices.Clone(lines) slices.SortFunc(out, func(a, b gatheringLineWithBillablePeriod) int { - return cmpx.Compare(a.Line.ServicePeriod.From, b.Line.ServicePeriod.From) + if result := cmpx.Compare(a.Line.ServicePeriod.From, b.Line.ServicePeriod.From); result != 0 { + return result + } + + if result := cmpx.Compare(a.Line.ServicePeriod.To, b.Line.ServicePeriod.To); result != 0 { + return result + } + + return strings.Compare(a.Line.ID, b.Line.ID) }) return out[:maxLines] diff --git a/openmeter/billing/worker/subscriptionsync/service/sync.go b/openmeter/billing/worker/subscriptionsync/service/sync.go index 26ca7bf457..fc5526664a 100644 --- a/openmeter/billing/worker/subscriptionsync/service/sync.go +++ b/openmeter/billing/worker/subscriptionsync/service/sync.go @@ -33,23 +33,27 @@ func (s *Service) invoicePendingLines(ctx context.Context, customer customer.Cus )) return span.Wrap(func(ctx context.Context) error { - _, err := s.billingService.InvoicePendingLines( - ctx, - billing.InvoicePendingLinesInput{ - Customer: customer, - }, - billing.WithPartialInvoiceLinesDisabled(), - billing.WithMaxLinesPerInvoice(s.featureFlags.MaxLinesPerCollectedInvoice), - ) - if err != nil { - if errors.Is(err, billing.ErrInvoiceCreateNoLines) { - return nil + for { + invoices, err := s.billingService.InvoicePendingLines( + ctx, + billing.InvoicePendingLinesInput{ + Customer: customer, + }, + billing.WithPartialInvoiceLinesDisabled(), + billing.WithMaxLinesPerInvoice(s.featureFlags.MaxLinesPerCollectedInvoice), + ) + if err != nil { + if errors.Is(err, billing.ErrInvoiceCreateNoLines) { + return nil + } + + return err } - return err + if s.featureFlags.MaxLinesPerCollectedInvoice <= 0 || len(invoices) == 0 { + return nil + } } - - return nil }) } diff --git a/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go b/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go new file mode 100644 index 0000000000..2bda02be7c --- /dev/null +++ b/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go @@ -0,0 +1,101 @@ +package service + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" + + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/customer" +) + +func TestInvoicePendingLinesCollectsCappedBatchesUntilEmpty(t *testing.T) { + billingService := &invoicePendingLinesBillingService{ + results: []invoicePendingLinesResult{ + {invoices: []billing.StandardInvoice{{}}}, + {invoices: []billing.StandardInvoice{{}}}, + {err: billing.ErrInvoiceCreateNoLines}, + }, + } + service := &Service{ + billingService: billingService, + featureFlags: FeatureFlags{ + MaxLinesPerCollectedInvoice: 1, + }, + tracer: noop.NewTracerProvider().Tracer("test"), + } + + err := service.invoicePendingLines(t.Context(), customer.CustomerID{ + Namespace: "ns", + ID: "customer-id", + }) + + require.NoError(t, err) + require.Len(t, billingService.calls, 3) + + for _, call := range billingService.calls { + require.Equal(t, customer.CustomerID{Namespace: "ns", ID: "customer-id"}, call.input.Customer) + require.Equal(t, 1, call.options.MaxLinesPerInvoice) + require.NotNil(t, call.options.PartialInvoiceLinesEnabled) + require.False(t, *call.options.PartialInvoiceLinesEnabled) + } +} + +func TestInvoicePendingLinesUnlimitedCollectsOnce(t *testing.T) { + billingService := &invoicePendingLinesBillingService{ + results: []invoicePendingLinesResult{ + {invoices: []billing.StandardInvoice{{}}}, + {err: billing.ErrInvoiceCreateNoLines}, + }, + } + service := &Service{ + billingService: billingService, + tracer: noop.NewTracerProvider().Tracer("test"), + } + + err := service.invoicePendingLines(t.Context(), customer.CustomerID{ + Namespace: "ns", + ID: "customer-id", + }) + + require.NoError(t, err) + require.Len(t, billingService.calls, 1) + require.Equal(t, 0, billingService.calls[0].options.MaxLinesPerInvoice) +} + +type invoicePendingLinesBillingService struct { + billing.Service + + results []invoicePendingLinesResult + calls []invoicePendingLinesCall +} + +type invoicePendingLinesResult struct { + invoices []billing.StandardInvoice + err error +} + +type invoicePendingLinesCall struct { + input billing.InvoicePendingLinesInput + options billing.InvoicePendingLinesOptions +} + +func (s *invoicePendingLinesBillingService) InvoicePendingLines( + _ context.Context, + input billing.InvoicePendingLinesInput, + opts ...billing.InvoicePendingLinesOption, +) ([]billing.StandardInvoice, error) { + s.calls = append(s.calls, invoicePendingLinesCall{ + input: input, + options: billing.NewInvoicePendingLinesOptions(opts...), + }) + + if len(s.calls) > len(s.results) { + return nil, billing.ErrInvoiceCreateNoLines + } + + result := s.results[len(s.calls)-1] + return result.invoices, result.err +} From d42f59ad3aa28cc97a69c05c26adeaafabc8cbc3 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 1 Jul 2026 16:16:21 +0200 Subject: [PATCH 3/6] fix(billing): tighten collected invoice limit handling --- app/config/billing.go | 3 ++- openmeter/billing/invoice.go | 4 ++- .../service/gatheringinvoicependinglines.go | 14 +++++++++-- .../gatheringinvoicependinglines_test.go | 2 +- .../worker/subscriptionsync/service/sync.go | 8 +++++- .../service/sync_unit_test.go | 25 +++++++++++++++++++ 6 files changed, 50 insertions(+), 6 deletions(-) diff --git a/app/config/billing.go b/app/config/billing.go index 2cd4bac923..3f36f6dd83 100644 --- a/app/config/billing.go +++ b/app/config/billing.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/viper" "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/pkg/models" ) type BillingConfiguration struct { @@ -46,7 +47,7 @@ func (c BillingFeatureSwitchesConfiguration) Validate() error { errs = append(errs, errors.New("maxLinesPerCollectedInvoice must not be negative")) } - return errors.Join(errs...) + return models.NewNillableGenericValidationError(errors.Join(errs...)) } func ConfigureBilling(v *viper.Viper, flags *pflag.FlagSet) { diff --git a/openmeter/billing/invoice.go b/openmeter/billing/invoice.go index 537869ae4a..b0dc572210 100644 --- a/openmeter/billing/invoice.go +++ b/openmeter/billing/invoice.go @@ -481,7 +481,9 @@ func (i InvoicePendingLinesInput) Validate() error { type InvoicePendingLinesOptions struct { BypassCollectionAlignment bool - MaxLinesPerInvoice int + // MaxLinesPerInvoice caps the number of pending lines collected into a single invoice. + // 0 means no limit. + MaxLinesPerInvoice int // PartialInvoiceLinesEnabled overrides the billing profile's progressive billing setting // for this invocation: diff --git a/openmeter/billing/service/gatheringinvoicependinglines.go b/openmeter/billing/service/gatheringinvoicependinglines.go index 8213993293..6dcd120f9e 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines.go +++ b/openmeter/billing/service/gatheringinvoicependinglines.go @@ -139,7 +139,9 @@ func (s *Service) prepareBillableLines(ctx context.Context, input billing.Prepar } if options.MaxLinesPerInvoice < 0 { - return nil, errors.New("max lines per invoice must not be negative") + return nil, billing.ValidationError{ + Err: errors.New("max lines per invoice must not be negative"), + } } // let's fetch the existing gathering invoices for the customer @@ -209,7 +211,15 @@ func (s *Service) prepareBillableLines(ctx context.Context, input billing.Prepar continue } - inScopeLines = limitGatheringLinesForInvoice(inScopeLines, options.MaxLinesPerInvoice) + if input.IncludePendingLines.IsPresent() && options.MaxLinesPerInvoice > 0 && len(inScopeLines) > options.MaxLinesPerInvoice { + return nil, billing.ValidationError{ + Err: fmt.Errorf("include pending lines exceeds max lines per invoice: requested %d, limit %d", len(inScopeLines), options.MaxLinesPerInvoice), + } + } + + if !input.IncludePendingLines.IsPresent() { + inScopeLines = limitGatheringLinesForInvoice(inScopeLines, options.MaxLinesPerInvoice) + } // Step 1: Let's make sure we have lines properly split on the gathering invoice. // Invariant: the gathering invoice is updated to contain the new lines if any were split. diff --git a/openmeter/billing/service/gatheringinvoicependinglines_test.go b/openmeter/billing/service/gatheringinvoicependinglines_test.go index 55337071de..7c0f13bacb 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines_test.go +++ b/openmeter/billing/service/gatheringinvoicependinglines_test.go @@ -36,7 +36,7 @@ func TestLimitGatheringLinesForInvoice(t *testing.T) { t.Run("zero keeps all lines without reordering", func(t *testing.T) { got := limitGatheringLinesForInvoice(lines, 0) - require.Equal(t, []gatheringLineWithBillablePeriod(lines), got) + require.Equal(t, lines, got) }) t.Run("positive limit keeps earliest service periods", func(t *testing.T) { diff --git a/openmeter/billing/worker/subscriptionsync/service/sync.go b/openmeter/billing/worker/subscriptionsync/service/sync.go index fc5526664a..8797d5b510 100644 --- a/openmeter/billing/worker/subscriptionsync/service/sync.go +++ b/openmeter/billing/worker/subscriptionsync/service/sync.go @@ -27,13 +27,17 @@ const ( SubscriptionSyncComponentName billing.ComponentName = "subscription-sync" ) +const ( + maxInvoicePendingCollectionPasses = 10_000 +) + func (s *Service) invoicePendingLines(ctx context.Context, customer customer.CustomerID) error { span := tracex.StartWithNoValue(ctx, s.tracer, "billing.worker.subscription.sync.invoicePendingLines", trace.WithAttributes( attribute.String("customer_id", customer.ID), )) return span.Wrap(func(ctx context.Context) error { - for { + for pass := 0; pass < maxInvoicePendingCollectionPasses; pass++ { invoices, err := s.billingService.InvoicePendingLines( ctx, billing.InvoicePendingLinesInput{ @@ -54,6 +58,8 @@ func (s *Service) invoicePendingLines(ctx context.Context, customer customer.Cus return nil } } + + return fmt.Errorf("exceeded max invoice pending collection passes for customer %s", customer.ID) }) } diff --git a/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go b/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go index 2bda02be7c..5aab05df32 100644 --- a/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go +++ b/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go @@ -65,6 +65,31 @@ func TestInvoicePendingLinesUnlimitedCollectsOnce(t *testing.T) { require.Equal(t, 0, billingService.calls[0].options.MaxLinesPerInvoice) } +func TestInvoicePendingLinesExitsOnEmptySuccess(t *testing.T) { + billingService := &invoicePendingLinesBillingService{ + results: []invoicePendingLinesResult{ + {invoices: []billing.StandardInvoice{}}, + {err: billing.ErrInvoiceCreateNoLines}, + }, + } + service := &Service{ + billingService: billingService, + featureFlags: FeatureFlags{ + MaxLinesPerCollectedInvoice: 1, + }, + tracer: noop.NewTracerProvider().Tracer("test"), + } + + err := service.invoicePendingLines(t.Context(), customer.CustomerID{ + Namespace: "ns", + ID: "customer-id", + }) + + require.NoError(t, err) + require.Len(t, billingService.calls, 1) + require.Equal(t, 1, billingService.calls[0].options.MaxLinesPerInvoice) +} + type invoicePendingLinesBillingService struct { billing.Service From b3619618e360ab48503f5b834c08940cd555c809 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 1 Jul 2026 16:59:01 +0200 Subject: [PATCH 4/6] fix(billing): bound subscription sync invoice collection --- openmeter/billing/worker/subscriptionsync/service/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmeter/billing/worker/subscriptionsync/service/sync.go b/openmeter/billing/worker/subscriptionsync/service/sync.go index 8797d5b510..90eb4a3401 100644 --- a/openmeter/billing/worker/subscriptionsync/service/sync.go +++ b/openmeter/billing/worker/subscriptionsync/service/sync.go @@ -28,7 +28,7 @@ const ( ) const ( - maxInvoicePendingCollectionPasses = 10_000 + maxInvoicePendingCollectionPasses = 5 ) func (s *Service) invoicePendingLines(ctx context.Context, customer customer.CustomerID) error { From 4560541074ed0ec138ba881fc79753eb0ef6b2a6 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 1 Jul 2026 17:13:21 +0200 Subject: [PATCH 5/6] fix(billing): avoid repeated subscription sync collection --- .../worker/subscriptionsync/service/sync.go | 36 +++++++------------ 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/openmeter/billing/worker/subscriptionsync/service/sync.go b/openmeter/billing/worker/subscriptionsync/service/sync.go index 90eb4a3401..26ca7bf457 100644 --- a/openmeter/billing/worker/subscriptionsync/service/sync.go +++ b/openmeter/billing/worker/subscriptionsync/service/sync.go @@ -27,39 +27,29 @@ const ( SubscriptionSyncComponentName billing.ComponentName = "subscription-sync" ) -const ( - maxInvoicePendingCollectionPasses = 5 -) - func (s *Service) invoicePendingLines(ctx context.Context, customer customer.CustomerID) error { span := tracex.StartWithNoValue(ctx, s.tracer, "billing.worker.subscription.sync.invoicePendingLines", trace.WithAttributes( attribute.String("customer_id", customer.ID), )) return span.Wrap(func(ctx context.Context) error { - for pass := 0; pass < maxInvoicePendingCollectionPasses; pass++ { - invoices, err := s.billingService.InvoicePendingLines( - ctx, - billing.InvoicePendingLinesInput{ - Customer: customer, - }, - billing.WithPartialInvoiceLinesDisabled(), - billing.WithMaxLinesPerInvoice(s.featureFlags.MaxLinesPerCollectedInvoice), - ) - if err != nil { - if errors.Is(err, billing.ErrInvoiceCreateNoLines) { - return nil - } - - return err - } - - if s.featureFlags.MaxLinesPerCollectedInvoice <= 0 || len(invoices) == 0 { + _, err := s.billingService.InvoicePendingLines( + ctx, + billing.InvoicePendingLinesInput{ + Customer: customer, + }, + billing.WithPartialInvoiceLinesDisabled(), + billing.WithMaxLinesPerInvoice(s.featureFlags.MaxLinesPerCollectedInvoice), + ) + if err != nil { + if errors.Is(err, billing.ErrInvoiceCreateNoLines) { return nil } + + return err } - return fmt.Errorf("exceeded max invoice pending collection passes for customer %s", customer.ID) + return nil }) } From 220c197120ac3d58ceb980fef111a856c829f760 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 1 Jul 2026 17:27:19 +0200 Subject: [PATCH 6/6] test(billing): remove repeated sync collection tests --- .../service/sync_unit_test.go | 126 ------------------ 1 file changed, 126 deletions(-) delete mode 100644 openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go diff --git a/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go b/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go deleted file mode 100644 index 5aab05df32..0000000000 --- a/openmeter/billing/worker/subscriptionsync/service/sync_unit_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package service - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/trace/noop" - - "github.com/openmeterio/openmeter/openmeter/billing" - "github.com/openmeterio/openmeter/openmeter/customer" -) - -func TestInvoicePendingLinesCollectsCappedBatchesUntilEmpty(t *testing.T) { - billingService := &invoicePendingLinesBillingService{ - results: []invoicePendingLinesResult{ - {invoices: []billing.StandardInvoice{{}}}, - {invoices: []billing.StandardInvoice{{}}}, - {err: billing.ErrInvoiceCreateNoLines}, - }, - } - service := &Service{ - billingService: billingService, - featureFlags: FeatureFlags{ - MaxLinesPerCollectedInvoice: 1, - }, - tracer: noop.NewTracerProvider().Tracer("test"), - } - - err := service.invoicePendingLines(t.Context(), customer.CustomerID{ - Namespace: "ns", - ID: "customer-id", - }) - - require.NoError(t, err) - require.Len(t, billingService.calls, 3) - - for _, call := range billingService.calls { - require.Equal(t, customer.CustomerID{Namespace: "ns", ID: "customer-id"}, call.input.Customer) - require.Equal(t, 1, call.options.MaxLinesPerInvoice) - require.NotNil(t, call.options.PartialInvoiceLinesEnabled) - require.False(t, *call.options.PartialInvoiceLinesEnabled) - } -} - -func TestInvoicePendingLinesUnlimitedCollectsOnce(t *testing.T) { - billingService := &invoicePendingLinesBillingService{ - results: []invoicePendingLinesResult{ - {invoices: []billing.StandardInvoice{{}}}, - {err: billing.ErrInvoiceCreateNoLines}, - }, - } - service := &Service{ - billingService: billingService, - tracer: noop.NewTracerProvider().Tracer("test"), - } - - err := service.invoicePendingLines(t.Context(), customer.CustomerID{ - Namespace: "ns", - ID: "customer-id", - }) - - require.NoError(t, err) - require.Len(t, billingService.calls, 1) - require.Equal(t, 0, billingService.calls[0].options.MaxLinesPerInvoice) -} - -func TestInvoicePendingLinesExitsOnEmptySuccess(t *testing.T) { - billingService := &invoicePendingLinesBillingService{ - results: []invoicePendingLinesResult{ - {invoices: []billing.StandardInvoice{}}, - {err: billing.ErrInvoiceCreateNoLines}, - }, - } - service := &Service{ - billingService: billingService, - featureFlags: FeatureFlags{ - MaxLinesPerCollectedInvoice: 1, - }, - tracer: noop.NewTracerProvider().Tracer("test"), - } - - err := service.invoicePendingLines(t.Context(), customer.CustomerID{ - Namespace: "ns", - ID: "customer-id", - }) - - require.NoError(t, err) - require.Len(t, billingService.calls, 1) - require.Equal(t, 1, billingService.calls[0].options.MaxLinesPerInvoice) -} - -type invoicePendingLinesBillingService struct { - billing.Service - - results []invoicePendingLinesResult - calls []invoicePendingLinesCall -} - -type invoicePendingLinesResult struct { - invoices []billing.StandardInvoice - err error -} - -type invoicePendingLinesCall struct { - input billing.InvoicePendingLinesInput - options billing.InvoicePendingLinesOptions -} - -func (s *invoicePendingLinesBillingService) InvoicePendingLines( - _ context.Context, - input billing.InvoicePendingLinesInput, - opts ...billing.InvoicePendingLinesOption, -) ([]billing.StandardInvoice, error) { - s.calls = append(s.calls, invoicePendingLinesCall{ - input: input, - options: billing.NewInvoicePendingLinesOptions(opts...), - }) - - if len(s.calls) > len(s.results) { - return nil, billing.ErrInvoiceCreateNoLines - } - - result := s.results[len(s.calls)-1] - return result.invoices, result.err -}