
Protovalidate in Kafka with Bufstream#

This quickstart shows how to add Protovalidate to a Go streaming application powered by Bufstream:

  1. Adding the Protovalidate dependency.
  2. Annotating Protobuf files.
  3. Regenerating code.
  4. Enabling validation in the broker.

Prerequisites#

  • Install the Buf CLI. If you already have it, run buf --version to verify that you're using version 1.54.0 or later.
  • Use Linux or Mac and have git, go, and make installed and in your $PATH.
  • Clone the buf-examples repo and navigate to the protovalidate/bufstream/start directory:

    $ git clone https://github.com/bufbuild/buf-examples.git && cd buf-examples/protovalidate/bufstream/start
    

Goal#

This quickstart's Kafka producer publishes EmailUpdated messages that change a fictitious user's email from an old value to a new value, but it doesn't have any input validation. Your goal is to add the following validation rules using Protovalidate:

  1. Messages must provide an id value that's a valid UUID.
  2. Messages must provide a valid email address for the new_email_address field.

Run the code#

Before you begin to code, verify that the example is working. In one terminal window, start the Bufstream broker. It's the default make target:

Broker terminal
$ make

After a little while, you should see that it has started:

Broker terminal
time=2025-02-18T19:46:46.368-05:00 level=INFO msg="kafka server started" host=:: port=9092 tls=false
time=2025-02-18T19:46:46.369-05:00 level=INFO msg="updating ownership" oldShardNum=0 oldShardCount=0 shardNum=0 shardCount=1

In a second terminal window, start the producer with the producer target:

Producer terminal
$ make producer

After a few seconds, you should see that it has started:

Producer terminal
time=2025-02-18T19:47:28.780-05:00 level=INFO msg="starting produce"
time=2025-02-18T19:47:34.541-05:00 level=INFO msg="produced semantically valid protobuf message" id=f00d937e-3c94-4fce-b51a-eab6c1f164fd
time=2025-02-18T19:47:34.795-05:00 level=INFO msg="produced semantically invalid protobuf message" id=53cba699-f972-4e6b-9512-4e40ff6eedf7

In a third terminal window, start the consumer with the consumer target:

Consumer terminal
$ make consumer

After a few seconds, you should see that it has started:

Consumer terminal
time=2025-02-18T19:47:56.717-05:00 level=INFO msg="starting consume"
time=2025-02-18T19:48:09.249-05:00 level=INFO msg="consumed message with new email margareterutherford@mckenzie.name and redacted old email"
time=2025-02-18T19:48:10.251-05:00 level=INFO msg="consumed message with new email bad-email-fish and redacted old email"

Note that there's something fishy in the last log message: the new email, bad-email-fish, isn't a valid address. Next, you'll combine Protovalidate with Bufstream's Kafka capabilities so that the broker rejects invalid messages.

It's okay to use Ctrl-C to stop the broker, producer, and consumer before continuing.

Explore quickstart code#

This quickstart uses the example in protovalidate/bufstream/start. All filenames are relative to this directory.

Makefile#

This project's Makefile provides targets for downloading and running the Bufstream broker and the included Go-based producer and consumer applications.

Note that the producer and consumer targets both specify a Kafka topic, a consumer group, and a Confluent Schema Registry URL served by the Buf Schema Registry (BSR). When you use Protovalidate in your own Bufstream-based Kafka applications, configure these values for your own topics, groups, and registry.

Makefile producer target
.PHONY: producer
producer:
    go run ./cmd/bufstream-demo-produce \
      --topic email-updated \
      --group email-verifier \
      --csr-url "https://demo.buf.dev/integrations/confluent/bufstream-demo"
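The producer target passes three flags to the Go program, and those are the values you would swap for your own. As a hedged sketch only (the demo's actual flag handling may differ), a command like bufstream-demo-produce could declare them with the standard library's flag package. The demoConfig type and the parseDemoFlags function are hypothetical names, not part of the example repository:

```go
package main

import (
	"flag"
	"io"
)

// demoConfig is a hypothetical holder for the values the Makefile passes in.
type demoConfig struct {
	Topic  string // Kafka topic, e.g. "email-updated"
	Group  string // Kafka consumer group, e.g. "email-verifier"
	CSRURL string // Confluent Schema Registry URL served by the BSR
}

// parseDemoFlags parses --topic, --group, and --csr-url from args (without the
// program name). It returns an error instead of exiting so the caller decides
// how to report bad input. Go's flag package accepts both -flag and --flag.
func parseDemoFlags(args []string) (demoConfig, error) {
	var cfg demoConfig
	fs := flag.NewFlagSet("bufstream-demo-produce", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // suppress usage text on parse errors
	fs.StringVar(&cfg.Topic, "topic", "", "Kafka topic name")
	fs.StringVar(&cfg.Group, "group", "", "Kafka consumer group")
	fs.StringVar(&cfg.CSRURL, "csr-url", "", "Confluent Schema Registry URL")
	if err := fs.Parse(args); err != nil {
		return demoConfig{}, err
	}
	return cfg, nil
}
```

Calling parseDemoFlags with the same arguments as the Makefile target yields a config pointing at the email-updated topic and the bufstream-demo registry; an unknown flag returns an error rather than terminating the process.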

Protobuf#

The project provides a Protobuf message representing a user updating their email address:

proto/bufstream/demo/v1/demo.proto
message EmailUpdated {
  // Confluent Schema Registry (CSR) configuration for this message.
  option (buf.confluent.v1.subject) = {
    instance_name: "bufstream-demo"
    name: "email-updated-value"
  };

  // The ID of the user associated with this email address update.
  string id = 1;

  // The old email address.
  string old_email_address = 2;

  // The new email address.
  string new_email_address = 3;
}

YAML#

When you add Protovalidate, you'll update the following files:

  1. buf.yaml: Protovalidate must be added as a dependency.
  2. buf.gen.yaml: To avoid a common issue in projects using the Buf CLI's managed mode, you'll see how to exclude Protovalidate from package renaming.
  3. config/bufstream.yaml: To inform the Bufstream broker that semantically invalid messages (messages failing Protovalidate validation) should be rejected, you'll need to enable a configuration option.

Now that you know your way around the example code, it's time to integrate Protovalidate.

Integrate Protovalidate#

Before continuing, you may find it useful to read the Protovalidate overview and its quickstart.

Add Protovalidate dependency#

Because Protovalidate is a publicly available Buf Schema Registry (BSR) module, it's simple to add it to any Buf CLI project.

  1. Add Protovalidate to your Go project:

    $ go get buf.build/go/protovalidate@v0.12.0
    $ go mod tidy
    
  2. Add Protovalidate as a dependency to buf.yaml.

    buf.yaml
    # buf.yaml files define how to build the .proto files within your local directory
    #
    # See https://buf.build/docs/tutorials/getting-started-with-buf-cli for more details.
    version: v2
    modules:
      # Our .proto files live within the proto directory.
      - path: proto
    deps:
      # We import "buf/confluent/v1/extensions.proto" within our example files, which
      # comes from the buf.build/bufbuild/confluent module.
      - buf.build/bufbuild/confluent
    + - buf.build/bufbuild/protovalidate:v0.11.1
    
  3. Use any of your open terminal windows to update dependencies with the Buf CLI. You'll be warned that Protovalidate is declared but unused. That's expected, because no .proto file imports it yet.

    Updating CLI dependencies
    $ buf dep update
    WARN    Module buf.build/bufbuild/protovalidate is declared in your buf.yaml deps but is unused...
    
  4. Because this example uses managed mode, add a disable rule to buf.gen.yaml so that managed mode doesn't rewrite go_package for files in the Protovalidate module.

    buf.gen.yaml
    # buf.gen.yaml files define how to generate stubs using the buf generate command.
    #
    # See https://buf.build/docs/generate/tutorial for more details.
    version: v2
    managed:
      enabled: true
      override:
        - file_option: go_package_prefix
          value: github.com/bufbuild/buf-examples/protovalidate/bufstream/start
        - file_option: go_package_prefix
          module: buf.build/bufbuild/confluent
          value: buf.build/gen/go/bufbuild/confluent/protocolbuffers/go
    + disable:
    +   - file_option: go_package
    +     module: buf.build/bufbuild/protovalidate
    plugins:
      - remote: buf.build/protocolbuffers/go
        out: gen
        opt: paths=source_relative
    clean: true
    
  5. In any of your open terminal windows, verify that configuration is complete by running buf generate. It should complete with no error.
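Step 4 matters because managed mode applies the go_package_prefix override to every file it manages, including files from dependencies. The following is a simplified model of that rewrite, not Buf's implementation: a managed file's import path is the prefix joined with the file's directory, while a file from a disabled module keeps the go_package it declares. The managedGoPackage function and the declared path "example.com/declared/validate" in the usage note are hypothetical:

```go
package main

import "path"

// managedGoPackage is a simplified, hypothetical model of how managed mode
// chooses a go_package import path for one .proto file.
//   - prefix is the go_package_prefix override from buf.gen.yaml.
//   - filePath is the file's path within its module, e.g. "bufstream/demo/v1/demo.proto".
//   - declared is the go_package option written in the file itself.
//   - disabled reports whether the file's module is listed under managed.disable.
func managedGoPackage(prefix, filePath, declared string, disabled bool) string {
	if disabled {
		// Disabled modules keep their declared go_package, so generated code
		// imports a package that actually exists in the Go module graph.
		return declared
	}
	// Managed files get the prefix joined with their directory.
	return prefix + "/" + path.Dir(filePath)
}
```

In this model, demo.proto maps to the local prefix plus bufstream/demo/v1, as intended. Without the disable rule, buf/validate/validate.proto would also map under the local prefix, so demo.pb.go would import a buf/validate package that this project never generates. With the rule, it keeps its declared path, such as the hypothetical "example.com/declared/validate".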

Add Protovalidate rules#

You'll now add standard rules to demo.proto requiring that the id field is a UUID and that the new_email_address is a valid email address. Start by importing Protovalidate:

proto/bufstream/demo/v1/demo.proto
syntax = "proto3";

// Implements types for the Bufstream demo.
package bufstream.demo.v1;

// We import extensions.proto to use a custom option that allows us to associate
// a message with a specific subject.
//
// See the https://buf.build/bufbuild/confluent module
// for the full documentation.
import "buf/confluent/v1/extensions.proto";
+ import "buf/validate/validate.proto";

Next, add field-level validation rules:

  1. Use string.uuid to declare that id must be present and a valid UUID.
  2. Use required and string.email to validate the new_email_address field.

proto/bufstream/demo/v1/demo.proto
message EmailUpdated {
  // Code omitted for brevity

  // The ID of the user associated with this email address update.
- string id = 1;
+ string id = 1 [
+   (buf.validate.field).string.uuid = true
+ ];

  // The old email address.
  string old_email_address = 2 [
    // When data quality enforcement is enabled, debug_redact fields can be optionally redacted
    // on a per-topic basis when records are read by consumers.
    //
    // This is generally used for sensitive fields.
    debug_redact = true
  ];

  // The new email address.
- string new_email_address = 3;
+ string new_email_address = 3 [
+   (buf.validate.field).required = true,
+   (buf.validate.field).string.email = true
+ ];
}
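To make the semantics of these rules concrete, here is a standard-library approximation in Go. It is not Protovalidate's implementation. The emailUpdated struct is a hypothetical stand-in for the generated EmailUpdated type, and the email pattern only approximates what string.email accepts. Only the email violation text comes from the broker log later in this quickstart; the UUID and required messages are assumptions:

```go
package main

import "regexp"

// emailUpdated is a hypothetical plain-Go stand-in for the generated EmailUpdated type.
type emailUpdated struct {
	ID              string
	OldEmailAddress string
	NewEmailAddress string
}

// uuidPattern matches the canonical 8-4-4-4-12 hexadecimal UUID form.
var uuidPattern = regexp.MustCompile(`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)

// emailPattern roughly approximates an email check: a local part, "@", and
// dot-separated host labels. Protovalidate's own rule may accept or reject
// inputs differently.
var emailPattern = regexp.MustCompile(`^[a-zA-Z0-9.!#$%&'*+/=?^_{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$`)

// validateEmailUpdated returns one "field: message [rule]" line per violation,
// imitating the shape of the broker's error output.
func validateEmailUpdated(m emailUpdated) []string {
	var violations []string
	// id: (buf.validate.field).string.uuid = true. An empty string is not a UUID.
	if !uuidPattern.MatchString(m.ID) {
		violations = append(violations, "id: value must be a valid UUID [string.uuid]")
	}
	// new_email_address: required plus string.email. In this sketch, an empty
	// value reports only the required violation.
	switch {
	case m.NewEmailAddress == "":
		violations = append(violations, "new_email_address: value is required [required]")
	case !emailPattern.MatchString(m.NewEmailAddress):
		violations = append(violations, "new_email_address: value must be a valid email address [string.email]")
	}
	// old_email_address has no rules, so it is never checked.
	return violations
}
```

Running the two messages from the earlier producer output through validateEmailUpdated flags only the one with bad-email-fish, which is the distinction the broker will enforce once validation is enabled.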

Learn more about string and standard rules.

Compile Protobuf#

Next, use any terminal window to compile your Protobuf and regenerate code so that the generated message descriptors include your Protovalidate rules:

$ buf generate

With regenerated code, your broker, producer, and consumer should still build and start. (If any of them are still running, stop them with Ctrl-C.)

Restart the broker:

Broker terminal
$ make

After a little while, you should see that it has started:

Broker terminal
time=2025-02-19T10:32:46.909-05:00 level=INFO msg="kafka server started" host=:: port=9092 tls=false
time=2025-02-19T10:32:46.910-05:00 level=INFO msg="updating ownership" oldShardNum=0 oldShardCount=0 shardNum=0 shardCount=1

Restart the producer:

Producer terminal
$ make producer

After a few seconds, you should see that it has started:

Producer terminal
time=2025-02-19T10:40:47.878-05:00 level=INFO msg="starting produce"
time=2025-02-19T10:41:13.985-05:00 level=INFO msg="produced semantically valid protobuf message" id=9e253fe7-5123-413b-b010-ab4caba5142b
time=2025-02-19T10:41:14.237-05:00 level=INFO msg="produced semantically invalid protobuf message" id=8ef986cd-a865-420d-857b-05ebac436984

In a third terminal window, start the consumer with the consumer target:

Consumer terminal
$ make consumer

After a few seconds, you should see that it has started, but these log messages may be a surprise: the broker is still accepting updates with invalid email addresses, and the consumer still receives them.

Consumer terminal
time=2025-02-19T10:40:58.132-05:00 level=INFO msg="starting consume"
time=2025-02-19T10:41:14.662-05:00 level=INFO msg="consumed message with new email madgegottlieb@kunze.com and redacted old email"
time=2025-02-19T10:41:15.663-05:00 level=INFO msg="consumed message with new email bad-email-cheetah and redacted old email"

The broker accepts the invalid messages because Bufstream hasn't been told to validate incoming records. Adding rules to your schema doesn't enforce them on its own.

Add Protovalidate enforcement#

Enforcing Protovalidate logic in a Bufstream broker is a simple configuration change.

Follow these steps to begin enforcing Protovalidate rules in Bufstream:

  1. Stop the broker, producer, and consumer with Ctrl-C.
  2. Open config/bufstream.yaml.
  3. Make the following change to the data_enforcement.produce configuration, stating that validation should take place and failures should cause rejection:

    config/bufstream.yaml
    data_enforcement:
      schema_registries:
        - name: csr
          confluent:
            url: "https://demo.buf.dev/integrations/confluent/bufstream-demo"
            instance_name: "bufstream-demo"
      produce:
        - topics: { all: true }
          schema_registry: csr
          values:
            on_parse_error: REJECT_BATCH
    +       validation:
    +         on_error: REJECT_BATCH
    

Bufstream is now configured to reject messages that fail your validation rules. Restart your broker:

Broker terminal
$ make

After a few seconds, you should see that it has started:

Broker terminal
time=2025-02-19T10:45:18.607-05:00 level=INFO msg="kafka server started" host=:: port=9092 tls=false
time=2025-02-19T10:45:18.608-05:00 level=INFO msg="updating ownership" oldShardNum=0 oldShardCount=0 shardNum=0 shardCount=1

Next, start your producer:

Producer terminal
$ make producer

After a few seconds, you should see that it has started:

Producer terminal
time=2025-02-19T10:45:21.797-05:00 level=INFO msg="starting produce"
time=2025-02-19T10:45:23.688-05:00 level=INFO msg="produced semantically valid protobuf message" id=9e05b5fc-3c18-4c8c-a4ea-09ff161bff32

This time, note that invalid messages are being rejected:

Producer terminal
time=2025-02-19T10:45:23.809-05:00 level=ERROR msg="error on produce of semantically invalid protobuf message" error="failed to produce: INVALID_RECORD: This record has failed the validation on broker and hence be rejected."

The broker's output includes the details of the error:

Broker terminal
time=2025-02-19T10:45:23.808-05:00 level=ERROR msg="data enforcement error" kafka.topic.name=email-updated offset=0 error="enforcement validate error, rejecting batch: validation error:\n - new_email_address: value must be a valid email address [string.email]" action=REJECT_BATCH kafka.kafka.api.key=Produce kafka.kafka.api.version=10 kafka.correlation_id=1
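Conceptually, the broker now validates each record in a produce request and, because on_error is set to REJECT_BATCH, fails the whole batch when a record has violations, as the action name and the "rejecting batch" log text suggest. The sketch below models that flow with hypothetical types and functions (record, validateFunc, enforceBatch). It is not Bufstream's code, and its error text only imitates the shape of the log line above:

```go
package main

import (
	"fmt"
	"strings"
)

// record is a hypothetical, already-decoded produce record.
type record struct {
	Topic string
	Value map[string]string // field name -> value, standing in for a parsed message
}

// validateFunc returns violation lines such as
// "new_email_address: value must be a valid email address [string.email]".
type validateFunc func(record) []string

// enforceBatch models on_error: REJECT_BATCH. If any record has violations,
// no record in the batch is accepted and an error describing the first failing
// record is returned; otherwise the whole batch is accepted unchanged.
func enforceBatch(batch []record, validate validateFunc) ([]record, error) {
	for i, r := range batch {
		if v := validate(r); len(v) > 0 {
			return nil, fmt.Errorf("rejecting batch: record %d on topic %s: validation error:\n - %s",
				i, r.Topic, strings.Join(v, "\n - "))
		}
	}
	return batch, nil
}
```

Under this model, one invalid record causes the producer's entire request to fail, which is why the producer sees INVALID_RECORD and the consumer never receives the bad email.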

In a third terminal window, start the consumer with the consumer target:

Consumer terminal
$ make consumer

After a few seconds, you should see that it has started and no invalid messages are received:

Consumer terminal
time=2025-02-19T10:45:23.926-05:00 level=INFO msg="starting consume"
time=2025-02-19T10:45:24.518-05:00 level=INFO msg="consumed message with new email fridamurphy@veum.io and redacted old email"

You've now added Protovalidate enforcement to a Bufstream broker. From now on, consumers can trust that every record produced to the configured topics has passed your validation rules.

Conclusion#

In this quickstart, you've learned how to add Protovalidate to your Protobuf project, declare validation rules in your Protobuf files, and enable their enforcement within Bufstream.

Further reading#