Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 85 additions & 30 deletions sqlparse/filters/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,63 @@ def process(self, stmt):
return stmt


def _generate_assignment_header(varname, has_nl, count, quote_char, assign_op='='):
"""Generate the variable assignment header tokens.

Yields the tokens that start a variable assignment line:
varname = ' (Python)
varname = " (PHP)

Args:
varname: Variable name string
has_nl: Whether the SQL has newlines
count: Statement count (for blank line before subsequent statements)
quote_char: Single quote (') for Python, double quote (") for PHP
assign_op: Assignment operator ('=' or '.=')
"""
if count > 1:
yield sql.Token(T.Whitespace, '\n')
yield sql.Token(T.Name, varname)
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Operator, assign_op)
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Text, quote_char)


def _generate_continuation_header(varname, quote_char, assign_op='=', indent_padding=0):
"""Generate the continuation line header tokens after a newline.

Yields the tokens for a continuation line when the SQL has newlines:
' (Python - with optional indent padding)
varname .= " (PHP)

Args:
varname: Variable name string
quote_char: Quote character for the new line
assign_op: Assignment operator for continuation ('=' or '.=')
indent_padding: Number of spaces for Python-style continuation padding
"""
yield sql.Token(T.Text, f' {quote_char}')
yield sql.Token(T.Whitespace, '\n')
if assign_op == '.=':
# PHP-style: re-assign with .=
yield sql.Token(T.Name, varname)
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Operator, '.=')
yield sql.Token(T.Whitespace, ' ')
else:
# Python-style: continuation with padding
yield sql.Token(T.Whitespace, ' ' * indent_padding)
yield sql.Token(T.Text, quote_char)


class OutputPythonFilter(OutputFilter):
_quote_char = "'"
_escape_char = "'"
_escape_replacement = "\\'"

def _process(self, stream, varname, has_nl):
# SQL query assignation to varname
# Variable assignment header (without trailing quote — we add it below)
if self.count > 1:
yield sql.Token(T.Whitespace, '\n')
yield sql.Token(T.Name, varname)
Expand All @@ -42,82 +96,83 @@ def _process(self, stream, varname, has_nl):
yield sql.Token(T.Whitespace, ' ')
if has_nl:
yield sql.Token(T.Operator, '(')
yield sql.Token(T.Text, "'")

# Print the tokens on the quote
yield sql.Token(T.Text, self._quote_char)

# Print the tokens within the quote
continuation_padding = len(varname) + 4
for token in stream:
# Token is a new line separator
if token.is_whitespace and '\n' in token.value:
# Close quote and add a new line
yield sql.Token(T.Text, " '")
yield sql.Token(T.Whitespace, '\n')

# Quote header on secondary lines
yield sql.Token(T.Whitespace, ' ' * (len(varname) + 4))
yield sql.Token(T.Text, "'")
# Close current quote and start continuation line
yield from _generate_continuation_header(
varname, self._quote_char, '=',
indent_padding=continuation_padding)

# Indentation
# Indentation after the newline
after_lb = token.value.split('\n', 1)[1]
if after_lb:
yield sql.Token(T.Whitespace, after_lb)
continue

# Token has escape chars
elif "'" in token.value:
token.value = token.value.replace("'", "\\'")
# Escape the quote character within token values
if self._escape_char in token.value:
token.value = token.value.replace(
self._escape_char, self._escape_replacement)

# Put the token
yield sql.Token(T.Text, token.value)

# Close quote
yield sql.Token(T.Text, "'")
yield sql.Token(T.Text, self._quote_char)
if has_nl:
yield sql.Token(T.Operator, ')')


class OutputPHPFilter(OutputFilter):
varname_prefix = '$'
_quote_char = '"'
_escape_char = '"'
_escape_replacement = '\\"'

def _process(self, stream, varname, has_nl):
# SQL query assignation to varname (quote header)
# Variable assignment header
if self.count > 1:
yield sql.Token(T.Whitespace, '\n')
yield sql.Token(T.Name, varname)
yield sql.Token(T.Whitespace, ' ')
if has_nl:
# Extra space for alignment with continuation .= lines
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Operator, '=')
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Text, '"')
yield sql.Token(T.Text, self._quote_char)

