fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633
fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633chitender wants to merge 1 commit into
Conversation
The Sarama-based event publisher only enabled SASL when securityProtocol was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers fail with "client has run out of available brokers to talk to: EOF" since the client opened an unauthenticated connection that brokers rejected. Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind SASL_SSL only. This matches the confluent-kafka-go path used by the sink. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
📝 WalkthroughWalkthroughThe ChangesKafka SASL/TLS config refactor
Estimated code review effort: 2 (Simple) | ~12 minutes Sequence Diagram(s)Not applicable — this change is a config-logic refactor with a matching test, not a multi-component control flow. 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Greptile SummaryThis PR updates the Watermill Kafka publisher config to support SASL_PLAINTEXT. The main changes are:
Confidence Score: 5/5This looks safe to merge after a small config-validation cleanup.
openmeter/watermill/driver/kafka/broker.go Important Files Changed
Prompt To Fix All With AIFix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
openmeter/watermill/driver/kafka/broker.go:87-88
**SASL Config Allows Empty Mechanism**
When `securityProtocol` is `SASL_PLAINTEXT` but `saslMechanisms` or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.
Reviews (1): Last reviewed commit: "fix(kafka): support SASL_PLAINTEXT in wa..." | Re-trigger Greptile |
| switch o.KafkaConfig.SecurityProtocol { | ||
| case "SASL_SSL", "SASL_PLAINTEXT": |
There was a problem hiding this comment.
SASL Config Allows Empty Mechanism
When securityProtocol is SASL_PLAINTEXT but saslMechanisms or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.
Prompt To Fix With AI
This is a comment left during a code review.
Path: openmeter/watermill/driver/kafka/broker.go
Line: 87-88
Comment:
**SASL Config Allows Empty Mechanism**
When `securityProtocol` is `SASL_PLAINTEXT` but `saslMechanisms` or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
🧹 Nitpick comments (1)
openmeter/watermill/driver/kafka/broker_test.go (1)
55-67: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winAdd rows for the untested SCRAM paths.
Nice table setup. One small gap: this only exercises SCRAM-SHA-512 over
SASL_PLAINTEXT; a broken SCRAM-SHA-256 branch, or a regression in existingSASL_SSL+ SCRAM behavior, would still pass.Suggested extra cases
{ name: "SASL_PLAINTEXT with SCRAM wires up a SCRAM client generator", kafkaConfig: config.KafkaConfiguration{ Broker: "localhost:9092", SecurityProtocol: "SASL_PLAINTEXT", SaslMechanisms: sarama.SASLTypeSCRAMSHA512, SaslUsername: "user", SaslPassword: "pass", }, expectSASLEnabled: true, expectTLSEnabled: false, expectSCRAMGenerator: true, }, + { + name: "SASL_PLAINTEXT with SCRAM-SHA-256 wires up a SCRAM client generator", + kafkaConfig: config.KafkaConfiguration{ + Broker: "localhost:9092", + SecurityProtocol: "SASL_PLAINTEXT", + SaslMechanisms: sarama.SASLTypeSCRAMSHA256, + SaslUsername: "user", + SaslPassword: "pass", + }, + expectSASLEnabled: true, + expectTLSEnabled: false, + expectSCRAMGenerator: true, + }, + { + name: "SASL_SSL with SCRAM wires up a SCRAM client generator", + kafkaConfig: config.KafkaConfiguration{ + Broker: "localhost:9092", + SecurityProtocol: "SASL_SSL", + SaslMechanisms: sarama.SASLTypeSCRAMSHA512, + SaslUsername: "user", + SaslPassword: "pass", + }, + expectSASLEnabled: true, + expectTLSEnabled: true, + expectSCRAMGenerator: true, + },As per path instructions, tests should be comprehensive and cover the changes.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@openmeter/watermill/driver/kafka/broker_test.go` around lines 55 - 67, Add the missing table-driven test cases in broker_test.go so the SCRAM coverage includes the other untested paths, not just SASL_PLAINTEXT with sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration scenarios to verify SASL_SSL with SCRAM still wires up the SCRAM client generator, and add a separate case for SASL_PLAINTEXT/SASL_SSL using sarama.SASLTypeSCRAMSHA256 so both SCRAM branches are exercised. Keep the assertions aligned with the current expectSASLEnabled, expectTLSEnabled, and expectSCRAMGenerator checks.Source: Path instructions
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@openmeter/watermill/driver/kafka/broker_test.go`:
- Around line 55-67: Add the missing table-driven test cases in broker_test.go
so the SCRAM coverage includes the other untested paths, not just SASL_PLAINTEXT
with sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration
scenarios to verify SASL_SSL with SCRAM still wires up the SCRAM client
generator, and add a separate case for SASL_PLAINTEXT/SASL_SSL using
sarama.SASLTypeSCRAMSHA256 so both SCRAM branches are exercised. Keep the
assertions aligned with the current expectSASLEnabled, expectTLSEnabled, and
expectSCRAMGenerator checks.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 24ea5a65-90e3-4cff-b628-bd2c6819d57e
📒 Files selected for processing (2)
openmeter/watermill/driver/kafka/broker.goopenmeter/watermill/driver/kafka/broker_test.go
The Sarama-based event publisher only enabled SASL when securityProtocol was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers fail with "client has run out of available brokers to talk to: EOF" since the client opened an unauthenticated connection that brokers rejected.
Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind SASL_SSL only. This matches the confluent-kafka-go path used by the sink.
Notes for reviewer
▎ Added a table-driven test for createKafkaConfig covering PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, and SASL_PLAINTEXT+SCRAM. Verified in production: OpenMeter pods connect successfully to SASL_PLAINTEXT + PLAIN brokers with this change.
Summary by CodeRabbit
New Features
Bug Fixes