summaryrefslogtreecommitdiffhomepage
path: root/talpid-core
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2021-07-01 11:01:03 +0200
committerDavid Lönnhager <david.l@mullvad.net>2021-08-26 13:46:33 +0200
commitd6f2ebac9a67df07bd596ffc8e0a10b368192c04 (patch)
tree682a11fddeaf067e972b8710f4534338ea609bc5 /talpid-core
parent5538762a00723fbcddd52cd7dc01bc0cc7eb3a37 (diff)
downloadmullvadvpn-d6f2ebac9a67df07bd596ffc8e0a10b368192c04.tar.xz
mullvadvpn-d6f2ebac9a67df07bd596ffc8e0a10b368192c04.zip
Add NTFS reparse point monitor
Diffstat (limited to 'talpid-core')
-rw-r--r--talpid-core/src/split_tunnel/windows/mod.rs58
-rw-r--r--talpid-core/src/split_tunnel/windows/path_monitor.rs820
2 files changed, 876 insertions, 2 deletions
diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs
index 2e538a014b..4df5e5d4ce 100644
--- a/talpid-core/src/split_tunnel/windows/mod.rs
+++ b/talpid-core/src/split_tunnel/windows/mod.rs
@@ -1,4 +1,5 @@
mod driver;
+mod path_monitor;
mod windows;
use crate::{
@@ -17,7 +18,7 @@ use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
os::windows::io::{AsRawHandle, RawHandle},
ptr,
- sync::{mpsc as sync_mpsc, Arc, Weak},
+ sync::{mpsc as sync_mpsc, Arc, Mutex, Weak},
time::Duration,
};
use talpid_types::{tunnel::ErrorStateCause, ErrorExt};
@@ -85,6 +86,10 @@ pub enum Error {
/// The request handling thread is down
#[error(display = "The ST request thread is down")]
RequestThreadDown,
+
+ /// Failed to start the NTFS reparse point monitor
+ #[error(display = "Failed to start path monitor")]
+ StartPathMonitor(#[error(source)] io::Error),
}
/// Manages applications whose traffic to exclude from the tunnel.
@@ -288,6 +293,12 @@ impl SplitTunnel {
let (tx, rx): (RequestTx, _) = sync_mpsc::sync_channel(3);
let (init_tx, init_rx) = sync_mpsc::channel();
+ let (path_monitor, path_change_rx) =
+ path_monitor::PathMonitor::spawn().map_err(Error::StartPathMonitor)?;
+
+ let monitored_paths = Arc::new(Mutex::new(vec![]));
+ let monitored_paths_copy = monitored_paths.clone();
+
std::thread::spawn(move || {
let result = driver::DeviceHandle::new()
.map(Arc::new)
@@ -306,11 +317,27 @@ impl SplitTunnel {
while let Ok((request, response_tx)) = rx.recv() {
let response = match request {
Request::SetPaths(paths) => {
- if paths.len() > 0 {
+ let mut monitored_paths_guard = monitored_paths.lock().unwrap();
+
+ let result = if paths.len() > 0 {
handle.set_config(&paths).map_err(Error::SetConfiguration)
} else {
handle.clear_config().map_err(Error::SetConfiguration)
+ };
+
+ if result.is_ok() {
+ if let Err(error) = path_monitor.set_paths(&paths) {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to update path monitor")
+ );
+ monitored_paths_guard.clear();
+ } else {
+ *monitored_paths_guard = paths.to_vec();
+ }
}
+
+ result
}
Request::RegisterIps(
mut tunnel_ipv4,
@@ -331,12 +358,39 @@ impl SplitTunnel {
log::error!("A response could not be sent for a completed request");
}
}
+
+ if let Err(error) = path_monitor.shutdown() {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to shut down path monitor")
+ );
+ }
});
let handle = init_rx
.recv_timeout(REQUEST_TIMEOUT)
.map_err(|_| Error::RequestThreadStuck)??;
+ let handle_copy = handle.clone();
+
+ std::thread::spawn(move || {
+ while let Ok(()) = path_change_rx.recv() {
+ let paths = monitored_paths_copy.lock().unwrap();
+ let result = if paths.len() > 0 {
+ log::debug!("Re-resolving excluded paths");
+ handle_copy.set_config(&*paths)
+ } else {
+ continue;
+ };
+ if let Err(error) = result {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to update excluded paths")
+ );
+ }
+ }
+ });
+
Ok((tx, handle))
}
diff --git a/talpid-core/src/split_tunnel/windows/path_monitor.rs b/talpid-core/src/split_tunnel/windows/path_monitor.rs
new file mode 100644
index 0000000000..febc0eff67
--- /dev/null
+++ b/talpid-core/src/split_tunnel/windows/path_monitor.rs
@@ -0,0 +1,820 @@
+use std::{
+ collections::HashSet,
+ ffi::{OsStr, OsString},
+ fs, io, mem,
+ os::windows::{
+ ffi::{OsStrExt, OsStringExt},
+ fs::OpenOptionsExt,
+ io::{AsRawHandle, RawHandle},
+ },
+ path::{Path, PathBuf},
+ pin::Pin,
+ ptr,
+ sync::{mpsc as sync_mpsc, Arc},
+ time::{Duration, Instant},
+};
+use winapi::{
+ self,
+ shared::{
+ minwindef::TRUE,
+ winerror::{ERROR_NOT_FOUND, ERROR_OPERATION_ABORTED},
+ },
+ um::{
+ fileapi::{GetFileAttributesW, GetFullPathNameW},
+ handleapi::{CloseHandle, INVALID_HANDLE_VALUE},
+ ioapiset::{
+ CancelIoEx, CreateIoCompletionPort, DeviceIoControl, GetQueuedCompletionStatus,
+ PostQueuedCompletionStatus,
+ },
+ minwinbase::OVERLAPPED,
+ stringapiset::CompareStringOrdinal,
+ winbase::{
+ ReadDirectoryChangesW, FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OPEN_REPARSE_POINT,
+ FILE_FLAG_OVERLAPPED, INFINITE,
+ },
+ winioctl::FSCTL_GET_REPARSE_POINT,
+ winnt::{
+ FILE_ATTRIBUTE_REPARSE_POINT, FILE_NOTIFY_CHANGE_DIR_NAME,
+ FILE_NOTIFY_CHANGE_FILE_NAME, FILE_NOTIFY_INFORMATION, HANDLE,
+ IO_REPARSE_TAG_MOUNT_POINT, IO_REPARSE_TAG_SYMLINK, MAXIMUM_REPARSE_DATA_BUFFER_SIZE,
+ },
+ },
+};
+
+const SHUTDOWN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
+const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2);
+const PATH_MONITOR_COMPLETION_KEY_IGNORE: usize = usize::MAX;
+
+const CSTR_EQUAL: i32 = 2;
+
+const ANYSIZE_ARRAY: usize = 1;
+const SYMLINK_FLAG_RELATIVE: u32 = 0x00000001;
+
+
+// See https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/c3a420cb-8a72-4adf-87e8-eee95379d78f.
+#[repr(C)]
+struct ReparseData {
+ tag: u32,
+ data_length: u16,
+ reserved: u16,
+ data: [u8; ANYSIZE_ARRAY],
+}
+
+// See https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/ca069dad-ed16-42aa-b057-b6b207f447cc.
+#[repr(C)]
+struct ReparseDataMountPoint {
+ tag: u32,
+ data_length: u16,
+ reserved: i16,
+ // Offset to a pathname pointing to the target path.
+ sub_name_offset: u16,
+ sub_name_length: u16,
+ // Offset to a user-displayable pathname.
+ print_name_offset: u16,
+ print_name_length: u16,
+ path_buffer: [u16; ANYSIZE_ARRAY],
+}
+
+// See https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/b41f1cbf-10df-4a47-98d4-1c52a833d913.
+#[repr(C)]
+struct ReparseDataSymlink {
+ tag: u32,
+ data_length: u16,
+ reserved: i16,
+ // Offset to a pathname pointing to the target path.
+ sub_name_offset: u16,
+ sub_name_length: u16,
+ // Offset to a user-displayable pathname.
+ print_name_offset: u16,
+ print_name_length: u16,
+ flags: u32,
+ path_buffer: [u16; ANYSIZE_ARRAY],
+}
+
+fn strip_namespace<P: AsRef<Path>>(path: P) -> PathBuf {
+ // \??: symlink to "DosDevices"
+ path.as_ref()
+ .strip_prefix(r"\\??")
+ .map(PathBuf::from)
+ .unwrap_or(path.as_ref().to_path_buf())
+}
+
+macro_rules! get_reparse_path {
+ ($tag_type:ident, $data:ident) => {{
+ let reparse_data = &*($data.as_ptr() as *const $tag_type);
+ let last_offset = reparse_data.sub_name_offset as usize
+ + reparse_data.sub_name_length as usize
+ + memoffset::offset_of!($tag_type, path_buffer);
+
+ if last_offset > $data.len() {
+ log::error!("Ignoring mount point with out-of-bounds index");
+ Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "link indices out-of-bounds",
+ ))
+ } else {
+ let path_buffer = (&reparse_data.path_buffer) as *const u16;
+ let parsed_path = std::slice::from_raw_parts(
+ path_buffer.offset(
+ (reparse_data.sub_name_offset as usize / mem::size_of::<u16>()) as isize,
+ ),
+ reparse_data.sub_name_length as usize / mem::size_of::<u16>(),
+ );
+ Ok::<PathBuf, io::Error>(PathBuf::from(OsString::from_wide(parsed_path)))
+ }
+ }};
+}
+
+/// Returns the target of a reparse point as an absolute path.
+/// If `path` is not a link, `None` is returned.
+fn resolve_link<T: AsRef<Path> + Copy>(path: T) -> io::Result<Option<PathBuf>> {
+ let mut data_buffer = vec![0u8; MAXIMUM_REPARSE_DATA_BUFFER_SIZE as usize];
+
+ let mut stripped_path = strip_namespace(path);
+ if !stripped_path.starts_with(r"\\?\") {
+ stripped_path = Path::new(r"\\?\").join(stripped_path);
+ }
+
+ // Note: `file_attributes()` doesn't include all attributes, so we must use GetfileAttributesW.
+ let mut u16_path: Vec<u16> = osstr_to_wide(&stripped_path);
+ let attributes = unsafe { GetFileAttributesW(u16_path.as_mut_ptr()) };
+
+ if (attributes & FILE_ATTRIBUTE_REPARSE_POINT) == 0 {
+ return Ok(None);
+ }
+
+ let file = fs::OpenOptions::new()
+ .read(true)
+ .custom_flags(FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS)
+ .open(path)?;
+
+ let mut bytes_returned = 0u32;
+
+ if unsafe {
+ DeviceIoControl(
+ file.as_raw_handle() as *mut _,
+ FSCTL_GET_REPARSE_POINT,
+ ptr::null_mut(),
+ 0u32,
+ data_buffer.as_mut_ptr() as *mut _,
+ data_buffer.len() as u32,
+ &mut bytes_returned,
+ ptr::null_mut(),
+ )
+ } == 0
+ {
+ return Err(io::Error::last_os_error());
+ }
+
+ if (bytes_returned as usize) < mem::size_of::<ReparseDataMountPoint>() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "invalid reparse point data",
+ ));
+ }
+
+ let reparse_tag = unsafe { &*(data_buffer.as_ptr() as *const ReparseData) }.tag;
+ match reparse_tag {
+ IO_REPARSE_TAG_SYMLINK => {
+ let is_relative = unsafe { &*(data_buffer.as_ptr() as *const ReparseDataSymlink) }
+ .flags
+ & SYMLINK_FLAG_RELATIVE
+ != 0;
+ let mut path_buf = unsafe { get_reparse_path!(ReparseDataSymlink, data_buffer) }?;
+
+ if is_relative {
+ if let Some(parent) = stripped_path.parent() {
+ path_buf = get_full_path_name(parent.join(path_buf))?;
+ }
+ } else {
+ path_buf = strip_namespace(path_buf);
+ }
+
+ Ok(Some(path_buf))
+ }
+ IO_REPARSE_TAG_MOUNT_POINT => {
+ let path_buf = unsafe { get_reparse_path!(ReparseDataMountPoint, data_buffer) }?;
+ Ok(Some(strip_namespace(path_buf)))
+ }
+ // unknown reparse tag
+ _ => Ok(None),
+ }
+}
+
+/// The same as [`resolve_all_links`] but for a set of paths.
+fn resolve_all_links_multiple<P: AsRef<Path>>(paths: &[P]) -> HashSet<PathBuf> {
+ let mut monitored_paths = HashSet::new();
+ for path in paths {
+ match resolve_all_links(path) {
+ Ok(paths) => monitored_paths.extend(paths),
+ Err(error) => {
+ log::error!("Failed to identify paths to monitor: {:?}", error);
+ }
+ }
+ }
+ monitored_paths
+}
+
+/// Returns all links and targets for a given path (including any of its parent directories).
+fn resolve_all_links<P: AsRef<Path>>(path: P) -> io::Result<Vec<PathBuf>> {
+ let mut monitor_paths = vec![path.as_ref().to_path_buf()];
+ let mut iter = path.as_ref().components();
+
+ let mut partial_path = PathBuf::new();
+ partial_path.push(iter.next().ok_or(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "path is missing prefix",
+ ))?);
+ partial_path.push(iter.next().ok_or(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "path is missing root",
+ ))?);
+
+ for component in &mut iter {
+ partial_path.push(component);
+ if let Ok(Some(target)) = resolve_link(&partial_path) {
+ monitor_paths.extend(resolve_all_links(target.join(iter))?);
+ break;
+ }
+ }
+
+ Ok(monitor_paths)
+}
+
+struct DirContext {
+ path: PathBuf,
+ dir_handle: fs::File,
+ buffer: Vec<u8>,
+ overlapped: Pin<Box<OVERLAPPED>>,
+ _io_completion_port: Arc<CompletionPort>,
+}
+
+impl DirContext {
+ fn new<P: AsRef<Path>>(
+ path: P,
+ io_completion_port: Arc<CompletionPort>,
+ completion_key: usize,
+ ) -> io::Result<DirContext> {
+ let dir_handle = fs::OpenOptions::new()
+ .read(true)
+ .custom_flags(FILE_FLAG_OVERLAPPED | FILE_FLAG_BACKUP_SEMANTICS)
+ .open(&path)?;
+
+ let handle = unsafe {
+ CreateIoCompletionPort(
+ dir_handle.as_raw_handle() as *mut _,
+ io_completion_port.as_raw_handle() as *mut _,
+ completion_key,
+ // num of threads is ignored here
+ 0,
+ )
+ };
+
+ if handle == ptr::null_mut() {
+ return Err(io::Error::last_os_error());
+ }
+
+ Ok(DirContext {
+ path: path.as_ref().to_path_buf(),
+ dir_handle,
+ buffer: vec![0u8; 4096],
+ overlapped: Box::pin(unsafe { mem::zeroed() }),
+ _io_completion_port: io_completion_port,
+ })
+ }
+
+ fn read_directory_changes(&mut self) -> io::Result<()> {
+ let mut _bytes_returned = 0;
+ if unsafe {
+ ReadDirectoryChangesW(
+ self.dir_handle.as_raw_handle() as *mut _,
+ self.buffer.as_mut_ptr() as *mut _,
+ self.buffer.len() as u32,
+ TRUE,
+ FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME,
+ &mut _bytes_returned,
+ &mut *self.overlapped,
+ None,
+ )
+ } == 0
+ {
+ return Err(io::Error::last_os_error());
+ }
+ Ok(())
+ }
+
+ fn path(&self) -> &Path {
+ &self.path
+ }
+
+ /// Try to cancel a request. On success, return whether a request was cancelled.
+ fn cancel_io(&mut self) -> io::Result<bool> {
+ if unsafe { CancelIoEx(self.dir_handle.as_raw_handle(), ptr::null_mut()) } == 0 {
+ match io::Error::last_os_error() {
+ _error if _error.raw_os_error() == Some(ERROR_NOT_FOUND as i32) => Ok(false),
+ error => Err(error),
+ }
+ } else {
+ Ok(true)
+ }
+ }
+}
+
+impl Drop for DirContext {
+ fn drop(&mut self) {
+ if let Err(error) = self.cancel_io() {
+ log::error!("Failed to cancel pending file I/O request: {}", error);
+ }
+ }
+}
+
+unsafe impl Send for DirContext {}
+unsafe impl Sync for DirContext {}
+
+struct CompletionStatus {
+ bytes_returned: u32,
+ completion_key: usize,
+ used_overlapped: *mut OVERLAPPED,
+}
+
+struct CompletionPort {
+ handle: HANDLE,
+}
+
+impl CompletionPort {
+ // `concurrent_threads`: 0 ==> number of processors
+ fn create(concurrent_threads: u32) -> io::Result<Self> {
+ let handle = unsafe {
+ CreateIoCompletionPort(INVALID_HANDLE_VALUE, ptr::null_mut(), 0, concurrent_threads)
+ };
+ if handle == ptr::null_mut() {
+ return Err(io::Error::last_os_error());
+ }
+ Ok(CompletionPort { handle })
+ }
+
+ fn get_queued_completion_status(
+ &self,
+ ) -> Result<CompletionStatus, (io::Error, CompletionStatus)> {
+ self.get_queued_completion_status_timeout(INFINITE)
+ }
+
+ fn get_queued_completion_status_timeout(
+ &self,
+ timeout: u32,
+ ) -> Result<CompletionStatus, (io::Error, CompletionStatus)> {
+ let mut result = CompletionStatus {
+ bytes_returned: 0,
+ completion_key: 0,
+ used_overlapped: ptr::null_mut(),
+ };
+
+ if unsafe {
+ GetQueuedCompletionStatus(
+ self.handle,
+ &mut result.bytes_returned,
+ &mut result.completion_key,
+ &mut result.used_overlapped,
+ timeout,
+ )
+ } == 0
+ {
+ return Err((io::Error::last_os_error(), result));
+ }
+
+ Ok(result)
+ }
+
+ fn post_queued_completion_status(
+ &self,
+ bytes_transferred: u32,
+ completion_key: usize,
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<()> {
+ if unsafe {
+ PostQueuedCompletionStatus(self.handle, bytes_transferred, completion_key, overlapped)
+ } == 0
+ {
+ return Err(io::Error::last_os_error());
+ }
+ Ok(())
+ }
+}
+
+impl AsRawHandle for CompletionPort {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.handle as *mut _
+ }
+}
+
+impl Drop for CompletionPort {
+ fn drop(&mut self) {
+ unsafe { CloseHandle(self.handle) };
+ }
+}
+
+unsafe impl Send for CompletionPort {}
+unsafe impl Sync for CompletionPort {}
+
+#[derive(Clone, Hash, PartialEq, Eq)]
+struct StrippedPath {
+ /// The volume that the path is on. For `C:\a\b\c`, this would be for `C:\`.
+ prefix: PathBuf,
+ /// The remainder of the path. For `C:\a\b\c`, this would be for `a\b\c`.
+ tail: Vec<u16>,
+}
+
+impl StrippedPath {
+ fn new(path: &PathBuf) -> io::Result<StrippedPath> {
+ let mut iter = path.components();
+ let prefix = iter.next().ok_or(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "path is missing prefix",
+ ))?;
+ let prefix = Path::new(&prefix).join(iter.next().ok_or(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "path is missing root",
+ ))?);
+
+ Ok(StrippedPath {
+ prefix: prefix.clone(),
+ tail: osstr_to_wide(iter.as_path()),
+ })
+ }
+}
+
+pub struct PathMonitorHandle {
+ port_handle: Arc<CompletionPort>,
+ tx: sync_mpsc::Sender<PathMonitorCommand>,
+}
+
+impl PathMonitorHandle {
+ pub fn set_paths<P: AsRef<Path>>(&self, paths: &[P]) -> io::Result<()> {
+ let _ = self.tx.send(PathMonitorCommand::SetPaths(
+ paths.iter().map(|p| p.as_ref().to_path_buf()).collect(),
+ ));
+ self.notify_monitor()
+ }
+
+ pub fn shutdown(&self) -> io::Result<()> {
+ let _ = self.tx.send(PathMonitorCommand::Shutdown);
+ self.notify_monitor()
+ }
+
+ fn notify_monitor(&self) -> io::Result<()> {
+ self.port_handle.post_queued_completion_status(
+ 0,
+ PATH_MONITOR_COMPLETION_KEY_IGNORE,
+ ptr::null_mut(),
+ )
+ }
+}
+
+pub type PathChangeNotifyRx = sync_mpsc::Receiver<()>;
+
+enum PathMonitorCommand {
+ Shutdown,
+ SetPaths(Vec<PathBuf>),
+}
+
+pub struct PathMonitor {
+ port_handle: Arc<CompletionPort>,
+ dir_contexts: Vec<DirContext>,
+ discarded_contexts: Vec<DirContext>,
+ stripped_paths: HashSet<StrippedPath>,
+}
+
+impl PathMonitor {
+ pub fn spawn() -> io::Result<(PathMonitorHandle, PathChangeNotifyRx)> {
+ let port_handle = Arc::new(CompletionPort::create(0)?);
+ let mut original_paths: Vec<PathBuf> = vec![];
+
+ let mut monitor = Self {
+ port_handle: port_handle.clone(),
+ dir_contexts: vec![],
+ discarded_contexts: vec![],
+ stripped_paths: HashSet::new(),
+ };
+
+ let (cmd_tx, cmd_rx) = sync_mpsc::channel();
+ let (notify_tx, notify_rx) = sync_mpsc::channel();
+
+ std::thread::spawn(move || {
+ loop {
+ if !monitor.service_commands(&mut original_paths, &cmd_rx) {
+ break;
+ }
+ match monitor.handle_next_completion_packet() {
+ Ok(true) => match monitor.update_paths(&original_paths) {
+ Ok(true) => {
+ let _ = notify_tx.send(());
+ }
+ Ok(false) => (),
+ Err(_) => break,
+ },
+ Ok(false) => (),
+ Err(error) => {
+ log::error!("handle_next_completion_packet failed: {}", error);
+ break;
+ }
+ }
+ }
+ log::debug!("Shutting down reparse point monitor");
+
+ monitor.abort_all_requests();
+ });
+
+ Ok((
+ PathMonitorHandle {
+ port_handle,
+ tx: cmd_tx,
+ },
+ notify_rx,
+ ))
+ }
+
+ fn service_commands(
+ &mut self,
+ original_paths: &mut Vec<PathBuf>,
+ cmd_rx: &sync_mpsc::Receiver<PathMonitorCommand>,
+ ) -> bool {
+ while let Some(cmd) = cmd_rx.try_iter().next() {
+ match cmd {
+ PathMonitorCommand::Shutdown => {
+ return false;
+ }
+ PathMonitorCommand::SetPaths(new_paths) => {
+ *original_paths = new_paths;
+ return !self.update_paths(&original_paths).is_err();
+ }
+ }
+ }
+ true
+ }
+
+ fn update_paths(&mut self, unresolved_paths: &[PathBuf]) -> Result<bool, ()> {
+ let resolved_paths = resolve_all_links_multiple(unresolved_paths);
+ let new_stripped_paths = resolved_paths
+ .iter()
+ .filter_map(|p| StrippedPath::new(p).ok())
+ .collect();
+ if new_stripped_paths != self.stripped_paths {
+ self.stripped_paths = new_stripped_paths;
+ if let Err(error) = self.update_directory_contexts() {
+ log::error!("Failed to open new directory handles: {}", error);
+ return Err(());
+ }
+ return Ok(true);
+ }
+ Ok(false)
+ }
+
+ fn update_directory_contexts(&mut self) -> io::Result<()> {
+ // Remove paths we no longer need to monitor
+ let len = self.dir_contexts.len();
+ for i in (0..len).rev() {
+ if !self
+ .stripped_paths
+ .iter()
+ .any(|p| p.prefix == self.dir_contexts[i].path)
+ {
+ let mut removed_ctx = self.dir_contexts.remove(i);
+ match removed_ctx.cancel_io() {
+ Ok(true) => self.discarded_contexts.push(removed_ctx),
+ Err(error) => {
+ log::error!("Failed to cancel pending I/O for dir context: {}", error);
+ mem::forget(removed_ctx)
+ }
+ Ok(false) => (),
+ }
+ }
+ }
+
+ // Add new paths to monitor
+ for path in &self.stripped_paths {
+ if self
+ .dir_contexts
+ .iter()
+ .any(|ctx| path.prefix == ctx.path())
+ {
+ continue;
+ }
+
+ let index = self.dir_contexts.len();
+ let mut ctx = match DirContext::new(&path.prefix, self.port_handle.clone(), index) {
+ Ok(ctx) => ctx,
+ Err(error) if error.kind() == io::ErrorKind::NotFound => {
+ log::warn!(
+ "Not monitoring reparse points on {} since it does not exist",
+ path.prefix.to_string_lossy()
+ );
+ continue;
+ }
+ Err(error) => return Err(error),
+ };
+ ctx.read_directory_changes()?;
+ self.dir_contexts.push(ctx);
+ }
+
+ Ok(())
+ }
+
+ fn handle_next_completion_packet(&mut self) -> io::Result<bool> {
+ let result = match self.port_handle.get_queued_completion_status() {
+ Ok(result) if result.completion_key == PATH_MONITOR_COMPLETION_KEY_IGNORE => {
+ return Ok(false);
+ }
+ Err((error, status)) => {
+ self.free_discarded_context(status.used_overlapped);
+ if error.raw_os_error() != Some(ERROR_OPERATION_ABORTED as i32) {
+ log::error!("GetQueuedCompletionStatus failed: {:?}", error);
+ return Err(error);
+ }
+ return Ok(false);
+ }
+ Ok(result) => result,
+ };
+
+ if self.free_discarded_context(result.used_overlapped) {
+ return Ok(false);
+ }
+
+ let ctx_index = self
+ .find_context(result.used_overlapped)
+ .ok_or(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "received I/O completion packet without an associated DirContext",
+ ))?;
+
+ let changed = if result.bytes_returned == 0 {
+ log::debug!("Change event buffer is empty");
+ false
+ } else {
+ self.process_file_notification(&self.dir_contexts[ctx_index])?
+ };
+
+ if let Err(error) = self.dir_contexts[ctx_index].read_directory_changes() {
+ log::error!("Failed to queue new directory change event: {}", error);
+ return Err(error);
+ }
+
+ Ok(changed)
+ }
+
+ /// Find the index of the `DirContext` that owns the `OVERLAPPED` object, or None.
+ fn find_context(&self, overlapped: *const OVERLAPPED) -> Option<usize> {
+ if overlapped == ptr::null_mut() {
+ return None;
+ }
+ for i in 0..self.dir_contexts.len() {
+ if ((&*self.dir_contexts[i].overlapped) as *const _) == overlapped {
+ return Some(i);
+ }
+ }
+ None
+ }
+
+ /// Remove the element in `discarded_contexts` that owns the `OVERLAPPED` object, if it exists.
+ fn free_discarded_context(&mut self, overlapped: *const OVERLAPPED) -> bool {
+ if overlapped == ptr::null_mut() {
+ return false;
+ }
+ let mut was_discarded = false;
+ self.discarded_contexts.retain(|ctx| {
+ if ((&*ctx.overlapped) as *const _) != overlapped {
+ true
+ } else {
+ was_discarded = true;
+ false
+ }
+ });
+ was_discarded
+ }
+
+ fn process_file_notification(&self, dir_context: &DirContext) -> io::Result<bool> {
+ let mut info = dir_context.buffer.as_ptr() as *const FILE_NOTIFY_INFORMATION;
+ loop {
+ let current_field = unsafe { &*info };
+
+ let file_name = unsafe {
+ std::slice::from_raw_parts(
+ current_field.FileName.as_ptr(),
+ current_field.FileNameLength as usize / mem::size_of::<u16>(),
+ )
+ };
+
+ for path in &self.stripped_paths {
+ if path.prefix != dir_context.path() {
+ continue;
+ }
+ if path.tail.len() <= file_name.len() {
+ continue;
+ }
+ let cmp_status = unsafe {
+ CompareStringOrdinal(
+ path.tail.as_ptr(),
+ file_name.len() as i32,
+ file_name.as_ptr(),
+ file_name.len() as i32,
+ TRUE,
+ )
+ };
+ match cmp_status {
+ CSTR_EQUAL => return Ok(true),
+ 0 => log::error!("Bug: CompareStringOrdinal failed"),
+ _ => (),
+ }
+ }
+
+ if current_field.NextEntryOffset == 0 {
+ break;
+ }
+ info = unsafe { (info as *mut u8).offset(current_field.NextEntryOffset as isize) }
+ as *const FILE_NOTIFY_INFORMATION;
+ }
+ Ok(false)
+ }
+
+ /// Cancel all requests and give the cancelled operations some time to complete.
+ fn abort_all_requests(&mut self) {
+ let mut contexts = vec![];
+ for mut ctx in self
+ .dir_contexts
+ .drain(..)
+ .chain(self.discarded_contexts.drain(..))
+ {
+ match ctx.cancel_io() {
+ Ok(true) => contexts.push(ctx),
+ Ok(false) => (),
+ Err(error) => {
+ log::error!("Failed to cancel pending I/O request: {}", error);
+ mem::forget(ctx);
+ }
+ }
+ }
+
+ let time = Instant::now();
+ while !contexts.is_empty() {
+ if time.elapsed() >= SHUTDOWN_TIMEOUT {
+ log::error!("Timeout while cancelling I/O requests");
+ mem::forget(contexts);
+ return;
+ }
+
+ let result = match self
+ .port_handle
+ .get_queued_completion_status_timeout(SHUTDOWN_POLL_TIMEOUT.as_millis() as u32)
+ {
+ Ok(result) => result,
+ Err((error, result)) => {
+ if error.raw_os_error() != Some(ERROR_OPERATION_ABORTED as i32) {
+ log::error!("GetQueuedCompletionStatus failed: {:?}", error);
+ if result.used_overlapped == ptr::null_mut() {
+ continue;
+ }
+ }
+ result
+ }
+ };
+ contexts.retain(|ctx| ((&*ctx.overlapped) as *const _) != result.used_overlapped);
+ }
+ }
+}
+
+fn get_full_path_name<T: AsRef<OsStr>>(path: T) -> io::Result<PathBuf> {
+ let path_buf_os: Vec<u16> = osstr_to_wide(path);
+ let mut full_path_buffer = vec![0u16; 2048 / mem::size_of::<u16>()];
+
+ let full_length = loop {
+ let required_length = unsafe {
+ GetFullPathNameW(
+ path_buf_os.as_ptr(),
+ full_path_buffer.len() as u32,
+ full_path_buffer.as_mut_ptr(),
+ ptr::null_mut(),
+ )
+ } as usize;
+
+ if required_length == 0 {
+ return Err(io::Error::last_os_error());
+ }
+
+ if required_length > full_path_buffer.len() {
+ full_path_buffer.resize(required_length, 0);
+ } else {
+ break required_length;
+ }
+ };
+
+ full_path_buffer.resize(full_length, 0);
+ Ok(PathBuf::from(OsString::from_wide(&full_path_buffer)))
+}
+
+/// Converts an `OsStr` to a null-terminated UTF-16 string.
+fn osstr_to_wide<T: AsRef<OsStr>>(string: T) -> Vec<u16> {
+ string
+ .as_ref()
+ .encode_wide()
+ .chain(std::iter::once(0u16))
+ .collect()
+}