
Schema providers

Bufstream is more than just a drop-in Kafka replacement: it's built from the ground up to understand the shape of the data traversing its topics. We call this broker-side schema awareness, and it enables capabilities such as semantic validation and Iceberg integration.

Overview

To get started with schema awareness, you'll need to:

  1. Configure Bufstream to use a Protobuf schema provider like the Buf Schema Registry, any local Buf input, or a Confluent Schema Registry (CSR)-compatible API.
  2. Map Protobuf messages to topics.

Use the Buf Schema Registry

The Buf Schema Registry (BSR) is the recommended schema provider for production Bufstream deployments. The BSR provides centralized schema management, version control, breaking change detection, and more. Once configured, topics automatically resolve schemas from the registry.

Configure Bufstream

To use the Buf Schema Registry as a schema provider, start by adding it to Bufstream's configuration:

bufstream.yaml
schema_registry:
  bsr:
    host: buf.build

Helm values
schemaRegistry:
  bsr:
    host: buf.build

To access private modules or private BSR instances, you'll also need to configure options like authentication and TLS. These are documented in the reference pages for bufstream.yaml and Helm values.

Configure topics

Now, set the following topic configurations:

  • buf.registry.value.schema.module - The name of the BSR module containing the topic's message.
  • buf.registry.value.schema.message - The fully-qualified name of the message for the topic.

Because Bufstream is API-compatible with Kafka, you can use your existing processes to configure topics, whether they're infrastructure-as-code (IaC), web consoles like AKHQ, or Kafka CLI tools.

Using Bufstream's built-in kafka config command:

Configure a BSR module and message for the topic
./bufstream kafka config topic set --topic demo --name buf.registry.value.schema.module --value demo.buf.dev/bufbuild/invoice
./bufstream kafka config topic set --topic demo --name buf.registry.value.schema.message --value invoice.v1.Invoice
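
If you manage more than a handful of topics, it can help to wrap the two commands above in a small shell function. The following is a minimal sketch, not part of Bufstream: the names set_bsr_schema and BUFSTREAM_BIN are hypothetical, and only the kafka config topic set invocation is taken from the commands above.

```shell
# Path to the Bufstream binary; override it if yours lives elsewhere.
# BUFSTREAM_BIN and set_bsr_schema are illustrative names, not Bufstream features.
BUFSTREAM_BIN="${BUFSTREAM_BIN:-./bufstream}"

# Usage: set_bsr_schema TOPIC MODULE MESSAGE
# Sets the BSR module and then the message for one topic.
# The message is only set if setting the module succeeded.
set_bsr_schema() {
  topic="$1"
  module="$2"
  message="$3"
  "$BUFSTREAM_BIN" kafka config topic set --topic "$topic" \
    --name buf.registry.value.schema.module --value "$module" &&
  "$BUFSTREAM_BIN" kafka config topic set --topic "$topic" \
    --name buf.registry.value.schema.message --value "$message"
}
```

For example, calling set_bsr_schema demo demo.buf.dev/bufbuild/invoice invoice.v1.Invoice applies the same two settings as the commands above.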

Use Buf inputs (local development)

For development purposes, you can point Bufstream directly at a Buf input, so you don't need to run or configure a schema registry.

Configure Bufstream

If you're running Bufstream as a standalone binary, use the --schema flag to point to any valid Buf input.

Given this directory structure and buf.yaml:

Directory structure for using local Protobuf files
.
├── buf.yaml
└── proto
    └── invoice
        └── v1
            └── invoice.proto

Example buf.yaml with Protovalidate
version: v2
modules:
  - path: proto
deps:
  - buf.build/bufbuild/protovalidate
lint:
  use:
    - STANDARD
breaking:
  use:
    - FILE

Any messages in invoice.proto, and their Protovalidate rules, are provided to Bufstream:

Example invoice.proto with Protovalidate rules
syntax = "proto3";

package invoice.v1;

import "buf/validate/validate.proto";

// Invoice is a collection of goods or services sold to a customer.
message Invoice {
  string invoice_id = 1 [(buf.validate.field).string.uuid = true];
  string account_id = 2 [(buf.validate.field).string.uuid = true];
  repeated LineItem line_items = 4 [(buf.validate.field).repeated.min_items = 1];
}

// LineItem is an individual good or service added to an invoice.
message LineItem {
  string line_item_id = 1 [(buf.validate.field).string.uuid = true];
  string product_id = 2 [(buf.validate.field).string.uuid = true];
  uint64 quantity = 3 [(buf.validate.field).uint64.gt = 0];
  uint64 unit_price_cents = 4 [(buf.validate.field).uint64.gte = 0];
}

Configure topics

Set the following topic configuration:

  • buf.registry.value.schema.message - The fully-qualified name of the message for the topic.

Because Bufstream is API-compatible with Kafka, you can use your existing processes to configure topics, whether they're infrastructure-as-code (IaC), web consoles like AKHQ, or Kafka CLI tools.

Using Bufstream's built-in kafka config command:

Configure a local message for the topic
./bufstream kafka config topic set --topic demo --name buf.registry.value.schema.message --value invoice.v1.Invoice
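
Since Kafka CLI tools are mentioned above, here is a rough equivalent using Apache Kafka's kafka-configs.sh. Treat it as a sketch under assumptions: it assumes kafka-configs.sh is on your PATH and that Bufstream is reachable at localhost:9092, and the names set_topic_message, KAFKA_CONFIGS, and BOOTSTRAP are hypothetical. It hasn't been verified against a particular Bufstream release.

```shell
# KAFKA_CONFIGS and BOOTSTRAP are illustrative; adjust both for your environment.
KAFKA_CONFIGS="${KAFKA_CONFIGS:-kafka-configs.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"  # assumed Bufstream listener address

# Usage: set_topic_message TOPIC MESSAGE
# Alters one topic-level config using the standard Kafka topic config flags.
set_topic_message() {
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" \
    --alter --entity-type topics --entity-name "$1" \
    --add-config "buf.registry.value.schema.message=$2"
}
```

Calling set_topic_message demo invoice.v1.Invoice is intended to have the same effect as the kafka config topic set command above.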

Use a Confluent Schema Registry

Add a schema registry

Bufstream can also use any Confluent Schema Registry (CSR)-compatible API as a schema provider.

Get started by adding your schema registry to Bufstream's configuration under the confluent key:

bufstream.yaml
schema_registry:
  confluent:
    url: https://my-domain.buf.dev/integrations/confluent/instance-name

Helm values
schemaRegistry:
  confluent:
    url: https://my-domain.buf.dev/integrations/confluent/instance-name

Confluent Schema Registry options, like TLS configuration and authentication, are documented in the reference pages for bufstream.yaml and Helm values.

Map subjects to messages

Deployments using a Confluent Schema Registry must use the bufbuild/confluent module in their Protobuf files to map subjects to Protobuf messages. See the CSR-specific Manage schemas page for more.