Broker-side semantic validation#

For Bufstream to validate message contents, you'll need to add a schema registry and update topic or broker-level configuration.

TL;DR#

Add a schema registry:

version: v1beta1

# Add the Buf Schema Registry or any other Confluent API-compatible Protobuf schema registry.
schema_registry:
  confluent:
    url: https://my-domain.buf.dev/integrations/confluent/instance-name

Alternatively, as part of data enforcement configuration:

dataEnforcement:
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name

Use existing tools, a console like AKHQ, or a CLI to set bufstream.validate.mode to REJECT:

kafkactl alter topic topic-name --config bufstream.validate.mode=REJECT

Or, with the standard Kafka tooling:

kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --entity-type topics \
    --entity-name topic-name \
    --alter \
    --add-config bufstream.validate.mode=REJECT

Once you've made this change, batches containing messages that fail Protovalidate validation are rejected. To enforce this across all topics, set a broker-level configuration instead of targeting a specific topic.

Overview#

More than just a drop-in Kafka replacement, Bufstream's built from the ground up to understand the shape of the data traversing its topics. We call this broker-side schema awareness, and it brings some interesting capabilities. Chief among these is broker-side semantic validation—its ability to block bad data from entering topics in the first place.

Broker-side enforcement#

Rather than trusting clients to send valid messages and manage their own schemas, Bufstream can ensure messages match a known schema and pass its semantic validation rules. This guarantees consumers always receive well-formed data, eliminating a large class of data outages.

This feature works with binary-encoded Protobuf messages and the Buf Schema Registry or any other Confluent API-compatible schema registry. To associate Protobuf schemas with Kafka topics in schema registries, follow the documentation for integrating the BSR with Kafka.
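Confluent-compatible registries conventionally associate a topic's value schema with the subject topic-name-value (Confluent's TopicNameStrategy). Assuming the example registry URL used throughout this page, you can inspect the latest schema for a topic's subject through the standard Confluent REST API:

curl https://my-domain.buf.dev/integrations/confluent/instance-name/subjects/topic-name-value/versions/latest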

Semantic validation#

Fields in a schema are more than just data types. They have semantic properties:

  • They can be optional or required.
  • Fields like email addresses aren't just strings—they must match a specific format.
  • Numbers, like a user's age, may be a uint32, but you can safely assume a new user can't be more than 150 years old.

That's why Bufstream goes beyond simple schema ID validation (checking that messages reference a registered schema ID) and schema validation (type-checking message data against its schema) to provide semantic validation, deeply inspecting messages to ensure they meet the Protovalidate rules in their schema.

With Protovalidate, you can verify that a user's email is a valid email address, that an age is provided and within a sensible range, and even use a CEL expression to state that first_name should only be set if a last_name is provided:

Semantic validation for a User
message User {
    option (buf.validate.message).cel = {
        id:"first_name.requires.last_name",
        message: "If a first name is provided, a last name must also be provided.",
        expression: "this.first_name.size() == 0 || this.last_name.size() > 0"
    };

    string email = 1 [
        (buf.validate.field).string.email = true
    ];
    uint32 age = 2 [
        (buf.validate.field).required = true,
        (buf.validate.field).uint32.gte = 0,
        (buf.validate.field).uint32.lte = 150
    ];
    string last_name = 3;
    string first_name = 4;
}
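These rules aren't broker-only: producers can run the same checks before sending, for example with the protovalidate-go library. Here's a minimal sketch, assuming a hypothetical userv1 package generated from the User schema above:

package main

import (
	"fmt"

	"github.com/bufbuild/protovalidate-go"

	// Hypothetical import path for Go code generated from the User schema above.
	userv1 "example.com/gen/user/v1"
)

func main() {
	validator, err := protovalidate.New()
	if err != nil {
		panic(err)
	}

	// first_name is set without last_name, violating the CEL rule,
	// so Validate returns a descriptive error.
	user := &userv1.User{
		Email:     "jane@example.com",
		Age:       31,
		FirstName: "Jane",
	}
	if err := validator.Validate(user); err != nil {
		fmt.Println("rejected before produce:", err)
	}
}

Validating at both ends catches bad data before it ever leaves the producer, while the broker remains the backstop.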

Using Bufstream's broker-side semantic validation means you can rely on topic data to meet your business rules and domain logic, eliminating an entire class of data errors before they materialize in downstream systems.

Enabling semantic validation#

Enabling semantic validation requires adding a schema registry and updating topic or broker-level configuration.

Add a schema registry#

To parse a message, Bufstream determines its schema using the Confluent wire format, expecting each message to begin with a five-byte "envelope": a magic byte followed by the four-byte, big-endian ID of a schema stored in a configured schema registry.
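In practice a Confluent-compatible serializer builds this envelope for you, but a minimal sketch shows the layout (the schema ID and payload here are placeholders):

package main

import (
	"encoding/binary"
	"fmt"
)

// envelope prepends the Confluent wire-format header to a binary-encoded
// Protobuf message: one zero "magic" byte followed by the schema ID as a
// big-endian uint32. (For Protobuf, Confluent serializers also write a
// message-index list between the ID and the payload; client libraries
// handle that detail for you.)
func envelope(schemaID uint32, encoded []byte) []byte {
	header := make([]byte, 5, 5+len(encoded))
	binary.BigEndian.PutUint32(header[1:], schemaID) // header[0] remains 0x00
	return append(header, encoded...)
}

func main() {
	payload := []byte{ /* serialized User message would go here */ }
	fmt.Printf("% x\n", envelope(1, payload))
}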

To allow Bufstream to look up schemas using these IDs, add your Buf Schema Registry instance (or any other Confluent API-compatible schema registry) to its configuration:

schema_registry:
  confluent:
    url: https://my-domain.buf.dev/integrations/confluent/instance-name

Alternatively, as part of data enforcement configuration:

dataEnforcement:
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name

Schema registry options are documented in the reference page for bufstream.yaml. Helm syntax is identical.

Update topic or broker configuration#

To start rejecting messages that don't pass your validation rules, set bufstream.validate.mode to REJECT. This configuration is available at the topic or broker level. Because Bufstream is API-compatible with Kafka, you can use your existing processes, whether they're infrastructure-as-code (IaC) pipelines, web consoles like AKHQ, or Kafka CLI tools.
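For example, to make REJECT the default across all topics with the same Kafka tooling shown earlier, a sketch like the following should work, assuming your Bufstream deployment accepts this key as a dynamic cluster-wide broker default:

kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --entity-type brokers \
    --entity-default \
    --alter \
    --add-config bufstream.validate.mode=REJECT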