diff options
| author | David Lönnhager <david.l@mullvad.net> | 2021-07-01 11:01:03 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2021-08-26 13:46:33 +0200 |
| commit | d6f2ebac9a67df07bd596ffc8e0a10b368192c04 (patch) | |
| tree | 682a11fddeaf067e972b8710f4534338ea609bc5 | |
| parent | 5538762a00723fbcddd52cd7dc01bc0cc7eb3a37 (diff) | |
| download | mullvadvpn-d6f2ebac9a67df07bd596ffc8e0a10b368192c04.tar.xz mullvadvpn-d6f2ebac9a67df07bd596ffc8e0a10b368192c04.zip | |
Add NTFS reparse point monitor
| -rw-r--r-- | talpid-core/src/split_tunnel/windows/mod.rs | 58 | ||||
| -rw-r--r-- | talpid-core/src/split_tunnel/windows/path_monitor.rs | 820 |
2 files changed, 876 insertions, 2 deletions
diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index 2e538a014b..4df5e5d4ce 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -1,4 +1,5 @@ mod driver; +mod path_monitor; mod windows; use crate::{ @@ -17,7 +18,7 @@ use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr}, os::windows::io::{AsRawHandle, RawHandle}, ptr, - sync::{mpsc as sync_mpsc, Arc, Weak}, + sync::{mpsc as sync_mpsc, Arc, Mutex, Weak}, time::Duration, }; use talpid_types::{tunnel::ErrorStateCause, ErrorExt}; @@ -85,6 +86,10 @@ pub enum Error { /// The request handling thread is down #[error(display = "The ST request thread is down")] RequestThreadDown, + + /// Failed to start the NTFS reparse point monitor + #[error(display = "Failed to start path monitor")] + StartPathMonitor(#[error(source)] io::Error), } /// Manages applications whose traffic to exclude from the tunnel. @@ -288,6 +293,12 @@ impl SplitTunnel { let (tx, rx): (RequestTx, _) = sync_mpsc::sync_channel(3); let (init_tx, init_rx) = sync_mpsc::channel(); + let (path_monitor, path_change_rx) = + path_monitor::PathMonitor::spawn().map_err(Error::StartPathMonitor)?; + + let monitored_paths = Arc::new(Mutex::new(vec![])); + let monitored_paths_copy = monitored_paths.clone(); + std::thread::spawn(move || { let result = driver::DeviceHandle::new() .map(Arc::new) @@ -306,11 +317,27 @@ impl SplitTunnel { while let Ok((request, response_tx)) = rx.recv() { let response = match request { Request::SetPaths(paths) => { - if paths.len() > 0 { + let mut monitored_paths_guard = monitored_paths.lock().unwrap(); + + let result = if paths.len() > 0 { handle.set_config(&paths).map_err(Error::SetConfiguration) } else { handle.clear_config().map_err(Error::SetConfiguration) + }; + + if result.is_ok() { + if let Err(error) = path_monitor.set_paths(&paths) { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update path monitor") + ); + monitored_paths_guard.clear(); + } else { + *monitored_paths_guard = paths.to_vec(); + } } + + result } Request::RegisterIps( mut tunnel_ipv4, @@ -331,12 +358,39 @@ impl SplitTunnel { log::error!("A response could not be sent for a completed request"); } } + + if let Err(error) = path_monitor.shutdown() { + log::error!( + "{}", + error.display_chain_with_msg("Failed to shut down path monitor") + ); + } }); let handle = init_rx .recv_timeout(REQUEST_TIMEOUT) .map_err(|_| Error::RequestThreadStuck)??; + let handle_copy = handle.clone(); + + std::thread::spawn(move || { + while let Ok(()) = path_change_rx.recv() { + let paths = monitored_paths_copy.lock().unwrap(); + let result = if paths.len() > 0 { + log::debug!("Re-resolving excluded paths"); + handle_copy.set_config(&*paths) + } else { + continue; + }; + if let Err(error) = result { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update excluded paths") + ); + } + } + }); + Ok((tx, handle)) } diff --git a/talpid-core/src/split_tunnel/windows/path_monitor.rs b/talpid-core/src/split_tunnel/windows/path_monitor.rs new file mode 100644 index 0000000000..febc0eff67 --- /dev/null +++ b/talpid-core/src/split_tunnel/windows/path_monitor.rs @@ -0,0 +1,820 @@ +use std::{ + collections::HashSet, + ffi::{OsStr, OsString}, + fs, io, mem, + os::windows::{ + ffi::{OsStrExt, OsStringExt}, + fs::OpenOptionsExt, + io::{AsRawHandle, RawHandle}, + }, + path::{Path, PathBuf}, + pin::Pin, + ptr, + sync::{mpsc as sync_mpsc, Arc}, + time::{Duration, Instant}, +}; +use winapi::{ + self, + shared::{ + minwindef::TRUE, + winerror::{ERROR_NOT_FOUND, ERROR_OPERATION_ABORTED}, + }, + um::{ + fileapi::{GetFileAttributesW, GetFullPathNameW}, + handleapi::{CloseHandle, INVALID_HANDLE_VALUE}, + ioapiset::{ + CancelIoEx, CreateIoCompletionPort, DeviceIoControl, GetQueuedCompletionStatus, + PostQueuedCompletionStatus, + }, + minwinbase::OVERLAPPED, + stringapiset::CompareStringOrdinal, + winbase::{ + ReadDirectoryChangesW, FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OPEN_REPARSE_POINT, + FILE_FLAG_OVERLAPPED, INFINITE, + }, + winioctl::FSCTL_GET_REPARSE_POINT, + winnt::{ + FILE_ATTRIBUTE_REPARSE_POINT, FILE_NOTIFY_CHANGE_DIR_NAME, + FILE_NOTIFY_CHANGE_FILE_NAME, FILE_NOTIFY_INFORMATION, HANDLE, + IO_REPARSE_TAG_MOUNT_POINT, IO_REPARSE_TAG_SYMLINK, MAXIMUM_REPARSE_DATA_BUFFER_SIZE, + }, + }, +}; + +const SHUTDOWN_POLL_TIMEOUT: Duration = Duration::from_millis(500); +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2); +const PATH_MONITOR_COMPLETION_KEY_IGNORE: usize = usize::MAX; + +const CSTR_EQUAL: i32 = 2; + +const ANYSIZE_ARRAY: usize = 1; +const SYMLINK_FLAG_RELATIVE: u32 = 0x00000001; + + +// See https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/c3a420cb-8a72-4adf-87e8-eee95379d78f. +#[repr(C)] +struct ReparseData { + tag: u32, + data_length: u16, + reserved: u16, + data: [u8; ANYSIZE_ARRAY], +} + +// See https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/ca069dad-ed16-42aa-b057-b6b207f447cc. +#[repr(C)] +struct ReparseDataMountPoint { + tag: u32, + data_length: u16, + reserved: i16, + // Offset to a pathname pointing to the target path. + sub_name_offset: u16, + sub_name_length: u16, + // Offset to a user-displayable pathname. + print_name_offset: u16, + print_name_length: u16, + path_buffer: [u16; ANYSIZE_ARRAY], +} + +// See https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/b41f1cbf-10df-4a47-98d4-1c52a833d913. +#[repr(C)] +struct ReparseDataSymlink { + tag: u32, + data_length: u16, + reserved: i16, + // Offset to a pathname pointing to the target path. + sub_name_offset: u16, + sub_name_length: u16, + // Offset to a user-displayable pathname. + print_name_offset: u16, + print_name_length: u16, + flags: u32, + path_buffer: [u16; ANYSIZE_ARRAY], +} + +fn strip_namespace<P: AsRef<Path>>(path: P) -> PathBuf { + // \??: symlink to "DosDevices" + path.as_ref() + .strip_prefix(r"\\??") + .map(PathBuf::from) + .unwrap_or(path.as_ref().to_path_buf()) +} + +macro_rules! get_reparse_path { + ($tag_type:ident, $data:ident) => {{ + let reparse_data = &*($data.as_ptr() as *const $tag_type); + let last_offset = reparse_data.sub_name_offset as usize + + reparse_data.sub_name_length as usize + + memoffset::offset_of!($tag_type, path_buffer); + + if last_offset > $data.len() { + log::error!("Ignoring mount point with out-of-bounds index"); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "link indices out-of-bounds", + )) + } else { + let path_buffer = (&reparse_data.path_buffer) as *const u16; + let parsed_path = std::slice::from_raw_parts( + path_buffer.offset( + (reparse_data.sub_name_offset as usize / mem::size_of::<u16>()) as isize, + ), + reparse_data.sub_name_length as usize / mem::size_of::<u16>(), + ); + Ok::<PathBuf, io::Error>(PathBuf::from(OsString::from_wide(parsed_path))) + } + }}; +} + +/// Returns the target of a reparse point as an absolute path. +/// If `path` is not a link, `None` is returned. +fn resolve_link<T: AsRef<Path> + Copy>(path: T) -> io::Result<Option<PathBuf>> { + let mut data_buffer = vec![0u8; MAXIMUM_REPARSE_DATA_BUFFER_SIZE as usize]; + + let mut stripped_path = strip_namespace(path); + if !stripped_path.starts_with(r"\\?\") { + stripped_path = Path::new(r"\\?\").join(stripped_path); + } + + // Note: `file_attributes()` doesn't include all attributes, so we must use GetfileAttributesW. + let mut u16_path: Vec<u16> = osstr_to_wide(&stripped_path); + let attributes = unsafe { GetFileAttributesW(u16_path.as_mut_ptr()) }; + + if (attributes & FILE_ATTRIBUTE_REPARSE_POINT) == 0 { + return Ok(None); + } + + let file = fs::OpenOptions::new() + .read(true) + .custom_flags(FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS) + .open(path)?; + + let mut bytes_returned = 0u32; + + if unsafe { + DeviceIoControl( + file.as_raw_handle() as *mut _, + FSCTL_GET_REPARSE_POINT, + ptr::null_mut(), + 0u32, + data_buffer.as_mut_ptr() as *mut _, + data_buffer.len() as u32, + &mut bytes_returned, + ptr::null_mut(), + ) + } == 0 + { + return Err(io::Error::last_os_error()); + } + + if (bytes_returned as usize) < mem::size_of::<ReparseDataMountPoint>() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid reparse point data", + )); + } + + let reparse_tag = unsafe { &*(data_buffer.as_ptr() as *const ReparseData) }.tag; + match reparse_tag { + IO_REPARSE_TAG_SYMLINK => { + let is_relative = unsafe { &*(data_buffer.as_ptr() as *const ReparseDataSymlink) } + .flags + & SYMLINK_FLAG_RELATIVE + != 0; + let mut path_buf = unsafe { get_reparse_path!(ReparseDataSymlink, data_buffer) }?; + + if is_relative { + if let Some(parent) = stripped_path.parent() { + path_buf = get_full_path_name(parent.join(path_buf))?; + } + } else { + path_buf = strip_namespace(path_buf); + } + + Ok(Some(path_buf)) + } + IO_REPARSE_TAG_MOUNT_POINT => { + let path_buf = unsafe { get_reparse_path!(ReparseDataMountPoint, data_buffer) }?; + Ok(Some(strip_namespace(path_buf))) + } + // unknown reparse tag + _ => Ok(None), + } +} + +/// The same as [`resolve_all_links`] but for a set of paths. +fn resolve_all_links_multiple<P: AsRef<Path>>(paths: &[P]) -> HashSet<PathBuf> { + let mut monitored_paths = HashSet::new(); + for path in paths { + match resolve_all_links(path) { + Ok(paths) => monitored_paths.extend(paths), + Err(error) => { + log::error!("Failed to identify paths to monitor: {:?}", error); + } + } + } + monitored_paths +} + +/// Returns all links and targets for a given path (including any of its parent directories). +fn resolve_all_links<P: AsRef<Path>>(path: P) -> io::Result<Vec<PathBuf>> { + let mut monitor_paths = vec![path.as_ref().to_path_buf()]; + let mut iter = path.as_ref().components(); + + let mut partial_path = PathBuf::new(); + partial_path.push(iter.next().ok_or(io::Error::new( + io::ErrorKind::InvalidInput, + "path is missing prefix", + ))?); + partial_path.push(iter.next().ok_or(io::Error::new( + io::ErrorKind::InvalidInput, + "path is missing root", + ))?); + + for component in &mut iter { + partial_path.push(component); + if let Ok(Some(target)) = resolve_link(&partial_path) { + monitor_paths.extend(resolve_all_links(target.join(iter))?); + break; + } + } + + Ok(monitor_paths) +} + +struct DirContext { + path: PathBuf, + dir_handle: fs::File, + buffer: Vec<u8>, + overlapped: Pin<Box<OVERLAPPED>>, + _io_completion_port: Arc<CompletionPort>, +} + +impl DirContext { + fn new<P: AsRef<Path>>( + path: P, + io_completion_port: Arc<CompletionPort>, + completion_key: usize, + ) -> io::Result<DirContext> { + let dir_handle = fs::OpenOptions::new() + .read(true) + .custom_flags(FILE_FLAG_OVERLAPPED | FILE_FLAG_BACKUP_SEMANTICS) + .open(&path)?; + + let handle = unsafe { + CreateIoCompletionPort( + dir_handle.as_raw_handle() as *mut _, + io_completion_port.as_raw_handle() as *mut _, + completion_key, + // num of threads is ignored here + 0, + ) + }; + + if handle == ptr::null_mut() { + return Err(io::Error::last_os_error()); + } + + Ok(DirContext { + path: path.as_ref().to_path_buf(), + dir_handle, + buffer: vec![0u8; 4096], + overlapped: Box::pin(unsafe { mem::zeroed() }), + _io_completion_port: io_completion_port, + }) + } + + fn read_directory_changes(&mut self) -> io::Result<()> { + let mut _bytes_returned = 0; + if unsafe { + ReadDirectoryChangesW( + self.dir_handle.as_raw_handle() as *mut _, + self.buffer.as_mut_ptr() as *mut _, + self.buffer.len() as u32, + TRUE, + FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME, + &mut _bytes_returned, + &mut *self.overlapped, + None, + ) + } == 0 + { + return Err(io::Error::last_os_error()); + } + Ok(()) + } + + fn path(&self) -> &Path { + &self.path + } + + /// Try to cancel a request. On success, return whether a request was cancelled. + fn cancel_io(&mut self) -> io::Result<bool> { + if unsafe { CancelIoEx(self.dir_handle.as_raw_handle(), ptr::null_mut()) } == 0 { + match io::Error::last_os_error() { + _error if _error.raw_os_error() == Some(ERROR_NOT_FOUND as i32) => Ok(false), + error => Err(error), + } + } else { + Ok(true) + } + } +} + +impl Drop for DirContext { + fn drop(&mut self) { + if let Err(error) = self.cancel_io() { + log::error!("Failed to cancel pending file I/O request: {}", error); + } + } +} + +unsafe impl Send for DirContext {} +unsafe impl Sync for DirContext {} + +struct CompletionStatus { + bytes_returned: u32, + completion_key: usize, + used_overlapped: *mut OVERLAPPED, +} + +struct CompletionPort { + handle: HANDLE, +} + +impl CompletionPort { + // `concurrent_threads`: 0 ==> number of processors + fn create(concurrent_threads: u32) -> io::Result<Self> { + let handle = unsafe { + CreateIoCompletionPort(INVALID_HANDLE_VALUE, ptr::null_mut(), 0, concurrent_threads) + }; + if handle == ptr::null_mut() { + return Err(io::Error::last_os_error()); + } + Ok(CompletionPort { handle }) + } + + fn get_queued_completion_status( + &self, + ) -> Result<CompletionStatus, (io::Error, CompletionStatus)> { + self.get_queued_completion_status_timeout(INFINITE) + } + + fn get_queued_completion_status_timeout( + &self, + timeout: u32, + ) -> Result<CompletionStatus, (io::Error, CompletionStatus)> { + let mut result = CompletionStatus { + bytes_returned: 0, + completion_key: 0, + used_overlapped: ptr::null_mut(), + }; + + if unsafe { + GetQueuedCompletionStatus( + self.handle, + &mut result.bytes_returned, + &mut result.completion_key, + &mut result.used_overlapped, + timeout, + ) + } == 0 + { + return Err((io::Error::last_os_error(), result)); + } + + Ok(result) + } + + fn post_queued_completion_status( + &self, + bytes_transferred: u32, + completion_key: usize, + overlapped: *mut OVERLAPPED, + ) -> io::Result<()> { + if unsafe { + PostQueuedCompletionStatus(self.handle, bytes_transferred, completion_key, overlapped) + } == 0 + { + return Err(io::Error::last_os_error()); + } + Ok(()) + } +} + +impl AsRawHandle for CompletionPort { + fn as_raw_handle(&self) -> RawHandle { + self.handle as *mut _ + } +} + +impl Drop for CompletionPort { + fn drop(&mut self) { + unsafe { CloseHandle(self.handle) }; + } +} + +unsafe impl Send for CompletionPort {} +unsafe impl Sync for CompletionPort {} + +#[derive(Clone, Hash, PartialEq, Eq)] +struct StrippedPath { + /// The volume that the path is on. For `C:\a\b\c`, this would be for `C:\`. + prefix: PathBuf, + /// The remainder of the path. For `C:\a\b\c`, this would be for `a\b\c`. + tail: Vec<u16>, +} + +impl StrippedPath { + fn new(path: &PathBuf) -> io::Result<StrippedPath> { + let mut iter = path.components(); + let prefix = iter.next().ok_or(io::Error::new( + io::ErrorKind::InvalidInput, + "path is missing prefix", + ))?; + let prefix = Path::new(&prefix).join(iter.next().ok_or(io::Error::new( + io::ErrorKind::InvalidInput, + "path is missing root", + ))?); + + Ok(StrippedPath { + prefix: prefix.clone(), + tail: osstr_to_wide(iter.as_path()), + }) + } +} + +pub struct PathMonitorHandle { + port_handle: Arc<CompletionPort>, + tx: sync_mpsc::Sender<PathMonitorCommand>, +} + +impl PathMonitorHandle { + pub fn set_paths<P: AsRef<Path>>(&self, paths: &[P]) -> io::Result<()> { + let _ = self.tx.send(PathMonitorCommand::SetPaths( + paths.iter().map(|p| p.as_ref().to_path_buf()).collect(), + )); + self.notify_monitor() + } + + pub fn shutdown(&self) -> io::Result<()> { + let _ = self.tx.send(PathMonitorCommand::Shutdown); + self.notify_monitor() + } + + fn notify_monitor(&self) -> io::Result<()> { + self.port_handle.post_queued_completion_status( + 0, + PATH_MONITOR_COMPLETION_KEY_IGNORE, + ptr::null_mut(), + ) + } +} + +pub type PathChangeNotifyRx = sync_mpsc::Receiver<()>; + +enum PathMonitorCommand { + Shutdown, + SetPaths(Vec<PathBuf>), +} + +pub struct PathMonitor { + port_handle: Arc<CompletionPort>, + dir_contexts: Vec<DirContext>, + discarded_contexts: Vec<DirContext>, + stripped_paths: HashSet<StrippedPath>, +} + +impl PathMonitor { + pub fn spawn() -> io::Result<(PathMonitorHandle, PathChangeNotifyRx)> { + let port_handle = Arc::new(CompletionPort::create(0)?); + let mut original_paths: Vec<PathBuf> = vec![]; + + let mut monitor = Self { + port_handle: port_handle.clone(), + dir_contexts: vec![], + discarded_contexts: vec![], + stripped_paths: HashSet::new(), + }; + + let (cmd_tx, cmd_rx) = sync_mpsc::channel(); + let (notify_tx, notify_rx) = sync_mpsc::channel(); + + std::thread::spawn(move || { + loop { + if !monitor.service_commands(&mut original_paths, &cmd_rx) { + break; + } + match monitor.handle_next_completion_packet() { + Ok(true) => match monitor.update_paths(&original_paths) { + Ok(true) => { + let _ = notify_tx.send(()); + } + Ok(false) => (), + Err(_) => break, + }, + Ok(false) => (), + Err(error) => { + log::error!("handle_next_completion_packet failed: {}", error); + break; + } + } + } + log::debug!("Shutting down reparse point monitor"); + + monitor.abort_all_requests(); + }); + + Ok(( + PathMonitorHandle { + port_handle, + tx: cmd_tx, + }, + notify_rx, + )) + } + + fn service_commands( + &mut self, + original_paths: &mut Vec<PathBuf>, + cmd_rx: &sync_mpsc::Receiver<PathMonitorCommand>, + ) -> bool { + while let Some(cmd) = cmd_rx.try_iter().next() { + match cmd { + PathMonitorCommand::Shutdown => { + return false; + } + PathMonitorCommand::SetPaths(new_paths) => { + *original_paths = new_paths; + return !self.update_paths(&original_paths).is_err(); + } + } + } + true + } + + fn update_paths(&mut self, unresolved_paths: &[PathBuf]) -> Result<bool, ()> { + let resolved_paths = resolve_all_links_multiple(unresolved_paths); + let new_stripped_paths = resolved_paths + .iter() + .filter_map(|p| StrippedPath::new(p).ok()) + .collect(); + if new_stripped_paths != self.stripped_paths { + self.stripped_paths = new_stripped_paths; + if let Err(error) = self.update_directory_contexts() { + log::error!("Failed to open new directory handles: {}", error); + return Err(()); + } + return Ok(true); + } + Ok(false) + } + + fn update_directory_contexts(&mut self) -> io::Result<()> { + // Remove paths we no longer need to monitor + let len = self.dir_contexts.len(); + for i in (0..len).rev() { + if !self + .stripped_paths + .iter() + .any(|p| p.prefix == self.dir_contexts[i].path) + { + let mut removed_ctx = self.dir_contexts.remove(i); + match removed_ctx.cancel_io() { + Ok(true) => self.discarded_contexts.push(removed_ctx), + Err(error) => { + log::error!("Failed to cancel pending I/O for dir context: {}", error); + mem::forget(removed_ctx) + } + Ok(false) => (), + } + } + } + + // Add new paths to monitor + for path in &self.stripped_paths { + if self + .dir_contexts + .iter() + .any(|ctx| path.prefix == ctx.path()) + { + continue; + } + + let index = self.dir_contexts.len(); + let mut ctx = match DirContext::new(&path.prefix, self.port_handle.clone(), index) { + Ok(ctx) => ctx, + Err(error) if error.kind() == io::ErrorKind::NotFound => { + log::warn!( + "Not monitoring reparse points on {} since it does not exist", + path.prefix.to_string_lossy() + ); + continue; + } + Err(error) => return Err(error), + }; + ctx.read_directory_changes()?; + self.dir_contexts.push(ctx); + } + + Ok(()) + } + + fn handle_next_completion_packet(&mut self) -> io::Result<bool> { + let result = match self.port_handle.get_queued_completion_status() { + Ok(result) if result.completion_key == PATH_MONITOR_COMPLETION_KEY_IGNORE => { + return Ok(false); + } + Err((error, status)) => { + self.free_discarded_context(status.used_overlapped); + if error.raw_os_error() != Some(ERROR_OPERATION_ABORTED as i32) { + log::error!("GetQueuedCompletionStatus failed: {:?}", error); + return Err(error); + } + return Ok(false); + } + Ok(result) => result, + }; + + if self.free_discarded_context(result.used_overlapped) { + return Ok(false); + } + + let ctx_index = self + .find_context(result.used_overlapped) + .ok_or(io::Error::new( + io::ErrorKind::InvalidInput, + "received I/O completion packet without an associated DirContext", + ))?; + + let changed = if result.bytes_returned == 0 { + log::debug!("Change event buffer is empty"); + false + } else { + self.process_file_notification(&self.dir_contexts[ctx_index])? + }; + + if let Err(error) = self.dir_contexts[ctx_index].read_directory_changes() { + log::error!("Failed to queue new directory change event: {}", error); + return Err(error); + } + + Ok(changed) + } + + /// Find the index of the `DirContext` that owns the `OVERLAPPED` object, or None. + fn find_context(&self, overlapped: *const OVERLAPPED) -> Option<usize> { + if overlapped == ptr::null_mut() { + return None; + } + for i in 0..self.dir_contexts.len() { + if ((&*self.dir_contexts[i].overlapped) as *const _) == overlapped { + return Some(i); + } + } + None + } + + /// Remove the element in `discarded_contexts` that owns the `OVERLAPPED` object, if it exists. + fn free_discarded_context(&mut self, overlapped: *const OVERLAPPED) -> bool { + if overlapped == ptr::null_mut() { + return false; + } + let mut was_discarded = false; + self.discarded_contexts.retain(|ctx| { + if ((&*ctx.overlapped) as *const _) != overlapped { + true + } else { + was_discarded = true; + false + } + }); + was_discarded + } + + fn process_file_notification(&self, dir_context: &DirContext) -> io::Result<bool> { + let mut info = dir_context.buffer.as_ptr() as *const FILE_NOTIFY_INFORMATION; + loop { + let current_field = unsafe { &*info }; + + let file_name = unsafe { + std::slice::from_raw_parts( + current_field.FileName.as_ptr(), + current_field.FileNameLength as usize / mem::size_of::<u16>(), + ) + }; + + for path in &self.stripped_paths { + if path.prefix != dir_context.path() { + continue; + } + if path.tail.len() <= file_name.len() { + continue; + } + let cmp_status = unsafe { + CompareStringOrdinal( + path.tail.as_ptr(), + file_name.len() as i32, + file_name.as_ptr(), + file_name.len() as i32, + TRUE, + ) + }; + match cmp_status { + CSTR_EQUAL => return Ok(true), + 0 => log::error!("Bug: CompareStringOrdinal failed"), + _ => (), + } + } + + if current_field.NextEntryOffset == 0 { + break; + } + info = unsafe { (info as *mut u8).offset(current_field.NextEntryOffset as isize) } + as *const FILE_NOTIFY_INFORMATION; + } + Ok(false) + } + + /// Cancel all requests and give the cancelled operations some time to complete. + fn abort_all_requests(&mut self) { + let mut contexts = vec![]; + for mut ctx in self + .dir_contexts + .drain(..) + .chain(self.discarded_contexts.drain(..)) + { + match ctx.cancel_io() { + Ok(true) => contexts.push(ctx), + Ok(false) => (), + Err(error) => { + log::error!("Failed to cancel pending I/O request: {}", error); + mem::forget(ctx); + } + } + } + + let time = Instant::now(); + while !contexts.is_empty() { + if time.elapsed() >= SHUTDOWN_TIMEOUT { + log::error!("Timeout while cancelling I/O requests"); + mem::forget(contexts); + return; + } + + let result = match self + .port_handle + .get_queued_completion_status_timeout(SHUTDOWN_POLL_TIMEOUT.as_millis() as u32) + { + Ok(result) => result, + Err((error, result)) => { + if error.raw_os_error() != Some(ERROR_OPERATION_ABORTED as i32) { + log::error!("GetQueuedCompletionStatus failed: {:?}", error); + if result.used_overlapped == ptr::null_mut() { + continue; + } + } + result + } + }; + contexts.retain(|ctx| ((&*ctx.overlapped) as *const _) != result.used_overlapped); + } + } +} + +fn get_full_path_name<T: AsRef<OsStr>>(path: T) -> io::Result<PathBuf> { + let path_buf_os: Vec<u16> = osstr_to_wide(path); + let mut full_path_buffer = vec![0u16; 2048 / mem::size_of::<u16>()]; + + let full_length = loop { + let required_length = unsafe { + GetFullPathNameW( + path_buf_os.as_ptr(), + full_path_buffer.len() as u32, + full_path_buffer.as_mut_ptr(), + ptr::null_mut(), + ) + } as usize; + + if required_length == 0 { + return Err(io::Error::last_os_error()); + } + + if required_length > full_path_buffer.len() { + full_path_buffer.resize(required_length, 0); + } else { + break required_length; + } + }; + + full_path_buffer.resize(full_length, 0); + Ok(PathBuf::from(OsString::from_wide(&full_path_buffer))) +} + +/// Converts an `OsStr` to a null-terminated UTF-16 string. +fn osstr_to_wide<T: AsRef<OsStr>>(string: T) -> Vec<u16> { + string + .as_ref() + .encode_wide() + .chain(std::iter::once(0u16)) + .collect() +} |
