Skip to content

Commit 8dada08

Browse files
committed
Require both types of KVStore
As an intermediary step, we require any store to implement both `KVStore` and `KVStoreSync`, allowing us to switch over step-by-step. We already switch to the fully-async background processor variant here.
1 parent 42f7a9e commit 8dada08

File tree

10 files changed

+236
-183
lines changed

10 files changed

+236
-183
lines changed

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ default = []
3232
#lightning-types = { version = "0.2.0" }
3333
#lightning-invoice = { version = "0.33.0", features = ["std"] }
3434
#lightning-net-tokio = { version = "0.1.0" }
35-
#lightning-persister = { version = "0.1.0" }
35+
#lightning-persister = { version = "0.1.0", features = ["tokio"] }
3636
#lightning-background-processor = { version = "0.1.0" }
3737
#lightning-rapid-gossip-sync = { version = "0.1.0" }
3838
#lightning-block-sync = { version = "0.1.0", features = ["rest-client", "rpc-client", "tokio"] }
@@ -44,7 +44,7 @@ default = []
4444
#lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
4545
#lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] }
4646
#lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
47-
#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
47+
#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["tokio"] }
4848
#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
4949
#lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
5050
#lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["rest-client", "rpc-client", "tokio"] }
@@ -56,7 +56,7 @@ lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "
5656
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" }
5757
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["std"] }
5858
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" }
59-
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" }
59+
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["tokio"] }
6060
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" }
6161
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" }
6262
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["rest-client", "rpc-client", "tokio"] }
@@ -68,7 +68,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning",
6868
#lightning-types = { path = "../rust-lightning/lightning-types" }
6969
#lightning-invoice = { path = "../rust-lightning/lightning-invoice", features = ["std"] }
7070
#lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" }
71-
#lightning-persister = { path = "../rust-lightning/lightning-persister" }
71+
#lightning-persister = { path = "../rust-lightning/lightning-persister", features = ["tokio"] }
7272
#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" }
7373
#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" }
7474
#lightning-block-sync = { path = "../rust-lightning/lightning-block-sync", features = ["rest-client", "rpc-client", "tokio"] }

