diff --git a/examples/create-database.rs b/examples/create-database.rs
index 5c6e81d1d..07ffeb931 100644
--- a/examples/create-database.rs
+++ b/examples/create-database.rs
@@ -59,10 +59,10 @@ fn index(schema: Schema, database_path: &Path, csv_data_path: &Path) -> Result
diff --git a/src/data/doc_ids.rs b/src/data/doc_ids.rs
-    pub fn empty() -> Self {
-        DocIds { data: SharedData::empty() }
+    pub fn new(ids: &Set<DocumentId>) -> DocIds {
+        let bytes = unsafe { into_u8_slice(ids.as_slice()) };
+        let data = SharedData::from_bytes(bytes.to_vec());
+        DocIds(data)
     }
 
-    pub fn from_bytes(vec: Vec<u8>) -> io::Result<DocIds> {
-        let len = vec.len();
-        DocIds::from_shared_bytes(Arc::new(vec), 0, len)
-    }
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIds> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let doc_ids = cursor.get_ref().range(offset, len);
+        cursor.consume(len);
 
-    pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> io::Result<DocIds> {
-        let data = SharedData { bytes, offset, len };
-        DocIds::from_data(data)
-    }
-
-    pub fn as_bytes(&self) -> &[u8] {
-        &self.data
-    }
-
-    fn from_data(data: SharedData) -> io::Result<DocIds> {
-        let len = data.as_ref().read_u64::<LittleEndian>()?;
-        let data = data.range(mem::size_of::<u64>(), len as usize);
-        Ok(DocIds { data })
-    }
-
-    pub fn from_raw(vec: Vec<DocumentId>) -> Self {
-        DocIds::from_bytes(unsafe { mem::transmute(vec) }).unwrap()
+        Ok(DocIds(doc_ids))
     }
 
     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        let len = self.data.len() as u64;
+        let len = self.0.len() as u64;
         bytes.write_u64::<LittleEndian>(len).unwrap();
-        bytes.extend_from_slice(&self.data);
+        bytes.extend_from_slice(&self.0);
     }
 
-    pub fn contains(&self, doc: DocumentId) -> bool {
-        // FIXME prefer using the sdset::exponential_search function
-        self.doc_ids().binary_search(&doc).is_ok()
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
     }
 
-    pub fn doc_ids(&self) -> &Set<DocumentId> {
-        let slice = &self.data;
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.0
+    }
+}
+
+impl AsRef<Set<DocumentId>> for DocIds {
+    fn as_ref(&self) -> &Set<DocumentId> {
+        let slice = &self.0;
         let ptr = slice.as_ptr() as *const DocumentId;
-        let len = slice.len() / mem::size_of::<DocumentId>();
+        let len = slice.len() / size_of::<DocumentId>();
         let slice = unsafe { from_raw_parts(ptr, len) };
         Set::new_unchecked(slice)
     }
diff --git a/src/data/doc_indexes.rs b/src/data/doc_indexes.rs
index 21627cb0d..b760765bf 100644
--- a/src/data/doc_indexes.rs
+++ b/src/data/doc_indexes.rs
@@ -1,5 +1,5 @@
+use std::io::{self, Write, Cursor, BufRead};
 use std::slice::from_raw_parts;
-use std::io::{self, Write};
 use std::mem::size_of;
 use std::ops::Index;
 use std::sync::Arc;
@@ -9,6 +9,7 @@ use sdset::Set;
 
 use crate::DocIndex;
 use crate::data::SharedData;
+use super::into_u8_slice;
 
 #[derive(Debug)]
 #[repr(C)]
@@ -24,40 +25,36 @@ pub struct DocIndexes {
 }
 
 impl DocIndexes {
-    pub fn from_bytes(vec: Vec<u8>) -> io::Result<DocIndexes> {
-        let len = vec.len();
-        DocIndexes::from_shared_bytes(Arc::new(vec), 0, len)
+    pub fn from_bytes(bytes: Vec<u8>) -> io::Result<DocIndexes> {
+        let bytes = Arc::new(bytes);
+        let len = bytes.len();
+        let data = SharedData::new(bytes, 0, len);
+        let mut cursor = Cursor::new(data);
+        DocIndexes::from_cursor(&mut cursor)
     }
 
-    pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> io::Result<DocIndexes> {
-        let data = SharedData { bytes, offset, len };
-        DocIndexes::from_data(data)
-    }
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIndexes> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let ranges = cursor.get_ref().range(offset, len);
+        cursor.consume(len);
 
-    fn from_data(data: SharedData) -> io::Result<DocIndexes> {
-        let ranges_len_offset = data.len() - size_of::<u64>();
-        let ranges_len = (&data[ranges_len_offset..]).read_u64::<LittleEndian>()?;
-        let ranges_len = ranges_len as usize;
-
-        let ranges_offset = ranges_len_offset - ranges_len;
-        let ranges = data.range(ranges_offset, ranges_len);
-
-        let indexes = data.range(0, ranges_offset);
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let indexes = cursor.get_ref().range(offset, len);
+        cursor.consume(len);
 
         Ok(DocIndexes { ranges, indexes })
     }
 
     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
         let ranges_len = self.ranges.len() as u64;
-        let indexes_len = self.indexes.len() as u64;
-        let u64_size = size_of::<u64>() as u64;
-        let len = indexes_len + ranges_len + u64_size;
-
-        let _ = bytes.write_u64::<LittleEndian>(len);
-
-        bytes.extend_from_slice(&self.indexes);
-        bytes.extend_from_slice(&self.ranges);
         let _ = bytes.write_u64::<LittleEndian>(ranges_len);
+        bytes.extend_from_slice(&self.ranges);
+
+        let indexes_len = self.indexes.len() as u64;
+        let _ = bytes.write_u64::<LittleEndian>(indexes_len);
+        bytes.extend_from_slice(&self.indexes);
     }
 
     pub fn get(&self, index: usize) -> Option<&Set<DocIndex>> {
@@ -97,12 +94,17 @@ impl Index<usize> for DocIndexes {
 
 pub struct DocIndexesBuilder<W> {
     ranges: Vec<Range>,
+    indexes: Vec<DocIndex>,
     wtr: W,
 }
 
 impl DocIndexesBuilder<Vec<u8>> {
     pub fn memory() -> Self {
-        DocIndexesBuilder::new(Vec::new())
+        DocIndexesBuilder {
+            ranges: Vec::new(),
+            indexes: Vec::new(),
+            wtr: Vec::new(),
+        }
     }
 }
 
@@ -110,19 +112,18 @@ impl<W: Write> DocIndexesBuilder<W> {
     pub fn new(wtr: W) -> Self {
         DocIndexesBuilder {
             ranges: Vec::new(),
+            indexes: Vec::new(),
             wtr: wtr,
         }
     }
 
-    pub fn insert(&mut self, indexes: &Set<DocIndex>) -> io::Result<()> {
+    pub fn insert(&mut self, indexes: &Set<DocIndex>) {
         let len = indexes.len() as u64;
         let start = self.ranges.last().map(|r| r.end).unwrap_or(0);
         let range = Range { start, end: start + len };
         self.ranges.push(range);
 
-        // write the values
-        let indexes = unsafe { into_u8_slice(indexes) };
-        self.wtr.write_all(indexes)
+        self.indexes.extend_from_slice(indexes);
     }
 
     pub fn finish(self) -> io::Result<()> {
@@ -130,24 +131,20 @@ impl<W: Write> DocIndexesBuilder<W> {
     }
 
     pub fn into_inner(mut self) -> io::Result<W> {
-        // write the ranges
-        let ranges = unsafe { into_u8_slice(self.ranges.as_slice()) };
-        self.wtr.write_all(ranges)?;
-
-        // write the length of the ranges
+        let ranges = unsafe { into_u8_slice(&self.ranges) };
         let len = ranges.len() as u64;
         self.wtr.write_u64::<LittleEndian>(len)?;
+        self.wtr.write_all(ranges)?;
+
+        let indexes = unsafe { into_u8_slice(&self.indexes) };
+        let len = indexes.len() as u64;
+        self.wtr.write_u64::<LittleEndian>(len)?;
+        self.wtr.write_all(indexes)?;
 
         Ok(self.wtr)
     }
 }
 
-unsafe fn into_u8_slice<T>(slice: &[T]) -> &[u8] {
-    let ptr = slice.as_ptr() as *const u8;
-    let len = slice.len() * size_of::<T>();
-    from_raw_parts(ptr, len)
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -177,9 +174,9 @@ mod tests {
 
         let mut builder = DocIndexesBuilder::memory();
 
-        builder.insert(Set::new(&[a])?)?;
-        builder.insert(Set::new(&[a, b, c])?)?;
-        builder.insert(Set::new(&[a, c])?)?;
+        builder.insert(Set::new(&[a])?);
+        builder.insert(Set::new(&[a, b, c])?);
+        builder.insert(Set::new(&[a, c])?);
 
         let bytes = builder.into_inner()?;
         let docs = DocIndexes::from_bytes(bytes)?;
@@ -212,18 +209,17 @@ mod tests {
 
         let mut builder = DocIndexesBuilder::memory();
 
-        builder.insert(Set::new(&[a])?)?;
-        builder.insert(Set::new(&[a, b, c])?)?;
-        builder.insert(Set::new(&[a, c])?)?;
+        builder.insert(Set::new(&[a])?);
+        builder.insert(Set::new(&[a, b, c])?);
+        builder.insert(Set::new(&[a, c])?);
 
         let builder_bytes = builder.into_inner()?;
         let docs = DocIndexes::from_bytes(builder_bytes.clone())?;
         let mut bytes = Vec::new();
         docs.write_to_bytes(&mut bytes);
 
-        let len = size_of::<u64>();
-        assert_eq!(builder_bytes, &bytes[len..]);
+        assert_eq!(builder_bytes, bytes);
 
         Ok(())
     }
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 69888dfcf..0e0b0e2c4 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -1,26 +1,30 @@
 mod doc_ids;
 mod doc_indexes;
 
+use std::slice::from_raw_parts;
+use std::mem::size_of;
 use std::ops::Deref;
 use std::sync::Arc;
 
 pub use self::doc_ids::DocIds;
 pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder};
 
-#[derive(Clone)]
-struct SharedData {
-    bytes: Arc<Vec<u8>>,
-    offset: usize,
-    len: usize,
+#[derive(Default, Clone)]
+pub struct SharedData {
+    pub bytes: Arc<Vec<u8>>,
+    pub offset: usize,
+    pub len: usize,
 }
 
 impl SharedData {
-    pub fn empty() -> SharedData {
-        SharedData {
-            bytes: Arc::default(),
-            offset: 0,
-            len: 0,
-        }
+    pub fn from_bytes(vec: Vec<u8>) -> SharedData {
+        let len = vec.len();
+        let bytes = Arc::new(vec);
+        SharedData::new(bytes, 0, len)
+    }
+
+    pub fn new(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> SharedData {
+        SharedData { bytes, offset, len }
     }
 
     pub fn range(&self, offset: usize, len: usize) -> SharedData {
@@ -33,12 +37,6 @@ impl SharedData {
     }
 }
 
-impl Default for SharedData {
-    fn default() -> SharedData {
-        SharedData::empty()
-    }
-}
-
 impl Deref for SharedData {
     type Target = [u8];
@@ -52,3 +50,9 @@ impl AsRef<[u8]> for SharedData {
         &self.bytes[self.offset..self.offset + self.len]
     }
 }
+
+unsafe fn into_u8_slice<T>(slice: &[T]) -> &[u8] {
+    let ptr = slice.as_ptr() as *const u8;
+    let len = slice.len() * size_of::<T>();
+    from_raw_parts(ptr, len)
+}
diff --git a/src/database/database.rs b/src/database/database.rs
index 9b3d76f15..225059300 100644
--- a/src/database/database.rs
+++ b/src/database/database.rs
@@ -7,7 +7,7 @@ use rocksdb::rocksdb::{Writable, Snapshot};
 use rocksdb::{DB, DBVector, MergeOperands};
 use crossbeam::atomic::ArcCell;
 
-use crate::database::index::{self, Index, Positive};
+use crate::database::index::Index;
 use crate::database::{DatabaseView, Update, Schema};
 use crate::database::{DATA_INDEX, DATA_SCHEMA};
@@ -86,7 +86,7 @@ impl Database {
         };
 
         let path = update.path().to_string_lossy();
-        let mut options = IngestExternalFileOptions::new();
+        let options = IngestExternalFileOptions::new();
         // options.move_files(move_update);
 
         let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
@@ -182,7 +182,6 @@ mod tests {
         };
 
         let database = Database::create(&rocksdb_path, schema.clone())?;
-        let tokenizer_builder = DefaultBuilder::new();
 
         let update_path = dir.path().join("update.sst");
@@ -201,11 +200,12 @@ mod tests {
 
         let docid0;
         let docid1;
-        let mut update = {
+        let update = {
+            let tokenizer_builder = DefaultBuilder::new();
             let mut builder = UpdateBuilder::new(update_path, schema);
 
-            docid0 = builder.update_document(&doc0).unwrap();
-            docid1 = builder.update_document(&doc1).unwrap();
+            docid0 = builder.update_document(&doc0, &tokenizer_builder)?;
+            docid1 = builder.update_document(&doc1, &tokenizer_builder)?;
 
             builder.build()?
         };
diff --git a/src/database/index/mod.rs b/src/database/index/mod.rs
index 0098c5fd2..f9964f1f5 100644
--- a/src/database/index/mod.rs
+++ b/src/database/index/mod.rs
@@ -4,18 +4,16 @@ mod positive;
 
 pub(crate) use self::negative::Negative;
 pub(crate) use self::positive::{Positive, PositiveBuilder};
 
-use std::sync::Arc;
 use std::error::Error;
-use std::io::{Cursor, BufRead};
+use std::io::Cursor;
+use std::sync::Arc;
 
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use fst::{IntoStreamer, Streamer};
 use sdset::duo::DifferenceByKey;
 use sdset::{Set, SetOperation};
-use fst::raw::Fst;
 use fst::Map;
 
-use crate::data::{DocIds, DocIndexes};
+use crate::data::{SharedData, DocIndexes};
 
 #[derive(Default)]
 pub struct Index {
@@ -35,8 +33,11 @@ impl Index {
         len: usize,
     ) -> Result<Self, Box<Error>>
     {
-        let (negative, neg_offset) = Negative::from_shared_bytes(bytes.clone(), offset, len)?;
-        let (positive, _) = Positive::from_shared_bytes(bytes, offset + neg_offset, len)?;
+        let data = SharedData::new(bytes, offset, len);
+        let mut cursor = Cursor::new(data);
+
+        let negative = Negative::from_cursor(&mut cursor)?;
+        let positive = Positive::from_cursor(&mut cursor)?;
 
         Ok(Index { negative, positive })
     }
@@ -71,7 +72,7 @@ impl Index {
             let (map, indexes) = builder.into_inner()?;
             let map = Map::from_bytes(map)?;
             let indexes = DocIndexes::from_bytes(indexes)?;
-            Positive { map, indexes }
+            Positive::new(map, indexes)
         };
 
         let negative = Negative::default();
diff --git a/src/database/index/negative.rs b/src/database/index/negative.rs
index e9c30abfc..822c99d20 100644
--- a/src/database/index/negative.rs
+++ b/src/database/index/negative.rs
@@ -1,46 +1,36 @@
-use std::io::{Cursor, BufRead};
 use std::error::Error;
-use std::mem::size_of;
+use std::io::Cursor;
 use std::ops::Deref;
-use std::sync::Arc;
 
 use sdset::Set;
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use byteorder::{LittleEndian, WriteBytesExt};
 
+use crate::data::SharedData;
 use crate::data::DocIds;
 use crate::DocumentId;
 
 #[derive(Default)]
-pub struct Negative {
-    pub doc_ids: DocIds,
-}
+pub struct Negative(DocIds);
 
 impl Negative {
-    pub fn from_shared_bytes(
-        bytes: Arc<Vec<u8>>,
-        offset: usize,
-        len: usize,
-    ) -> Result<(Negative, usize), Box<Error>>
-    {
-        let mut cursor = Cursor::new(&bytes[..len]);
-        cursor.consume(offset);
+    pub fn new(doc_ids: DocIds) -> Negative {
+        Negative(doc_ids)
+    }
 
-        let len = cursor.read_u64::<LittleEndian>()? as usize;
-        let offset = cursor.position() as usize;
-        let doc_ids = DocIds::from_shared_bytes(bytes, offset, len)?;
-
-        Ok((Negative { doc_ids }, offset + len))
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Negative, Box<Error>> {
+        let doc_ids = DocIds::from_cursor(cursor)?;
+        Ok(Negative(doc_ids))
     }
 
     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        let slice = self.doc_ids.as_bytes();
+        let slice = self.0.as_bytes();
         let len = slice.len() as u64;
         let _ = bytes.write_u64::<LittleEndian>(len);
         bytes.extend_from_slice(slice);
     }
 
     pub fn is_empty(&self) -> bool {
-        self.doc_ids.doc_ids().is_empty()
+        self.0.is_empty()
     }
 }
@@ -48,6 +38,6 @@ impl Deref for Negative {
     type Target = Set<DocumentId>;
 
     fn deref(&self) -> &Self::Target {
-        self.doc_ids.doc_ids()
+        self.0.as_ref()
     }
 }
diff --git a/src/database/index/positive.rs b/src/database/index/positive.rs
index f72cb94de..d6c3bf3d5 100644
--- a/src/database/index/positive.rs
+++ b/src/database/index/positive.rs
@@ -1,7 +1,5 @@
 use std::io::{Write, BufRead, Cursor};
-use std::mem::size_of;
 use std::error::Error;
-use std::sync::Arc;
 
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use fst::{map, Map, Streamer, IntoStreamer};
@@ -10,51 +8,51 @@ use sdset::duo::Union;
 use fst::raw::Fst;
 
 use crate::data::{DocIndexes, DocIndexesBuilder};
+use crate::data::SharedData;
 use crate::DocIndex;
 
 #[derive(Default)]
 pub struct Positive {
-    pub map: Map,
-    pub indexes: DocIndexes,
+    map: Map,
+    indexes: DocIndexes,
 }
 
 impl Positive {
-    pub fn from_shared_bytes(
-        bytes: Arc<Vec<u8>>,
-        offset: usize,
-        len: usize,
-    ) -> Result<(Positive, usize), Box<Error>>
-    {
-        let mut cursor = Cursor::new(&bytes[..len]);
-        cursor.consume(offset);
+    pub fn new(map: Map, indexes: DocIndexes) -> Positive {
+        Positive { map, indexes }
+    }
 
-        let map_len = cursor.read_u64::<LittleEndian>()? as usize;
-        let map_offset = cursor.position() as usize;
-        let fst = Fst::from_shared_bytes(bytes.clone(), map_offset, map_len)?;
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Positive, Box<Error>> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let data = cursor.get_ref().range(offset, len);
+
+        let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?;
         let map = Map::from(fst);
+        cursor.consume(len);
 
-        cursor.consume(map_len);
-        let indexes_len = cursor.read_u64::<LittleEndian>()? as usize;
-        let indexes_offset = cursor.position() as usize;
-        let indexes = DocIndexes::from_shared_bytes(bytes, indexes_offset, indexes_len)?;
+        let indexes = DocIndexes::from_cursor(cursor)?;
 
-        let positive = Positive { map, indexes };
-        let len = indexes_offset + indexes_len;
-
-        Ok((positive, len))
+        Ok(Positive { map, indexes})
     }
 
     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        // indexes
         let slice = self.map.as_fst().as_bytes();
         let len = slice.len() as u64;
         let _ = bytes.write_u64::<LittleEndian>(len);
         bytes.extend_from_slice(slice);
 
-        // map
         self.indexes.write_to_bytes(bytes);
     }
 
+    pub fn map(&self) -> &Map {
+        &self.map
+    }
+
+    pub fn indexes(&self) -> &DocIndexes {
+        &self.indexes
+    }
+
     pub fn union(&self, other: &Positive) -> Result<Positive, Box<Error>> {
         let mut builder = PositiveBuilder::memory();
         let mut stream = map::OpBuilder::new().add(&self.map).add(&other.map).union();
@@ -155,15 +153,11 @@ impl<W: Write, X: Write> PositiveBuilder<W, X> {
     where K: AsRef<[u8]>,
     {
         self.map.insert(key, self.value)?;
-        self.indexes.insert(indexes)?;
+        self.indexes.insert(indexes);
         self.value += 1;
         Ok(())
     }
 
-    pub fn finish(self) -> Result<(), Box<Error>> {
-        self.into_inner().map(drop)
-    }
-
     pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
         let map = self.map.into_inner()?;
         let indexes = self.indexes.into_inner()?;
diff --git a/src/database/schema.rs b/src/database/schema.rs
index 5f622e003..60a258824 100644
--- a/src/database/schema.rs
+++ b/src/database/schema.rs
@@ -141,10 +141,12 @@ impl Schema {
         attributes
     }
 
-    pub fn document_id<T>(&self, document: &T) -> Result<DocumentId, Box<Error>>
+    pub fn document_id<T>(&self, document: T) -> Result<DocumentId, SerializerError>
     where T: Serialize,
     {
-        unimplemented!()
+        let id_attribute_name = &self.inner.identifier;
+        let serializer = FindDocumentIdSerializer { id_attribute_name };
+        document.serialize(serializer)
     }
 
     pub fn props(&self, attr: SchemaAttr) -> SchemaProps {
diff --git a/src/database/serde/indexer_serializer.rs b/src/database/serde/indexer_serializer.rs
index bfb0118ed..0bfb6e44a 100644
--- a/src/database/serde/indexer_serializer.rs
+++ b/src/database/serde/indexer_serializer.rs
@@ -1,4 +1,4 @@
-use crate::database::update::UnorderedPositiveBlobBuilder;
+use crate::database::update::RawUpdateBuilder;
 use crate::database::schema::SchemaAttr;
 use crate::database::serde::SerializerError;
 use crate::tokenizer::TokenizerBuilder;
@@ -10,7 +10,7 @@ use serde::ser;
 
 pub struct IndexerSerializer<'a, B> {
     pub tokenizer_builder: &'a B,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
+    pub builder: &'a mut RawUpdateBuilder,
     pub document_id: DocumentId,
     pub attribute: SchemaAttr,
 }
@@ -72,10 +72,10 @@ where B: TokenizerBuilder
             // and the unidecoded lowercased version
             let word_unidecoded = unidecode::unidecode(word).to_lowercase();
             if word_lower != word_unidecoded {
-                self.builder.insert(word_unidecoded, doc_index);
+                self.builder.insert_doc_index(word_unidecoded.into_bytes(), doc_index);
             }
 
-            self.builder.insert(word_lower, doc_index);
+            self.builder.insert_doc_index(word_lower.into_bytes(), doc_index);
         }
         Ok(())
     }
diff --git a/src/database/serde/serializer.rs b/src/database/serde/serializer.rs
index d2faed2db..48c58fd0d 100644
--- a/src/database/serde/serializer.rs
+++ b/src/database/serde/serializer.rs
@@ -1,24 +1,20 @@
-use std::collections::BTreeMap;
-
 use serde::Serialize;
 use serde::ser;
 
 use crate::database::serde::indexer_serializer::IndexerSerializer;
 use crate::database::serde::key_to_string::KeyToStringSerializer;
-use crate::database::update::UnorderedPositiveBlobBuilder;
 use crate::database::document_key::DocumentKeyAttr;
-use crate::database::update::NewState;
-use crate::database::Schema;
+use crate::database::update::RawUpdateBuilder;
 use crate::database::serde::SerializerError;
 use crate::tokenizer::TokenizerBuilder;
+use crate::database::schema::Schema;
 use crate::DocumentId;
 
 pub struct Serializer<'a, B> {
     pub schema: &'a Schema,
-    pub tokenizer_builder: &'a B,
     pub document_id: DocumentId,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
-    pub new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
+    pub tokenizer_builder: &'a B,
+    pub builder: &'a mut RawUpdateBuilder,
 }
 
 impl<'a, B> ser::Serializer for Serializer<'a, B>
@@ -145,7 +141,6 @@ where B: TokenizerBuilder
             document_id: self.document_id,
             current_key_name: None,
             builder: self.builder,
-            new_states: self.new_states,
         })
     }
@@ -160,7 +155,6 @@ where B: TokenizerBuilder
             tokenizer_builder: self.tokenizer_builder,
             document_id: self.document_id,
             builder: self.builder,
-            new_states: self.new_states,
         })
     }
@@ -181,8 +175,7 @@ pub struct MapSerializer<'a, B> {
     pub tokenizer_builder: &'a B,
     pub document_id: DocumentId,
     pub current_key_name: Option<String>,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
-    pub new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
+    pub builder: &'a mut RawUpdateBuilder,
 }
 
 impl<'a, B> ser::SerializeMap for MapSerializer<'a, B>
@@ -220,7 +213,7 @@ where B: TokenizerBuilder
                 if props.is_stored() {
                     let value = bincode::serialize(value).unwrap();
                     let key = DocumentKeyAttr::new(self.document_id, attr);
-                    self.new_states.insert(key, NewState::Updated { value });
+                    self.builder.insert_attribute_value(key, value);
                 }
                 if props.is_indexed() {
                     let serializer = IndexerSerializer {
@@ -243,10 +236,9 @@ where B: TokenizerBuilder
 
 pub struct StructSerializer<'a, B> {
     pub schema: &'a Schema,
-    pub tokenizer_builder: &'a B,
     pub document_id: DocumentId,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
-    pub new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
+    pub tokenizer_builder: &'a B,
+    pub builder: &'a mut RawUpdateBuilder,
 }
 
 impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B>
@@ -267,7 +259,7 @@ where B: TokenizerBuilder
             if props.is_stored() {
                 let value = bincode::serialize(value).unwrap();
                 let key = DocumentKeyAttr::new(self.document_id, attr);
-                self.new_states.insert(key, NewState::Updated { value });
+                self.builder.insert_attribute_value(key, value);
             }
             if props.is_indexed() {
                 let serializer = IndexerSerializer {
diff --git a/src/database/update/builder.rs b/src/database/update/builder.rs
index c3bdf59fc..344eb84e4 100644
--- a/src/database/update/builder.rs
+++ b/src/database/update/builder.rs
@@ -1,95 +1,60 @@
-use std::collections::{BTreeMap, BTreeSet};
 use std::path::PathBuf;
 use std::error::Error;
 
-use fst::map::{Map, MapBuilder};
-use rocksdb::rocksdb_options;
 use serde::Serialize;
-use sdset::Set;
 
-use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
-use crate::database::{DATA_INDEX, Schema, DocumentKeyAttr};
-use crate::data::{DocIds, DocIndexes};
-use crate::{DocumentId, DocIndex};
-use super::Update;
+use crate::database::serde::serializer::Serializer;
+use crate::database::serde::SerializerError;
+use crate::tokenizer::TokenizerBuilder;
+use crate::database::Schema;
 
-type Token = Vec<u8>; // TODO could be replaced by a SmallVec
-type Value = Vec<u8>;
+use crate::DocumentId;
+use super::{Update, RawUpdateBuilder};
 
 pub struct UpdateBuilder {
-    sst_file: PathBuf,
     schema: Schema,
-    removed_documents: BTreeSet<DocumentId>,
-    words_indexes: BTreeMap<Token, Vec<DocIndex>>,
-    keys_values: BTreeMap<DocumentKeyAttr, Value>,
+    raw_builder: RawUpdateBuilder,
 }
 
 impl UpdateBuilder {
     pub fn new(path: PathBuf, schema: Schema) -> UpdateBuilder {
         UpdateBuilder {
-            sst_file: path,
             schema: schema,
-            removed_documents: BTreeSet::new(),
-            words_indexes: BTreeMap::new(),
-            keys_values: BTreeMap::new(),
+            raw_builder: RawUpdateBuilder::new(path),
         }
     }
 
-    pub fn update_document<T>(&mut self, document: T) -> Result<DocumentId, Box<Error>>
+    pub fn update_document<T, B>(
+        &mut self,
+        document: T,
+        tokenizer_builder: &B,
+    ) -> Result<DocumentId, SerializerError>
     where T: Serialize,
+          B: TokenizerBuilder,
     {
-        unimplemented!()
+        let document_id = self.schema.document_id(&document)?;
+
+        let serializer = Serializer {
+            schema: &self.schema,
+            document_id: document_id,
+            tokenizer_builder: tokenizer_builder,
+            builder: &mut self.raw_builder,
+        };
+
+        document.serialize(serializer)?;
+
+        Ok(document_id)
     }
 
-    pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, Box<Error>>
+    pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, SerializerError>
     where T: Serialize,
     {
-        unimplemented!()
+        let document_id = self.schema.document_id(&document)?;
+        self.raw_builder.remove_document(document_id);
+        Ok(document_id)
     }
 
     pub fn build(self) -> Result<Update, Box<Error>> {
-        let tree = {
-            let negative = {
-                let documents_ids = self.removed_documents.into_iter().collect();
-                let doc_ids = DocIds::from_raw(documents_ids);
-                Negative { doc_ids }
-            };
-
-            let positive = {
-                let mut builder = PositiveBuilder::memory();
-
-                for (key, mut indexes) in self.words_indexes {
-                    indexes.sort_unstable();
-                    let indexes = Set::new_unchecked(&indexes);
-                    builder.insert(key, indexes);
-                }
-
-                let (map, indexes) = builder.into_inner()?;
-                let map = Map::from_bytes(map)?;
-                let indexes = DocIndexes::from_bytes(indexes)?;
-                Positive { map, indexes }
-            };
-
-            Index { negative, positive }
-        };
-
-        let env_options = rocksdb_options::EnvOptions::new();
-        let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
-        let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
-        file_writer.open(&self.sst_file.to_string_lossy())?;
-
-        // write the data-index
-        let mut bytes = Vec::new();
-        tree.write_to_bytes(&mut bytes);
-        file_writer.merge(DATA_INDEX, &bytes)?;
-
-        // write all the documents attributes updates
-        for (key, value) in self.keys_values {
-            file_writer.put(key.as_ref(), &value)?;
-        }
-
-        file_writer.finish()?;
-
-        Ok(Update { sst_file: self.sst_file })
+        self.raw_builder.build()
     }
 }
diff --git a/src/database/update/mod.rs b/src/database/update/mod.rs
index 7bdda9949..3e3eb8cca 100644
--- a/src/database/update/mod.rs
+++ b/src/database/update/mod.rs
@@ -1,8 +1,10 @@
 use std::path::{Path, PathBuf};
 
 mod builder;
+mod raw_builder;
 
 pub use self::builder::UpdateBuilder;
+pub use self::raw_builder::RawUpdateBuilder;
 
 pub struct Update {
     sst_file: PathBuf,
diff --git a/src/database/update/raw_builder.rs b/src/database/update/raw_builder.rs
new file mode 100644
index 000000000..e7e65a5fc
--- /dev/null
+++ b/src/database/update/raw_builder.rs
@@ -0,0 +1,93 @@
+use std::collections::{BTreeMap, BTreeSet};
+use std::path::PathBuf;
+use std::error::Error;
+
+use rocksdb::rocksdb_options;
+use fst::map::Map;
+use sdset::Set;
+
+use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
+use crate::database::{DATA_INDEX, DocumentKeyAttr};
+use crate::data::{DocIds, DocIndexes};
+use crate::{DocumentId, DocIndex};
+use super::Update;
+
+type Token = Vec<u8>; // TODO could be replaced by a SmallVec
+type Value = Vec<u8>;
+
+pub struct RawUpdateBuilder {
+    sst_file: PathBuf,
+    removed_documents: BTreeSet<DocumentId>,
+    words_indexes: BTreeMap<Token, Vec<DocIndex>>,
+    keys_values: BTreeMap<DocumentKeyAttr, Value>,
+}
+
+impl RawUpdateBuilder {
+    pub fn new(path: PathBuf) -> RawUpdateBuilder {
+        RawUpdateBuilder {
+            sst_file: path,
+            removed_documents: BTreeSet::new(),
+            words_indexes: BTreeMap::new(),
+            keys_values: BTreeMap::new(),
+        }
+    }
+
+    pub fn insert_doc_index(&mut self, token: Vec<u8>, doc_index: DocIndex) {
+        self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index)
+    }
+
+    pub fn insert_attribute_value(&mut self, key_attr: DocumentKeyAttr, value: Vec<u8>) -> Option<Vec<u8>> {
+        self.keys_values.insert(key_attr, value)
+    }
+
+    pub fn remove_document(&mut self, id: DocumentId) {
+        self.removed_documents.insert(id);
+    }
+
+    pub fn build(self) -> Result<Update, Box<Error>> {
+        let tree = {
+            let negative = {
+                let documents_ids: Vec<_> = self.removed_documents.into_iter().collect();
+                let documents_ids = Set::new_unchecked(&documents_ids);
+                let doc_ids = DocIds::new(documents_ids);
+                Negative::new(doc_ids)
+            };
+
+            let positive = {
+                let mut builder = PositiveBuilder::memory();
+
+                for (key, mut indexes) in self.words_indexes {
+                    indexes.sort_unstable();
+                    let indexes = Set::new_unchecked(&indexes);
+                    builder.insert(key, indexes)?;
+                }
+
+                let (map, indexes) = builder.into_inner()?;
+                let map = Map::from_bytes(map)?;
+                let indexes = DocIndexes::from_bytes(indexes)?;
+                Positive::new(map, indexes)
+            };
+
+            Index { negative, positive }
+        };
+
+        let env_options = rocksdb_options::EnvOptions::new();
+        let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
+        let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
+        file_writer.open(&self.sst_file.to_string_lossy())?;
+
+        // write the data-index
+        let mut bytes = Vec::new();
+        tree.write_to_bytes(&mut bytes);
+        file_writer.merge(DATA_INDEX, &bytes)?;
+
+        // write all the documents attributes updates
+        for (key, value) in self.keys_values {
+            file_writer.put(key.as_ref(), &value)?;
+        }
+
+        file_writer.finish()?;
+
+        Ok(Update { sst_file: self.sst_file })
+    }
+}
diff --git a/src/rank/query_builder.rs b/src/rank/query_builder.rs
index a5bbdf885..5e4ee0d11 100644
--- a/src/rank/query_builder.rs
+++ b/src/rank/query_builder.rs
@@ -86,7 +86,7 @@ where D: Deref,
         let mut stream = {
             let mut op_builder = fst::map::OpBuilder::new();
             for automaton in &automatons {
-                let stream = self.view.index().positive.map.search(automaton);
+                let stream = self.view.index().positive.map().search(automaton);
                 op_builder.push(stream);
             }
             op_builder.union()
@@ -100,7 +100,7 @@ where D: Deref,
                 let distance = automaton.eval(input).to_u8();
                 let is_exact = distance == 0 && input.len() == automaton.query_len();
 
-                let doc_indexes = &self.view.index().positive.indexes;
+                let doc_indexes = &self.view.index().positive.indexes();
                 let doc_indexes = &doc_indexes[iv.value as usize];
 
                 for doc_index in doc_indexes {
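
A rough usage sketch (not part of the patch) of how the reworked update flow is driven after this change, following the test in src/database/database.rs: UpdateBuilder resolves the document id through Schema::document_id, the Serializer feeds indexed words and stored attributes into the RawUpdateBuilder through insert_doc_index and insert_attribute_value, and build() assembles the negative/positive Index and writes the SST file. The schema, update_path, doc0 and doc1 values are assumed to come from the surrounding test setup.

    // assumes `schema`, `update_path`, `doc0` and `doc1` are set up as in the test
    let tokenizer_builder = DefaultBuilder::new();
    let mut builder = UpdateBuilder::new(update_path, schema);

    // each call serializes one document into the underlying RawUpdateBuilder
    let docid0 = builder.update_document(&doc0, &tokenizer_builder)?;
    let docid1 = builder.update_document(&doc1, &tokenizer_builder)?;

    // assembles the Index (negative doc ids + positive fst/doc indexes)
    // and writes it, plus the stored attributes, into the update.sst file
    let update = builder.build()?;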