From 3668627e031c13c5946d888ec9f33371555fa4ee Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 26 May 2020 17:41:44 +0200 Subject: [PATCH] Use zerocopy without bitpacking as a first step --- src/main.rs | 59 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/src/main.rs b/src/main.rs index e4ce5aeca..2d3cda946 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,8 +19,9 @@ use rayon::prelude::*; use sdset::{SetOperation, SetBuf}; use slice_group_by::StrGroupBy; use structopt::StructOpt; +use zerocopy::{LayoutVerified, AsBytes}; -use self::codec::CodecBitPacker4xSorted; +// use self::codec::CodecBitPacker4xSorted; use self::bp_vec::BpVec; pub type FastMap4 = HashMap>; @@ -44,7 +45,35 @@ struct Opt { files_to_index: Vec, } -fn union_bitpacked_postings_ids( +fn bytes_to_u32s(bytes: &[u8]) -> Vec { + fn aligned_to(bytes: &[u8], align: usize) -> bool { + (bytes as *const _ as *const () as usize) % align == 0 + } + + match LayoutVerified::new_slice(bytes) { + Some(slice) => slice.to_vec(), + None => { + let len = bytes.len(); + + // ensure that it is the alignment that is wrong and the length is valid + assert!(len % 4 == 0, "length is {} and is not modulo 4", len); + assert!(!aligned_to(bytes, std::mem::align_of::()), "bytes are already aligned"); + + let elems = len / 4; + let mut vec = Vec::::with_capacity(elems); + + unsafe { + let dst = vec.as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len); + vec.set_len(elems); + } + + vec + }, + } +} + +fn union_postings_ids( _key: &[u8], old_value: Option<&[u8]>, operands: &mut rocksdb::MergeOperands, @@ -53,20 +82,21 @@ fn union_bitpacked_postings_ids( let mut sets_bufs = Vec::new(); if let Some(old_value) = old_value { - let old_value = CodecBitPacker4xSorted::bytes_decode(old_value).unwrap(); - sets_bufs.push(SetBuf::new(old_value).unwrap()); + let old_value = bytes_to_u32s(old_value); + sets_bufs.push(SetBuf::new_unchecked(old_value.to_vec())); } for operand in operands { - let new_value = CodecBitPacker4xSorted::bytes_decode(operand).unwrap(); - sets_bufs.push(SetBuf::new(new_value).unwrap()); + let new_value = bytes_to_u32s(operand); + sets_bufs.push(SetBuf::new_unchecked(new_value.to_vec())); } let sets = sets_bufs.iter().map(|s| s.as_set()).collect(); - let result = sdset::multi::Union::new(sets).into_set_buf(); - let compressed = CodecBitPacker4xSorted::bytes_encode(&result).unwrap(); + let result: SetBuf = sdset::multi::Union::new(sets).into_set_buf(); - Some(compressed) + assert!(result.as_bytes().len() % 4 == 0); + + Some(result.as_bytes().to_vec()) } fn union_words_fst( @@ -167,11 +197,7 @@ fn index_csv( // We compute and store the postings list into the DB. for (word, new_ids) in new_postings_ids { let new_ids = SetBuf::from_dirty(new_ids.to_vec()); - let compressed = CodecBitPacker4xSorted::bytes_encode(&new_ids) - .context("error while compressing using CodecBitPacker4xSorted")?; - - db.merge_cf(&postings_ids, word.as_bytes(), compressed)?; - + db.merge_cf(&postings_ids, word.as_bytes(), new_ids.as_bytes())?; new_words.insert(word); } @@ -184,6 +210,7 @@ fn index_csv( db.merge_cf(&main, "words-fst", new_words_fst.as_fst().as_bytes())?; eprintln!("Finished merging the words-fst"); + eprintln!("Total number of documents seen is {}", ID_GENERATOR.load(Ordering::Relaxed)); Ok(number_of_documents) } @@ -195,8 +222,8 @@ fn main() -> anyhow::Result<()> { opts.create_if_missing(true); opts.create_missing_column_families(true); // Setup the merge operators - opts.set_merge_operator("main", union_words_fst, Some(union_words_fst)); - opts.set_merge_operator("postings-ids", union_bitpacked_postings_ids, Some(union_bitpacked_postings_ids)); + opts.set_merge_operator("main", union_words_fst, None); // Some(union_words_fst)); + opts.set_merge_operator("postings-ids", union_postings_ids, None); // Some(union_postings_ids)); let mut db = rocksdb::DB::open(&opts, &opt.database)?;