From d092e1177302b21d25b3a3a776059a7874eab15d Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 1 Jul 2026 13:32:12 +0200 Subject: [PATCH] Force async advancement for invoice collection --- app/common/billing.go | 7 +-- app/config/billing.go | 4 +- app/config/config_test.go | 3 ++ config.example.yaml | 9 ++-- openmeter/billing/invoice.go | 1 + .../service/gatheringinvoicependinglines.go | 9 ++-- openmeter/billing/service/invoice.go | 8 +-- openmeter/billing/stdinvoice.go | 1 + openmeter/billing/worker/collect/collect.go | 5 +- .../subscriptionsync/service/service.go | 49 ++++++++++--------- .../worker/subscriptionsync/service/sync.go | 3 +- test/billing/invoice_test.go | 28 +++++++++++ 12 files changed, 85 insertions(+), 42 deletions(-) diff --git a/app/common/billing.go b/app/common/billing.go index 7d280df5a8..e80a84c425 100644 --- a/app/common/billing.go +++ b/app/common/billing.go @@ -300,8 +300,9 @@ func NewBillingSubscriptionSyncService(logger *slog.Logger, subsServices Subscri EnableCreditThenInvoice: creditsConfig.EnableCreditThenInvoice, MaxLinesPerCollectedInvoice: billingFsConfig.MaxLinesPerCollectedInvoice, }, - Logger: logger, - Tracer: tracer, - FeatureGate: featureGate, + ForceAsyncInvoicePendingLines: billingFsConfig.SubscriptionSyncForceAsyncAdvance, + Logger: logger, + Tracer: tracer, + FeatureGate: featureGate, }) } diff --git a/app/config/billing.go b/app/config/billing.go index 3f36f6dd83..3444a7bee0 100644 --- a/app/config/billing.go +++ b/app/config/billing.go @@ -37,7 +37,8 @@ func (c BillingConfiguration) Validate() error { type BillingFeatureSwitchesConfiguration struct { NamespaceLockdown []string // MaxLinesPerCollectedInvoice is the maximum number of lines that can be collected for a single invoice, 0 means no limit. - MaxLinesPerCollectedInvoice int + MaxLinesPerCollectedInvoice int + SubscriptionSyncForceAsyncAdvance bool } func (c BillingFeatureSwitchesConfiguration) Validate() error { @@ -60,4 +61,5 @@ func ConfigureBilling(v *viper.Viper, flags *pflag.FlagSet) { v.SetDefault("billing.advancementStrategy", billing.ForegroundAdvancementStrategy) v.SetDefault("billing.maxParallelQuantitySnapshots", 4) v.SetDefault("billing.featureSwitches.maxLinesPerCollectedInvoice", 0) + v.SetDefault("billing.featureSwitches.subscriptionSyncForceAsyncAdvance", true) } diff --git a/app/config/config_test.go b/app/config/config_test.go index 24511dfdb7..df32b9e657 100644 --- a/app/config/config_test.go +++ b/app/config/config_test.go @@ -167,6 +167,9 @@ func TestComplete(t *testing.T) { Billing: BillingConfiguration{ AdvancementStrategy: billing.ForegroundAdvancementStrategy, MaxParallelQuantitySnapshots: 4, + FeatureSwitches: BillingFeatureSwitchesConfiguration{ + SubscriptionSyncForceAsyncAdvance: true, + }, Worker: BillingWorkerConfiguration{ ConsumerConfiguration: ConsumerConfiguration{ ProcessingTimeout: 30 * time.Second, diff --git a/config.example.yaml b/config.example.yaml index fe31f5469d..c3d00302c9 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -96,10 +96,11 @@ server: billing: # for production deployments it's recommended to use queued for server only # advancementStrategy: foreground - # featureSwitches: - # # 0 collects all eligible pending invoice lines. Positive values cap each - # # collected standard invoice to the earliest N lines by service period start. - # maxLinesPerCollectedInvoice: 0 + featureSwitches: + # 0 collects all eligible pending invoice lines. Positive values cap each + # collected standard invoice to the earliest N lines by service period start. + maxLinesPerCollectedInvoice: 0 + subscriptionSyncForceAsyncAdvance: true credits: enabled: true diff --git a/openmeter/billing/invoice.go b/openmeter/billing/invoice.go index b0dc572210..ac2d138f51 100644 --- a/openmeter/billing/invoice.go +++ b/openmeter/billing/invoice.go @@ -459,6 +459,7 @@ type InvoicePendingLinesInput struct { IncludePendingLines mo.Option[[]string] AsOf *time.Time + ForceAsyncAdvance bool } func (i InvoicePendingLinesInput) Validate() error { diff --git a/openmeter/billing/service/gatheringinvoicependinglines.go b/openmeter/billing/service/gatheringinvoicependinglines.go index 6dcd120f9e..71ab3efdbf 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines.go +++ b/openmeter/billing/service/gatheringinvoicependinglines.go @@ -81,9 +81,10 @@ func (s *Service) InvoicePendingLines(ctx context.Context, input billing.Invoice for currency, inScopeLines := range billableLines.LinesByCurrency { createdInvoice, err := s.CreateStandardInvoiceFromGatheringLines(ctx, billing.CreateStandardInvoiceFromGatheringLinesInput{ - Customer: input.Customer, - Currency: currency, - Lines: inScopeLines, + Customer: input.Customer, + Currency: currency, + Lines: inScopeLines, + ForceAsyncAdvance: input.ForceAsyncAdvance, }) if err != nil { return nil, fmt.Errorf("creating standard invoice from gathering lines: %w", err) @@ -841,7 +842,7 @@ func (s *Service) CreateStandardInvoiceFromGatheringLines(ctx context.Context, i } // Otherwise, let's advance the invoice to the next final state - if err := s.advanceUntilStateStable(ctx, sm); err != nil { + if err := s.advanceUntilStateStable(ctx, sm, in.ForceAsyncAdvance); err != nil { return fmt.Errorf("activating invoice: %w", err) } diff --git a/openmeter/billing/service/invoice.go b/openmeter/billing/service/invoice.go index 55b0c284b0..e9829dcd7f 100644 --- a/openmeter/billing/service/invoice.go +++ b/openmeter/billing/service/invoice.go @@ -437,8 +437,8 @@ func (s *Service) GetInvoiceById(ctx context.Context, input billing.GetInvoiceBy } } -func (s *Service) advanceUntilStateStable(ctx context.Context, sm *InvoiceStateMachine) error { - if s.advancementStrategy == billing.QueuedAdvancementStrategy { +func (s *Service) advanceUntilStateStable(ctx context.Context, sm *InvoiceStateMachine, forceAsync bool) error { + if forceAsync || s.advancementStrategy == billing.QueuedAdvancementStrategy { return s.publisher.Publish(ctx, billing.AdvanceStandardInvoiceEvent{ Invoice: sm.Invoice.GetInvoiceID(), CustomerID: sm.Invoice.Customer.CustomerID, @@ -521,7 +521,7 @@ func (s *Service) AdvanceInvoice(ctx context.Context, input billing.AdvanceInvoi } } - if err := s.advanceUntilStateStable(ctx, sm); err != nil { + if err := s.advanceUntilStateStable(ctx, sm, false); err != nil { return fmt.Errorf("advancing invoice: %w", err) } @@ -700,7 +700,7 @@ func (s *Service) executeTriggerOnInvoice(ctx context.Context, invoiceID billing return nil } - if err := s.advanceUntilStateStable(ctx, sm); err != nil { + if err := s.advanceUntilStateStable(ctx, sm, false); err != nil { return fmt.Errorf("advancing invoice: %w", err) } diff --git a/openmeter/billing/stdinvoice.go b/openmeter/billing/stdinvoice.go index fcf5e0a847..2bf7cf36fc 100644 --- a/openmeter/billing/stdinvoice.go +++ b/openmeter/billing/stdinvoice.go @@ -1156,6 +1156,7 @@ type CreateStandardInvoiceFromGatheringLinesInput struct { Lines GatheringLines PostCreationCalculationHook PostCreationCalculationHook + ForceAsyncAdvance bool } type ( diff --git a/openmeter/billing/worker/collect/collect.go b/openmeter/billing/worker/collect/collect.go index 6bb5e564fb..7aaaf27ac4 100644 --- a/openmeter/billing/worker/collect/collect.go +++ b/openmeter/billing/worker/collect/collect.go @@ -107,8 +107,9 @@ func (a *InvoiceCollector) CollectCustomerInvoice(ctx context.Context, params Co invoices, err := a.billingService.InvoicePendingLines( ctx, billing.InvoicePendingLinesInput{ - Customer: params.CustomerID, - AsOf: lo.ToPtr(params.AsOf), + Customer: params.CustomerID, + AsOf: lo.ToPtr(params.AsOf), + ForceAsyncAdvance: true, }, // We want to make sure that system collection does not use progressive billing. billing.WithPartialInvoiceLinesDisabled(), diff --git a/openmeter/billing/worker/subscriptionsync/service/service.go b/openmeter/billing/worker/subscriptionsync/service/service.go index a5ebf4b32a..19240b4371 100644 --- a/openmeter/billing/worker/subscriptionsync/service/service.go +++ b/openmeter/billing/worker/subscriptionsync/service/service.go @@ -28,13 +28,14 @@ type FeatureFlags struct { type Config struct { BillingService billing.Service // ChargesService is required for credit-only sync and charge-based provisioning. - ChargesService charges.Service - SubscriptionService subscription.Service - SubscriptionSyncAdapter subscriptionsync.Adapter - FeatureFlags FeatureFlags - Logger *slog.Logger - Tracer trace.Tracer - FeatureGate *featuregate.FeatureGateChecker + ChargesService charges.Service + SubscriptionService subscription.Service + SubscriptionSyncAdapter subscriptionsync.Adapter + FeatureFlags FeatureFlags + ForceAsyncInvoicePendingLines bool + Logger *slog.Logger + Tracer trace.Tracer + FeatureGate *featuregate.FeatureGateChecker } func (c Config) Validate() error { @@ -68,14 +69,15 @@ func (c Config) Validate() error { var _ subscriptionsync.Service = (*Service)(nil) type Service struct { - billingService billing.Service - chargesService charges.Service - reconciler reconciler.Reconciler - subscriptionService subscription.Service - subscriptionSyncAdapter subscriptionsync.Adapter - featureFlags FeatureFlags - logger *slog.Logger - tracer trace.Tracer + billingService billing.Service + chargesService charges.Service + reconciler reconciler.Reconciler + subscriptionService subscription.Service + subscriptionSyncAdapter subscriptionsync.Adapter + featureFlags FeatureFlags + forceAsyncInvoicePendingLines bool + logger *slog.Logger + tracer trace.Tracer } func New(config Config) (*Service, error) { @@ -93,14 +95,15 @@ func New(config Config) (*Service, error) { return nil, err } return &Service{ - billingService: config.BillingService, - chargesService: config.ChargesService, - reconciler: reconcilerSvc, - subscriptionSyncAdapter: config.SubscriptionSyncAdapter, - featureFlags: config.FeatureFlags, - subscriptionService: config.SubscriptionService, - logger: config.Logger, - tracer: config.Tracer, + billingService: config.BillingService, + chargesService: config.ChargesService, + reconciler: reconcilerSvc, + subscriptionSyncAdapter: config.SubscriptionSyncAdapter, + featureFlags: config.FeatureFlags, + forceAsyncInvoicePendingLines: config.ForceAsyncInvoicePendingLines, + subscriptionService: config.SubscriptionService, + logger: config.Logger, + tracer: config.Tracer, }, nil } diff --git a/openmeter/billing/worker/subscriptionsync/service/sync.go b/openmeter/billing/worker/subscriptionsync/service/sync.go index 26ca7bf457..93f160762c 100644 --- a/openmeter/billing/worker/subscriptionsync/service/sync.go +++ b/openmeter/billing/worker/subscriptionsync/service/sync.go @@ -36,7 +36,8 @@ func (s *Service) invoicePendingLines(ctx context.Context, customer customer.Cus _, err := s.billingService.InvoicePendingLines( ctx, billing.InvoicePendingLinesInput{ - Customer: customer, + Customer: customer, + ForceAsyncAdvance: s.forceAsyncInvoicePendingLines, }, billing.WithPartialInvoiceLinesDisabled(), billing.WithMaxLinesPerInvoice(s.featureFlags.MaxLinesPerCollectedInvoice), diff --git a/test/billing/invoice_test.go b/test/billing/invoice_test.go index 80c4e67ddd..bfc1833ed9 100644 --- a/test/billing/invoice_test.go +++ b/test/billing/invoice_test.go @@ -3680,6 +3680,34 @@ func (s *InvoicingTestSuite) TestNamespaceLockedInvoiceProgression() { s.Equal(billing.ValidationIssueSeverityCritical, validationError.Severity) } +func (s *InvoicingTestSuite) TestInvoicePendingLinesForceAsyncAdvance() { + namespace := "ns-force-async-advance" + ctx := context.Background() + + sandboxApp := s.InstallSandboxApp(s.T(), namespace) + + s.ProvisionBillingProfile(ctx, namespace, sandboxApp.GetID()) + + customer := s.CreateTestCustomer(namespace, "test-customer") + + s.CreateGatheringInvoice(s.T(), ctx, DraftInvoiceInput{ + Namespace: namespace, + Customer: customer, + }) + + invoices, err := s.BillingService.InvoicePendingLines(ctx, billing.InvoicePendingLinesInput{ + Customer: customer.GetID(), + ForceAsyncAdvance: true, + }) + s.NoError(err) + s.Len(invoices, 1) + s.Equal(billing.StandardInvoiceStatusDraftCreated, invoices[0].Status) + + invoice, err := s.BillingService.AdvanceInvoice(ctx, invoices[0].GetInvoiceID()) + s.NoError(err) + s.NotEqual(billing.StandardInvoiceStatusDraftCreated, invoice.Status) +} + func (s *InvoicingTestSuite) TestProgressiveBillLate() { namespace := "ns-progressive-bill-late" ctx := context.Background()