Commit fe1a475

sf cli auth

1 parent 5c7fe22 commit fe1a475

5 files changed: 332 additions & 21 deletions

src/datacustomcode/cli.py

Lines changed: 14 additions & 2 deletions
@@ -16,7 +16,11 @@
 import json
 import os
 import sys
-from typing import List, Union
+from typing import (
+    List,
+    Optional,
+    Union,
+)
 
 import click
 from loguru import logger
@@ -294,12 +298,20 @@ def scan(filename: str, config: str, dry_run: bool, no_requirements: bool):
 @click.option("--config-file", default=None)
 @click.option("--dependencies", default=[], multiple=True)
 @click.option("--profile", default="default")
+@click.option(
+    "--sf-cli-org",
+    default=None,
+    help="SF CLI org alias or username. Fetches credentials via `sf org display`.",
+)
 def run(
     entrypoint: str,
     config_file: Union[str, None],
     dependencies: List[str],
     profile: str,
+    sf_cli_org: Optional[str],
 ):
     from datacustomcode.run import run_entrypoint
 
-    run_entrypoint(entrypoint, config_file, dependencies, profile)
+    run_entrypoint(
+        entrypoint, config_file, dependencies, profile, sf_cli_org=sf_cli_org
+    )
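
For orientation, a minimal sketch of exercising the new option through click's test runner. The entrypoint path and the org alias "dev1" are hypothetical values, not taken from this commit:

# Hypothetical invocation of the updated `run` command.
from click.testing import CliRunner

from datacustomcode.cli import run

runner = CliRunner()
# With --sf-cli-org set, run_entrypoint receives sf_cli_org and resolves
# credentials via `sf org display` instead of a stored profile.
result = runner.invoke(run, ["payload/entrypoint.py", "--sf-cli-org", "dev1"])
print(result.output)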

src/datacustomcode/io/reader/query_api.py

Lines changed: 39 additions & 6 deletions
@@ -36,6 +36,7 @@
 
 from datacustomcode.credentials import AuthType, Credentials
 from datacustomcode.io.reader.base import BaseDataCloudReader
+from datacustomcode.io.reader.sf_cli import SFCLIDataCloudReader
 
 if TYPE_CHECKING:
     import pandas
@@ -136,6 +137,7 @@ class QueryAPIDataCloudReader(BaseDataCloudReader):
     Supports multiple authentication methods:
     - OAuth Tokens (default, needs client_id/secret with refresh_token)
     - Client Credentials (server-to-server, needs client_id/secret only)
+    - SF CLI (uses ``sf org display`` access token via the REST API directly)
 
     Supports dataspace configuration for querying data within specific dataspaces.
     When a dataspace is provided (and not "default"), queries are executed within
@@ -149,6 +151,7 @@ def __init__(
         spark: SparkSession,
         credentials_profile: str = "default",
         dataspace: Optional[str] = None,
+        sf_cli_org: Optional[str] = None,
     ) -> None:
         """Initialize QueryAPIDataCloudReader.
 
@@ -160,14 +163,30 @@ def __init__(
             dataspace: Optional dataspace identifier. If provided and not "default",
                 the connection will be configured for the specified dataspace.
                 When None or "default", uses the default dataspace.
+            sf_cli_org: Optional SF CLI org alias or username. When set, the
+                reader delegates to :class:`SFCLIDataCloudReader` which calls
+                the Data Cloud REST API directly using the token obtained from
+                ``sf org display``, bypassing the CDP token-exchange flow.
         """
         self.spark = spark
-        credentials = Credentials.from_available(profile=credentials_profile)
-        logger.debug(
-            "Initializing QueryAPIDataCloudReader with "
-            f"auth_type={credentials.auth_type.value}"
-        )
-        self._conn = create_cdp_connection(credentials, dataspace)
+        if sf_cli_org:
+            logger.debug(
+                f"Initializing QueryAPIDataCloudReader with SF CLI org '{sf_cli_org}'"
+            )
+            self._sf_cli_reader: Optional[SFCLIDataCloudReader] = SFCLIDataCloudReader(
+                spark=spark,
+                sf_cli_org=sf_cli_org,
+                dataspace=dataspace,
+            )
+            self._conn = None
+        else:
+            self._sf_cli_reader = None
+            credentials = Credentials.from_available(profile=credentials_profile)
+            logger.debug(
+                "Initializing QueryAPIDataCloudReader with "
+                f"auth_type={credentials.auth_type.value}"
+            )
+            self._conn = create_cdp_connection(credentials, dataspace)
 
     def read_dlo(
         self,
@@ -186,8 +205,15 @@ def read_dlo(
         Returns:
             PySparkDataFrame: The PySpark DataFrame.
         """
+        sf_cli_reader: Optional[SFCLIDataCloudReader] = getattr(
+            self, "_sf_cli_reader", None
+        )
+        if sf_cli_reader is not None:
+            return sf_cli_reader.read_dlo(name, schema, row_limit)
+
         query = SQL_QUERY_TEMPLATE.format(name, row_limit)
 
+        assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)
 
         # Convert pandas DataFrame to Spark DataFrame
@@ -214,8 +240,15 @@ def read_dmo(
         Returns:
             PySparkDataFrame: The PySpark DataFrame.
         """
+        sf_cli_reader: Optional[SFCLIDataCloudReader] = getattr(
+            self, "_sf_cli_reader", None
+        )
+        if sf_cli_reader is not None:
+            return sf_cli_reader.read_dmo(name, schema, row_limit)
+
        query = SQL_QUERY_TEMPLATE.format(name, row_limit)
 
+        assert self._conn is not None
        pandas_df = self._conn.get_pandas_dataframe(query)
 
        # Convert pandas DataFrame to Spark DataFrame
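
A short sketch of the two construction paths this change creates; the org alias "dev1" and the DLO name are illustrative placeholders:

from pyspark.sql import SparkSession

from datacustomcode.io.reader.query_api import QueryAPIDataCloudReader

spark = SparkSession.builder.getOrCreate()

# Default path: credentials come from a named profile and queries run
# over the CDP connection.
profile_reader = QueryAPIDataCloudReader(spark, credentials_profile="default")

# SF CLI path: the reader delegates to SFCLIDataCloudReader, so no stored
# credentials are needed; a fresh token is fetched at query time.
cli_reader = QueryAPIDataCloudReader(spark, sf_cli_org="dev1")
df = cli_reader.read_dlo("Account_Home__dll", row_limit=100)
df.show()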
src/datacustomcode/io/reader/sf_cli.py

Lines changed: 258 additions & 0 deletions
@@ -0,0 +1,258 @@
+# Copyright (c) 2025, Salesforce, Inc.
+# SPDX-License-Identifier: Apache-2
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import json
+import logging
+import subprocess
+from typing import (
+    TYPE_CHECKING,
+    Final,
+    Optional,
+    Union,
+)
+
+import pandas as pd
+import pandas.api.types as pd_types
+from pyspark.sql.types import (
+    BooleanType,
+    DoubleType,
+    LongType,
+    StringType,
+    StructField,
+    StructType,
+    TimestampType,
+)
+import requests
+
+from datacustomcode.io.reader.base import BaseDataCloudReader
+
+if TYPE_CHECKING:
+    from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
+    from pyspark.sql.types import AtomicType
+
+logger = logging.getLogger(__name__)
+
+API_VERSION: Final = "v66.0"
+PANDAS_TYPE_MAPPING = {
+    "object": StringType(),
+    "int64": LongType(),
+    "float64": DoubleType(),
+    "bool": BooleanType(),
+}
+
+
+def _pandas_to_spark_schema(
+    pandas_df: pd.DataFrame, nullable: bool = True
+) -> StructType:
+    fields = []
+    for column, dtype in pandas_df.dtypes.items():
+        spark_type: AtomicType
+        if pd_types.is_datetime64_any_dtype(dtype):
+            spark_type = TimestampType()
+        else:
+            spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType())
+        fields.append(StructField(column, spark_type, nullable))
+    return StructType(fields)
+
+
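To make the helper's behavior concrete, a small hypothetical DataFrame and the schema it would derive (column names and values are made up):

import pandas as pd

df = pd.DataFrame(
    {
        "Name": ["a"],                              # object   -> StringType
        "Count": [1],                               # int64    -> LongType
        "Score": [1.5],                             # float64  -> DoubleType
        "Active": [True],                           # bool     -> BooleanType
        "Created": pd.to_datetime(["2025-01-01"]),  # datetime -> TimestampType
    }
)
# _pandas_to_spark_schema(df) returns a StructType of nullable StructFields
# with the Spark types above; unmapped dtypes fall back to StringType.
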
+class SFCLIDataCloudReader(BaseDataCloudReader):
+    """DataCloud reader that authenticates via the Salesforce CLI.
+
+    Uses ``sf org display`` to obtain a fresh access token and queries
+    Data Cloud through the REST API directly
+    (``/services/data/{version}/ssot/query-sql``), bypassing the CDP
+    token-exchange flow that requires special OAuth scopes.
+    """
+
+    CONFIG_NAME = "SFCLIDataCloudReader"
+
+    def __init__(
+        self,
+        spark: SparkSession,
+        sf_cli_org: str,
+        dataspace: Optional[str] = None,
+    ) -> None:
+        """Initialize SFCLIDataCloudReader.
+
+        Args:
+            spark: SparkSession instance for creating DataFrames.
+            sf_cli_org: Salesforce org alias or username as known to the SF CLI
+                (e.g. the alias given to ``sf org login web --alias dev1``).
+            dataspace: Optional dataspace identifier. If ``None`` or
+                ``"default"`` the query runs against the default dataspace.
+        """
+        self.spark = spark
+        self.sf_cli_org = sf_cli_org
+        self.dataspace = (
+            dataspace if dataspace and dataspace != "default" else "default"
+        )
+        logger.debug(f"Initialized SFCLIDataCloudReader for org '{sf_cli_org}'")
+
+    def _get_token(self) -> tuple[str, str]:
+        """Fetch a fresh access token and instance URL from the SF CLI.
+
+        Returns:
+            ``(access_token, instance_url)``
+
+        Raises:
+            RuntimeError: If the ``sf`` command is not on PATH, times out, or
+                returns an error.
+        """
+        try:
+            result = subprocess.run(
+                ["sf", "org", "display", "--target-org", self.sf_cli_org, "--json"],
+                capture_output=True,
+                text=True,
+                check=True,
+                timeout=30,
+            )
+        except FileNotFoundError as exc:
+            raise RuntimeError(
+                "The 'sf' command was not found. "
+                "Please install Salesforce CLI: https://developer.salesforce.com/tools/salesforcecli"
+            ) from exc
+        except subprocess.TimeoutExpired as exc:
+            raise RuntimeError(
+                f"'sf org display' timed out for org '{self.sf_cli_org}'"
+            ) from exc
+        except subprocess.CalledProcessError as exc:
+            raise RuntimeError(
+                f"'sf org display' failed for org '{self.sf_cli_org}'.\n"
+                f"Ensure the org is authenticated via 'sf org login web'.\n"
+                f"stderr: {exc.stderr.strip()}"
+            ) from exc
+
+        try:
+            data = json.loads(result.stdout)
+        except json.JSONDecodeError as exc:
+            raise RuntimeError(
+                f"Failed to parse 'sf org display' output: {exc}"
+            ) from exc
+
+        if data.get("status") != 0:
+            raise RuntimeError(
+                f"SF CLI error for org '{self.sf_cli_org}': "
+                f"{data.get('message', 'unknown error')}"
+            )
+
+        org_result = data.get("result", {})
+        access_token = org_result.get("accessToken")
+        instance_url = org_result.get("instanceUrl")
+
+        if not access_token or not instance_url:
+            raise RuntimeError(
+                f"'sf org display' did not return an access token or instance URL "
+                f"for org '{self.sf_cli_org}'"
+            )
+
+        logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'")
+        return access_token, instance_url
+
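Judging from the fields _get_token reads, the `sf org display --json` payload it expects looks roughly like this (the token and URL below are placeholders):

# Hypothetical CLI output consumed by _get_token.
expected_payload = {
    "status": 0,  # any non-zero status raises RuntimeError
    "result": {
        "accessToken": "00D...EXAMPLE",                      # -> access_token
        "instanceUrl": "https://example.my.salesforce.com",  # -> instance_url
    },
}
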
+    def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame:
+        """Execute *sql* against the Data Cloud REST endpoint.
+
+        Args:
+            sql: Base SQL query (no ``LIMIT`` clause).
+            row_limit: Maximum rows to return.
+
+        Returns:
+            Pandas DataFrame with query results.
+
+        Raises:
+            RuntimeError: On HTTP errors or unexpected response shapes.
+        """
+        access_token, instance_url = self._get_token()
+
+        url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql"
+        headers = {"Authorization": f"Bearer {access_token}"}
+        params = {"dataspace": self.dataspace}
+        body = {"sql": f"{sql} LIMIT {row_limit}"}
+
+        logger.debug(f"Executing Data Cloud query: {body['sql']}")
+
+        try:
+            response = requests.post(
+                url,
+                json=body,
+                params=params,
+                headers=headers,
+                timeout=120,
+            )
+        except requests.RequestException as exc:
+            raise RuntimeError(f"Data Cloud query request failed: {exc}") from exc
+
+        if response.status_code >= 300:
+            error_msg = response.text
+            try:
+                error_data = response.json()
+                if isinstance(error_data, list) and error_data:
+                    error_msg = error_data[0].get("message", error_msg)
+            except (json.JSONDecodeError, KeyError):
+                pass
+            raise RuntimeError(
+                f"Data Cloud query failed (HTTP {response.status_code}): {error_msg}"
+            )
+
+        result = response.json()
+        metadata = result.get("metadata", [])
+        column_names = [col.get("name") for col in metadata]
+        rows = result.get("data", [])
+
+        if not rows:
+            return pd.DataFrame(columns=column_names)
+        return pd.DataFrame(rows, columns=column_names)
+
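From the parsing above, the query-sql endpoint is expected to return column metadata plus row data; a hypothetical minimal response body:

# Hypothetical response shape consumed by _execute_query.
response_body = {
    "metadata": [{"name": "Id"}, {"name": "Name"}],    # ordered column names
    "data": [["001xx", "Acme"], ["002xx", "Globex"]],  # one list per row
}
# _execute_query builds pd.DataFrame(rows, columns=column_names); an empty
# "data" list yields an empty DataFrame that still carries the columns.
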
+    def read_dlo(
+        self,
+        name: str,
+        schema: Union[AtomicType, StructType, str, None] = None,
+        row_limit: int = 1000,
+    ) -> PySparkDataFrame:
+        """Read a Data Lake Object (DLO) from Data Cloud.
+
+        Args:
+            name: DLO name.
+            schema: Optional explicit schema.
+            row_limit: Maximum rows to fetch.
+
+        Returns:
+            PySpark DataFrame.
+        """
+        pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
+        if not schema:
+            schema = _pandas_to_spark_schema(pandas_df)
+        return self.spark.createDataFrame(pandas_df, schema)
+
+    def read_dmo(
+        self,
+        name: str,
+        schema: Union[AtomicType, StructType, str, None] = None,
+        row_limit: int = 1000,
+    ) -> PySparkDataFrame:
+        """Read a Data Model Object (DMO) from Data Cloud.
+
+        Args:
+            name: DMO name.
+            schema: Optional explicit schema.
+            row_limit: Maximum rows to fetch.
+
+        Returns:
+            PySpark DataFrame.
+        """
+        pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
+        if not schema:
+            schema = _pandas_to_spark_schema(pandas_df)
+        return self.spark.createDataFrame(pandas_df, schema)
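
Putting the pieces together, a hedged end-to-end sketch. It assumes an org already authenticated with `sf org login web --alias dev1`; the DMO name is illustrative:

from pyspark.sql import SparkSession

from datacustomcode.io.reader.sf_cli import SFCLIDataCloudReader

spark = SparkSession.builder.getOrCreate()
reader = SFCLIDataCloudReader(spark, sf_cli_org="dev1")

# Each read shells out to `sf org display` for a fresh token, POSTs the SQL
# to /services/data/v66.0/ssot/query-sql, and converts the JSON result into
# a pandas DataFrame, then a Spark DataFrame.
df = reader.read_dmo("UnifiedIndividual__dlm", row_limit=50)
df.printSchema()
df.show()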
