Line data Source code
1 : use anyhow::Result;
2 : use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
3 : use std::io::Write;
4 : use std::path::Path;
5 : use tokio::sync::mpsc;
6 :
7 : use crate::diff::DiffSnapshot;
8 : use crate::git::GitRepo;
9 :
/// Appends a timestamped line to `hunky-debug.log` in the current working directory.
///
/// Each line has the form `[<seconds since Unix epoch>] <msg>`. Logging is
/// best-effort: if the file cannot be opened or written, the message is dropped
/// silently so that diagnostics can never disturb the caller.
fn debug_log(msg: String) {
    // A system clock set before the Unix epoch makes `duration_since` fail; fall
    // back to 0 rather than panicking inside a debug helper.
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);

    if let Ok(mut file) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open("hunky-debug.log")
    {
        let _ = writeln!(file, "[{}] {}", timestamp, msg);
    }
}
20 :
21 : pub struct FileWatcher {
22 : _watcher: RecommendedWatcher,
23 : }
24 :
25 : impl FileWatcher {
26 12 : pub fn new(
27 12 : git_repo: GitRepo,
28 12 : snapshot_sender: mpsc::UnboundedSender<DiffSnapshot>,
29 12 : ) -> Result<Self> {
30 12 : let repo_path = git_repo.repo_path().to_path_buf();
31 :
32 12 : let (tx, rx) = std::sync::mpsc::channel();
33 :
34 12 : let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
35 :
36 12 : watcher.watch(repo_path.as_ref(), RecursiveMode::Recursive)?;
37 :
38 : // Spawn a task to handle file system events
39 12 : tokio::spawn(async move {
40 1 : let mut last_snapshot_time = std::time::Instant::now();
41 1 : let debounce_duration = std::time::Duration::from_millis(500);
42 :
43 1 : debug_log(format!("File watcher started for {:?}", repo_path));
44 :
45 : loop {
46 3 : match rx.recv() {
47 2 : Ok(Ok(event)) => {
48 2 : debug_log(format!("Received event: {:?}", event));
49 : // Only process events for git-tracked files
50 2 : if should_process_event(&event, &repo_path) {
51 1 : debug_log("Processing event for snapshot".to_string());
52 : // Debounce: only create a new snapshot if enough time has passed
53 1 : let now = std::time::Instant::now();
54 1 : if now.duration_since(last_snapshot_time) >= debounce_duration {
55 1 : if let Ok(snapshot) = git_repo.get_diff_snapshot() {
56 1 : debug_log(format!("Created snapshot with {} files", snapshot.files.len()));
57 : // Only send if there are actual changes
58 1 : if !snapshot.files.is_empty() {
59 1 : let _ = snapshot_sender.send(snapshot);
60 1 : last_snapshot_time = now;
61 1 : } else {
62 0 : debug_log("Snapshot was empty, not sending".to_string());
63 0 : }
64 0 : }
65 0 : } else {
66 0 : debug_log("Debouncing, too soon since last snapshot".to_string());
67 0 : }
68 1 : } else {
69 1 : debug_log("Event filtered out (likely .git directory)".to_string());
70 1 : }
71 : }
72 0 : Ok(Err(e)) => {
73 0 : debug_log(format!("Watch error: {:?}", e));
74 0 : }
75 1 : Err(_) => break,
76 : }
77 : }
78 1 : });
79 :
80 12 : Ok(Self { _watcher: watcher })
81 12 : }
82 : }
83 :
84 7 : fn should_process_event(event: &Event, repo_path: &Path) -> bool {
85 : use notify::EventKind;
86 :
87 : // Filter out events we don't care about
88 7 : match event.kind {
89 : EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => {
90 : // Check if any of the paths are:
91 : // 1. Not in .git directory (working directory changes), OR
92 : // 2. The .git/index file specifically (staging changes)
93 5 : event.paths.iter().any(|path| {
94 : // Check if it's the git index file
95 5 : if path.ends_with(".git/index") {
96 1 : return true;
97 4 : }
98 :
99 : // Check if it's a working directory file (not in .git)
100 4 : path.strip_prefix(repo_path)
101 4 : .ok()
102 4 : .and_then(|p| p.components().next())
103 4 : .map(|c| c.as_os_str() != ".git")
104 4 : .unwrap_or(false)
105 5 : })
106 : }
107 2 : _ => false,
108 : }
109 7 : }
110 :
#[cfg(test)]
mod tests {
    use super::*;
    use notify::{
        event::{CreateKind, ModifyKind, RemoveKind},
        EventKind,
    };
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::process::Command;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Pause used to let the watcher settle before and between write attempts.
    const FS_STABILIZATION_DELAY: Duration = Duration::from_millis(700);
    /// How many times the integration test rewrites the file before giving up.
    const WATCHER_RETRY_ATTEMPTS: usize = 3;
    /// How long to wait for a snapshot after each write.
    const WATCHER_RECV_TIMEOUT: Duration = Duration::from_secs(3);

