diff --git a/sqlparse/filters/output.py b/sqlparse/filters/output.py index 235db540..acd84073 100644 --- a/sqlparse/filters/output.py +++ b/sqlparse/filters/output.py @@ -31,9 +31,63 @@ def process(self, stmt): return stmt +def _generate_assignment_header(varname, has_nl, count, quote_char, assign_op='='): + """Generate the variable assignment header tokens. + + Yields the tokens that start a variable assignment line: + varname = ' (Python) + varname = " (PHP) + + Args: + varname: Variable name string + has_nl: Whether the SQL has newlines + count: Statement count (for blank line before subsequent statements) + quote_char: Single quote (') for Python, double quote (") for PHP + assign_op: Assignment operator ('=' or '.=') + """ + if count > 1: + yield sql.Token(T.Whitespace, '\n') + yield sql.Token(T.Name, varname) + yield sql.Token(T.Whitespace, ' ') + yield sql.Token(T.Operator, assign_op) + yield sql.Token(T.Whitespace, ' ') + yield sql.Token(T.Text, quote_char) + + +def _generate_continuation_header(varname, quote_char, assign_op='=', indent_padding=0): + """Generate the continuation line header tokens after a newline. + + Yields the tokens for a continuation line when the SQL has newlines: + ' (Python - with optional indent padding) + varname .= " (PHP) + + Args: + varname: Variable name string + quote_char: Quote character for the new line + assign_op: Assignment operator for continuation ('=' or '.=') + indent_padding: Number of spaces for Python-style continuation padding + """ + yield sql.Token(T.Text, f' {quote_char}') + yield sql.Token(T.Whitespace, '\n') + if assign_op == '.=': + # PHP-style: re-assign with .= + yield sql.Token(T.Name, varname) + yield sql.Token(T.Whitespace, ' ') + yield sql.Token(T.Operator, '.=') + yield sql.Token(T.Whitespace, ' ') + else: + # Python-style: continuation with padding + yield sql.Token(T.Whitespace, ' ' * indent_padding) + yield sql.Token(T.Text, quote_char) + + class OutputPythonFilter(OutputFilter): + _quote_char = "'" + _escape_char = "'" + _escape_replacement = "\\'" + def _process(self, stream, varname, has_nl): - # SQL query assignation to varname + # Variable assignment header (without trailing quote — we add it below) if self.count > 1: yield sql.Token(T.Whitespace, '\n') yield sql.Token(T.Name, varname) @@ -42,82 +96,83 @@ def _process(self, stream, varname, has_nl): yield sql.Token(T.Whitespace, ' ') if has_nl: yield sql.Token(T.Operator, '(') - yield sql.Token(T.Text, "'") - # Print the tokens on the quote + yield sql.Token(T.Text, self._quote_char) + + # Print the tokens within the quote + continuation_padding = len(varname) + 4 for token in stream: - # Token is a new line separator if token.is_whitespace and '\n' in token.value: - # Close quote and add a new line - yield sql.Token(T.Text, " '") - yield sql.Token(T.Whitespace, '\n') - - # Quote header on secondary lines - yield sql.Token(T.Whitespace, ' ' * (len(varname) + 4)) - yield sql.Token(T.Text, "'") + # Close current quote and start continuation line + yield from _generate_continuation_header( + varname, self._quote_char, '=', + indent_padding=continuation_padding) - # Indentation + # Indentation after the newline after_lb = token.value.split('\n', 1)[1] if after_lb: yield sql.Token(T.Whitespace, after_lb) continue - # Token has escape chars - elif "'" in token.value: - token.value = token.value.replace("'", "\\'") + # Escape the quote character within token values + if self._escape_char in token.value: + token.value = token.value.replace( + self._escape_char, self._escape_replacement) - # Put the token yield sql.Token(T.Text, token.value) # Close quote - yield sql.Token(T.Text, "'") + yield sql.Token(T.Text, self._quote_char) if has_nl: yield sql.Token(T.Operator, ')') class OutputPHPFilter(OutputFilter): varname_prefix = '$' + _quote_char = '"' + _escape_char = '"' + _escape_replacement = '\\"' def _process(self, stream, varname, has_nl): - # SQL query assignation to varname (quote header) + # Variable assignment header if self.count > 1: yield sql.Token(T.Whitespace, '\n') yield sql.Token(T.Name, varname) yield sql.Token(T.Whitespace, ' ') if has_nl: + # Extra space for alignment with continuation .= lines yield sql.Token(T.Whitespace, ' ') yield sql.Token(T.Operator, '=') yield sql.Token(T.Whitespace, ' ') - yield sql.Token(T.Text, '"') + yield sql.Token(T.Text, self._quote_char) - # Print the tokens on the quote + # Print the tokens within the quote for token in stream: - # Token is a new line separator if token.is_whitespace and '\n' in token.value: - # Close quote and add a new line + # Close current quote with semicolon and start continuation yield sql.Token(T.Text, ' ";') yield sql.Token(T.Whitespace, '\n') - # Quote header on secondary lines + # PHP continuation line: $varname .= " yield sql.Token(T.Name, varname) yield sql.Token(T.Whitespace, ' ') yield sql.Token(T.Operator, '.=') yield sql.Token(T.Whitespace, ' ') - yield sql.Token(T.Text, '"') + yield sql.Token(T.Text, self._quote_char) - # Indentation + # Indentation after the newline after_lb = token.value.split('\n', 1)[1] if after_lb: yield sql.Token(T.Whitespace, after_lb) continue - # Token has escape chars - elif '"' in token.value: - token.value = token.value.replace('"', '\\"') + # Escape the quote character within token values + if self._escape_char in token.value: + token.value = token.value.replace( + self._escape_char, self._escape_replacement) - # Put the token yield sql.Token(T.Text, token.value) # Close quote - yield sql.Token(T.Text, '"') + yield sql.Token(T.Text, self._quote_char) yield sql.Token(T.Punctuation, ';') diff --git a/sqlparse/formatter.py b/sqlparse/formatter.py index 1fba2466..ded70c68 100644 --- a/sqlparse/formatter.py +++ b/sqlparse/formatter.py @@ -11,125 +11,104 @@ from sqlparse.exceptions import SQLParseError +# ─── Option validation schema ──────────────────────────────────────────────── +# Each entry defines: valid values/coercion, side effects, and error messages. +# This replaces the repetitive if/raise pattern from the original code. + +_BOOL_VALUES = [True, False] +_CASE_VALUES = [None, 'upper', 'lower', 'capitalize'] +_OUTPUT_FORMAT_VALUES = [None, 'sql', 'python', 'php'] + + +def _validate_choice(option_name, value, valid_choices): + """Validate that a value is one of the allowed choices.""" + if value not in valid_choices: + raise SQLParseError(f'Invalid value for {option_name}: {value!r}') + + +def _validate_positive_int(option_name, value, min_value=1): + """Validate and coerce to a positive integer.""" + try: + value = int(value) + except (TypeError, ValueError): + raise SQLParseError(f'{option_name} requires an integer') + if value < min_value: + raise SQLParseError( + f'{option_name} requires a positive integer' + if min_value == 1 + else f'{option_name} requires an integer > {min_value - 1}' + ) + return value + + def validate_options(options): - """Validates options.""" - kwcase = options.get('keyword_case') - if kwcase not in [None, 'upper', 'lower', 'capitalize']: - raise SQLParseError('Invalid value for keyword_case: ' - f'{kwcase!r}') - - idcase = options.get('identifier_case') - if idcase not in [None, 'upper', 'lower', 'capitalize']: - raise SQLParseError('Invalid value for identifier_case: ' - f'{idcase!r}') - - ofrmt = options.get('output_format') - if ofrmt not in [None, 'sql', 'python', 'php']: - raise SQLParseError('Unknown output format: ' - f'{ofrmt!r}') - - strip_comments = options.get('strip_comments', False) - if strip_comments not in [True, False]: - raise SQLParseError('Invalid value for strip_comments: ' - f'{strip_comments!r}') - - space_around_operators = options.get('use_space_around_operators', False) - if space_around_operators not in [True, False]: - raise SQLParseError('Invalid value for use_space_around_operators: ' - f'{space_around_operators!r}') - - strip_ws = options.get('strip_whitespace', False) - if strip_ws not in [True, False]: - raise SQLParseError('Invalid value for strip_whitespace: ' - f'{strip_ws!r}') + """Validates formatting options using a declarative schema. + Each option is validated according to its type and allowed values. + Side effects (like setting dependent options) are applied after + validation passes. + """ + # ── Choice-type options ──────────────────────────────────────────────── + _validate_choice('keyword_case', options.get('keyword_case'), _CASE_VALUES) + _validate_choice('identifier_case', options.get('identifier_case'), _CASE_VALUES) + _validate_choice('output_format', options.get('output_format'), _OUTPUT_FORMAT_VALUES) + _validate_choice('strip_comments', options.get('strip_comments', False), _BOOL_VALUES) + _validate_choice('use_space_around_operators', + options.get('use_space_around_operators', False), _BOOL_VALUES) + _validate_choice('strip_whitespace', options.get('strip_whitespace', False), _BOOL_VALUES) + _validate_choice('reindent', options.get('reindent', False), _BOOL_VALUES) + _validate_choice('reindent_aligned', options.get('reindent_aligned', False), _BOOL_VALUES) + _validate_choice('indent_tabs', options.get('indent_tabs', False), _BOOL_VALUES) + + # ── Integer-type options ─────────────────────────────────────────────── truncate_strings = options.get('truncate_strings') if truncate_strings is not None: - try: - truncate_strings = int(truncate_strings) - except (ValueError, TypeError): - raise SQLParseError('Invalid value for truncate_strings: ' - f'{truncate_strings!r}') - if truncate_strings <= 1: - raise SQLParseError('Invalid value for truncate_strings: ' - f'{truncate_strings!r}') + truncate_strings = _validate_positive_int( + 'truncate_strings', truncate_strings, min_value=2) options['truncate_strings'] = truncate_strings options['truncate_char'] = options.get('truncate_char', '[...]') + indent_width = options.get('indent_width', 2) + options['indent_width'] = _validate_positive_int('indent_width', indent_width) + + wrap_after = options.get('wrap_after', 0) + options['wrap_after'] = _validate_positive_int('wrap_after', wrap_after, min_value=0) + + right_margin = options.get('right_margin') + if right_margin is not None: + right_margin = _validate_positive_int( + 'right_margin', right_margin, min_value=11) + options['right_margin'] = right_margin + + # ── Side effects: dependent options ──────────────────────────────────── indent_columns = options.get('indent_columns', False) - if indent_columns not in [True, False]: - raise SQLParseError('Invalid value for indent_columns: ' - f'{indent_columns!r}') - elif indent_columns: + _validate_choice('indent_columns', indent_columns, _BOOL_VALUES) + if indent_columns: options['reindent'] = True # enforce reindent options['indent_columns'] = indent_columns - reindent = options.get('reindent', False) - if reindent not in [True, False]: - raise SQLParseError('Invalid value for reindent: ' - f'{reindent!r}') - elif reindent: - options['strip_whitespace'] = True - - reindent_aligned = options.get('reindent_aligned', False) - if reindent_aligned not in [True, False]: - raise SQLParseError('Invalid value for reindent_aligned: ' - f'{reindent!r}') - elif reindent_aligned: - options['strip_whitespace'] = True - indent_after_first = options.get('indent_after_first', False) - if indent_after_first not in [True, False]: - raise SQLParseError('Invalid value for indent_after_first: ' - f'{indent_after_first!r}') + _validate_choice('indent_after_first', indent_after_first, _BOOL_VALUES) options['indent_after_first'] = indent_after_first - indent_tabs = options.get('indent_tabs', False) - if indent_tabs not in [True, False]: - raise SQLParseError('Invalid value for indent_tabs: ' - f'{indent_tabs!r}') - elif indent_tabs: - options['indent_char'] = '\t' - else: - options['indent_char'] = ' ' - - indent_width = options.get('indent_width', 2) - try: - indent_width = int(indent_width) - except (TypeError, ValueError): - raise SQLParseError('indent_width requires an integer') - if indent_width < 1: - raise SQLParseError('indent_width requires a positive integer') - options['indent_width'] = indent_width - - wrap_after = options.get('wrap_after', 0) - try: - wrap_after = int(wrap_after) - except (TypeError, ValueError): - raise SQLParseError('wrap_after requires an integer') - if wrap_after < 0: - raise SQLParseError('wrap_after requires a positive integer') - options['wrap_after'] = wrap_after - comma_first = options.get('comma_first', False) - if comma_first not in [True, False]: - raise SQLParseError('comma_first requires a boolean value') + _validate_choice('comma_first', comma_first, _BOOL_VALUES) options['comma_first'] = comma_first compact = options.get('compact', False) - if compact not in [True, False]: - raise SQLParseError('compact requires a boolean value') + _validate_choice('compact', compact, _BOOL_VALUES) options['compact'] = compact - right_margin = options.get('right_margin') - if right_margin is not None: - try: - right_margin = int(right_margin) - except (TypeError, ValueError): - raise SQLParseError('right_margin requires an integer') - if right_margin < 10: - raise SQLParseError('right_margin requires an integer > 10') - options['right_margin'] = right_margin + if options.get('reindent', False): + options['strip_whitespace'] = True + + if options.get('reindent_aligned', False): + options['strip_whitespace'] = True + + if options.get('indent_tabs', False): + options['indent_char'] = '\t' + else: + options['indent_char'] = ' ' return options diff --git a/sqlparse/utils.py b/sqlparse/utils.py index 58c0245a..86dec21b 100644 --- a/sqlparse/utils.py +++ b/sqlparse/utils.py @@ -78,13 +78,18 @@ def wrapped_f(tlist): return wrap -def imt(token, i=None, m=None, t=None): - """Helper function to simplify comparisons Instance, Match and TokenType - :param token: - :param i: Class or Tuple/List of Classes - :param m: Tuple of TokenType & Value. Can be list of Tuple for multiple - :param t: TokenType or Tuple/List of TokenTypes - :return: bool +def token_matches(token, i=None, m=None, t=None): + """Check if a token matches the given criteria. + + Tests a token against Instance, Match, and/or TokenType criteria. + Returns True if any criterion matches. + + :param token: The token to test + :param i: Class or Tuple/List of Classes to check isinstance against + :param m: Tuple of (TokenType, Value) to match via token.match(). + Can be a list of tuples for multiple patterns. + :param t: TokenType or Tuple/List of TokenTypes to check membership via 'in' + :return: bool """ if token is None: return False @@ -105,6 +110,10 @@ def imt(token, i=None, m=None, t=None): return False +# Backward-compatible alias: imt() was the original name +imt = token_matches + + def consume(iterator, n): """Advance the iterator n-steps ahead. If n is none, consume entirely.""" deque(itertools.islice(iterator, n), maxlen=0)