Content-Length: 15455 | pFad | http://github.com/hecrj/icebreaker/commit/beb5e71c630d67ecacb8f7d818938b90c19c7800.diff

77 diff --git a/Cargo.lock b/Cargo.lock index b18ea43..d519087 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4139,7 +4139,7 @@ checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" [[package]] name = "sipper" version = "0.0.2" -source = "git+https://github.com/hecrj/sipper.git?rev=ca161524a82e2a181bccc384c07d2fc446655f6b#ca161524a82e2a181bccc384c07d2fc446655f6b" +source = "git+https://github.com/hecrj/sipper.git?rev=c53184fc5517fa027f193ce1ba4c5b40a3b2e18a#c53184fc5517fa027f193ce1ba4c5b40a3b2e18a" dependencies = [ "futures", ] diff --git a/Cargo.toml b/Cargo.toml index 9578e28..418626a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ iced.git = "https://github.com/iced-rs/iced.git" iced.rev = "4bbb5cbc1f8b2a0ee8e09be18071368df3ba5bbd" sipper.git = "https://github.com/hecrj/sipper.git" -sipper.rev = "ca161524a82e2a181bccc384c07d2fc446655f6b" +sipper.rev = "c53184fc5517fa027f193ce1ba4c5b40a3b2e18a" chrono = "0.4" dirs = "6.0" @@ -48,4 +48,3 @@ tokio-stream = "0.1" tracing-subscriber = "0.3" url = "2.5" uuid = "1.10" - diff --git a/core/src/assistant.rs b/core/src/assistant.rs index 03e619f..480fcd7 100644 --- a/core/src/assistant.rs +++ b/core/src/assistant.rs @@ -1,13 +1,10 @@ use crate::model; +use crate::request; use crate::Error; -use futures::Stream; -use futures::{FutureExt, StreamExt}; use serde::Deserialize; use serde_json::json; -use sipper::{sipper, Sipper, Straw}; -use tokio::fs; -use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt}; +use sipper::{sipper, FutureExt, Sipper, Straw, Stream, StreamExt}; use tokio::process; use std::sync::Arc; @@ -31,6 +28,12 @@ impl Assistant { file: model::File, backend: Backend, ) -> impl Stream> { + use tokio::fs; + use tokio::io::{self, AsyncBufReadExt}; + use tokio::process; + use tokio::task; + use tokio::time; + #[derive(Clone)] struct Sender(sipper::Sender>); @@ -39,7 +42,7 @@ impl Assistant { let _ = self.0.send(Ok(BootEvent::Logged(log))).await; } - async fn progress(&mut self, stage: &'static str, percent: u64) { + async fn progress(&mut self, stage: &'static str, percent: u32) { let _ = self .0 .send(Ok(BootEvent::Progressed { stage, percent })) @@ -110,61 +113,42 @@ impl Assistant { )) .await; - let mut model = io::BufWriter::new(fs::File::create(&model_path).await?); - - let mut download = { - let url = format!( - "https://huggingface.co\ - /{id}/resolve/main/\ - {filename}?download=true", - id = file.model.0, - filename = file.name - ); - - reqwest::get(url) - } - .await?; - - let model_size = download.content_length(); - let mut downloaded = 0; - let mut progress = 0; - let start = Instant::now(); - sender .log(format!("Downloading {file}...", file = file.name)) .await; - while let Some(chunk) = download.chunk().await? { - downloaded += chunk.len() as u64; - - let speed = downloaded as f32 / start.elapsed().as_secs_f32(); + let url = format!( + "https://huggingface.co\ + /{id}/resolve/main/\ + {filename}?download=true", + id = file.model.0, + filename = file.name + ); - if let Some(model_size) = model_size { - let new_progress = - (100.0 * downloaded as f32 / model_size as f32).round() as u64; + let mut download = request::download_file(url, &model_path).sip(); + let mut last_percent = None; - if new_progress > progress { - progress = new_progress; + while let Some(progress) = download.next().await { + if let Some((total, percent)) = progress.percent() { + sender.progress("Downloading model...", percent).await; - sender.progress("Downloading model...", progress).await; + if Some(percent) != last_percent { + last_percent = Some(percent); - if progress % 5 == 0 { - sender + sender .log(format!( - "=> {progress}% {downloaded:.2}GB of {model_size:.2}GB @ {speed:.2} MB/s", - downloaded = downloaded as f32 / 10f32.powi(9), - model_size = model_size as f32 / 10f32.powi(9), - speed = speed / 10f32.powi(6), + "=> {percent}% {downloaded:.2}GB of {total:.2}GB \ + @ {speed:.2} MB/s", + downloaded = progress.downloaded as f32 / 10f32.powi(9), + total = total as f32 / 10f32.powi(9), + speed = progress.speed as f32 / 10f32.powi(6), )) .await; - } } } - - model.write_all(&chunk).await?; } - model.flush().await?; + download.finish().await?; } sender.progress("Detecting executor...", 0).await; @@ -194,7 +178,7 @@ impl Assistant { )) .await; - let mut server = Self::launch_with_executable("llama-server", &file, backend)?; + let mut server = Server::launch_with_executable("llama-server", &file, backend)?; let stdout = server.stdout.take(); let stderr = server.stderr.take(); @@ -252,7 +236,7 @@ impl Assistant { }; let mut docker = process::Command::new("docker") - .args(Self::parse_args(&command)) + .args(Server::parse_args(&command)) .kill_on_drop(true) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) @@ -272,7 +256,7 @@ impl Assistant { } }; - let _handle = tokio::task::spawn(notify_progress); + let _handle = task::spawn(notify_progress); let container = { let output = io::BufReader::new(docker.stdout.take().expect("piped stdout")); @@ -309,21 +293,24 @@ impl Assistant { return Err(Error::NoExecutorAvailable); }; - let mut lines = { - use futures::stream; - use tokio_stream::wrappers::LinesStream; + let log_output = { + let mut sender = sender.clone(); - let stdout = io::BufReader::new(stdout.expect("piped stdout")); - let stderr = io::BufReader::new(stderr.expect("piped stderr")); + let mut lines = { + use futures::stream; + use tokio_stream::wrappers::LinesStream; - stream::select( - LinesStream::new(stdout.lines()), - LinesStream::new(stderr.lines()), - ) - }; + let stdout = stdout.expect("piped stdout"); + let stderr = stderr.expect("piped stderr"); - let log_output = { - let mut sender = sender.clone(); + let stdout = io::BufReader::new(stdout); + let stderr = io::BufReader::new(stderr); + + stream::select( + LinesStream::new(stdout.lines()), + LinesStream::new(stderr.lines()), + ) + }; async move { while let Some(line) = lines.next().await { @@ -339,7 +326,7 @@ impl Assistant { let check_health = async move { loop { - tokio::time::sleep(Duration::from_secs(1)).await; + time::sleep(Duration::from_secs(1)).await; if let Ok(response) = reqwest::get(format!( "http://localhost:{port}/health", @@ -355,11 +342,11 @@ impl Assistant { } .boxed(); - if futures::future::select(log_output, check_health) - .await - .factor_first() - .0 - { + let log_handle = task::spawn(log_output); + + if check_health.await { + log_handle.abort(); + return Ok(BootEvent::Finished(Assistant { file, _server: Arc::new(server), @@ -544,37 +531,6 @@ impl Assistant { pub fn name(&self) -> &str { self.file.model.name() } - - fn launch_with_executable( - executable: &'static str, - file: &model::File, - backend: Backend, - ) -> Result { - let gpu_flags = match backend { - Backend::Cpu => "", - Backend::Cuda | Backend::Rocm => "--gpu-layers 80", - }; - - let server = process::Command::new(executable) - .args(Self::parse_args(&format!( - "--model models/{filename} \ - --port 8080 --host 0.0.0.0 {gpu_flags}", - filename = file.name, - ))) - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn()?; - - Ok(server) - } - - fn parse_args(command: &str) -> impl Iterator { - command - .split(' ') - .map(str::trim) - .filter(|arg| !arg.is_empty()) - } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -644,6 +600,39 @@ enum Server { Process(process::Child), } +impl Server { + fn launch_with_executable( + executable: &'static str, + file: &model::File, + backend: Backend, + ) -> Result { + let gpu_flags = match backend { + Backend::Cpu => "", + Backend::Cuda | Backend::Rocm => "--gpu-layers 80", + }; + + let server = process::Command::new(executable) + .args(Self::parse_args(&format!( + "--model models/{filename} \ + --port 8080 --host 0.0.0.0 {gpu_flags}", + filename = file.name, + ))) + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn()?; + + Ok(server) + } + + fn parse_args(command: &str) -> impl Iterator { + command + .split(' ') + .map(str::trim) + .filter(|arg| !arg.is_empty()) + } +} + impl Drop for Server { fn drop(&mut self) { use std::process; @@ -664,7 +653,7 @@ impl Drop for Server { #[derive(Debug, Clone)] pub enum BootEvent { - Progressed { stage: &'static str, percent: u64 }, + Progressed { stage: &'static str, percent: u32 }, Logged(String), Finished(Assistant), } diff --git a/core/src/chat.rs b/core/src/chat.rs index 16d15f0..16fdf12 100644 --- a/core/src/chat.rs +++ b/core/src/chat.rs @@ -6,9 +6,8 @@ use crate::model; use crate::plan::{self, Plan}; use crate::Error; -use futures::Stream; use serde::{Deserialize, Serialize}; -use sipper::{sipper, Sipper, Straw}; +use sipper::{sipper, Sipper, Straw, Stream}; use tokio::fs; use tokio::task; use uuid::Uuid; diff --git a/core/src/lib.rs b/core/src/lib.rs index 40e8ac7..962f04f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -8,6 +8,8 @@ pub use model::Model; pub use plan::Plan; pub use url::Url; +mod request; + use std::io; use std::sync::Arc; use tokio::task; diff --git a/core/src/plan.rs b/core/src/plan.rs index 6741063..6bd83df 100644 --- a/core/src/plan.rs +++ b/core/src/plan.rs @@ -92,7 +92,7 @@ impl Plan { }; let plan = design(assistant, &history).run(&progress).await?; - let _ = progress.send(Event::Designed(plan.clone())).await; + progress.send(Event::Designed(plan.clone())).await; execute(assistant, &history, query, &plan) .run(progress) diff --git a/core/src/request.rs b/core/src/request.rs new file mode 100644 index 0000000..ae3ba43 --- /dev/null +++ b/core/src/request.rs @@ -0,0 +1,69 @@ +use crate::Error; + +use reqwest::IntoUrl; +use sipper::{sipper, Straw}; +use tokio::fs; +use tokio::io::{self, AsyncWriteExt}; + +use std::path::Path; +use std::time::Instant; + +#[derive(Debug, Clone, Copy)] +pub struct Progress { + pub total: Option, + pub downloaded: u64, + pub speed: u64, +} + +impl Progress { + pub fn percent(self) -> Option<(u64, u32)> { + let total = self.total?; + + Some(( + total, + (self.downloaded as f32 / total as f32 * 100.0).round() as u32, + )) + } +} + +pub fn download_file<'a>( + url: impl IntoUrl + Send + 'a, + destination: impl AsRef + Send + 'a, +) -> impl Straw<(), Progress, Error> + 'a { + sipper(move |mut progress| async move { + let destination = destination.as_ref(); + let mut file = io::BufWriter::new(fs::File::create(destination).await?); + + let mut download = reqwest::get(url).await?; + let start = Instant::now(); + let total = download.content_length(); + let mut downloaded = 0; + + progress + .send(Progress { + total, + downloaded, + speed: 0, + }) + .await; + + while let Some(chunk) = download.chunk().await? { + downloaded += chunk.len() as u64; + let speed = (downloaded as f32 / start.elapsed().as_secs_f32()) as u64; + + progress + .send(Progress { + total, + downloaded, + speed, + }) + .await; + + file.write_all(&chunk).await?; + } + + file.flush().await?; + + Ok(()) + }) +} diff --git a/src/screen/conversation.rs b/src/screen/conversation.rs index ffa3836..231270d 100644 --- a/src/screen/conversation.rs +++ b/src/screen/conversation.rs @@ -39,7 +39,7 @@ enum State { file: File, logs: Vec, stage: String, - progress: u64, + progress: u32, tick: usize, _task: task::Handle, },








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/hecrj/icebreaker/commit/beb5e71c630d67ecacb8f7d818938b90c19c7800.diff

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy