@@ -29,6 +29,10 @@ pub struct S3RemoteFs {
2929 bucket : arc_swap:: ArcSwap < Bucket > ,
3030 sub_path : Option < String > ,
3131 delete_mut : Mutex < ( ) > ,
32+ /// When set, the refresh loop watches this file for changes and calls
33+ /// STS AssumeRoleWithWebIdentity with the JWT inside it.
34+ web_identity_token_file : Option < String > ,
35+ web_identity_role_arn : Option < String > ,
3236}
3337
3438impl fmt:: Debug for S3RemoteFs {
@@ -50,36 +54,49 @@ impl S3RemoteFs {
5054 bucket_name : String ,
5155 sub_path : Option < String > ,
5256 ) -> Result < Arc < Self > , CubeError > {
53- // Incorrect naming for ENV variables...
5457 let access_key = env:: var ( "CUBESTORE_AWS_ACCESS_KEY_ID" ) . ok ( ) ;
5558 let secret_key = env:: var ( "CUBESTORE_AWS_SECRET_ACCESS_KEY" ) . ok ( ) ;
59+ let token_file = env:: var ( "CUBESTORE_AWS_WEB_IDENTITY_TOKEN_FILE" ) . ok ( ) ;
60+ let role_arn = env:: var ( "CUBESTORE_AWS_ROLE_ARN" ) . ok ( ) ;
61+
62+ let credentials = if let ( Some ( ref tf) , Some ( ref arn) ) = ( & token_file, & role_arn) {
63+ // Web identity mode: read JWT from file and exchange via STS.
64+ let jwt = std:: fs:: read_to_string ( tf) . map_err ( |e| {
65+ CubeError :: internal ( format ! (
66+ "Failed to read web identity token file '{}': {}" ,
67+ tf, e
68+ ) )
69+ } ) ?;
70+ info ! (
71+ "Using web identity token file for S3 credentials (role={})" ,
72+ arn
73+ ) ;
74+ Credentials :: from_sts ( arn, "cubestore" , & jwt) . map_err ( |e| {
75+ CubeError :: internal ( format ! ( "STS AssumeRoleWithWebIdentity failed: {}" , e) )
76+ } ) ?
77+ } else {
78+ // Static credentials mode (or credential chain fallback).
79+ Credentials :: new (
80+ access_key. as_deref ( ) ,
81+ secret_key. as_deref ( ) ,
82+ None ,
83+ None ,
84+ None ,
85+ )
86+ . map_err ( |e| CubeError :: internal ( format ! ( "Failed to create S3 credentials: {}" , e) ) ) ?
87+ } ;
5688
57- let credentials = Credentials :: new (
58- access_key. as_deref ( ) ,
59- secret_key. as_deref ( ) ,
60- None ,
61- None ,
62- None ,
63- )
64- . map_err ( |err| {
65- CubeError :: internal ( format ! (
66- "Failed to create S3 credentials: {}" ,
67- err. to_string( )
68- ) )
69- } ) ?;
70- let region = region. parse :: < Region > ( ) . map_err ( |err| {
71- CubeError :: internal ( format ! (
72- "Failed to parse Region '{}': {}" ,
73- region,
74- err. to_string( )
75- ) )
89+ let region = region. parse :: < Region > ( ) . map_err ( |e| {
90+ CubeError :: internal ( format ! ( "Failed to parse Region '{}': {}" , region, e) )
7691 } ) ?;
7792 let bucket = Bucket :: new ( & bucket_name, region. clone ( ) , credentials) ?;
7893 let fs = Arc :: new ( Self {
7994 dir,
8095 bucket : arc_swap:: ArcSwap :: new ( Arc :: new ( bucket) ) ,
8196 sub_path,
8297 delete_mut : Mutex :: new ( ( ) ) ,
98+ web_identity_token_file : token_file,
99+ web_identity_role_arn : role_arn,
83100 } ) ;
84101 spawn_creds_refresh_loop ( access_key, secret_key, bucket_name, region, & fs) ;
85102
@@ -94,15 +111,36 @@ fn spawn_creds_refresh_loop(
94111 region : Region ,
95112 fs : & Arc < S3RemoteFs > ,
96113) {
97- // Refresh credentials. TODO: use expiration time.
98- let refresh_every = refresh_interval_from_env ( ) ;
114+ let token_file = fs. web_identity_token_file . clone ( ) ;
115+ let role_arn = fs. web_identity_role_arn . clone ( ) ;
116+ let is_web_identity = token_file. is_some ( ) && role_arn. is_some ( ) ;
117+
118+ // Web identity STS credentials expire in ~1 hour, so poll the token file
119+ // every 30s by default. Static credentials use 3-hour default.
120+ // CUBESTORE_AWS_CREDS_REFRESH_EVERY_MINS overrides both.
121+ let refresh_every = {
122+ let configured = refresh_interval_from_env ( ) ;
123+ if is_web_identity && configured == Duration :: from_secs ( 60 * 180 ) {
124+ Duration :: from_secs ( 30 )
125+ } else {
126+ configured
127+ }
128+ } ;
129+
99130 if refresh_every. as_secs ( ) == 0 {
100131 return ;
101132 }
102133
103134 let fs = Arc :: downgrade ( fs) ;
135+ let mut last_modified = token_file
136+ . as_ref ( )
137+ . and_then ( |f| std:: fs:: metadata ( f) . ok ( ) ?. modified ( ) . ok ( ) ) ;
138+
104139 std:: thread:: spawn ( move || {
105- log:: debug!( "Started S3 credentials refresh loop" ) ;
140+ log:: debug!(
141+ "Started S3 credentials refresh loop (web_identity={})" ,
142+ is_web_identity
143+ ) ;
106144 loop {
107145 std:: thread:: sleep ( refresh_every) ;
108146 let fs = match fs. upgrade ( ) {
@@ -112,13 +150,36 @@ fn spawn_creds_refresh_loop(
112150 }
113151 Some ( fs) => fs,
114152 } ;
115- let c = match Credentials :: new (
116- access_key. as_deref ( ) ,
117- secret_key. as_deref ( ) ,
118- None ,
119- None ,
120- None ,
121- ) {
153+
154+ // In web identity mode, only refresh when the token file changed.
155+ if let ( Some ( ref file) , Some ( _) ) = ( & token_file, & role_arn) {
156+ let current_modified = std:: fs:: metadata ( file) . ok ( ) . and_then ( |m| m. modified ( ) . ok ( ) ) ;
157+ if current_modified == last_modified {
158+ continue ;
159+ }
160+ last_modified = current_modified;
161+ info ! ( "Web identity token file changed, refreshing S3 credentials" ) ;
162+ }
163+
164+ let c = if let ( Some ( ref file) , Some ( ref arn) ) = ( & token_file, & role_arn) {
165+ match std:: fs:: read_to_string ( file) {
166+ Ok ( jwt) => Credentials :: from_sts ( arn, "cubestore" , & jwt) ,
167+ Err ( e) => {
168+ log:: error!( "Failed to read web identity token file: {}" , e) ;
169+ continue ;
170+ }
171+ }
172+ } else {
173+ Credentials :: new (
174+ access_key. as_deref ( ) ,
175+ secret_key. as_deref ( ) ,
176+ None ,
177+ None ,
178+ None ,
179+ )
180+ } ;
181+
182+ let c = match c {
122183 Ok ( c) => c,
123184 Err ( e) => {
124185 log:: error!( "Failed to refresh S3 credentials: {}" , e) ;
0 commit comments