# Print the tokens on the quote
# Print the tokens within the quote
for token in stream:
# Token is a new line separator
if token.is_whitespace and '\n' in token.value:
# Close quote and add a new line
# Close current quote with semicolon and start continuation
yield sql.Token(T.Text, ' ";')
yield sql.Token(T.Whitespace, '\n')

# Quote header on secondary lines
# PHP continuation line: $varname .= "
yield sql.Token(T.Name, varname)
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Operator, '.=')
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Text, '"')
yield sql.Token(T.Text, self._quote_char)

# Indentation
# Indentation after the newline
after_lb = token.value.split('\n', 1)[1]
if after_lb:
yield sql.Token(T.Whitespace, after_lb)
continue

# Token has escape chars
elif '"' in token.value:
token.value = token.value.replace('"', '\\"')
# Escape the quote character within token values
if self._escape_char in token.value:
token.value = token.value.replace(
self._escape_char, self._escape_replacement)

# Put the token
yield sql.Token(T.Text, token.value)

# Close quote
yield sql.Token(T.Text, '"')
yield sql.Token(T.Text, self._quote_char)
yield sql.Token(T.Punctuation, ';')
177 changes: 78 additions & 99 deletions sqlparse/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,125 +11,104 @@
from sqlparse.exceptions import SQLParseError


# ─── Option validation schema ────────────────────────────────────────────────
# Each entry defines: valid values/coercion, side effects, and error messages.
# This replaces the repetitive if/raise pattern from the original code.

_BOOL_VALUES = [True, False]
_CASE_VALUES = [None, 'upper', 'lower', 'capitalize']
_OUTPUT_FORMAT_VALUES = [None, 'sql', 'python', 'php']


def _validate_choice(option_name, value, valid_choices):
"""Validate that a value is one of the allowed choices."""
if value not in valid_choices:
raise SQLParseError(f'Invalid value for {option_name}: {value!r}')


def _validate_positive_int(option_name, value, min_value=1):
"""Validate and coerce to a positive integer."""
try:
value = int(value)
except (TypeError, ValueError):
raise SQLParseError(f'{option_name} requires an integer')
if value < min_value:
raise SQLParseError(
f'{option_name} requires a positive integer'
if min_value == 1
else f'{option_name} requires an integer > {min_value - 1}'
)
return value


def validate_options(options):
"""Validates options."""
kwcase = options.get('keyword_case')
if kwcase not in [None, 'upper', 'lower', 'capitalize']:
raise SQLParseError('Invalid value for keyword_case: '
f'{kwcase!r}')

idcase = options.get('identifier_case')
if idcase not in [None, 'upper', 'lower', 'capitalize']:
raise SQLParseError('Invalid value for identifier_case: '
f'{idcase!r}')

ofrmt = options.get('output_format')
if ofrmt not in [None, 'sql', 'python', 'php']:
raise SQLParseError('Unknown output format: '
f'{ofrmt!r}')

strip_comments = options.get('strip_comments', False)
if strip_comments not in [True, False]:
raise SQLParseError('Invalid value for strip_comments: '
f'{strip_comments!r}')

space_around_operators = options.get('use_space_around_operators', False)
if space_around_operators not in [True, False]:
raise SQLParseError('Invalid value for use_space_around_operators: '
f'{space_around_operators!r}')

strip_ws = options.get('strip_whitespace', False)
if strip_ws not in [True, False]:
raise SQLParseError('Invalid value for strip_whitespace: '
f'{strip_ws!r}')
"""Validates formatting options using a declarative schema.

Each option is validated according to its type and allowed values.
Side effects (like setting dependent options) are applied after
validation passes.
"""
# ── Choice-type options ────────────────────────────────────────────────
_validate_choice('keyword_case', options.get('keyword_case'), _CASE_VALUES)
_validate_choice('identifier_case', options.get('identifier_case'), _CASE_VALUES)
_validate_choice('output_format', options.get('output_format'), _OUTPUT_FORMAT_VALUES)
_validate_choice('strip_comments', options.get('strip_comments', False), _BOOL_VALUES)
_validate_choice('use_space_around_operators',
options.get('use_space_around_operators', False), _BOOL_VALUES)
_validate_choice('strip_whitespace', options.get('strip_whitespace', False), _BOOL_VALUES)
_validate_choice('reindent', options.get('reindent', False), _BOOL_VALUES)
_validate_choice('reindent_aligned', options.get('reindent_aligned', False), _BOOL_VALUES)
_validate_choice('indent_tabs', options.get('indent_tabs', False), _BOOL_VALUES)

