Move to zerocopy-lmdb

This commit is contained in:
Clément Renault 2019-10-16 17:05:24 +02:00
parent c332c7bc70
commit 1667e1b32f
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
25 changed files with 450 additions and 684 deletions

View File

@ -12,20 +12,23 @@ crossbeam-channel = "0.3.9"
deunicode = "1.0.0" deunicode = "1.0.0"
env_logger = "0.7.0" env_logger = "0.7.0"
hashbrown = { version = "0.6.0", features = ["serde"] } hashbrown = { version = "0.6.0", features = ["serde"] }
lmdb-rkv = "0.12.3"
log = "0.4.8" log = "0.4.8"
meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" }
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
once_cell = "1.2.0" once_cell = "1.2.0"
ordered-float = { version = "1.0.2", features = ["serde"] } ordered-float = { version = "1.0.2", features = ["serde"] }
rkv = "0.10.2" sdset = "0.3.3"
sdset = "0.3.2"
serde = { version = "1.0.101", features = ["derive"] } serde = { version = "1.0.101", features = ["derive"] }
serde_json = "1.0.41" serde_json = "1.0.41"
siphasher = "0.3.0" siphasher = "0.3.0"
slice-group-by = "0.2.6" slice-group-by = "0.2.6"
zerocopy = "0.2.8" zerocopy = "0.2.8"
[dependencies.zlmdb]
package = "zerocopy-lmdb"
git = "https://github.com/Kerollmops/zerocopy-lmdb.git"
branch = "master"
[dependencies.levenshtein_automata] [dependencies.levenshtein_automata]
git = "https://github.com/Kerollmops/levenshtein-automata.git" git = "https://github.com/Kerollmops/levenshtein-automata.git"
branch = "arc-byte-slice" branch = "arc-byte-slice"

View File

