Rust学习笔记
Rust编程语言入门教程课程笔记
Lecture 20: Final Project: Building a Multithreaded Web Server
src/main.rs
use std::fs;
use std::{
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
};
use std::thread;
use std::time::Duration;
use hello::ThreadPool;
fn main() {
// bind to a port
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);// set a limit on the number of threads in the pool
// listen for incoming connections
for stream in listener.incoming().take(2) {// only accept two connections
let stream = stream.unwrap();
// println!("Connection established!");
// handle each connection
// handle_connection(stream);
// spawn a new thread for each connection
// thread::spawn(|| {
// handle_connection(stream);//suffer from ddos attack
// });
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
// let buf_reader = BufReader::new(&mut stream);
// let http_request: Vec<_> = buf_reader
// .lines()
// .map(|result| result.unwrap())
// .take_while(|line| !line.is_empty())
// .collect();
// println!("Request: {:#?}", http_request);
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1rn";
let sleep = b"GET /sleep HTTP/1.1rn";
//Reading a Request
// Request Format
// Method Request-URI HTTP-Version CRLF
// headers CRLF
// message-body
// Example
// Request: [
// "GET / HTTP/1.1",
// "Host: 127.0.0.1:7878",
// "Connection: keep-alive",
// "sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"",
// "sec-ch-ua-mobile: ?0",
// "sec-ch-ua-platform: "macOS"",
// "Upgrade-Insecure-Requests: 1",
// "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
// "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
// "Sec-Fetch-Site: none",
// "Sec-Fetch-Mode: navigate",
// "Sec-Fetch-User: ?1",
// "Sec-Fetch-Dest: document",
// "Accept-Encoding: gzip, deflate, br",
// "Accept-Language: zh-CN,zh;q=0.9,en;q=0.8",
// ]
//Writing a Response
// Response Format
// HTTP-Version Status-Code Reason-Phrase CRLF
// headers CRLF
// message-body
//let response = "HTTP/1.1 200 OKrnrn";
// let contents = fs::read_to_string("hello.html").unwrap();
// let response = format!("HTTP/1.1 200 OKrnrn{}", contents);
// write the response to the stream
// stream.write(response.as_bytes()).unwrap();
// stream.flush().unwrap();
// if buffer.starts_with(get) {
// let contents = fs::read_to_string("hello.html").unwrap();
// let response = format!("HTTP/1.1 200 OKrnrn{}", contents);
// stream.write(response.as_bytes()).unwrap();
// stream.flush().unwrap();
// } else {
// // other request
// let status_line = "HTTP/1.1 404 NOT FOUNDrnrn";
// let contents = fs::read_to_string("404.html").unwrap();
// let response = format!("{}{}", status_line, contents);
// stream.write(response.as_bytes()).unwrap();
// stream.flush().unwrap();
// }
// Refactoring
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OKrnrn", "hello.html")
} else if buffer.starts_with(sleep) {// simulate a slow request
// simulate a slow request
std::thread::sleep(std::time::Duration::from_secs(5));
("HTTP/1.1 200 OKrnrn", "hello.html")
}
else {
("HTTP/1.1 404 NOT FOUNDrnrn", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
src/lib.rs
use std::thread;
use std::sync::{mpsc, Arc, Mutex};
enum Message {
NewJob(Job),
Terminate,
}
struct Worker {
id: usize,
// thread: thread::JoinHandle<()>,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
// let thread = thread::spawn(|| {});
// let thread = thread::spawn(|| {
// receiver;
// });
// let thread = thread::spawn(move || loop {
// while let Ok(_) = receiver.lock().unwrap().recv().unwrap(){
// println!("Worker {} got a job; executing.", id);
// job.call_box();
// }
// });
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job.call_box();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
});
Worker { id, thread: Some(thread) }
}
// fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// let thread = thread::spawn(|| {
// // loop {
// // receiver.lock().unwrap().recv().unwrap();
// // }
// loop {
// let job = receiver.lock().unwrap().recv().unwrap();
// println!("Worker {} got a job; executing.", id);
// job.call_box();
// }
// });
// Worker { id, thread }
// }
}
pub struct ThreadPool {
// threads: Vec<thread::JoinHandle<()>>,
workers: Vec<Worker>,
// sender: mpsc::Sender<Job>,
sender: mpsc::Sender<Message>,
}
impl ThreadPool {
/// Create a new ThreadPool
///
/// The size is the number of threads in the pool
///
/// # Panics
///
/// The `new` function will panic if the size is zero
//
// pub fn new(size: usize) -> ThreadPool {
// assert!(size > 0);
// let (sender, receiver) = mpsc::channel();
// let mut workers = Vec::with_capacity(size);
// for id in 0..size {
// workers.push(Worker::new(id, receiver));
// }
// ThreadPool { workers, sender }
// }
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);//Box<dyn FnOnce() + Send + 'static>
// self.sender.send(job).unwrap();
self.sender.send(Message::NewJob(job)).unwrap();
}
// the execute method should be similar with thread::spawn
// pub fn spawn<F, T>(f: F) -> JoinHandle<T>
// where
// F: FnOnce() -> T + Send + 'static,
// T: Send + 'static,
// {
// Builder::new().spawn(f).expect("failed to spawn thread")
// }
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
// for worker in &mut self.workers {
// println!("Shutting down worker {}", worker.id);
// worker.thread.join().unwrap();
// }
// for worker in &mut self.workers {
// println!("Shutting down worker {}", worker.id);
// // worker.thread.join().unwrap();
// if let Ok(_) = worker.thread.join() {
// println!("Worker {} shut down successfully.", worker.id);
// } else {
// println!("Worker {} failed to shut down.", worker.id);
// }
// }
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
// println!("Shutting down worker {}", worker.id);
// worker.thread.join().unwrap();
if let Some(thread) = worker.thread.take() {
if let Ok(_) = thread.join() {
println!("Worker {} shut down successfully.", worker.id);
} else {
println!("Worker {} failed to shut down.", worker.id);
}
}
}
}
}
// struct Job;
type Job = Box<dyn FnBox + Send + 'static>;
trait FnBox {
fn call_box(self: Box<Self>);
}
impl <F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Hello World</title>
</head>
<body>
<h1>Hello World</h1>
<p>
<?php
echo "Hello World";
?>
</p>
</body>
</html>
404.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>404</title>
</head>
<body>
<h1>Oops!</h1>
<p>Page not found</p>
</body>
</html>
原文地址:https://blog.csdn.net/weixin_45347752/article/details/134623927
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_892.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。