Content-Length: 15455 | pFad | http://github.com/hecrj/icebreaker/commit/beb5e71c630d67ecacb8f7d818938b90c19c7800.diff
77
diff --git a/Cargo.lock b/Cargo.lock
index b18ea43..d519087 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4139,7 +4139,7 @@ checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "sipper"
version = "0.0.2"
-source = "git+https://github.com/hecrj/sipper.git?rev=ca161524a82e2a181bccc384c07d2fc446655f6b#ca161524a82e2a181bccc384c07d2fc446655f6b"
+source = "git+https://github.com/hecrj/sipper.git?rev=c53184fc5517fa027f193ce1ba4c5b40a3b2e18a#c53184fc5517fa027f193ce1ba4c5b40a3b2e18a"
dependencies = [
"futures",
]
diff --git a/Cargo.toml b/Cargo.toml
index 9578e28..418626a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,7 +30,7 @@ iced.git = "https://github.com/iced-rs/iced.git"
iced.rev = "4bbb5cbc1f8b2a0ee8e09be18071368df3ba5bbd"
sipper.git = "https://github.com/hecrj/sipper.git"
-sipper.rev = "ca161524a82e2a181bccc384c07d2fc446655f6b"
+sipper.rev = "c53184fc5517fa027f193ce1ba4c5b40a3b2e18a"
chrono = "0.4"
dirs = "6.0"
@@ -48,4 +48,3 @@ tokio-stream = "0.1"
tracing-subscriber = "0.3"
url = "2.5"
uuid = "1.10"
-
diff --git a/core/src/assistant.rs b/core/src/assistant.rs
index 03e619f..480fcd7 100644
--- a/core/src/assistant.rs
+++ b/core/src/assistant.rs
@@ -1,13 +1,10 @@
use crate::model;
+use crate::request;
use crate::Error;
-use futures::Stream;
-use futures::{FutureExt, StreamExt};
use serde::Deserialize;
use serde_json::json;
-use sipper::{sipper, Sipper, Straw};
-use tokio::fs;
-use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt};
+use sipper::{sipper, FutureExt, Sipper, Straw, Stream, StreamExt};
use tokio::process;
use std::sync::Arc;
@@ -31,6 +28,12 @@ impl Assistant {
file: model::File,
backend: Backend,
) -> impl Stream<Item = Result<BootEvent, Error>> {
+ use tokio::fs;
+ use tokio::io::{self, AsyncBufReadExt};
+ use tokio::process;
+ use tokio::task;
+ use tokio::time;
+
#[derive(Clone)]
struct Sender(sipper::Sender<Result<BootEvent, Error>>);
@@ -39,7 +42,7 @@ impl Assistant {
let _ = self.0.send(Ok(BootEvent::Logged(log))).await;
}
- async fn progress(&mut self, stage: &'static str, percent: u64) {
+ async fn progress(&mut self, stage: &'static str, percent: u32) {
let _ = self
.0
.send(Ok(BootEvent::Progressed { stage, percent }))
@@ -110,61 +113,42 @@ impl Assistant {
))
.await;
- let mut model = io::BufWriter::new(fs::File::create(&model_path).await?);
-
- let mut download = {
- let url = format!(
- "https://huggingface.co\
- /{id}/resolve/main/\
- {filename}?download=true",
- id = file.model.0,
- filename = file.name
- );
-
- reqwest::get(url)
- }
- .await?;
-
- let model_size = download.content_length();
- let mut downloaded = 0;
- let mut progress = 0;
- let start = Instant::now();
-
sender
.log(format!("Downloading {file}...", file = file.name))
.await;
- while let Some(chunk) = download.chunk().await? {
- downloaded += chunk.len() as u64;
-
- let speed = downloaded as f32 / start.elapsed().as_secs_f32();
+ let url = format!(
+ "https://huggingface.co\
+ /{id}/resolve/main/\
+ {filename}?download=true",
+ id = file.model.0,
+ filename = file.name
+ );
- if let Some(model_size) = model_size {
- let new_progress =
- (100.0 * downloaded as f32 / model_size as f32).round() as u64;
+ let mut download = request::download_file(url, &model_path).sip();
+ let mut last_percent = None;
- if new_progress > progress {
- progress = new_progress;
+ while let Some(progress) = download.next().await {
+ if let Some((total, percent)) = progress.percent() {
+ sender.progress("Downloading model...", percent).await;
- sender.progress("Downloading model...", progress).await;
+ if Some(percent) != last_percent {
+ last_percent = Some(percent);
- if progress % 5 == 0 {
- sender
+ sender
.log(format!(
- "=> {progress}% {downloaded:.2}GB of {model_size:.2}GB @ {speed:.2} MB/s",
- downloaded = downloaded as f32 / 10f32.powi(9),
- model_size = model_size as f32 / 10f32.powi(9),
- speed = speed / 10f32.powi(6),
+ "=> {percent}% {downloaded:.2}GB of {total:.2}GB \
+ @ {speed:.2} MB/s",
+ downloaded = progress.downloaded as f32 / 10f32.powi(9),
+ total = total as f32 / 10f32.powi(9),
+ speed = progress.speed as f32 / 10f32.powi(6),
))
.await;
- }
}
}
-
- model.write_all(&chunk).await?;
}
- model.flush().await?;
+ download.finish().await?;
}
sender.progress("Detecting executor...", 0).await;
@@ -194,7 +178,7 @@ impl Assistant {
))
.await;
- let mut server = Self::launch_with_executable("llama-server", &file, backend)?;
+ let mut server = Server::launch_with_executable("llama-server", &file, backend)?;
let stdout = server.stdout.take();
let stderr = server.stderr.take();
@@ -252,7 +236,7 @@ impl Assistant {
};
let mut docker = process::Command::new("docker")
- .args(Self::parse_args(&command))
+ .args(Server::parse_args(&command))
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
@@ -272,7 +256,7 @@ impl Assistant {
}
};
- let _handle = tokio::task::spawn(notify_progress);
+ let _handle = task::spawn(notify_progress);
let container = {
let output = io::BufReader::new(docker.stdout.take().expect("piped stdout"));
@@ -309,21 +293,24 @@ impl Assistant {
return Err(Error::NoExecutorAvailable);
};
- let mut lines = {
- use futures::stream;
- use tokio_stream::wrappers::LinesStream;
+ let log_output = {
+ let mut sender = sender.clone();
- let stdout = io::BufReader::new(stdout.expect("piped stdout"));
- let stderr = io::BufReader::new(stderr.expect("piped stderr"));
+ let mut lines = {
+ use futures::stream;
+ use tokio_stream::wrappers::LinesStream;
- stream::select(
- LinesStream::new(stdout.lines()),
- LinesStream::new(stderr.lines()),
- )
- };
+ let stdout = stdout.expect("piped stdout");
+ let stderr = stderr.expect("piped stderr");
- let log_output = {
- let mut sender = sender.clone();
+ let stdout = io::BufReader::new(stdout);
+ let stderr = io::BufReader::new(stderr);
+
+ stream::select(
+ LinesStream::new(stdout.lines()),
+ LinesStream::new(stderr.lines()),
+ )
+ };
async move {
while let Some(line) = lines.next().await {
@@ -339,7 +326,7 @@ impl Assistant {
let check_health = async move {
loop {
- tokio::time::sleep(Duration::from_secs(1)).await;
+ time::sleep(Duration::from_secs(1)).await;
if let Ok(response) = reqwest::get(format!(
"http://localhost:{port}/health",
@@ -355,11 +342,11 @@ impl Assistant {
}
.boxed();
- if futures::future::select(log_output, check_health)
- .await
- .factor_first()
- .0
- {
+ let log_handle = task::spawn(log_output);
+
+ if check_health.await {
+ log_handle.abort();
+
return Ok(BootEvent::Finished(Assistant {
file,
_server: Arc::new(server),
@@ -544,37 +531,6 @@ impl Assistant {
pub fn name(&self) -> &str {
self.file.model.name()
}
-
- fn launch_with_executable(
- executable: &'static str,
- file: &model::File,
- backend: Backend,
- ) -> Result<process::Child, Error> {
- let gpu_flags = match backend {
- Backend::Cpu => "",
- Backend::Cuda | Backend::Rocm => "--gpu-layers 80",
- };
-
- let server = process::Command::new(executable)
- .args(Self::parse_args(&format!(
- "--model models/{filename} \
- --port 8080 --host 0.0.0.0 {gpu_flags}",
- filename = file.name,
- )))
- .kill_on_drop(true)
- .stdout(std::process::Stdio::piped())
- .stderr(std::process::Stdio::piped())
- .spawn()?;
-
- Ok(server)
- }
-
- fn parse_args(command: &str) -> impl Iterator<Item = &str> {
- command
- .split(' ')
- .map(str::trim)
- .filter(|arg| !arg.is_empty())
- }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -644,6 +600,39 @@ enum Server {
Process(process::Child),
}
+impl Server {
+ fn launch_with_executable(
+ executable: &'static str,
+ file: &model::File,
+ backend: Backend,
+ ) -> Result<process::Child, Error> {
+ let gpu_flags = match backend {
+ Backend::Cpu => "",
+ Backend::Cuda | Backend::Rocm => "--gpu-layers 80",
+ };
+
+ let server = process::Command::new(executable)
+ .args(Self::parse_args(&format!(
+ "--model models/{filename} \
+ --port 8080 --host 0.0.0.0 {gpu_flags}",
+ filename = file.name,
+ )))
+ .kill_on_drop(true)
+ .stdout(std::process::Stdio::piped())
+ .stderr(std::process::Stdio::piped())
+ .spawn()?;
+
+ Ok(server)
+ }
+
+ fn parse_args(command: &str) -> impl Iterator<Item = &str> {
+ command
+ .split(' ')
+ .map(str::trim)
+ .filter(|arg| !arg.is_empty())
+ }
+}
+
impl Drop for Server {
fn drop(&mut self) {
use std::process;
@@ -664,7 +653,7 @@ impl Drop for Server {
#[derive(Debug, Clone)]
pub enum BootEvent {
- Progressed { stage: &'static str, percent: u64 },
+ Progressed { stage: &'static str, percent: u32 },
Logged(String),
Finished(Assistant),
}
diff --git a/core/src/chat.rs b/core/src/chat.rs
index 16d15f0..16fdf12 100644
--- a/core/src/chat.rs
+++ b/core/src/chat.rs
@@ -6,9 +6,8 @@ use crate::model;
use crate::plan::{self, Plan};
use crate::Error;
-use futures::Stream;
use serde::{Deserialize, Serialize};
-use sipper::{sipper, Sipper, Straw};
+use sipper::{sipper, Sipper, Straw, Stream};
use tokio::fs;
use tokio::task;
use uuid::Uuid;
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 40e8ac7..962f04f 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -8,6 +8,8 @@ pub use model::Model;
pub use plan::Plan;
pub use url::Url;
+mod request;
+
use std::io;
use std::sync::Arc;
use tokio::task;
diff --git a/core/src/plan.rs b/core/src/plan.rs
index 6741063..6bd83df 100644
--- a/core/src/plan.rs
+++ b/core/src/plan.rs
@@ -92,7 +92,7 @@ impl Plan {
};
let plan = design(assistant, &history).run(&progress).await?;
- let _ = progress.send(Event::Designed(plan.clone())).await;
+ progress.send(Event::Designed(plan.clone())).await;
execute(assistant, &history, query, &plan)
.run(progress)
diff --git a/core/src/request.rs b/core/src/request.rs
new file mode 100644
index 0000000..ae3ba43
--- /dev/null
+++ b/core/src/request.rs
@@ -0,0 +1,69 @@
+use crate::Error;
+
+use reqwest::IntoUrl;
+use sipper::{sipper, Straw};
+use tokio::fs;
+use tokio::io::{self, AsyncWriteExt};
+
+use std::path::Path;
+use std::time::Instant;
+
+#[derive(Debug, Clone, Copy)]
+pub struct Progress {
+ pub total: Option<u64>,
+ pub downloaded: u64,
+ pub speed: u64,
+}
+
+impl Progress {
+ pub fn percent(self) -> Option<(u64, u32)> {
+ let total = self.total?;
+
+ Some((
+ total,
+ (self.downloaded as f32 / total as f32 * 100.0).round() as u32,
+ ))
+ }
+}
+
+pub fn download_file<'a>(
+ url: impl IntoUrl + Send + 'a,
+ destination: impl AsRef<Path> + Send + 'a,
+) -> impl Straw<(), Progress, Error> + 'a {
+ sipper(move |mut progress| async move {
+ let destination = destination.as_ref();
+ let mut file = io::BufWriter::new(fs::File::create(destination).await?);
+
+ let mut download = reqwest::get(url).await?;
+ let start = Instant::now();
+ let total = download.content_length();
+ let mut downloaded = 0;
+
+ progress
+ .send(Progress {
+ total,
+ downloaded,
+ speed: 0,
+ })
+ .await;
+
+ while let Some(chunk) = download.chunk().await? {
+ downloaded += chunk.len() as u64;
+ let speed = (downloaded as f32 / start.elapsed().as_secs_f32()) as u64;
+
+ progress
+ .send(Progress {
+ total,
+ downloaded,
+ speed,
+ })
+ .await;
+
+ file.write_all(&chunk).await?;
+ }
+
+ file.flush().await?;
+
+ Ok(())
+ })
+}
diff --git a/src/screen/conversation.rs b/src/screen/conversation.rs
index ffa3836..231270d 100644
--- a/src/screen/conversation.rs
+++ b/src/screen/conversation.rs
@@ -39,7 +39,7 @@ enum State {
file: File,
logs: Vec,
stage: String,
- progress: u64,
+ progress: u32,
tick: usize,
_task: task::Handle,
},
--- a PPN by Garber Painting Akron. With Image Size Reduction included!Fetched URL: http://github.com/hecrj/icebreaker/commit/beb5e71c630d67ecacb8f7d818938b90c19c7800.diff
Alternative Proxies:
Alternative Proxy
pFad Proxy
pFad v3 Proxy
pFad v4 Proxy