@ -94,14 +94,14 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
let done = database.set_update_callback(INDEX_NAME, Box::new(update_fn)); let done = database.set_update_callback(INDEX_NAME, Box::new(update_fn));
assert!(done, "could not set the index update function"); assert!(done, "could not set the index update function");
let rkv = database.rkv.read().unwrap(); let env = &database.env;
let schema = { let schema = {
let string = fs::read_to_string(&command.schema)?; let string = fs::read_to_string(&command.schema)?;
toml::from_str(&string).unwrap() toml::from_str(&string).unwrap()
}; };
let mut writer = rkv.write().unwrap(); let mut writer = env.write_txn().unwrap();
match index.main.schema(&writer)? { match index.main.schema(&writer)? {
Some(current_schema) => { Some(current_schema) => {
if current_schema != schema { if current_schema != schema {
@ -150,7 +150,7 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
println!(); println!();
let mut writer = rkv.write().unwrap(); let mut writer = env.write_txn().unwrap();
println!("committing update..."); println!("committing update...");
let update_id = additions.finalize(&mut writer)?; let update_id = additions.finalize(&mut writer)?;
writer.commit().unwrap(); writer.commit().unwrap();
@ -266,9 +266,9 @@ fn crop_text(
} }
fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<dyn Error>> { fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<dyn Error>> {
let rkv = database.rkv.read().unwrap(); let env = &database.env;
let index = database.open_index(INDEX_NAME).expect("Could not find index"); let index = database.open_index(INDEX_NAME).expect("Could not find index");
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let schema = index.main.schema(&reader)?; let schema = index.main.schema(&reader)?;
let schema = schema.ok_or(meilidb_core::Error::SchemaMissing)?; let schema = schema.ok_or(meilidb_core::Error::SchemaMissing)?;
@ -317,7 +317,7 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<
doc.highlights.sort_unstable_by_key(|m| (m.char_index, m.char_length)); doc.highlights.sort_unstable_by_key(|m| (m.char_index, m.char_length));
let start_retrieve = Instant::now(); let start_retrieve = Instant::now();
let result = index.document::<_, Document>(&reader, Some(&fields), doc.id); let result = index.document::<Document>(&reader, Some(&fields), doc.id);
retrieve_duration += start_retrieve.elapsed(); retrieve_duration += start_retrieve.elapsed();
match result { match result {

View File

@ -23,7 +23,7 @@ pub struct AutomatonProducer {
impl AutomatonProducer { impl AutomatonProducer {
pub fn new( pub fn new(
reader: &impl rkv::Readable, reader: &zlmdb::RoTxn,
query: &str, query: &str,
main_store: store::Main, main_store: store::Main,
synonyms_store: store::Synonyms, synonyms_store: store::Synonyms,
@ -108,7 +108,7 @@ pub fn normalize_str(string: &str) -> String {
} }
fn generate_automatons( fn generate_automatons(
reader: &impl rkv::Readable, reader: &zlmdb::RoTxn,
query: &str, query: &str,
main_store: store::Main, main_store: store::Main,
synonym_store: store::Synonyms, synonym_store: store::Synonyms,

View File

@ -3,6 +3,7 @@ use std::path::Path;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::{fs, thread}; use std::{fs, thread};
use zlmdb::types::{Str, Unit};
use crossbeam_channel::Receiver; use crossbeam_channel::Receiver;
use log::{debug, error}; use log::{debug, error};
@ -12,27 +13,22 @@ pub type BoxUpdateFn = Box<dyn Fn(update::UpdateResult) + Send + Sync + 'static>
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>; type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
pub struct Database { pub struct Database {
pub rkv: Arc<RwLock<rkv::Rkv>>, pub env: zlmdb::Env,
common_store: rkv::SingleStore, common_store: zlmdb::DynDatabase,
indexes_store: rkv::SingleStore, indexes_store: zlmdb::Database<Str, Unit>,
indexes: RwLock<HashMap<String, (Index, Arc<ArcSwapFn>, thread::JoinHandle<()>)>>, indexes: RwLock<HashMap<String, (Index, Arc<ArcSwapFn>, thread::JoinHandle<()>)>>,
} }
fn update_awaiter( fn update_awaiter(
receiver: Receiver<()>, receiver: Receiver<()>,
rkv: Arc<RwLock<rkv::Rkv>>, env: zlmdb::Env,
update_fn: Arc<ArcSwapFn>, update_fn: Arc<ArcSwapFn>,
index: Index, index: Index,
) { ) {
for () in receiver { for () in receiver {
// consume all updates in order (oldest first) // consume all updates in order (oldest first)
loop { loop {
let rkv = match rkv.read() { let mut writer = match env.write_txn() {
Ok(rkv) => rkv,
Err(e) => { error!("rkv RwLock read failed: {}", e); break }
};
let mut writer = match rkv.write() {
Ok(writer) => writer, Ok(writer) => writer,
Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break } Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break }
}; };
@ -55,64 +51,57 @@ fn update_awaiter(
impl Database { impl Database {
pub fn open_or_create(path: impl AsRef<Path>) -> MResult<Database> { pub fn open_or_create(path: impl AsRef<Path>) -> MResult<Database> {
let manager = rkv::Manager::singleton();
let mut rkv_write = manager.write().unwrap();
fs::create_dir_all(path.as_ref())?; fs::create_dir_all(path.as_ref())?;
let rkv = rkv_write let env = zlmdb::EnvOpenOptions::new()
.get_or_create(path.as_ref(), |path| { .map_size(10 * 1024 * 1024 * 1024) // 10GB
let mut builder = rkv::Rkv::environment_builder(); .max_dbs(3000)
builder.set_max_dbs(3000).set_map_size(10 * 1024 * 1024 * 1024); // 10GB .open(path)?;
rkv::Rkv::from_env(path, builder)
})?;
drop(rkv_write); let common_store = env.create_dyn_database(Some("common"))?;
let indexes_store = env.create_database::<Str, Unit>(Some("indexes"))?;
let rkv_read = rkv.read().unwrap();
let create_options = rkv::store::Options::create();
let common_store = rkv_read.open_single("common", create_options)?;
let indexes_store = rkv_read.open_single("indexes", create_options)?;
// list all indexes that needs to be opened // list all indexes that needs to be opened
let mut must_open = Vec::new(); let mut must_open = Vec::new();
let reader = rkv_read.read()?; let reader = env.read_txn()?;
for result in indexes_store.iter_start(&reader)? { for result in indexes_store.iter(&reader)? {
let (key, _) = result?; let (index_name, _) = result?;
if let Ok(index_name) = std::str::from_utf8(key) {
must_open.push(index_name.to_owned()); must_open.push(index_name.to_owned());
} }
}
drop(reader); reader.abort();
// open the previously aggregated indexes // open the previously aggregated indexes
let mut indexes = HashMap::new(); let mut indexes = HashMap::new();
for index_name in must_open { for index_name in must_open {
let (sender, receiver) = crossbeam_channel::bounded(100); let (sender, receiver) = crossbeam_channel::bounded(100);
let index = store::open(&rkv_read, &index_name, sender.clone())?; let index = match store::open(&env, &index_name, sender.clone())? {
Some(index) => index,
None => {
log::warn!("the index {} doesn't exist or has not all the databases", index_name);
continue;
},
};
let update_fn = Arc::new(ArcSwapFn::empty()); let update_fn = Arc::new(ArcSwapFn::empty());
let rkv_clone = rkv.clone(); let env_clone = env.clone();
let index_clone = index.clone(); let index_clone = index.clone();
let update_fn_clone = update_fn.clone(); let update_fn_clone = update_fn.clone();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
update_awaiter(receiver, rkv_clone, update_fn_clone, index_clone) update_awaiter(receiver, env_clone, update_fn_clone, index_clone)
}); });
// send an update notification to make sure that // send an update notification to make sure that
// possible previous boot updates are consumed // possible pre-boot updates are consumed
sender.send(()).unwrap(); sender.send(()).unwrap();
let result = indexes.insert(index_name, (index, update_fn, handle)); let result = indexes.insert(index_name, (index, update_fn, handle));
assert!(result.is_none(), "The index should not have been already open"); assert!(result.is_none(), "The index should not have been already open");
} }
drop(rkv_read); Ok(Database { env, common_store, indexes_store, indexes: RwLock::new(indexes) })
Ok(Database { rkv, common_store, indexes_store, indexes: RwLock::new(indexes) })
} }
pub fn open_index(&self, name: impl AsRef<str>) -> Option<Index> { pub fn open_index(&self, name: impl AsRef<str>) -> Option<Index> {
@ -130,22 +119,20 @@ impl Database {
match indexes_lock.entry(name.to_owned()) { match indexes_lock.entry(name.to_owned()) {
Entry::Occupied(_) => Err(crate::Error::IndexAlreadyExists), Entry::Occupied(_) => Err(crate::Error::IndexAlreadyExists),
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let rkv_lock = self.rkv.read().unwrap();
let (sender, receiver) = crossbeam_channel::bounded(100); let (sender, receiver) = crossbeam_channel::bounded(100);
let index = store::create(&rkv_lock, name, sender)?; let index = store::create(&self.env, name, sender)?;
let mut writer = rkv_lock.write()?; let mut writer = self.env.write_txn()?;
let value = rkv::Value::Blob(&[]); self.indexes_store.put(&mut writer, name, &())?;
self.indexes_store.put(&mut writer, name, &value)?;
let rkv_clone = self.rkv.clone(); let env_clone = self.env.clone();
let index_clone = index.clone(); let index_clone = index.clone();
let no_update_fn = Arc::new(ArcSwapFn::empty()); let no_update_fn = Arc::new(ArcSwapFn::empty());
let no_update_fn_clone = no_update_fn.clone(); let no_update_fn_clone = no_update_fn.clone();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
update_awaiter(receiver, rkv_clone, no_update_fn_clone, index_clone) update_awaiter(receiver, env_clone, no_update_fn_clone, index_clone)
}); });
writer.commit()?; writer.commit()?;
@ -181,7 +168,7 @@ impl Database {
Ok(indexes.keys().cloned().collect()) Ok(indexes.keys().cloned().collect())
} }
pub fn common_store(&self) -> rkv::SingleStore { pub fn common_store(&self) -> zlmdb::DynDatabase {
self.common_store self.common_store
} }
} }

View File

@ -12,7 +12,7 @@ pub enum Error {
SchemaMissing, SchemaMissing,
WordIndexMissing, WordIndexMissing,
MissingDocumentId, MissingDocumentId,
Rkv(rkv::StoreError), Zlmdb(zlmdb::Error),
Fst(fst::Error), Fst(fst::Error),
SerdeJson(SerdeJsonError), SerdeJson(SerdeJsonError),
Bincode(bincode::Error), Bincode(bincode::Error),
@ -27,9 +27,9 @@ impl From<io::Error> for Error {
} }
} }
impl From<rkv::StoreError> for Error { impl From<zlmdb::Error> for Error {
fn from(error: rkv::StoreError) -> Error { fn from(error: zlmdb::Error) -> Error {
Error::Rkv(error) Error::Zlmdb(error)
} }
} }
@ -79,7 +79,7 @@ impl fmt::Display for Error {
SchemaMissing => write!(f, "this index does not have a schema"), SchemaMissing => write!(f, "this index does not have a schema"),
WordIndexMissing => write!(f, "this index does not have a word index"), WordIndexMissing => write!(f, "this index does not have a word index"),
MissingDocumentId => write!(f, "document id is missing"), MissingDocumentId => write!(f, "document id is missing"),
Rkv(e) => write!(f, "rkv error; {}", e), Zlmdb(e) => write!(f, "zlmdb error; {}", e),
Fst(e) => write!(f, "fst error; {}", e), Fst(e) => write!(f, "fst error; {}", e),
SerdeJson(e) => write!(f, "serde json error; {}", e), SerdeJson(e) => write!(f, "serde json error; {}", e),
Bincode(e) => write!(f, "bincode error; {}", e), Bincode(e) => write!(f, "bincode error; {}", e),

View File

@ -125,7 +125,7 @@ fn multiword_rewrite_matches(
} }
fn fetch_raw_documents( fn fetch_raw_documents(
reader: &impl rkv::Readable, reader: &zlmdb::RoTxn,
automatons: &[Automaton], automatons: &[Automaton],
query_enhancer: &QueryEnhancer, query_enhancer: &QueryEnhancer,
searchables: Option<&ReorderedAttrs>, searchables: Option<&ReorderedAttrs>,
@ -278,7 +278,7 @@ impl<'c, FI> QueryBuilder<'c, FI> {
impl<FI> QueryBuilder<'_, FI> where FI: Fn(DocumentId) -> bool { impl<FI> QueryBuilder<'_, FI> where FI: Fn(DocumentId) -> bool {
pub fn query( pub fn query(
self, self,
reader: &impl rkv::Readable, reader: &zlmdb::RoTxn,
query: &str, query: &str,
range: Range<usize>, range: Range<usize>,
) -> MResult<Vec<Document>> ) -> MResult<Vec<Document>>
@ -414,7 +414,7 @@ where FI: Fn(DocumentId) -> bool,
{ {
pub fn query( pub fn query(
self, self,
reader: &impl rkv::Readable, reader: &zlmdb::RoTxn,
query: &str, query: &str,
range: Range<usize>, range: Range<usize>,
) -> MResult<Vec<Document>> ) -> MResult<Vec<Document>>
@ -643,8 +643,8 @@ mod tests {
} }
pub fn add_synonym(&mut self, word: &str, new: SetBuf<&str>) { pub fn add_synonym(&mut self, word: &str, new: SetBuf<&str>) {
let rkv = self.database.rkv.read().unwrap(); let env = &self.database.env;
let mut writer = rkv.write().unwrap(); let mut writer = env.write_txn().unwrap();
let word = word.to_lowercase(); let word = word.to_lowercase();
@ -675,8 +675,8 @@ mod tests {
let database = Database::open_or_create(&tempdir).unwrap(); let database = Database::open_or_create(&tempdir).unwrap();
let index = database.create_index("default").unwrap(); let index = database.create_index("default").unwrap();
let rkv = database.rkv.read().unwrap(); let env = &database.env;
let mut writer = rkv.write().unwrap(); let mut writer = env.write_txn().unwrap();
let mut words_fst = BTreeSet::new(); let mut words_fst = BTreeSet::new();
let mut postings_lists = HashMap::new(); let mut postings_lists = HashMap::new();
@ -720,7 +720,6 @@ mod tests {
} }
writer.commit().unwrap(); writer.commit().unwrap();
drop(rkv);
TempDatabase { database, index, _tempdir: tempdir } TempDatabase { database, index, _tempdir: tempdir }
} }
@ -734,8 +733,8 @@ mod tests {
("apple", &[doc_char_index(0, 2, 2)][..]), ("apple", &[doc_char_index(0, 2, 2)][..]),
]); ]);
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "iphone from apple", 0..20).unwrap(); let results = builder.query(&reader, "iphone from apple", 0..20).unwrap();
@ -759,8 +758,8 @@ mod tests {
store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"])); store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "hello", 0..20).unwrap(); let results = builder.query(&reader, "hello", 0..20).unwrap();
@ -794,8 +793,8 @@ mod tests {
store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"])); store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"]));
store.add_synonym("salut", SetBuf::from_dirty(vec!["hello"])); store.add_synonym("salut", SetBuf::from_dirty(vec!["hello"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "sal", 0..20).unwrap(); let results = builder.query(&reader, "sal", 0..20).unwrap();
@ -840,8 +839,8 @@ mod tests {
store.add_synonym("salutation", SetBuf::from_dirty(vec!["hello"])); store.add_synonym("salutation", SetBuf::from_dirty(vec!["hello"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "salutution", 0..20).unwrap(); let results = builder.query(&reader, "salutution", 0..20).unwrap();
@ -878,8 +877,8 @@ mod tests {
store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello", "salut"])); store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello", "salut"]));
store.add_synonym("salut", SetBuf::from_dirty(vec!["hello", "bonjour"])); store.add_synonym("salut", SetBuf::from_dirty(vec!["hello", "bonjour"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "hello", 0..20).unwrap(); let results = builder.query(&reader, "hello", 0..20).unwrap();
@ -961,8 +960,8 @@ mod tests {
store.add_synonym("NY", SetBuf::from_dirty(vec!["NYC", "new york", "new york city"])); store.add_synonym("NY", SetBuf::from_dirty(vec!["NYC", "new york", "new york city"]));
store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"])); store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "NY subway", 0..20).unwrap(); let results = builder.query(&reader, "NY subway", 0..20).unwrap();
@ -1033,8 +1032,8 @@ mod tests {
store.add_synonym("NY", SetBuf::from_dirty(vec!["york new"])); store.add_synonym("NY", SetBuf::from_dirty(vec!["york new"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "NY", 0..20).unwrap(); let results = builder.query(&reader, "NY", 0..20).unwrap();
@ -1092,8 +1091,8 @@ mod tests {
store.add_synonym("new york", SetBuf::from_dirty(vec!["NY"])); store.add_synonym("new york", SetBuf::from_dirty(vec!["NY"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "NY subway", 0..20).unwrap(); let results = builder.query(&reader, "NY subway", 0..20).unwrap();
@ -1152,8 +1151,8 @@ mod tests {
store.add_synonym("NY", SetBuf::from_dirty(vec!["NYC", "new york", "new york city"])); store.add_synonym("NY", SetBuf::from_dirty(vec!["NYC", "new york", "new york city"]));
store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"])); store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "NY subway", 0..20).unwrap(); let results = builder.query(&reader, "NY subway", 0..20).unwrap();
@ -1228,8 +1227,8 @@ mod tests {
store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"])); store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"]));
store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"])); store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "NY subway broken", 0..20).unwrap(); let results = builder.query(&reader, "NY subway broken", 0..20).unwrap();
@ -1311,8 +1310,8 @@ mod tests {
store.add_synonym("new york city", SetBuf::from_dirty(vec![ "NYC", "NY", "new york" ])); store.add_synonym("new york city", SetBuf::from_dirty(vec![ "NYC", "NY", "new york" ]));
store.add_synonym("underground train", SetBuf::from_dirty(vec![ "subway" ])); store.add_synonym("underground train", SetBuf::from_dirty(vec![ "subway" ]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "new york underground train broken", 0..20).unwrap(); let results = builder.query(&reader, "new york underground train broken", 0..20).unwrap();
@ -1407,8 +1406,8 @@ mod tests {
store.add_synonym("new york", SetBuf::from_dirty(vec![ "new york city" ])); store.add_synonym("new york", SetBuf::from_dirty(vec![ "new york city" ]));
store.add_synonym("new york city", SetBuf::from_dirty(vec![ "new york" ])); store.add_synonym("new york city", SetBuf::from_dirty(vec![ "new york" ]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "new york big ", 0..20).unwrap(); let results = builder.query(&reader, "new york big ", 0..20).unwrap();
@ -1446,8 +1445,8 @@ mod tests {
store.add_synonym("NY", SetBuf::from_dirty(vec!["new york city story"])); store.add_synonym("NY", SetBuf::from_dirty(vec!["new york city story"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "NY subway ", 0..20).unwrap(); let results = builder.query(&reader, "NY subway ", 0..20).unwrap();
@ -1496,8 +1495,8 @@ mod tests {
store.add_synonym("new york city", SetBuf::from_dirty(vec!["NYC"])); store.add_synonym("new york city", SetBuf::from_dirty(vec!["NYC"]));
store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"])); store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "new york city long subway cool ", 0..20).unwrap(); let results = builder.query(&reader, "new york city long subway cool ", 0..20).unwrap();
@ -1528,8 +1527,8 @@ mod tests {
store.add_synonym("téléphone", SetBuf::from_dirty(vec!["iphone"])); store.add_synonym("téléphone", SetBuf::from_dirty(vec!["iphone"]));
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "telephone", 0..20).unwrap(); let results = builder.query(&reader, "telephone", 0..20).unwrap();
@ -1590,8 +1589,8 @@ mod tests {
("case", &[doc_index(0, 1)][..]), ("case", &[doc_index(0, 1)][..]),
]); ]);
let rkv = store.database.rkv.read().unwrap(); let env = &store.database.env;
let reader = rkv.read().unwrap(); let reader = env.read_txn().unwrap();
let builder = store.query_builder(); let builder = store.query_builder();
let results = builder.query(&reader, "i phone case", 0..20).unwrap(); let results = builder.query(&reader, "i phone case", 0..20).unwrap();

View File

@ -2,10 +2,13 @@ use std::io::{Read, Write};
use hashbrown::HashMap; use hashbrown::HashMap;
use meilidb_schema::SchemaAttr; use meilidb_schema::SchemaAttr;
use serde::{Serialize, Deserialize};
use crate::{DocumentId, Number}; use crate::{DocumentId, Number};
#[derive(Debug, Default, Clone, PartialEq, Eq)] #[derive(Debug, Default, Clone, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct RankedMap(HashMap<(DocumentId, SchemaAttr), Number>); pub struct RankedMap(HashMap<(DocumentId, SchemaAttr), Number>);
impl RankedMap { impl RankedMap {

View File

@ -14,7 +14,7 @@ use crate::DocumentId;
#[derive(Debug)] #[derive(Debug)]
pub enum DeserializerError { pub enum DeserializerError {
SerdeJson(SerdeJsonError), SerdeJson(SerdeJsonError),
Rkv(rkv::StoreError), Zlmdb(zlmdb::Error),
Custom(String), Custom(String),
} }
@ -28,7 +28,7 @@ impl fmt::Display for DeserializerError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
DeserializerError::SerdeJson(e) => write!(f, "serde json related error: {}", e), DeserializerError::SerdeJson(e) => write!(f, "serde json related error: {}", e),
DeserializerError::Rkv(e) => write!(f, "rkv related error: {}", e), DeserializerError::Zlmdb(e) => write!(f, "zlmdb related error: {}", e),
DeserializerError::Custom(s) => f.write_str(s), DeserializerError::Custom(s) => f.write_str(s),
} }
} }
@ -42,23 +42,21 @@ impl From<SerdeJsonError> for DeserializerError {
} }
} }
impl From<rkv::StoreError> for DeserializerError { impl From<zlmdb::Error> for DeserializerError {
fn from(error: rkv::StoreError) -> DeserializerError { fn from(error: zlmdb::Error) -> DeserializerError {
DeserializerError::Rkv(error) DeserializerError::Zlmdb(error)
} }
} }
pub struct Deserializer<'a, R> { pub struct Deserializer<'a> {
pub document_id: DocumentId, pub document_id: DocumentId,
pub reader: &'a R, pub reader: &'a zlmdb::RoTxn,
pub documents_fields: DocumentsFields, pub documents_fields: DocumentsFields,
pub schema: &'a Schema, pub schema: &'a Schema,
pub attributes: Option<&'a HashSet<SchemaAttr>>, pub attributes: Option<&'a HashSet<SchemaAttr>>,
} }
impl<'de, 'a, 'b, R: 'a> de::Deserializer<'de> for &'b mut Deserializer<'a, R> impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a> {
where R: rkv::Readable,
{
type Error = DeserializerError; type Error = DeserializerError;
fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error> fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>

View File

@ -35,7 +35,7 @@ use crate::{DocumentId, ParseNumberError};
pub enum SerializerError { pub enum SerializerError {
DocumentIdNotFound, DocumentIdNotFound,
InvalidDocumentIdType, InvalidDocumentIdType,
RkvError(rkv::StoreError), Zlmdb(zlmdb::Error),
SerdeJson(SerdeJsonError), SerdeJson(SerdeJsonError),
ParseNumber(ParseNumberError), ParseNumber(ParseNumberError),
UnserializableType { type_name: &'static str }, UnserializableType { type_name: &'static str },
@ -59,7 +59,7 @@ impl fmt::Display for SerializerError {
SerializerError::InvalidDocumentIdType => { SerializerError::InvalidDocumentIdType => {
f.write_str("document identifier can only be of type string or number") f.write_str("document identifier can only be of type string or number")
}, },
SerializerError::RkvError(e) => write!(f, "rkv related error: {}", e), SerializerError::Zlmdb(e) => write!(f, "zlmdb related error: {}", e),
SerializerError::SerdeJson(e) => write!(f, "serde json error: {}", e), SerializerError::SerdeJson(e) => write!(f, "serde json error: {}", e),
SerializerError::ParseNumber(e) => { SerializerError::ParseNumber(e) => {
write!(f, "error while trying to parse a number: {}", e) write!(f, "error while trying to parse a number: {}", e)
@ -92,9 +92,9 @@ impl From<SerdeJsonError> for SerializerError {
} }
} }
impl From<rkv::StoreError> for SerializerError { impl From<zlmdb::Error> for SerializerError {
fn from(error: rkv::StoreError) -> SerializerError { fn from(error: zlmdb::Error) -> SerializerError {
SerializerError::RkvError(error) SerializerError::Zlmdb(error)
} }
} }

View File

@ -1,54 +1,51 @@
use std::sync::Arc; use std::sync::Arc;
use rkv::{Value, StoreError}; use zlmdb::types::{OwnedType, ByteSlice};
use crate::{DocumentId, MResult}; use zlmdb::Result as ZResult;
use crate::DocumentId;
use super::BEU64;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct DocsWords { pub struct DocsWords {
pub(crate) docs_words: rkv::SingleStore, pub(crate) docs_words: zlmdb::Database<OwnedType<BEU64>, ByteSlice>,
} }
impl DocsWords { impl DocsWords {
pub fn put_doc_words( pub fn put_doc_words(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
document_id: DocumentId, document_id: DocumentId,
words: &fst::Set, words: &fst::Set,
) -> Result<(), rkv::StoreError> ) -> ZResult<()>
{ {
let document_id_bytes = document_id.0.to_be_bytes(); let document_id = BEU64::new(document_id.0);
let bytes = words.as_fst().as_bytes(); let bytes = words.as_fst().as_bytes();
self.docs_words.put(writer, document_id_bytes, &Value::Blob(bytes)) self.docs_words.put(writer, &document_id, bytes)
} }
pub fn del_doc_words( pub fn del_doc_words(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
document_id: DocumentId, document_id: DocumentId,
) -> Result<bool, rkv::StoreError> ) -> ZResult<bool>
{ {
let document_id_bytes = document_id.0.to_be_bytes(); let document_id = BEU64::new(document_id.0);
match self.docs_words.delete(writer, document_id_bytes) { self.docs_words.delete(writer, &document_id)
Ok(()) => Ok(true),
Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false),
Err(e) => Err(e),
}
} }
pub fn doc_words<T: rkv::Readable>( pub fn doc_words(
&self, &self,
reader: &T, reader: &zlmdb::RoTxn,
document_id: DocumentId, document_id: DocumentId,
) -> MResult<Option<fst::Set>> ) -> ZResult<Option<fst::Set>>
{ {
let document_id_bytes = document_id.0.to_be_bytes(); let document_id = BEU64::new(document_id.0);
match self.docs_words.get(reader, document_id_bytes)? { match self.docs_words.get(reader, &document_id)? {
Some(Value::Blob(bytes)) => { Some(bytes) => {
let len = bytes.len(); let len = bytes.len();
let bytes = Arc::from(bytes); let bytes = Arc::from(bytes);
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
Ok(Some(fst::Set::from(fst))) Ok(Some(fst::Set::from(fst)))
}, },
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None), None => Ok(None),
} }
} }

View File

@ -1,102 +1,77 @@
use std::convert::TryFrom;
use meilidb_schema::SchemaAttr; use meilidb_schema::SchemaAttr;
use zlmdb::types::{OwnedType, ByteSlice};
use zlmdb::Result as ZResult;
use crate::DocumentId; use crate::DocumentId;
use super::{document_attribute_into_key, document_attribute_from_key}; use super::DocumentAttrKey;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct DocumentsFields { pub struct DocumentsFields {
pub(crate) documents_fields: rkv::SingleStore, pub(crate) documents_fields: zlmdb::Database<OwnedType<DocumentAttrKey>, ByteSlice>,
} }
impl DocumentsFields { impl DocumentsFields {
pub fn put_document_field( pub fn put_document_field(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
document_id: DocumentId, document_id: DocumentId,
attribute: SchemaAttr, attribute: SchemaAttr,
value: &[u8], value: &[u8],
) -> Result<(), rkv::StoreError> ) -> ZResult<()>
{ {
let key = document_attribute_into_key(document_id, attribute); let key = DocumentAttrKey::new(document_id, attribute);
self.documents_fields.put(writer, key, &rkv::Value::Blob(value)) self.documents_fields.put(writer, &key, value)
} }
pub fn del_all_document_fields( pub fn del_all_document_fields(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
document_id: DocumentId, document_id: DocumentId,
) -> Result<usize, rkv::StoreError> ) -> ZResult<usize>
{ {
let document_id_bytes = document_id.0.to_be_bytes(); let start = DocumentAttrKey::new(document_id, SchemaAttr::min());
let mut keys_to_delete = Vec::new(); let end = DocumentAttrKey::new(document_id, SchemaAttr::max());
self.documents_fields.delete_range(writer, start..=end)
// WARN we can not delete the keys using the iterator
// so we store them and delete them just after
let iter = self.documents_fields.iter_from(writer, document_id_bytes)?;
for result in iter {
let (key, _) = result?;
let array = TryFrom::try_from(key).unwrap();
let (current_document_id, _) = document_attribute_from_key(array);
if current_document_id != document_id { break }
keys_to_delete.push(key.to_owned());
} }
let count = keys_to_delete.len(); pub fn document_attribute<'txn>(
for key in keys_to_delete {
self.documents_fields.delete(writer, key)?;
}
Ok(count)
}
pub fn document_attribute<'a>(
&self, &self,
reader: &'a impl rkv::Readable, reader: &'txn zlmdb::RoTxn,
document_id: DocumentId, document_id: DocumentId,
attribute: SchemaAttr, attribute: SchemaAttr,
) -> Result<Option<&'a [u8]>, rkv::StoreError> ) -> ZResult<Option<&'txn [u8]>>
{ {
let key = document_attribute_into_key(document_id, attribute); let key = DocumentAttrKey::new(document_id, attribute);
self.documents_fields.get(reader, &key)
match self.documents_fields.get(reader, key)? {
Some(rkv::Value::Blob(bytes)) => Ok(Some(bytes)),
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None),
}
} }
pub fn document_fields<'r, T: rkv::Readable>( pub fn document_fields<'txn>(
&self, &self,
reader: &'r T, reader: &'txn zlmdb::RoTxn,
document_id: DocumentId, document_id: DocumentId,
) -> Result<DocumentFieldsIter<'r>, rkv::StoreError> ) -> ZResult<DocumentFieldsIter<'txn>>
{ {
let document_id_bytes = document_id.0.to_be_bytes(); let start = DocumentAttrKey::new(document_id, SchemaAttr::min());
let iter = self.documents_fields.iter_from(reader, document_id_bytes)?; let end = DocumentAttrKey::new(document_id, SchemaAttr::max());
Ok(DocumentFieldsIter { document_id, iter }) let iter = self.documents_fields.range(reader, start..=end)?;
Ok(DocumentFieldsIter { iter })
} }
} }
pub struct DocumentFieldsIter<'r> { pub struct DocumentFieldsIter<'txn> {
document_id: DocumentId, iter: zlmdb::RoRange<'txn, OwnedType<DocumentAttrKey>, ByteSlice>,
iter: rkv::store::single::Iter<'r>,
} }
impl<'r> Iterator for DocumentFieldsIter<'r> { impl<'txn> Iterator for DocumentFieldsIter<'txn> {
type Item = Result<(SchemaAttr, &'r [u8]), rkv::StoreError>; type Item = ZResult<(SchemaAttr, &'txn [u8])>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() { match self.iter.next() {
Some(Ok((key, Some(rkv::Value::Blob(bytes))))) => { Some(Ok((key, bytes))) => {
let array = TryFrom::try_from(key).unwrap(); let attr = SchemaAttr(key.attr.get());
let (current_document_id, attr) = document_attribute_from_key(array);
if current_document_id != self.document_id { return None; }
Some(Ok((attr, bytes))) Some(Ok((attr, bytes)))
}, },
Some(Ok((key, data))) => panic!("{:?}, {:?}", key, data), Some(Err(e)) => Some(Err(e.into())),
Some(Err(e)) => Some(Err(e)),
None => None, None => None,
} }
} }

View File

@ -1,163 +1,142 @@
use std::convert::TryFrom;
use meilidb_schema::SchemaAttr; use meilidb_schema::SchemaAttr;
use zlmdb::types::OwnedType;
use zlmdb::Result as ZResult;
use crate::DocumentId; use crate::DocumentId;
use super::{document_attribute_into_key, document_attribute_from_key}; use super::DocumentAttrKey;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct DocumentsFieldsCounts { pub struct DocumentsFieldsCounts {
pub(crate) documents_fields_counts: rkv::SingleStore, pub(crate) documents_fields_counts: zlmdb::Database<OwnedType<DocumentAttrKey>, OwnedType<u64>>,
} }
impl DocumentsFieldsCounts { impl DocumentsFieldsCounts {
pub fn put_document_field_count( pub fn put_document_field_count(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
document_id: DocumentId, document_id: DocumentId,
attribute: SchemaAttr, attribute: SchemaAttr,
value: u64, value: u64,
) -> Result<(), rkv::StoreError> ) -> ZResult<()>
{ {
let key = document_attribute_into_key(document_id, attribute); let key = DocumentAttrKey::new(document_id, attribute);
self.documents_fields_counts.put(writer, key, &rkv::Value::U64(value)) self.documents_fields_counts.put(writer, &key, &value)
} }
pub fn del_all_document_fields_counts( pub fn del_all_document_fields_counts(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
document_id: DocumentId, document_id: DocumentId,
) -> Result<usize, rkv::StoreError> ) -> ZResult<usize>
{ {
let mut keys_to_delete = Vec::new(); let start = DocumentAttrKey::new(document_id, SchemaAttr::min());
let end = DocumentAttrKey::new(document_id, SchemaAttr::max());
// WARN we can not delete the keys using the iterator self.documents_fields_counts.delete_range(writer, start..=end)
// so we store them and delete them just after
for result in self.document_fields_counts(writer, document_id)? {
let (attribute, _) = result?;
let key = document_attribute_into_key(document_id, attribute);
keys_to_delete.push(key);
}
let count = keys_to_delete.len();
for key in keys_to_delete {
self.documents_fields_counts.delete(writer, key)?;
}
Ok(count)
} }
pub fn document_field_count( pub fn document_field_count(
&self, &self,
reader: &impl rkv::Readable, reader: &zlmdb::RoTxn,
document_id: DocumentId, document_id: DocumentId,
attribute: SchemaAttr, attribute: SchemaAttr,
) -> Result<Option<u64>, rkv::StoreError> ) -> ZResult<Option<u64>>
{ {
let key = document_attribute_into_key(document_id, attribute); let key = DocumentAttrKey::new(document_id, attribute);
match self.documents_fields_counts.get(reader, &key)? {
match self.documents_fields_counts.get(reader, key)? { Some(count) => Ok(Some(count)),
Some(rkv::Value::U64(count)) => Ok(Some(count)),
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None), None => Ok(None),
} }
} }
pub fn document_fields_counts<'r, T: rkv::Readable>( pub fn document_fields_counts<'txn>(
&self, &self,
reader: &'r T, reader: &'txn zlmdb::RoTxn,
document_id: DocumentId, document_id: DocumentId,
) -> Result<DocumentFieldsCountsIter<'r>, rkv::StoreError> ) -> ZResult<DocumentFieldsCountsIter<'txn>>
{ {
let document_id_bytes = document_id.0.to_be_bytes(); let start = DocumentAttrKey::new(document_id, SchemaAttr::min());
let iter = self.documents_fields_counts.iter_from(reader, document_id_bytes)?; let end = DocumentAttrKey::new(document_id, SchemaAttr::max());
Ok(DocumentFieldsCountsIter { document_id, iter }) let iter = self.documents_fields_counts.range(reader, start..=end)?;
Ok(DocumentFieldsCountsIter { iter })
} }
pub fn documents_ids<'r, T: rkv::Readable>( pub fn documents_ids<'txn>(
&self, &self,
reader: &'r T, reader: &'txn zlmdb::RoTxn,
) -> Result<DocumentsIdsIter<'r>, rkv::StoreError> ) -> ZResult<DocumentsIdsIter<'txn>>
{ {
let iter = self.documents_fields_counts.iter_start(reader)?; let iter = self.documents_fields_counts.iter(reader)?;
Ok(DocumentsIdsIter { last_seen_id: None, iter }) Ok(DocumentsIdsIter { last_seen_id: None, iter })
} }
pub fn all_documents_fields_counts<'r, T: rkv::Readable>( pub fn all_documents_fields_counts<'txn>(
&self, &self,
reader: &'r T, reader: &'txn zlmdb::RoTxn,
) -> Result<AllDocumentsFieldsCountsIter<'r>, rkv::StoreError> ) -> ZResult<AllDocumentsFieldsCountsIter<'txn>>
{ {
let iter = self.documents_fields_counts.iter_start(reader)?; let iter = self.documents_fields_counts.iter(reader)?;
Ok(AllDocumentsFieldsCountsIter { iter }) Ok(AllDocumentsFieldsCountsIter { iter })
} }
} }
pub struct DocumentFieldsCountsIter<'r> { pub struct DocumentFieldsCountsIter<'txn> {
document_id: DocumentId, iter: zlmdb::RoRange<'txn, OwnedType<DocumentAttrKey>, OwnedType<u64>>,
iter: rkv::store::single::Iter<'r>,
} }
impl Iterator for DocumentFieldsCountsIter<'_> { impl Iterator for DocumentFieldsCountsIter<'_> {
type Item = Result<(SchemaAttr, u64), rkv::StoreError>; type Item = ZResult<(SchemaAttr, u64)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() { match self.iter.next() {
Some(Ok((key, Some(rkv::Value::U64(count))))) => { Some(Ok((key, count))) => {
let array = TryFrom::try_from(key).unwrap(); let attr = SchemaAttr(key.attr.get());
let (current_document_id, attr) = document_attribute_from_key(array);
if current_document_id != self.document_id { return None; }
Some(Ok((attr, count))) Some(Ok((attr, count)))
}, },
Some(Ok((key, data))) => panic!("{:?}, {:?}", key, data), Some(Err(e)) => Some(Err(e.into())),
Some(Err(e)) => Some(Err(e)),
None => None, None => None,
} }
} }
} }
pub struct DocumentsIdsIter<'r> { pub struct DocumentsIdsIter<'txn> {
last_seen_id: Option<DocumentId>, last_seen_id: Option<DocumentId>,
iter: rkv::store::single::Iter<'r>, iter: zlmdb::RoIter<'txn, OwnedType<DocumentAttrKey>, OwnedType<u64>>,
} }
impl Iterator for DocumentsIdsIter<'_> { impl Iterator for DocumentsIdsIter<'_> {
type Item = Result<DocumentId, rkv::StoreError>; type Item = ZResult<DocumentId>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
for result in &mut self.iter { for result in &mut self.iter {
match result { match result {
Ok((key, _)) => { Ok((key, _)) => {
let array = TryFrom::try_from(key).unwrap(); let document_id = DocumentId(key.docid.get());
let (document_id, _) = document_attribute_from_key(array);
if Some(document_id) != self.last_seen_id { if Some(document_id) != self.last_seen_id {
self.last_seen_id = Some(document_id); self.last_seen_id = Some(document_id);
return Some(Ok(document_id)) return Some(Ok(document_id))
} }
}, },
Err(e) => return Some(Err(e)), Err(e) => return Some(Err(e.into())),
} }
} }
None None
} }
} }
pub struct AllDocumentsFieldsCountsIter<'r> { pub struct AllDocumentsFieldsCountsIter<'txn> {
iter: rkv::store::single::Iter<'r>, iter: zlmdb::RoIter<'txn, OwnedType<DocumentAttrKey>, OwnedType<u64>>,
} }
impl<'r> Iterator for AllDocumentsFieldsCountsIter<'r> { impl<'r> Iterator for AllDocumentsFieldsCountsIter<'r> {
type Item = Result<(DocumentId, SchemaAttr, u64), rkv::StoreError>; type Item = ZResult<(DocumentId, SchemaAttr, u64)>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() { match self.iter.next() {
Some(Ok((key, Some(rkv::Value::U64(count))))) => { Some(Ok((key, count))) => {
let array = TryFrom::try_from(key).unwrap(); let docid = DocumentId(key.docid.get());
let (document_id, attr) = document_attribute_from_key(array); let attr = SchemaAttr(key.attr.get());
Some(Ok((document_id, attr, count))) Some(Ok((docid, attr, count)))
}, },
Some(Ok((key, data))) => panic!("{:?}, {:?}", key, data), Some(Err(e)) => Some(Err(e.into())),
Some(Err(e)) => Some(Err(e)),
None => None, None => None,
} }
} }

View File

@ -1,9 +1,8 @@
use std::sync::Arc; use std::sync::Arc;
use std::convert::TryInto;
use meilidb_schema::Schema; use meilidb_schema::Schema;
use rkv::Value; use zlmdb::types::{Str, OwnedType, ByteSlice, Serde};
use crate::{RankedMap, MResult}; use zlmdb::Result as ZResult;
use crate::RankedMap;
const CUSTOMS_KEY: &str = "customs-key"; const CUSTOMS_KEY: &str = "customs-key";
const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents"; const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
@ -14,155 +13,80 @@ const WORDS_KEY: &str = "words";
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct Main { pub struct Main {
pub(crate) main: rkv::SingleStore, pub(crate) main: zlmdb::DynDatabase,
} }
impl Main { impl Main {
pub fn put_words_fst( pub fn put_words_fst(&self, writer: &mut zlmdb::RwTxn, fst: &fst::Set) -> ZResult<()> {
&self, let bytes = fst.as_fst().as_bytes();
writer: &mut rkv::Writer, self.main.put::<Str, ByteSlice>(writer, WORDS_KEY, bytes)
fst: &fst::Set,
) -> Result<(), rkv::StoreError>
{
let blob = rkv::Value::Blob(fst.as_fst().as_bytes());
self.main.put(writer, WORDS_KEY, &blob)
} }
pub fn words_fst( pub fn words_fst(&self, reader: &zlmdb::RoTxn) -> ZResult<Option<fst::Set>> {
&self, match self.main.get::<Str, ByteSlice>(reader, WORDS_KEY)? {
reader: &impl rkv::Readable, Some(bytes) => {
) -> MResult<Option<fst::Set>>
{
match self.main.get(reader, WORDS_KEY)? {
Some(Value::Blob(bytes)) => {
let len = bytes.len(); let len = bytes.len();
let bytes = Arc::from(bytes); let bytes = Arc::from(bytes);
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
Ok(Some(fst::Set::from(fst))) Ok(Some(fst::Set::from(fst)))
}, },
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None), None => Ok(None),
} }
} }
pub fn put_schema( pub fn put_schema(&self, writer: &mut zlmdb::RwTxn, schema: &Schema) -> ZResult<()> {
&self, self.main.put::<Str, Serde<Schema>>(writer, SCHEMA_KEY, schema)
writer: &mut rkv::Writer,
schema: &Schema,
) -> MResult<()>
{
let bytes = bincode::serialize(schema)?;
let blob = Value::Blob(&bytes[..]);
self.main.put(writer, SCHEMA_KEY, &blob)?;
Ok(())
} }
pub fn schema( pub fn schema(&self, reader: &zlmdb::RoTxn) -> ZResult<Option<Schema>> {
&self, self.main.get::<Str, Serde<Schema>>(reader, SCHEMA_KEY)
reader: &impl rkv::Readable,
) -> MResult<Option<Schema>>
{
match self.main.get(reader, SCHEMA_KEY)? {
Some(Value::Blob(bytes)) => {
let schema = bincode::deserialize_from(bytes)?;
Ok(Some(schema))
},
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None),
}
} }
pub fn put_ranked_map( pub fn put_ranked_map(&self, writer: &mut zlmdb::RwTxn, ranked_map: &RankedMap) -> ZResult<()> {
&self, self.main.put::<Str, Serde<RankedMap>>(writer, RANKED_MAP_KEY, &ranked_map)
writer: &mut rkv::Writer,
ranked_map: &RankedMap,
) -> MResult<()>
{
let mut bytes = Vec::new();
ranked_map.write_to_bin(&mut bytes)?;
let blob = Value::Blob(&bytes[..]);
self.main.put(writer, RANKED_MAP_KEY, &blob)?;
Ok(())
} }
pub fn ranked_map( pub fn ranked_map(&self, reader: &zlmdb::RoTxn) -> ZResult<Option<RankedMap>> {
&self, self.main.get::<Str, Serde<RankedMap>>(reader, RANKED_MAP_KEY)
reader: &impl rkv::Readable,
) -> MResult<Option<RankedMap>>
{
match self.main.get(reader, RANKED_MAP_KEY)? {
Some(Value::Blob(bytes)) => {
let ranked_map = RankedMap::read_from_bin(bytes)?;
Ok(Some(ranked_map))
},
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None),
}
} }
pub fn put_synonyms_fst( pub fn put_synonyms_fst(&self, writer: &mut zlmdb::RwTxn, fst: &fst::Set) -> ZResult<()> {
&self, let bytes = fst.as_fst().as_bytes();
writer: &mut rkv::Writer, self.main.put::<Str, ByteSlice>(writer, SYNONYMS_KEY, bytes)
fst: &fst::Set,
) -> MResult<()>
{
let blob = rkv::Value::Blob(fst.as_fst().as_bytes());
Ok(self.main.put(writer, SYNONYMS_KEY, &blob)?)
} }
pub fn synonyms_fst( pub fn synonyms_fst(&self, reader: &zlmdb::RoTxn) -> ZResult<Option<fst::Set>> {
&self, match self.main.get::<Str, ByteSlice>(reader, SYNONYMS_KEY)? {
reader: &impl rkv::Readable, Some(bytes) => {
) -> MResult<Option<fst::Set>>
{
match self.main.get(reader, SYNONYMS_KEY)? {
Some(Value::Blob(bytes)) => {
let len = bytes.len(); let len = bytes.len();
let bytes = Arc::from(bytes); let bytes = Arc::from(bytes);
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
Ok(Some(fst::Set::from(fst))) Ok(Some(fst::Set::from(fst)))
}, },
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None), None => Ok(None),
} }
} }
pub fn put_number_of_documents<F: Fn(u64) -> u64>( pub fn put_number_of_documents<F>(&self, writer: &mut zlmdb::RwTxn, f: F) -> ZResult<u64>
&self, where F: Fn(u64) -> u64,
writer: &mut rkv::Writer,
f: F,
) -> Result<u64, rkv::StoreError>
{ {
let new = self.number_of_documents(writer).map(f)?; let new = self.number_of_documents(writer).map(f)?;
self.main.put(writer, NUMBER_OF_DOCUMENTS_KEY, &Value::Blob(&new.to_be_bytes()))?; self.main.put::<Str, OwnedType<u64>>(writer, NUMBER_OF_DOCUMENTS_KEY, &new)?;
Ok(new) Ok(new)
} }
pub fn number_of_documents( pub fn number_of_documents(&self, reader: &zlmdb::RwTxn) -> ZResult<u64> {
&self, match self.main.get::<Str, OwnedType<u64>>(reader, NUMBER_OF_DOCUMENTS_KEY)? {
reader: &impl rkv::Readable, Some(value) => Ok(value),
) -> Result<u64, rkv::StoreError>
{
match self.main.get(reader, NUMBER_OF_DOCUMENTS_KEY)? {
Some(Value::Blob(bytes)) => {
let array = bytes.try_into().unwrap();
Ok(u64::from_be_bytes(array))
},
Some(value) => panic!("invalid type {:?}", value),
None => Ok(0), None => Ok(0),
} }
} }
pub fn put_customs(&self, writer: &mut rkv::Writer, customs: &[u8]) -> MResult<()> { pub fn put_customs(&self, writer: &mut zlmdb::RwTxn, customs: &[u8]) -> ZResult<()> {
self.main.put(writer, CUSTOMS_KEY, &Value::Blob(customs))?; self.main.put::<Str, ByteSlice>(writer, CUSTOMS_KEY, customs)
Ok(())
} }
pub fn customs<'t>(&self, reader: &'t impl rkv::Readable) -> MResult<Option<&'t [u8]>> { pub fn customs<'txn>(&self, reader: &'txn zlmdb::RoTxn) -> ZResult<Option<&'txn [u8]>> {
match self.main.get(reader, CUSTOMS_KEY)? { self.main.get::<Str, ByteSlice>(reader, CUSTOMS_KEY)
Some(Value::Blob(bytes)) => Ok(Some(bytes)),
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None),
}
} }
} }

View File

@ -17,42 +17,28 @@ pub use self::updates::Updates;
pub use self::updates_results::UpdatesResults; pub use self::updates_results::UpdatesResults;
use std::collections::HashSet; use std::collections::HashSet;
use std::convert::TryFrom;
use meilidb_schema::{Schema, SchemaAttr}; use meilidb_schema::{Schema, SchemaAttr};
use serde::de; use serde::de;
use zerocopy::{AsBytes, FromBytes};
use zlmdb::Result as ZResult;
use crate::criterion::Criteria; use crate::criterion::Criteria;
use crate::serde::Deserializer; use crate::serde::Deserializer;
use crate::{update, query_builder::QueryBuilder, DocumentId, MResult, Error}; use crate::{update, query_builder::QueryBuilder, DocumentId, MResult, Error};
fn aligned_to(bytes: &[u8], align: usize) -> bool { type BEU64 = zerocopy::U64<byteorder::BigEndian>;
(bytes as *const _ as *const () as usize) % align == 0 type BEU16 = zerocopy::U16<byteorder::BigEndian>;
#[derive(Debug, Copy, Clone)]
#[derive(AsBytes, FromBytes)]
#[repr(C)]
pub struct DocumentAttrKey { docid: BEU64, attr: BEU16 }
impl DocumentAttrKey {
fn new(docid: DocumentId, attr: SchemaAttr) -> DocumentAttrKey {
DocumentAttrKey { docid: BEU64::new(docid.0), attr: BEU16::new(attr.0) }
} }
fn document_attribute_into_key(document_id: DocumentId, attribute: SchemaAttr) -> [u8; 10] {
let document_id_bytes = document_id.0.to_be_bytes();
let attr_bytes = attribute.0.to_be_bytes();
let mut key = [0u8; 10];
key[0..8].copy_from_slice(&document_id_bytes);
key[8..10].copy_from_slice(&attr_bytes);
key
}
fn document_attribute_from_key(key: [u8; 10]) -> (DocumentId, SchemaAttr) {
let document_id = {
let array = TryFrom::try_from(&key[0..8]).unwrap();
DocumentId(u64::from_be_bytes(array))
};
let schema_attr = {
let array = TryFrom::try_from(&key[8..8+2]).unwrap();
SchemaAttr(u16::from_be_bytes(array))
};
(document_id, schema_attr)
} }
fn main_name(name: &str) -> String { fn main_name(name: &str) -> String {
@ -102,9 +88,9 @@ pub struct Index {
} }
impl Index { impl Index {
pub fn document<R: rkv::Readable, T: de::DeserializeOwned>( pub fn document<T: de::DeserializeOwned>(
&self, &self,
reader: &R, reader: &zlmdb::RoTxn,
attributes: Option<&HashSet<&str>>, attributes: Option<&HashSet<&str>>,
document_id: DocumentId, document_id: DocumentId,
) -> MResult<Option<T>> ) -> MResult<Option<T>>
@ -130,9 +116,9 @@ impl Index {
Ok(T::deserialize(&mut deserializer).map(Some)?) Ok(T::deserialize(&mut deserializer).map(Some)?)
} }
pub fn document_attribute<T: de::DeserializeOwned, R: rkv::Readable>( pub fn document_attribute<T: de::DeserializeOwned>(
&self, &self,
reader: &R, reader: &zlmdb::RoTxn,
document_id: DocumentId, document_id: DocumentId,
attribute: SchemaAttr, attribute: SchemaAttr,
) -> MResult<Option<T>> ) -> MResult<Option<T>>
@ -144,12 +130,12 @@ impl Index {
} }
} }
pub fn schema_update(&self, writer: &mut rkv::Writer, schema: Schema) -> MResult<u64> { pub fn schema_update(&self, writer: &mut zlmdb::RwTxn, schema: Schema) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(());
update::push_schema_update(writer, self.updates, self.updates_results, schema) update::push_schema_update(writer, self.updates, self.updates_results, schema)
} }
pub fn customs_update(&self, writer: &mut rkv::Writer, customs: Vec<u8>) -> MResult<u64> { pub fn customs_update(&self, writer: &mut zlmdb::RwTxn, customs: Vec<u8>) -> ZResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(());
update::push_customs_update(writer, self.updates, self.updates_results, customs) update::push_customs_update(writer, self.updates, self.updates_results, customs)
} }
@ -186,16 +172,16 @@ impl Index {
) )
} }
pub fn current_update_id<T: rkv::Readable>(&self, reader: &T) -> MResult<Option<u64>> { pub fn current_update_id(&self, reader: &zlmdb::RoTxn) -> MResult<Option<u64>> {
match self.updates.last_update_id(reader)? { match self.updates.last_update_id(reader)? {
Some((id, _)) => Ok(Some(id)), Some((id, _)) => Ok(Some(id)),
None => Ok(None), None => Ok(None),
} }
} }
pub fn update_status<T: rkv::Readable>( pub fn update_status(
&self, &self,
reader: &T, reader: &zlmdb::RoTxn,
update_id: u64, update_id: u64,
) -> MResult<update::UpdateStatus> ) -> MResult<update::UpdateStatus>
{ {
@ -228,31 +214,10 @@ impl Index {
} }
pub fn create( pub fn create(
env: &rkv::Rkv, env: &zlmdb::Env,
name: &str, name: &str,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: crossbeam_channel::Sender<()>,
) -> Result<Index, rkv::StoreError> ) -> MResult<Index>
{
open_options(env, name, rkv::StoreOptions::create(), updates_notifier)
}
pub fn open(
env: &rkv::Rkv,
name: &str,
updates_notifier: crossbeam_channel::Sender<()>,
) -> Result<Index, rkv::StoreError>
{
let mut options = rkv::StoreOptions::default();
options.create = false;
open_options(env, name, options, updates_notifier)
}
fn open_options(
env: &rkv::Rkv,
name: &str,
options: rkv::StoreOptions,
updates_notifier: crossbeam_channel::Sender<()>,
) -> Result<Index, rkv::StoreError>
{ {
// create all the store names // create all the store names
let main_name = main_name(name); let main_name = main_name(name);
@ -265,14 +230,14 @@ fn open_options(
let updates_results_name = updates_results_name(name); let updates_results_name = updates_results_name(name);
// open all the stores // open all the stores
let main = env.open_single(main_name.as_str(), options)?; let main = env.create_dyn_database(Some(&main_name))?;
let postings_lists = env.open_single(postings_lists_name.as_str(), options)?; let postings_lists = env.create_database(Some(&postings_lists_name))?;
let documents_fields = env.open_single(documents_fields_name.as_str(), options)?; let documents_fields = env.create_database(Some(&documents_fields_name))?;
let documents_fields_counts = env.open_single(documents_fields_counts_name.as_str(), options)?; let documents_fields_counts = env.create_database(Some(&documents_fields_counts_name))?;
let synonyms = env.open_single(synonyms_name.as_str(), options)?; let synonyms = env.create_database(Some(&synonyms_name))?;
let docs_words = env.open_single(docs_words_name.as_str(), options)?; let docs_words = env.create_database(Some(&docs_words_name))?;
let updates = env.open_single(updates_name.as_str(), options)?; let updates = env.create_database(Some(&updates_name))?;
let updates_results = env.open_single(updates_results_name.as_str(), options)?; let updates_results = env.create_database(Some(&updates_results_name))?;
Ok(Index { Ok(Index {
main: Main { main }, main: Main { main },
@ -286,3 +251,66 @@ fn open_options(
updates_notifier, updates_notifier,
}) })
} }
pub fn open(
env: &zlmdb::Env,
name: &str,
updates_notifier: crossbeam_channel::Sender<()>,
) -> MResult<Option<Index>>
{
// create all the store names
let main_name = main_name(name);
let postings_lists_name = postings_lists_name(name);
let documents_fields_name = documents_fields_name(name);
let documents_fields_counts_name = documents_fields_counts_name(name);
let synonyms_name = synonyms_name(name);
let docs_words_name = docs_words_name(name);
let updates_name = updates_name(name);
let updates_results_name = updates_results_name(name);
// open all the stores
let main = match env.open_dyn_database(Some(&main_name))? {
Some(main) => main,
None => return Ok(None),
};
let postings_lists = match env.open_database(Some(&postings_lists_name))? {
Some(postings_lists) => postings_lists,
None => return Ok(None),
};
let documents_fields = match env.open_database(Some(&documents_fields_name))? {
Some(documents_fields) => documents_fields,
None => return Ok(None),
};
let documents_fields_counts = match env.open_database(Some(&documents_fields_counts_name))? {
Some(documents_fields_counts) => documents_fields_counts,
None => return Ok(None),
};
let synonyms = match env.open_database(Some(&synonyms_name))? {
Some(synonyms) => synonyms,
None => return Ok(None),
};
let docs_words = match env.open_database(Some(&docs_words_name))? {
Some(docs_words) => docs_words,
None => return Ok(None),
};
let updates = match env.open_database(Some(&updates_name))? {
Some(updates) => updates,
None => return Ok(None),
};
let updates_results = match env.open_database(Some(&updates_results_name))? {
Some(updates_results) => updates_results,
None => return Ok(None),
};
Ok(Some(Index {
main: Main { main },
postings_lists: PostingsLists { postings_lists },
documents_fields: DocumentsFields { documents_fields },
documents_fields_counts: DocumentsFieldsCounts { documents_fields_counts },
synonyms: Synonyms { synonyms },
docs_words: DocsWords { docs_words },
updates: Updates { updates },
updates_results: UpdatesResults { updates_results },
updates_notifier,
}))
}

View File

@ -1,81 +1,39 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::{mem, ptr}; use sdset::{Set, SetBuf};
use zlmdb::types::{ByteSlice, CowSlice};
use zerocopy::{AsBytes, LayoutVerified}; use zlmdb::Result as ZResult;
use rkv::StoreError;
use crate::DocIndex; use crate::DocIndex;
use crate::store::aligned_to;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct PostingsLists { pub struct PostingsLists {
pub(crate) postings_lists: rkv::SingleStore, pub(crate) postings_lists: zlmdb::Database<ByteSlice, CowSlice<DocIndex>>,
} }
impl PostingsLists { impl PostingsLists {
pub fn put_postings_list( pub fn put_postings_list(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
word: &[u8], word: &[u8],
words_indexes: &[DocIndex], words_indexes: &Set<DocIndex>,
) -> Result<(), rkv::StoreError> ) -> ZResult<()>
{ {
let blob = rkv::Value::Blob(words_indexes.as_bytes()); self.postings_lists.put(writer, word, words_indexes)
self.postings_lists.put(writer, word, &blob)
} }
pub fn del_postings_list( pub fn del_postings_list(&self, writer: &mut zlmdb::RwTxn, word: &[u8]) -> ZResult<bool> {
self.postings_lists.delete(writer, word)
}
pub fn postings_list<'txn>(
&self, &self,
writer: &mut rkv::Writer, reader: &'txn zlmdb::RoTxn,
word: &[u8], word: &[u8],
) -> Result<bool, rkv::StoreError> ) -> ZResult<Option<Cow<'txn, Set<DocIndex>>>>
{ {
match self.postings_lists.delete(writer, word) { match self.postings_lists.get(reader, word)? {
Ok(()) => Ok(true), Some(Cow::Borrowed(slice)) => Ok(Some(Cow::Borrowed(Set::new_unchecked(slice)))),
Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false), Some(Cow::Owned(vec)) => Ok(Some(Cow::Owned(SetBuf::new_unchecked(vec)))),
Err(e) => Err(e), None => Ok(None),
}
}
pub fn postings_list<'a>(
&self,
reader: &'a impl rkv::Readable,
word: &[u8],
) -> Result<Option<Cow<'a, sdset::Set<DocIndex>>>, rkv::StoreError>
{
let bytes = match self.postings_lists.get(reader, word)? {
Some(rkv::Value::Blob(bytes)) => bytes,
Some(value) => panic!("invalid type {:?}", value),
None => return Ok(None),
};
match LayoutVerified::new_slice(bytes) {
Some(layout) => {
let set = sdset::Set::new(layout.into_slice()).unwrap();
Ok(Some(Cow::Borrowed(set)))
},
None => {
let len = bytes.len();
let elem_size = mem::size_of::<DocIndex>();
// ensure that it is the alignment that is wrong
// and the length is valid
if len % elem_size == 0 && !aligned_to(bytes, mem::align_of::<DocIndex>()) {
let elems = len / elem_size;
let mut vec = Vec::<DocIndex>::with_capacity(elems);
unsafe {
let dst = vec.as_mut_ptr() as *mut u8;
ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len);
vec.set_len(elems);
}
let setbuf = sdset::SetBuf::new(vec).unwrap();
return Ok(Some(Cow::Owned(setbuf)))
}
Ok(None)
},
} }
} }
} }

View File

@ -1,51 +1,36 @@
use std::sync::Arc; use std::sync::Arc;
use rkv::StoreError; use zlmdb::types::ByteSlice;
use crate::error::MResult; use zlmdb::Result as ZResult;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct Synonyms { pub struct Synonyms {
pub(crate) synonyms: rkv::SingleStore, pub(crate) synonyms: zlmdb::Database<ByteSlice, ByteSlice>,
} }
impl Synonyms { impl Synonyms {
pub fn put_synonyms( pub fn put_synonyms(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
word: &[u8], word: &[u8],
synonyms: &fst::Set, synonyms: &fst::Set,
) -> Result<(), rkv::StoreError> ) -> ZResult<()>
{ {
let blob = rkv::Value::Blob(synonyms.as_fst().as_bytes()); let bytes = synonyms.as_fst().as_bytes();
self.synonyms.put(writer, word, &blob) self.synonyms.put(writer, word, bytes)
} }
pub fn del_synonyms( pub fn del_synonyms(&self, writer: &mut zlmdb::RwTxn, word: &[u8]) -> ZResult<bool> {
&self, self.synonyms.delete(writer, word)
writer: &mut rkv::Writer,
word: &[u8],
) -> Result<bool, rkv::StoreError>
{
match self.synonyms.delete(writer, word) {
Ok(()) => Ok(true),
Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false),
Err(e) => Err(e),
}
} }
pub fn synonyms( pub fn synonyms(&self, reader: &zlmdb::RoTxn, word: &[u8]) -> ZResult<Option<fst::Set>> {
&self,
reader: &impl rkv::Readable,
word: &[u8],
) -> MResult<Option<fst::Set>>
{
match self.synonyms.get(reader, word)? { match self.synonyms.get(reader, word)? {
Some(rkv::Value::Blob(bytes)) => { Some(bytes) => {
let len = bytes.len(); let len = bytes.len();
let bytes = Arc::from(bytes); let bytes = Arc::from(bytes);
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
Ok(Some(fst::Set::from(fst))) Ok(Some(fst::Set::from(fst)))
}, },
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None), None => Ok(None),
} }
} }

View File

@ -1,100 +1,56 @@
use std::convert::TryInto; use zlmdb::types::{OwnedType, Serde};
use rkv::Value; use zlmdb::Result as ZResult;
use crate::{update::Update, MResult}; use crate::update::Update;
use super::BEU64;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct Updates { pub struct Updates {
pub(crate) updates: rkv::SingleStore, pub(crate) updates: zlmdb::Database<OwnedType<BEU64>, Serde<Update>>,
} }
impl Updates { impl Updates {
// TODO we should use the MDB_LAST op but // TODO do not trigger deserialize if possible
// it is not exposed by the rkv library pub fn last_update_id(&self, reader: &zlmdb::RoTxn) -> ZResult<Option<(u64, Update)>> {
pub fn last_update_id<'a>( match self.updates.last(reader)? {
&self, Some((key, data)) => Ok(Some((key.get(), data))),
reader: &'a impl rkv::Readable, None => Ok(None),
) -> Result<Option<(u64, Option<Value<'a>>)>, rkv::StoreError> }
{
let mut last = None;
let iter = self.updates.iter_start(reader)?;
for result in iter {
let (key, data) = result?;
last = Some((key, data));
} }
let (last_key, last_data) = match last { // TODO do not trigger deserialize if possible
Some(entry) => entry, fn first_update_id(&self, reader: &zlmdb::RoTxn) -> ZResult<Option<(u64, Update)>> {
None => return Ok(None), match self.updates.first(reader)? {
}; Some((key, data)) => Ok(Some((key.get(), data))),
None => Ok(None),
let array = last_key.try_into().unwrap(); }
let number = u64::from_be_bytes(array);
Ok(Some((number, last_data)))
} }
fn first_update_id<'a>( // TODO do not trigger deserialize if possible
&self, pub fn contains(&self, reader: &zlmdb::RoTxn, update_id: u64) -> ZResult<bool> {
reader: &'a impl rkv::Readable, let update_id = BEU64::new(update_id);
) -> Result<Option<(u64, Option<Value<'a>>)>, rkv::StoreError> self.updates.get(reader, &update_id).map(|v| v.is_some())
{
let mut iter = self.updates.iter_start(reader)?;
let (first_key, first_data) = match iter.next() {
Some(result) => result?,
None => return Ok(None),
};
let array = first_key.try_into().unwrap();
let number = u64::from_be_bytes(array);
Ok(Some((number, first_data)))
}
pub fn contains(
&self,
reader: &impl rkv::Readable,
update_id: u64,
) -> Result<bool, rkv::StoreError>
{
let update_id_bytes = update_id.to_be_bytes();
self.updates.get(reader, update_id_bytes).map(|v| v.is_some())
} }
pub fn put_update( pub fn put_update(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
update_id: u64, update_id: u64,
update: &Update, update: &Update,
) -> MResult<()> ) -> ZResult<()>
{ {
let update_id_bytes = update_id.to_be_bytes(); // TODO prefer using serde_json?
let update = serde_json::to_vec(&update)?; let update_id = BEU64::new(update_id);
let blob = Value::Blob(&update); self.updates.put(writer, &update_id, update)
self.updates.put(writer, update_id_bytes, &blob)?;
Ok(())
} }
pub fn pop_front( pub fn pop_front(&self, writer: &mut zlmdb::RwTxn) -> ZResult<Option<(u64, Update)>> {
&self, match self.first_update_id(writer)? {
writer: &mut rkv::Writer, Some((update_id, update)) => {
) -> MResult<Option<(u64, Update)>> let key = BEU64::new(update_id);
{ self.updates.delete(writer, &key)?;
let (first_id, first_data) = match self.first_update_id(writer)? { Ok(Some((update_id, update)))
Some(entry) => entry,
None => return Ok(None),
};
match first_data {
Some(Value::Blob(bytes)) => {
let update = serde_json::from_slice(&bytes)?;
// remove it from the database now
let first_id_bytes = first_id.to_be_bytes();
self.updates.delete(writer, first_id_bytes)?;
Ok(Some((first_id, update)))
}, },
Some(value) => panic!("invalid type {:?}", value), None => Ok(None)
None => Ok(None),
} }
} }
} }

