Skip to content

fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633

Open
chitender wants to merge 1 commit into
openmeterio:mainfrom
chitender:fix/kafka-sasl-plaintext
Open

fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633
chitender wants to merge 1 commit into
openmeterio:mainfrom
chitender:fix/kafka-sasl-plaintext

Conversation

@chitender

@chitender chitender commented Jul 1, 2026

Copy link
Copy Markdown

The Sarama-based event publisher only enabled SASL when securityProtocol was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers fail with "client has run out of available brokers to talk to: EOF" since the client opened an unauthenticated connection that brokers rejected.

Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind SASL_SSL only. This matches the confluent-kafka-go path used by the sink.

Notes for reviewer
▎ Added a table-driven test for createKafkaConfig covering PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, and SASL_PLAINTEXT+SCRAM. Verified in production: OpenMeter pods connect successfully to SASL_PLAINTEXT + PLAIN brokers with this change.

Summary by CodeRabbit

  • New Features

    • Kafka connections now support SASL authentication for both secure and plaintext security protocols.
    • TLS is enabled only when using the secure SASL-over-TLS option, matching the configured connection mode.
  • Bug Fixes

    • Improved handling of Kafka authentication settings so usernames, passwords, and SASL mechanism choices are applied consistently.
    • Added test coverage for multiple Kafka security configurations, including SCRAM-based authentication.

The Sarama-based event publisher only enabled SASL when securityProtocol
was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers
fail with "client has run out of available brokers to talk to: EOF" since
the client opened an unauthenticated connection that brokers rejected.

Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind
SASL_SSL only. This matches the confluent-kafka-go path used by the sink.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@chitender chitender requested a review from a team as a code owner July 1, 2026 15:29
@coderabbitai

coderabbitai Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

The createKafkaConfig function in the Kafka broker driver was refactored to support SASL for both SASL_SSL and SASL_PLAINTEXT security protocols, enabling TLS only for SASL_SSL. A new unit test validates SASL/TLS enablement, credential propagation, and SCRAM client generator selection.

Changes

Kafka SASL/TLS config refactor

Layer / File(s) Summary
SASL/TLS enablement logic
openmeter/watermill/driver/kafka/broker.go
Replaced the single SASL_SSL-only conditional with a switch that enables SASL for both SASL_SSL and SASL_PLAINTEXT, restricts TLS to SASL_SSL only, and selects the SCRAM client generator based on the configured SASL mechanism.
Test coverage for SASL scenarios
openmeter/watermill/driver/kafka/broker_test.go
Adds a new test that checks SASL/TLS enablement flags, SASL credential and mechanism propagation, and correct presence/absence of the SCRAM client generator function.

Estimated code review effort: 2 (Simple) | ~12 minutes

Sequence Diagram(s)

Not applicable — this change is a config-logic refactor with a matching test, not a multi-component control flow.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly matches the main change: adding SASL_PLAINTEXT support for the Watermill Kafka publisher.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@greptile-apps

greptile-apps Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR updates the Watermill Kafka publisher config to support SASL_PLAINTEXT. The main changes are:

  • SASL setup now runs for both SASL_SSL and SASL_PLAINTEXT.
  • TLS setup stays limited to SASL_SSL.
  • New table-driven tests cover plaintext, SASL plaintext, SASL SSL, and SCRAM setup.

Confidence Score: 5/5

This looks safe to merge after a small config-validation cleanup.

  • The main SASL and TLS behavior matches the intended protocol split.
  • Missing SASL fields can still reach runtime connection setup and produce a less clear Kafka failure.

openmeter/watermill/driver/kafka/broker.go

Important Files Changed

Filename Overview
openmeter/watermill/driver/kafka/broker.go Expands Sarama SASL setup to SASL_PLAINTEXT and keeps TLS gated to SASL_SSL; the main follow-up is clearer validation for missing SASL fields.
openmeter/watermill/driver/kafka/broker_test.go Adds focused tests for SASL and TLS config combinations.

Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
openmeter/watermill/driver/kafka/broker.go:87-88
**SASL Config Allows Empty Mechanism**

When `securityProtocol` is `SASL_PLAINTEXT` but `saslMechanisms` or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.

Reviews (1): Last reviewed commit: "fix(kafka): support SASL_PLAINTEXT in wa..." | Re-trigger Greptile

