blob: 4c52631388dbf8f5ce6c54d3f6492b8a7395f38f [file] [log] [blame] [edit]
use super::requests::*;
use jsonrpc_core as jsonrpc;
use server::{Action, Request, Response};
use server::io::Output;
use actions::InitActionContext;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::fmt;
lazy_static! {
static ref TIMEOUT: Duration = Duration::from_millis(::COMPILER_TIMEOUT);
}
/// Macro enum `DispatchRequest` packing in various similar `Request` types
macro_rules! define_dispatch_request_enum {
($($request_type:ident),*) => {
pub enum DispatchRequest {
$(
$request_type($request_type, Request<$request_type>, InitActionContext),
)*
}
$(
impl From<(Request<$request_type>, InitActionContext)> for DispatchRequest {
fn from((req, ctx): (Request<$request_type>, InitActionContext)) -> Self {
DispatchRequest::$request_type($request_type::new(), req, ctx)
}
}
)*
impl DispatchRequest {
fn handle<O: Output>(self, out: &O) {
match self {
$(
DispatchRequest::$request_type(mut var, req, ctx) => {
let Request { id, params, received, .. } = req;
let fallback = var.fallback_response();
let timeout = var.timeout();
let receiver = receive_from_thread(move || {
// checking timeout here can prevent starting expensive work that has
// already timed out due to previous long running requests
// Note: done here on the threadpool as pool scheduling may incur
// a further delay
if received.elapsed() >= timeout {
var.fallback_response()
}
else {
var.handle(ctx, params)
}
});
match receiver.recv_timeout(timeout).unwrap_or(fallback) {
Ok(response) => response.send(id, out),
Err(ResponseError::Empty) => debug!("Error handling request"),
Err(ResponseError::Message(code, msg)) => {
out.failure_message(id, code, msg)
}
}
}
)*
}
}
}
}
}
define_dispatch_request_enum!(
Completion,
Definition,
References,
WorkspaceSymbol,
Symbols,
Hover,
FindImpls,
DocumentHighlight,
Rename,
CodeAction
);
/// Provides ability to dispatch requests to a worker thread that will
/// handle the requests sequentially, without blocking stdin.
/// Requests dispatched this way are automatically timed out & avoid
/// processing if have already timed out before starting.
pub struct Dispatcher {
sender: mpsc::Sender<DispatchRequest>,
request_handled_receiver: mpsc::Receiver<()>,
/// Number of as-yet-unhandled requests dispatched to the worker thread
in_flight_requests: usize,
}
impl Dispatcher {
/// Creates a new `Dispatcher` starting a new thread and channel
pub fn new<O: Output>(out: O) -> Self {
let (sender, receiver) = mpsc::channel::<DispatchRequest>();
let (request_handled_sender, request_handled_receiver) = mpsc::channel::<()>();
thread::Builder::new()
.name("dispatch-worker".into())
.spawn(move || {
while let Ok(request) = receiver.recv() {
request.handle(&out);
let _ = request_handled_sender.send(());
}
})
.unwrap();
Self {
sender,
request_handled_receiver,
in_flight_requests: 0,
}
}
/// Blocks until all dispatched requests have been handled
pub fn await_all_dispatched(&mut self) {
while self.in_flight_requests != 0 {
self.request_handled_receiver.recv().unwrap();
self.in_flight_requests -= 1;
}
}
/// Sends a request to the dispatch-worker thread, does not block
pub fn dispatch<R: Into<DispatchRequest>>(&mut self, request: R) {
if let Err(err) = self.sender.send(request.into()) {
debug!("Failed to dispatch request: {:?}", err);
} else {
self.in_flight_requests += 1;
}
// Clear the handled queue if possible in a non-blocking way
while self.request_handled_receiver.try_recv().is_ok() {
self.in_flight_requests -= 1;
}
}
}
/// Stdin-nonblocking request logic designed to be packed into a `DispatchRequest`
/// and handled on the `WORK_POOL` via a `Dispatcher`.
pub trait RequestAction: Action {
/// Serializable response type
type Response: ::serde::Serialize + fmt::Debug + Send;
/// Max duration this request should finish within, also see `fallback_response()`
fn timeout(&self) -> Duration {
*TIMEOUT
}
///
fn new() -> Self;
/// Returns a response used in timeout scenarios
fn fallback_response(&self) -> Result<Self::Response, ResponseError>;
/// Request processing logic
fn handle(
&mut self,
ctx: InitActionContext,
params: Self::Params,
) -> Result<Self::Response, ResponseError>;
}
/// Wrapper for a response error
pub enum ResponseError {
/// Error with no special response to the client
Empty,
/// Error with a response to the client
Message(jsonrpc::ErrorCode, String),
}
impl From<()> for ResponseError {
fn from(_: ()) -> Self {
ResponseError::Empty
}
}