Skip to content

Commit a2bc784

Browse files
committed
fix: typo on the filename
1 parent fbf87eb commit a2bc784

1 file changed

Lines changed: 269 additions & 0 deletions

File tree

s3.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
package cloudwatcher
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/minio/minio-go"
11+
"github.com/minio/minio-go/pkg/credentials"
12+
)
13+
14+
type ObjectInfo = minio.ObjectInfo
15+
type S3Configuration struct {
16+
BucketName string
17+
Environment string
18+
Endpoint string
19+
AccessKey string
20+
SecretAccessKey string
21+
SessionToken string
22+
Region string
23+
SSLEnabled bool
24+
}
25+
26+
type S3Watcher struct {
27+
WatcherBase
28+
29+
synching uint32
30+
31+
ticker *time.Ticker
32+
stop chan bool
33+
config *S3Configuration
34+
client *minio.Client
35+
cache map[string]*S3Object
36+
}
37+
38+
type S3Object struct {
39+
Key string
40+
Etag string
41+
Size int64
42+
Tags map[string]string
43+
LastModified time.Time
44+
}
45+
46+
func NewS3Watcher(c interface{}) (Watcher, error) {
47+
var config *S3Configuration
48+
var ok bool
49+
if config, ok = c.(*S3Configuration); !ok {
50+
return nil, fmt.Errorf("configuration is not a S3Configuration object")
51+
}
52+
53+
upd := &S3Watcher{
54+
cache: make(map[string]*S3Object),
55+
config: config,
56+
stop: make(chan bool, 1),
57+
}
58+
59+
client, err := minio.New(upd.config.Endpoint, &minio.Options{
60+
Creds: credentials.NewStaticV4(upd.config.AccessKey, upd.config.SecretAccessKey, upd.config.SessionToken),
61+
Secure: upd.config.SSLEnabled,
62+
})
63+
if err != nil {
64+
return nil, err
65+
}
66+
upd.client = client
67+
68+
return upd, nil
69+
}
70+
71+
func (u *S3Watcher) Start() {
72+
u.ticker = time.NewTicker(u.pollingTime)
73+
go func() {
74+
// launch synchronization also the first time
75+
u.sync()
76+
for {
77+
select {
78+
case <-u.ticker.C:
79+
u.sync()
80+
81+
case <-u.stop:
82+
return
83+
}
84+
}
85+
}()
86+
}
87+
88+
func (u *S3Watcher) Close() {
89+
}
90+
91+
func (u *S3Watcher) getCachedObject(o *S3Object) *S3Object {
92+
if cachedObject, ok := u.cache[o.Key]; ok {
93+
return cachedObject
94+
}
95+
return nil
96+
}
97+
98+
func (u *S3Object) AreTagsChanged(new *S3Object) bool {
99+
// Check if tags are changed
100+
if len(u.Tags) != len(new.Tags) {
101+
return true
102+
}
103+
for k, v := range u.Tags {
104+
if nv, ok := new.Tags[k]; !ok || v != nv {
105+
return true
106+
}
107+
}
108+
for k, v := range new.Tags {
109+
if nv, ok := u.Tags[k]; !ok || v != nv {
110+
return true
111+
}
112+
}
113+
return false
114+
}
115+
116+
func (u *S3Watcher) sync() {
117+
// allow only one sync at same time
118+
if !atomic.CompareAndSwapUint32(&u.synching, 0, 1) {
119+
return
120+
}
121+
defer atomic.StoreUint32(&u.synching, 0)
122+
123+
// Avoid to delete all the things if the updater env is not ready...
124+
if u.isConnected() == false {
125+
return
126+
}
127+
128+
fileList := make(map[string]*S3Object, 0)
129+
130+
err := u.EnumerateFiles(u.config.BucketName, u.watchDir, func(page int64, obj *ObjectInfo) bool {
131+
// Get Info from S3 object
132+
upd, err := u.getInfoFromObject(obj)
133+
if err != nil {
134+
return true // continue
135+
}
136+
137+
// Store the files to check the deleted one
138+
fileList[upd.Key] = upd
139+
140+
// Check if the object is cached by Key
141+
cached := u.getCachedObject(upd)
142+
// Object has been cached previously but we need to check its tags
143+
if cached != nil {
144+
// Check if the LastModified has been changed
145+
if !cached.LastModified.Equal(upd.LastModified) {
146+
event := Event{
147+
Key: upd.Key,
148+
Type: FileChanged,
149+
Object: upd,
150+
}
151+
u.Events <- event
152+
}
153+
// Check if the tags have been updated
154+
if cached.AreTagsChanged(upd) {
155+
event := Event{
156+
Key: upd.Key,
157+
Type: TagsChanged,
158+
Object: upd,
159+
}
160+
u.Events <- event
161+
}
162+
} else {
163+
event := Event{
164+
Key: upd.Key,
165+
Type: FileCreated,
166+
Object: upd,
167+
}
168+
u.Events <- event
169+
}
170+
u.cache[upd.Key] = upd
171+
return true
172+
})
173+
if err != nil {
174+
return
175+
}
176+
177+
for k, o := range u.cache {
178+
if _, found := fileList[k]; !found {
179+
// file not found in the list...deleting it
180+
delete(u.cache, k)
181+
event := Event{
182+
Key: o.Key,
183+
Type: FileDeleted,
184+
Object: nil,
185+
}
186+
u.Events <- event
187+
}
188+
}
189+
}
190+
191+
func (u *S3Watcher) bucketExists(bucket string) (bool, error) {
192+
found, err := u.client.BucketExists(context.Background(), bucket)
193+
if err != nil {
194+
return false, err
195+
}
196+
return found, nil
197+
}
198+
199+
func (u *S3Watcher) getTags(key string, bucket string) (map[string]string, error) {
200+
t, err := u.client.GetObjectTagging(context.Background(), bucket, key, minio.GetObjectTaggingOptions{})
201+
if err != nil {
202+
return nil, err
203+
}
204+
205+
tags := make(map[string]string)
206+
for key, tag := range t.ToMap() {
207+
tags[key] = tag
208+
}
209+
return tags, nil
210+
}
211+
212+
func (u *S3Watcher) isConnected() bool {
213+
found, err := u.bucketExists(u.config.BucketName)
214+
if err != nil {
215+
return false
216+
}
217+
return found
218+
}
219+
220+
func (u *S3Watcher) getInfoFromObject(obj *ObjectInfo) (*S3Object, error) {
221+
var upd *S3Object
222+
223+
tags, err := u.getTags(obj.Key, u.config.BucketName)
224+
if err != nil {
225+
return nil, fmt.Errorf("getting tags from key '%s': %s", obj.Key, err)
226+
}
227+
//log.Debug("s3 watcher: get tags from key '%s': %v", obj.Key, tags)
228+
229+
upd = &S3Object{
230+
Key: obj.Key,
231+
Etag: strings.ToLower(strings.Trim(obj.ETag, "\"")), // ETag contains double quotes
232+
Size: obj.Size,
233+
LastModified: obj.LastModified,
234+
Tags: make(map[string]string),
235+
}
236+
for k, v := range tags {
237+
upd.Tags[k] = v
238+
}
239+
return upd, nil
240+
}
241+
242+
func (u *S3Watcher) EnumerateFiles(bucket, prefix string, callback func(page int64, object *ObjectInfo) bool) error {
243+
doneCh := make(chan struct{})
244+
defer close(doneCh)
245+
246+
// List all objects from a bucket-name with a matching prefix.
247+
for object := range u.client.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{
248+
WithVersions: false,
249+
WithMetadata: false,
250+
Prefix: prefix,
251+
Recursive: true,
252+
MaxKeys: 0,
253+
UseV1: false,
254+
}) {
255+
if object.Err != nil {
256+
continue
257+
}
258+
259+
obj := ObjectInfo(object)
260+
if callback(0, &obj) == false {
261+
break
262+
}
263+
}
264+
return nil
265+
}
266+
267+
func init() {
268+
supportedServices["s3"] = NewS3Watcher
269+
}

0 commit comments

Comments
 (0)