# ── Integer-type options ───────────────────────────────────────────────
truncate_strings = options.get('truncate_strings')
if truncate_strings is not None:
try:
truncate_strings = int(truncate_strings)
except (ValueError, TypeError):
raise SQLParseError('Invalid value for truncate_strings: '
f'{truncate_strings!r}')
if truncate_strings <= 1:
raise SQLParseError('Invalid value for truncate_strings: '
f'{truncate_strings!r}')
truncate_strings = _validate_positive_int(
'truncate_strings', truncate_strings, min_value=2)
options['truncate_strings'] = truncate_strings
options['truncate_char'] = options.get('truncate_char', '[...]')

indent_width = options.get('indent_width', 2)
options['indent_width'] = _validate_positive_int('indent_width', indent_width)

wrap_after = options.get('wrap_after', 0)
options['wrap_after'] = _validate_positive_int('wrap_after', wrap_after, min_value=0)

right_margin = options.get('right_margin')
if right_margin is not None:
right_margin = _validate_positive_int(
'right_margin', right_margin, min_value=11)
options['right_margin'] = right_margin

# ── Side effects: dependent options ────────────────────────────────────
indent_columns = options.get('indent_columns', False)
if indent_columns not in [True, False]:
raise SQLParseError('Invalid value for indent_columns: '
f'{indent_columns!r}')
elif indent_columns:
_validate_choice('indent_columns', indent_columns, _BOOL_VALUES)
if indent_columns:
options['reindent'] = True # enforce reindent
options['indent_columns'] = indent_columns

reindent = options.get('reindent', False)
if reindent not in [True, False]:
raise SQLParseError('Invalid value for reindent: '
f'{reindent!r}')
elif reindent:
options['strip_whitespace'] = True

reindent_aligned = options.get('reindent_aligned', False)
if reindent_aligned not in [True, False]:
raise SQLParseError('Invalid value for reindent_aligned: '
f'{reindent!r}')
elif reindent_aligned:
options['strip_whitespace'] = True

indent_after_first = options.get('indent_after_first', False)
if indent_after_first not in [True, False]:
raise SQLParseError('Invalid value for indent_after_first: '
f'{indent_after_first!r}')
_validate_choice('indent_after_first', indent_after_first, _BOOL_VALUES)
options['indent_after_first'] = indent_after_first

indent_tabs = options.get('indent_tabs', False)
if indent_tabs not in [True, False]:
raise SQLParseError('Invalid value for indent_tabs: '
f'{indent_tabs!r}')
elif indent_tabs:
options['indent_char'] = '\t'
else:
options['indent_char'] = ' '

indent_width = options.get('indent_width', 2)
try:
indent_width = int(indent_width)
except (TypeError, ValueError):
raise SQLParseError('indent_width requires an integer')
if indent_width < 1:
raise SQLParseError('indent_width requires a positive integer')
options['indent_width'] = indent_width

wrap_after = options.get('wrap_after', 0)
try:
wrap_after = int(wrap_after)
except (TypeError, ValueError):
raise SQLParseError('wrap_after requires an integer')
if wrap_after < 0:
raise SQLParseError('wrap_after requires a positive integer')
options['wrap_after'] = wrap_after

comma_first = options.get('comma_first', False)
if comma_first not in [True, False]:
raise SQLParseError('comma_first requires a boolean value')
_validate_choice('comma_first', comma_first, _BOOL_VALUES)
options['comma_first'] = comma_first

compact = options.get('compact', False)
if compact not in [True, False]:
raise SQLParseError('compact requires a boolean value')
_validate_choice('compact', compact, _BOOL_VALUES)
options['compact'] = compact

right_margin = options.get('right_margin')
if right_margin is not None:
try:
right_margin = int(right_margin)
except (TypeError, ValueError):
raise SQLParseError('right_margin requires an integer')
if right_margin < 10:
raise SQLParseError('right_margin requires an integer > 10')
options['right_margin'] = right_margin
if options.get('reindent', False):
options['strip_whitespace'] = True

if options.get('reindent_aligned', False):
options['strip_whitespace'] = True

if options.get('indent_tabs', False):
options['indent_char'] = '\t'
else:
options['indent_char'] = ' '

return options

Expand Down
Loading