mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
implement update index
implement update index
This commit is contained in:
parent
f44f8a823a
commit
90b930ed7f
@ -6,7 +6,7 @@ use futures_util::stream::StreamExt;
|
|||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
use super::Data;
|
use super::Data;
|
||||||
use crate::index_controller::{IndexController, Settings};
|
use crate::index_controller::{IndexController, Settings, IndexSettings, IndexMetadata};
|
||||||
use crate::index_controller::UpdateStatus;
|
use crate::index_controller::UpdateStatus;
|
||||||
|
|
||||||
impl Data {
|
impl Data {
|
||||||
@ -70,4 +70,18 @@ impl Data {
|
|||||||
pub fn get_updates_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>> {
|
pub fn get_updates_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>> {
|
||||||
self.index_controller.all_update_status(index)
|
self.index_controller.all_update_status(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn update_index(
|
||||||
|
&self,
|
||||||
|
name: impl AsRef<str>,
|
||||||
|
primary_key: Option<impl AsRef<str>>,
|
||||||
|
new_name: Option<impl AsRef<str>>
|
||||||
|
) -> anyhow::Result<IndexMetadata> {
|
||||||
|
let settings = IndexSettings {
|
||||||
|
name: new_name.map(|s| s.as_ref().to_string()),
|
||||||
|
primary_key: primary_key.map(|s| s.as_ref().to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.index_controller.update_index(name, settings)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ pub struct IndexMeta {
|
|||||||
index_store_size: u64,
|
index_store_size: u64,
|
||||||
pub uuid: Uuid,
|
pub uuid: Uuid,
|
||||||
pub created_at: DateTime<Utc>,
|
pub created_at: DateTime<Utc>,
|
||||||
|
pub updated_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IndexMeta {
|
impl IndexMeta {
|
||||||
@ -131,6 +132,52 @@ impl IndexStore {
|
|||||||
self.get_index_txn(&txn, name)
|
self.get_index_txn(&txn, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Use this function to perform an update on an index.
|
||||||
|
/// This function also puts a lock on what index in allowed to perform an update.
|
||||||
|
pub fn update_index<F, T>(&self, name: impl AsRef<str>, f: F) -> anyhow::Result<(T, IndexMeta)>
|
||||||
|
where
|
||||||
|
F: FnOnce(&Index) -> anyhow::Result<T>,
|
||||||
|
{
|
||||||
|
let mut txn = self.env.write_txn()?;
|
||||||
|
let (index, _) = self.get_index_txn(&txn, &name)?
|
||||||
|
.with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
|
||||||
|
let result = f(index.as_ref());
|
||||||
|
match result {
|
||||||
|
Ok(ret) => {
|
||||||
|
let meta = self.update_meta(&mut txn, name, |meta| meta.updated_at = Utc::now())?;
|
||||||
|
txn.commit()?;
|
||||||
|
Ok((ret, meta))
|
||||||
|
}
|
||||||
|
Err(e) => Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn index_with_meta(&self, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, IndexMeta)>> {
|
||||||
|
let txn = self.env.read_txn()?;
|
||||||
|
let uuid = self.index_uuid(&txn, &name)?;
|
||||||
|
match uuid {
|
||||||
|
Some(uuid) => {
|
||||||
|
let meta = self.uuid_to_index_meta.get(&txn, uuid.as_bytes())?
|
||||||
|
.with_context(|| format!("unable to retrieve metadata for index {:?}", name.as_ref()))?;
|
||||||
|
let (index, _) = self.retrieve_index(&txn, uuid)?
|
||||||
|
.with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?;
|
||||||
|
Ok(Some((index, meta)))
|
||||||
|
}
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_meta(&self, txn: &mut RwTxn, name: impl AsRef<str>, f: impl FnOnce(&mut IndexMeta)) -> anyhow::Result<IndexMeta> {
|
||||||
|
let uuid = self.index_uuid(txn, &name)?
|
||||||
|
.with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
|
||||||
|
let mut meta = self.uuid_to_index_meta
|
||||||
|
.get(txn, uuid.as_bytes())?
|
||||||
|
.with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?;
|
||||||
|
f(&mut meta);
|
||||||
|
self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;
|
||||||
|
Ok(meta)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_or_create_index(
|
pub fn get_or_create_index(
|
||||||
&self,
|
&self,
|
||||||
name: impl AsRef<str>,
|
name: impl AsRef<str>,
|
||||||
@ -173,7 +220,14 @@ impl IndexStore {
|
|||||||
index_store_size: u64,
|
index_store_size: u64,
|
||||||
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
|
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
|
||||||
let created_at = Utc::now();
|
let created_at = Utc::now();
|
||||||
let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at };
|
let updated_at = created_at;
|
||||||
|
let meta = IndexMeta {
|
||||||
|
update_store_size,
|
||||||
|
index_store_size,
|
||||||
|
uuid: uuid.clone(),
|
||||||
|
created_at,
|
||||||
|
updated_at,
|
||||||
|
};
|
||||||
|
|
||||||
self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?;
|
self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?;
|
||||||
self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;
|
self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;
|
||||||
@ -318,11 +372,15 @@ mod test {
|
|||||||
let txn = store.env.read_txn().unwrap();
|
let txn = store.env.read_txn().unwrap();
|
||||||
assert!(store.retrieve_index(&txn, uuid).unwrap().is_none());
|
assert!(store.retrieve_index(&txn, uuid).unwrap().is_none());
|
||||||
|
|
||||||
|
let created_at = Utc::now();
|
||||||
|
let updated_at = created_at;
|
||||||
|
|
||||||
let meta = IndexMeta {
|
let meta = IndexMeta {
|
||||||
update_store_size: 4096 * 100,
|
update_store_size: 4096 * 100,
|
||||||
index_store_size: 4096 * 100,
|
index_store_size: 4096 * 100,
|
||||||
uuid: uuid.clone(),
|
uuid: uuid.clone(),
|
||||||
created_at: Utc::now(),
|
created_at,
|
||||||
|
updated_at,
|
||||||
};
|
};
|
||||||
let mut txn = store.env.write_txn().unwrap();
|
let mut txn = store.env.write_txn().unwrap();
|
||||||
store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
|
store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
|
||||||
@ -344,12 +402,16 @@ mod test {
|
|||||||
|
|
||||||
assert!(store.index(&name).unwrap().is_none());
|
assert!(store.index(&name).unwrap().is_none());
|
||||||
|
|
||||||
|
let created_at = Utc::now();
|
||||||
|
let updated_at = created_at;
|
||||||
|
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let meta = IndexMeta {
|
let meta = IndexMeta {
|
||||||
update_store_size: 4096 * 100,
|
update_store_size: 4096 * 100,
|
||||||
index_store_size: 4096 * 100,
|
index_store_size: 4096 * 100,
|
||||||
uuid: uuid.clone(),
|
uuid: uuid.clone(),
|
||||||
created_at: Utc::now(),
|
created_at,
|
||||||
|
updated_at,
|
||||||
};
|
};
|
||||||
let mut txn = store.env.write_txn().unwrap();
|
let mut txn = store.env.write_txn().unwrap();
|
||||||
store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap();
|
store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap();
|
||||||
|
@ -144,6 +144,44 @@ impl IndexController for LocalIndexController {
|
|||||||
}
|
}
|
||||||
Ok(output_meta)
|
Ok(output_meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_index(&self, name: impl AsRef<str>, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
|
||||||
|
if index_settings.name.is_some() {
|
||||||
|
bail!("can't udpate an index name.")
|
||||||
|
}
|
||||||
|
|
||||||
|
let (primary_key, meta) = match index_settings.primary_key {
|
||||||
|
Some(ref primary_key) => {
|
||||||
|
self.indexes
|
||||||
|
.update_index(&name, |index| {
|
||||||
|
let mut txn = index.write_txn()?;
|
||||||
|
if index.primary_key(&txn)?.is_some() {
|
||||||
|
bail!("primary key already exists.")
|
||||||
|
}
|
||||||
|
index.put_primary_key(&mut txn, primary_key)?;
|
||||||
|
txn.commit()?;
|
||||||
|
Ok(Some(primary_key.clone()))
|
||||||
|
})?
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
let (index, meta) = self.indexes
|
||||||
|
.index_with_meta(&name)?
|
||||||
|
.with_context(|| format!("index {:?} doesn't exist.", name.as_ref()))?;
|
||||||
|
let primary_key = index
|
||||||
|
.primary_key(&index.read_txn()?)?
|
||||||
|
.map(String::from);
|
||||||
|
(primary_key, meta)
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(IndexMetadata {
|
||||||
|
name: name.as_ref().to_string(),
|
||||||
|
uuid: meta.uuid.clone(),
|
||||||
|
created_at: meta.created_at,
|
||||||
|
updated_at: meta.updated_at,
|
||||||
|
primary_key,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> {
|
fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> {
|
||||||
|
@ -10,9 +10,7 @@ use std::sync::Arc;
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use milli::Index;
|
use milli::Index;
|
||||||
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
|
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; use uuid::Uuid;
|
||||||
use serde::{Serialize, Deserialize, de::Deserializer};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
pub use updates::{Processed, Processing, Failed};
|
pub use updates::{Processed, Processing, Failed};
|
||||||
|
|
||||||
@ -155,10 +153,16 @@ pub trait IndexController {
|
|||||||
/// Returns, if it exists, the `Index` with the povided name.
|
/// Returns, if it exists, the `Index` with the povided name.
|
||||||
fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>>;
|
fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>>;
|
||||||
|
|
||||||
|
/// Returns the udpate status an update
|
||||||
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus>>;
|
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus>>;
|
||||||
|
|
||||||
|
/// Returns all the udpate status for an index
|
||||||
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>>;
|
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>>;
|
||||||
|
|
||||||
|
/// List all the indexes
|
||||||
fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>>;
|
fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>>;
|
||||||
|
|
||||||
|
fn update_index(&self, name: impl AsRef<str>, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -94,11 +94,17 @@ struct UpdateIndexResponse {
|
|||||||
|
|
||||||
#[put("/indexes/{index_uid}", wrap = "Authentication::Private")]
|
#[put("/indexes/{index_uid}", wrap = "Authentication::Private")]
|
||||||
async fn update_index(
|
async fn update_index(
|
||||||
_data: web::Data<Data>,
|
data: web::Data<Data>,
|
||||||
_path: web::Path<IndexParam>,
|
path: web::Path<IndexParam>,
|
||||||
_body: web::Json<IndexCreateRequest>,
|
body: web::Json<UpdateIndexRequest>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
todo!()
|
match data.update_index(&path.index_uid, body.primary_key.as_ref(), body.name.as_ref()) {
|
||||||
|
Ok(meta) => {
|
||||||
|
let json = serde_json::to_string(&meta).unwrap();
|
||||||
|
Ok(HttpResponse::Ok().body(json))
|
||||||
|
}
|
||||||
|
Err(_) => { todo!() }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[delete("/indexes/{index_uid}", wrap = "Authentication::Private")]
|
#[delete("/indexes/{index_uid}", wrap = "Authentication::Private")]
|
||||||
|
Loading…
Reference in New Issue
Block a user