Refactor file download into `request` module · hecrj/icebreaker@beb5e71

hecrj committed Feb 8, 2025
1 parent b11727c commit beb5e71
Showing 8 changed files with 164 additions and 106 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
@@ -30,7 +30,7 @@ iced.git = "https://github.com/iced-rs/iced.git"
iced.rev = "4bbb5cbc1f8b2a0ee8e09be18071368df3ba5bbd"

sipper.git = "https://github.com/hecrj/sipper.git"
sipper.rev = "ca161524a82e2a181bccc384c07d2fc446655f6b"
sipper.rev = "c53184fc5517fa027f193ce1ba4c5b40a3b2e18a"

chrono = "0.4"
dirs = "6.0"
@@ -48,4 +48,3 @@ tokio-stream = "0.1"
tracing-subscriber = "0.3"
url = "2.5"
uuid = "1.10"

187 changes: 88 additions & 99 deletions core/src/assistant.rs
@@ -1,13 +1,10 @@
use crate::model;
use crate::request;
use crate::Error;

use futures::Stream;
use futures::{FutureExt, StreamExt};
use serde::Deserialize;
use serde_json::json;
use sipper::{sipper, Sipper, Straw};
use tokio::fs;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt};
use sipper::{sipper, FutureExt, Sipper, Straw, Stream, StreamExt};
use tokio::process;

use std::sync::Arc;
@@ -31,6 +28,12 @@ impl Assistant {
file: model::File,
backend: Backend,
) -> impl Stream<Item = Result<BootEvent, Error>> {
use tokio::fs;
use tokio::io::{self, AsyncBufReadExt};
use tokio::process;
use tokio::task;
use tokio::time;

#[derive(Clone)]
struct Sender(sipper::Sender<Result<BootEvent, Error>>);

@@ -39,7 +42,7 @@ impl Assistant {
let _ = self.0.send(Ok(BootEvent::Logged(log))).await;
}

async fn progress(&mut self, stage: &'static str, percent: u64) {
async fn progress(&mut self, stage: &'static str, percent: u32) {
let _ = self
.0
.send(Ok(BootEvent::Progressed { stage, percent }))
@@ -110,61 +113,42 @@ impl Assistant {
))
.await;

let mut model = io::BufWriter::new(fs::File::create(&model_path).await?);

let mut download = {
let url = format!(
"https://huggingface.co\
/{id}/resolve/main/\
{filename}?download=true",
id = file.model.0,
filename = file.name
);

reqwest::get(url)
}
.await?;

let model_size = download.content_length();
let mut downloaded = 0;
let mut progress = 0;
let start = Instant::now();

sender
.log(format!("Downloading {file}...", file = file.name))
.await;

while let Some(chunk) = download.chunk().await? {
downloaded += chunk.len() as u64;

let speed = downloaded as f32 / start.elapsed().as_secs_f32();
let url = format!(
"https://huggingface.co\
/{id}/resolve/main/\
{filename}?download=true",
id = file.model.0,
filename = file.name
);

if let Some(model_size) = model_size {
let new_progress =
(100.0 * downloaded as f32 / model_size as f32).round() as u64;
let mut download = request::download_file(url, &model_path).sip();
let mut last_percent = None;

if new_progress > progress {
progress = new_progress;
while let Some(progress) = download.next().await {
if let Some((total, percent)) = progress.percent() {
sender.progress("Downloading model...", percent).await;

sender.progress("Downloading model...", progress).await;
if Some(percent) != last_percent {
last_percent = Some(percent);

if progress % 5 == 0 {
sender
sender
.log(format!(
"=> {progress}% {downloaded:.2}GB of {model_size:.2}GB @ {speed:.2} MB/s",
downloaded = downloaded as f32 / 10f32.powi(9),
model_size = model_size as f32 / 10f32.powi(9),
speed = speed / 10f32.powi(6),
"=> {percent}% {downloaded:.2}GB of {total:.2}GB \
@ {speed:.2} MB/s",
downloaded = progress.downloaded as f32 / 10f32.powi(9),
total = total as f32 / 10f32.powi(9),
speed = progress.speed as f32 / 10f32.powi(6),
))
.await;
}
}
}

model.write_all(&chunk).await?;
}

model.flush().await?;
download.finish().await?;
}

sender.progress("Detecting executor...", 0).await;
@@ -194,7 +178,7 @@ impl Assistant {
))
.await;

let mut server = Self::launch_with_executable("llama-server", &file, backend)?;
let mut server = Server::launch_with_executable("llama-server", &file, backend)?;
let stdout = server.stdout.take();
let stderr = server.stderr.take();

@@ -252,7 +236,7 @@ impl Assistant {
};

let mut docker = process::Command::new("docker")
.args(Self::parse_args(&command))
.args(Server::parse_args(&command))
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
@@ -272,7 +256,7 @@ impl Assistant {
}
};

let _handle = tokio::task::spawn(notify_progress);
let _handle = task::spawn(notify_progress);

let container = {
let output = io::BufReader::new(docker.stdout.take().expect("piped stdout"));
@@ -309,21 +293,24 @@ impl Assistant {
return Err(Error::NoExecutorAvailable);
};

let mut lines = {
use futures::stream;
use tokio_stream::wrappers::LinesStream;
let log_output = {
let mut sender = sender.clone();

let stdout = io::BufReader::new(stdout.expect("piped stdout"));
let stderr = io::BufReader::new(stderr.expect("piped stderr"));
let mut lines = {
use futures::stream;
use tokio_stream::wrappers::LinesStream;

stream::select(
LinesStream::new(stdout.lines()),
LinesStream::new(stderr.lines()),
)
};
let stdout = stdout.expect("piped stdout");
let stderr = stderr.expect("piped stderr");

let log_output = {
let mut sender = sender.clone();
let stdout = io::BufReader::new(stdout);
let stderr = io::BufReader::new(stderr);

stream::select(
LinesStream::new(stdout.lines()),
LinesStream::new(stderr.lines()),
)
};

async move {
while let Some(line) = lines.next().await {
@@ -339,7 +326,7 @@ impl Assistant {

let check_health = async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
time::sleep(Duration::from_secs(1)).await;

if let Ok(response) = reqwest::get(format!(
"http://localhost:{port}/health",
@@ -355,11 +342,11 @@ impl Assistant {
}
.boxed();

if futures::future::select(log_output, check_health)
.await
.factor_first()
.0
{
let log_handle = task::spawn(log_output);

if check_health.await {
log_handle.abort();

return Ok(BootEvent::Finished(Assistant {
file,
_server: Arc::new(server),
@@ -544,37 +531,6 @@ impl Assistant {
pub fn name(&self) -> &str {
self.file.model.name()
}

fn launch_with_executable(
executable: &'static str,
file: &model::File,
backend: Backend,
) -> Result<process::Child, Error> {
let gpu_flags = match backend {
Backend::Cpu => "",
Backend::Cuda | Backend::Rocm => "--gpu-layers 80",
};

let server = process::Command::new(executable)
.args(Self::parse_args(&format!(
"--model models/{filename} \
--port 8080 --host 0.0.0.0 {gpu_flags}",
filename = file.name,
)))
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;

Ok(server)
}

fn parse_args(command: &str) -> impl Iterator<Item = &str> {
command
.split(' ')
.map(str::trim)
.filter(|arg| !arg.is_empty())
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -644,6 +600,39 @@ enum Server {
Process(process::Child),
}

impl Server {
fn launch_with_executable(
executable: &'static str,
file: &model::File,
backend: Backend,
) -> Result<process::Child, Error> {
let gpu_flags = match backend {
Backend::Cpu => "",
Backend::Cuda | Backend::Rocm => "--gpu-layers 80",
};

let server = process::Command::new(executable)
.args(Self::parse_args(&format!(
"--model models/{filename} \
--port 8080 --host 0.0.0.0 {gpu_flags}",
filename = file.name,
)))
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;

Ok(server)
}

fn parse_args(command: &str) -> impl Iterator<Item = &str> {
command
.split(' ')
.map(str::trim)
.filter(|arg| !arg.is_empty())
}
}

impl Drop for Server {
fn drop(&mut self) {
use std::process;
@@ -664,7 +653,7 @@ impl Drop for Server {

#[derive(Debug, Clone)]
pub enum BootEvent {
Progressed { stage: &'static str, percent: u64 },
Progressed { stage: &'static str, percent: u32 },
Logged(String),
Finished(Assistant),
}
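
For orientation, a minimal consumer of the boot stream built above could look like the sketch below. The `Assistant::boot` name, the module paths, and the driver function are assumptions inferred from this diff; only `BootEvent`, `Backend`, `model::File`, and the `Result<BootEvent, Error>` item type appear in the commit itself.

// Hypothetical driver; `Assistant::boot` and the module paths are assumed,
// only the types it touches are taken from the diff above.
use futures::StreamExt;

use crate::assistant::{Assistant, Backend, BootEvent};
use crate::model;
use crate::Error;

async fn boot_and_wait(file: model::File) -> Result<Assistant, Error> {
    // Pin the stream locally so it can be polled with `StreamExt::next`.
    let mut events = std::pin::pin!(Assistant::boot(file, Backend::Cpu));

    while let Some(event) = events.next().await {
        match event? {
            BootEvent::Progressed { stage, percent } => {
                println!("{stage} {percent}%");
            }
            BootEvent::Logged(line) => println!("{line}"),
            BootEvent::Finished(assistant) => return Ok(assistant),
        }
    }

    // The stream is expected to end with `Finished` or bail out with an error.
    unreachable!("boot stream ended without a `Finished` event")
}
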
3 changes: 1 addition & 2 deletions core/src/chat.rs
@@ -6,9 +6,8 @@ use crate::model;
use crate::plan::{self, Plan};
use crate::Error;

use futures::Stream;
use serde::{Deserialize, Serialize};
use sipper::{sipper, Sipper, Straw};
use sipper::{sipper, Sipper, Straw, Stream};
use tokio::fs;
use tokio::task;
use uuid::Uuid;
2 changes: 2 additions & 0 deletions core/src/lib.rs
@@ -8,6 +8,8 @@ pub use model::Model;
pub use plan::Plan;
pub use url::Url;

mod request;

use std::io;
use std::sync::Arc;
use tokio::task;
2 changes: 1 addition & 1 deletion core/src/plan.rs
@@ -92,7 +92,7 @@ impl Plan {
};

let plan = design(assistant, &history).run(&progress).await?;
let _ = progress.send(Event::Designed(plan.clone())).await;
progress.send(Event::Designed(plan.clone())).await;

execute(assistant, &history, query, &plan)
.run(progress)
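
The new `core/src/request.rs` is among the changed files but is not rendered in this view. Based purely on its call sites in `assistant.rs` (`request::download_file(url, &model_path).sip()`, `progress.percent()`, `progress.downloaded`, `progress.speed`, and `download.finish()`), a minimal sketch of the API it likely exposes could look like this; the `Progress` field layout and the exact `sipper` signatures are assumptions, not the actual file contents:

// Hypothetical sketch of core/src/request.rs, inferred from its call sites.
// The sipper trait bounds and the Progress fields are assumptions.
use crate::Error;

use sipper::{sipper, Straw};
use tokio::fs;
use tokio::io::{self, AsyncWriteExt};

use std::path::Path;
use std::time::Instant;

#[derive(Debug, Clone, Copy)]
pub struct Progress {
    pub downloaded: u64,
    pub total: Option<u64>,
    pub speed: u64, // bytes per second
}

impl Progress {
    // Total size and percentage downloaded, when the server reports a length.
    pub fn percent(self) -> Option<(u64, u32)> {
        let total = self.total?;

        Some((total, (100 * self.downloaded / total.max(1)) as u32))
    }
}

// Streams `url` into the file at `path`, reporting `Progress` along the way.
pub fn download_file(
    url: String,
    path: impl AsRef<Path>,
) -> impl Straw<(), Progress, Error> {
    let path = path.as_ref().to_owned();

    sipper(move |mut sender| async move {
        let mut file = io::BufWriter::new(fs::File::create(&path).await?);
        let mut response = reqwest::get(url).await?;

        let total = response.content_length();
        let start = Instant::now();
        let mut downloaded = 0;

        while let Some(chunk) = response.chunk().await? {
            downloaded += chunk.len() as u64;
            file.write_all(&chunk).await?;

            let speed = (downloaded as f32 / start.elapsed().as_secs_f32()) as u64;

            sender.send(Progress { downloaded, total, speed }).await;
        }

        file.flush().await?;

        Ok(())
    })
}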