Reference
Intake and archive
Bufstream stores topic data in archive files in object storage. Before data is archived, it enters the system as intake files. Intake files include messages from any number of topics and partitions and are grouped according to a time boundary. If you are using transactions, intake files also include uncommitted messages. Intake files are refined into archives according to the configured delay values; each resulting archive file contains committed messages for a single topic partition.
When the specified archive format is iceberg, Bufstream also updates the configured Iceberg catalog when it publishes an archive, syncing the new data file into the table's metadata. The catalog update is performed synchronously; if it fails, it is addressed later via a reconciliation process.
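Once the catalog has been updated, the table is readable by any Iceberg-aware engine. The following is a minimal sketch using PyIceberg; the catalog properties, namespace, and table name are placeholders for whatever your deployment configures, not values defined by Bufstream.

```python
# A minimal sketch of reading the archived table with PyIceberg. The catalog
# properties, namespace, and table name below are hypothetical placeholders.
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "bufstream",                    # local name for this catalog connection
    **{
        "type": "rest",             # assuming a REST catalog is configured
        "uri": "http://localhost:8181",
    },
)

table = catalog.load_table("analytics.orders")  # hypothetical namespace.table
# Scan the latest committed snapshot, which includes data files synced by Bufstream.
print(table.scan(limit=10).to_arrow())
```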
Data transformations
Note
Transformations are only available for Protobuf-encoded data.
At the start of each archiving job, Bufstream queries the configured schema registry to fetch the latest message schema and caches it in memory to reduce concurrent queries for the same topic. The retrieved Protobuf schema is used to compute an Iceberg schema, and Bufstream stores the state of the Iceberg schema in the system's metadata store (e.g. etcd) to ensure that allocated field IDs are properly re-used. The stored state also tracks deleted fields and columns and determines whether to create or rename fields when there is an incompatible change to a column's type. Once the Iceberg schema is computed, Bufstream checks the catalog to determine whether the schema has changed. If it has, Bufstream updates the schema in the table's metadata. If no Iceberg table exists yet, Bufstream creates it with the computed schema at schema ID 0.
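As a simplified illustration of this mapping, consider a hypothetical OrderPlaced message; expressed with PyIceberg types, the computed schema for the value might look roughly like the sketch below. The message, field names, and assigned field IDs are examples only; Bufstream performs the actual derivation internally and persists the allocated IDs so they remain stable as the Protobuf schema evolves.

```python
# Illustrative only: a hypothetical Protobuf message and a corresponding
# Iceberg schema expressed with PyIceberg types.
#
#   message OrderPlaced {
#     string order_id     = 1;
#     int64  amount_cents = 2;
#   }
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

order_value_schema = Schema(
    NestedField(field_id=1, name="order_id", field_type=StringType(), required=False),
    NestedField(field_id=2, name="amount_cents", field_type=LongType(), required=False),
    schema_id=0,  # a newly created table starts at schema ID 0
)
```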
From the computed Iceberg schema, Bufstream derives a Parquet schema that is used to write the data files, synthesizing Parquet column values from elements of the Protobuf data. At this time, Bufstream does not support customizing Iceberg or Parquet field names or types via Protobuf options.
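For illustration, the sketch below (using pyarrow and a hypothetical decoded message) shows the kind of mapping this produces: decoded Protobuf fields become children of a struct column in the Parquet data file, alongside the __raw__ field described in the next section.

```python
# Illustrative only: writing a decoded record value as a Parquet struct column
# with pyarrow. Field names and the file name are hypothetical; Bufstream
# performs this mapping internally when writing archive data files.
import pyarrow as pa
import pyarrow.parquet as pq

value_type = pa.struct([
    ("__raw__", pa.binary()),      # original bytes; only populated when decoding is skipped or fails
    ("order_id", pa.string()),     # decoded Protobuf field
    ("amount_cents", pa.int64()),  # decoded Protobuf field
])

table = pa.table({
    "value": pa.array(
        [{"__raw__": None, "order_id": "ord-123", "amount_cents": 499}],
        type=value_type,
    ),
})
pq.write_table(table, "archive-example.parquet")
```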
Metadata
Because Bufstream uses the archived Parquet files to serve Kafka subscribers and must be able to properly reconstruct the stream, the system maintains additional metadata in the archive files and, as a result, in the Iceberg table. The additional metadata is represented by the schema below (a sketch of the same layout as an Arrow schema follows the field list):
key (struct)
    Represents the key of the original published record. A child field named __raw__ (bytes) is always populated with the original bytes of the key. Additional fields are present if a Protobuf message schema is associated with the key of the Bufstream topic.

value (struct)
    Represents the value of the original published record. An optional child field named __raw__ (bytes) holds the original bytes of the value; it is only populated when there is no Protobuf message schema or when there was an error processing and decoding the Protobuf data. When it is not populated, the value can be reconstructed from the other fields. Additional fields are present if a Protobuf message schema is associated with the value of the Bufstream topic.

headers (list of structs)
    Key-value pairs that Kafka producers attach to records. Each entry contains a key (string) and a value (bytes).

kafka (struct)
    Metadata pertaining to the Kafka topics, partitions, and records.

    offset (int64)
        The unique identifier for a record within a single topic-partition. This value is assigned by the system at ingestion to preserve record ordering when serving subscribers.

    event_timestamp (timestamp, microseconds)
        A timestamp set by the Kafka producer, preserving the original event time for records that originate in a remote system.

    ingest_timestamp (timestamp, microseconds)
        A timestamp assigned by the system when the record is accepted. Ingest timestamps are not strictly ordered due to clock skew between Bufstream nodes; because records in the Parquet file are explicitly ordered, the system adjusts the ingest timestamp so that it is monotonic. Monotonic timestamps are necessary for querying as well as time-based partitioning strategies.

    batch_start (int64)
        The first offset of the batch that contained this record, allowing the system to preserve the grouping when reconstructing batches to send to Kafka consumers.
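The sketch below expresses this layout as an Arrow schema, purely for illustration; the decoded Protobuf fields are omitted, and the exact Parquet physical types are determined by Bufstream from the Iceberg schema.

```python
# Illustrative only: the record metadata layout described above, expressed as a
# pyarrow schema. Decoded Protobuf fields (extra children of key/value) are omitted.
import pyarrow as pa

record_layout = pa.schema([
    ("key", pa.struct([
        ("__raw__", pa.binary()),                 # always populated
        # ...fields decoded from the key's Protobuf schema, if any
    ])),
    ("value", pa.struct([
        ("__raw__", pa.binary()),                 # populated only when decoding is skipped or fails
        # ...fields decoded from the value's Protobuf schema, if any
    ])),
    ("headers", pa.list_(pa.struct([
        ("key", pa.string()),
        ("value", pa.binary()),
    ]))),
    ("kafka", pa.struct([
        ("offset", pa.int64()),
        ("event_timestamp", pa.timestamp("us")),  # set by the producer
        ("ingest_timestamp", pa.timestamp("us")), # assigned on acceptance, adjusted to be monotonic
        ("batch_start", pa.int64()),              # first offset of the record's original batch
    ])),
])
print(record_layout)
```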
Memory and performance
Enabling the Bufstream Iceberg integration may result in higher read latency (particularly for consumers that are lagging) and higher memory usage, due to the additional broker and reconciliation work needed to transform data, archive Parquet files, and read from and update Iceberg catalogs.
To address read performance for Iceberg records, we recommend adjusting the Bufstream cleanup interval: the default for Bufstream's cleanup jobs is 6 hours, and we recommend a 1-hour cleanup interval for any topics archived as Iceberg.
This adjustment decreases the time needed to get data into the source Iceberg table in the face of temporary errors, such as a momentary network partition or an outage of the Iceberg catalog. In particular, the reconciliation process (run when a synchronous catalog update fails) happens on the same schedule as compaction and cleaning.
Because Bufstream does not currently compact Iceberg topics, this process is not designed to improve query performance. However, it does improve the read performance of query engines, and it is likely to reduce cost as well, since it allows for fewer object store reads.