chore: Rework the data module structures

They can now be constructed from SharedData.
Clément Renault 2018-12-31 18:33:59 +01:00
parent c022fa3fca
commit 64d53ee1bd
15 changed files with 292 additions and 262 deletions
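The diffs below replace each structure's from_shared_bytes(Arc<Vec<u8>>, offset, len) constructor with a from_cursor constructor that reads length-prefixed slices out of one shared buffer. A minimal sketch of how the reworked pieces compose, using only names that appear in the diffs; error handling is simplified, so treat it as an illustration rather than code from the commit:

use std::error::Error;
use std::io::Cursor;
use std::sync::Arc;

// Illustration only: Index, Negative, Positive and SharedData are the
// crate-internal types reworked by this commit.
fn read_index(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> Result<Index, Box<Error>> {
    // Wrap the shared buffer once...
    let data = SharedData::new(bytes, offset, len);
    let mut cursor = Cursor::new(data);

    // ...then let every structure consume its own length-prefixed slice
    // from the same cursor, in the order it was written.
    let negative = Negative::from_cursor(&mut cursor)?; // removed document ids
    let positive = Positive::from_cursor(&mut cursor)?; // fst map + doc indexes

    Ok(Index { negative, positive })
}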

View File

@@ -59,10 +59,10 @@ fn index(schema: Schema, database_path: &Path, csv_data_path: &Path) -> Result<D
             }
         };
-        update.update_document(&document).unwrap();
+        update.update_document(&document, &tokenizer_builder)?;
     }

-    let mut update = update.build()?;
+    let update = update.build()?;
     database.ingest_update_file(update)?;
     Ok(database)

View File

@@ -1,62 +1,53 @@
+use std::io::{self, Cursor, BufRead};
 use std::slice::from_raw_parts;
-use std::sync::Arc;
-use std::{io, mem};
+use std::mem::size_of;

 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use sdset::Set;

 use crate::DocumentId;
 use crate::data::SharedData;
+use super::into_u8_slice;

 #[derive(Default, Clone)]
-pub struct DocIds {
-    data: SharedData,
-}
+pub struct DocIds(SharedData);

 impl DocIds {
-    pub fn empty() -> Self {
-        DocIds { data: SharedData::empty() }
+    pub fn new(ids: &Set<DocumentId>) -> DocIds {
+        let bytes = unsafe { into_u8_slice(ids.as_slice()) };
+        let data = SharedData::from_bytes(bytes.to_vec());
+        DocIds(data)
     }

-    pub fn from_bytes(vec: Vec<u8>) -> io::Result<Self> {
-        let len = vec.len();
-        DocIds::from_shared_bytes(Arc::new(vec), 0, len)
-    }
-
-    pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> io::Result<Self> {
-        let data = SharedData { bytes, offset, len };
-        DocIds::from_data(data)
-    }
-
-    pub fn as_bytes(&self) -> &[u8] {
-        &self.data
-    }
-
-    fn from_data(data: SharedData) -> io::Result<Self> {
-        let len = data.as_ref().read_u64::<LittleEndian>()?;
-        let data = data.range(mem::size_of::<u64>(), len as usize);
-        Ok(DocIds { data })
-    }
-
-    pub fn from_raw(vec: Vec<DocumentId>) -> Self {
-        DocIds::from_bytes(unsafe { mem::transmute(vec) }).unwrap()
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIds> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let doc_ids = cursor.get_ref().range(offset, len);
+        cursor.consume(len);
+        Ok(DocIds(doc_ids))
     }

     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        let len = self.data.len() as u64;
+        let len = self.0.len() as u64;
         bytes.write_u64::<LittleEndian>(len).unwrap();
-        bytes.extend_from_slice(&self.data);
+        bytes.extend_from_slice(&self.0);
     }

-    pub fn contains(&self, doc: DocumentId) -> bool {
-        // FIXME prefer using the sdset::exponential_search function
-        self.doc_ids().binary_search(&doc).is_ok()
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
     }

-    pub fn doc_ids(&self) -> &Set<DocumentId> {
-        let slice = &self.data;
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.0
+    }
+}
+
+impl AsRef<Set<DocumentId>> for DocIds {
+    fn as_ref(&self) -> &Set<DocumentId> {
+        let slice = &self.0;
         let ptr = slice.as_ptr() as *const DocumentId;
-        let len = slice.len() / mem::size_of::<DocumentId>();
+        let len = slice.len() / size_of::<DocumentId>();
         let slice = unsafe { from_raw_parts(ptr, len) };
         Set::new_unchecked(slice)
     }
 }
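For reference, a small round-trip sketch of the reworked DocIds API above; it assumes the crate-internal SharedData and DocumentId types and is not part of the commit:

use std::io::Cursor;
use sdset::Set;

fn doc_ids_roundtrip(ids: &Set<DocumentId>) -> std::io::Result<bool> {
    // Serialize the set into an owned SharedData buffer.
    let doc_ids = DocIds::new(ids);

    // Write the length-prefixed on-disk form.
    let mut bytes = Vec::new();
    doc_ids.write_to_bytes(&mut bytes);

    // Read it back: from_cursor consumes exactly its own slice,
    // leaving the cursor positioned for the next structure.
    let mut cursor = Cursor::new(SharedData::from_bytes(bytes));
    let read_back = DocIds::from_cursor(&mut cursor)?;

    // AsRef<Set<DocumentId>> exposes the typed view over the raw bytes.
    Ok(read_back.as_ref().as_slice() == ids.as_slice())
}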

View File

@@ -1,5 +1,5 @@
+use std::io::{self, Write, Cursor, BufRead};
 use std::slice::from_raw_parts;
-use std::io::{self, Write};
 use std::mem::size_of;
 use std::ops::Index;
 use std::sync::Arc;
@@ -9,6 +9,7 @@ use sdset::Set;
 use crate::DocIndex;
 use crate::data::SharedData;
+use super::into_u8_slice;

 #[derive(Debug)]
 #[repr(C)]
@@ -24,40 +25,36 @@ pub struct DocIndexes {
 }

 impl DocIndexes {
-    pub fn from_bytes(vec: Vec<u8>) -> io::Result<DocIndexes> {
-        let len = vec.len();
-        DocIndexes::from_shared_bytes(Arc::new(vec), 0, len)
+    pub fn from_bytes(bytes: Vec<u8>) -> io::Result<DocIndexes> {
+        let bytes = Arc::new(bytes);
+        let len = bytes.len();
+        let data = SharedData::new(bytes, 0, len);
+        let mut cursor = Cursor::new(data);
+        DocIndexes::from_cursor(&mut cursor)
     }

-    pub fn from_shared_bytes(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> io::Result<DocIndexes> {
-        let data = SharedData { bytes, offset, len };
-        DocIndexes::from_data(data)
-    }
-
-    fn from_data(data: SharedData) -> io::Result<DocIndexes> {
-        let ranges_len_offset = data.len() - size_of::<u64>();
-        let ranges_len = (&data[ranges_len_offset..]).read_u64::<LittleEndian>()?;
-        let ranges_len = ranges_len as usize;
-        let ranges_offset = ranges_len_offset - ranges_len;
-        let ranges = data.range(ranges_offset, ranges_len);
-        let indexes = data.range(0, ranges_offset);
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> io::Result<DocIndexes> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let ranges = cursor.get_ref().range(offset, len);
+        cursor.consume(len);
+
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let indexes = cursor.get_ref().range(offset, len);
+        cursor.consume(len);
+
         Ok(DocIndexes { ranges, indexes })
     }

     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
         let ranges_len = self.ranges.len() as u64;
-        let indexes_len = self.indexes.len() as u64;
-        let u64_size = size_of::<u64>() as u64;
-        let len = indexes_len + ranges_len + u64_size;
-
-        let _ = bytes.write_u64::<LittleEndian>(len);
-        bytes.extend_from_slice(&self.indexes);
-        bytes.extend_from_slice(&self.ranges);
         let _ = bytes.write_u64::<LittleEndian>(ranges_len);
+        bytes.extend_from_slice(&self.ranges);
+
+        let indexes_len = self.indexes.len() as u64;
+        let _ = bytes.write_u64::<LittleEndian>(indexes_len);
+        bytes.extend_from_slice(&self.indexes);
     }

     pub fn get(&self, index: usize) -> Option<&Set<DocIndex>> {
@@ -97,12 +94,17 @@ impl Index<usize> for DocIndexes {
 pub struct DocIndexesBuilder<W> {
     ranges: Vec<Range>,
+    indexes: Vec<DocIndex>,
     wtr: W,
 }

 impl DocIndexesBuilder<Vec<u8>> {
     pub fn memory() -> Self {
-        DocIndexesBuilder::new(Vec::new())
+        DocIndexesBuilder {
+            ranges: Vec::new(),
+            indexes: Vec::new(),
+            wtr: Vec::new(),
+        }
     }
 }
@@ -110,19 +112,18 @@ impl<W: Write> DocIndexesBuilder<W> {
     pub fn new(wtr: W) -> Self {
         DocIndexesBuilder {
             ranges: Vec::new(),
+            indexes: Vec::new(),
             wtr: wtr,
         }
     }

-    pub fn insert(&mut self, indexes: &Set<DocIndex>) -> io::Result<()> {
+    pub fn insert(&mut self, indexes: &Set<DocIndex>) {
         let len = indexes.len() as u64;
         let start = self.ranges.last().map(|r| r.end).unwrap_or(0);
         let range = Range { start, end: start + len };
         self.ranges.push(range);

-        // write the values
-        let indexes = unsafe { into_u8_slice(indexes) };
-        self.wtr.write_all(indexes)
+        self.indexes.extend_from_slice(indexes);
     }

     pub fn finish(self) -> io::Result<()> {
@@ -130,24 +131,20 @@ impl<W: Write> DocIndexesBuilder<W> {
     }

     pub fn into_inner(mut self) -> io::Result<W> {
-        // write the ranges
-        let ranges = unsafe { into_u8_slice(self.ranges.as_slice()) };
-        self.wtr.write_all(ranges)?;
-
-        // write the length of the ranges
+        let ranges = unsafe { into_u8_slice(&self.ranges) };
         let len = ranges.len() as u64;
         self.wtr.write_u64::<LittleEndian>(len)?;
+        self.wtr.write_all(ranges)?;
+
+        let indexes = unsafe { into_u8_slice(&self.indexes) };
+        let len = indexes.len() as u64;
+        self.wtr.write_u64::<LittleEndian>(len)?;
+        self.wtr.write_all(indexes)?;

         Ok(self.wtr)
     }
 }

-unsafe fn into_u8_slice<T>(slice: &[T]) -> &[u8] {
-    let ptr = slice.as_ptr() as *const u8;
-    let len = slice.len() * size_of::<T>();
-    from_raw_parts(ptr, len)
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -177,9 +174,9 @@ mod tests {
     let mut builder = DocIndexesBuilder::memory();

-    builder.insert(Set::new(&[a])?)?;
-    builder.insert(Set::new(&[a, b, c])?)?;
-    builder.insert(Set::new(&[a, c])?)?;
+    builder.insert(Set::new(&[a])?);
+    builder.insert(Set::new(&[a, b, c])?);
+    builder.insert(Set::new(&[a, c])?);

     let bytes = builder.into_inner()?;
     let docs = DocIndexes::from_bytes(bytes)?;
@@ -212,18 +209,17 @@ mod tests {
     let mut builder = DocIndexesBuilder::memory();

-    builder.insert(Set::new(&[a])?)?;
-    builder.insert(Set::new(&[a, b, c])?)?;
-    builder.insert(Set::new(&[a, c])?)?;
+    builder.insert(Set::new(&[a])?);
+    builder.insert(Set::new(&[a, b, c])?);
+    builder.insert(Set::new(&[a, c])?);

     let builder_bytes = builder.into_inner()?;
     let docs = DocIndexes::from_bytes(builder_bytes.clone())?;

     let mut bytes = Vec::new();
     docs.write_to_bytes(&mut bytes);

-    let len = size_of::<u64>();
-    assert_eq!(builder_bytes, &bytes[len..]);
+    assert_eq!(builder_bytes, bytes);

     Ok(())
 }

View File

@@ -1,26 +1,30 @@
 mod doc_ids;
 mod doc_indexes;

+use std::slice::from_raw_parts;
+use std::mem::size_of;
 use std::ops::Deref;
 use std::sync::Arc;

 pub use self::doc_ids::DocIds;
 pub use self::doc_indexes::{DocIndexes, DocIndexesBuilder};

-#[derive(Clone)]
-struct SharedData {
-    bytes: Arc<Vec<u8>>,
-    offset: usize,
-    len: usize,
+#[derive(Default, Clone)]
+pub struct SharedData {
+    pub bytes: Arc<Vec<u8>>,
+    pub offset: usize,
+    pub len: usize,
 }

 impl SharedData {
-    pub fn empty() -> SharedData {
-        SharedData {
-            bytes: Arc::default(),
-            offset: 0,
-            len: 0,
-        }
+    pub fn from_bytes(vec: Vec<u8>) -> SharedData {
+        let len = vec.len();
+        let bytes = Arc::new(vec);
+        SharedData::new(bytes, 0, len)
+    }
+
+    pub fn new(bytes: Arc<Vec<u8>>, offset: usize, len: usize) -> SharedData {
+        SharedData { bytes, offset, len }
     }

     pub fn range(&self, offset: usize, len: usize) -> SharedData {
@@ -33,12 +37,6 @@ impl SharedData {
     }
 }

-impl Default for SharedData {
-    fn default() -> SharedData {
-        SharedData::empty()
-    }
-}
-
 impl Deref for SharedData {
     type Target = [u8];
@@ -52,3 +50,9 @@ impl AsRef<[u8]> for SharedData {
         &self.bytes[self.offset..self.offset + self.len]
     }
 }
+
+unsafe fn into_u8_slice<T: Sized>(slice: &[T]) -> &[u8] {
+    let ptr = slice.as_ptr() as *const u8;
+    let len = slice.len() * size_of::<T>();
+    from_raw_parts(ptr, len)
+}
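A short sketch of how the now-public SharedData is used; illustration only, following the struct and methods shown above:

use std::sync::Arc;

fn shared_data_example() {
    // Own the whole buffer behind an Arc; later slicing copies nothing.
    let data = SharedData::from_bytes(vec![1u8, 2, 3, 4, 5, 6, 7, 8]);

    // Deref<Target = [u8]> exposes the current window as a byte slice.
    assert_eq!(data.len(), 8);

    // A sub-window shares the same Arc<Vec<u8>>; only offset and len change.
    let tail = data.range(4, 4);
    assert_eq!(tail.len(), 4);
}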

View File

@@ -7,7 +7,7 @@ use rocksdb::rocksdb::{Writable, Snapshot};
 use rocksdb::{DB, DBVector, MergeOperands};
 use crossbeam::atomic::ArcCell;

-use crate::database::index::{self, Index, Positive};
+use crate::database::index::Index;
 use crate::database::{DatabaseView, Update, Schema};
 use crate::database::{DATA_INDEX, DATA_SCHEMA};
@@ -86,7 +86,7 @@ impl Database {
         };

         let path = update.path().to_string_lossy();
-        let mut options = IngestExternalFileOptions::new();
+        let options = IngestExternalFileOptions::new();
         // options.move_files(move_update);

         let cf_handle = db.cf_handle("default").expect("\"default\" column family not found");
@@ -182,7 +182,6 @@ mod tests {
         };

         let database = Database::create(&rocksdb_path, schema.clone())?;
-        let tokenizer_builder = DefaultBuilder::new();

         let update_path = dir.path().join("update.sst");
@@ -201,11 +200,12 @@ mod tests {
         let docid0;
         let docid1;
-        let mut update = {
+        let update = {
+            let tokenizer_builder = DefaultBuilder::new();
             let mut builder = UpdateBuilder::new(update_path, schema);

-            docid0 = builder.update_document(&doc0).unwrap();
-            docid1 = builder.update_document(&doc1).unwrap();
+            docid0 = builder.update_document(&doc0, &tokenizer_builder)?;
+            docid1 = builder.update_document(&doc1, &tokenizer_builder)?;

             builder.build()?
         };

View File

@@ -4,18 +4,16 @@ mod positive;
 pub(crate) use self::negative::Negative;
 pub(crate) use self::positive::{Positive, PositiveBuilder};

-use std::sync::Arc;
 use std::error::Error;
-use std::io::{Cursor, BufRead};
+use std::io::Cursor;
+use std::sync::Arc;

-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use fst::{IntoStreamer, Streamer};
 use sdset::duo::DifferenceByKey;
 use sdset::{Set, SetOperation};
-use fst::raw::Fst;
 use fst::Map;

-use crate::data::{DocIds, DocIndexes};
+use crate::data::{SharedData, DocIndexes};

 #[derive(Default)]
 pub struct Index {
@@ -35,8 +33,11 @@ impl Index {
         len: usize,
     ) -> Result<Index, Box<Error>>
     {
-        let (negative, neg_offset) = Negative::from_shared_bytes(bytes.clone(), offset, len)?;
-        let (positive, _) = Positive::from_shared_bytes(bytes, offset + neg_offset, len)?;
+        let data = SharedData::new(bytes, offset, len);
+        let mut cursor = Cursor::new(data);
+
+        let negative = Negative::from_cursor(&mut cursor)?;
+        let positive = Positive::from_cursor(&mut cursor)?;
+
         Ok(Index { negative, positive })
     }
@@ -71,7 +72,7 @@ impl Index {
             let (map, indexes) = builder.into_inner()?;
             let map = Map::from_bytes(map)?;
             let indexes = DocIndexes::from_bytes(indexes)?;
-            Positive { map, indexes }
+            Positive::new(map, indexes)
         };

         let negative = Negative::default();

View File

@@ -1,46 +1,36 @@
-use std::io::{Cursor, BufRead};
 use std::error::Error;
-use std::mem::size_of;
+use std::io::Cursor;
 use std::ops::Deref;
-use std::sync::Arc;

 use sdset::Set;
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use byteorder::{LittleEndian, WriteBytesExt};

+use crate::data::SharedData;
 use crate::data::DocIds;
 use crate::DocumentId;

 #[derive(Default)]
-pub struct Negative {
-    pub doc_ids: DocIds,
-}
+pub struct Negative(DocIds);

 impl Negative {
-    pub fn from_shared_bytes(
-        bytes: Arc<Vec<u8>>,
-        offset: usize,
-        len: usize,
-    ) -> Result<(Negative, usize), Box<Error>>
-    {
-        let mut cursor = Cursor::new(&bytes[..len]);
-        cursor.consume(offset);
-
-        let len = cursor.read_u64::<LittleEndian>()? as usize;
-        let offset = cursor.position() as usize;
-        let doc_ids = DocIds::from_shared_bytes(bytes, offset, len)?;
-
-        Ok((Negative { doc_ids }, offset + len))
+    pub fn new(doc_ids: DocIds) -> Negative {
+        Negative(doc_ids)
+    }
+
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Negative, Box<Error>> {
+        let doc_ids = DocIds::from_cursor(cursor)?;
+        Ok(Negative(doc_ids))
     }

     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        let slice = self.doc_ids.as_bytes();
+        let slice = self.0.as_bytes();
         let len = slice.len() as u64;
         let _ = bytes.write_u64::<LittleEndian>(len);
         bytes.extend_from_slice(slice);
     }

     pub fn is_empty(&self) -> bool {
-        self.doc_ids.doc_ids().is_empty()
+        self.0.is_empty()
     }
 }
@@ -48,6 +38,6 @@ impl Deref for Negative {
     type Target = Set<DocumentId>;

     fn deref(&self) -> &Self::Target {
-        self.doc_ids.doc_ids()
+        self.0.as_ref()
     }
 }

View File

@@ -1,7 +1,5 @@
 use std::io::{Write, BufRead, Cursor};
-use std::mem::size_of;
 use std::error::Error;
-use std::sync::Arc;

 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use fst::{map, Map, Streamer, IntoStreamer};
@@ -10,51 +8,51 @@ use sdset::duo::Union;
 use fst::raw::Fst;

 use crate::data::{DocIndexes, DocIndexesBuilder};
+use crate::data::SharedData;
 use crate::DocIndex;

 #[derive(Default)]
 pub struct Positive {
-    pub map: Map,
-    pub indexes: DocIndexes,
+    map: Map,
+    indexes: DocIndexes,
 }

 impl Positive {
-    pub fn from_shared_bytes(
-        bytes: Arc<Vec<u8>>,
-        offset: usize,
-        len: usize,
-    ) -> Result<(Positive, usize), Box<Error>>
-    {
-        let mut cursor = Cursor::new(&bytes[..len]);
-        cursor.consume(offset);
-
-        let map_len = cursor.read_u64::<LittleEndian>()? as usize;
-        let map_offset = cursor.position() as usize;
-        let fst = Fst::from_shared_bytes(bytes.clone(), map_offset, map_len)?;
+    pub fn new(map: Map, indexes: DocIndexes) -> Positive {
+        Positive { map, indexes }
+    }
+
+    pub fn from_cursor(cursor: &mut Cursor<SharedData>) -> Result<Positive, Box<Error>> {
+        let len = cursor.read_u64::<LittleEndian>()? as usize;
+        let offset = cursor.position() as usize;
+        let data = cursor.get_ref().range(offset, len);
+
+        let fst = Fst::from_shared_bytes(data.bytes, data.offset, data.len)?;
         let map = Map::from(fst);
+        cursor.consume(len);

-        cursor.consume(map_len);
-        let indexes_len = cursor.read_u64::<LittleEndian>()? as usize;
-        let indexes_offset = cursor.position() as usize;
-        let indexes = DocIndexes::from_shared_bytes(bytes, indexes_offset, indexes_len)?;
+        let indexes = DocIndexes::from_cursor(cursor)?;

-        let positive = Positive { map, indexes };
-        let len = indexes_offset + indexes_len;
-
-        Ok((positive, len))
+        Ok(Positive { map, indexes })
     }

     pub fn write_to_bytes(&self, bytes: &mut Vec<u8>) {
-        // indexes
         let slice = self.map.as_fst().as_bytes();
         let len = slice.len() as u64;
         let _ = bytes.write_u64::<LittleEndian>(len);
         bytes.extend_from_slice(slice);

-        // map
         self.indexes.write_to_bytes(bytes);
     }

+    pub fn map(&self) -> &Map {
+        &self.map
+    }
+
+    pub fn indexes(&self) -> &DocIndexes {
+        &self.indexes
+    }
+
     pub fn union(&self, other: &Positive) -> Result<Positive, Box<Error>> {
         let mut builder = PositiveBuilder::memory();
         let mut stream = map::OpBuilder::new().add(&self.map).add(&other.map).union();
@@ -155,15 +153,11 @@ impl<W: Write, X: Write> PositiveBuilder<W, X> {
         where K: AsRef<[u8]>,
     {
         self.map.insert(key, self.value)?;
-        self.indexes.insert(indexes)?;
+        self.indexes.insert(indexes);
         self.value += 1;
         Ok(())
     }

-    pub fn finish(self) -> Result<(), Box<Error>> {
-        self.into_inner().map(drop)
-    }
-
     pub fn into_inner(self) -> Result<(W, X), Box<Error>> {
         let map = self.map.into_inner()?;
         let indexes = self.indexes.into_inner()?;

View File

@@ -141,10 +141,12 @@ impl Schema {
         attributes
     }

-    pub fn document_id<T>(&self, document: &T) -> Result<DocumentId, Box<Error>>
+    pub fn document_id<T>(&self, document: T) -> Result<DocumentId, SerializerError>
     where T: Serialize,
     {
-        unimplemented!()
+        let id_attribute_name = &self.inner.identifier;
+        let serializer = FindDocumentIdSerializer { id_attribute_name };
+        document.serialize(serializer)
     }

     pub fn props(&self, attr: SchemaAttr) -> SchemaProps {

View File

@@ -1,4 +1,4 @@
-use crate::database::update::UnorderedPositiveBlobBuilder;
+use crate::database::update::RawUpdateBuilder;
 use crate::database::schema::SchemaAttr;
 use crate::database::serde::SerializerError;
 use crate::tokenizer::TokenizerBuilder;
@@ -10,7 +10,7 @@ use serde::ser;

 pub struct IndexerSerializer<'a, B> {
     pub tokenizer_builder: &'a B,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
+    pub builder: &'a mut RawUpdateBuilder,
     pub document_id: DocumentId,
     pub attribute: SchemaAttr,
 }
@@ -72,10 +72,10 @@ where B: TokenizerBuilder
             // and the unidecoded lowercased version
             let word_unidecoded = unidecode::unidecode(word).to_lowercase();
             if word_lower != word_unidecoded {
-                self.builder.insert(word_unidecoded, doc_index);
+                self.builder.insert_doc_index(word_unidecoded.into_bytes(), doc_index);
             }

-            self.builder.insert(word_lower, doc_index);
+            self.builder.insert_doc_index(word_lower.into_bytes(), doc_index);
         }
         Ok(())
     }

View File

@@ -1,24 +1,20 @@
-use std::collections::BTreeMap;
-
 use serde::Serialize;
 use serde::ser;

 use crate::database::serde::indexer_serializer::IndexerSerializer;
 use crate::database::serde::key_to_string::KeyToStringSerializer;
-use crate::database::update::UnorderedPositiveBlobBuilder;
 use crate::database::document_key::DocumentKeyAttr;
-use crate::database::update::NewState;
-use crate::database::Schema;
+use crate::database::update::RawUpdateBuilder;
 use crate::database::serde::SerializerError;
 use crate::tokenizer::TokenizerBuilder;
+use crate::database::schema::Schema;
 use crate::DocumentId;

 pub struct Serializer<'a, B> {
     pub schema: &'a Schema,
-    pub tokenizer_builder: &'a B,
     pub document_id: DocumentId,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
-    pub new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
+    pub tokenizer_builder: &'a B,
+    pub builder: &'a mut RawUpdateBuilder,
 }

 impl<'a, B> ser::Serializer for Serializer<'a, B>
@@ -145,7 +141,6 @@ where B: TokenizerBuilder
             document_id: self.document_id,
             current_key_name: None,
             builder: self.builder,
-            new_states: self.new_states,
         })
     }
@@ -160,7 +155,6 @@ where B: TokenizerBuilder
             tokenizer_builder: self.tokenizer_builder,
             document_id: self.document_id,
             builder: self.builder,
-            new_states: self.new_states,
         })
     }
@@ -181,8 +175,7 @@ pub struct MapSerializer<'a, B> {
     pub tokenizer_builder: &'a B,
     pub document_id: DocumentId,
     pub current_key_name: Option<String>,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
-    pub new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
+    pub builder: &'a mut RawUpdateBuilder,
 }

 impl<'a, B> ser::SerializeMap for MapSerializer<'a, B>
@@ -220,7 +213,7 @@ where B: TokenizerBuilder
         if props.is_stored() {
             let value = bincode::serialize(value).unwrap();
             let key = DocumentKeyAttr::new(self.document_id, attr);
-            self.new_states.insert(key, NewState::Updated { value });
+            self.builder.insert_attribute_value(key, value);
         }
         if props.is_indexed() {
             let serializer = IndexerSerializer {
@@ -243,10 +236,9 @@ where B: TokenizerBuilder

 pub struct StructSerializer<'a, B> {
     pub schema: &'a Schema,
-    pub tokenizer_builder: &'a B,
     pub document_id: DocumentId,
-    pub builder: &'a mut UnorderedPositiveBlobBuilder<Vec<u8>, Vec<u8>>,
-    pub new_states: &'a mut BTreeMap<DocumentKeyAttr, NewState>,
+    pub tokenizer_builder: &'a B,
+    pub builder: &'a mut RawUpdateBuilder,
 }

 impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B>
@@ -267,7 +259,7 @@ where B: TokenizerBuilder
         if props.is_stored() {
             let value = bincode::serialize(value).unwrap();
             let key = DocumentKeyAttr::new(self.document_id, attr);
-            self.new_states.insert(key, NewState::Updated { value });
+            self.builder.insert_attribute_value(key, value);
         }
         if props.is_indexed() {
             let serializer = IndexerSerializer {

View File

@@ -1,95 +1,60 @@
-use std::collections::{BTreeMap, BTreeSet};
 use std::path::PathBuf;
 use std::error::Error;

-use fst::map::{Map, MapBuilder};
-use rocksdb::rocksdb_options;
 use serde::Serialize;
-use sdset::Set;

-use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
-use crate::database::{DATA_INDEX, Schema, DocumentKeyAttr};
-use crate::data::{DocIds, DocIndexes};
-use crate::{DocumentId, DocIndex};
-use super::Update;
-
-type Token = Vec<u8>; // TODO could be replaced by a SmallVec
-type Value = Vec<u8>;
+use crate::database::serde::serializer::Serializer;
+use crate::database::serde::SerializerError;
+use crate::tokenizer::TokenizerBuilder;
+use crate::database::Schema;
+use crate::DocumentId;
+use super::{Update, RawUpdateBuilder};

 pub struct UpdateBuilder {
-    sst_file: PathBuf,
     schema: Schema,
-    removed_documents: BTreeSet<DocumentId>,
-    words_indexes: BTreeMap<Token, Vec<DocIndex>>,
-    keys_values: BTreeMap<DocumentKeyAttr, Value>,
+    raw_builder: RawUpdateBuilder,
 }

 impl UpdateBuilder {
     pub fn new(path: PathBuf, schema: Schema) -> UpdateBuilder {
         UpdateBuilder {
-            sst_file: path,
             schema: schema,
-            removed_documents: BTreeSet::new(),
-            words_indexes: BTreeMap::new(),
-            keys_values: BTreeMap::new(),
+            raw_builder: RawUpdateBuilder::new(path),
         }
     }

-    pub fn update_document<T>(&mut self, document: T) -> Result<DocumentId, Box<Error>>
+    pub fn update_document<T, B>(
+        &mut self,
+        document: T,
+        tokenizer_builder: &B,
+    ) -> Result<DocumentId, SerializerError>
     where T: Serialize,
+          B: TokenizerBuilder,
     {
-        unimplemented!()
+        let document_id = self.schema.document_id(&document)?;
+
+        let serializer = Serializer {
+            schema: &self.schema,
+            document_id: document_id,
+            tokenizer_builder: tokenizer_builder,
+            builder: &mut self.raw_builder,
+        };
+
+        document.serialize(serializer)?;
+
+        Ok(document_id)
     }

-    pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, Box<Error>>
+    pub fn remove_document<T>(&mut self, document: T) -> Result<DocumentId, SerializerError>
     where T: Serialize,
     {
-        unimplemented!()
+        let document_id = self.schema.document_id(&document)?;
+        self.raw_builder.remove_document(document_id);
+        Ok(document_id)
     }

     pub fn build(self) -> Result<Update, Box<Error>> {
-        let tree = {
-            let negative = {
-                let documents_ids = self.removed_documents.into_iter().collect();
-                let doc_ids = DocIds::from_raw(documents_ids);
-                Negative { doc_ids }
-            };
-
-            let positive = {
-                let mut builder = PositiveBuilder::memory();
-
-                for (key, mut indexes) in self.words_indexes {
-                    indexes.sort_unstable();
-                    let indexes = Set::new_unchecked(&indexes);
-                    builder.insert(key, indexes);
-                }
-
-                let (map, indexes) = builder.into_inner()?;
-                let map = Map::from_bytes(map)?;
-                let indexes = DocIndexes::from_bytes(indexes)?;
-                Positive { map, indexes }
-            };
-
-            Index { negative, positive }
-        };
-
-        let env_options = rocksdb_options::EnvOptions::new();
-        let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
-        let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
-        file_writer.open(&self.sst_file.to_string_lossy())?;
-
-        // write the data-index
-        let mut bytes = Vec::new();
-        tree.write_to_bytes(&mut bytes);
-        file_writer.merge(DATA_INDEX, &bytes)?;
-
-        // write all the documents attributes updates
-        for (key, value) in self.keys_values {
-            file_writer.put(key.as_ref(), &value)?;
-        }
-
-        file_writer.finish()?;
-
-        Ok(Update { sst_file: self.sst_file })
+        self.raw_builder.build()
     }
 }

View File

@@ -1,8 +1,10 @@
 use std::path::{Path, PathBuf};

 mod builder;
+mod raw_builder;

 pub use self::builder::UpdateBuilder;
+pub use self::raw_builder::RawUpdateBuilder;

 pub struct Update {
     sst_file: PathBuf,

View File

@@ -0,0 +1,93 @@
+use std::collections::{BTreeMap, BTreeSet};
+use std::path::PathBuf;
+use std::error::Error;
+
+use rocksdb::rocksdb_options;
+use fst::map::Map;
+use sdset::Set;
+
+use crate::database::index::{Index, Positive, PositiveBuilder, Negative};
+use crate::database::{DATA_INDEX, DocumentKeyAttr};
+use crate::data::{DocIds, DocIndexes};
+use crate::{DocumentId, DocIndex};
+use super::Update;
+
+type Token = Vec<u8>; // TODO could be replaced by a SmallVec
+type Value = Vec<u8>;
+
+pub struct RawUpdateBuilder {
+    sst_file: PathBuf,
+    removed_documents: BTreeSet<DocumentId>,
+    words_indexes: BTreeMap<Token, Vec<DocIndex>>,
+    keys_values: BTreeMap<DocumentKeyAttr, Value>,
+}
+
+impl RawUpdateBuilder {
+    pub fn new(path: PathBuf) -> RawUpdateBuilder {
+        RawUpdateBuilder {
+            sst_file: path,
+            removed_documents: BTreeSet::new(),
+            words_indexes: BTreeMap::new(),
+            keys_values: BTreeMap::new(),
+        }
+    }
+
+    pub fn insert_doc_index(&mut self, token: Vec<u8>, doc_index: DocIndex) {
+        self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index)
+    }
+
+    pub fn insert_attribute_value(&mut self, key_attr: DocumentKeyAttr, value: Vec<u8>) -> Option<Vec<u8>> {
+        self.keys_values.insert(key_attr, value)
+    }
+
+    pub fn remove_document(&mut self, id: DocumentId) {
+        self.removed_documents.insert(id);
+    }
+
+    pub fn build(self) -> Result<Update, Box<Error>> {
+        let tree = {
+            let negative = {
+                let documents_ids: Vec<_> = self.removed_documents.into_iter().collect();
+                let documents_ids = Set::new_unchecked(&documents_ids);
+                let doc_ids = DocIds::new(documents_ids);
+                Negative::new(doc_ids)
+            };
+
+            let positive = {
+                let mut builder = PositiveBuilder::memory();
+
+                for (key, mut indexes) in self.words_indexes {
+                    indexes.sort_unstable();
+                    let indexes = Set::new_unchecked(&indexes);
+                    builder.insert(key, indexes)?;
+                }
+
+                let (map, indexes) = builder.into_inner()?;
+                let map = Map::from_bytes(map)?;
+                let indexes = DocIndexes::from_bytes(indexes)?;
+                Positive::new(map, indexes)
+            };
+
+            Index { negative, positive }
+        };
+
+        let env_options = rocksdb_options::EnvOptions::new();
+        let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
+        let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
+        file_writer.open(&self.sst_file.to_string_lossy())?;
+
+        // write the data-index
+        let mut bytes = Vec::new();
+        tree.write_to_bytes(&mut bytes);
+        file_writer.merge(DATA_INDEX, &bytes)?;
+
+        // write all the documents attributes updates
+        for (key, value) in self.keys_values {
+            file_writer.put(key.as_ref(), &value)?;
+        }
+
+        file_writer.finish()?;
+
+        Ok(Update { sst_file: self.sst_file })
+    }
+}
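To show how the split works out in practice, a hypothetical caller of the reworked update pipeline; the document type, file path and error plumbing are placeholders, not from the commit:

use std::error::Error;
use std::path::PathBuf;

fn build_update(schema: Schema, doc: &MyDocument) -> Result<Update, Box<Error>> {
    let tokenizer_builder = DefaultBuilder::new();
    let mut builder = UpdateBuilder::new(PathBuf::from("update.sst"), schema);

    // Serde-driven indexing: stored attributes end up in insert_attribute_value,
    // indexed words in insert_doc_index, both on the inner RawUpdateBuilder.
    builder.update_document(doc, &tokenizer_builder)?;

    // RawUpdateBuilder::build writes the data-index and the attribute values
    // into an SST file ready to be ingested by the database.
    builder.build()
}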

View File

@@ -86,7 +86,7 @@ where D: Deref<Target=DB>,
         let mut stream = {
             let mut op_builder = fst::map::OpBuilder::new();
             for automaton in &automatons {
-                let stream = self.view.index().positive.map.search(automaton);
+                let stream = self.view.index().positive.map().search(automaton);
                 op_builder.push(stream);
             }
             op_builder.union()
@@ -100,7 +100,7 @@ where D: Deref<Target=DB>,
             let distance = automaton.eval(input).to_u8();
             let is_exact = distance == 0 && input.len() == automaton.query_len();

-            let doc_indexes = &self.view.index().positive.indexes;
+            let doc_indexes = &self.view.index().positive.indexes();
             let doc_indexes = &doc_indexes[iv.value as usize];

             for doc_index in doc_indexes {