Using Protovalidate in Kafka with Bufstream
This tutorial shows how to add Protovalidate to a Go streaming application powered by Bufstream:
- Adding the Protovalidate dependency.
- Annotating Protobuf files.
- Regenerating code.
- Enabling validation in the broker.
Prerequisites
- Install the Buf CLI. If you already have, run
buf --version
to verify that you're using at least1.32.0
. - Use Linux or Mac and have
git
,go
, andmake
installed and in your$PATH
. -
Clone the
buf-examples
repo and navigate to theprotovalidate/bufstream/start
directory:
Goal
This tutorial's Kafka producer publishes EmailUpdated
messages that change a fictitious user's email from an old value to a new value, but it doesn't have any input validation. Your goal is to add the following validation rules using Protovalidate:
- Messages must provide an
id
value that's a valid UUID. - Messages must provide a valid email address for the
new_email_address
field.
Run the code
Before you begin to code, verify that the example is working. In one terminal window, start the Bufstream broker. It's the default make
target:
After a little while, you should see that it has started:
time=2025-02-18T19:46:46.368-05:00 level=INFO msg="kafka server started" host=:: port=9092 tls=false
time=2025-02-18T19:46:46.369-05:00 level=INFO msg="updating ownership" oldShardNum=0 oldShardCount=0 shardNum=0 shardCount=1
In a second terminal window, start the producer with the producer
target:
After a few seconds, you should see that it has started:
time=2025-02-18T19:47:28.780-05:00 level=INFO msg="starting produce"
time=2025-02-18T19:47:34.541-05:00 level=INFO msg="produced semantically valid protobuf message" id=f00d937e-3c94-4fce-b51a-eab6c1f164fd
time=2025-02-18T19:47:34.795-05:00 level=INFO msg="produced semantically invalid protobuf message" id=53cba699-f972-4e6b-9512-4e40ff6eedf7
In a third terminal window, start the consumer with the consumer
target:
After a few seconds, you should see that it has started:
time=2025-02-18T19:47:56.717-05:00 level=INFO msg="starting consume"
time=2025-02-18T19:48:09.249-05:00 level=INFO msg="consumed message with new email margareterutherford@mckenzie.name and redacted old email"
time=2025-02-18T19:48:10.251-05:00 level=INFO msg="consumed message with new email bad-email-fish and redacted old email"
Note that there's something fishy in the last log message—the new email isn't a valid address. You're about to combine Protovalidate with Bufstream's Kafka capabilities to make sure invalid messages are rejected by the broker.
It's okay to use Ctrl-C
to stop the broker, producer, and consumer before continuing.
Explore tutorial code
This tutorial uses the example in bufstream/start
. All filenames are relative to this directory.
Makefile
This project's Makefile
provides targets for downloading and running the Bufstream broker and the included Go-based producer and consumer applications.
One item to note is that producer and consumer targets both specify Kafka targets, groups, and Buf Schema Registry (BSR)-based Confluent Schema Registry URLs. When using Protovalidate in your own Bufstream-based Kafka applications, they should be configured for your equivalents.
.PHONY: producer
producer:
go run ./cmd/bufstream-demo-produce \
--topic email-updated \
--group email-verifier \
--csr-url "https://demo.buf.dev/integrations/confluent/bufstream-demo"
Protobuf
The project provides a Protobuf message representing a user updating their email address:
message EmailUpdated {
// Confluent Schema Registry (CSR) configuration for this message.
option (buf.confluent.v1.subject) = {
instance_name: "bufstream-demo"
name: "email-updated-value"
};
// The ID of the user associated with this email address update.
string id = 1;
// The old email address.
string old_email_address = 2;
// The new email address.
string new_email_address = 3;
}
YAML
When you add Protovalidate, you'll update the following files:
buf.yaml
: Protovalidate must be added as a dependency.buf.gen.yaml
: To avoid a common issue in projects using the Buf CLI's managed mode, you'll see how to exclude Protovalidate from package renaming.config/bufstream.yaml
: To inform the Bufstream broker that semantically invalid messages (messages failing Protovalidate validation) should be rejected, you'll need to enable a configuration option.
Now that you know your way around the example code, it's time to integrate Protovalidate.
Integrate Protovalidate
It's time to add Protovalidate to your project. It may be useful to read the Protovalidate overview and its quickstart before continuing.
Add Protovalidate dependency
Because Protovalidate is a publicly available Buf Schema Registry (BSR) module, it's simple to add it to any Buf CLI project.
-
Add Protovalidate to your Go project:
-
Add Protovalidate as a dependency to
buf.yaml
.buf.yaml# buf.yaml files define how to build the .proto files within your local directory # # See https://buf.build/docs/tutorials/getting-started-with-buf-cli for more details. version: v2 modules: # Our .proto files live within the proto directory. - path: proto deps: # We import "buf/confluent/v1/extensions.proto" within our example files, which # comes from the demo.buf.dev/bufbuild/protovalidate module. - buf.build/bufbuild/confluent + - buf.build/bufbuild/protovalidate
-
Use any of your open terminal windows to update dependencies with the Buf CLI. You'll be warned that Protovalidate is declared but unused. That's fine.
-
Because this example uses managed mode, exclude Protovalidate from any updates to
go_package
.buf.gen.yaml# buf.gen.yaml files define how to generate stubs using the buf generate command. # # See https://buf.build/docs/generate/tutorial for more details. version: v2 managed: enabled: true override: - file_option: go_package_prefix value: github.com/bufbuild/buf-examples/protovalidate/bufstream/start - file_option: go_package_prefix module: buf.build/bufbuild/confluent value: buf.build/gen/go/bufbuild/confluent/protocolbuffers/go + disable: + - file_option: go_package + module: buf.build/bufbuild/protovalidate plugins: - remote: buf.build/protocolbuffers/go out: gen opt: paths=source_relative clean: true
-
In any of your open terminal windows, verify that configuration is complete by running
buf generate
. It should complete with no error.
Further reading and protoc
Learn more about incorporating Protovalidate and protoc
support in the Adding protovalidate page.
Add Protovalidate rules
You'll now add standard rules to demo.proto
requiring that the id
field is a UUID and that the new_email_address
is a valid email address. Start by importing Protovalidate:
syntax = "proto3";
// Implements types for the Bufstream demo.
package bufstream.demo.v1;
// We import extensions.proto to use a custom option that allows us to associate
// a message with a specific subject.
//
// See the https://buf.build/bufbuild/confluent module
// for the full documentation.
import "buf/confluent/v1/extensions.proto";
+ import "buf/validate/validate.proto";
Next, add field-level validation rules:
- Use
string.uuid
to declare thatid
must be present and a valid UUID. - Use
required
andstring.email
to validate thenew_email_address
field.
message EmailUpdated {
// Code omitted for brevity
// The ID of the user associated with this email address update.
- string id = 1;
+ string id = 1 [
+ (buf.validate.field).string.uuid = true
+ ];
// The old email address.
string old_email_address = 2 [
// When data quality enforcement is enabled, debug_redact fields can be optionally redacted
// on a per-topic basis when records are read by producers.
//
// This is generally used for sensitive fields.
debug_redact = true
];
// The new email address.
- string new_email_address = 3;
+ string new_email_address = 3 [
+ (buf.validate.field).required = true,
+ (buf.validate.field).string.email = true
+ ];
}
Compile Protobuf
Next, use any terminal window to compile your Protobuf and regenerate code, adding the Protovalidate options to all of your message descriptors:
With regenerated code, your broker, producer and consumer should still build and start. (If you're still running the either, stop them with Ctrl-c
.)
Restart the broker:
After a little while, you should see that it has started:
time=2025-02-19T10:32:46.909-05:00 level=INFO msg="kafka server started" host=:: port=9092 tls=false
time=2025-02-19T10:32:46.910-05:00 level=INFO msg="updating ownership" oldShardNum=0 oldShardCount=0 shardNum=0 shardCount=1
Restart the producer:
After a few seconds, you should see that it has started:
time=2025-02-19T10:40:47.878-05:00 level=INFO msg="starting produce"
time=2025-02-19T10:41:13.985-05:00 level=INFO msg="produced semantically valid protobuf message" id=9e253fe7-5123-413b-b010-ab4caba5142b
time=2025-02-19T10:41:14.237-05:00 level=INFO msg="produced semantically invalid protobuf message" id=8ef986cd-a865-420d-857b-05ebac436984
In a third terminal window, start the consumer with the consumer
target:
After a few seconds, you should see that it has started, but these log messages may be a surprise: the broker is still accepting updates with invalid email addresses, and the consumer still receives them.
time=2025-02-19T10:40:58.132-05:00 level=INFO msg="starting consume"
time=2025-02-19T10:41:14.662-05:00 level=INFO msg="consumed message with new email madgegottlieb@kunze.com and redacted old email"
time=2025-02-19T10:41:15.663-05:00 level=INFO msg="consumed message with new email bad-email-cheetah and redacted old email"
The producer fails to reject the invalid messages because Bufstream hasn't been told to validate inbound requests.
Add Protovalidate enforcement
Enforcing Protovalidate logic in a Bufstream broker is a simple configuration change.
Follow these steps to begin enforcing Protovalidate rules in Bufstream:
- Stop the broker, producer, and consumer with
Ctrl-c
. - Open
config/bufstream.yaml
. -
Make the following change to the
data_enforcement.produce
configuration, stating that validation should take place and failures should cause rejection:config/bufstream.yamldata_enforcement: schema_registries: - name: csr confluent: url: "https://demo.buf.dev/integrations/confluent/bufstream-demo" instance_name: "bufstream-demo" produce: - topics: { all: true } schema_registry: csr values: on_parse_error: REJECT_BATCH + validation: + on_error: REJECT_BATCH
Bufstream is now configured to reject messages that fail to pass your validation rules. Restart your broker:
After a few seconds, you should see that it has started:
time=2025-02-19T10:45:18.607-05:00 level=INFO msg="kafka server started" host=:: port=9092 tls=false
time=2025-02-19T10:45:18.608-05:00 level=INFO msg="updating ownership" oldShardNum=0 oldShardCount=0 shardNum=0 shardCount=1
Next, start your producer:
After a few seconds, you should see that it has started:
time=2025-02-19T10:45:21.797-05:00 level=INFO msg="starting produce"
time=2025-02-19T10:45:23.688-05:00 level=INFO msg="produced semantically valid protobuf message" id=9e05b5fc-3c18-4c8c-a4ea-09ff161bff32
This time, note that invalid messages are being rejected:
time=2025-02-19T10:45:23.809-05:00 level=ERROR msg="error on produce of semantically invalid protobuf message" error="failed to produce: INVALID_RECORD: This record has failed the validation on broker and hence be rejected."
Back in the broker's output, the details of the error are available.
time=2025-02-19T10:45:23.808-05:00 level=ERROR msg="data enforcement error" kafka.topic.name=email-updated offset=0 error="enforcement validate error, rejecting batch: validation error:\n - new_email_address: value must be a valid email address [string.email]" action=REJECT_BATCH kafka.kafka.api.key=Produce kafka.kafka.api.version=10 kafka.correlation_id=1
In a third terminal window, start the consumer with the consumer
target:
After a few seconds, you should see that it has started and no invalid messages are received:
time=2025-02-19T10:45:23.926-05:00 level=INFO msg="starting consume"
time=2025-02-19T10:45:24.518-05:00 level=INFO msg="consumed message with new email fridamurphy@veum.io and redacted old email"
You've now added Protovalidate enforcement to a Bufstream broker. All consumers can trust that only semantically valid messages are in the stream.
Conclusion
In this tutorial, you've learned how to add Protovalidate to your Protobuf project, declare validation rules in your Protobuf files, and enable their enforcement within Bufstream.
Further reading
- Add Protovalidate's standard rules to schemas
- Use CEL expressions to declare field and message-level custom rules
- Reuse logic with predefined rules
- Add Protovalidate to Connect Go
- Add Protovalidate to gRPC with quickstarts for gRPC and Go, gRPC and Java, or gRPC and Python.