#[tokio::main]async fn main() { let server = Server::new(); server.host("0.0.0.0").await; server.port(8080).await; server.route("/concurrent", concurrent_handler).await; server.route("/stats", stats_handler).await; server.run().await.unwrap();}
async fn concurrent_handler(ctx: Context) { let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed); let start_time = std::time::Instant::now(); let result = simulate_async_work(request_id).await; let duration = start_time.elapsed();
ctx.set_response_status_code(200).await .set_response_header("X-Request-ID", request_id.to_string()).await .set_response_header("X-Process-Time", format!("{}μs", duration.as_micros())).await .set_response_body(result).await;}
async fn memory_efficient_handler(ctx: Context) { let memory_before = get_memory_usage(); let mut tasks = Vec::new();
for i in 0..1000 { tasks.push(tokio::spawn(async move { lightweight_operation(i).await })); }
let results: Vec<_> = futures::future::join_all(tasks).await; let memory_after = get_memory_usage();
let report = MemoryUsageReport { tasks_created: 1000, memory_used_kb: (memory_after - memory_before) / 1024, memory_per_task_bytes: (memory_after - memory_before) / 1000, successful_tasks: results.iter().filter(|r| r.is_ok()).count(), };
ctx.set_response_status_code(200).await .set_response_body(serde_json::to_string(&report).unwrap()).await;}
async fn event_loop_demo(ctx: Context) { let cpu_task = tokio::spawn(cpu_intensive_work()); let io_task = tokio::spawn(io_intensive_work()); let mixed_task = tokio::spawn(mixed_workload());
let (cpu_res, io_res, mixed_res) = tokio::join!(cpu_task, io_task, mixed_task);
// Trả về hiệu suất và trạng thái các tác vụ}
Semaphore
giới hạn số lượng kết nối đồng thời.async fn backpressure_demo(ctx: Context) { let semaphore = Arc::new(Semaphore::new(100)); let permit = match semaphore.try_acquire() { Ok(permit) => permit, Err(_) => { ctx.set_response_status_code(503).await .set_response_body("Server too busy, please try again later").await; return; } };
let result = process_with_backpressure().await; drop(permit);
ctx.set_response_status_code(200).await .set_response_body(result).await;}
VecDeque
kết hợp Mutex
để tái sử dụng kết nối một cách tối ưu.Thuộc tính | Ý nghĩa |
---|---|
max_size | Số lượng kết nối tối đa |
current_size | Số lượng kết nối đang hoạt động |
connections | Hàng đợi các kết nối sẵn sàng tái sử dụng |
struct ConnectionPool { connections: Arc<Mutex<VecDeque<Connection>>>, max_size: usize, current_size: Arc<AtomicU64>,}
Tham số | Mô tả |
---|---|
Total Requests | Tổng số yêu cầu đã xử lý |
Active Connections | Số kết nối đang hoạt động |
Memory Usage (MB) | Bộ nhớ đang sử dụng |
CPU Usage (%) | Phần trăm tải CPU |
Avg. Response Time (ms) | Thời gian phản hồi trung bình |
Throughput (rps) | Số yêu cầu xử lý mỗi giây |
async fn stats_handler(ctx: Context) { let stats = ConcurrencyStats { total_requests: REQUEST_COUNTER.load(Ordering::Relaxed), active_connections: get_active_connections(), memory_usage_mb: get_memory_usage() / 1024 / 1024, cpu_usage_percent: get_cpu_usage(), average_response_time_ms: get_average_response_time(), throughput_rps: get_throughput(), };
ctx.set_response_status_code(200).await .set_response_header("Content-Type", "application/json").await .set_response_body(serde_json::to_string(&stats).unwrap()).await;}