This feature is only available as an add-on to the Enterprise plan.
The BSR's Confluent Schema Registry is compatible with all clients and tools that can consume the CSR API. The examples below use the official Confluent clients in Java and Go. Configuration and usage may vary for third-party clients.
Authentication and configuration

- Clients must be authenticated against the BSR to access its Confluent Schema Registry using a token. We recommend using a dedicated machine user per client application.
- The BSR's Confluent Schema Registry supports any subject name strategy, but only supports the default reference subject name strategy at this time.
- The BSR's Confluent Schema Registry runs in READONLY mode and blocks clients from auto-registering schemas or changing other configuration via the API.
Producers
In this example, we've configured a Kafka producer to serialize and emit a message onto the email-updated topic using the BSR's Confluent Schema Registry. The serializer uses the default TopicNameStrategy to resolve the subject name email-updated-value, which is used to look up the schema in the BSR.
package com.example.demo;
import com.google.protobuf.ByteString;
import com.example.buf.gen.demo.analytics.EmailUpdated;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;
/**
 * Example Kafka producer that publishes {@link EmailUpdated} events, relying on the
 * BSR's Confluent Schema Registry to resolve the message schema. Placeholder values
 * in angle brackets must be replaced with real connection details before running.
 */
public class ProtobufProducer {

    public static void main(String[] args) {
        // With the default TopicNameStrategy, this topic maps to the
        // `email-updated-value` subject in the schema registry.
        String topic = "email-updated";

        try (Producer<String, EmailUpdated> producer = new KafkaProducer<>(producerConfig())) {
            EmailUpdated message =
                EmailUpdated.newBuilder()
                    .setUserId(123)
                    .setPreviousEmail("previous@example.com")
                    .setNewEmail("new@example.com")
                    .setNewEmailVerified(true)
                    .build();
            producer.send(new ProducerRecord<>(topic, "testkey", message));
        } // Closing the producer flushes any buffered records.
    }

    /** Builds the producer configuration, including schema-registry authentication. */
    private static Properties producerConfig() {
        Properties config = new Properties();
        // Kafka connection and key/value serialization.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA_HOST_AND_PORT>");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class.getName());
        // Schema registry: authenticate with a static BSR token. The BSR registry is
        // read-only, so schema auto-registration is disabled.
        config.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "<CSR_INSTANCE_URL>");
        config.put(KafkaProtobufSerializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "STATIC_TOKEN");
        config.put(KafkaProtobufSerializerConfig.BEARER_AUTH_TOKEN_CONFIG, "<BUF_TOKEN>");
        config.put(KafkaProtobufSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return config;
    }
}
Consumers
The Java deserializer supports decoding into either concrete messages (like those used in the producer example above) or dynamic messages constructed from the schema resolved from the BSR's Confluent Schema Registry. The example below uses a DynamicMessage:
package com.example.demo;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Example Kafka consumer that reads records from the email-updated topic and decodes
 * each value into a {@link DynamicMessage}, using the schema resolved from the BSR's
 * Confluent Schema Registry. Placeholder values in angle brackets must be replaced
 * with real connection details before running.
 */
public class ProtobufConsumer {

    public static void main(String[] args) {
        try (Consumer<String, DynamicMessage> consumer = new KafkaConsumer<>(consumerConfig())) {
            String topic = "email-updated";
            consumer.subscribe(Collections.singletonList(topic));
            // Single bounded poll for example purposes; a real consumer polls in a loop.
            for (ConsumerRecord<String, DynamicMessage> consumed : consumer.poll(Duration.ofMillis(10_000))) {
                System.out.printf(
                    "Consumed event from topic %s: key %s -> value %s%n",
                    topic, consumed.key(), consumed.value());
            }
        }
    }

    /** Builds the consumer configuration, including schema-registry authentication. */
    private static Properties consumerConfig() {
        Properties config = new Properties();
        // Kafka connection, consumer group, and key/value deserialization.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA_HOST_AND_PORT>");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class.getName());
        // Schema registry: authenticate with a static BSR token.
        config.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "<CSR_INSTANCE_URL>");
        config.put(KafkaProtobufDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "STATIC_TOKEN");
        config.put(KafkaProtobufDeserializerConfig.BEARER_AUTH_TOKEN_CONFIG, "<BUF_TOKEN>");
        return config;
    }
}