    #[test]
    fn processes_working_tree_modifications() {
        let repo_path = PathBuf::from("/tmp/repo");
        let event = Event::new(EventKind::Modify(ModifyKind::Any))
            .add_path(repo_path.join("src/main.rs"));

        assert!(should_process_event(&event, &repo_path));
    }

    #[test]
    fn ignores_git_directory_changes_except_index() {
        let repo_path = PathBuf::from("/tmp/repo");
        let git_object_event = Event::new(EventKind::Create(CreateKind::Any))
            .add_path(repo_path.join(".git/objects/ab/cdef"));
        let index_event =
            Event::new(EventKind::Modify(ModifyKind::Any)).add_path(repo_path.join(".git/index"));

        assert!(!should_process_event(&git_object_event, &repo_path));
        assert!(should_process_event(&index_event, &repo_path));
    }

    #[test]
    fn ignores_non_create_modify_remove_events() {
        let repo_path = PathBuf::from("/tmp/repo");

        // Remove is one of the accepted kinds...
        let remove_event =
            Event::new(EventKind::Remove(RemoveKind::Any)).add_path(repo_path.join("README.md"));
        assert!(should_process_event(&remove_event, &repo_path));

        // ...whereas any other kind is rejected even for a working-tree path.
        let any_kind_event = Event::new(EventKind::Any).add_path(repo_path.join("README.md"));
        assert!(!should_process_event(&any_kind_event, &repo_path));
    }

    #[test]
    fn ignores_paths_outside_repository() {
        let repo_path = PathBuf::from("/tmp/repo");
        let event = Event::new(EventKind::Modify(ModifyKind::Any))
            .add_path(PathBuf::from("/tmp/other/src/main.rs"));

        assert!(!should_process_event(&event, &repo_path));
    }

    #[test]
    fn ignores_events_without_paths() {
        let repo_path = PathBuf::from("/tmp/repo");
        let event = Event::new(EventKind::Create(CreateKind::Any));

        assert!(!should_process_event(&event, &repo_path));
    }

    /// Throwaway git repository in the system temp directory, removed on drop.
    struct TestRepo {
        path: PathBuf,
    }

    impl TestRepo {
        /// Creates and initializes a uniquely named repository.
        fn new() -> Self {
            let unique = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("failed to get system time")
                .as_nanos();
            let path = std::env::temp_dir().join(format!(
                "hunky-watcher-tests-{}-{}",
                std::process::id(),
                unique
            ));
            fs::create_dir_all(&path).expect("failed to create temp directory");

            // Build the guard before running git so that a failing command still
            // removes the directory through `Drop` during unwinding.
            let repo = Self { path };
            run_git(&repo.path, &["init"]);
            run_git(&repo.path, &["config", "user.name", "Test User"]);
            run_git(&repo.path, &["config", "user.email", "test@example.com"]);
            // Keep commits independent of a global `commit.gpgsign = true`.
            run_git(&repo.path, &["config", "commit.gpgsign", "false"]);
            repo
        }

        /// Writes `content` to `rel_path` inside the repository.
        fn write_file(&self, rel_path: &str, content: &str) {
            fs::write(self.path.join(rel_path), content).expect("failed to write file");
        }

        /// Stages everything and commits it with `message`.
        fn commit_all(&self, message: &str) {
            run_git(&self.path, &["add", "."]);
            run_git(&self.path, &["commit", "-m", message]);
        }
    }

    impl Drop for TestRepo {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover temp directory must not fail the test.
            let _ = fs::remove_dir_all(&self.path);
        }
    }

    /// Runs `git <args>` inside `repo_path` and panics with git's stderr on failure.
    fn run_git(repo_path: &Path, args: &[&str]) {
        let output = Command::new("git")
            .args(args)
            .current_dir(repo_path)
            .output()
            .expect("failed to execute git");
        assert!(
            output.status.success(),
            "git {:?} failed: {}",
            args,
            String::from_utf8_lossy(&output.stderr)
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn watcher_emits_snapshot_for_tracked_file_changes() {
        let repo = TestRepo::new();
        repo.write_file("tracked.txt", "line 1\n");
        repo.commit_all("initial");

        let git_repo = GitRepo::new(&repo.path).expect("failed to open repo");
        let (tx, mut rx) = mpsc::unbounded_channel();
        let _watcher = FileWatcher::new(git_repo, tx).expect("failed to start watcher");

        // Give the watcher time to start before the first write.
        tokio::time::sleep(FS_STABILIZATION_DELAY).await;

        // File system notifications can be flaky, so retry the write a few times.
        for attempt in 0..WATCHER_RETRY_ATTEMPTS {
            repo.write_file("tracked.txt", &format!("line 1\nline {}\n", attempt + 2));
            if let Ok(Some(snapshot)) =
                tokio::time::timeout(WATCHER_RECV_TIMEOUT, rx.recv()).await
            {
                assert!(!snapshot.files.is_empty());
                assert!(snapshot
                    .files
                    .iter()
                    .any(|file| file.path.ends_with("tracked.txt")));
                return;
            }
            tokio::time::sleep(FS_STABILIZATION_DELAY).await;
        }

        panic!("watcher did not emit a snapshot in time");
    }
}
|