src/builder.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use lightning::routing::scoring::{
3131
};
3232
use lightning::sign::{EntropySource, NodeSigner};
3333
use lightning::util::persist::{
34-
read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY,
34+
read_channel_monitors, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
3535
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
3636
};
3737
use lightning::util::ser::ReadableArgs;
@@ -1419,7 +1419,8 @@ fn build_with_store_internal(
14191419

14201420
// Initialize the ChannelManager
14211421
let channel_manager = {
1422-
if let Ok(res) = kv_store.read(
1422+
if let Ok(res) = KVStoreSync::read(
1423+
&*kv_store,
14231424
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
14241425
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
14251426
CHANNEL_MANAGER_PERSISTENCE_KEY,
@@ -1657,7 +1658,7 @@ fn build_with_store_internal(
16571658
Ok(output_sweeper) => Arc::new(output_sweeper),
16581659
Err(e) => {
16591660
if e.kind() == std::io::ErrorKind::NotFound {
1660-
Arc::new(OutputSweeper::new_with_kv_store_sync(
1661+
Arc::new(OutputSweeper::new(
16611662
channel_manager.current_best_block(),
16621663
Arc::clone(&tx_broadcaster),
16631664
Arc::clone(&fee_estimator),

src/data_store.rs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::collections::{hash_map, HashMap};
99
use std::ops::Deref;
1010
use std::sync::{Arc, Mutex};
1111

12+
use lightning::util::persist::KVStoreSync;
1213
use lightning::util::ser::{Readable, Writeable};
1314

1415
use crate::logger::{log_error, LdkLogger};
@@ -97,19 +98,24 @@ where
9798
let removed = self.objects.lock().unwrap().remove(id).is_some();
9899
if removed {
99100
let store_key = id.encode_to_hex_str();
100-
self.kv_store
101-
.remove(&self.primary_namespace, &self.secondary_namespace, &store_key, false)
102-
.map_err(|e| {
103-
log_error!(
104-
self.logger,
105-
"Removing object data for key {}/{}/{} failed due to: {}",
106-
&self.primary_namespace,
107-
&self.secondary_namespace,
108-
store_key,
109-
e
110-
);
111-
Error::PersistenceFailed
112-
})?;
101+
KVStoreSync::remove(
102+
&*self.kv_store,
103+
&self.primary_namespace,
104+
&self.secondary_namespace,
105+
&store_key,
106+
false,
107+
)
108+
.map_err(|e| {
109+
log_error!(
110+
self.logger,
111+
"Removing object data for key {}/{}/{} failed due to: {}",
112+
&self.primary_namespace,
113+
&self.secondary_namespace,
114+
store_key,
115+
e
116+
);
117+
Error::PersistenceFailed
118+
})?;
113119
}
114120
Ok(())
115121
}
@@ -141,9 +147,14 @@ where
141147
fn persist(&self, object: &SO) -> Result<(), Error> {
142148
let store_key = object.id().encode_to_hex_str();
143149
let data = object.encode();
144-
self.kv_store
145-
.write(&self.primary_namespace, &self.secondary_namespace, &store_key, data)
146-
.map_err(|e| {
150+
KVStoreSync::write(
151+
&*self.kv_store,
152+
&self.primary_namespace,
153+
&self.secondary_namespace,
154+
&store_key,
155+
data,
156+
)
157+
.map_err(|e| {
147158
log_error!(
148159
self.logger,
149160
"Write for key {}/{}/{} failed due to: {}",
@@ -241,13 +252,15 @@ mod tests {
241252
let store_key = id.encode_to_hex_str();
242253

243254
// Check we start empty.
244-
assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_err());
255+
assert!(KVStoreSync::read(&*store, &primary_namespace, &secondary_namespace, &store_key)
256+
.is_err());
245257

246258
// Check we successfully store an object and return `false`
247259
let object = TestObject { id, data: [23u8; 3] };
248260
assert_eq!(Ok(false), data_store.insert(object.clone()));
249261
assert_eq!(Some(object), data_store.get(&id));
250-
assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_ok());
262+
assert!(KVStoreSync::read(&*store, &primary_namespace, &secondary_namespace, &store_key)
263+
.is_ok());
251264

252265
// Test re-insertion returns `true`
253266
let mut override_object = object.clone();

src/event.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use lightning::util::config::{
2626
ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate,
2727
};
2828
use lightning::util::errors::APIError;
29+
use lightning::util::persist::KVStoreSync;
2930
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
3031
use lightning_liquidity::lsps2::utils::compute_opening_fee;
3132
use lightning_types::payment::{PaymentHash, PaymentPreimage};
@@ -348,24 +349,24 @@ where
348349

349350
fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
350351
let data = EventQueueSerWrapper(locked_queue).encode();
351-
self.kv_store
352-
.write(
352+
KVStoreSync::write(
353+
&*self.kv_store,
354+
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
355+
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
356+
EVENT_QUEUE_PERSISTENCE_KEY,
357+
data,
358+
)
359+
.map_err(|e| {
360+
log_error!(
361+
self.logger,
362+
"Write for key {}/{}/{} failed due to: {}",
353363
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
354364
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
355365
EVENT_QUEUE_PERSISTENCE_KEY,
356-
data,
357-
)
358-
.map_err(|e| {
359-
log_error!(
360-
self.logger,
361-
"Write for key {}/{}/{} failed due to: {}",
362-
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
363-
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
364-
EVENT_QUEUE_PERSISTENCE_KEY,
365-
e
366-
);
367-
Error::PersistenceFailed
368-
})?;
366+
e
367+
);
368+
Error::PersistenceFailed
369+
})?;
369370
Ok(())
370371
}
371372
}
@@ -1620,13 +1621,13 @@ mod tests {
16201621
}
16211622

16221623
// Check we can read back what we persisted.
1623-
let persisted_bytes = store
1624-
.read(
1625-
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
1626-
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
1627-
EVENT_QUEUE_PERSISTENCE_KEY,
1628-
)
1629-
.unwrap();
1624+
let persisted_bytes = KVStoreSync::read(
1625+
&*store,
1626+
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
1627+
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
1628+
EVENT_QUEUE_PERSISTENCE_KEY,
1629+
)
1630+
.unwrap();
16301631
let deser_event_queue =
16311632
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
16321633
assert_eq!(deser_event_queue.wait_next_event(), expected_event);

0 commit comments

Comments
 (0)