Broker-side semantic validation
For Bufstream to validate message contents, you'll need to update its configuration to include a schema registry and a data enforcement policy.
TL;DR
This minimal example configures Bufstream for semantic validation:
In `bufstream.yaml`:

```yaml
data_enforcement:
  # Add the Buf Schema Registry or any other Confluent API-compatible Protobuf schema registry.
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name
  # Reject any batch of messages with values that fail semantic validation or parsing, using
  # "coerce" to add missing schema information (automatic enveloping).
  #
  # Add "topics" to configure topic-specific policies.
  produce:
    - schema_registry: csr
      values:
        coerce: true
        on_parse_error: REJECT_BATCH
        validation:
          on_error: REJECT_BATCH
```
In Helm values, the same settings go under `dataEnforcement`:

```yaml
dataEnforcement:
  # Add the Buf Schema Registry or any other Confluent API-compatible Protobuf schema registry.
  schema_registries:
    - name: csr
      confluent:
        url: https://my-domain.buf.dev/integrations/confluent/instance-name
  # Reject any batch of messages with values that fail semantic validation or parsing, using
  # "coerce" to add missing schema information (automatic enveloping).
  #
  # Add "topics" to configure topic-specific policies.
  produce:
    - schema_registry: csr
      values:
        coerce: true
        on_parse_error: REJECT_BATCH
        validation:
          on_error: REJECT_BATCH
```
Once you've made this change, Bufstream rejects any batch containing a message value that can't be parsed against its subject's schema or that fails the schema's Protovalidate rules.
Overview
More than just a drop-in Kafka replacement, Bufstream is built from the ground up to understand the shape of the data traversing its topics. We call this broker-side schema awareness, and it brings some interesting capabilities. Chief among these is broker-side semantic validation: the ability to block bad data from entering topics in the first place.
Broker-side enforcement
Rather than trusting clients to send valid messages and manage their own schemas, Bufstream can ensure messages match a known schema and pass its semantic validation rules. This guarantees consumers always receive well-formed data, eliminating a large class of data outages.
This feature works with binary-encoded Protobuf messages and the Buf Schema Registry or any other Confluent API-compatible schema registry. To associate Protobuf schemas with Kafka topics in schema registries, follow the documentation for integrating the BSR with Kafka.
Semantic validation
Fields in a schema are more than just data types. They have semantic properties:
- They can be optional or required.
- Fields like email addresses aren't just `string`s; they must match a specific format.
- Numbers, like a user's age, may be a `uint32`, but you can safely assume a new user isn't more than 150 years old.
That's why Bufstream goes beyond simple schema ID validation (making sure messages use a schema ID) and schema validation (type-checking message data against its schema) to provide semantic validation, deeply inspecting messages to ensure they meet Protovalidate rules in their schema.
With Protovalidate, you can verify that a user's email is a valid email address, that an age is provided and within a sensible range, and even use a CEL expression to state that `first_name` should only be set if `last_name` is provided:
```protobuf
syntax = "proto3";

// Protovalidate's rule definitions (buf.validate.*).
import "buf/validate/validate.proto";

message User {
  option (buf.validate.message).cel = {
    id: "first_name.requires.last_name",
    message: "If a first name is provided, a last name must also be provided.",
    expression: "this.first_name.size() == 0 || this.last_name.size() > 0"
  };
  string email = 1 [
    (buf.validate.field).string.email = true
  ];
  uint32 age = 2 [
    (buf.validate.field).required = true,
    (buf.validate.field).uint32.gte = 0,
    (buf.validate.field).uint32.lte = 150
  ];
  string last_name = 3;
  string first_name = 4;
}
```
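To make these rules concrete, the Python sketch below hand-codes checks that roughly mirror the `User` rules. It is not Protovalidate and not how Bufstream evaluates rules: the `User` dataclass, the `violations` function, and every label except `first_name.requires.last_name` are hypothetical, and the email pattern is deliberately looser than Protovalidate's real email rule. It also assumes that `required` on a proto3 scalar without explicit presence treats the zero value as unset.

```python
import re
from dataclasses import dataclass
from typing import List

# Deliberately loose stand-in; Protovalidate's actual email rule is stricter.
EMAIL_PATTERN = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


@dataclass
class User:
    # Hypothetical Python stand-in for the Protobuf User message.
    email: str = ""
    age: int = 0
    last_name: str = ""
    first_name: str = ""


def violations(user: User) -> List[str]:
    """Return a label for each rule the user breaks; an empty list means valid."""
    found = []
    # (buf.validate.field).string.email = true
    if not EMAIL_PATTERN.fullmatch(user.email):
        found.append("email")
    # required = true: assumed to treat a zero uint32 as "not provided".
    if user.age == 0:
        found.append("age.required")
    # uint32.gte = 0 and uint32.lte = 150
    elif not 0 <= user.age <= 150:
        found.append("age.range")
    # Message-level CEL rule: first_name may only be set if last_name is set.
    if not (len(user.first_name) == 0 or len(user.last_name) > 0):
        found.append("first_name.requires.last_name")
    return found
```

For example, `User(email="not-an-email", age=200, first_name="Ada")` breaks all three kinds of rule, while a user with a well-formed email, an age of 36, and both names set produces an empty list.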
Using Bufstream's broker-side semantic validation means you can rely on topic data to meet your business rules and domain logic, eliminating an entire class of data errors before they materialize in downstream systems.
Enabling semantic validation
Enabling semantic validation requires adding a schema registry and at least one `produce` policy to your Bufstream configuration.
Add a schema registry
To parse a message, Bufstream determines its schema from the Confluent wire format: it expects each message to begin with a five-byte "envelope" (a zero magic byte followed by a four-byte schema ID) identifying a schema stored in a configured schema registry. Optionally, Bufstream can add missing envelopes using the topic's latest schema ID.
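As a rough illustration of that envelope, the Python sketch below adds and strips the five-byte prefix using only the standard library. The `wrap` and `unwrap` names are hypothetical helpers, not Bufstream APIs. For Protobuf, Confluent's serializers also write a list of message indexes after the schema ID; this sketch does not decode them and leaves those bytes in the returned payload.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
# One unsigned magic byte followed by a four-byte big-endian schema ID.
HEADER = struct.Struct(">BI")


def wrap(schema_id: int, payload: bytes) -> bytes:
    """Prepend the five-byte envelope to an already-serialized payload."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unwrap(record: bytes) -> Tuple[int, bytes]:
    """Split an enveloped record into (schema_id, remaining bytes)."""
    if len(record) < HEADER.size or record[0] != MAGIC_BYTE:
        raise ValueError("record is not in the Confluent wire format")
    _, schema_id = HEADER.unpack_from(record)
    return schema_id, record[HEADER.size:]
```

Round-tripping is symmetric: `unwrap(wrap(42, payload))` returns `(42, payload)`.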
To allow Bufstream to look up schemas using these IDs, add your Buf Schema Registry instance (or any other Confluent API-compatible schema registry) to the `schema_registries` list in its data enforcement configuration, as shown in the TL;DR example above.
Schema registry options are documented in the reference page for bufstream.yaml. Helm syntax is identical.
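Conceptually, looking up a schema by ID against a Confluent API-compatible registry is an HTTP GET on the registry's `/schemas/ids/{id}` endpoint, whose JSON response carries the schema text in its `schema` field. The sketch below assumes the BSR's Confluent integration URL from the TL;DR example serves that endpoint under its base path; `schema_url` and `fetch_schema` are hypothetical helpers, not part of Bufstream, and authentication is omitted.

```python
import json
import urllib.request


def schema_url(registry_url: str, schema_id: int) -> str:
    """Build the Confluent Schema Registry API URL for a schema ID."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def fetch_schema(registry_url: str, schema_id: int) -> dict:
    """GET the schema by ID and decode the JSON body (no auth handling)."""
    with urllib.request.urlopen(schema_url(registry_url, schema_id)) as resp:
        return json.load(resp)
```

For example, `schema_url("https://my-domain.buf.dev/integrations/confluent/instance-name", 42)` yields that base URL followed by `/schemas/ids/42`.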
Add a policy
Bufstream data enforcement policies control actions taken on a record's keys and values. Because policies use schemas, all policies must be associated with a configured schema registry. By default, policies apply to all topics, but topic-specific policies are supported.
To add a policy enforcing semantic validation, add at least one `produce` policy to Bufstream's data enforcement configuration. For example, the `produce` policy in the TL;DR example above:
- Automatically envelopes any message values not already in the Confluent wire format, allowing their contents to be parsed and validated against the latest revision of the topic's schema.
- Rejects any batch of messages containing a malformed value, returning a produce exception to the client.
- Rejects any batch of messages containing a value failing semantic validation rules, returning a produce exception to the client.
With this configuration, Bufstream only accepts well-formed, semantically valid messages.
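The three behaviors above can be modeled as a single pass over a batch. The Python sketch below is a conceptual model only, not Bufstream's implementation: `enforce_batch`, `RejectBatch`, and the caller-supplied `parse` and `validate` callables are hypothetical, and treating a leading zero byte as "already enveloped" is an assumption made for illustration.

```python
import struct
from typing import Any, Callable, List, Optional

MAGIC_BYTE = 0


class RejectBatch(Exception):
    """Stands in for the produce error returned to the client."""


def envelope(schema_id: int, value: bytes) -> bytes:
    """Prefix a value with the five-byte Confluent wire-format header."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + value


def is_enveloped(value: bytes) -> bool:
    # Assumption: a leading zero magic byte means "already enveloped".
    return len(value) >= 5 and value[0] == MAGIC_BYTE


def enforce_batch(
    values: List[bytes],
    latest_schema_id: int,
    parse: Callable[[int, bytes], Optional[Any]],
    validate: Callable[[Any], List[str]],
) -> List[bytes]:
    """Return the (possibly enveloped) values, or raise RejectBatch."""
    accepted = []
    for i, value in enumerate(values):
        # coerce: true -> envelope values that lack the header.
        if not is_enveloped(value):
            value = envelope(latest_schema_id, value)
        (schema_id,) = struct.unpack(">I", value[1:5])
        message = parse(schema_id, value[5:])
        # on_parse_error: REJECT_BATCH -> one malformed value fails the batch.
        if message is None:
            raise RejectBatch(f"value {i}: cannot parse with schema {schema_id}")
        # validation.on_error: REJECT_BATCH -> one invalid value fails the batch.
        errors = validate(message)
        if errors:
            raise RejectBatch(f"value {i}: {', '.join(errors)}")
        accepted.append(value)
    return accepted
```

With a `parse` callable that returns `None` for malformed bytes, a batch of valid values comes back enveloped, while a single malformed or invalid value raises `RejectBatch` and nothing from that batch is accepted.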
Policy options are documented in the reference page for bufstream.yaml. Helm syntax is identical.
Common policy configurations
Topic-specific policies
For finer-grained control, Bufstream supports topic-specific policies. Only the first policy that matches a given request takes effect. This configuration snippet shows how to configure a policy for a specific topic:
```yaml
produce:
  # Only apply this policy to "my-topic"
  - topics: { equal: "my-topic" }
    schema_registry: csr
    values:
      # Automatically envelope messages.
      coerce: true
      # Reject messages that do not match the topic's schema.
      on_parse_error: REJECT_BATCH
      # Reject messages failing semantic validation.
      validation:
        on_error: REJECT_BATCH
```
These options are documented in the reference page for bufstream.yaml. Helm syntax is identical.
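The first-match rule, combined with policies applying to all topics by default, means the order of the `produce` list matters. The Python sketch below models that selection under these assumptions; `Policy`, its `name` field, and `select_policy` are hypothetical, and only the `equal` topic matcher from the snippet above is modeled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Policy:
    name: str                          # hypothetical label, for illustration
    topic_equal: Optional[str] = None  # None: the policy applies to every topic


def select_policy(policies: List[Policy], topic: str) -> Optional[Policy]:
    """Return the first policy matching the topic, or None if none match."""
    for policy in policies:
        if policy.topic_equal is None or policy.topic_equal == topic:
            return policy
    return None
```

Under this model, a catch-all policy listed first shadows every topic-specific policy after it, so topic-specific policies belong before general ones.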