From ec5d17e8c2661613d23ffa07fd58dc1d78c05aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 29 Nov 2018 16:28:10 +0100 Subject: [PATCH] feat: Implement the PositiveUpdate --- src/index/mod.rs | 11 ++++-- src/index/update/negative_update.rs | 57 ++++++++++++++++++----------- src/index/update/positive_update.rs | 1 - src/lib.rs | 2 - 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/src/index/mod.rs b/src/index/mod.rs index 3a96d83e0..ea4d13294 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -14,8 +14,11 @@ use crate::index::update::Update; use crate::rank::QueryBuilder; use crate::blob::{self, Blob}; +const DATA_INDEX: &[u8] = b"data-index"; +const DATA_SCHEMA: &[u8] = b"data-schema"; + fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { - if key != b"data-index" { panic!("The merge operator only supports \"data-index\" merging") } + if key != DATA_INDEX { panic!("The merge operator only supports \"data-index\" merging") } let capacity = { let remaining = operands.size_hint().0; @@ -65,7 +68,7 @@ impl Index { let mut schema_bytes = Vec::new(); schema.write_to(&mut schema_bytes)?; - database.put(b"data-schema", &schema_bytes)?; + database.put(DATA_SCHEMA, &schema_bytes)?; Ok(Self { database }) } @@ -81,7 +84,7 @@ impl Index { let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?; - let _schema = match database.get(b"data-schema")? { + let _schema = match database.get(DATA_SCHEMA)? { Some(value) => Schema::read_from(&*value)?, None => return Err(String::from("Database does not contain a schema").into()), }; @@ -103,7 +106,7 @@ impl Index { } pub fn schema(&self) -> Result> { - let bytes = self.database.get(b"data-schema")?.expect("data-schema entry not found"); + let bytes = self.database.get(DATA_SCHEMA)?.expect("data-schema entry not found"); Ok(Schema::read_from(&*bytes).expect("Invalid schema")) } diff --git a/src/index/update/negative_update.rs b/src/index/update/negative_update.rs index dc2ea5d7e..6c82919b5 100644 --- a/src/index/update/negative_update.rs +++ b/src/index/update/negative_update.rs @@ -1,12 +1,27 @@ use std::path::PathBuf; use std::error::Error; +use std::io::{Cursor, Write}; +use byteorder::{NetworkEndian, WriteBytesExt}; use ::rocksdb::rocksdb_options; +use crate::data::{DocIds, DocIdsBuilder}; +use crate::blob::{Blob, NegativeBlob}; use crate::index::update::Update; -use crate::data::DocIdsBuilder; +use crate::index::DATA_INDEX; use crate::DocumentId; +const DOC_KEY_LEN: usize = 4 + std::mem::size_of::(); + +// "doc-ID_8_BYTES" +fn raw_document_key(id: DocumentId) -> [u8; DOC_KEY_LEN] { + let mut key = [0; DOC_KEY_LEN]; + let mut rdr = Cursor::new(&mut key[..]); + rdr.write_all(b"doc-").unwrap(); + rdr.write_u64::(id).unwrap(); + key +} + pub struct NegativeUpdateBuilder { path: PathBuf, doc_ids: DocIdsBuilder>, @@ -30,29 +45,27 @@ impl NegativeUpdateBuilder { let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); file_writer.open(&self.path.to_string_lossy())?; - // // write the doc ids - // let blob_key = Identifier::blob(blob_info.name).document_ids().build(); - // let blob_doc_ids = self.doc_ids.into_inner()?; - // file_writer.put(&blob_key, &blob_doc_ids)?; + // write the data-index aka negative blob + let bytes = self.doc_ids.into_inner()?; + let doc_ids = DocIds::from_bytes(bytes)?; + let blob = Blob::Negative(NegativeBlob::from_raw(doc_ids)); + let bytes = bincode::serialize(&blob)?; + file_writer.merge(DATA_INDEX, &bytes); - // { - // // write the blob name to be merged - // let mut buffer = Vec::new(); - // blob_info.write_into(&mut buffer); - // let data_key = Identifier::data().blobs_order().build(); - // file_writer.merge(&data_key, &buffer)?; - // } + // FIXME remove this ugly thing ! + // let Blob::Negative(negative_blob) = blob; + let negative_blob = match blob { + Blob::Negative(blob) => blob, + Blob::Positive(_) => unreachable!(), + }; - // let blob_doc_ids = DocIds::from_bytes(blob_doc_ids)?; - // for id in blob_doc_ids.doc_ids().iter().cloned() { - // let start = Identifier::document(id).build(); - // let end = Identifier::document(id + 1).build(); - // file_writer.delete_range(&start, &end)?; - // } + for &document_id in negative_blob.as_ref() { + let start = raw_document_key(document_id); + let end = raw_document_key(document_id + 1); + file_writer.delete_range(&start, &end)?; + } - // file_writer.finish()?; - // Update::open(self.path) - - unimplemented!() + file_writer.finish()?; + Update::open(self.path) } } diff --git a/src/index/update/positive_update.rs b/src/index/update/positive_update.rs index b2b219e55..1e6d38316 100644 --- a/src/index/update/positive_update.rs +++ b/src/index/update/positive_update.rs @@ -52,7 +52,6 @@ where B: TokenizerBuilder let env_options = rocksdb_options::EnvOptions::new(); let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); - file_writer.open(&self.path.to_string_lossy())?; // let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); diff --git a/src/lib.rs b/src/lib.rs index ca416204a..96a3f5d2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(range_contains)] - #[macro_use] extern crate lazy_static; #[macro_use] extern crate serde_derive;