diff --git a/sei-db/config/receipt_config.go b/sei-db/config/receipt_config.go index e894765acd..9742d69f63 100644 --- a/sei-db/config/receipt_config.go +++ b/sei-db/config/receipt_config.go @@ -82,10 +82,10 @@ func ReadReceiptConfig(opts AppOptions) (ReceiptStoreConfig, error) { } backend = strings.ToLower(strings.TrimSpace(backend)) switch backend { - case "pebbledb", "pebble": + case "pebbledb", "pebble", "littidx": cfg.Backend = backend default: - return cfg, fmt.Errorf("unsupported receipt-store backend %q; supported: pebbledb", backend) + return cfg, fmt.Errorf("unsupported receipt-store backend %q; supported: pebbledb, littidx", backend) } } if v := opts.Get(flagRSAsyncWriteBuffer); v != nil { diff --git a/sei-db/db_engine/pebbledb/db.go b/sei-db/db_engine/pebbledb/db.go index c22dece944..2e0cfa67ff 100644 --- a/sei-db/db_engine/pebbledb/db.go +++ b/sei-db/db_engine/pebbledb/db.go @@ -165,6 +165,15 @@ func (p *pebbleDB) Delete(key []byte, opts types.WriteOptions) error { return nil } +// DeleteRange deletes every key in [start, end) with a single range tombstone +// (O(1) write) rather than iterating and point-deleting each key. +func (p *pebbleDB) DeleteRange(start, end []byte, opts types.WriteOptions) error { + if err := p.db.DeleteRange(start, end, toPebbleWriteOpts(opts)); err != nil { + return fmt.Errorf("failed to delete range in database: %w", err) + } + return nil +} + func (p *pebbleDB) NewIter(opts *types.IterOptions) (dbm.Iterator, error) { var iopts *pebble.IterOptions if opts != nil { diff --git a/sei-db/ledger_db/receipt/export_test.go b/sei-db/ledger_db/receipt/export_test.go index e2d2aaf09b..04650ae381 100644 --- a/sei-db/ledger_db/receipt/export_test.go +++ b/sei-db/ledger_db/receipt/export_test.go @@ -15,3 +15,10 @@ func RecoverReceiptStore(changelogPath string, db types2.StateStore) error { func GetLogsForTx(receipt *types.Receipt, logStartIndex uint) []*ethtypes.Log { return getLogsForTx(receipt, logStartIndex) } + +// PruneLittIdx runs a synchronous prune on a littidx store, removing every +// block below cutoff. Test-only hook so prune behavior can be asserted without +// waiting on the background interval. +func PruneLittIdx(store ReceiptStore, cutoff uint64) error { + return store.(*littReceiptStore).pruneBlocksBelow(cutoff) +} diff --git a/sei-db/ledger_db/receipt/litt_receipt_store.go b/sei-db/ledger_db/receipt/litt_receipt_store.go new file mode 100644 index 0000000000..fd4ba2c15e --- /dev/null +++ b/sei-db/ledger_db/receipt/litt_receipt_store.go @@ -0,0 +1,491 @@ +package receipt + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "math/rand" + "os" + "path/filepath" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" + "github.com/sei-protocol/sei-chain/sei-db/common/unit" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/disktable/keymap" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/littbuilder" + litttypes "github.com/sei-protocol/sei-chain/sei-db/db_engine/litt/types" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/pebbledb" + dbtypes "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/x/evm/types" +) + +// littReceiptStore stores receipt bodies in LittDB and supports eth_getLogs +// via a small pebble tag index (see litt_tag_index.go). +// +// Receipt bytes live in litt's immutable append-only segments — large values +// never enter LSM compaction, and expired data is reclaimed by dropping whole +// segments. Each tx hash is a litt secondary key aliasing its receipt's byte +// range, so eth_getTransactionReceipt is one litt lookup with no separate +// index. A block is written as one or more litt "parts" (block + part index -> +// the part's receipts concatenated); a block normally has one part, but legacy +// receipt migration can flush a block across several SetReceipts calls, each +// appending a new immutable part. +// +// The pebble index holds the tag keys (litt_tag_index.go) plus version +// metadata (m:latest / m:earliest). +// +// Durability: a background flusher bounds litt durability lag to +// littFlushInterval without putting fsync on the commit path; Close flushes the +// remainder. Accepted tradeoff: a hard crash can lose up to littFlushInterval +// of the most recent receipt bodies (litt buffers in memory, no WAL) while the +// index still lists them — tolerable for auxiliary, non-consensus RPC data, +// since reads return not-found for a missing body. Retention: receipt values +// expire via litt's per-table TTL (time based), tag keys are pruned by block +// range, and reads enforce the KeepRecent floor, so visible retention never +// exceeds KeepRecent regardless of GC timing. +type littReceiptStore struct { + values litt.DB + receipts litt.Table + index dbtypes.KeyValueDB + storeKey sdk.StoreKey + + latestVersion atomic.Int64 + earliestVersion atomic.Int64 + + keepRecent int64 + pruneInterval int64 + stopBackground chan struct{} + backgroundWg sync.WaitGroup + closeOnce sync.Once +} + +var _ ReceiptStore = (*littReceiptStore)(nil) + +var ( + receiptLatestVersionKey = []byte("m:latest") + receiptEarliestVersionKey = []byte("m:earliest") +) + +const ( + receiptBackendLittIdx = "littidx" + + littReceiptTableName = "receipts" + littFlushInterval = 100 * time.Millisecond + // littTTLPerBlock converts the KeepRecent block count into litt's wall-clock + // TTL (KeepRecent * littTTLPerBlock). Set above Giga block times so the TTL + // over-retains; only a sustained block time above this would expire a body + // still inside the height-based KeepRecent window, which reads mask as + // not-found (the earliest-version floor is authoritative). + littTTLPerBlock = 2 * time.Second + + littPartCountLen = 4 +) + +// littPartKey identifies one part of a block's receipts in litt. Its 12-byte +// length is disjoint from the 32-byte tx-hash secondary keys. +func littPartKey(blockNumber uint64, part uint32) []byte { + key := make([]byte, blockNumLen+littPartCountLen) + binary.BigEndian.PutUint64(key, blockNumber) + binary.BigEndian.PutUint32(key[blockNumLen:], part) + return key +} + +func newLittReceiptStore(cfg dbconfig.ReceiptStoreConfig, storeKey sdk.StoreKey) (ReceiptStore, error) { + if err := os.MkdirAll(cfg.DBDirectory, 0o750); err != nil { + return nil, fmt.Errorf("failed to create receipt store directory: %w", err) + } + littConfig, err := litt.DefaultConfig(filepath.Join(cfg.DBDirectory, "littdb")) + if err != nil { + return nil, fmt.Errorf("failed to build littdb config: %w", err) + } + // Receipt-workload tuning: this is a small-value, many-keys workload (every + // tx hash is a key), the opposite of litt's few-large-values default. The + // stock seal triggers fire every few thousand tiny keys / 2MB of key file + // and stall throughput, so raise the key-count and key-file caps past a + // retention window and let only the value-file size bind segment seals. + littConfig.MaxSegmentKeyCount = 100_000_000 + littConfig.TargetSegmentFileSize = 512 * unit.MB + littConfig.TargetSegmentKeyFileSize = 5 * unit.GB + littConfig.KeymapType = keymap.PebbleDBKeymapType + + values, err := littbuilder.NewDB(littConfig) + if err != nil { + return nil, fmt.Errorf("failed to open littdb: %w", err) + } + tableConfig := litt.DefaultTableConfig(littReceiptTableName) + tableConfig.ShardingFactor = 16 // spread writes across shards (cores spare) + receipts, err := values.BuildTable(tableConfig) + if err != nil { + _ = values.Close() + return nil, fmt.Errorf("failed to open littdb receipts table: %w", err) + } + if cfg.KeepRecent > 0 { + if err := receipts.SetTTL(time.Duration(cfg.KeepRecent) * littTTLPerBlock); err != nil { + _ = values.Close() + return nil, fmt.Errorf("failed to set littdb ttl: %w", err) + } + } + + indexCfg := pebbledb.DefaultConfig() + indexCfg.DataDir = filepath.Join(cfg.DBDirectory, "log-index") + index, err := pebbledb.Open(context.Background(), &indexCfg) + if err != nil { + _ = values.Close() + return nil, fmt.Errorf("failed to open receipt log index: %w", err) + } + + s := &littReceiptStore{ + values: values, + receipts: receipts, + index: index, + storeKey: storeKey, + keepRecent: int64(cfg.KeepRecent), + pruneInterval: int64(cfg.PruneIntervalSeconds), + stopBackground: make(chan struct{}), + } + s.latestVersion.Store(s.readMeta(receiptLatestVersionKey)) + s.earliestVersion.Store(s.readMeta(receiptEarliestVersionKey)) + s.startPruning() + s.startFlusher() + return s, nil +} + +func (s *littReceiptStore) readMeta(key []byte) int64 { + val, err := s.index.Get(key) + if err != nil || len(val) != blockNumLen { + return 0 + } + return int64(binary.BigEndian.Uint64(val)) //nolint:gosec // block heights fit within int64 +} + +func (s *littReceiptStore) LatestVersion() int64 { return s.latestVersion.Load() } + +func (s *littReceiptStore) EarliestVersion() int64 { return s.earliestVersion.Load() } + +func (s *littReceiptStore) SetLatestVersion(version int64) error { + if version <= s.latestVersion.Load() { + return nil + } + if err := s.index.Set(receiptLatestVersionKey, encodeBlockNumber(uint64(version)), dbtypes.WriteOptions{}); err != nil { //nolint:gosec // block heights fit within uint64 + return err + } + s.latestVersion.Store(version) + return nil +} + +func (s *littReceiptStore) SetEarliestVersion(version int64) error { + // The retention floor only moves forward; never re-expose pruned blocks. + if version <= s.earliestVersion.Load() { + return nil + } + if err := s.index.Set(receiptEarliestVersionKey, encodeBlockNumber(uint64(version)), dbtypes.WriteOptions{}); err != nil { //nolint:gosec // block heights fit within uint64 + return err + } + s.earliestVersion.Store(version) + return nil +} + +func (s *littReceiptStore) GetReceipt(ctx sdk.Context, txHash common.Hash) (*types.Receipt, error) { + receipt, err := s.GetReceiptFromStore(ctx, txHash) + if err == nil { + return receipt, nil + } + if !errors.Is(err, ErrNotFound) { + return nil, err + } + // Misses are cheap negatives (one keymap lookup); fall back to the legacy + // KV store for receipts that predate this store. + return legacyReceiptFromKVStore(ctx, s.storeKey, txHash) +} + +func (s *littReceiptStore) GetReceiptFromStore(_ sdk.Context, txHash common.Hash) (*types.Receipt, error) { + val, exists, err := s.receipts.Get(txHash[:]) + if err != nil { + return nil, err + } + if !exists { + return nil, ErrNotFound + } + var r types.Receipt + // gogoproto Unmarshal copies all byte/string fields, so it is safe over + // litt's cache-shared buffer. + if err := r.Unmarshal(val); err != nil { + return nil, err + } + // Enforce the KeepRecent floor: litt expires values lazily via TTL, so a + // pruned block may still be physically present. + if s.belowRetentionFloor(r.BlockNumber) { + return nil, ErrNotFound + } + return &r, nil +} + +func (s *littReceiptStore) belowRetentionFloor(blockNumber uint64) bool { + earliest := s.earliestVersion.Load() + return earliest > 0 && blockNumber < uint64(earliest) //nolint:gosec // earliest is non-negative +} + +func (s *littReceiptStore) SetReceipts(ctx sdk.Context, receipts []ReceiptRecord) error { + blockNumbers, receiptsByBlock := groupReceiptRecordsByBlock(receipts) + if len(blockNumbers) == 0 { + return s.SetLatestVersion(ctx.BlockHeight()) + } + + // Receipt values go to litt first; the index batch (tag keys + version + // meta) commits after, so an indexed block always has its values written. + batch := s.index.NewBatch() + defer func() { _ = batch.Close() }() + + for _, blockNumber := range blockNumbers { + if err := s.writeBlock(batch, blockNumber, receiptsByBlock[blockNumber]); err != nil { + return err + } + } + + maxBlock := blockNumbers[len(blockNumbers)-1] + newLatest := s.latestVersion.Load() + if int64(maxBlock) > newLatest { //nolint:gosec // block heights fit within int64 + newLatest = int64(maxBlock) //nolint:gosec // block heights fit within int64 + if err := batch.Set(receiptLatestVersionKey, encodeBlockNumber(maxBlock)); err != nil { + return err + } + } + if err := batch.Commit(dbtypes.WriteOptions{}); err != nil { + return err + } + s.latestVersion.Store(newLatest) + return nil +} + +// writeBlock appends one part for the block into litt (the receipts +// concatenated, each tx hash a secondary key aliasing its receipt's range) and +// stages the tag keys onto the index batch. nextPartIndex picks the next free +// part slot, so a normal block writes part 0 and legacy migration appends. +func (s *littReceiptStore) writeBlock(batch dbtypes.Batch, blockNumber uint64, records []ReceiptRecord) error { + sortRecordsByTxIndex(records) + + partIndex, err := s.nextPartIndex(blockNumber) + if err != nil { + return err + } + + // The part value is the receipts concatenated; each tx hash is a secondary + // key aliasing its receipt's sub-range, which is how every read fetches a + // receipt — the value is never read whole, so no framing is needed. + value := make([]byte, 0) + secondaryKeys := make([]*litttypes.SecondaryKey, 0, len(records)) + for _, record := range records { + bz, err := marshaledReceipt(record) + if err != nil { + return err + } + offset := uint32(len(value)) //nolint:gosec // block regions fit within uint32 + value = append(value, bz...) + + txHash := make([]byte, common.HashLength) + copy(txHash, record.TxHash[:]) + secondaryKeys = append(secondaryKeys, &litttypes.SecondaryKey{ + Key: txHash, + Offset: offset, + Length: uint32(len(bz)), //nolint:gosec // receipt sizes fit within uint32 + }) + } + + if err := s.receipts.Put(littPartKey(blockNumber, partIndex), value, secondaryKeys...); err != nil { + return err + } + return s.stageTagKeys(batch, blockNumber, records) +} + +// nextPartIndex returns the number of parts already written for the block, +// probing litt from part 0 until a gap. A brand-new block returns 0 after one +// Exists; blocks normally have exactly one part. +func (s *littReceiptStore) nextPartIndex(blockNumber uint64) (uint32, error) { + for part := uint32(0); ; part++ { + exists, err := s.receipts.Exists(littPartKey(blockNumber, part)) + if err != nil { + return 0, err + } + if !exists { + return part, nil + } + } +} + +// FilterLogs answers eth_getLogs via the tag index. Both bounds inclusive; for +// a single block set fromBlock == toBlock. +func (s *littReceiptStore) FilterLogs(_ sdk.Context, fromBlock, toBlock uint64, crit filters.FilterCriteria) ([]*ethtypes.Log, error) { + if fromBlock > toBlock { + return nil, fmt.Errorf("fromBlock (%d) > toBlock (%d)", fromBlock, toBlock) + } + return s.filterLogsByTags(fromBlock, toBlock, crit) +} + +// startFlusher bounds litt durability lag to littFlushInterval from a +// background goroutine so block commit never waits on an fsync. +func (s *littReceiptStore) startFlusher() { + s.backgroundWg.Add(1) + go func() { + defer s.backgroundWg.Done() + ticker := time.NewTicker(littFlushInterval) + defer ticker.Stop() + for { + select { + case <-s.stopBackground: + return + case <-ticker.C: + if err := s.receipts.Flush(); err != nil { + logger.Error("failed to flush littdb receipts", "err", err) + } + } + } + }() +} + +func (s *littReceiptStore) Close() error { + var err error + s.closeOnce.Do(func() { + close(s.stopBackground) + s.backgroundWg.Wait() + // litt's Close flushes, so the last sub-interval of writes is durable. + err = s.values.Close() + if indexErr := s.index.Close(); err == nil { + err = indexErr + } + }) + return err +} + +func (s *littReceiptStore) startPruning() { + if s.keepRecent <= 0 || s.pruneInterval <= 0 { + return + } + s.backgroundWg.Add(1) + go func() { + defer s.backgroundWg.Done() + for { + // Keep exactly keepRecent blocks, [latest-keepRecent+1, latest]; the + // +1 matches the pebble backend (which retains keepRecent, not +1). + pruneBefore := s.latestVersion.Load() - s.keepRecent + 1 + if pruneBefore > 0 { + if err := s.pruneBlocksBelow(uint64(pruneBefore)); err != nil { + logger.Error("failed to prune littdb receipt store", "before-block", pruneBefore, "err", err) + } + } + // Jittered cadence, matching the other receipt pruners. + sleep := time.Duration(float64(s.pruneInterval)*(1+rand.Float64())) * time.Second + select { + case <-s.stopBackground: + return + case <-time.After(sleep): + } + } + }() +} + +// pruneBlocksBelow deletes the tag entries in [earliest, cutoff) and advances +// the retention floor. Receipt values are reclaimed independently by litt's TTL +// GC; the read-time floor keeps them invisible in the meantime. +func (s *littReceiptStore) pruneBlocksBelow(cutoff uint64) error { + floor := uint64(0) + if earliest := s.earliestVersion.Load(); earliest > 0 { + floor = uint64(earliest) //nolint:gosec // earliest is non-negative + } + if floor >= cutoff { + return nil + } + + if err := s.deleteIndexRange(littTagBlockKey(floor), littTagBlockKey(cutoff)); err != nil { + return err + } + if err := s.index.Set(receiptEarliestVersionKey, encodeBlockNumber(cutoff), dbtypes.WriteOptions{}); err != nil { + return err + } + s.earliestVersion.Store(int64(cutoff)) //nolint:gosec // block heights fit within int64 + return nil +} + +// rangeDeleter is implemented by index DBs that can drop a whole key range with +// one range tombstone instead of per-key deletes (pebble implements it). +type rangeDeleter interface { + DeleteRange(start, end []byte, opts dbtypes.WriteOptions) error +} + +// deleteIndexRange removes every index key in [lower, upper) with one O(1) range +// tombstone — essential for the tag index, which writes thousands of keys per +// block, so per-key deletes would scan and delete millions of keys per prune +// pass. The index is always pebble (which supports range delete); the assertion +// guards against a future backend that does not. +func (s *littReceiptStore) deleteIndexRange(lower, upper []byte) error { + rd, ok := s.index.(rangeDeleter) + if !ok { + return fmt.Errorf("receipt index %T does not support range delete", s.index) + } + return rd.DeleteRange(lower, upper, dbtypes.WriteOptions{}) +} + +// groupReceiptRecordsByBlock splits records by block number (dropping entries +// without a receipt) and returns the block numbers in ascending order. +func groupReceiptRecordsByBlock(receipts []ReceiptRecord) ([]uint64, map[uint64][]ReceiptRecord) { + byBlock := make(map[uint64][]ReceiptRecord) + var blockNumbers []uint64 + for _, record := range receipts { + if record.Receipt == nil { + continue + } + block := record.Receipt.BlockNumber + if _, ok := byBlock[block]; !ok { + blockNumbers = append(blockNumbers, block) + } + byBlock[block] = append(byBlock[block], record) + } + sort.Slice(blockNumbers, func(i, j int) bool { return blockNumbers[i] < blockNumbers[j] }) + return blockNumbers, byBlock +} + +// sortRecordsByTxIndex orders a block's records by transaction index (tx hash +// as tiebreaker), the storage order the tag index relies on. +func sortRecordsByTxIndex(records []ReceiptRecord) { + sort.Slice(records, func(i, j int) bool { + l, r := records[i].Receipt, records[j].Receipt + if l.TransactionIndex != r.TransactionIndex { + return l.TransactionIndex < r.TransactionIndex + } + return records[i].TxHash.Cmp(records[j].TxHash) < 0 + }) +} + +// marshaledReceipt returns the record's pre-marshaled bytes, marshaling the +// receipt if they were not supplied. +func marshaledReceipt(record ReceiptRecord) ([]byte, error) { + if len(record.ReceiptBytes) > 0 { + return record.ReceiptBytes, nil + } + return record.Receipt.Marshal() +} + +// legacyReceiptFromKVStore looks up a receipt in the legacy KV store for +// receipts that predate this store. Returns ErrNotFound when unavailable. +func legacyReceiptFromKVStore(ctx sdk.Context, storeKey sdk.StoreKey, txHash common.Hash) (*types.Receipt, error) { + if storeKey == nil { + return nil, ErrNotFound + } + bz := ctx.KVStore(storeKey).Get(types.ReceiptKey(txHash)) + if bz == nil { + return nil, ErrNotFound + } + var r types.Receipt + if err := r.Unmarshal(bz); err != nil { + return nil, err + } + return &r, nil +} diff --git a/sei-db/ledger_db/receipt/litt_tag_index.go b/sei-db/ledger_db/receipt/litt_tag_index.go new file mode 100644 index 0000000000..a65fbe6353 --- /dev/null +++ b/sei-db/ledger_db/receipt/litt_tag_index.go @@ -0,0 +1,319 @@ +package receipt + +import ( + "encoding/binary" + "fmt" + "sort" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + dbtypes "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/x/evm/types" +) + +// The tag index is the "littidx" filtering layer. litt holds the receipt +// bodies (point lookup by tx hash); this pebble index holds one empty-valued +// key per log tag so FilterLogs can locate matching receipts without scanning +// whole blocks: +// +// 't' + block (u64 BE) + kind (1B) + tag (20B addr / 32B topic) +// + txIndex (u32 BE) + firstLogIndex (u32 BE) + txHash (32B) -> nil +// +// A query seeks the (block, tag) entries, intersects candidates across the +// criteria dimensions, then point-reads only the matching receipts from litt +// by the tx hash carried in the key. The kind byte keeps the address and each +// topic position in disjoint keyspaces (criteria are positional). +// firstLogIndex is the receipt's block-wide first log index, stored so reads +// can number logs without decoding the receipts before it. Pruning a block +// range is a single range tombstone (see deleteIndexRange). +const ( + littTagKeyPrefix = 't' + + tagKindAddress byte = 0x00 + tagKindTopic0 byte = 0x01 // +p for topic position p + maxIndexedTopics = 4 // EVM LOG4 limit + + littTxIndexLen = 4 + littLogIndexLen = 4 + littAddrTagLen = common.AddressLength // 20 + littTopicTagLen = common.HashLength // 32 + littTagSuffixLen = littTxIndexLen + littLogIndexLen + common.HashLength + littTagKeyMaxLen = 1 + blockNumLen + 1 + littTopicTagLen + littTagSuffixLen +) + +// littTagRef is what a tag scan recovers per candidate transaction: the +// receipt's block-wide first log index and the tx hash to look it up in litt. +type littTagRef struct { + firstLogIndex uint32 + txHash common.Hash +} + +// littTagBlockKey is the lower bound for every tag key of a block (also the +// exclusive upper bound for block-1's keys). +func littTagBlockKey(blockNumber uint64) []byte { + key := make([]byte, 1+blockNumLen) + key[0] = littTagKeyPrefix + binary.BigEndian.PutUint64(key[1:], blockNumber) + return key +} + +// littTagKindKey bounds a single (block, kind) keyspace. +func littTagKindKey(blockNumber uint64, kind byte) []byte { + return append(littTagBlockKey(blockNumber), kind) +} + +// littTagTagKey is the scan prefix for one (block, kind+tag). +func littTagTagKey(blockNumber uint64, kindTag []byte) []byte { + return append(littTagBlockKey(blockNumber), kindTag...) +} + +// appendLittTagKey writes a full tag key into dst[:0], reusing its capacity so +// the write path allocates no per-key slice (pebble's batch.Set copies the key, +// so the buffer is free to reuse immediately after). +func appendLittTagKey(dst []byte, blockNumber uint64, kind byte, tag []byte, txIndex, firstLogIndex uint32, txHash common.Hash) []byte { + dst = append(dst, littTagKeyPrefix) + dst = binary.BigEndian.AppendUint64(dst, blockNumber) + dst = append(dst, kind) + dst = append(dst, tag...) + dst = binary.BigEndian.AppendUint32(dst, txIndex) + dst = binary.BigEndian.AppendUint32(dst, firstLogIndex) + return append(dst, txHash[:]...) +} + +// addressTag and topicTag build the kind+tag bytes that key membership is +// matched on; criteriaTagGroups turns filter criteria into tag groups (OR +// within a group, AND across groups — matchLog semantics), one group per +// constrained dimension (the address list, each non-empty topic position). +func addressTag(addressHex string) []byte { + addr := common.HexToAddress(addressHex) + return append([]byte{tagKindAddress}, addr[:]...) +} + +func topicTag(position int, topic common.Hash) []byte { + return append([]byte{tagKindTopic0 + byte(position)}, topic[:]...) //nolint:gosec // position < maxIndexedTopics +} + +func criteriaTagGroups(crit filters.FilterCriteria) [][][]byte { + var groups [][][]byte + if len(crit.Addresses) > 0 { + group := make([][]byte, 0, len(crit.Addresses)) + for _, addr := range crit.Addresses { + group = append(group, append([]byte{tagKindAddress}, addr[:]...)) + } + groups = append(groups, group) + } + for p, topicList := range crit.Topics { + if p >= maxIndexedTopics || len(topicList) == 0 { + continue + } + group := make([][]byte, 0, len(topicList)) + for _, topic := range topicList { + group = append(group, topicTag(p, topic)) + } + groups = append(groups, group) + } + return groups +} + +// parseLittTagKey extracts the transaction index and ref (first log index + tx +// hash) from a tag key's trailing suffix. The kind byte fixes the tag width. +func parseLittTagKey(key []byte) (txIndex uint32, ref littTagRef, err error) { + if len(key) < 1+blockNumLen+1 { + return 0, littTagRef{}, fmt.Errorf("corrupt receipt tag key %x", key) + } + tagLen := littTopicTagLen + if key[1+blockNumLen] == tagKindAddress { + tagLen = littAddrTagLen + } + suffixStart := 1 + blockNumLen + 1 + tagLen + if len(key) != suffixStart+littTagSuffixLen { + return 0, littTagRef{}, fmt.Errorf("corrupt receipt tag key %x", key) + } + suffix := key[suffixStart:] + ref.firstLogIndex = binary.BigEndian.Uint32(suffix[littTxIndexLen:]) + copy(ref.txHash[:], suffix[littTxIndexLen+littLogIndexLen:]) + return binary.BigEndian.Uint32(suffix), ref, nil +} + +// prefixSuccessor returns the smallest key strictly greater than every key +// beginning with prefix. Tag prefixes start with the 't' marker, so a non-0xff +// byte always exists and the nil (all-0xff) case can't arise here. +func prefixSuccessor(prefix []byte) []byte { + out := make([]byte, len(prefix)) + copy(out, prefix) + for i := len(out) - 1; i >= 0; i-- { + if out[i] != 0xff { + out[i]++ + return out[:i+1] + } + } + return nil +} + +// stageTagKeys writes the tag keys for every log in the block (records already +// sorted by transaction index) onto the index batch. Values are nil — all +// information is in the key — so re-staging the same data (crash replay) is +// idempotent. firstLogIndex is block-wide within a single call; a block split +// across calls (legacy migration) restarts it per part — harmless, as the RPC +// layer recomputes log indices from canonical block data. +func (s *littReceiptStore) stageTagKeys(batch dbtypes.Batch, blockNumber uint64, records []ReceiptRecord) error { + scratch := make([]byte, 0, littTagKeyMaxLen) + firstLogIndex := uint32(0) + for _, record := range records { + txIndex := record.Receipt.TransactionIndex + txHash := record.TxHash + for _, lg := range record.Receipt.Logs { + addr := common.HexToAddress(lg.Address) + scratch = appendLittTagKey(scratch[:0], blockNumber, tagKindAddress, addr[:], txIndex, firstLogIndex, txHash) + if err := batch.Set(scratch, nil); err != nil { + return err + } + for p, topic := range lg.Topics { + if p >= maxIndexedTopics { + break + } + th := common.HexToHash(topic) + scratch = appendLittTagKey(scratch[:0], blockNumber, tagKindTopic0+byte(p), th[:], txIndex, firstLogIndex, txHash) //nolint:gosec // p < maxIndexedTopics + if err := batch.Set(scratch, nil); err != nil { + return err + } + } + } + firstLogIndex += uint32(len(record.Receipt.Logs)) //nolint:gosec // log counts fit within uint32 + } + return nil +} + +// filterLogsByTags answers a getLogs query from the tag index: per block, +// intersect the candidate transactions across criteria dimensions, then +// point-read only the surviving receipts from litt. Results are exact — +// matchLog re-verifies after decode. +func (s *littReceiptStore) filterLogsByTags(fromBlock, toBlock uint64, crit filters.FilterCriteria) ([]*ethtypes.Log, error) { + if latest := s.latestVersion.Load(); latest >= 0 && toBlock > uint64(latest) { //nolint:gosec // latest is non-negative + toBlock = uint64(latest) //nolint:gosec // latest is non-negative + } + if earliest := s.earliestVersion.Load(); earliest > 0 && fromBlock < uint64(earliest) { //nolint:gosec // earliest is non-negative + fromBlock = uint64(earliest) //nolint:gosec // earliest is non-negative + } + if fromBlock > toBlock { + return nil, nil + } + + groups := criteriaTagGroups(crit) + var logs []*ethtypes.Log + for block := fromBlock; block <= toBlock; block++ { + candidates, err := s.blockTagCandidates(block, groups) + if err != nil { + return nil, err + } + if len(candidates) == 0 { + continue + } + blockLogs, err := s.candidateBlockLogs(candidates, crit) + if err != nil { + return nil, err + } + logs = append(logs, blockLogs...) + } + return logs, nil +} + +// blockTagCandidates returns the block's candidate transactions. With no +// criteria groups it scans the address keyspace (every log-bearing tx has an +// address tag). Otherwise it scans exactly one tag's keys per (group, tag) — +// the index iterator has no SeekGE, so a tight bounded scan is the +// seek-equivalent — and intersects: a tx survives only if some tag of every +// group named it. +func (s *littReceiptStore) blockTagCandidates(blockNumber uint64, groups [][][]byte) (map[uint32]littTagRef, error) { + if len(groups) == 0 { + set := make(map[uint32]littTagRef) + err := s.scanTagRange(littTagKindKey(blockNumber, tagKindAddress), littTagKindKey(blockNumber, tagKindAddress+1), set) + return set, err + } + + groupSets := make([]map[uint32]littTagRef, len(groups)) + for gi, group := range groups { + set := make(map[uint32]littTagRef) + for _, tag := range group { + prefix := littTagTagKey(blockNumber, tag) + if err := s.scanTagRange(prefix, prefixSuccessor(prefix), set); err != nil { + return nil, err + } + } + if len(set) == 0 { + return nil, nil // this dimension matched nothing; intersection empty + } + groupSets[gi] = set + } + + // Intersect, seeding from the smallest set to minimize membership checks. + result := groupSets[0] + for _, gs := range groupSets[1:] { + if len(gs) < len(result) { + result, gs = gs, result + } + for txIndex := range result { + if _, ok := gs[txIndex]; !ok { + delete(result, txIndex) + } + } + if len(result) == 0 { + return nil, nil + } + } + return result, nil +} + +// scanTagRange walks [lower, upper) of the tag keyspace, adding every entry's +// txIndex -> ref into dst. +func (s *littReceiptStore) scanTagRange(lower, upper []byte, dst map[uint32]littTagRef) error { + iter, err := s.index.NewIter(&dbtypes.IterOptions{LowerBound: lower, UpperBound: upper}) + if err != nil { + return err + } + defer func() { _ = iter.Close() }() + + for ; iter.Valid(); iter.Next() { + txIndex, ref, err := parseLittTagKey(iter.Key()) + if err != nil { + return err + } + dst[txIndex] = ref + } + return iter.Error() +} + +// candidateBlockLogs point-reads the candidate receipts from litt by tx hash, +// in transaction-index order, and applies the exact matchLog predicate. A +// missing receipt is skipped, not an error: litt TTL GC can reclaim a body +// between the index scan and the read. +func (s *littReceiptStore) candidateBlockLogs(candidates map[uint32]littTagRef, crit filters.FilterCriteria) ([]*ethtypes.Log, error) { + txIndexes := make([]uint32, 0, len(candidates)) + for txIndex := range candidates { + txIndexes = append(txIndexes, txIndex) + } + sort.Slice(txIndexes, func(i, j int) bool { return txIndexes[i] < txIndexes[j] }) + + var logs []*ethtypes.Log + for _, txIndex := range txIndexes { + ref := candidates[txIndex] + bz, exists, err := s.receipts.Get(ref.txHash[:]) + if err != nil { + return nil, err + } + if !exists { + continue + } + receipt := &types.Receipt{} + if err := receipt.Unmarshal(bz); err != nil { + return nil, err + } + for _, lg := range getLogsForTx(receipt, uint(ref.firstLogIndex)) { + if matchLog(lg, crit) { + logs = append(logs, lg) + } + } + } + return logs, nil +} diff --git a/sei-db/ledger_db/receipt/littidx_test.go b/sei-db/ledger_db/receipt/littidx_test.go new file mode 100644 index 0000000000..d80bd09ad8 --- /dev/null +++ b/sei-db/ledger_db/receipt/littidx_test.go @@ -0,0 +1,310 @@ +package receipt_test + +import ( + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/filters" + storetypes "github.com/sei-protocol/sei-chain/sei-cosmos/store/types" + "github.com/sei-protocol/sei-chain/sei-cosmos/testutil" + sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt" + "github.com/sei-protocol/sei-chain/x/evm/types" + "github.com/stretchr/testify/require" +) + +func setupLittIdx(t *testing.T, dir string) (receipt.ReceiptStore, sdk.Context) { + t.Helper() + storeKey := storetypes.NewKVStoreKey("evm") + tkey := storetypes.NewTransientStoreKey("evm_transient") + ctx := testutil.DefaultContext(storeKey, tkey).WithBlockHeight(1) + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = "littidx" + cfg.DBDirectory = dir + cfg.KeepRecent = 0 + store, err := receipt.NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + return store, ctx +} + +func litTxHash(block uint64, txIndex uint32) common.Hash { + var h common.Hash + copy(h[:], fmt.Sprintf("tx-%d-%d", block, txIndex)) + return h +} + +// litReceipt builds a receipt with one log carrying the given address and topics. +func litReceipt(block uint64, txIndex uint32, addr common.Address, topics ...common.Hash) receipt.ReceiptRecord { + topicHex := make([]string, 0, len(topics)) + for _, topic := range topics { + topicHex = append(topicHex, topic.Hex()) + } + txHash := litTxHash(block, txIndex) + return receipt.ReceiptRecord{ + TxHash: txHash, + Receipt: &types.Receipt{ + TxHashHex: txHash.Hex(), + BlockNumber: block, + TransactionIndex: txIndex, + GasUsed: 21000, + Logs: []*types.Log{{Address: addr.Hex(), Topics: topicHex, Data: []byte{0xde, 0xad}, Index: 0}}, + }, + } +} + +func writeLitBlock(t *testing.T, store receipt.ReceiptStore, ctx sdk.Context, block uint64, records ...receipt.ReceiptRecord) { + t.Helper() + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(int64(block)), records)) //nolint:gosec // small test heights +} + +func TestLittIdxReadWrite(t *testing.T) { + store, ctx := setupLittIdx(t, t.TempDir()) + defer func() { _ = store.Close() }() + + require.Equal(t, "littidx", receipt.BackendTypeName(store)) + + addr := common.HexToAddress("0xabcd") + topic := common.HexToHash("0x1111") + writeLitBlock(t, store, ctx, 1, litReceipt(1, 0, addr, topic), litReceipt(1, 1, addr, topic)) + writeLitBlock(t, store, ctx, 2, litReceipt(2, 0, addr, topic)) + + for _, e := range []struct { + block uint64 + txIndex uint32 + }{{1, 0}, {1, 1}, {2, 0}} { + rcpt, err := store.GetReceiptFromStore(ctx, litTxHash(e.block, e.txIndex)) + require.NoError(t, err) + require.Equal(t, e.block, rcpt.BlockNumber) + require.Equal(t, e.txIndex, rcpt.TransactionIndex) + } + + _, err := store.GetReceiptFromStore(ctx, litTxHash(9, 9)) + require.ErrorIs(t, err, receipt.ErrNotFound) + require.Equal(t, int64(2), store.LatestVersion()) +} + +func TestLittIdxFilterLogs(t *testing.T) { + store, ctx := setupLittIdx(t, t.TempDir()) + defer func() { _ = store.Close() }() + + addr1 := common.HexToAddress("0x4444444444444444444444444444444444444444") + addr2 := common.HexToAddress("0x5555555555555555555555555555555555555555") + transfer := common.HexToHash("0x1111aa") + approve := common.HexToHash("0x2222bb") + alice := common.HexToHash("0x3333cc") + bob := common.HexToHash("0x4444dd") + + writeLitBlock(t, store, ctx, 1, litReceipt(1, 0, addr1, transfer, alice), litReceipt(1, 1, addr1, transfer, bob)) + writeLitBlock(t, store, ctx, 2, litReceipt(2, 0, addr2, approve, alice)) + writeLitBlock(t, store, ctx, 3, litReceipt(3, 0, addr1, approve, bob)) + + // Address OR: either address matches. + logs, err := store.FilterLogs(ctx, 1, 3, filters.FilterCriteria{Addresses: []common.Address{addr1, addr2}}) + require.NoError(t, err) + require.Len(t, logs, 4) + + // AND across topic positions: transfer at 0 AND alice at 1. + logs, err = store.FilterLogs(ctx, 1, 3, filters.FilterCriteria{Topics: [][]common.Hash{{transfer}, {alice}}}) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, uint64(1), logs[0].BlockNumber) + require.Equal(t, uint(0), logs[0].TxIndex) + + // OR within a topic position: alice or bob at position 1. + logs, err = store.FilterLogs(ctx, 1, 3, filters.FilterCriteria{Topics: [][]common.Hash{{transfer}, {alice, bob}}}) + require.NoError(t, err) + require.Len(t, logs, 2) + + // Wildcard position 0 (empty), bob at position 1. + logs, err = store.FilterLogs(ctx, 1, 3, filters.FilterCriteria{Topics: [][]common.Hash{{}, {bob}}}) + require.NoError(t, err) + require.Len(t, logs, 2) + + // Range bound: only block 2. + logs, err = store.FilterLogs(ctx, 2, 2, filters.FilterCriteria{}) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, uint64(2), logs[0].BlockNumber) + + // No match. + logs, err = store.FilterLogs(ctx, 1, 3, filters.FilterCriteria{Addresses: []common.Address{common.HexToAddress("0x9")}}) + require.NoError(t, err) + require.Empty(t, logs) +} + +// TestLittIdxMultiLogReceipt: a receipt with several logs is numbered with +// contiguous block-wide log indices. +func TestLittIdxMultiLogReceipt(t *testing.T) { + store, ctx := setupLittIdx(t, t.TempDir()) + defer func() { _ = store.Close() }() + + addr := common.HexToAddress("0xbeef") + topic := common.HexToHash("0xfeed") + rec := receipt.ReceiptRecord{ + TxHash: litTxHash(7, 0), + Receipt: &types.Receipt{ + TxHashHex: litTxHash(7, 0).Hex(), BlockNumber: 7, TransactionIndex: 0, + Logs: []*types.Log{ + {Address: addr.Hex(), Topics: []string{topic.Hex()}, Index: 0}, + {Address: addr.Hex(), Topics: []string{topic.Hex()}, Index: 1}, + {Address: addr.Hex(), Topics: []string{topic.Hex()}, Index: 2}, + }, + }, + } + writeLitBlock(t, store, ctx, 7, rec) + + logs, err := store.FilterLogs(ctx, 7, 7, filters.FilterCriteria{Addresses: []common.Address{addr}}) + require.NoError(t, err) + require.Len(t, logs, 3) + for i, lg := range logs { + require.Equal(t, uint(i), lg.Index) //nolint:gosec // small test indices + } +} + +// TestLittIdxBlockWideLogIndex verifies firstLogIndex accumulates across +// transactions, so logs are numbered block-wide rather than per-transaction. +func TestLittIdxBlockWideLogIndex(t *testing.T) { + store, ctx := setupLittIdx(t, t.TempDir()) + defer func() { _ = store.Close() }() + + addr := common.HexToAddress("0xabc") + topic := common.HexToHash("0xdef") + mk := func(txIndex uint32, nLogs int) receipt.ReceiptRecord { + logs := make([]*types.Log, nLogs) + for i := range logs { + logs[i] = &types.Log{Address: addr.Hex(), Topics: []string{topic.Hex()}, Index: uint32(i)} //nolint:gosec // small test indices + } + txHash := litTxHash(8, txIndex) + return receipt.ReceiptRecord{ + TxHash: txHash, + Receipt: &types.Receipt{TxHashHex: txHash.Hex(), BlockNumber: 8, TransactionIndex: txIndex, Logs: logs}, + } + } + // tx0: 2 logs, tx1: 1 log, tx2: 3 logs -> contiguous block-wide indices 0..5. + writeLitBlock(t, store, ctx, 8, mk(0, 2), mk(1, 1), mk(2, 3)) + + logs, err := store.FilterLogs(ctx, 8, 8, filters.FilterCriteria{Addresses: []common.Address{addr}}) + require.NoError(t, err) + require.Len(t, logs, 6) + for i, lg := range logs { + require.Equal(t, uint(i), lg.Index, "log %d block-wide index", i) //nolint:gosec // small test indices + } + // Index ranges map back to the owning transaction in tx order. + require.Equal(t, []uint{0, 0, 1, 2, 2, 2}, []uint{ + logs[0].TxIndex, logs[1].TxIndex, logs[2].TxIndex, logs[3].TxIndex, logs[4].TxIndex, logs[5].TxIndex, + }) +} + +// TestLittIdxMultiPart writes one block across two SetReceipts calls (the +// legacy-migration shape that produces multiple litt parts) and confirms both +// parts' receipts and logs are visible. +func TestLittIdxMultiPart(t *testing.T) { + store, ctx := setupLittIdx(t, t.TempDir()) + defer func() { _ = store.Close() }() + + addr := common.HexToAddress("0xabcd") + topic := common.HexToHash("0x1111") + // Two separate writes for block 4 -> part 0 then part 1. + writeLitBlock(t, store, ctx, 4, litReceipt(4, 0, addr, topic)) + writeLitBlock(t, store, ctx, 4, litReceipt(4, 1, addr, topic)) + + for _, txIndex := range []uint32{0, 1} { + rcpt, err := store.GetReceiptFromStore(ctx, litTxHash(4, txIndex)) + require.NoError(t, err) + require.Equal(t, uint64(4), rcpt.BlockNumber) + require.Equal(t, txIndex, rcpt.TransactionIndex) + } + + logs, err := store.FilterLogs(ctx, 4, 4, filters.FilterCriteria{Addresses: []common.Address{addr}}) + require.NoError(t, err) + require.Len(t, logs, 2) +} + +// TestLittIdxLegacyFallback covers GetReceipt falling back to the legacy KV +// store for a receipt that predates this backend (absent from litt). +func TestLittIdxLegacyFallback(t *testing.T) { + storeKey := storetypes.NewKVStoreKey("evm") + tkey := storetypes.NewTransientStoreKey("evm_transient") + ctx := testutil.DefaultContext(storeKey, tkey).WithBlockHeight(1) + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = "littidx" + cfg.DBDirectory = t.TempDir() + store, err := receipt.NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + defer func() { _ = store.Close() }() + + txHash := litTxHash(99, 0) + legacy := &types.Receipt{TxHashHex: txHash.Hex(), BlockNumber: 99, TransactionIndex: 0} + bz, err := legacy.Marshal() + require.NoError(t, err) + ctx.KVStore(storeKey).Set(types.ReceiptKey(txHash), bz) + + // Not in litt, so the store-only read misses... + _, err = store.GetReceiptFromStore(ctx, txHash) + require.ErrorIs(t, err, receipt.ErrNotFound) + + // ...but GetReceipt falls back to the legacy KV store. + rcpt, err := store.GetReceipt(ctx, txHash) + require.NoError(t, err) + require.Equal(t, uint64(99), rcpt.BlockNumber) +} + +func TestLittIdxReopen(t *testing.T) { + dir := t.TempDir() + store, ctx := setupLittIdx(t, dir) + + addr := common.HexToAddress("0xc0de") + topic := common.HexToHash("0xdead") + for block := uint64(1); block <= 5; block++ { + writeLitBlock(t, store, ctx, block, litReceipt(block, 0, addr, topic)) + } + require.NoError(t, store.Close()) + + store, ctx = setupLittIdx(t, dir) + defer func() { _ = store.Close() }() + require.Equal(t, int64(5), store.LatestVersion()) + + rcpt, err := store.GetReceiptFromStore(ctx, litTxHash(3, 0)) + require.NoError(t, err) + require.Equal(t, uint64(3), rcpt.BlockNumber) + + logs, err := store.FilterLogs(ctx, 1, 5, filters.FilterCriteria{Addresses: []common.Address{addr}}) + require.NoError(t, err) + require.Len(t, logs, 5) +} + +func TestLittIdxPrune(t *testing.T) { + dir := t.TempDir() + store, ctx := setupLittIdx(t, dir) + defer func() { _ = store.Close() }() + + addr := common.HexToAddress("0x6666") + topic := common.HexToHash("0xeeee") + for block := uint64(1); block <= 10; block++ { + writeLitBlock(t, store, ctx, block, litReceipt(block, 0, addr, topic)) + } + + require.NoError(t, receipt.PruneLittIdx(store, 6)) + require.Equal(t, int64(6), store.EarliestVersion()) + + // Pruned blocks are invisible (tag entries deleted, read floor enforced). + for block := uint64(1); block <= 5; block++ { + _, err := store.GetReceiptFromStore(ctx, litTxHash(block, 0)) + require.ErrorIs(t, err, receipt.ErrNotFound, "block %d should be pruned", block) + } + logs, err := store.FilterLogs(ctx, 1, 5, filters.FilterCriteria{Addresses: []common.Address{addr}}) + require.NoError(t, err) + require.Empty(t, logs) + + // Retained blocks unaffected. + for block := uint64(6); block <= 10; block++ { + rcpt, err := store.GetReceiptFromStore(ctx, litTxHash(block, 0)) + require.NoError(t, err) + require.Equal(t, block, rcpt.BlockNumber) + } + logs, err = store.FilterLogs(ctx, 1, 10, filters.FilterCriteria{Addresses: []common.Address{addr}}) + require.NoError(t, err) + require.Len(t, logs, 5) +} diff --git a/sei-db/ledger_db/receipt/receipt_store.go b/sei-db/ledger_db/receipt/receipt_store.go index c9c3ed8037..27d7646b4b 100644 --- a/sei-db/ledger_db/receipt/receipt_store.go +++ b/sei-db/ledger_db/receipt/receipt_store.go @@ -107,6 +107,8 @@ func BackendTypeName(store ReceiptStore) string { switch store.(type) { case *receiptStore: return receiptBackendPebble + case *littReceiptStore: + return receiptBackendLittIdx default: return "unknown" } @@ -119,6 +121,8 @@ func newReceiptBackend(config dbconfig.ReceiptStoreConfig, storeKey sdk.StoreKey backend := normalizeReceiptBackend(config.Backend) switch backend { + case receiptBackendLittIdx: + return newLittReceiptStore(config, storeKey) case receiptBackendPebble: ssConfig := dbconfig.DefaultStateStoreConfig() ssConfig.DBDirectory = config.DBDirectory