Comment on lines +87 to +88
switch o.KafkaConfig.SecurityProtocol {
case "SASL_SSL", "SASL_PLAINTEXT":

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 SASL Config Allows Empty Mechanism

When securityProtocol is SASL_PLAINTEXT but saslMechanisms or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.

Prompt To Fix With AI
This is a comment left during a code review.
Path: openmeter/watermill/driver/kafka/broker.go
Line: 87-88

Comment:
**SASL Config Allows Empty Mechanism**

When `securityProtocol` is `SASL_PLAINTEXT` but `saslMechanisms` or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code Fix in Codex

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
openmeter/watermill/driver/kafka/broker_test.go (1)

55-67: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Add rows for the untested SCRAM paths.

Nice table setup. One small gap: this only exercises SCRAM-SHA-512 over SASL_PLAINTEXT; a broken SCRAM-SHA-256 branch, or a regression in existing SASL_SSL + SCRAM behavior, would still pass.

Suggested extra cases
 		{
 			name: "SASL_PLAINTEXT with SCRAM wires up a SCRAM client generator",
 			kafkaConfig: config.KafkaConfiguration{
 				Broker:           "localhost:9092",
 				SecurityProtocol: "SASL_PLAINTEXT",
 				SaslMechanisms:   sarama.SASLTypeSCRAMSHA512,
 				SaslUsername:     "user",
 				SaslPassword:     "pass",
 			},
 			expectSASLEnabled:    true,
 			expectTLSEnabled:     false,
 			expectSCRAMGenerator: true,
 		},
+		{
+			name: "SASL_PLAINTEXT with SCRAM-SHA-256 wires up a SCRAM client generator",
+			kafkaConfig: config.KafkaConfiguration{
+				Broker:           "localhost:9092",
+				SecurityProtocol: "SASL_PLAINTEXT",
+				SaslMechanisms:   sarama.SASLTypeSCRAMSHA256,
+				SaslUsername:     "user",
+				SaslPassword:     "pass",
+			},
+			expectSASLEnabled:    true,
+			expectTLSEnabled:     false,
+			expectSCRAMGenerator: true,
+		},
+		{
+			name: "SASL_SSL with SCRAM wires up a SCRAM client generator",
+			kafkaConfig: config.KafkaConfiguration{
+				Broker:           "localhost:9092",
+				SecurityProtocol: "SASL_SSL",
+				SaslMechanisms:   sarama.SASLTypeSCRAMSHA512,
+				SaslUsername:     "user",
+				SaslPassword:     "pass",
+			},
+			expectSASLEnabled:    true,
+			expectTLSEnabled:     true,
+			expectSCRAMGenerator: true,
+		},

As per path instructions, tests should be comprehensive and cover the changes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@openmeter/watermill/driver/kafka/broker_test.go` around lines 55 - 67, Add
the missing table-driven test cases in broker_test.go so the SCRAM coverage
includes the other untested paths, not just SASL_PLAINTEXT with
sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration scenarios to
verify SASL_SSL with SCRAM still wires up the SCRAM client generator, and add a
separate case for SASL_PLAINTEXT/SASL_SSL using sarama.SASLTypeSCRAMSHA256 so
both SCRAM branches are exercised. Keep the assertions aligned with the current
expectSASLEnabled, expectTLSEnabled, and expectSCRAMGenerator checks.

Source: Path instructions

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@openmeter/watermill/driver/kafka/broker_test.go`:
- Around line 55-67: Add the missing table-driven test cases in broker_test.go
so the SCRAM coverage includes the other untested paths, not just SASL_PLAINTEXT
with sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration
scenarios to verify SASL_SSL with SCRAM still wires up the SCRAM client
generator, and add a separate case for SASL_PLAINTEXT/SASL_SSL using
sarama.SASLTypeSCRAMSHA256 so both SCRAM branches are exercised. Keep the
assertions aligned with the current expectSASLEnabled, expectTLSEnabled, and
expectSCRAMGenerator checks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 24ea5a65-90e3-4cff-b628-bd2c6819d57e

📥 Commits

Reviewing files that changed from the base of the PR and between ede95f6 and 0399967.

📒 Files selected for processing (2)
  • openmeter/watermill/driver/kafka/broker.go
  • openmeter/watermill/driver/kafka/broker_test.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant