diff --git a/go.mod b/go.mod index 7a8deb3..335ca22 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/go-kit/kit v0.13.0 github.com/gorilla/rpc v1.2.1 github.com/hashicorp/go-metrics v0.5.4 + github.com/ipfs/go-datastore v0.8.2 github.com/rollkit/rollkit v0.14.2-0.20250422111549-9f2f92ea5c6e github.com/rollkit/rollkit/core v0.0.0-20250422111549-9f2f92ea5c6e github.com/rollkit/rollkit/da v0.0.0-20250422111549-9f2f92ea5c6e @@ -128,7 +129,6 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/boxo v0.27.4 // indirect github.com/ipfs/go-cid v0.5.0 // indirect - github.com/ipfs/go-datastore v0.8.2 // indirect github.com/ipfs/go-ds-badger4 v0.1.8 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index 57941ae..c0384c7 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -15,10 +15,11 @@ import ( cmtypes "github.com/cometbft/cometbft/types" servertypes "github.com/cosmos/cosmos-sdk/server/types" genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types" + ds "github.com/ipfs/go-datastore" "github.com/rollkit/rollkit/core/execution" rollkitp2p "github.com/rollkit/rollkit/pkg/p2p" - "github.com/rollkit/rollkit/pkg/store" + rstore "github.com/rollkit/rollkit/pkg/store" "github.com/rollkit/go-execution-abci/pkg/p2p" ) @@ -37,10 +38,11 @@ func LoadGenesisDoc(cfg *config.Config) (*cmtypes.GenesisDoc, error) { // Adapter is a struct that will contain an ABCI Application, and will implement the go-execution interface type Adapter struct { - App servertypes.ABCI - Store store.Store - Mempool mempool.Mempool - MempoolIDs *mempoolIDs + App servertypes.ABCI + Store *Store + RollkitStore rstore.Store + Mempool mempool.Mempool + MempoolIDs *mempoolIDs P2PClient *rollkitp2p.Client TxGossiper *p2p.Gossiper @@ -58,7 +60,7 @@ type Adapter struct { // The Adapter wraps the provided ABCI application and delegates execution-related operations to it. func NewABCIExecutor( app servertypes.ABCI, - store store.Store, + store ds.Batching, p2pClient *rollkitp2p.Client, p2pMetrics *rollkitp2p.Metrics, logger log.Logger, @@ -69,16 +71,22 @@ func NewABCIExecutor( if metrics == nil { metrics = NopMetrics() } + + // Create a new Store with ABCI prefix + abciStore := NewStore(store) + rollkitStore := rstore.New(abciStore) + a := &Adapter{ - App: app, - Store: store, - Logger: logger, - P2PClient: p2pClient, - p2pMetrics: p2pMetrics, - CometCfg: cfg, - AppGenesis: appGenesis, - MempoolIDs: newMempoolIDs(), - Metrics: metrics, + App: app, + Store: abciStore, + RollkitStore: rollkitStore, + Logger: logger, + P2PClient: p2pClient, + p2pMetrics: p2pMetrics, + CometCfg: cfg, + AppGenesis: appGenesis, + MempoolIDs: newMempoolIDs(), + Metrics: metrics, } return a @@ -240,7 +248,7 @@ func (a *Adapter) InitChain(ctx context.Context, genesisTime time.Time, initialH s.LastHeightConsensusParamsChanged = int64(initialHeight) s.LastHeightValidatorsChanged = int64(initialHeight) - if err := a.SaveState(ctx, s); err != nil { + if err := a.Store.SaveState(ctx, s); err != nil { return nil, 0, fmt.Errorf("failed to save initial state: %w", err) } @@ -257,7 +265,7 @@ func (a *Adapter) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint a.Logger.Info("Executing block", "height", blockHeight, "num_txs", len(txs), "timestamp", timestamp) a.Metrics.TxsExecutedPerBlock.Observe(float64(len(txs))) - s, err := a.LoadState(ctx) + s, err := a.Store.LoadState(ctx) if err != nil { return nil, 0, fmt.Errorf("failed to load state: %w", err) } @@ -332,7 +340,7 @@ func (a *Adapter) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint s.NextValidators = nValSet.CopyIncrementProposerPriority(1) s.LastHeightValidatorsChanged = lastHeightValsChanged - if err := a.SaveState(ctx, s); err != nil { + if err := a.Store.SaveState(ctx, s); err != nil { return nil, 0, fmt.Errorf("failed to save state: %w", err) } @@ -375,7 +383,7 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) { }() a.Logger.Debug("Getting transactions for proposal") - s, err := a.LoadState(ctx) + s, err := a.Store.LoadState(ctx) if err != nil { return nil, fmt.Errorf("failed to load state for GetTxs: %w", err) } @@ -391,7 +399,7 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) { txsBytes[i] = tx } - currentHeight, err := a.Store.Height(ctx) + currentHeight, err := a.RollkitStore.Height(ctx) if err != nil { return nil, err } @@ -416,19 +424,3 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) { func (a *Adapter) SetFinal(ctx context.Context, blockHeight uint64) error { return nil } - -func NewAdapter(store store.Store) *Adapter { - return &Adapter{ - Store: store, - } -} - -// LoadState loads the state from disk -func (a *Adapter) LoadState(ctx context.Context) (*cmtstate.State, error) { - return loadState(ctx, a.Store) -} - -// SaveState saves the state to disk -func (a *Adapter) SaveState(ctx context.Context, state *cmtstate.State) error { - return saveState(ctx, a.Store, state) -} diff --git a/pkg/adapter/store.go b/pkg/adapter/store.go index 8f96ea3..e02f354 100644 --- a/pkg/adapter/store.go +++ b/pkg/adapter/store.go @@ -7,15 +7,34 @@ import ( cmtstateproto "github.com/cometbft/cometbft/proto/tendermint/state" cmtstate "github.com/cometbft/cometbft/state" proto "github.com/cosmos/gogoproto/proto" + ds "github.com/ipfs/go-datastore" + kt "github.com/ipfs/go-datastore/keytransform" +) - "github.com/rollkit/rollkit/pkg/store" +const ( + // KeyPrefix is the prefix used for all ABCI-related keys in the datastore + KeyPrefix = "abci" + // stateKey is the key used for storing state + stateKey = "s" ) -const stateKey = "abci-s" +// Store wraps a datastore with ABCI-specific functionality +type Store struct { + ds.Batching +} + +// NewStore creates a new Store with the ABCI prefix +func NewStore(store ds.Batching) *Store { + return &Store{ + Batching: kt.Wrap(store, &kt.PrefixTransform{ + Prefix: ds.NewKey(KeyPrefix), + }), + } +} -// loadState loads the state from disk -func loadState(ctx context.Context, s store.Store) (*cmtstate.State, error) { - data, err := s.GetMetadata(ctx, stateKey) +// LoadState loads the state from disk +func (s *Store) LoadState(ctx context.Context) (*cmtstate.State, error) { + data, err := s.Get(ctx, ds.NewKey(stateKey)) if err != nil { return nil, fmt.Errorf("failed to get state metadata: %w", err) } @@ -31,8 +50,8 @@ func loadState(ctx context.Context, s store.Store) (*cmtstate.State, error) { return cmtstate.FromProto(stateProto) } -// saveState saves the state to disk -func saveState(ctx context.Context, s store.Store, state *cmtstate.State) error { +// SaveState saves the state to disk +func (s *Store) SaveState(ctx context.Context, state *cmtstate.State) error { stateProto, err := state.ToProto() if err != nil { return fmt.Errorf("failed to convert state to proto: %w", err) @@ -43,5 +62,5 @@ func saveState(ctx context.Context, s store.Store, state *cmtstate.State) error return fmt.Errorf("failed to marshal state: %w", err) } - return s.SetMetadata(ctx, stateKey, data) + return s.Put(ctx, ds.NewKey(stateKey), data) } diff --git a/pkg/rpc/provider/blockchain.go b/pkg/rpc/provider/blockchain.go index 7d9e078..3709e22 100644 --- a/pkg/rpc/provider/blockchain.go +++ b/pkg/rpc/provider/blockchain.go @@ -41,7 +41,7 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu default: heightValue = p.normalizeHeight(height) } - header, data, err := p.adapter.Store.GetBlockData(ctx, heightValue) + header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue) if err != nil { return nil, err } @@ -65,7 +65,7 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu // BlockByHash implements client.CometRPC. func (p *RpcProvider) BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error) { - header, data, err := p.adapter.Store.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types + header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types if err != nil { return nil, err } @@ -120,7 +120,7 @@ func (p *RpcProvider) BlockResults(ctx context.Context, height *int64) (*coretyp // Commit implements client.CometRPC. func (p *RpcProvider) Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error) { heightValue := p.normalizeHeight(height) - header, data, err := p.adapter.Store.GetBlockData(ctx, heightValue) + header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue) if err != nil { return nil, err } @@ -156,7 +156,7 @@ func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes) // decoding logic in the HTTP service will correctly translate from JSON. // See https://github.com/cometbft/cometbft/issues/6802 for context. - header, data, err := p.adapter.Store.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types + header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types if err != nil { return nil, err } @@ -179,7 +179,7 @@ func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes) func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) { const limit int64 = 20 // Default limit used in the original code - height, err := p.adapter.Store.Height(ctx) + height, err := p.adapter.RollkitStore.Height(ctx) if err != nil { return nil, fmt.Errorf("failed to get current height: %w", err) } @@ -211,7 +211,7 @@ func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHe // Re-fetch height in case new blocks were added during the loop? // The original code did this. - finalHeight, err := p.adapter.Store.Height(ctx) + finalHeight, err := p.adapter.RollkitStore.Height(ctx) if err != nil { return nil, fmt.Errorf("failed to get final height: %w", err) } @@ -274,7 +274,7 @@ func (p *RpcProvider) BlockSearch(ctx context.Context, query string, pagePtr *in apiResults := make([]*coretypes.ResultBlock, 0, pageSize) for i := skipCount; i < skipCount+pageSize; i++ { height := uint64(results[i]) - header, data, err := p.adapter.Store.GetBlockData(ctx, height) + header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, height) if err != nil { // If a block referenced by indexer is missing, should we error out or just skip? // For now, error out. @@ -307,7 +307,7 @@ func (p *RpcProvider) Validators(ctx context.Context, heightPtr *int64, pagePtr // depending on state pruning. The current implementation implicitly loads latest state. height := p.normalizeHeight(heightPtr) - s, err := p.adapter.LoadState(ctx) // Loads the *latest* state + s, err := p.adapter.Store.LoadState(ctx) // Loads the *latest* state if err != nil { return nil, fmt.Errorf("failed to load current state: %w", err) } diff --git a/pkg/rpc/provider/info.go b/pkg/rpc/provider/info.go index c5a956c..752226c 100644 --- a/pkg/rpc/provider/info.go +++ b/pkg/rpc/provider/info.go @@ -17,7 +17,7 @@ func (p *RpcProvider) Status(ctx context.Context) (*coretypes.ResultStatus, erro return nil, err } - s, err := p.adapter.LoadState(ctx) + s, err := p.adapter.Store.LoadState(ctx) if err != nil { return nil, err } @@ -109,7 +109,7 @@ func (p *RpcProvider) Health(context.Context) (*coretypes.ResultHealth, error) { // ConsensusParams implements client.Client. func (p *RpcProvider) ConsensusParams(ctx context.Context, height *int64) (*coretypes.ResultConsensusParams, error) { - state, err := p.adapter.LoadState(ctx) + state, err := p.adapter.Store.LoadState(ctx) if err != nil { return nil, err } diff --git a/pkg/rpc/provider/provider.go b/pkg/rpc/provider/provider.go index f922b9e..31a7591 100644 --- a/pkg/rpc/provider/provider.go +++ b/pkg/rpc/provider/provider.go @@ -50,7 +50,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 { if height == nil { var err error // TODO: Decide how to handle context here. Using background for now. - heightValue, err = p.adapter.Store.Height(context.Background()) + heightValue, err = p.adapter.RollkitStore.Height(context.Background()) if err != nil { // TODO: Consider logging or returning error p.logger.Error("Failed to get current height in normalizeHeight", "err", err) @@ -61,7 +61,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 { // Currently, just treat them as 0 or latest, adjust as needed. // For now, let's assume negative height means latest valid height. var err error - heightValue, err = p.adapter.Store.Height(context.Background()) + heightValue, err = p.adapter.RollkitStore.Height(context.Background()) if err != nil { p.logger.Error("Failed to get current height for negative height in normalizeHeight", "err", err) return 0 @@ -74,7 +74,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 { } func (p *RpcProvider) getBlockMeta(ctx context.Context, n uint64) *cmtypes.BlockMeta { - header, data, err := p.adapter.Store.GetBlockData(ctx, n) + header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, n) if err != nil { p.logger.Error("Failed to get block data in getBlockMeta", "height", n, "err", err) return nil diff --git a/server/start.go b/server/start.go index dca91f5..6f69bfb 100644 --- a/server/start.go +++ b/server/start.go @@ -358,11 +358,9 @@ func startNode( adapterMetrics = adapter.PrometheusMetrics(config.DefaultInstrumentationConfig().Namespace, "chain_id", cmtGenDoc.ChainID) } - st := store.New(database) - executor = adapter.NewABCIExecutor( app, - st, + database, p2pClient, p2pMetrics, logger, @@ -379,7 +377,7 @@ func startNode( panic(err) } - height, err := st.Height(context.Background()) + height, err := executor.RollkitStore.Height(context.Background()) if err != nil { return nil, nil, nil, cleanupFn, err }