Skip to content

Commit ce242a5

Browse files
committed
new: gdrive support added
1 parent a53dbd9 commit ce242a5

2 files changed

Lines changed: 353 additions & 0 deletions

File tree

examples/gdrive/gdrive.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/Matrix86/cloudwatcher"
8+
"golang.org/x/oauth2"
9+
"golang.org/x/oauth2/google"
10+
"google.golang.org/api/drive/v3"
11+
"time"
12+
)
13+
14+
const (
15+
ClientId = "XXXXX.apps.googleusercontent.com"
16+
ClientSecret = "X-XXXX-XX"
17+
Token = "{\"access_token\":\"XXXXX\",\"token_type\":\"Bearer\",\"refresh_token\":\"XXXX\",\"expiry\":\"2021-02-09T20:11:25.925429905+01:00\"}"
18+
)
19+
20+
func getToken() (string, error) {
21+
config := &oauth2.Config{
22+
ClientID: ClientId,
23+
ClientSecret: ClientSecret,
24+
Scopes: []string{drive.DriveMetadataReadonlyScope},
25+
Endpoint: google.Endpoint,
26+
}
27+
28+
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
29+
fmt.Printf("Go to the following link in your browser then type the "+
30+
"authorization code: \n%v\n", authURL)
31+
32+
var authCode string
33+
if _, err := fmt.Scan(&authCode); err != nil {
34+
return "", err
35+
}
36+
37+
tok, err := config.Exchange(context.TODO(), authCode)
38+
if err != nil {
39+
return "", err
40+
}
41+
42+
b, err := json.Marshal(tok)
43+
if err != nil {
44+
return "", err
45+
}
46+
return string(b), nil
47+
}
48+
func main() {
49+
s, err := cloudwatcher.New("gdrive", "", 2*time.Second)
50+
if err != nil {
51+
fmt.Printf("ERROR: %s", err)
52+
return
53+
}
54+
55+
config := map[string]string{
56+
"debug": "true",
57+
"token": Token,
58+
"client_id": ClientId,
59+
"client_secret": ClientSecret,
60+
}
61+
62+
if v, ok := config["token"]; !ok || v == "" {
63+
token, err := getToken()
64+
if err != nil {
65+
fmt.Printf("ERROR: %s", err)
66+
return
67+
}
68+
fmt.Printf("NEW TOKEN: %s\n", token)
69+
config["token"] = token
70+
}
71+
72+
err = s.SetConfig(config)
73+
if err != nil {
74+
fmt.Printf("ERROR: %s", err)
75+
return
76+
}
77+
78+
err = s.Start()
79+
defer s.Close()
80+
for {
81+
select {
82+
case v := <-s.GetEvents():
83+
fmt.Printf("EVENT: %s %s\n", v.Key, v.TypeString())
84+
85+
case e := <-s.GetErrors():
86+
fmt.Printf("ERROR: %s\n", e)
87+
}
88+
}
89+
}

