pub mod types; use parity_tokio_ipc::Endpoint as IpcEndpoint; #[cfg(unix)] use std::{env, fs, os::unix::fs::PermissionsExt}; use std::{ future::Future, io, pin::Pin, task::{Context, Poll}, }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tonic::transport::{server::Connected, Endpoint, Server, Uri}; use tower::service_fn; pub use tonic::{async_trait, transport::Channel, Code, Request, Response, Status}; pub type ManagementServiceClient = types::management_service_client::ManagementServiceClient; pub use types::management_service_server::{ManagementService, ManagementServiceServer}; #[cfg(unix)] lazy_static::lazy_static! { static ref MULLVAD_MANAGEMENT_SOCKET_GROUP: Option = env::var("MULLVAD_MANAGEMENT_SOCKET_GROUP") .ok(); } #[derive(err_derive::Error, Debug)] #[error(no_from)] pub enum Error { #[error(display = "Management RPC server or client error")] GrpcTransportError(#[error(source)] tonic::transport::Error), #[error(display = "Failed to start IPC pipe/socket")] StartServerError(#[error(source)] io::Error), #[error(display = "Failed to initialize pipe/socket security attributes")] SecurityAttributes(#[error(source)] io::Error), #[error(display = "Unable to set permissions for IPC endpoint")] PermissionsError(#[error(source)] io::Error), #[cfg(unix)] #[error(display = "Group not found")] NoGidError, #[cfg(unix)] #[error(display = "Failed to obtain group ID")] ObtainGidError(#[error(source)] nix::Error), #[cfg(unix)] #[error(display = "Failed to set group ID")] SetGidError(#[error(source)] nix::Error), } pub async fn new_rpc_client() -> Result { let ipc_path = mullvad_paths::get_rpc_socket_path(); // The URI will be ignored let channel = Endpoint::from_static("lttp://[::]:50051") .connect_with_connector(service_fn(move |_: Uri| { IpcEndpoint::connect(ipc_path.clone()) })) .await .map_err(Error::GrpcTransportError)?; Ok(ManagementServiceClient::new(channel)) } pub type ServerJoinHandle = tokio::task::JoinHandle>; pub async fn spawn_rpc_server + Send + 'static>( service: T, abort_rx: F, ) -> std::result::Result { use futures::stream::TryStreamExt; use parity_tokio_ipc::SecurityAttributes; let socket_path = mullvad_paths::get_rpc_socket_path(); let mut endpoint = IpcEndpoint::new(socket_path.to_string_lossy().to_string()); endpoint.set_security_attributes( SecurityAttributes::allow_everyone_create() .map_err(Error::SecurityAttributes)? .set_mode(0o766) .map_err(Error::SecurityAttributes)?, ); let incoming = endpoint.incoming().map_err(Error::StartServerError)?; #[cfg(unix)] if let Some(group_name) = &*MULLVAD_MANAGEMENT_SOCKET_GROUP { let group = nix::unistd::Group::from_name(group_name) .map_err(Error::ObtainGidError)? .ok_or(Error::NoGidError)?; nix::unistd::chown(&socket_path, None, Some(group.gid)).map_err(Error::SetGidError)?; fs::set_permissions(&socket_path, PermissionsExt::from_mode(0o760)) .map_err(Error::PermissionsError)?; } Ok(tokio::spawn(async move { Server::builder() .add_service(ManagementServiceServer::new(service)) .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx) .await .map_err(Error::GrpcTransportError) })) } #[derive(Debug)] struct StreamBox(pub T); impl Connected for StreamBox { type ConnectInfo = Option<()>; fn connect_info(&self) -> Self::ConnectInfo { None } } impl AsyncRead for StreamBox { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } impl AsyncWrite for StreamBox { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.0).poll_write(cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_flush(cx) } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_shutdown(cx) } }