Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions evmrpc/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/sei-protocol/sei-chain/x/evm/keeper"
"github.com/sei-protocol/sei-chain/x/evm/state"
"github.com/sei-protocol/sei-chain/x/evm/types"
"golang.org/x/sync/errgroup"
)

const (
Expand All @@ -32,6 +33,11 @@ const (
Sei2Namespace = "sei2"
)

// maxBlockReceiptsConcurrency is a hard cap on the number of goroutines
// eth_getBlockReceipts (and its sei_/sei2_ variants) will fan out to when
// fetching per-tx receipts.
const maxBlockReceiptsConcurrency = 100

// genesisBlockHashHex is the block hash returned by GetBlockByNumber("0x0"). Hash-based lookups
// must recognize this so that count/block-by-hash stay consistent with block-by-number.
const genesisBlockHashHex = "0xF9D3845DF25B43B1C6926F3CEDA6845C17F5624E12212FD8847D0BA01DA1AB9E"
Expand Down Expand Up @@ -333,34 +339,37 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block

txHashes := getTxHashesFromBlock(a.ctxProvider, a.txConfigProvider, a.keeper, block, shouldIncludeSynthetic(a.namespace), a.cacheCreationMutex, a.globalBlockCache)

// Get tx receipts for all hashes in parallel
wg := sync.WaitGroup{}
mtx := sync.Mutex{}
// Get tx receipts for all hashes in parallel, with a hard cap on the
// goroutine fan-out, so a block with a very large number of txs
// cannot spawn an unbounded number of goroutines. errgroup.SetLimit blocks
// Go() until a slot frees, bounding the number of live goroutines rather
// than just the number doing concurrent work.
allReceipts := make([]map[string]interface{}, len(txHashes))
g := new(errgroup.Group)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use errgroup with context since we have one?

There is no point waiting in the group if context is cancelled IIUC.

g.SetLimit(maxBlockReceiptsConcurrency)
for i, hash := range txHashes {
wg.Add(1)
go func(i int, hash typedTxHash) {
defer wg.Done()
g.Go(func() error {
defer recoverAndLog()
receipt, err := getOrSetCachedReceiptErr(a.cacheCreationMutex, a.globalBlockCache, a.ctxProvider(height), a.keeper, block, hash.hash)
if err != nil {
if !strings.Contains(err.Error(), "not found") {
mtx.Lock()
returnErr = err
mtx.Unlock()
// A missing receipt is expected for some hashes and is not an
// error; skip it and leave allReceipts[i] empty.
if strings.Contains(err.Error(), "not found") {
return nil
}
return
return err
}
encodedReceipt, err := encodeReceipt(a.ctxProvider, a.txConfigProvider, receipt, a.keeper, block, a.includeShellReceipts, a.globalBlockCache, a.cacheCreationMutex)
if err != nil {
mtx.Lock()
returnErr = err
mtx.Unlock()
return err
}
allReceipts[i] = encodedReceipt
}(i, hash)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
wg.Wait()
compactReceipts := make([]map[string]interface{}, 0)
for _, r := range allReceipts {
if len(r) > 0 {
Expand All @@ -370,9 +379,6 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block
for i, cr := range compactReceipts {
cr["transactionIndex"] = hexutil.Uint64(i) //nolint:gosec
}
if returnErr != nil {
return nil, returnErr
}
return compactReceipts, nil
}

Expand Down
11 changes: 10 additions & 1 deletion evmrpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,16 @@ func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriter
}
begin = lastToHeight
filter.FromBlock = big.NewInt(lastToHeight + 1)
time.Sleep(SleepInterval)
// Wait before the next poll, but stop promptly if the client
// disconnects or unsubscribes (rpcSub.Err()). Note: ctx here is the
// per-call context, which the RPC framework cancels as soon as the
// eth_subscribe call returns, so it must NOT be used to tear down the
// long-lived subscription loop.
select {
case <-rpcSub.Err():
return
case <-time.After(SleepInterval):
}
}
}()

Expand Down
Loading