Skip to content

feat: use datastore directly for state store #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/go-kit/kit v0.13.0
github.com/gorilla/rpc v1.2.1
github.com/hashicorp/go-metrics v0.5.4
github.com/ipfs/go-datastore v0.8.2
github.com/rollkit/rollkit v0.14.2-0.20250422111549-9f2f92ea5c6e
github.com/rollkit/rollkit/core v0.0.0-20250422111549-9f2f92ea5c6e
github.com/rollkit/rollkit/da v0.0.0-20250422111549-9f2f92ea5c6e
Expand Down Expand Up @@ -128,7 +129,6 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/boxo v0.27.4 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect
github.com/ipfs/go-datastore v0.8.2 // indirect
github.com/ipfs/go-ds-badger4 v0.1.8 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
Expand Down
64 changes: 28 additions & 36 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
cmtypes "github.com/cometbft/cometbft/types"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types"
ds "github.com/ipfs/go-datastore"

"github.com/rollkit/rollkit/core/execution"
rollkitp2p "github.com/rollkit/rollkit/pkg/p2p"
"github.com/rollkit/rollkit/pkg/store"
rstore "github.com/rollkit/rollkit/pkg/store"

"github.com/rollkit/go-execution-abci/pkg/p2p"
)
Expand All @@ -37,10 +38,11 @@ func LoadGenesisDoc(cfg *config.Config) (*cmtypes.GenesisDoc, error) {

// Adapter is a struct that will contain an ABCI Application, and will implement the go-execution interface
type Adapter struct {
App servertypes.ABCI
Store store.Store
Mempool mempool.Mempool
MempoolIDs *mempoolIDs
App servertypes.ABCI
Store *Store
RollkitStore rstore.Store
Mempool mempool.Mempool
MempoolIDs *mempoolIDs

P2PClient *rollkitp2p.Client
TxGossiper *p2p.Gossiper
Expand All @@ -58,7 +60,7 @@ type Adapter struct {
// The Adapter wraps the provided ABCI application and delegates execution-related operations to it.
func NewABCIExecutor(
app servertypes.ABCI,
store store.Store,
store ds.Batching,
p2pClient *rollkitp2p.Client,
p2pMetrics *rollkitp2p.Metrics,
logger log.Logger,
Expand All @@ -69,16 +71,22 @@ func NewABCIExecutor(
if metrics == nil {
metrics = NopMetrics()
}

// Create a new Store with ABCI prefix
abciStore := NewStore(store)
rollkitStore := rstore.New(abciStore)

a := &Adapter{
App: app,
Store: store,
Logger: logger,
P2PClient: p2pClient,
p2pMetrics: p2pMetrics,
CometCfg: cfg,
AppGenesis: appGenesis,
MempoolIDs: newMempoolIDs(),
Metrics: metrics,
App: app,
Store: abciStore,
RollkitStore: rollkitStore,
Logger: logger,
P2PClient: p2pClient,
p2pMetrics: p2pMetrics,
CometCfg: cfg,
AppGenesis: appGenesis,
MempoolIDs: newMempoolIDs(),
Metrics: metrics,
}

return a
Expand Down Expand Up @@ -240,7 +248,7 @@ func (a *Adapter) InitChain(ctx context.Context, genesisTime time.Time, initialH
s.LastHeightConsensusParamsChanged = int64(initialHeight)
s.LastHeightValidatorsChanged = int64(initialHeight)

if err := a.SaveState(ctx, s); err != nil {
if err := a.Store.SaveState(ctx, s); err != nil {
return nil, 0, fmt.Errorf("failed to save initial state: %w", err)
}

Expand All @@ -257,7 +265,7 @@ func (a *Adapter) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint
a.Logger.Info("Executing block", "height", blockHeight, "num_txs", len(txs), "timestamp", timestamp)
a.Metrics.TxsExecutedPerBlock.Observe(float64(len(txs)))

s, err := a.LoadState(ctx)
s, err := a.Store.LoadState(ctx)
if err != nil {
return nil, 0, fmt.Errorf("failed to load state: %w", err)
}
Expand Down Expand Up @@ -332,7 +340,7 @@ func (a *Adapter) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint
s.NextValidators = nValSet.CopyIncrementProposerPriority(1)
s.LastHeightValidatorsChanged = lastHeightValsChanged

if err := a.SaveState(ctx, s); err != nil {
if err := a.Store.SaveState(ctx, s); err != nil {
return nil, 0, fmt.Errorf("failed to save state: %w", err)
}

Expand Down Expand Up @@ -375,7 +383,7 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) {
}()
a.Logger.Debug("Getting transactions for proposal")

s, err := a.LoadState(ctx)
s, err := a.Store.LoadState(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load state for GetTxs: %w", err)
}
Expand All @@ -391,7 +399,7 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) {
txsBytes[i] = tx
}

currentHeight, err := a.Store.Height(ctx)
currentHeight, err := a.RollkitStore.Height(ctx)
if err != nil {
return nil, err
}
Comment on lines +402 to 405
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard against nil RollkitStore before calling Height

Even with the constructor fix, defensive code is advisable:

- currentHeight, err := a.RollkitStore.Height(ctx)
+ if a.RollkitStore == nil {
+     return nil, fmt.Errorf("RollkitStore not initialised")
+ }
+ currentHeight, err := a.RollkitStore.Height(ctx)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
currentHeight, err := a.RollkitStore.Height(ctx)
if err != nil {
return nil, err
}
if a.RollkitStore == nil {
return nil, fmt.Errorf("RollkitStore not initialised")
}
currentHeight, err := a.RollkitStore.Height(ctx)
if err != nil {
return nil, err
}

Expand All @@ -416,19 +424,3 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) {
func (a *Adapter) SetFinal(ctx context.Context, blockHeight uint64) error {
return nil
}

func NewAdapter(store store.Store) *Adapter {
return &Adapter{
Store: store,
}
}

// LoadState loads the state from disk
func (a *Adapter) LoadState(ctx context.Context) (*cmtstate.State, error) {
return loadState(ctx, a.Store)
}

// SaveState saves the state to disk
func (a *Adapter) SaveState(ctx context.Context, state *cmtstate.State) error {
return saveState(ctx, a.Store, state)
}
35 changes: 27 additions & 8 deletions pkg/adapter/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,34 @@ import (
cmtstateproto "github.com/cometbft/cometbft/proto/tendermint/state"
cmtstate "github.com/cometbft/cometbft/state"
proto "github.com/cosmos/gogoproto/proto"
ds "github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
)

"github.com/rollkit/rollkit/pkg/store"
const (
// KeyPrefix is the prefix used for all ABCI-related keys in the datastore
KeyPrefix = "abci"
// stateKey is the key used for storing state
stateKey = "s"
)

const stateKey = "abci-s"
// Store wraps a datastore with ABCI-specific functionality
type Store struct {
ds.Batching
}

// NewStore creates a new Store with the ABCI prefix
func NewStore(store ds.Batching) *Store {
return &Store{
Batching: kt.Wrap(store, &kt.PrefixTransform{
Prefix: ds.NewKey(KeyPrefix),
}),
}
}

// loadState loads the state from disk
func loadState(ctx context.Context, s store.Store) (*cmtstate.State, error) {
data, err := s.GetMetadata(ctx, stateKey)
// LoadState loads the state from disk
func (s *Store) LoadState(ctx context.Context) (*cmtstate.State, error) {
data, err := s.Get(ctx, ds.NewKey(stateKey))
if err != nil {
return nil, fmt.Errorf("failed to get state metadata: %w", err)
}
Expand All @@ -31,8 +50,8 @@ func loadState(ctx context.Context, s store.Store) (*cmtstate.State, error) {
return cmtstate.FromProto(stateProto)
}

// saveState saves the state to disk
func saveState(ctx context.Context, s store.Store, state *cmtstate.State) error {
// SaveState saves the state to disk
func (s *Store) SaveState(ctx context.Context, state *cmtstate.State) error {
stateProto, err := state.ToProto()
if err != nil {
return fmt.Errorf("failed to convert state to proto: %w", err)
Expand All @@ -43,5 +62,5 @@ func saveState(ctx context.Context, s store.Store, state *cmtstate.State) error
return fmt.Errorf("failed to marshal state: %w", err)
}

return s.SetMetadata(ctx, stateKey, data)
return s.Put(ctx, ds.NewKey(stateKey), data)
}
16 changes: 8 additions & 8 deletions pkg/rpc/provider/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu
default:
heightValue = p.normalizeHeight(height)
}
header, data, err := p.adapter.Store.GetBlockData(ctx, heightValue)
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue)
if err != nil {
return nil, err
}
Expand All @@ -65,7 +65,7 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu

// BlockByHash implements client.CometRPC.
func (p *RpcProvider) BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error) {
header, data, err := p.adapter.Store.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func (p *RpcProvider) BlockResults(ctx context.Context, height *int64) (*coretyp
// Commit implements client.CometRPC.
func (p *RpcProvider) Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error) {
heightValue := p.normalizeHeight(height)
header, data, err := p.adapter.Store.GetBlockData(ctx, heightValue)
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you open an issue, this block structure is not the one we want to use. the header is not compatible with ibc here

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes)
// decoding logic in the HTTP service will correctly translate from JSON.
// See https://github.com/cometbft/cometbft/issues/6802 for context.

header, data, err := p.adapter.Store.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
if err != nil {
return nil, err
}
Expand All @@ -179,7 +179,7 @@ func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes)
func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) {
const limit int64 = 20 // Default limit used in the original code

height, err := p.adapter.Store.Height(ctx)
height, err := p.adapter.RollkitStore.Height(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get current height: %w", err)
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHe

// Re-fetch height in case new blocks were added during the loop?
// The original code did this.
finalHeight, err := p.adapter.Store.Height(ctx)
finalHeight, err := p.adapter.RollkitStore.Height(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get final height: %w", err)
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func (p *RpcProvider) BlockSearch(ctx context.Context, query string, pagePtr *in
apiResults := make([]*coretypes.ResultBlock, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
height := uint64(results[i])
header, data, err := p.adapter.Store.GetBlockData(ctx, height)
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, height)
if err != nil {
// If a block referenced by indexer is missing, should we error out or just skip?
// For now, error out.
Expand Down Expand Up @@ -307,7 +307,7 @@ func (p *RpcProvider) Validators(ctx context.Context, heightPtr *int64, pagePtr
// depending on state pruning. The current implementation implicitly loads latest state.
height := p.normalizeHeight(heightPtr)

s, err := p.adapter.LoadState(ctx) // Loads the *latest* state
s, err := p.adapter.Store.LoadState(ctx) // Loads the *latest* state
if err != nil {
return nil, fmt.Errorf("failed to load current state: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rpc/provider/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (p *RpcProvider) Status(ctx context.Context) (*coretypes.ResultStatus, erro
return nil, err
}

s, err := p.adapter.LoadState(ctx)
s, err := p.adapter.Store.LoadState(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func (p *RpcProvider) Health(context.Context) (*coretypes.ResultHealth, error) {

// ConsensusParams implements client.Client.
func (p *RpcProvider) ConsensusParams(ctx context.Context, height *int64) (*coretypes.ResultConsensusParams, error) {
state, err := p.adapter.LoadState(ctx)
state, err := p.adapter.Store.LoadState(ctx)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 {
if height == nil {
var err error
// TODO: Decide how to handle context here. Using background for now.
heightValue, err = p.adapter.Store.Height(context.Background())
heightValue, err = p.adapter.RollkitStore.Height(context.Background())
if err != nil {
// TODO: Consider logging or returning error
p.logger.Error("Failed to get current height in normalizeHeight", "err", err)
Expand All @@ -61,7 +61,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 {
// Currently, just treat them as 0 or latest, adjust as needed.
// For now, let's assume negative height means latest valid height.
var err error
heightValue, err = p.adapter.Store.Height(context.Background())
heightValue, err = p.adapter.RollkitStore.Height(context.Background())
if err != nil {
p.logger.Error("Failed to get current height for negative height in normalizeHeight", "err", err)
return 0
Expand All @@ -74,7 +74,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 {
}

func (p *RpcProvider) getBlockMeta(ctx context.Context, n uint64) *cmtypes.BlockMeta {
header, data, err := p.adapter.Store.GetBlockData(ctx, n)
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, n)
if err != nil {
p.logger.Error("Failed to get block data in getBlockMeta", "height", n, "err", err)
return nil
Expand Down
6 changes: 2 additions & 4 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,9 @@ func startNode(
adapterMetrics = adapter.PrometheusMetrics(config.DefaultInstrumentationConfig().Namespace, "chain_id", cmtGenDoc.ChainID)
}

st := store.New(database)

executor = adapter.NewABCIExecutor(
app,
st,
database,
p2pClient,
p2pMetrics,
logger,
Expand All @@ -379,7 +377,7 @@ func startNode(
panic(err)
}

height, err := st.Height(context.Background())
height, err := executor.RollkitStore.Height(context.Background())
if err != nil {
return nil, nil, nil, cleanupFn, err
}
Expand Down