View File

@ -1,67 +1,39 @@
use std::convert::TryInto; use zlmdb::types::{OwnedType, Serde};
use rkv::Value; use zlmdb::Result as ZResult;
use crate::{update::UpdateResult, MResult}; use crate::update::UpdateResult;
use super::BEU64;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct UpdatesResults { pub struct UpdatesResults {
pub(crate) updates_results: rkv::SingleStore, pub(crate) updates_results: zlmdb::Database<OwnedType<BEU64>, Serde<UpdateResult>>,
} }
impl UpdatesResults { impl UpdatesResults {
// TODO we should use the MDB_LAST op but pub fn last_update_id(&self, reader: &zlmdb::RoTxn) -> ZResult<Option<(u64, UpdateResult)>> {
// it is not exposed by the rkv library match self.updates_results.last(reader)? {
pub fn last_update_id<'a>( Some((key, data)) => Ok(Some((key.get(), data))),
&self, None => Ok(None),
reader: &'a impl rkv::Readable,
) -> Result<Option<(u64, Option<Value<'a>>)>, rkv::StoreError>
{
let mut last = None;
let iter = self.updates_results.iter_start(reader)?;
for result in iter {
let (key, data) = result?;
last = Some((key, data));
} }
let (last_key, last_data) = match last {
Some(entry) => entry,
None => return Ok(None),
};
let array = last_key.try_into().unwrap();
let number = u64::from_be_bytes(array);
Ok(Some((number, last_data)))
} }
pub fn put_update_result( pub fn put_update_result(
&self, &self,
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
update_id: u64, update_id: u64,
update_result: &UpdateResult, update_result: &UpdateResult,
) -> MResult<()> ) -> ZResult<()>
{ {
let update_id_bytes = update_id.to_be_bytes(); let update_id = BEU64::new(update_id);
let update_result = bincode::serialize(&update_result)?; self.updates_results.put(writer, &update_id, update_result)
let blob = Value::Blob(&update_result);
self.updates_results.put(writer, update_id_bytes, &blob)?;
Ok(())
} }
pub fn update_result( pub fn update_result(
&self, &self,
reader: &impl rkv::Readable, reader: &zlmdb::RoTxn,
update_id: u64, update_id: u64,
) -> MResult<Option<UpdateResult>> ) -> ZResult<Option<UpdateResult>>
{ {
let update_id_bytes = update_id.to_be_bytes(); let update_id = BEU64::new(update_id);
self.updates_results.get(reader, &update_id)
match self.updates_results.get(reader, update_id_bytes)? {
Some(Value::Blob(bytes)) => {
let update_result = bincode::deserialize(&bytes)?;
Ok(Some(update_result))
},
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None),
}
} }
} }

