Skip to content

Commit fbf87eb

Browse files
committed
Initial commit
0 parents  commit fbf87eb

6 files changed

Lines changed: 1011 additions & 0 deletions

File tree

.gitignore

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Binaries for programs and plugins
2+
*.exe
3+
*.exe~
4+
*.dll
5+
*.so
6+
*.dylib
7+
8+
# Test binary, built with `go test -c`
9+
*.test
10+
11+
# Output of the go coverage tool, specifically when used with LiteIDE
12+
*.out
13+
14+
# Dependency directories (remove the comment below to include it)
15+
vendor/
16+
.idea/

LICENSE

Lines changed: 674 additions & 0 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# cloudwatcher
2+
File system notification for Cloud platforms in Golang.

S3.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
package cloudwatcher
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/minio/minio-go"
11+
"github.com/minio/minio-go/pkg/credentials"
12+
)
13+
14+
type ObjectInfo = minio.ObjectInfo
15+
type S3Configuration struct {
16+
BucketName string
17+
Environment string
18+
Endpoint string
19+
AccessKey string
20+
SecretAccessKey string
21+
SessionToken string
22+
Region string
23+
SSLEnabled bool
24+
}
25+
26+
type S3Watcher struct {
27+
WatcherBase
28+
29+
synching uint32
30+
31+
ticker *time.Ticker
32+
stop chan bool
33+
config *S3Configuration
34+
client *minio.Client
35+
cache map[string]*S3Object
36+
}
37+
38+
type S3Object struct {
39+
Key string
40+
Etag string
41+
Size int64
42+
Tags map[string]string
43+
LastModified time.Time
44+
}
45+
46+
func NewS3Watcher(c interface{}) (Watcher, error) {
47+
var config *S3Configuration
48+
var ok bool
49+
if config, ok = c.(*S3Configuration); !ok {
50+
return nil, fmt.Errorf("configuration is not a S3Configuration object")
51+
}
52+
53+
upd := &S3Watcher{
54+
cache: make(map[string]*S3Object),
55+
config: config,
56+
stop: make(chan bool, 1),
57+
}
58+
59+
client, err := minio.New(upd.config.Endpoint, &minio.Options{
60+
Creds: credentials.NewStaticV4(upd.config.AccessKey, upd.config.SecretAccessKey, upd.config.SessionToken),
61+
Secure: upd.config.SSLEnabled,
62+
})
63+
if err != nil {
64+
return nil, err
65+
}
66+
upd.client = client
67+
68+
return upd, nil
69+
}
70+
71+
func (u *S3Watcher) Start() {
72+
u.ticker = time.NewTicker(u.pollingTime)
73+
go func() {
74+
// launch synchronization also the first time
75+
u.sync()
76+
for {
77+
select {
78+
case <-u.ticker.C:
79+
u.sync()
80+
81+
case <-u.stop:
82+
return
83+
}
84+
}
85+
}()
86+
}
87+
88+
func (u *S3Watcher) Close() {
89+
}
90+
91+
func (u *S3Watcher) getCachedObject(o *S3Object) *S3Object {
92+
if cachedObject, ok := u.cache[o.Key]; ok {
93+
return cachedObject
94+
}
95+
return nil
96+
}
97+
98+
func (u *S3Object) AreTagsChanged(new *S3Object) bool {
99+
// Check if tags are changed
100+
if len(u.Tags) != len(new.Tags) {
101+
return true
102+
}
103+
for k, v := range u.Tags {
104+
if nv, ok := new.Tags[k]; !ok || v != nv {
105+
return true
106+
}
107+
}
108+
for k, v := range new.Tags {
109+
if nv, ok := u.Tags[k]; !ok || v != nv {
110+
return true
111+
}
112+
}
113+
return false
114+
}
115+
116+
func (u *S3Watcher) sync() {
117+
// allow only one sync at same time
118+
if !atomic.CompareAndSwapUint32(&u.synching, 0, 1) {
119+
return
120+
}
121+
defer atomic.StoreUint32(&u.synching, 0)
122+
123+
// Avoid to delete all the things if the updater env is not ready...
124+
if u.isConnected() == false {
125+
return
126+
}
127+
128+
fileList := make(map[string]*S3Object, 0)
129+
130+
err := u.EnumerateFiles(u.config.BucketName, u.watchDir, func(page int64, obj *ObjectInfo) bool {
131+
// Get Info from S3 object
132+
upd, err := u.getInfoFromObject(obj)
133+
if err != nil {
134+
return true // continue
135+
}
136+
137+
// Store the files to check the deleted one
138+
fileList[upd.Key] = upd
139+
140+
// Check if the object is cached by Key
141+
cached := u.getCachedObject(upd)
142+
// Object has been cached previously but we need to check its tags
143+
if cached != nil {
144+
// Check if the LastModified has been changed
145+
if !cached.LastModified.Equal(upd.LastModified) {
146+
event := Event{
147+
Key: upd.Key,
148+
Type: FileChanged,
149+
Object: upd,
150+
}
151+
u.Events <- event
152+
}
153+
// Check if the tags have been updated
154+
if cached.AreTagsChanged(upd) {
155+
event := Event{
156+
Key: upd.Key,
157+
Type: TagsChanged,
158+
Object: upd,
159+
}
160+
u.Events <- event
161+
}
162+
} else {
163+
event := Event{
164+
Key: upd.Key,
165+
Type: FileCreated,
166+
Object: upd,
167+
}
168+
u.Events <- event
169+
}
170+
u.cache[upd.Key] = upd
171+
return true
172+
})
173+
if err != nil {
174+
return
175+
}
176+
177+
for k, o := range u.cache {
178+
if _, found := fileList[k]; !found {
179+
// file not found in the list...deleting it
180+
delete(u.cache, k)
181+
event := Event{
182+
Key: o.Key,
183+
Type: FileDeleted,
184+
Object: nil,
185+
}
186+
u.Events <- event
187+
}
188+
}
189+
}
190+
191+
func (u *S3Watcher) bucketExists(bucket string) (bool, error) {
192+
found, err := u.client.BucketExists(context.Background(), bucket)
193+
if err != nil {
194+
return false, err
195+
}
196+
return found, nil
197+
}
198+
199+
func (u *S3Watcher) getTags(key string, bucket string) (map[string]string, error) {
200+
t, err := u.client.GetObjectTagging(context.Background(), bucket, key, minio.GetObjectTaggingOptions{})
201+
if err != nil {
202+
return nil, err
203+
}
204+
205+
tags := make(map[string]string)
206+
for key, tag := range t.ToMap() {
207+
tags[key] = tag
208+
}
209+
return tags, nil
210+
}
211+
212+
func (u *S3Watcher) isConnected() bool {
213+
found, err := u.bucketExists(u.config.BucketName)
214+
if err != nil {
215+
return false
216+
}
217+
return found
218+
}
219+
220+
func (u *S3Watcher) getInfoFromObject(obj *ObjectInfo) (*S3Object, error) {
221+
var upd *S3Object
222+
223+
tags, err := u.getTags(obj.Key, u.config.BucketName)
224+
if err != nil {
225+
return nil, fmt.Errorf("getting tags from key '%s': %s", obj.Key, err)
226+
}
227+
//log.Debug("s3 watcher: get tags from key '%s': %v", obj.Key, tags)
228+
229+
upd = &S3Object{
230+
Key: obj.Key,
231+
Etag: strings.ToLower(strings.Trim(obj.ETag, "\"")), // ETag contains double quotes
232+
Size: obj.Size,
233+
LastModified: obj.LastModified,
234+
Tags: make(map[string]string),
235+
}
236+
for k, v := range tags {
237+
upd.Tags[k] = v
238+
}
239+
return upd, nil
240+
}
241+
242+
func (u *S3Watcher) EnumerateFiles(bucket, prefix string, callback func(page int64, object *ObjectInfo) bool) error {
243+
doneCh := make(chan struct{})
244+
defer close(doneCh)
245+
246+
// List all objects from a bucket-name with a matching prefix.
247+
for object := range u.client.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{
248+
WithVersions: false,
249+
WithMetadata: false,
250+
Prefix: prefix,
251+
Recursive: true,
252+
MaxKeys: 0,
253+
UseV1: false,
254+
}) {
255+
if object.Err != nil {
256+
continue
257+
}
258+
259+
obj := ObjectInfo(object)
260+
if callback(0, &obj) == false {
261+
break
262+
}
263+
}
264+
return nil
265+
}
266+
267+
func init() {
268+
supportedServices["s3"] = NewS3Watcher
269+
}

cloudwatcher.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package cloudwatcher
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
type storageFunc func(config interface{}) (Watcher, error)
9+
var supportedServices map[string]storageFunc
10+
11+
type WatcherBase struct {
12+
Events chan Event
13+
Errors chan error
14+
15+
watchDir string
16+
pollingTime time.Duration
17+
}
18+
19+
type Watcher interface {
20+
Start()
21+
Close()
22+
}
23+
24+
func New(service string, config interface{}) (Watcher, error) {
25+
if f, ok := supportedServices[service]; !ok {
26+
return nil, fmt.Errorf("service %s is not yet supported")
27+
} else {
28+
return f(config)
29+
}
30+
}
31+
32+
func init() {
33+
supportedServices = make(map[string]storageFunc)
34+
}

event.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cloudwatcher
2+
3+
type Event struct {
4+
Key string // Path of file
5+
Type Op // File operation
6+
Object interface{} // Object pointer
7+
}
8+
9+
type Op uint32
10+
11+
const (
12+
FileCreated = iota
13+
FileChanged
14+
FileDeleted
15+
TagsChanged
16+
)

0 commit comments

Comments
 (0)