gdrive.go

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
package cloudwatcher
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"golang.org/x/oauth2"
8+
"golang.org/x/oauth2/google"
9+
"google.golang.org/api/option"
10+
"strings"
11+
"sync/atomic"
12+
"time"
13+
14+
"google.golang.org/api/drive/v3"
15+
)
16+
17+
type GDriveWatcher struct {
18+
WatcherBase
19+
20+
syncing uint32
21+
22+
ticker *time.Ticker
23+
stop chan bool
24+
config *GDriveConfiguration
25+
cache map[string]*GDriveObject
26+
}
27+
28+
type GDriveObject struct {
29+
Id string
30+
Key string
31+
Size int64
32+
LastModified time.Time
33+
Hash string
34+
}
35+
36+
type GDriveConfiguration struct {
37+
Debug Bool `json:"debug"`
38+
JToken string `json:"token"`
39+
ClientId string `json:"client_id"`
40+
ClientSecret string `json:"client_secret"`
41+
ApiKey string `json:"api_key"`
42+
43+
token *oauth2.Token
44+
}
45+
46+
func newGDriveWatcher(dir string, interval time.Duration) (Watcher, error) {
47+
w := &GDriveWatcher{
48+
cache: make(map[string]*GDriveObject),
49+
config: nil,
50+
stop: make(chan bool, 1),
51+
WatcherBase: WatcherBase{
52+
Events: make(chan Event, 100),
53+
Errors: make(chan error, 100),
54+
watchDir: dir,
55+
pollingTime: interval,
56+
},
57+
}
58+
return w, nil
59+
}
60+
61+
func (w *GDriveWatcher) SetConfig(m map[string]string) error {
62+
j, err := json.Marshal(m)
63+
if err != nil {
64+
return err
65+
}
66+
67+
config := GDriveConfiguration{}
68+
if err := json.Unmarshal(j, &config); err != nil {
69+
return err
70+
}
71+
72+
if config.JToken == "" && config.ApiKey == "" {
73+
return fmt.Errorf("token or api_key have to be set")
74+
}
75+
w.config = &config
76+
77+
tok := &oauth2.Token{}
78+
if err := json.Unmarshal([]byte(config.JToken), tok); err != nil {
79+
return err
80+
}
81+
w.config.token = tok
82+
return nil
83+
}
84+
85+
func (w *GDriveWatcher) Start() error {
86+
if w.config == nil {
87+
return fmt.Errorf("configuration for Dropbox needed")
88+
}
89+
90+
w.ticker = time.NewTicker(w.pollingTime)
91+
go func() {
92+
// launch synchronization also the first time
93+
w.sync()
94+
for {
95+
select {
96+
case <-w.ticker.C:
97+
w.sync()
98+
99+
case <-w.stop:
100+
close(w.Events)
101+
close(w.Errors)
102+
return
103+
}
104+
}
105+
}()
106+
return nil
107+
}
108+
109+
func (w *GDriveWatcher) Close() {
110+
w.stop <- true
111+
}
112+
113+
func (w *GDriveWatcher) sync() {
114+
// allow only one sync at same time
115+
if !atomic.CompareAndSwapUint32(&w.syncing, 0, 1) {
116+
return
117+
}
118+
defer atomic.StoreUint32(&w.syncing, 0)
119+
120+
fileList := make(map[string]*GDriveObject, 0)
121+
122+
err := w.enumerateFiles(w.watchDir, func(obj *GDriveObject) bool {
123+
// Store the files to check the deleted one
124+
fileList[obj.Id] = obj
125+
// Check if the object is cached by Key
126+
cached := w.getCachedObject(obj)
127+
// Object has been cached previously by Key
128+
if cached != nil {
129+
// Check if the LastModified has been changed
130+
if !cached.LastModified.Equal(obj.LastModified) || cached.Hash != obj.Hash {
131+
fmt.Printf("cached %s obj %s\n", cached.LastModified, obj.LastModified)
132+
event := Event{
133+
Key: obj.Key,
134+
Type: FileChanged,
135+
Object: obj,
136+
}
137+
w.Events <- event
138+
}
139+
} else {
140+
event := Event{
141+
Key: obj.Key,
142+
Type: FileCreated,
143+
Object: obj,
144+
}
145+
w.Events <- event
146+
}
147+
w.cache[obj.Id] = obj
148+
return true
149+
})
150+
if err != nil {
151+
w.Errors <- err
152+
return
153+
}
154+
155+
for k, o := range w.cache {
156+
if _, found := fileList[k]; !found {
157+
// file not found in the list...deleting it
158+
delete(w.cache, k)
159+
event := Event{
160+
Key: o.Key,
161+
Type: FileDeleted,
162+
Object: o,
163+
}
164+
w.Events <- event
165+
}
166+
}
167+
}
168+
169+
func (w *GDriveWatcher) resolveParents(file *drive.File, list map[string]*drive.File) [][]string {
170+
paths := make([][]string, len(file.Parents))
171+
for i, _ := range paths {
172+
paths[i] = make([]string, 0)
173+
paths[i] = append(paths[i], file.Name)
174+
}
175+
for i, parentId := range file.Parents {
176+
if v, ok := list[parentId]; !ok {
177+
continue
178+
} else {
179+
for _, ppath := range w.resolveParents(v, list) {
180+
paths[i] = append(ppath, paths[i]...)
181+
}
182+
}
183+
}
184+
return paths
185+
}
186+
187+
func (w *GDriveWatcher) getFullPaths(file *drive.File, list map[string]*drive.File) []string {
188+
tpaths := w.resolveParents(file, list)
189+
paths := make([]string, 0)
190+
for _, path := range tpaths {
191+
paths = append(paths, strings.Join(path, "/"))
192+
}
193+
return paths
194+
}
195+
196+
func (w *GDriveWatcher) enumerateFiles(prefix string, callback func(object *GDriveObject) bool) error {
197+
config := &oauth2.Config{
198+
ClientID: w.config.ClientId,
199+
ClientSecret: w.config.ClientSecret,
200+
Scopes: []string{drive.DriveMetadataReadonlyScope},
201+
Endpoint: google.Endpoint,
202+
}
203+
204+
var opt option.ClientOption
205+
if w.config.token != nil {
206+
opt = option.WithTokenSource(config.TokenSource(context.Background(), w.config.token))
207+
} else if w.config.ApiKey != "" {
208+
opt = option.WithAPIKey(w.config.ApiKey)
209+
}
210+
211+
srv, err := drive.NewService(context.Background(), opt)
212+
if err != nil {
213+
return fmt.Errorf("unable to retrieve Drive client: %v", err)
214+
}
215+
216+
fileList := make(map[string]*drive.File)
217+
218+
err = srv.Files.List().Fields("nextPageToken, files(id, name, mimeType, modifiedTime, parents, size, md5Checksum, trashed)").Pages(context.Background(), func(files *drive.FileList) error {
219+
for _, f := range files.Files {
220+
fileList[f.Id] = f
221+
}
222+
223+
return nil
224+
})
225+
if err != nil {
226+
return fmt.Errorf("unable to retrieve files: %v", err)
227+
}
228+
229+
for _, file := range fileList {
230+
if file.MimeType != "application/vnd.google-apps.folder" && !file.Trashed {
231+
for _, name := range w.getFullPaths(file, fileList) {
232+
mt, e := time.Parse(time.RFC3339, file.ModifiedTime)
233+
if err != nil {
234+
w.Errors <- e
235+
continue
236+
}
237+
if strings.HasPrefix(name, prefix) {
238+
o := &GDriveObject{
239+
Id: file.Id,
240+
Key: name,
241+
Size: file.Size,
242+
LastModified: mt,
243+
Hash: file.Md5Checksum,
244+
}
245+
if callback(o) == false {
246+
break
247+
}
248+
}
249+
}
250+
}
251+
}
252+
return nil
253+
}
254+
255+
func (w *GDriveWatcher) getCachedObject(o *GDriveObject) *GDriveObject {
256+
if cachedObject, ok := w.cache[o.Id]; ok {
257+
return cachedObject
258+
}
259+
return nil
260+
}
261+
262+
func init() {
263+
supportedServices["gdrive"] = newGDriveWatcher
264+
}

0 commit comments

Comments
 (0)