Reference
This page contains technical reference information about Bufstream's Apache Iceberg™ integration.
Intake and archive#
Bufstream's produce data flow writes messages to intake files including messages from any number of topics and partitions, grouped according to a time boundary. If you are using transactions, intake files will also include uncommitted messages. Intake files are refined into archive files, with each archive file containing committed messages for a single topic partition and is partitioned by date.
When the specified archive format is iceberg
, Bufstream updates the configured Iceberg catalog when it publishes an archive to sync the new data file to the table's metadata. The catalog update process is performed synchronously. If the catalog update fails, it's addressed later via a reconciliation process.
Zero-copy Iceberg and table management#
Bufstream's Iceberg support is "zero copy"; it doesn't make a separate copy of the data for Iceberg. When Bufstream archives data per topic-partition to a durable storage form, it's writing Parquet files used both to serve topic records to Kafka consumers and as data files in the Iceberg table.
Because of this, there are some constraints on the Iceberg table layout as well as how it's managed and maintained:
- "Managed" Iceberg catalogs should not be used with Bufstream's tables. "Managed" features allow the catalog to re-write data files, including re-partitioning them and deleting earlier versions of the data files. This conflicts with the way Bufstream uses the data files to serve record data to Kafka consumers. Bufstream handles table maintenance itself, including cleaning up old snapshots and removing old data files.
- The Bufstream workload should be granted permissions to create tables in the catalog. Bufstream manages the table, it keeps its schema in sync with Protobuf schemas in a schema registry, and it keeps other table metadata (like partition scheme, sort order, etc) in sync with how it stores data in Parquet archive files. It's advisable to not create tables first. Instead, let Bufstream create the table when it first starts archiving the topic to the Iceberg format.
- Archiving is done per topic-partition. This means that Parquet files are generated per topic-partition. Consequently, the table's partition scheme can't be customized to use arbitrary properties in the published records. The only customization that's currently supported is the granularity of time-based partitioning. This partitions the table based on the timestamp when a record was ingested. The granularity can be hourly, daily (default), or monthly.
Data transformations#
At the start of each archiving job, Bufstream queries the configured schema registry to fetch the latest message schema and caches it in memory to reduce concurrent queries for the same topic. The retrieved Protobuf schema is used to compute an Iceberg schema, and Bufstream stores the state of the Iceberg schema in the system's metadata store (e.g., Postgres) to ensure proper re-use of allocated field IDs. The stored state also tracks deleted fields and columns and decides whether to create or rename fields if there are incompatible changes to a column's type. Once the Iceberg schema is computed, Bufstream checks the catalog to determine if the schema has changed. If there are changes, Bufstream updates the schema in the table's metadata. If no Iceberg table exists yet, Bufstream creates it and sets the schema ID to 0 for the computed schema.
From the computed Iceberg schema, Bufstream derives a Parquet schema used to write the data files. Bufstream synthesizes elements from the Protobuf data to map it to Parquet column values. At this time, Bufstream doesn't support customizing the name of Iceberg or Parquet field types via the use of Protobuf options. Transformations are only available for Protobuf encoded data.
An important implication of the above process is that the Iceberg table is only created when data is archived, which means that records must be produced to the topic. If the Protobuf schema in the schema registry changes, the Iceberg table isn't updated until data is archived. This means records must be produced after the schema change in order for the Iceberg schema to be updated.
Table schema#
Because the archived Parquet files are used by Bufstream to serve Kafka subscribers, the schema for these files and for the corresponding Iceberg table includes additional metadata used to reconstruct the stream data.
The following sections describe the fields of the Iceberg table schema.
key
#
struct
Represents the key in the original published record.
There is a child field named __raw__
(bytes) that will always be populated, and it represents the original bytes of the key.
Optionally, additional fields will be present if a Protobuf message schema is associated with the key of this Bufstream topic.
A schema can be associated with the topic via the [data enforcement] configuration in bufstream.yaml
.
When a Protobuf message schema is used, in addition to fields that mirror the structure of the message, there will also be fields named __prefix__
(bytes) and __err__
(string).
The former includes any preamble to the message data in the format of a CSR envelope. The field is only populated when the original message included such a prefix or if the coerce
option is configured in data enforcement.
The latter field is present when the message data can't be decoded, which should only be possible if data enforcement configuration allows invalid data to pass through.
When the __err__
field is present with a decoding error message, none of the other fields, aside from __raw__
will be populated.
val
#
struct
Represents the value in the original published record.
There is a child field named __raw__
(bytes) that's optional, and it represents the original bytes of the value. It will only be populated when there is no Protobuf message schema or there was an error when processing and decoding the Protobuf data. When not populated, the value can be reconstructed from other fields.
Optionally, additional fields are present if a Protobuf message schema is associated with the value of the Bufstream topic.
A schema can be associated with the topic via the data enforcement configuration in bufstream.yaml
.
When a Protobuf message schema is used, in addition to fields that mirror the structure of the message, there will also be fields named __prefix__
(bytes) and __err__
(string).
The former includes any preamble to the message data in the format of a CSR envelope. The field is only populated when the original message included such a prefix or if the coerce
option is configured in data enforcement.
The latter field is present when the message data can't be decoded, which should only be possible if data enforcement configuration allows invalid data to pass through.
When the __err__
field is present with a decoding error message, none of the other fields, aside from __raw__
will be populated.
headers
#
list of structs
Key value pairs that Kafka producers attach to records.
key
#
string
value
#
bytes
kafka
#
struct
Metadata pertaining to the Kafka topics, partitions, and records.
partition
#
int32
The zero-based index of the topic partition for this record. A topic with N partitions will have records with values from 0 to N-1 (inclusive).
offset
#
int64
The unique identifier for a record within a single topic-partition. This value is assigned by the system at ingestion to preserve record ordering when serving subscribers.
event_timestamp
#
timestamp
A timestamp (defined in microseconds) associated with the record set by the Kafka producer so that events originating in a remote system are preserved when published in a topic.
ingest_timestamp
#
timestamp
A timestamp (defined in microseconds) assigned by the system when the record is accepted. Ingestion timestamps aren't strictly ordered due to clock skew between Bufstream nodes. Because records in the Parquet file are explicitly ordered, the system will adjust the ingestion timestamp so that it's monotonic. Monotonic timestamps are necessary for querying as well as time-based partitioning strategies.
batch_start
#
int64
This field identifies the first offset in the batch that contained this record. The system can preserve the grouping when reconstructing the batches to send to Kafka consumers.
Mapping Protobuf schemas to Iceberg schemas#
The following describes how fields in the Iceberg schema are created from the Protobuf message schema:
Protobuf Type | Iceberg Type |
---|---|
repeated 1 |
list |
map 1 |
map |
int32 , sint32 , sfixed32 |
int |
uint32 , fixed32 |
int 2 |
int64 , sint64 , sfixed64 |
long |
uint64 , fixed64 |
long 2 |
bool |
bool |
float |
float |
double |
double |
string |
string |
bytes |
binary |
enum |
string 3 |
message , group |
struct 4 |
Avoiding name collisions#
In general, Bufstream uses names that start and end with double underscores (__
) for synthesized fields so that the names don't conflict with user-defined field names. To avoid name collision issues, avoid using double underscores as suffixes or prefixes for field names in your Protobuf schemas. Synthesized fields in the Iceberg schema are used by Bufstream to faithfully reconstruct record data for Kafka consumers.
Recursive types#
Note that Iceberg schemas can't represent recursive types. Protobuf, on the other hand, does support recursive types by allowing a message type to have a field that directly or indirectly refers to that same message type. When generating the Iceberg schema for a recursive Protobuf type, the message field that's the point of recursion is be treated as if it were an empty message. It will have only an __unrecognized__
field inside it, and all of the field's record data is stored in this __unrecognized__
field.
Parsing errors#
If a topic's data enforcement policy uses the PASS_THROUGH
value for on_parse_error
and a message fails to parse, the values of related Iceberg fields are impacted. The __err__
field in key or val contains any relevant error message. The __raw__
field continues to store the original bytes of the key or value.
Unrecognized fields#
If a parsed Protobuf message contains unrecognized field values, they'll be stored in the associated key or val's __unrecognized__
binary field with encoded bytes. Data in the __unrecognized__
field may include the following: Protobuf extension values, invalid data, fields that have since been deleted from the schema stored in the schema registry, or new fields that aren't yet in sync with the schema registry.
Protobuf extensions#
Bufstream doesn't currently support creating fields in Iceberg schemas representing Protobuf extensions. If Protobuf messages contain extensions, the extension values populate the __unrecognized__
field in key or val.
Compacted topics#
Bufstream doesn't currently support using Iceberg as the archive format for compacted topics.
Apache Avro™ and JSON#
If a topic uses Protobuf as its message data format, Bufstream ensures that the Iceberg schema mirrors the Protobuf message schema. However, for other message formats such as Avro and JSON, Bufstream represents the message keys and values as opaque binary columns.
Memory and performance#
Enabling Bufstream's Iceberg integration may result in higher read latency and higher memory usage if consumers are lagging. Transforming columnar Parquet data to records requires random access to the file, so Bufstream caches large portions of the file in-memory to reduce object storage operations (and thus also reduce cost).
To maintain Iceberg table freshness and consistency, we recommend adjusting the Bufstream cleanup interval. The default for Bufstream's cleanup jobs is 6 hours. We recommend a 1 hour cleanup interval for any topics archived as Iceberg.
The above adjustments decrease the latency in getting data into the source Iceberg table in the face of temporary errors, like a momentary network partition or outage of the Iceberg catalog. In particular, the reconciliation process (when a synchronous catalog update fails) happens on the same schedule as compaction and cleaning.
Because Bufstream doesn't currently compact Iceberg topics, this process isn't designed to improve query performance. Without regular compaction, there's a tradeoff between the latency of getting records into the Iceberg table vs. query performance.
-
The element type of an Iceberg list is derived from the element type of the repeated Protobuf field. Similarly, the key and value types of an Iceberg map are derived from the key and value types of the Protobuf map field. ↩↩
-
Iceberg doesn't support unsigned integers. When unsigned integer values are used in Protobuf, very large values may overflow and appear to be negative values in the Iceberg table. ↩↩
-
Parquet data files use Parquet's "enum" logical type for these string fields. ↩
-
The fields of the Iceberg struct are derived from the fields of the Protobuf message type. In addition to all fields in the Protobuf message definition, Iceberg structs also have a binary
__unrecognized__
field to store any unrecognized field bytes encountered when de-serializing record data. ↩