1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::net::SocketAddr;
use std::io;
use std::sync::Arc;
use mio::udp::UdpSocket;
use mio::EventSet;
use ::authority::Catalog;
use ::op::*;
use ::serialize::binary::*;
/// State for a single UDP exchange with one remote peer.
///
/// A handler is built either by `new_client` (a request to send to a server) or by
/// `new_server` (a response to send back to whoever sent the datagram), and is then
/// driven by `handle_message`.
pub struct UdpHandler {
    /// Current position in the exchange; both constructors start at `UdpState::Writing`.
    state: UdpState,
    /// Remote peer: the server for a client handler, the datagram's sender for a server handler.
    addr: SocketAddr,
    /// The message this handler carries: the request on the client side, the response on
    /// the server side. `handle_message` reads its id and response code for logging.
    message: Message,
    /// Serialized bytes that `handle_message` sends to `addr`.
    buffer: Vec<u8>,
}
impl UdpHandler {
    /// Creates a handler that will send `request` to `server_addr`.
    ///
    /// The request is serialized immediately into the send buffer and the handler starts
    /// in `UdpState::Writing`.
    ///
    /// # Panics
    ///
    /// Panics if `request` cannot be encoded.
    pub fn new_client(server_addr: SocketAddr, request: Message) -> Self {
        let mut bytes: Vec<u8> = Vec::with_capacity(512);
        {
            // Scope the encoder so its mutable borrow of `bytes` ends before `bytes` is moved.
            let mut encoder: BinEncoder = BinEncoder::new(&mut bytes);
            request.emit(&mut encoder).unwrap();
        }
        UdpHandler { state: UdpState::Writing, addr: server_addr, message: request, buffer: bytes }
    }

    /// Receives one datagram from `socket` and builds a handler holding the response.
    ///
    /// The datagram is decoded as a `Message` and passed to `catalog.handle_request`. If it
    /// cannot be decoded, a `FormErr` error message is prepared instead. The response is
    /// serialized, and re-serialized from `response.truncate()` when it exceeds the
    /// payload limit.
    ///
    /// Returns `None` when `recv_from` fails (the error is logged) or yields no datagram.
    pub fn new_server(socket: &UdpSocket, catalog: Arc<Catalog>) -> Option<Self> {
        // Limit applied when the request could not be decoded and therefore cannot report its
        // own maximum payload. 512 bytes is the classic DNS-over-UDP message size limit.
        const DEFAULT_MAX_PAYLOAD: usize = 512;

        let mut buf: [u8; 4096] = [0u8; 4096];
        match socket.recv_from(&mut buf) {
            Ok(Some((length, addr))) => {
                debug!("revieved {} bytes from {:?}", length, addr);
                let request = {
                    // Decode only the bytes actually received, never the zero padding that
                    // follows them in the scratch buffer.
                    let mut decoder = BinDecoder::new(&buf[..length]);
                    Message::read(&mut decoder)
                };
                // Build the response and pick the size limit in one place, so a request that
                // failed to decode is never unwrapped later.
                let (response, max_payload) = match request {
                    Err(ref decode_error) => {
                        warn!("unable to decode request from client: {:?}: {}", addr, decode_error);
                        (Catalog::error_msg(0, OpCode::Query, ResponseCode::FormErr), DEFAULT_MAX_PAYLOAD)
                    },
                    Ok(ref req) => (catalog.handle_request(req), req.get_max_payload() as usize),
                };
                let encoded = Self::serialize_msg(Vec::with_capacity(length), &response);
                let buffer = if encoded.len() > max_payload {
                    let truncated_response = response.truncate();
                    Self::serialize_msg(encoded, &truncated_response)
                } else {
                    encoded
                };
                // `message` keeps the untruncated response; it is only used for logging in
                // `handle_message`, while `buffer` holds what is actually sent.
                Some(UdpHandler { state: UdpState::Writing, addr: addr, message: response, buffer: buffer })
            },
            Err(e) => {
                warn!("error recieving on socket {:?}: {}", socket, e);
                None
            }
            _ => None,
        }
    }

    /// Returns the address of the remote peer this handler talks to.
    pub fn remote_addr(&self) -> SocketAddr {
        self.addr
    }

    /// Serializes `response` into `buf` and returns the buffer.
    ///
    /// `buf` is cleared first, so only its allocation is reused. If encoding fails, the
    /// error is logged and a `ServFail` error message with the same id and op code is
    /// serialized instead.
    ///
    /// NOTE(review): the fallback goes through this same function, so it would recurse again
    /// if the `ServFail` message itself failed to encode; presumably that cannot happen,
    /// confirm against `Catalog::error_msg`.
    pub fn serialize_msg(mut buf: Vec<u8>, response: &Message) -> Vec<u8> {
        buf.clear();
        let encode_result = {
            // Scope the encoder so its mutable borrow of `buf` ends before `buf` is returned.
            let mut encoder: BinEncoder = BinEncoder::new(&mut buf);
            response.emit(&mut encoder)
        };
        if let Err(encode_error) = encode_result {
            error!("error encoding response to client: {}", encode_error);
            Self::serialize_msg(buf, &Catalog::error_msg(response.get_id(), response.get_op_code(), ResponseCode::ServFail))
        } else {
            buf
        }
    }

    /// Advances the exchange in response to a readiness event and returns the next state.
    ///
    /// In `UdpState::Writing`, the buffer is sent to the remote address when `events` is
    /// writable. Returns `Ok(UdpState::Done)` once the send succeeds, `Ok(UdpState::Writing)`
    /// when the socket is not writable or the send would block, and `Err` for any other
    /// I/O error. The handler's own `state` field is not modified; the caller receives the
    /// new state as the return value.
    ///
    /// # Panics
    ///
    /// Panics in `UdpState::Reading` (not implemented) and in `UdpState::Done`.
    pub fn handle_message(&self, socket: &UdpSocket, events: EventSet) -> io::Result<UdpState> {
        match self.state {
            UdpState::Reading => {
                unimplemented!();
            },
            UdpState::Writing => {
                if events.is_writable() {
                    info!("sending message to: {} id: {} rcode: {:?}", self.addr, self.message.get_id(), self.message.get_response_code());
                    match socket.send_to(&self.buffer, &self.addr) {
                        Ok(..) => {
                            Ok(UdpState::Done)
                        },
                        // Not an error for a non-blocking socket: stay in Writing and retry
                        // on the next writable event.
                        Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
                            Ok(UdpState::Writing)
                        },
                        Err(e) => {
                            Err(e)
                        }
                    }
                } else {
                    Ok(UdpState::Writing)
                }
            },
            UdpState::Done => panic!("This handler should have been removed or reset"),
        }
    }
}
/// Progress of a `UdpHandler` through its exchange.
pub enum UdpState {
    /// Waiting to read; `UdpHandler::handle_message` does not implement this state yet.
    Reading,
    /// The serialized message still has to be sent to the remote peer.
    Writing,
    /// The message was sent; `UdpHandler::handle_message` panics if called in this state.
    Done,
}