mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 17:11:15 +08:00
feat: Allow users to manage multiple database indexes
This commit is contained in:
parent
8576218b51
commit
96dfac5b33
@ -22,6 +22,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] }
|
||||
slice-group-by = "0.2"
|
||||
unidecode = "0.3"
|
||||
rayon = "1.0"
|
||||
lockfree = "0.5.1"
|
||||
|
||||
[dependencies.toml]
|
||||
git = "https://github.com/Kerollmops/toml-rs.git"
|
||||
|
@ -50,7 +50,9 @@ fn index(
|
||||
stop_words: &HashSet<String>,
|
||||
) -> Result<Database, Box<Error>>
|
||||
{
|
||||
let database = Database::create(database_path, &schema)?;
|
||||
let database = Database::create(database_path)?;
|
||||
|
||||
database.create_index("default", &schema)?;
|
||||
|
||||
let mut rdr = csv::Reader::from_path(csv_data_path)?;
|
||||
let mut raw_record = csv::StringRecord::new();
|
||||
@ -61,7 +63,7 @@ fn index(
|
||||
|
||||
while !end_of_file {
|
||||
let tokenizer_builder = DefaultBuilder::new();
|
||||
let mut update = database.start_update()?;
|
||||
let mut update = database.start_update("default")?;
|
||||
|
||||
loop {
|
||||
end_of_file = !rdr.read_record(&mut raw_record)?;
|
||||
|
@ -116,7 +116,7 @@ fn main() -> Result<(), Box<Error>> {
|
||||
if input.read_line(&mut buffer)? == 0 { break }
|
||||
let query = buffer.trim_end_matches('\n');
|
||||
|
||||
let view = database.view();
|
||||
let view = database.view("default")?;
|
||||
let schema = view.schema();
|
||||
|
||||
let (elapsed, documents) = elapsed::measure_time(|| {
|
||||
|
@ -1,13 +1,17 @@
|
||||
use std::error::Error;
|
||||
use std::path::Path;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::error::Error;
|
||||
use std::ffi::OsStr;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
|
||||
use rocksdb::rocksdb::{Writable, Snapshot};
|
||||
use rocksdb::{DB, MergeOperands};
|
||||
use crossbeam::atomic::ArcCell;
|
||||
use log::info;
|
||||
use log::{info, error, warn};
|
||||
use rocksdb::rocksdb::{Writable, Snapshot};
|
||||
use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions};
|
||||
use rocksdb::{DB, MergeOperands};
|
||||
use lockfree::map::Map;
|
||||
|
||||
pub use self::document_key::{DocumentKey, DocumentKeyAttr};
|
||||
pub use self::view::{DatabaseView, DocumentIter};
|
||||
@ -77,21 +81,48 @@ fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperan
|
||||
bytes
|
||||
}
|
||||
|
||||
pub struct Database {
|
||||
db: Arc<DB>,
|
||||
// This view is updated each time the DB ingests an update
|
||||
view: ArcCell<DatabaseView<Arc<DB>>>,
|
||||
pub struct IndexUpdate {
|
||||
index: String,
|
||||
update: Update,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub fn create<P: AsRef<Path>>(path: P, schema: &Schema) -> Result<Database, Box<Error>> {
|
||||
impl Deref for IndexUpdate {
|
||||
type Target = Update;
|
||||
|
||||
fn deref(&self) -> &Update {
|
||||
&self.update
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for IndexUpdate {
|
||||
fn deref_mut(&mut self) -> &mut Update {
|
||||
&mut self.update
|
||||
}
|
||||
}
|
||||
|
||||
struct DatabaseIndex {
|
||||
db: Arc<DB>,
|
||||
|
||||
// This view is updated each time the DB ingests an update
|
||||
view: ArcCell<DatabaseView<Arc<DB>>>,
|
||||
|
||||
// This path is the path to the mdb folder stored on disk
|
||||
path: PathBuf,
|
||||
|
||||
// must_die false by default, must be set as true when the Index is dropped.
|
||||
// It's used to erase the folder saved on disk when the user request to delete an index
|
||||
must_die: AtomicBool,
|
||||
}
|
||||
|
||||
impl DatabaseIndex {
|
||||
fn create<P: AsRef<Path>>(path: P, schema: &Schema) -> Result<DatabaseIndex, Box<Error>> {
|
||||
let path = path.as_ref();
|
||||
if path.exists() {
|
||||
return Err(format!("File already exists at path: {}, cannot create database.",
|
||||
path.display()).into())
|
||||
}
|
||||
|
||||
let path = path.to_string_lossy();
|
||||
let path_lossy = path.to_string_lossy();
|
||||
let mut opts = DBOptions::new();
|
||||
opts.create_if_missing(true);
|
||||
// opts.error_if_exists(true); // FIXME pull request that
|
||||
@ -99,7 +130,7 @@ impl Database {
|
||||
let mut cf_opts = ColumnFamilyOptions::new();
|
||||
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
|
||||
|
||||
let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
|
||||
let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;
|
||||
|
||||
let mut schema_bytes = Vec::new();
|
||||
schema.write_to_bin(&mut schema_bytes)?;
|
||||
@ -109,11 +140,17 @@ impl Database {
|
||||
let snapshot = Snapshot::new(db.clone());
|
||||
let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
|
||||
|
||||
Ok(Database { db, view })
|
||||
|
||||
Ok(DatabaseIndex {
|
||||
db: db,
|
||||
view: view,
|
||||
path: path.to_path_buf(),
|
||||
must_die: AtomicBool::new(false)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
|
||||
let path = path.as_ref().to_string_lossy();
|
||||
fn open<P: AsRef<Path>>(path: P) -> Result<DatabaseIndex, Box<Error>> {
|
||||
let path_lossy = path.as_ref().to_string_lossy();
|
||||
|
||||
let mut opts = DBOptions::new();
|
||||
opts.create_if_missing(false);
|
||||
@ -121,7 +158,7 @@ impl Database {
|
||||
let mut cf_opts = ColumnFamilyOptions::new();
|
||||
cf_opts.add_merge_operator("data-index merge operator", merge_indexes);
|
||||
|
||||
let db = DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
|
||||
let db = DB::open_cf(opts, &path_lossy, vec![("default", cf_opts)])?;
|
||||
|
||||
// FIXME create a generic function to do that !
|
||||
let _schema = match db.get(DATA_SCHEMA)? {
|
||||
@ -133,10 +170,19 @@ impl Database {
|
||||
let snapshot = Snapshot::new(db.clone());
|
||||
let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?));
|
||||
|
||||
Ok(Database { db, view })
|
||||
Ok(DatabaseIndex {
|
||||
db: db,
|
||||
view: view,
|
||||
path: path.as_ref().to_path_buf(),
|
||||
must_die: AtomicBool::new(false)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn start_update(&self) -> Result<Update, Box<Error>> {
|
||||
fn must_die(&self) {
|
||||
self.must_die.store(true, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn start_update(&self) -> Result<Update, Box<Error>> {
|
||||
let schema = match self.db.get(DATA_SCHEMA)? {
|
||||
Some(value) => Schema::read_from_bin(&*value)?,
|
||||
None => panic!("Database does not contain a schema"),
|
||||
@ -145,7 +191,7 @@ impl Database {
|
||||
Ok(Update::new(schema))
|
||||
}
|
||||
|
||||
pub fn commit_update(&self, update: Update) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
|
||||
fn commit_update(&self, update: Update) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
|
||||
let batch = update.build()?;
|
||||
self.db.write(batch)?;
|
||||
|
||||
@ -156,11 +202,115 @@ impl Database {
|
||||
Ok(view)
|
||||
}
|
||||
|
||||
pub fn view(&self) -> Arc<DatabaseView<Arc<DB>>> {
|
||||
fn view(&self) -> Arc<DatabaseView<Arc<DB>>> {
|
||||
self.view.get()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DatabaseIndex {
|
||||
fn drop(&mut self) {
|
||||
if self.must_die.load(Ordering::Relaxed) {
|
||||
if let Err(err) = fs::remove_dir_all(&self.path) {
|
||||
error!("Impossible to remove mdb when Database id dropped; {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Database {
|
||||
indexes: Map<String, Arc<DatabaseIndex>>,
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub fn create<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
|
||||
Ok(Database {
|
||||
indexes: Map::new(),
|
||||
path: path.as_ref().to_path_buf(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> {
|
||||
let entries = fs::read_dir(&path)?;
|
||||
|
||||
let indexes = Map::new();
|
||||
for entry in entries {
|
||||
let path = match entry {
|
||||
Ok(p) => p.path(),
|
||||
Err(err) => {
|
||||
warn!("Impossible to retrieve the path from an entry; {}", err);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
let name = match path.file_stem().and_then(OsStr::to_str) {
|
||||
Some(name) => name.to_owned(),
|
||||
None => continue
|
||||
};
|
||||
|
||||
let db = match DatabaseIndex::open(path.clone()) {
|
||||
Ok(db) => db,
|
||||
Err(err) => {
|
||||
warn!("Impossible to open the database; {}", err);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
info!("Load database {}", name);
|
||||
indexes.insert(name, Arc::new(db));
|
||||
}
|
||||
|
||||
Ok(Database {
|
||||
indexes: indexes,
|
||||
path: path.as_ref().to_path_buf(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_index(&self, name: &str, schema: &Schema) -> Result<(), Box<Error>> {
|
||||
let index_path = self.path.join(name);
|
||||
|
||||
if index_path.exists() {
|
||||
return Err("Index already exists".into());
|
||||
}
|
||||
|
||||
let index = DatabaseIndex::create(index_path, schema)?;
|
||||
self.indexes.insert(name.to_owned(), Arc::new(index));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete_index(&self, name: &str) -> Result<(), Box<Error>> {
|
||||
let index_guard = self.indexes.remove(name).ok_or("Index not found")?;
|
||||
index_guard.val().must_die();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn list_indexes(&self) -> Vec<String> {
|
||||
self.indexes.iter().map(|g| g.key().clone()).collect()
|
||||
}
|
||||
|
||||
pub fn start_update(&self, index: &str) -> Result<IndexUpdate, Box<Error>> {
|
||||
let index_guard = self.indexes.get(index).ok_or("Index not found")?;
|
||||
let update = index_guard.val().start_update()?;
|
||||
|
||||
Ok(IndexUpdate { index: index.to_owned(), update })
|
||||
}
|
||||
|
||||
pub fn commit_update(&self, update: IndexUpdate)-> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
|
||||
let index_guard = self.indexes.get(&update.index).ok_or("Index not found")?;
|
||||
|
||||
index_guard.val().commit_update(update.update)
|
||||
}
|
||||
|
||||
pub fn view(&self, index: &str) -> Result<Arc<DatabaseView<Arc<DB>>>, Box<Error>> {
|
||||
let index_guard = self.indexes.get(index).ok_or("Index not found")?;
|
||||
|
||||
Ok(index_guard.val().view())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
@ -179,6 +329,7 @@ mod tests {
|
||||
let stop_words = HashSet::new();
|
||||
|
||||
let meilidb_path = dir.path().join("meilidb.mdb");
|
||||
let meilidb_index_name = "default";
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
struct SimpleDoc {
|
||||
@ -197,7 +348,9 @@ mod tests {
|
||||
builder.build()
|
||||
};
|
||||
|
||||
let database = Database::create(&meilidb_path, &schema)?;
|
||||
let database = Database::create(&meilidb_path)?;
|
||||
|
||||
database.create_index(meilidb_index_name, &schema)?;
|
||||
|
||||
let doc0 = SimpleDoc {
|
||||
id: 0,
|
||||
@ -213,7 +366,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let tokenizer_builder = DefaultBuilder::new();
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(meilidb_index_name)?;
|
||||
|
||||
let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
|
||||
let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
|
||||
@ -235,6 +388,7 @@ mod tests {
|
||||
let stop_words = HashSet::new();
|
||||
|
||||
let meilidb_path = dir.path().join("meilidb.mdb");
|
||||
let meilidb_index_name = "default";
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
struct SimpleDoc {
|
||||
@ -253,7 +407,9 @@ mod tests {
|
||||
builder.build()
|
||||
};
|
||||
|
||||
let database = Database::create(&meilidb_path, &schema)?;
|
||||
let database = Database::create(&meilidb_path)?;
|
||||
|
||||
database.create_index(meilidb_index_name, &schema)?;
|
||||
|
||||
let doc0 = SimpleDoc {
|
||||
id: 0,
|
||||
@ -282,12 +438,12 @@ mod tests {
|
||||
|
||||
let tokenizer_builder = DefaultBuilder::new();
|
||||
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(meilidb_index_name)?;
|
||||
let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?;
|
||||
let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?;
|
||||
database.commit_update(builder)?;
|
||||
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(meilidb_index_name)?;
|
||||
let docid2 = builder.update_document(&doc2, &tokenizer_builder, &stop_words)?;
|
||||
let docid3 = builder.update_document(&doc3, &tokenizer_builder, &stop_words)?;
|
||||
let view = database.commit_update(builder)?;
|
||||
@ -359,7 +515,10 @@ mod bench {
|
||||
let schema = builder.build();
|
||||
|
||||
let db_path = dir.path().join("bench.mdb");
|
||||
let database = Database::create(db_path.clone(), &schema)?;
|
||||
let index_name = "default";
|
||||
|
||||
let database = Database::create(&db_path)?;
|
||||
database.create_index(index_name, &schema)?;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Document {
|
||||
@ -369,7 +528,7 @@ mod bench {
|
||||
}
|
||||
|
||||
let tokenizer_builder = DefaultBuilder;
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(index_name)?;
|
||||
let mut rng = XorShiftRng::seed_from_u64(42);
|
||||
|
||||
for i in 0..300 {
|
||||
@ -404,7 +563,10 @@ mod bench {
|
||||
let schema = builder.build();
|
||||
|
||||
let db_path = dir.path().join("bench.mdb");
|
||||
let database = Database::create(db_path.clone(), &schema)?;
|
||||
let index_name = "default";
|
||||
|
||||
let database = Database::create(&db_path)?;
|
||||
database.create_index(index_name, &schema)?;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Document {
|
||||
@ -414,7 +576,7 @@ mod bench {
|
||||
}
|
||||
|
||||
let tokenizer_builder = DefaultBuilder;
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(index_name)?;
|
||||
let mut rng = XorShiftRng::seed_from_u64(42);
|
||||
|
||||
for i in 0..3000 {
|
||||
@ -450,7 +612,10 @@ mod bench {
|
||||
let schema = builder.build();
|
||||
|
||||
let db_path = dir.path().join("bench.mdb");
|
||||
let database = Database::create(db_path.clone(), &schema)?;
|
||||
let index_name = "default";
|
||||
|
||||
let database = Database::create(&db_path)?;
|
||||
database.create_index(index_name, &schema)?;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Document {
|
||||
@ -460,7 +625,7 @@ mod bench {
|
||||
}
|
||||
|
||||
let tokenizer_builder = DefaultBuilder;
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(index_name)?;
|
||||
let mut rng = XorShiftRng::seed_from_u64(42);
|
||||
|
||||
for i in 0..30_000 {
|
||||
@ -495,7 +660,10 @@ mod bench {
|
||||
let schema = builder.build();
|
||||
|
||||
let db_path = dir.path().join("bench.mdb");
|
||||
let database = Database::create(db_path.clone(), &schema)?;
|
||||
let index_name = "default";
|
||||
|
||||
let database = Database::create(&db_path)?;
|
||||
database.create_index(index_name, &schema)?;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Document {
|
||||
@ -505,7 +673,7 @@ mod bench {
|
||||
}
|
||||
|
||||
let tokenizer_builder = DefaultBuilder;
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(index_name)?;
|
||||
let mut rng = XorShiftRng::seed_from_u64(42);
|
||||
|
||||
for i in 0..300 {
|
||||
@ -540,7 +708,10 @@ mod bench {
|
||||
let schema = builder.build();
|
||||
|
||||
let db_path = dir.path().join("bench.mdb");
|
||||
let database = Database::create(db_path.clone(), &schema)?;
|
||||
let index_name = "default";
|
||||
|
||||
let database = Database::create(&db_path)?;
|
||||
database.create_index(index_name, &schema)?;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Document {
|
||||
@ -550,7 +721,7 @@ mod bench {
|
||||
}
|
||||
|
||||
let tokenizer_builder = DefaultBuilder;
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(index_name)?;
|
||||
let mut rng = XorShiftRng::seed_from_u64(42);
|
||||
|
||||
for i in 0..3000 {
|
||||
@ -586,7 +757,10 @@ mod bench {
|
||||
let schema = builder.build();
|
||||
|
||||
let db_path = dir.path().join("bench.mdb");
|
||||
let database = Database::create(db_path.clone(), &schema)?;
|
||||
let index_name = "default";
|
||||
|
||||
let database = Database::create(&db_path)?;
|
||||
database.create_index(index_name, &schema)?;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Document {
|
||||
@ -596,7 +770,7 @@ mod bench {
|
||||
}
|
||||
|
||||
let tokenizer_builder = DefaultBuilder;
|
||||
let mut builder = database.start_update()?;
|
||||
let mut builder = database.start_update(index_name)?;
|
||||
let mut rng = XorShiftRng::seed_from_u64(42);
|
||||
|
||||
for i in 0..30_000 {
|
||||
|
Loading…
Reference in New Issue
Block a user