diff --git a/evmrpc/block.go b/evmrpc/block.go index d6d4c9b9d1..f138994e25 100644 --- a/evmrpc/block.go +++ b/evmrpc/block.go @@ -24,6 +24,7 @@ import ( "github.com/sei-protocol/sei-chain/x/evm/keeper" "github.com/sei-protocol/sei-chain/x/evm/state" "github.com/sei-protocol/sei-chain/x/evm/types" + "golang.org/x/sync/errgroup" ) const ( @@ -32,6 +33,11 @@ const ( Sei2Namespace = "sei2" ) +// maxBlockReceiptsConcurrency is a hard cap on the number of goroutines +// eth_getBlockReceipts (and its sei_/sei2_ variants) will fan out to when +// fetching per-tx receipts. +const maxBlockReceiptsConcurrency = 100 + // genesisBlockHashHex is the block hash returned by GetBlockByNumber("0x0"). Hash-based lookups // must recognize this so that count/block-by-hash stay consistent with block-by-number. const genesisBlockHashHex = "0xF9D3845DF25B43B1C6926F3CEDA6845C17F5624E12212FD8847D0BA01DA1AB9E" @@ -333,34 +339,37 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block txHashes := getTxHashesFromBlock(a.ctxProvider, a.txConfigProvider, a.keeper, block, shouldIncludeSynthetic(a.namespace), a.cacheCreationMutex, a.globalBlockCache) - // Get tx receipts for all hashes in parallel - wg := sync.WaitGroup{} - mtx := sync.Mutex{} + // Get tx receipts for all hashes in parallel, with a hard cap on the + // goroutine fan-out, so a block with a very large number of txs + // cannot spawn an unbounded number of goroutines. errgroup.SetLimit blocks + // Go() until a slot frees, bounding the number of live goroutines rather + // than just the number doing concurrent work. allReceipts := make([]map[string]interface{}, len(txHashes)) + g := new(errgroup.Group) + g.SetLimit(maxBlockReceiptsConcurrency) for i, hash := range txHashes { - wg.Add(1) - go func(i int, hash typedTxHash) { - defer wg.Done() + g.Go(func() error { defer recoverAndLog() receipt, err := getOrSetCachedReceiptErr(a.cacheCreationMutex, a.globalBlockCache, a.ctxProvider(height), a.keeper, block, hash.hash) if err != nil { - if !strings.Contains(err.Error(), "not found") { - mtx.Lock() - returnErr = err - mtx.Unlock() + // A missing receipt is expected for some hashes and is not an + // error; skip it and leave allReceipts[i] empty. + if strings.Contains(err.Error(), "not found") { + return nil } - return + return err } encodedReceipt, err := encodeReceipt(a.ctxProvider, a.txConfigProvider, receipt, a.keeper, block, a.includeShellReceipts, a.globalBlockCache, a.cacheCreationMutex) if err != nil { - mtx.Lock() - returnErr = err - mtx.Unlock() + return err } allReceipts[i] = encodedReceipt - }(i, hash) + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err } - wg.Wait() compactReceipts := make([]map[string]interface{}, 0) for _, r := range allReceipts { if len(r) > 0 { @@ -370,9 +379,6 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block for i, cr := range compactReceipts { cr["transactionIndex"] = hexutil.Uint64(i) //nolint:gosec } - if returnErr != nil { - return nil, returnErr - } return compactReceipts, nil } diff --git a/evmrpc/subscribe.go b/evmrpc/subscribe.go index 7ca79be35f..66a9ace14c 100644 --- a/evmrpc/subscribe.go +++ b/evmrpc/subscribe.go @@ -278,7 +278,16 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter } begin = lastToHeight filter.FromBlock = big.NewInt(lastToHeight + 1) - time.Sleep(SleepInterval) + // Wait before the next poll, but stop promptly if the client + // disconnects or unsubscribes (rpcSub.Err()). Note: ctx here is the + // per-call context, which the RPC framework cancels as soon as the + // eth_subscribe call returns, so it must NOT be used to tear down the + // long-lived subscription loop. + select { + case <-rpcSub.Err(): + return + case <-time.After(SleepInterval): + } } }()