
Broker-side semantic validation

For Bufstream to validate message contents, you'll need to update its configuration to include a schema registry and data enforcement policy.

TL;DR

This minimal example configures Bufstream for semantic validation:

bufstream.yaml:

data_enforcement:
  # Add the Buf Schema Registry or any other Confluent API-compatible Protobuf schema registry.
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name
  # Reject any batch of messages with values that fail semantic validation or parsing, using
  # "coerce" to add missing schema information (automatic enveloping).
  #
  # Change "produce" to "fetch" and "REJECT_BATCH" to "FILTER_RECORD" to accept invalid
  # messages and filter them out of streams sent to consumers. Add "topics" to configure a
  # topic-specific policy.
  produce:
    - schema_registry: csr
      values:
        coerce: true
        on_parse_error: REJECT_BATCH
        validation:
          on_error: REJECT_BATCH

Helm values:

dataEnforcement:
  # Add the Buf Schema Registry or any other Confluent API-compatible Protobuf schema registry.
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name
  # Reject any batch of messages with values that fail semantic validation or parsing, using
  # "coerce" to add missing schema information (automatic enveloping).
  #
  # Change "produce" to "fetch" and "REJECT_BATCH" to "FILTER_RECORD" to accept invalid
  # messages and filter them out of streams sent to consumers. Add "topics" to configure a
  # topic-specific policy.
  produce:
    - schema_registry: csr
      values:
        coerce: true
        on_parse_error: REJECT_BATCH
        validation:
          on_error: REJECT_BATCH

Once you've made this change, Bufstream rejects any batch containing messages that fail the Protovalidate rules in their subject's schema.

Overview

Bufstream is more than a drop-in Kafka replacement: it's built from the ground up to understand the shape of the data traversing its topics. We call this broker-side schema awareness, and it enables some powerful capabilities. Chief among these is broker-side semantic validation: the ability to block bad data from entering topics in the first place.

Broker-side enforcement

Rather than trusting clients to send valid messages and manage their own schemas, Bufstream can ensure messages match a known schema and pass its semantic validation rules. This guarantees consumers always receive well-formed data, eliminating a large class of data outages.

This feature works with binary-encoded Protobuf messages and the Buf Schema Registry or any other Confluent API-compatible schema registry. To associate Protobuf schemas with Kafka topics in schema registries, follow the documentation for integrating the BSR with Kafka.

Semantic validation

Fields in a schema are more than just data types. They have semantic properties:

  • They can be optional or required.
  • Fields like email addresses aren't just strings—they must match a specific format.
  • Numbers, like a user's age, may be a uint32, but you can safely assume a new user can't be more than 150 years old.

That's why Bufstream goes beyond simple schema ID validation (making sure messages use a schema ID) and schema validation (type-checking message data against its schema) to provide semantic validation, deeply inspecting messages to ensure they meet Protovalidate rules in their schema.

With Protovalidate, you can verify that a user's email is a valid email address, that an age is provided and within a sensible range, and even use a CEL expression to state that first_name should only be set if a last_name is provided:

Semantic validation for a User
message User {
    option (buf.validate.message).cel = {
        id: "first_name.requires.last_name",
        message: "If a first name is provided, a last name must also be provided.",
        expression: "this.first_name.size() == 0 || this.last_name.size() > 0"
    };

    string email = 1 [
        (buf.validate.field).string.email = true
    ];
    uint32 age = 2 [
        (buf.validate.field).required = true,
        (buf.validate.field).uint32.gte = 0,
        (buf.validate.field).uint32.lte = 150
    ];
    string last_name = 3;
    string first_name = 4;
}

Using Bufstream's broker-side semantic validation means you can rely on topic data to meet your business rules and domain logic, eliminating an entire class of data errors before they materialize in downstream systems.

Enabling semantic validation

Enabling semantic validation requires adding a schema registry and at least one produce or fetch policy to your Bufstream configuration.

Add a schema registry

To parse a message, Bufstream determines its schema from the Confluent wire format, expecting each message to carry a five-byte "envelope" prefix containing the ID of a schema stored in a configured schema registry. Optionally, Bufstream can add missing envelopes using a topic's latest schema ID.
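Concretely, the Confluent wire format prefixes the serialized message with a zero "magic" byte followed by a four-byte big-endian schema ID. A minimal sketch in Python (the schema ID and payload are placeholders, and this omits the message-index list that the Protobuf flavor of the format appends after the schema ID):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format magic byte

def envelope(schema_id: int, payload: bytes) -> bytes:
    """Prefix a serialized message with the five-byte Confluent envelope."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload

def parse_envelope(record: bytes) -> tuple[int, bytes]:
    """Split an enveloped record into its schema ID and message payload."""
    magic, schema_id = struct.unpack(">bI", record[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, record[5:]

record = envelope(42, b"\x0a\x03foo")  # hypothetical schema ID and payload
schema_id, payload = parse_envelope(record)
```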

To allow Bufstream to look up schemas using these IDs, add your Buf Schema Registry instance (or any other Confluent API-compatible schema registry) to its data enforcement configuration:

bufstream.yaml:

data_enforcement:
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name

Helm values:

dataEnforcement:
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name

Schema registry options are documented in the reference page for bufstream.yaml. Helm syntax is identical.

Add a policy

Bufstream data enforcement policies control actions taken on a record's keys and values. Because policies use schemas, all policies must be associated with a configured schema registry. By default, policies apply to all topics, but topic-specific policies are supported.

To add a policy enforcing semantic validation, add at least one produce or fetch policy to Bufstream's data enforcement configuration. The following example shows a produce policy that:

  1. Automatically envelopes any message values not already in the Confluent wire format, allowing their contents to be parsed and validated against the latest revision of the topic's schema.
  2. Rejects any batch of messages containing a malformed value, returning a produce exception to the client.
  3. Rejects any batch of messages containing a value failing semantic validation rules, returning a produce exception to the client.

With this configuration, Bufstream only accepts well-formed, semantically valid messages.

bufstream.yaml:

data_enforcement:
  produce:
    - schema_registry: csr
      values:
        # Automatically envelope messages.
        coerce: true
        # Reject messages that do not match the topic's schema.
        on_parse_error: REJECT_BATCH
        # Reject messages failing semantic validation.
        validation:
          on_error: REJECT_BATCH

Helm values:

dataEnforcement:
  produce:
    - schema_registry: csr
      values:
        # Automatically envelope messages.
        coerce: true
        # Reject messages that do not match the topic's schema.
        on_parse_error: REJECT_BATCH
        # Reject messages failing semantic validation.
        validation:
          on_error: REJECT_BATCH

Policy options are documented in the reference page for bufstream.yaml. Helm syntax is identical.

Common policy configurations

Topic-specific policies

For finer-grained control, Bufstream supports topic-specific policies. Only the first policy matching a given produce or fetch request takes effect. The following snippet shows a policy scoped to a single topic.

produce:
  # Only apply this policy to "my-topic"
  - topics: { equal: "my-topic" }
    schema_registry: csr
    values:
      # Automatically envelope messages.
      coerce: true
      # Reject messages that do not match the topic's schema.
      on_parse_error: REJECT_BATCH
      # Reject messages failing semantic validation.
      validation:
        on_error: REJECT_BATCH

These options are documented in the reference page for bufstream.yaml. Helm syntax is identical.

Filtering with fetch policies

It may not be reasonable to expect all of your existing producers to always send valid messages, but that doesn't mean you can't leverage broker-side semantic validation to protect consumers from bad data.

To prevent blocking these producers while guaranteeing consumers only receive well-formed, semantically valid messages, use a fetch policy to filter out invalid messages:

fetch:
  - schema_registry: csr
    values:
      # Filter messages that do not match the topic's schema.
      on_parse_error: FILTER_RECORD
      # Filter messages failing semantic validation.
      validation:
        on_error: FILTER_RECORD

These options are documented in the reference page for bufstream.yaml. Helm syntax is identical.

Using existing topic data

If a topic already contains messages of unknown quality, fetch policies can be used to automatically envelope and filter data before it reaches consumers.

This is identical to filtering with fetch policies, but adds coerce: true to envelope any messages not already in the Confluent wire format.

fetch:
  - schema_registry: csr
    values:
      # Automatically envelope messages.
      coerce: true
      # Filter messages that do not match the topic's schema.
      on_parse_error: FILTER_RECORD
      # Filter messages failing semantic validation.
      validation:
        on_error: FILTER_RECORD

To emulate traditional Kafka, allowing consumers to see enveloped but invalid data, change FILTER_RECORD to PASS_THROUGH.
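For example, a pass-through variant of the fetch policy above looks like this: messages are still enveloped, but invalid records reach consumers unchanged.

```yaml
fetch:
  - schema_registry: csr
    values:
      # Automatically envelope messages.
      coerce: true
      # Pass through messages that do not match the topic's schema.
      on_parse_error: PASS_THROUGH
      # Pass through messages failing semantic validation.
      validation:
        on_error: PASS_THROUGH
```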

These options are documented in the reference page for bufstream.yaml. Helm syntax is identical.

Requiring enveloped messages

To require that producers only send enveloped messages, use coerce: false to disable automatic enveloping and on_no_schema: REJECT_BATCH to reject batches of messages containing any messages not already in the Confluent wire format:

produce:
  - schema_registry: csr
    values:
      coerce: false
      on_no_schema: REJECT_BATCH

These options are documented in the reference page for bufstream.yaml. Helm syntax is identical.