View File

@ -1,21 +1,22 @@
use zlmdb::Result as ZResult;
use crate::update::{Update, next_update_id}; use crate::update::{Update, next_update_id};
use crate::{store, MResult}; use crate::store;
pub fn apply_customs_update( pub fn apply_customs_update(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
main_store: store::Main, main_store: store::Main,
customs: &[u8], customs: &[u8],
) -> MResult<()> ) -> ZResult<()>
{ {
main_store.put_customs(writer, customs) main_store.put_customs(writer, customs)
} }
pub fn push_customs_update( pub fn push_customs_update(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
customs: Vec<u8>, customs: Vec<u8>,
) -> MResult<u64> ) -> ZResult<u64>
{ {
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;

View File

@ -36,7 +36,7 @@ impl<D> DocumentsAddition<D> {
self.documents.push(document); self.documents.push(document);
} }
pub fn finalize(self, writer: &mut rkv::Writer) -> MResult<u64> pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult<u64>
where D: serde::Serialize where D: serde::Serialize
{ {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(());
@ -57,7 +57,7 @@ impl<D> Extend<D> for DocumentsAddition<D> {
} }
pub fn push_documents_addition<D: serde::Serialize>( pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
addition: Vec<D>, addition: Vec<D>,
@ -79,7 +79,7 @@ pub fn push_documents_addition<D: serde::Serialize>(
} }
pub fn apply_documents_addition( pub fn apply_documents_addition(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
main_store: store::Main, main_store: store::Main,
documents_fields_store: store::DocumentsFields, documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts, documents_fields_counts_store: store::DocumentsFieldsCounts,

View File

@ -49,7 +49,7 @@ impl DocumentsDeletion {
Ok(()) Ok(())
} }
pub fn finalize(self, writer: &mut rkv::Writer) -> MResult<u64> { pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(());
let update_id = push_documents_deletion( let update_id = push_documents_deletion(
writer, writer,
@ -68,7 +68,7 @@ impl Extend<DocumentId> for DocumentsDeletion {
} }
pub fn push_documents_deletion( pub fn push_documents_deletion(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
deletion: Vec<DocumentId>, deletion: Vec<DocumentId>,
@ -83,7 +83,7 @@ pub fn push_documents_deletion(
} }
pub fn apply_documents_deletion( pub fn apply_documents_deletion(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
main_store: store::Main, main_store: store::Main,
documents_fields_store: store::DocumentsFields, documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts, documents_fields_counts_store: store::DocumentsFieldsCounts,

View File

@ -18,11 +18,12 @@ use std::cmp;
use log::debug; use log::debug;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use zlmdb::Result as ZResult;
use crate::{store, MResult, DocumentId, RankedMap}; use crate::{store, MResult, DocumentId, RankedMap};
use meilidb_schema::Schema; use meilidb_schema::Schema;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Update { pub enum Update {
Schema(Schema), Schema(Schema),
Customs(Vec<u8>), Customs(Vec<u8>),
@ -62,8 +63,8 @@ pub enum UpdateStatus {
Unknown, Unknown,
} }
pub fn update_status<T: rkv::Readable>( pub fn update_status(
reader: &T, reader: &zlmdb::RoTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
update_id: u64, update_id: u64,
@ -82,10 +83,10 @@ pub fn update_status<T: rkv::Readable>(
} }
pub fn next_update_id( pub fn next_update_id(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
) -> MResult<u64> ) -> ZResult<u64>
{ {
let last_update_id = updates_store.last_update_id(writer)?; let last_update_id = updates_store.last_update_id(writer)?;
let last_update_id = last_update_id.map(|(n, _)| n); let last_update_id = last_update_id.map(|(n, _)| n);
@ -99,7 +100,7 @@ pub fn next_update_id(
Ok(new_update_id) Ok(new_update_id)
} }
pub fn update_task(writer: &mut rkv::Writer, index: store::Index) -> MResult<Option<UpdateResult>> { pub fn update_task(writer: &mut zlmdb::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>> {
let (update_id, update) = match index.updates.pop_front(writer)? { let (update_id, update) = match index.updates.pop_front(writer)? {
Some(value) => value, Some(value) => value,
None => return Ok(None), None => return Ok(None),
@ -120,7 +121,7 @@ pub fn update_task(writer: &mut rkv::Writer, index: store::Index) -> MResult<Opt
let start = Instant::now(); let start = Instant::now();
let update_type = UpdateType::Customs; let update_type = UpdateType::Customs;
let result = apply_customs_update(writer, index.main, &customs); let result = apply_customs_update(writer, index.main, &customs).map_err(Into::into);
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
} }

View File

@ -3,7 +3,7 @@ use crate::{store, error::UnsupportedOperation, MResult};
use crate::update::{Update, next_update_id}; use crate::update::{Update, next_update_id};
pub fn apply_schema_update( pub fn apply_schema_update(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
main_store: store::Main, main_store: store::Main,
new_schema: &Schema, new_schema: &Schema,
) -> MResult<()> ) -> MResult<()>
@ -12,11 +12,11 @@ pub fn apply_schema_update(
return Err(UnsupportedOperation::SchemaAlreadyExists.into()) return Err(UnsupportedOperation::SchemaAlreadyExists.into())
} }
main_store.put_schema(writer, new_schema) main_store.put_schema(writer, new_schema).map_err(Into::into)
} }
pub fn push_schema_update( pub fn push_schema_update(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
schema: Schema, schema: Schema,

View File

@ -39,7 +39,7 @@ impl SynonymsAddition {
self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives); self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives);
} }
pub fn finalize(self, writer: &mut rkv::Writer) -> MResult<u64> { pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(());
let update_id = push_synonyms_addition( let update_id = push_synonyms_addition(
writer, writer,
@ -52,7 +52,7 @@ impl SynonymsAddition {
} }
pub fn push_synonyms_addition( pub fn push_synonyms_addition(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
addition: BTreeMap<String, Vec<String>>, addition: BTreeMap<String, Vec<String>>,
@ -67,7 +67,7 @@ pub fn push_synonyms_addition(
} }
pub fn apply_synonyms_addition( pub fn apply_synonyms_addition(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
main_store: store::Main, main_store: store::Main,
synonyms_store: store::Synonyms, synonyms_store: store::Synonyms,
addition: BTreeMap<String, Vec<String>>, addition: BTreeMap<String, Vec<String>>,

View File

@ -49,7 +49,7 @@ impl SynonymsDeletion {
} }
} }
pub fn finalize(self, writer: &mut rkv::Writer) -> MResult<u64> { pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(());
let update_id = push_synonyms_deletion( let update_id = push_synonyms_deletion(
writer, writer,
@ -62,7 +62,7 @@ impl SynonymsDeletion {
} }
pub fn push_synonyms_deletion( pub fn push_synonyms_deletion(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
deletion: BTreeMap<String, Option<Vec<String>>>, deletion: BTreeMap<String, Option<Vec<String>>>,
@ -77,7 +77,7 @@ pub fn push_synonyms_deletion(
} }
pub fn apply_synonyms_deletion( pub fn apply_synonyms_deletion(
writer: &mut rkv::Writer, writer: &mut zlmdb::RwTxn,
main_store: store::Main, main_store: store::Main,
synonyms_store: store::Synonyms, synonyms_store: store::Synonyms,
deletion: BTreeMap<String, Option<Vec<String>>>, deletion: BTreeMap<String, Option<Vec<String>>>,