Skip to content

Commit b1b7532

Browse files
committed
misc: refactoring and example for s3 created
1 parent 33e2ffa commit b1b7532

6 files changed

Lines changed: 269 additions & 30 deletions

File tree

cloudwatcher.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"time"
66
)
77

8-
type storageFunc func(config interface{}) (Watcher, error)
8+
type storageFunc func(dir string, interval time.Duration) (Watcher, error)
99
var supportedServices map[string]storageFunc
1010

1111
type WatcherBase struct {
@@ -17,18 +17,29 @@ type WatcherBase struct {
1717
}
1818

1919
type Watcher interface {
20-
Start()
20+
Start() error
21+
SetConfig(c map[string]string) error
2122
Close()
23+
GetEvents() chan Event
24+
GetErrors() chan error
2225
}
2326

24-
func New(service string, config interface{}) (Watcher, error) {
27+
func New(service string, dir string, interval time.Duration) (Watcher, error) {
2528
if f, ok := supportedServices[service]; !ok {
26-
return nil, fmt.Errorf("service %s is not yet supported")
29+
return nil, fmt.Errorf("service %s is not yet supported", service)
2730
} else {
28-
return f(config)
31+
return f(dir, interval)
2932
}
3033
}
3134

35+
func (w *WatcherBase) GetEvents() chan Event {
36+
return w.Events
37+
}
38+
39+
func (w *WatcherBase) GetErrors() chan error {
40+
return w.Errors
41+
}
42+
3243
func init() {
3344
supportedServices = make(map[string]storageFunc)
3445
}

cloudwatcher_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package cloudwatcher
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestNew(t *testing.T) {
9+
_, err := New("wrong", "/", 10*time.Second)
10+
if err == nil {
11+
t.Errorf("it should return an error if the service name not exists")
12+
}
13+
14+
_, err = New("s3", "/", 10*time.Second)
15+
if err != nil {
16+
t.Errorf("error during creation: %s", err)
17+
}
18+
}

event.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,18 @@ const (
1414
FileDeleted
1515
TagsChanged
1616
)
17+
18+
func (e *Event) TypeString() string {
19+
switch e.Type {
20+
case FileCreated:
21+
return "FileCreated"
22+
case FileChanged:
23+
return "FileChanged"
24+
case FileDeleted:
25+
return "FileDeleted"
26+
case TagsChanged:
27+
return "TagsChanged"
28+
default:
29+
return "unknown"
30+
}
31+
}

examples/s3.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"github.com/Matrix86/cloudwatcher"
6+
"time"
7+
)
8+
9+
func main() {
10+
s, err := cloudwatcher.New("s3", "/", 1*time.Second)
11+
if err != nil {
12+
fmt.Printf("ERROR: %s", err)
13+
return
14+
}
15+
16+
config := map[string]string{
17+
"bucketname": "test.storage",
18+
"Endpoint": "127.0.0.1:9000",
19+
"AccessKey": "minio",
20+
"SecretAccessKey": "minio123",
21+
"SessionToken": "",
22+
"Region": "",
23+
"SSLEnabled": "false",
24+
}
25+
err = s.SetConfig(config)
26+
if err != nil {
27+
fmt.Printf("ERROR: %s", err)
28+
return
29+
}
30+
31+
err = s.SetConfig(config)
32+
if err != nil {
33+
fmt.Printf("ERROR: %s", err)
34+
return
35+
}
36+
37+
err = s.Start()
38+
defer s.Close()
39+
40+
go func() {
41+
for v := range s.GetEvents() {
42+
fmt.Printf("EVENT: %s %s\n", v.Key, v.TypeString())
43+
}
44+
}()
45+
46+
go func() {
47+
for e := range s.GetErrors() {
48+
fmt.Printf("ERROR: %#v\n", e)
49+
}
50+
}()
51+
52+
time.Sleep(30*time.Second)
53+
}

s3.go

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cloudwatcher
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"strings"
78
"sync/atomic"
@@ -11,16 +12,22 @@ import (
1112
"github.com/minio/minio-go/pkg/credentials"
1213
)
1314

14-
type ObjectInfo = minio.ObjectInfo
15+
type Bool bool
16+
func (bit *Bool) UnmarshalJSON(b []byte) error {
17+
txt := string(b)
18+
*bit = Bool(txt == "1" || txt == "true")
19+
return nil
20+
}
21+
22+
type objectInfo = minio.ObjectInfo
1523
type S3Configuration struct {
1624
BucketName string
17-
Environment string
1825
Endpoint string
1926
AccessKey string
2027
SecretAccessKey string
2128
SessionToken string
2229
Region string
23-
SSLEnabled bool
30+
SSLEnabled Bool
2431
}
2532

2633
type S3Watcher struct {
@@ -43,32 +50,56 @@ type S3Object struct {
4350
LastModified time.Time
4451
}
4552

46-
func newS3Watcher(c interface{}) (Watcher, error) {
47-
var config *S3Configuration
48-
var ok bool
49-
if config, ok = c.(*S3Configuration); !ok {
50-
return nil, fmt.Errorf("configuration is not a S3Configuration object")
51-
}
53+
func newS3Watcher(dir string, interval time.Duration) (Watcher, error) {
5254

5355
upd := &S3Watcher{
5456
cache: make(map[string]*S3Object),
55-
config: config,
57+
config: nil,
5658
stop: make(chan bool, 1),
59+
WatcherBase: WatcherBase{
60+
Events: make(chan Event, 100),
61+
Errors: make(chan error, 100),
62+
watchDir: dir,
63+
pollingTime: interval,
64+
},
5765
}
66+
return upd, nil
67+
}
5868

59-
client, err := minio.New(upd.config.Endpoint, &minio.Options{
60-
Creds: credentials.NewStaticV4(upd.config.AccessKey, upd.config.SecretAccessKey, upd.config.SessionToken),
61-
Secure: upd.config.SSLEnabled,
62-
})
69+
func (u *S3Watcher) SetConfig(m map[string]string) error {
70+
j, err := json.Marshal(m)
6371
if err != nil {
64-
return nil, err
72+
return err
6573
}
66-
upd.client = client
6774

68-
return upd, nil
75+
config := S3Configuration{}
76+
if err := json.Unmarshal(j, &config); err != nil {
77+
return err
78+
}
79+
u.config = &config
80+
81+
client, err := minio.New(u.config.Endpoint, &minio.Options{
82+
Creds: credentials.NewStaticV4(u.config.AccessKey, u.config.SecretAccessKey, u.config.SessionToken),
83+
Secure: bool(u.config.SSLEnabled),
84+
})
85+
if err != nil {
86+
return err
87+
}
88+
u.client = client
89+
return nil
6990
}
7091

71-
func (u *S3Watcher) Start() {
92+
func (u *S3Watcher) Start() error {
93+
if u.config == nil {
94+
return fmt.Errorf("configuration for S3 needed")
95+
}
96+
97+
if ok, err := u.bucketExists(u.config.BucketName); err != nil {
98+
return fmt.Errorf("error on checking the bucket: %s", err)
99+
} else if !ok {
100+
return fmt.Errorf("error on checking the bucket: bucket %s not exists", u.config.BucketName)
101+
}
102+
72103
u.ticker = time.NewTicker(u.pollingTime)
73104
go func() {
74105
// launch synchronization also the first time
@@ -83,6 +114,7 @@ func (u *S3Watcher) Start() {
83114
}
84115
}
85116
}()
117+
return nil
86118
}
87119

88120
func (u *S3Watcher) Close() {
@@ -96,7 +128,7 @@ func (u *S3Watcher) getCachedObject(o *S3Object) *S3Object {
96128
return nil
97129
}
98130

99-
func (u *S3Object) AreTagsChanged(new *S3Object) bool {
131+
func (u *S3Object) areTagsChanged(new *S3Object) bool {
100132
// Check if tags are changed
101133
if len(u.Tags) != len(new.Tags) {
102134
return true
@@ -128,7 +160,8 @@ func (u *S3Watcher) sync() {
128160

129161
fileList := make(map[string]*S3Object, 0)
130162

131-
err := u.enumerateFiles(u.config.BucketName, u.watchDir, func(page int64, obj *ObjectInfo) bool {
163+
err := u.enumerateFiles(u.config.BucketName, u.watchDir, func(page int64, obj *objectInfo) bool {
164+
132165
// Get Info from S3 object
133166
upd, err := u.getInfoFromObject(obj)
134167
if err != nil {
@@ -140,7 +173,7 @@ func (u *S3Watcher) sync() {
140173

141174
// Check if the object is cached by Key
142175
cached := u.getCachedObject(upd)
143-
// Object has been cached previously but we need to check its tags
176+
// Object has been cached previously by Key
144177
if cached != nil {
145178
// Check if the LastModified has been changed
146179
if !cached.LastModified.Equal(upd.LastModified) {
@@ -152,7 +185,7 @@ func (u *S3Watcher) sync() {
152185
u.Events <- event
153186
}
154187
// Check if the tags have been updated
155-
if cached.AreTagsChanged(upd) {
188+
if cached.areTagsChanged(upd) {
156189
event := Event{
157190
Key: upd.Key,
158191
Type: TagsChanged,
@@ -169,6 +202,7 @@ func (u *S3Watcher) sync() {
169202
u.Events <- event
170203
}
171204
u.cache[upd.Key] = upd
205+
172206
return true
173207
})
174208
if err != nil {
@@ -218,7 +252,7 @@ func (u *S3Watcher) isConnected() bool {
218252
return found
219253
}
220254

221-
func (u *S3Watcher) getInfoFromObject(obj *ObjectInfo) (*S3Object, error) {
255+
func (u *S3Watcher) getInfoFromObject(obj *objectInfo) (*S3Object, error) {
222256
var upd *S3Object
223257

224258
tags, err := u.getTags(obj.Key, u.config.BucketName)
@@ -240,7 +274,7 @@ func (u *S3Watcher) getInfoFromObject(obj *ObjectInfo) (*S3Object, error) {
240274
return upd, nil
241275
}
242276

243-
func (u *S3Watcher) enumerateFiles(bucket, prefix string, callback func(page int64, object *ObjectInfo) bool) error {
277+
func (u *S3Watcher) enumerateFiles(bucket, prefix string, callback func(page int64, object *objectInfo) bool) error {
244278
doneCh := make(chan struct{})
245279
defer close(doneCh)
246280

@@ -257,7 +291,7 @@ func (u *S3Watcher) enumerateFiles(bucket, prefix string, callback func(page int
257291
continue
258292
}
259293

260-
obj := ObjectInfo(object)
294+
obj := objectInfo(object)
261295
if callback(0, &obj) == false {
262296
break
263297
}

0 commit comments

Comments
 (0)