Skip to content

Schema enforcement

Data quality enforcement

Rather than trusting producers to send valid data, Bufstream can reject messages that don't match the topic's schema. This guarantees that consumers always receive well-formed data, eliminating a large class of data outages. This feature works with binary-encoded Protobuf messages and the Buf Schema Registry (or any other Protobuf registry that supports Confluent's REST API). To associate Protobuf schemas with a Kafka topic in the Buf Schema Registry, follow the documentation for integrating the BSR with Kafka.

Self-describing data

It's often helpful to make Kafka messages self-describing, so that tools and frameworks can unmarshal, manipulate, and display them. The most common approach to making messages self-describing is to prefix the serialized message with a few extra bytes (commonly called an "envelope"). The prefix encodes the ID of the message's schema, which can then be retrieved from a schema registry. Because Confluent introduced and popularized this format, it's commonly called the Confluent wire format. Much of the Kafka ecosystem supports this format, including most client libraries, Kafka Connect, AKHQ, kSQLdb, and Snowflake's Kafka Connector.

Enabling schema enforcement

To reject data that doesn't match the topic's schema, add the following to your Helm values file:

data_enforcement:
  schema_registries:
    - name: my-registry
      confluent:
        url: https://my-schema-registry.com/registry-id
        basic_auth:
          username:
            string: your-user
          password:
            string: your-password
  produce:
    - topics: { equal: my-topic }
      schema_registry: my-registry
      values:
        coerce: true
        on_parse_error: REJECT_BATCH

With this configuration, consumers always receive well-formed, self-describing data. If the producer sends un-enveloped messages, Bufstream automatically envelopes them using the latest revision of the topic's schema. Bufstream then parses the message using the schema from the envelope. If the message doesn't match the schema, Bufstream rejects it and the data producer sees an error.

Advanced configurations

Automatic enveloping without parsing

To enable automatic enveloping without verifying that the data matches the schema, change the produce filter above to the following:

produce:
  - topics: { all: true }
    schema_registry: my-registry
    values:
      coerce: true
      skip_parse: true

Enforce producer enveloping without parsing

To require that data producers send enveloped messages without verifying that the data matches the schema, change the produce filter to the following:

produce:
  - topics: { all: true }
    schema_registry: my-registry
    values:
      coerce: false
      on_no_schema: REJECT_BATCH

Automatically enveloping old messages

If a topic contains un-enveloped data, Bufstream can envelope the existing data on the fly as it's read. To automatically envelope the data and verify that it matches the schema, hiding messages with invalid data from the consumer, add the following to your Helm values file:

data_enforcement:
  schema_registries:
    - name: my-registry
      confluent:
        url: https://my-schema-registry.com/registry-id
        basic_auth:
          username:
            string: your-user
          password:
            string: your-password
  fetch:
    - topics: { all: true }
      schema_registry: my-registry
      values:
        coerce: true
        on_parse_error: FILTER_RECORD

To instead allow the consumer to see enveloped but invalid data, change FILTER_RECORD to PASS_THROUGH.

Topic-specific policies

For finer-grained control, Bufstream supports topic-specific policies. Only the first policy that matches a given produce or fetch request takes effect. This Helm values snippet shows how to configure a policy for a specific topic:

produce:
  - topics: { equal: "topic" }
    schema_registry: my-registry
    values:
      coerce: true
      on_parse_error: REJECT_BATCH

Using multiple registries

There may be cases where you have more than one registry in place—for example, a development and production registry. Bufstream allows you to configure each registry independently and choose which registry to use with each policy. As an example, this Helm values snippet configures two registries and uses each in one policy:

produce:
  # pass through any errors from the development registry for a specific topic ("dev-topic").
  - topics: { equal: "dev-topic" }
    schema_registry: dev-registry
    values:
      coerce: true
      on_parse_error: PASS_THROUGH
    # reject messages that error for all topics in the production registry
  - topics: { all: true }
    schema_registry: prod-registry
    values:
      coerce: true
      on_parse_error: REJECT_BATCH