Skip to content

Commit d62bf75

Browse files
feat: use datastore directly for state store (#66)
* feat: use datastore directly for state store * missing commit * added prefix * lint * lint
1 parent d632794 commit d62bf75

File tree

7 files changed

+71
-62
lines changed

7 files changed

+71
-62
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/go-kit/kit v0.13.0
1111
github.com/gorilla/rpc v1.2.1
1212
github.com/hashicorp/go-metrics v0.5.4
13+
github.com/ipfs/go-datastore v0.8.2
1314
github.com/rollkit/rollkit v0.14.2-0.20250422111549-9f2f92ea5c6e
1415
github.com/rollkit/rollkit/core v0.0.0-20250422111549-9f2f92ea5c6e
1516
github.com/rollkit/rollkit/da v0.0.0-20250422111549-9f2f92ea5c6e
@@ -128,7 +129,6 @@ require (
128129
github.com/inconshreveable/mousetrap v1.1.0 // indirect
129130
github.com/ipfs/boxo v0.27.4 // indirect
130131
github.com/ipfs/go-cid v0.5.0 // indirect
131-
github.com/ipfs/go-datastore v0.8.2 // indirect
132132
github.com/ipfs/go-ds-badger4 v0.1.8 // indirect
133133
github.com/ipfs/go-log v1.0.5 // indirect
134134
github.com/ipfs/go-log/v2 v2.5.1 // indirect

pkg/adapter/adapter.go

+28-36
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ import (
1515
cmtypes "github.com/cometbft/cometbft/types"
1616
servertypes "github.com/cosmos/cosmos-sdk/server/types"
1717
genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types"
18+
ds "github.com/ipfs/go-datastore"
1819

1920
"github.com/rollkit/rollkit/core/execution"
2021
rollkitp2p "github.com/rollkit/rollkit/pkg/p2p"
21-
"github.com/rollkit/rollkit/pkg/store"
22+
rstore "github.com/rollkit/rollkit/pkg/store"
2223

2324
"github.com/rollkit/go-execution-abci/pkg/p2p"
2425
)
@@ -37,10 +38,11 @@ func LoadGenesisDoc(cfg *config.Config) (*cmtypes.GenesisDoc, error) {
3738

3839
// Adapter is a struct that will contain an ABCI Application, and will implement the go-execution interface
3940
type Adapter struct {
40-
App servertypes.ABCI
41-
Store store.Store
42-
Mempool mempool.Mempool
43-
MempoolIDs *mempoolIDs
41+
App servertypes.ABCI
42+
Store *Store
43+
RollkitStore rstore.Store
44+
Mempool mempool.Mempool
45+
MempoolIDs *mempoolIDs
4446

4547
P2PClient *rollkitp2p.Client
4648
TxGossiper *p2p.Gossiper
@@ -58,7 +60,7 @@ type Adapter struct {
5860
// The Adapter wraps the provided ABCI application and delegates execution-related operations to it.
5961
func NewABCIExecutor(
6062
app servertypes.ABCI,
61-
store store.Store,
63+
store ds.Batching,
6264
p2pClient *rollkitp2p.Client,
6365
p2pMetrics *rollkitp2p.Metrics,
6466
logger log.Logger,
@@ -69,16 +71,22 @@ func NewABCIExecutor(
6971
if metrics == nil {
7072
metrics = NopMetrics()
7173
}
74+
75+
// Create a new Store with ABCI prefix
76+
abciStore := NewStore(store)
77+
rollkitStore := rstore.New(abciStore)
78+
7279
a := &Adapter{
73-
App: app,
74-
Store: store,
75-
Logger: logger,
76-
P2PClient: p2pClient,
77-
p2pMetrics: p2pMetrics,
78-
CometCfg: cfg,
79-
AppGenesis: appGenesis,
80-
MempoolIDs: newMempoolIDs(),
81-
Metrics: metrics,
80+
App: app,
81+
Store: abciStore,
82+
RollkitStore: rollkitStore,
83+
Logger: logger,
84+
P2PClient: p2pClient,
85+
p2pMetrics: p2pMetrics,
86+
CometCfg: cfg,
87+
AppGenesis: appGenesis,
88+
MempoolIDs: newMempoolIDs(),
89+
Metrics: metrics,
8290
}
8391

8492
return a
@@ -240,7 +248,7 @@ func (a *Adapter) InitChain(ctx context.Context, genesisTime time.Time, initialH
240248
s.LastHeightConsensusParamsChanged = int64(initialHeight)
241249
s.LastHeightValidatorsChanged = int64(initialHeight)
242250

243-
if err := a.SaveState(ctx, s); err != nil {
251+
if err := a.Store.SaveState(ctx, s); err != nil {
244252
return nil, 0, fmt.Errorf("failed to save initial state: %w", err)
245253
}
246254

@@ -257,7 +265,7 @@ func (a *Adapter) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint
257265
a.Logger.Info("Executing block", "height", blockHeight, "num_txs", len(txs), "timestamp", timestamp)
258266
a.Metrics.TxsExecutedPerBlock.Observe(float64(len(txs)))
259267

260-
s, err := a.LoadState(ctx)
268+
s, err := a.Store.LoadState(ctx)
261269
if err != nil {
262270
return nil, 0, fmt.Errorf("failed to load state: %w", err)
263271
}
@@ -332,7 +340,7 @@ func (a *Adapter) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint
332340
s.NextValidators = nValSet.CopyIncrementProposerPriority(1)
333341
s.LastHeightValidatorsChanged = lastHeightValsChanged
334342

335-
if err := a.SaveState(ctx, s); err != nil {
343+
if err := a.Store.SaveState(ctx, s); err != nil {
336344
return nil, 0, fmt.Errorf("failed to save state: %w", err)
337345
}
338346

@@ -375,7 +383,7 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) {
375383
}()
376384
a.Logger.Debug("Getting transactions for proposal")
377385

378-
s, err := a.LoadState(ctx)
386+
s, err := a.Store.LoadState(ctx)
379387
if err != nil {
380388
return nil, fmt.Errorf("failed to load state for GetTxs: %w", err)
381389
}
@@ -391,7 +399,7 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) {
391399
txsBytes[i] = tx
392400
}
393401

394-
currentHeight, err := a.Store.Height(ctx)
402+
currentHeight, err := a.RollkitStore.Height(ctx)
395403
if err != nil {
396404
return nil, err
397405
}
@@ -416,19 +424,3 @@ func (a *Adapter) GetTxs(ctx context.Context) ([][]byte, error) {
416424
func (a *Adapter) SetFinal(ctx context.Context, blockHeight uint64) error {
417425
return nil
418426
}
419-
420-
func NewAdapter(store store.Store) *Adapter {
421-
return &Adapter{
422-
Store: store,
423-
}
424-
}
425-
426-
// LoadState loads the state from disk
427-
func (a *Adapter) LoadState(ctx context.Context) (*cmtstate.State, error) {
428-
return loadState(ctx, a.Store)
429-
}
430-
431-
// SaveState saves the state to disk
432-
func (a *Adapter) SaveState(ctx context.Context, state *cmtstate.State) error {
433-
return saveState(ctx, a.Store, state)
434-
}

pkg/adapter/store.go

+27-8
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,34 @@ import (
77
cmtstateproto "github.com/cometbft/cometbft/proto/tendermint/state"
88
cmtstate "github.com/cometbft/cometbft/state"
99
proto "github.com/cosmos/gogoproto/proto"
10+
ds "github.com/ipfs/go-datastore"
11+
kt "github.com/ipfs/go-datastore/keytransform"
12+
)
1013

11-
"github.com/rollkit/rollkit/pkg/store"
14+
const (
15+
// KeyPrefix is the prefix used for all ABCI-related keys in the datastore
16+
KeyPrefix = "abci"
17+
// stateKey is the key used for storing state
18+
stateKey = "s"
1219
)
1320

14-
const stateKey = "abci-s"
21+
// Store wraps a datastore with ABCI-specific functionality
22+
type Store struct {
23+
ds.Batching
24+
}
25+
26+
// NewStore creates a new Store with the ABCI prefix
27+
func NewStore(store ds.Batching) *Store {
28+
return &Store{
29+
Batching: kt.Wrap(store, &kt.PrefixTransform{
30+
Prefix: ds.NewKey(KeyPrefix),
31+
}),
32+
}
33+
}
1534

16-
// loadState loads the state from disk
17-
func loadState(ctx context.Context, s store.Store) (*cmtstate.State, error) {
18-
data, err := s.GetMetadata(ctx, stateKey)
35+
// LoadState loads the state from disk
36+
func (s *Store) LoadState(ctx context.Context) (*cmtstate.State, error) {
37+
data, err := s.Get(ctx, ds.NewKey(stateKey))
1938
if err != nil {
2039
return nil, fmt.Errorf("failed to get state metadata: %w", err)
2140
}
@@ -31,8 +50,8 @@ func loadState(ctx context.Context, s store.Store) (*cmtstate.State, error) {
3150
return cmtstate.FromProto(stateProto)
3251
}
3352

34-
// saveState saves the state to disk
35-
func saveState(ctx context.Context, s store.Store, state *cmtstate.State) error {
53+
// SaveState saves the state to disk
54+
func (s *Store) SaveState(ctx context.Context, state *cmtstate.State) error {
3655
stateProto, err := state.ToProto()
3756
if err != nil {
3857
return fmt.Errorf("failed to convert state to proto: %w", err)
@@ -43,5 +62,5 @@ func saveState(ctx context.Context, s store.Store, state *cmtstate.State) error
4362
return fmt.Errorf("failed to marshal state: %w", err)
4463
}
4564

46-
return s.SetMetadata(ctx, stateKey, data)
65+
return s.Put(ctx, ds.NewKey(stateKey), data)
4766
}

pkg/rpc/provider/blockchain.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu
4141
default:
4242
heightValue = p.normalizeHeight(height)
4343
}
44-
header, data, err := p.adapter.Store.GetBlockData(ctx, heightValue)
44+
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue)
4545
if err != nil {
4646
return nil, err
4747
}
@@ -65,7 +65,7 @@ func (p *RpcProvider) Block(ctx context.Context, height *int64) (*coretypes.Resu
6565

6666
// BlockByHash implements client.CometRPC.
6767
func (p *RpcProvider) BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error) {
68-
header, data, err := p.adapter.Store.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
68+
header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
6969
if err != nil {
7070
return nil, err
7171
}
@@ -120,7 +120,7 @@ func (p *RpcProvider) BlockResults(ctx context.Context, height *int64) (*coretyp
120120
// Commit implements client.CometRPC.
121121
func (p *RpcProvider) Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error) {
122122
heightValue := p.normalizeHeight(height)
123-
header, data, err := p.adapter.Store.GetBlockData(ctx, heightValue)
123+
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, heightValue)
124124
if err != nil {
125125
return nil, err
126126
}
@@ -156,7 +156,7 @@ func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes)
156156
// decoding logic in the HTTP service will correctly translate from JSON.
157157
// See https://github.com/cometbft/cometbft/issues/6802 for context.
158158

159-
header, data, err := p.adapter.Store.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
159+
header, data, err := p.adapter.RollkitStore.GetBlockByHash(ctx, rlktypes.Hash(hash)) // Used types.Hash from rollkit/types
160160
if err != nil {
161161
return nil, err
162162
}
@@ -179,7 +179,7 @@ func (p *RpcProvider) HeaderByHash(ctx context.Context, hash cmtbytes.HexBytes)
179179
func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) {
180180
const limit int64 = 20 // Default limit used in the original code
181181

182-
height, err := p.adapter.Store.Height(ctx)
182+
height, err := p.adapter.RollkitStore.Height(ctx)
183183
if err != nil {
184184
return nil, fmt.Errorf("failed to get current height: %w", err)
185185
}
@@ -211,7 +211,7 @@ func (p *RpcProvider) BlockchainInfo(ctx context.Context, minHeight int64, maxHe
211211

212212
// Re-fetch height in case new blocks were added during the loop?
213213
// The original code did this.
214-
finalHeight, err := p.adapter.Store.Height(ctx)
214+
finalHeight, err := p.adapter.RollkitStore.Height(ctx)
215215
if err != nil {
216216
return nil, fmt.Errorf("failed to get final height: %w", err)
217217
}
@@ -274,7 +274,7 @@ func (p *RpcProvider) BlockSearch(ctx context.Context, query string, pagePtr *in
274274
apiResults := make([]*coretypes.ResultBlock, 0, pageSize)
275275
for i := skipCount; i < skipCount+pageSize; i++ {
276276
height := uint64(results[i])
277-
header, data, err := p.adapter.Store.GetBlockData(ctx, height)
277+
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, height)
278278
if err != nil {
279279
// If a block referenced by indexer is missing, should we error out or just skip?
280280
// For now, error out.
@@ -307,7 +307,7 @@ func (p *RpcProvider) Validators(ctx context.Context, heightPtr *int64, pagePtr
307307
// depending on state pruning. The current implementation implicitly loads latest state.
308308
height := p.normalizeHeight(heightPtr)
309309

310-
s, err := p.adapter.LoadState(ctx) // Loads the *latest* state
310+
s, err := p.adapter.Store.LoadState(ctx) // Loads the *latest* state
311311
if err != nil {
312312
return nil, fmt.Errorf("failed to load current state: %w", err)
313313
}

pkg/rpc/provider/info.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func (p *RpcProvider) Status(ctx context.Context) (*coretypes.ResultStatus, erro
1717
return nil, err
1818
}
1919

20-
s, err := p.adapter.LoadState(ctx)
20+
s, err := p.adapter.Store.LoadState(ctx)
2121
if err != nil {
2222
return nil, err
2323
}
@@ -109,7 +109,7 @@ func (p *RpcProvider) Health(context.Context) (*coretypes.ResultHealth, error) {
109109

110110
// ConsensusParams implements client.Client.
111111
func (p *RpcProvider) ConsensusParams(ctx context.Context, height *int64) (*coretypes.ResultConsensusParams, error) {
112-
state, err := p.adapter.LoadState(ctx)
112+
state, err := p.adapter.Store.LoadState(ctx)
113113
if err != nil {
114114
return nil, err
115115
}

pkg/rpc/provider/provider.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 {
5050
if height == nil {
5151
var err error
5252
// TODO: Decide how to handle context here. Using background for now.
53-
heightValue, err = p.adapter.Store.Height(context.Background())
53+
heightValue, err = p.adapter.RollkitStore.Height(context.Background())
5454
if err != nil {
5555
// TODO: Consider logging or returning error
5656
p.logger.Error("Failed to get current height in normalizeHeight", "err", err)
@@ -61,7 +61,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 {
6161
// Currently, just treat them as 0 or latest, adjust as needed.
6262
// For now, let's assume negative height means latest valid height.
6363
var err error
64-
heightValue, err = p.adapter.Store.Height(context.Background())
64+
heightValue, err = p.adapter.RollkitStore.Height(context.Background())
6565
if err != nil {
6666
p.logger.Error("Failed to get current height for negative height in normalizeHeight", "err", err)
6767
return 0
@@ -74,7 +74,7 @@ func (p *RpcProvider) normalizeHeight(height *int64) uint64 {
7474
}
7575

7676
func (p *RpcProvider) getBlockMeta(ctx context.Context, n uint64) *cmtypes.BlockMeta {
77-
header, data, err := p.adapter.Store.GetBlockData(ctx, n)
77+
header, data, err := p.adapter.RollkitStore.GetBlockData(ctx, n)
7878
if err != nil {
7979
p.logger.Error("Failed to get block data in getBlockMeta", "height", n, "err", err)
8080
return nil

server/start.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -358,11 +358,9 @@ func startNode(
358358
adapterMetrics = adapter.PrometheusMetrics(config.DefaultInstrumentationConfig().Namespace, "chain_id", cmtGenDoc.ChainID)
359359
}
360360

361-
st := store.New(database)
362-
363361
executor = adapter.NewABCIExecutor(
364362
app,
365-
st,
363+
database,
366364
p2pClient,
367365
p2pMetrics,
368366
logger,
@@ -379,7 +377,7 @@ func startNode(
379377
panic(err)
380378
}
381379

382-
height, err := st.Height(context.Background())
380+
height, err := executor.RollkitStore.Height(context.Background())
383381
if err != nil {
384382
return nil, nil, nil, cleanupFn, err
385383
}

0 commit comments

Comments
 (0)