Skip to content

Commit 145c2c4

Browse files
committed
new: added local watcher
1 parent 82877fb commit 145c2c4

2 files changed

Lines changed: 363 additions & 0 deletions

File tree

examples/local/local.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"github.com/Matrix86/cloudwatcher"
6+
"time"
7+
)
8+
9+
func main() {
10+
s, err := cloudwatcher.New("local", "/home/user/dir", time.Second)
11+
if err != nil {
12+
fmt.Printf("ERROR: %s", err)
13+
return
14+
}
15+
16+
config := map[string]string{
17+
"disable_fsnotify": "false",
18+
}
19+
20+
err = s.SetConfig(config)
21+
if err != nil {
22+
fmt.Printf("ERROR: %s", err)
23+
return
24+
}
25+
26+
err = s.Start()
27+
defer s.Close()
28+
for {
29+
select {
30+
case v := <-s.GetEvents():
31+
fmt.Printf("EVENT: %s %s\n", v.Key, v.TypeString())
32+
33+
case e := <-s.GetErrors():
34+
fmt.Printf("ERROR: %s\n", e)
35+
}
36+
}
37+
}
38+

local.go

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
package cloudwatcher
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"github.com/fsnotify/fsnotify"
7+
"os"
8+
"path/filepath"
9+
"sync/atomic"
10+
"time"
11+
)
12+
13+
type LocalWatcher struct {
14+
WatcherBase
15+
16+
syncing uint32
17+
18+
watcher *fsnotify.Watcher
19+
ticker *time.Ticker
20+
stop chan bool
21+
config *LocalConfiguration
22+
cache map[string]*LocalObject
23+
}
24+
25+
type LocalObject struct {
26+
Key string
27+
Size int64
28+
LastModified time.Time
29+
FileMode os.FileMode
30+
}
31+
32+
type LocalConfiguration struct {
33+
Debug Bool `json:"debug"`
34+
DisableFsNotify Bool `json:"disable_fsnotify"`
35+
}
36+
37+
func newLocalWatcher(dir string, interval time.Duration) (Watcher, error) {
38+
w := &LocalWatcher{
39+
config: &LocalConfiguration{},
40+
stop: make(chan bool, 1),
41+
cache: make(map[string]*LocalObject),
42+
WatcherBase: WatcherBase{
43+
Events: make(chan Event, 100),
44+
Errors: make(chan error, 100),
45+
watchDir: dir,
46+
pollingTime: interval,
47+
},
48+
}
49+
50+
if _, err := os.Stat(dir); os.IsNotExist(err) {
51+
return nil, fmt.Errorf("directory '%s' not found", dir)
52+
}
53+
54+
return w, nil
55+
}
56+
57+
func (w *LocalWatcher) SetConfig(m map[string]string) error {
58+
j, err := json.Marshal(m)
59+
if err != nil {
60+
return err
61+
}
62+
63+
config := LocalConfiguration{}
64+
if err := json.Unmarshal(j, &config); err != nil {
65+
return err
66+
}
67+
w.config = &config
68+
69+
return nil
70+
}
71+
72+
func (w *LocalWatcher) Start() error {
73+
if _, err := os.Stat(w.watchDir); os.IsNotExist(err) {
74+
return fmt.Errorf("directory '%s' not found", w.watchDir)
75+
}
76+
77+
if w.config.DisableFsNotify {
78+
w.ticker = time.NewTicker(w.pollingTime)
79+
go func() {
80+
// launch synchronization also the first time
81+
w.sync()
82+
for {
83+
select {
84+
case <-w.ticker.C:
85+
w.sync()
86+
87+
case <-w.stop:
88+
close(w.Events)
89+
close(w.Errors)
90+
return
91+
}
92+
}
93+
}()
94+
} else {
95+
var err error
96+
if w.watcher == nil {
97+
w.watcher, err = fsnotify.NewWatcher()
98+
if err != nil {
99+
return err
100+
}
101+
}
102+
103+
go func() {
104+
for {
105+
select {
106+
case event, ok := <-w.watcher.Events:
107+
if !ok {
108+
return
109+
}
110+
111+
obj := &LocalObject{
112+
Key: event.Name,
113+
Size: 0,
114+
LastModified: time.Now(),
115+
FileMode: 0,
116+
}
117+
e := Event{}
118+
119+
var t Op
120+
if event.Op&fsnotify.Write == fsnotify.Write {
121+
t = FileChanged
122+
} else if event.Op&fsnotify.Create == fsnotify.Create {
123+
t = FileCreated
124+
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
125+
t = FileDeleted
126+
} else if event.Op&fsnotify.Chmod == fsnotify.Chmod {
127+
t = TagsChanged
128+
} else {
129+
// ignoring other events
130+
continue
131+
}
132+
133+
switch t {
134+
case FileDeleted:
135+
e = Event{
136+
Key: obj.Key,
137+
Object: obj,
138+
Type: t,
139+
}
140+
141+
// we don't know if it was a folder...
142+
w.rmRecursive(event.Name)
143+
144+
case FileCreated, FileChanged, TagsChanged:
145+
fi, err := os.Stat(event.Name)
146+
if err != nil {
147+
w.Errors <- err
148+
continue
149+
}
150+
151+
// create listener on subfolders
152+
if fi.IsDir() {
153+
if err := w.addRecursive(event.Name); err != nil {
154+
w.Errors <- err
155+
}
156+
}
157+
158+
obj = &LocalObject{
159+
Key: event.Name,
160+
Size: fi.Size(),
161+
LastModified: fi.ModTime(),
162+
FileMode: fi.Mode(),
163+
}
164+
165+
e = Event{
166+
Key: obj.Key,
167+
Object: obj,
168+
Type: t,
169+
}
170+
}
171+
w.Events <- e
172+
173+
case err, ok := <-w.watcher.Errors:
174+
if !ok {
175+
return
176+
}
177+
w.Errors <- err
178+
179+
case <-w.stop:
180+
close(w.Events)
181+
close(w.Errors)
182+
w.rmRecursive(w.watchDir)
183+
w.watcher.Close()
184+
return
185+
}
186+
}
187+
}()
188+
189+
if err = w.addRecursive(w.watchDir); err != nil {
190+
w.Close()
191+
return err
192+
}
193+
}
194+
return nil
195+
}
196+
197+
func (w *LocalWatcher) sync() {
198+
// allow only one sync at same time
199+
if !atomic.CompareAndSwapUint32(&w.syncing, 0, 1) {
200+
return
201+
}
202+
defer atomic.StoreUint32(&w.syncing, 0)
203+
204+
if _, err := os.Stat(w.watchDir); os.IsNotExist(err) {
205+
w.Errors <- fmt.Errorf("directory '%s' not found", w.watchDir)
206+
}
207+
208+
fileList := make(map[string]*LocalObject, 0)
209+
210+
err := filepath.Walk(w.watchDir, func(walkPath string, fi os.FileInfo, err error) error {
211+
if err != nil {
212+
return err
213+
}
214+
215+
// Ignore the changes on the watched dir
216+
if walkPath == w.watchDir {
217+
return nil
218+
}
219+
220+
obj := &LocalObject{
221+
Key: walkPath,
222+
Size: fi.Size(),
223+
LastModified: fi.ModTime(),
224+
FileMode: fi.Mode(),
225+
}
226+
227+
fileList[walkPath] = obj
228+
229+
// Check if the object is cached by Key
230+
cached := w.getCachedObject(obj)
231+
// Object has been cached previously by Key
232+
if cached != nil {
233+
// Check if the LastModified has been changed
234+
if !cached.LastModified.Equal(obj.LastModified) || (cached.Size != obj.Size) {
235+
event := Event{
236+
Key: obj.Key,
237+
Type: FileChanged,
238+
Object: obj,
239+
}
240+
w.Events <- event
241+
}
242+
// Check if the file modes have been updated
243+
if cached.FileMode != obj.FileMode {
244+
event := Event{
245+
Key: obj.Key,
246+
Type: TagsChanged,
247+
Object: obj,
248+
}
249+
w.Events <- event
250+
}
251+
} else {
252+
event := Event{
253+
Key: obj.Key,
254+
Type: FileCreated,
255+
Object: obj,
256+
}
257+
w.Events <- event
258+
}
259+
w.cache[obj.Key] = obj
260+
261+
return nil
262+
})
263+
if err != nil {
264+
w.Errors <- err
265+
return
266+
}
267+
268+
for k, o := range w.cache {
269+
if _, found := fileList[k]; !found {
270+
// file not found in the list...deleting it
271+
delete(w.cache, k)
272+
event := Event{
273+
Key: o.Key,
274+
Type: FileDeleted,
275+
Object: o,
276+
}
277+
w.Events <- event
278+
}
279+
}
280+
}
281+
282+
func (w *LocalWatcher) getCachedObject(o *LocalObject) *LocalObject {
283+
if cachedObject, ok := w.cache[o.Key]; ok {
284+
return cachedObject
285+
}
286+
return nil
287+
}
288+
289+
func (w *LocalWatcher) addRecursive(dir string) error {
290+
err := filepath.Walk(dir, func(walkPath string, fi os.FileInfo, err error) error {
291+
if err != nil {
292+
return err
293+
}
294+
if fi.IsDir() {
295+
if err = w.watcher.Add(walkPath); err != nil {
296+
return err
297+
}
298+
}
299+
return nil
300+
})
301+
return err
302+
}
303+
304+
func (w *LocalWatcher) rmRecursive(dir string) error {
305+
err := filepath.Walk(dir, func(walkPath string, fi os.FileInfo, err error) error {
306+
if err != nil {
307+
return err
308+
}
309+
if fi.IsDir() {
310+
if err = w.watcher.Remove(walkPath); err != nil {
311+
return err
312+
}
313+
}
314+
return nil
315+
})
316+
return err
317+
}
318+
319+
func (w *LocalWatcher) Close() {
320+
w.stop <- true
321+
}
322+
323+
func init() {
324+
supportedServices["local"] = newLocalWatcher
325+
}

0 commit comments

Comments
 (0)