Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions app/common/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,9 @@ func NewBillingSubscriptionSyncService(logger *slog.Logger, subsServices Subscri
EnableCreditThenInvoice: creditsConfig.EnableCreditThenInvoice,
MaxLinesPerCollectedInvoice: billingFsConfig.MaxLinesPerCollectedInvoice,
},
Logger: logger,
Tracer: tracer,
FeatureGate: featureGate,
ForceAsyncInvoicePendingLines: billingFsConfig.SubscriptionSyncForceAsyncAdvance,
Logger: logger,
Tracer: tracer,
FeatureGate: featureGate,
})
}
4 changes: 3 additions & 1 deletion app/config/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func (c BillingConfiguration) Validate() error {
type BillingFeatureSwitchesConfiguration struct {
NamespaceLockdown []string
// MaxLinesPerCollectedInvoice is the maximum number of lines that can be collected for a single invoice, 0 means no limit.
MaxLinesPerCollectedInvoice int
MaxLinesPerCollectedInvoice int
SubscriptionSyncForceAsyncAdvance bool
}

func (c BillingFeatureSwitchesConfiguration) Validate() error {
Expand All @@ -60,4 +61,5 @@ func ConfigureBilling(v *viper.Viper, flags *pflag.FlagSet) {
v.SetDefault("billing.advancementStrategy", billing.ForegroundAdvancementStrategy)
v.SetDefault("billing.maxParallelQuantitySnapshots", 4)
v.SetDefault("billing.featureSwitches.maxLinesPerCollectedInvoice", 0)
v.SetDefault("billing.featureSwitches.subscriptionSyncForceAsyncAdvance", true)
}
3 changes: 3 additions & 0 deletions app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func TestComplete(t *testing.T) {
Billing: BillingConfiguration{
AdvancementStrategy: billing.ForegroundAdvancementStrategy,
MaxParallelQuantitySnapshots: 4,
FeatureSwitches: BillingFeatureSwitchesConfiguration{
SubscriptionSyncForceAsyncAdvance: true,
},
Worker: BillingWorkerConfiguration{
ConsumerConfiguration: ConsumerConfiguration{
ProcessingTimeout: 30 * time.Second,
Expand Down
9 changes: 5 additions & 4 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ server:
billing:
# for production deployments it's recommended to use queued for server only
# advancementStrategy: foreground
# featureSwitches:
# # 0 collects all eligible pending invoice lines. Positive values cap each
# # collected standard invoice to the earliest N lines by service period start.
# maxLinesPerCollectedInvoice: 0
featureSwitches:
# 0 collects all eligible pending invoice lines. Positive values cap each
# collected standard invoice to the earliest N lines by service period start.
maxLinesPerCollectedInvoice: 0
subscriptionSyncForceAsyncAdvance: true

credits:
enabled: true
Expand Down
1 change: 1 addition & 0 deletions openmeter/billing/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ type InvoicePendingLinesInput struct {

IncludePendingLines mo.Option[[]string]
AsOf *time.Time
ForceAsyncAdvance bool
}

func (i InvoicePendingLinesInput) Validate() error {
Expand Down
9 changes: 5 additions & 4 deletions openmeter/billing/service/gatheringinvoicependinglines.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ func (s *Service) InvoicePendingLines(ctx context.Context, input billing.Invoice

for currency, inScopeLines := range billableLines.LinesByCurrency {
createdInvoice, err := s.CreateStandardInvoiceFromGatheringLines(ctx, billing.CreateStandardInvoiceFromGatheringLinesInput{
Customer: input.Customer,
Currency: currency,
Lines: inScopeLines,
Customer: input.Customer,
Currency: currency,
Lines: inScopeLines,
ForceAsyncAdvance: input.ForceAsyncAdvance,
})
if err != nil {
return nil, fmt.Errorf("creating standard invoice from gathering lines: %w", err)
Expand Down Expand Up @@ -841,7 +842,7 @@ func (s *Service) CreateStandardInvoiceFromGatheringLines(ctx context.Context, i
}

// Otherwise, let's advance the invoice to the next final state
if err := s.advanceUntilStateStable(ctx, sm); err != nil {
if err := s.advanceUntilStateStable(ctx, sm, in.ForceAsyncAdvance); err != nil {
Comment thread
greptile-apps[bot] marked this conversation as resolved.
return fmt.Errorf("activating invoice: %w", err)
}

Expand Down
8 changes: 4 additions & 4 deletions openmeter/billing/service/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ func (s *Service) GetInvoiceById(ctx context.Context, input billing.GetInvoiceBy
}
}

func (s *Service) advanceUntilStateStable(ctx context.Context, sm *InvoiceStateMachine) error {
if s.advancementStrategy == billing.QueuedAdvancementStrategy {
func (s *Service) advanceUntilStateStable(ctx context.Context, sm *InvoiceStateMachine, forceAsync bool) error {
if forceAsync || s.advancementStrategy == billing.QueuedAdvancementStrategy {
return s.publisher.Publish(ctx, billing.AdvanceStandardInvoiceEvent{
Invoice: sm.Invoice.GetInvoiceID(),
CustomerID: sm.Invoice.Customer.CustomerID,
Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *Service) AdvanceInvoice(ctx context.Context, input billing.AdvanceInvoi
}
}

if err := s.advanceUntilStateStable(ctx, sm); err != nil {
if err := s.advanceUntilStateStable(ctx, sm, false); err != nil {
return fmt.Errorf("advancing invoice: %w", err)
}

Expand Down Expand Up @@ -700,7 +700,7 @@ func (s *Service) executeTriggerOnInvoice(ctx context.Context, invoiceID billing
return nil
}

if err := s.advanceUntilStateStable(ctx, sm); err != nil {
if err := s.advanceUntilStateStable(ctx, sm, false); err != nil {
return fmt.Errorf("advancing invoice: %w", err)
}

Expand Down
1 change: 1 addition & 0 deletions openmeter/billing/stdinvoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,7 @@ type CreateStandardInvoiceFromGatheringLinesInput struct {

Lines GatheringLines
PostCreationCalculationHook PostCreationCalculationHook
ForceAsyncAdvance bool
}

type (
Expand Down
5 changes: 3 additions & 2 deletions openmeter/billing/worker/collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func (a *InvoiceCollector) CollectCustomerInvoice(ctx context.Context, params Co
invoices, err := a.billingService.InvoicePendingLines(
ctx,
billing.InvoicePendingLinesInput{
Customer: params.CustomerID,
AsOf: lo.ToPtr(params.AsOf),
Customer: params.CustomerID,
AsOf: lo.ToPtr(params.AsOf),
ForceAsyncAdvance: true,
Comment thread
greptile-apps[bot] marked this conversation as resolved.
},
// We want to make sure that system collection does not use progressive billing.
billing.WithPartialInvoiceLinesDisabled(),
Expand Down
49 changes: 26 additions & 23 deletions openmeter/billing/worker/subscriptionsync/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ type FeatureFlags struct {
type Config struct {
BillingService billing.Service
// ChargesService is required for credit-only sync and charge-based provisioning.
ChargesService charges.Service
SubscriptionService subscription.Service
SubscriptionSyncAdapter subscriptionsync.Adapter
FeatureFlags FeatureFlags
Logger *slog.Logger
Tracer trace.Tracer
FeatureGate *featuregate.FeatureGateChecker
ChargesService charges.Service
SubscriptionService subscription.Service
SubscriptionSyncAdapter subscriptionsync.Adapter
FeatureFlags FeatureFlags
ForceAsyncInvoicePendingLines bool
Logger *slog.Logger
Tracer trace.Tracer
FeatureGate *featuregate.FeatureGateChecker
}

func (c Config) Validate() error {
Expand Down Expand Up @@ -68,14 +69,15 @@ func (c Config) Validate() error {
var _ subscriptionsync.Service = (*Service)(nil)

type Service struct {
billingService billing.Service
chargesService charges.Service
reconciler reconciler.Reconciler
subscriptionService subscription.Service
subscriptionSyncAdapter subscriptionsync.Adapter
featureFlags FeatureFlags
logger *slog.Logger
tracer trace.Tracer
billingService billing.Service
chargesService charges.Service
reconciler reconciler.Reconciler
subscriptionService subscription.Service
subscriptionSyncAdapter subscriptionsync.Adapter
featureFlags FeatureFlags
forceAsyncInvoicePendingLines bool
logger *slog.Logger
tracer trace.Tracer
}

func New(config Config) (*Service, error) {
Expand All @@ -93,14 +95,15 @@ func New(config Config) (*Service, error) {
return nil, err
}
return &Service{
billingService: config.BillingService,
chargesService: config.ChargesService,
reconciler: reconcilerSvc,
subscriptionSyncAdapter: config.SubscriptionSyncAdapter,
featureFlags: config.FeatureFlags,
subscriptionService: config.SubscriptionService,
logger: config.Logger,
tracer: config.Tracer,
billingService: config.BillingService,
chargesService: config.ChargesService,
reconciler: reconcilerSvc,
subscriptionSyncAdapter: config.SubscriptionSyncAdapter,
featureFlags: config.FeatureFlags,
forceAsyncInvoicePendingLines: config.ForceAsyncInvoicePendingLines,
subscriptionService: config.SubscriptionService,
logger: config.Logger,
tracer: config.Tracer,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion openmeter/billing/worker/subscriptionsync/service/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func (s *Service) invoicePendingLines(ctx context.Context, customer customer.Cus
_, err := s.billingService.InvoicePendingLines(
ctx,
billing.InvoicePendingLinesInput{
Customer: customer,
Customer: customer,
ForceAsyncAdvance: s.forceAsyncInvoicePendingLines,
},
billing.WithPartialInvoiceLinesDisabled(),
billing.WithMaxLinesPerInvoice(s.featureFlags.MaxLinesPerCollectedInvoice),
Expand Down
28 changes: 28 additions & 0 deletions test/billing/invoice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3680,6 +3680,34 @@ func (s *InvoicingTestSuite) TestNamespaceLockedInvoiceProgression() {
s.Equal(billing.ValidationIssueSeverityCritical, validationError.Severity)
}

func (s *InvoicingTestSuite) TestInvoicePendingLinesForceAsyncAdvance() {
namespace := "ns-force-async-advance"
ctx := context.Background()

sandboxApp := s.InstallSandboxApp(s.T(), namespace)

s.ProvisionBillingProfile(ctx, namespace, sandboxApp.GetID())

customer := s.CreateTestCustomer(namespace, "test-customer")

s.CreateGatheringInvoice(s.T(), ctx, DraftInvoiceInput{
Namespace: namespace,
Customer: customer,
})

invoices, err := s.BillingService.InvoicePendingLines(ctx, billing.InvoicePendingLinesInput{
Customer: customer.GetID(),
ForceAsyncAdvance: true,
})
s.NoError(err)
s.Len(invoices, 1)
s.Equal(billing.StandardInvoiceStatusDraftCreated, invoices[0].Status)

invoice, err := s.BillingService.AdvanceInvoice(ctx, invoices[0].GetInvoiceID())
s.NoError(err)
s.NotEqual(billing.StandardInvoiceStatusDraftCreated, invoice.Status)
}

func (s *InvoicingTestSuite) TestProgressiveBillLate() {
namespace := "ns-progressive-bill-late"
ctx := context.Background()
Expand Down
Loading