mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-12-02 18:15:38 +08:00
aa6c5df0bc
document reader transform remove update format support document sequences fix document transform clean transform improve error handling add documents! macro fix transform bug fix tests remove csv dependency Add comments on the transform process replace search cli fmt review edits fix http ui fix clippy warnings Revert "fix clippy warnings" This reverts commit a1ce3cd96e603633dbf43e9e0b12b2453c9c5620. fix review comments remove smallvec in transform loop review edits
336 lines
9.8 KiB
Rust
336 lines
9.8 KiB
Rust
use std::fs::File;
|
|
use std::io::{stdin, Cursor, Read};
|
|
use std::path::PathBuf;
|
|
use std::str::FromStr;
|
|
|
|
use byte_unit::Byte;
|
|
use eyre::Result;
|
|
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
|
|
use milli::update::UpdateIndexingStep::{
|
|
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
|
|
};
|
|
use serde_json::{Map, Value};
|
|
use structopt::StructOpt;
|
|
|
|
#[cfg(target_os = "linux")]
|
|
#[global_allocator]
|
|
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
|
|
|
#[derive(Debug, StructOpt)]
|
|
#[structopt(name = "Milli CLI", about = "A simple CLI to manipulate a milli index.")]
|
|
struct Cli {
|
|
#[structopt(short, long)]
|
|
index_path: PathBuf,
|
|
#[structopt(short = "s", long, default_value = "100GiB")]
|
|
index_size: Byte,
|
|
/// Verbose mode (-v, -vv, -vvv, etc.)
|
|
#[structopt(short, long, parse(from_occurrences))]
|
|
verbose: usize,
|
|
#[structopt(subcommand)]
|
|
subcommand: Command,
|
|
}
|
|
|
|
#[derive(Debug, StructOpt)]
|
|
enum Command {
|
|
DocumentAddition(DocumentAddition),
|
|
Search(Search),
|
|
SettingsUpdate(SettingsUpdate),
|
|
}
|
|
|
|
fn setup(opt: &Cli) -> Result<()> {
|
|
color_eyre::install()?;
|
|
stderrlog::new()
|
|
.verbosity(opt.verbose)
|
|
.show_level(false)
|
|
.timestamp(stderrlog::Timestamp::Off)
|
|
.init()?;
|
|
Ok(())
|
|
}
|
|
|
|
fn main() -> Result<()> {
|
|
let command = Cli::from_args();
|
|
|
|
setup(&command)?;
|
|
|
|
let mut options = heed::EnvOpenOptions::new();
|
|
options.map_size(command.index_size.get_bytes() as usize);
|
|
let index = milli::Index::new(options, command.index_path)?;
|
|
|
|
match command.subcommand {
|
|
Command::DocumentAddition(addition) => addition.perform(index)?,
|
|
Command::Search(search) => search.perform(index)?,
|
|
Command::SettingsUpdate(update) => update.perform(index)?,
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
enum DocumentAdditionFormat {
|
|
Csv,
|
|
Json,
|
|
Jsonl,
|
|
}
|
|
|
|
impl FromStr for DocumentAdditionFormat {
|
|
type Err = eyre::Error;
|
|
|
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
match s {
|
|
"csv" => Ok(Self::Csv),
|
|
"jsonl" => Ok(Self::Jsonl),
|
|
"json" => Ok(Self::Json),
|
|
other => eyre::bail!("invalid format: {}", other),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, StructOpt)]
|
|
struct DocumentAddition {
|
|
#[structopt(short, long, default_value = "json", possible_values = &["csv", "jsonl", "json"])]
|
|
format: DocumentAdditionFormat,
|
|
/// Path to the update file, if not present, will read from stdin.
|
|
#[structopt(short, long)]
|
|
path: Option<PathBuf>,
|
|
/// Whether to generate missing document ids.
|
|
#[structopt(short, long)]
|
|
autogen_docids: bool,
|
|
/// Whether to update or replace the documents if they already exist.
|
|
#[structopt(short, long)]
|
|
update_documents: bool,
|
|
}
|
|
|
|
impl DocumentAddition {
|
|
fn perform(&self, index: milli::Index) -> Result<()> {
|
|
let reader: Box<dyn Read> = match self.path {
|
|
Some(ref path) => {
|
|
let file = File::open(path)?;
|
|
Box::new(file)
|
|
}
|
|
None => Box::new(stdin()),
|
|
};
|
|
|
|
println!("parsing documents...");
|
|
|
|
let documents = match self.format {
|
|
DocumentAdditionFormat::Csv => documents_from_csv(reader)?,
|
|
DocumentAdditionFormat::Json => documents_from_json(reader)?,
|
|
DocumentAdditionFormat::Jsonl => documents_from_jsonl(reader)?,
|
|
};
|
|
|
|
let reader = milli::documents::DocumentBatchReader::from_reader(Cursor::new(documents))?;
|
|
|
|
println!("Adding {} documents to the index.", reader.len());
|
|
|
|
let mut txn = index.env.write_txn()?;
|
|
let mut addition = milli::update::IndexDocuments::new(&mut txn, &index, 0);
|
|
|
|
if self.update_documents {
|
|
addition.index_documents_method(milli::update::IndexDocumentsMethod::UpdateDocuments);
|
|
}
|
|
|
|
addition.log_every_n(100);
|
|
|
|
if self.autogen_docids {
|
|
addition.enable_autogenerate_docids()
|
|
}
|
|
|
|
let mut bars = Vec::new();
|
|
let progesses = MultiProgress::new();
|
|
for _ in 0..4 {
|
|
let bar = ProgressBar::hidden();
|
|
let bar = progesses.add(bar);
|
|
bars.push(bar);
|
|
}
|
|
|
|
std::thread::spawn(move || {
|
|
progesses.join().unwrap();
|
|
});
|
|
|
|
let result = addition.execute(reader, |step, _| indexing_callback(step, &bars))?;
|
|
|
|
txn.commit()?;
|
|
|
|
println!("{:?}", result);
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn indexing_callback(step: milli::update::UpdateIndexingStep, bars: &[ProgressBar]) {
|
|
let step_index = step.step();
|
|
let bar = &bars[step_index];
|
|
if step_index > 0 {
|
|
let prev = &bars[step_index - 1];
|
|
if !prev.is_finished() {
|
|
prev.disable_steady_tick();
|
|
prev.finish_at_current_pos();
|
|
}
|
|
}
|
|
|
|
let style = ProgressStyle::default_bar()
|
|
.template("[eta: {eta_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}")
|
|
.progress_chars("##-");
|
|
|
|
match step {
|
|
RemapDocumentAddition { documents_seen } => {
|
|
bar.set_style(ProgressStyle::default_spinner());
|
|
bar.set_message(format!("remaped {} documents so far.", documents_seen));
|
|
}
|
|
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
|
|
bar.set_style(style);
|
|
bar.set_length(total_documents as u64);
|
|
bar.set_message("Merging documents...");
|
|
bar.set_position(documents_seen as u64);
|
|
}
|
|
IndexDocuments { documents_seen, total_documents } => {
|
|
bar.set_style(style);
|
|
bar.set_length(total_documents as u64);
|
|
bar.set_message("Indexing documents...");
|
|
bar.set_position(documents_seen as u64);
|
|
}
|
|
MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
|
|
bar.set_style(style);
|
|
bar.set_length(total_databases as u64);
|
|
bar.set_message("Merging databases...");
|
|
bar.set_position(databases_seen as u64);
|
|
}
|
|
}
|
|
bar.enable_steady_tick(200);
|
|
}
|
|
|
|
fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> {
|
|
let mut writer = Cursor::new(Vec::new());
|
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
|
|
|
let values = serde_json::Deserializer::from_reader(reader)
|
|
.into_iter::<serde_json::Map<String, serde_json::Value>>();
|
|
for document in values {
|
|
let document = document?;
|
|
documents.add_documents(document)?;
|
|
}
|
|
documents.finish()?;
|
|
|
|
Ok(writer.into_inner())
|
|
}
|
|
|
|
fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
|
|
let mut writer = Cursor::new(Vec::new());
|
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
|
|
|
let json: serde_json::Value = serde_json::from_reader(reader)?;
|
|
documents.add_documents(json)?;
|
|
documents.finish()?;
|
|
|
|
Ok(writer.into_inner())
|
|
}
|
|
|
|
fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> {
|
|
let mut writer = Cursor::new(Vec::new());
|
|
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
|
|
|
|
let mut records = csv::Reader::from_reader(reader);
|
|
let iter = records.deserialize::<Map<String, Value>>();
|
|
|
|
for doc in iter {
|
|
let doc = doc?;
|
|
documents.add_documents(doc)?;
|
|
}
|
|
|
|
documents.finish()?;
|
|
|
|
Ok(writer.into_inner())
|
|
}
|
|
|
|
#[derive(Debug, StructOpt)]
|
|
struct Search {
|
|
query: Option<String>,
|
|
#[structopt(short, long)]
|
|
filter: Option<String>,
|
|
#[structopt(short, long)]
|
|
offset: Option<usize>,
|
|
#[structopt(short, long)]
|
|
limit: Option<usize>,
|
|
}
|
|
|
|
impl Search {
|
|
fn perform(&self, index: milli::Index) -> Result<()> {
|
|
let txn = index.env.read_txn()?;
|
|
let mut search = index.search(&txn);
|
|
|
|
if let Some(ref query) = self.query {
|
|
search.query(query);
|
|
}
|
|
|
|
if let Some(ref filter) = self.filter {
|
|
let condition = milli::FilterCondition::from_str(&txn, &index, filter)?;
|
|
search.filter(condition);
|
|
}
|
|
|
|
if let Some(offset) = self.offset {
|
|
search.offset(offset);
|
|
}
|
|
|
|
if let Some(limit) = self.limit {
|
|
search.limit(limit);
|
|
}
|
|
|
|
let result = search.execute()?;
|
|
|
|
let fields_ids_map = index.fields_ids_map(&txn)?;
|
|
let displayed_fields =
|
|
index.displayed_fields_ids(&txn)?.unwrap_or_else(|| fields_ids_map.ids().collect());
|
|
let documents = index.documents(&txn, result.documents_ids)?;
|
|
let mut jsons = Vec::new();
|
|
for (_, obkv) in documents {
|
|
let json = milli::obkv_to_json(&displayed_fields, &fields_ids_map, obkv)?;
|
|
jsons.push(json);
|
|
}
|
|
|
|
let hits = serde_json::to_string_pretty(&jsons)?;
|
|
|
|
println!("{}", hits);
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, StructOpt)]
|
|
struct SettingsUpdate {
|
|
#[structopt(short, long)]
|
|
filterable_attributes: Option<Vec<String>>,
|
|
}
|
|
|
|
impl SettingsUpdate {
|
|
fn perform(&self, index: milli::Index) -> Result<()> {
|
|
let mut txn = index.env.write_txn()?;
|
|
|
|
let mut update = milli::update::Settings::new(&mut txn, &index, 0);
|
|
update.log_every_n(100);
|
|
|
|
if let Some(ref filterable_attributes) = self.filterable_attributes {
|
|
if !filterable_attributes.is_empty() {
|
|
update.set_filterable_fields(filterable_attributes.iter().cloned().collect());
|
|
} else {
|
|
update.reset_filterable_fields();
|
|
}
|
|
}
|
|
|
|
let mut bars = Vec::new();
|
|
let progesses = MultiProgress::new();
|
|
for _ in 0..4 {
|
|
let bar = ProgressBar::hidden();
|
|
let bar = progesses.add(bar);
|
|
bars.push(bar);
|
|
}
|
|
|
|
std::thread::spawn(move || {
|
|
progesses.join().unwrap();
|
|
});
|
|
|
|
update.execute(|step, _| indexing_callback(step, &bars))?;
|
|
|
|
txn.commit()?;
|
|
Ok(())
|
|
}
|
|
}
|