bufstream.yaml
The bufstream.yaml file defines configuration for a Bufstream agent. The Bufstream CLI can be instructed to use the configuration file with the -c flag.
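For orientation, here is a minimal sketch of a bufstream.yaml; all values are illustrative, not defaults:

```yaml
# bufstream.yaml -- minimal illustrative configuration
name: agent-0            # unique per agent; defaults to the hostname
cluster: my-cluster      # shared by all agents in the cluster
zone: us-east-1a         # resolved from the host's metadata service if omitted
etcd:
  addresses:
    - host: etcd.internal
      port: 2379
storage:
  provider: S3
  bucket: my-bufstream-bucket
  region: us-east-1
```

Assuming the serve subcommand, the agent would then be started with bufstream serve -c bufstream.yaml.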
Fields
name
string
The name of this Bufstream agent.
Names should be unique for each agent in the cluster. Defaults to the hostname. Do not store sensitive information in this field. The name may be stored in logs, traces, metrics, etc.
cluster
string
The name of the cluster.
All agents in the same cluster should have the same value. Do not store sensitive information in this field. The cluster path may be stored in keys, logs, traces, metrics, etc.
zone
string
The location of the agent, e.g., the datacenter/availability zone where the agent is running.
If unspecified, the agent will attempt to resolve an availability zone from the host's metadata service. Do not store sensitive information in this field. The zone may be stored in keys, logs, traces, metrics, etc.
observability
Configuration of observability and debugging utilities exposed by the agent.
etcd
If specified, the agent will use etcd as the metadata storage of the cluster.
This field is mutually exclusive with in_memory.
in_memory
bool
If true, the agent will use an in-memory cache for metadata storage.
This option is intended for local use and testing, and only works with single agent clusters.
This field is mutually exclusive with etcd.
auto_migrate_metadata_storage
bool
If true, the agent will run migrations for the metadata storage on startup.
Only one agent per cluster should have this option enabled.
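For a single-agent local setup, a sketch combining these fields might look like this (illustrative only):

```yaml
# Local development: in-memory metadata, migrations run on startup
in_memory: true
auto_migrate_metadata_storage: true
storage:
  provider: LOCAL_DISK
```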
storage
The data storage configuration.
actors
list<Actor>
The actors that are enabled on this agent.
If empty, all actors are enabled. This field is mutually exclusive with disabled_actors.
disabled_actors
list<Actor>
The actors that are disabled on this agent.
This field is mutually exclusive with actors.
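For example, to keep an agent from performing certain roles, one of the two fields might be set as follows (the choice of actors to disable is illustrative):

```yaml
# Disable selected actors; all others remain enabled
disabled_actors:
  - ACTOR_CASHIER
  - ACTOR_CLEANER
```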
dispatch
Configuration for dispatching of requests and data flow between agents.
intake
Configuration for intake and processing of produced data.
cache
Configuration for caches maintained by the agent.
archive
Configuration for archiving and compaction performed by the agent.
kafka
Configuration for the Kafka interface.
data_enforcement
Configuration for data enforcement via schemas of records flowing in and out of the agent.
available_memory_bytes
uint64
The maximum amount of memory bufstream should consider usable.
By default this is expected to be 4GiB per vCPU, as determined at startup. The configured value is not a hard limit, and is used to influence the default sizes of various caches. Explicitly setting a cache size elsewhere overrides any settings derived from this value.
labels
Labels associated with the Bufstream agent.
Labels may appear in logs, metrics, and traces.
connect_address
The address to listen on for inter-agent Connect RPCs.
By default, agents bind to a random, available port on localhost.
connect_public_address
The public address advertised to other agents.
This field should only be set if different from connect_address.
connect_http_version
The HTTP version to use for inter-agent Connect RPCs.
By default, HTTP/1.1 is used.
connect_isolation
bool
Whether inter-agent Connect clients should be unique for reads and writes.
Disabled by default. Recommended when using HTTP/2 in connect_http_version.
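A sketch of the inter-agent RPC settings, assuming connect_address and connect_public_address accept the Address shape (host and port); values are illustrative:

```yaml
connect_address:
  host: 0.0.0.0
  port: 9095
connect_public_address:
  host: agent-0.internal   # what other agents should dial
  port: 9095
connect_http_version: CONNECT_HTTP_VERSION_2_0
connect_isolation: true    # recommended with HTTP/2
```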
record_expiry_delay_max
duration
How often to scan all owned partitions to (try to) delete expired records.
Defaults to 6h.
fetch_sync_group_count
int32
The number of 'groups' to cluster fetchers into for synchronization at the same log append time.
Dynamically configurable as bufstream.fetch.sync.group.count.
Sub-Messages
ObservabilityConfig
Configuration for observability primitives.
log_level
Log level, defaults to INFO.
log_format
Log format, defaults to TEXT when connected to a terminal, otherwise JSON.
log_git_version
bool
If set, include "version=
metrics_exporter
OpenTelemetry exporter for metrics, defaults to NONE.
metrics
Configuration for metrics.
debug_address
If configured, pprof and prometheus exported metrics will be exposed on this address.
trace_exporter
OpenTelemetry exporter for traces, defaults to NONE.
traces
Configuration for traces.
trace_ratio
float64
OpenTelemetry trace sample ratio, defaults to 1.
exporter
Default values for metrics and traces exporters.
sensitive_information_redaction
Redact sensitive information, such as topic names, before adding it to metrics, traces, and logs.
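Putting several of these fields together, an observability block might look like the following sketch (assuming debug_address takes the Address shape; values illustrative):

```yaml
observability:
  log_level: DEBUG
  log_format: JSON
  debug_address:
    host: 127.0.0.1
    port: 6060             # pprof and Prometheus metrics
  metrics:
    exporter_type: PROMETHEUS
  sensitive_information_redaction: OPAQUE
```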
EtcdConfig
Configuration options specific to etcd metadata storage.
addresses
list<Address>
The etcd node addresses.
Currently, Bufstream assumes no TLS and no path-prefix when connecting to the etcd cluster.
session_ttl_seconds
int32
The amount of time an etcd node can be unreachable before it is considered down.
After this TTL, the agent's presence in the cluster is essentially lost. Currently, the agent will shutdown if this TTL is exceeded.
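A sketch of an etcd block for a three-node cluster (hostnames and TTL are illustrative):

```yaml
etcd:
  addresses:
    - host: etcd-0.internal
      port: 2379
    - host: etcd-1.internal
      port: 2379
    - host: etcd-2.internal
      port: 2379
  session_ttl_seconds: 30
```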
StorageConfig
Configuration options specific to data storage.
provider
The data storage provider.
If unspecified, a provider is automatically resolved with the following heuristics:
- If bucket is set, we attempt to resolve metadata from the host:
  - If the AWS metadata service responds, we assume S3.
  - Otherwise, we assume GCS.
- If in_memory is set on the root configuration, we assume INLINE.
- Otherwise, we assume LOCAL_DISK.
region
string
The region in which the bucket exists.
This field defaults to the region of the agent's host.
bucket
string
The object storage bucket where data is stored.
This field is required for the GCS and S3 providers.
prefix
string
The path prefix of objects stored in the data storage.
Defaults to bufstream/.
endpoint
string
The provider's HTTPS endpoint to use instead of the default.
force_path_style
bool
Enable path-based routing (instead of subdomains) for buckets.
access_key_id
Specifies the AWS access key ID for authentication to the bucket.
By default, authentication is performed using the metadata service of the agent's host. If set, secret_access_key must also be provided.
secret_access_key
Specifies the AWS secret access key for authentication to the bucket.
By default, authentication is performed using the metadata service of the agent's host. If set, access_key_id must also be provided.
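A sketch of S3 storage with explicit credentials, assuming both credential fields accept the DataSource shape (values illustrative):

```yaml
storage:
  provider: S3
  region: us-east-1
  bucket: my-bufstream-bucket
  prefix: bufstream/
  access_key_id:
    string: AKIA...              # placeholder key ID
  secret_access_key:
    env_var: AWS_SECRET_ACCESS_KEY
```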
get_hedge_delay
duration
How long before a GET request to the data storage provider is hedged with an additional request.
Hedging improves p99 performance of requests to the storage provider. Defaults to 250ms.
debug_logging
Enables data storage debug logging at the specified level.
This level must be equal to or higher than the level specified in observability.log_level.
put_hedge_delay
duration
Enables hedging of PUT requests to the data storage provider with the specified delay.
PUT requests should only be hedged for the S3 and GCS providers.
write_isolation
bool
If writes should use the same clients as reads.
By default, different clients are used between reads and writes.
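The hedging and isolation knobs above might be tuned as in this sketch (delays illustrative; PUT hedging only applies to S3 and GCS):

```yaml
storage:
  get_hedge_delay: 250ms   # hedge GETs after 250ms (the default)
  put_hedge_delay: 500ms   # enable PUT hedging (S3/GCS only)
  write_isolation: true    # writes use the same clients as reads
  debug_logging: DEBUG     # must be >= observability.log_level
```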
DispatchConfig
Configuration options specific to request dispatching and data flow between agents.
local_intake_cache
bool
Whether the intake cache should be handled separately by each node or sharded among multiple nodes.
local_produce
bool
Whether calls to produce records should be handled separately by each node or sharded among multiple nodes.
local_fetch
bool
Whether calls to fetch records should be handled separately by each node or sharded among multiple nodes.
unavailable_retry_count
int32
The number of retry attempts to make when an Unavailable error is encountered.
A value of N results in N retries, i.e., N+1 attempts in total.
IntakeConfig
Configuration options specific to intake and processing of produced data.
delay_max
duration
The maximum delay to wait before writing an intake file.
Dynamically configurable as bufstream.intake.delay.max.ms.
delay_max_bytes
int64
The maximum number of bytes to enqueue before writing an intake file.
Dynamically configurable as bufstream.intake.delay.max.bytes.
txn_timeout_max
duration
The maximum timeout for all transactions.
txn_timeout_default
duration
The default timeout for new transactions.
log_append_time_difference_max
duration
The maximum difference between intake write time and log append time.
Dynamically configurable as bufstream.intake.log.append.time.difference.max.ms.
recent_sequence_eager
bool
Whether recent messages should be sequenced actively.
When true, recent messages will be sequenced as soon as they are available. When false, recent messages will be sequenced only when requested.
producer_id_batch_size
int32
How many producer IDs a Bufstream process reserves at a time.
file_delete_delay_max
duration
How often to scan all intake files to (try to) delete old files.
write_through_cache
bool
Whether intake entries should be written through the cache.
write_stream
bool
Whether intake entries should be streamed when written.
write_stream_chunk_bytes
int32
The maximum number of bytes to write in a single intake write stream chunk.
shelf_msg_max
int32
The maximum number of recent messages to shelve at a time.
recent_msg_min
int32
The minimum number of recent messages to keep for each topic/partition.
end_txn_skew_max
duration
The maximum amount of time an end transaction request can appear to come before the last modification to the transaction.
end_txn_revision_check
bool
Whether to record the revision that the end transaction request was started at and to fail the request if the transaction changed while active since then.
CacheConfig
Configuration options specific to the cache behavior of the agent.
intake_max_bytes
int64
The maximum number of intake file bytes to keep in memory.
shelf_max_bytes
int64
The maximum number of shelf bytes to keep in memory.
archive_max_bytes
int64
The maximum number of archive log entry bytes to keep in memory.
fetch_record_max_bytes
int64
The maximum number of record bytes fetched from recent or shelf messages to keep in memory.
kafka_fetch_eager_max_bytes
int64
The maximum number of record bytes to keep in memory for eagerly fetched records.
producer_max
int32
The maximum number of producers tracked per topic/partition. (May be exceeded due to other constraints.)
Each topic/partition tracks the sequence number and transaction state for each producer writing to it.
The sequence number may be forgotten for the least-recently-used producer, when this limit is exceeded.
ArchiveConfig
Configuration options specific to archiving performed by the agent.
min_bytes
int64
Determines when to start writing an archive for any topic/partition.
When -1, no archive ever starts. When 0, an archive starts as soon as a shelf write is detected (see start_delay_max) or a previous archive completes (unless the topic/partition was idle). When >0, an archive starts once the accumulation of that many bytes is detected (see start_delay_max) in the shelves.
An archive completes when:
- It contains more than max_bytes (at a suitable data boundary).
- No new records are produced for idle_max (the topic/partition is idle).
- The archive is complete_delay_max old.
Dynamically configurable as bufstream.archive.min.bytes.
max_bytes
int64
The maximum size of an archive.
More precisely, this is the threshold after which an archive is completed.
Dynamically configurable as bufstream.archive.max.bytes.
start_delay_max
duration
How often to check a topic/partition to start a new archive.
complete_delay_max
duration
The maximum time before an archive upload is completed.
Dynamically configurable as bufstream.archive.complete.delay.max.ms.
idle_max
duration
The duration to wait for more data before completing an archive.
When 0, an archive completes as soon as there are no more records to archive. When >0, an archive completes after waiting this duration with no new records.
Dynamically configurable as bufstream.archive.idle.max.ms.
concurrent_max
int32
The maximum number of topic/partitions to archive at once. When unset (or 0), the default limit is used. When -1, no limit is enforced. When >0, only that many topic/partitions are archived at once per node.
fetch_sync
bool
Whether archive fetches should be synchronized to read at the same log append time.
Dynamically configurable as bufstream.archive.fetch.sync.
fetch_max_batches
int32
The maximum number of batches to fetch from an archive in a single request, per topic/partition.
Set to 0 to use the default value. Set to -1 to disable the limit.
follow_active
bool
Whether archive creation should try to read/write from the last active zone.
The last active zone is the zone that most recently read the topic/partition or, if no zone has read it, the zone that most recently wrote to it.
default_log_level
The default log level for background archive operations.
KafkaConfig
Configuration options specific to the agent's Kafka interface.
address
The address the Kafka server should listen on.
Defaults to a random available port on localhost.
public_address
The public address clients should use to connect to the Kafka server, if different from address.
tls
If populated, enables and enforces TLS termination on the Kafka server.
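A sketch of the Kafka listener with TLS termination (addresses and paths are illustrative):

```yaml
kafka:
  address:
    host: 0.0.0.0
    port: 9092
  public_address:
    host: kafka.example.com   # what clients should dial
    port: 9092
  tls:
    certificates:
      - chain:
          path: /etc/bufstream/tls/server.crt
        private_key:
          path: /etc/bufstream/tls/server.key
```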
fetch_eager
bool
If a fetch should return as soon as any records are available.
When false, a fetch waits for every topic/partition to be queried. When true, a fetch returns as soon as any topic/partition has records, and the rest are fetched in the background under the assumption that the client will request them in a subsequent fetch.
Dynamically configurable as bufstream.kafka.fetch.eager.
fetch_eager_offset_strategy
The strategy to use when no data is available for a topic partition.
fetch_sync
bool
If fetches from different readers should be synchronized to improve cache hit rates.
Dynamically configurable as bufstream.kafka.fetch.sync.
produce_concurrent
bool
If records from a producer to different topic/partitions may be sequenced concurrently instead of serially.
Dynamically configurable as bufstream.kafka.produce.concurrent.
zone_balance_strategy
How to balance clients across zones when the client does not specify a zone.
Dynamically configurable as bufstream.kafka.zone.balance.strategy.
partition_balance_strategy
How to balance topic/partitions across bufstream agents.
Dynamically configurable as bufstream.kafka.partition.balance.strategy.
request_buffer_size
uint32
The number of Kafka requests to unmarshal and buffer before processing.
Defaults to 5.
idle_timeout
duration
How long a Kafka connection can be idle before being closed by the server.
If set to a value less than or equal to zero, the timeout is disabled.
num_partitions
int32
The default number of partitions to use for a new topic.
Dynamically configurable as num.partitions.
exact_log_sizes
bool
If exact log sizes should be fetched when listing sizes for all topics/partitions.
exact_log_offsets
bool
If exact log high water mark and start offsets should be computed when fetching records.
distinct_hosts
bool
If the casing of hostnames should be randomized per 'broker'.
wait_for_latest
bool
If 'broker' should ensure a topic/partition is fully loaded before serving.
group_consumer_session_timeout
duration
The default group consumer session timeout.
Dynamically configurable as group.consumer.session.timeout.ms.
group_consumer_session_timeout_min
duration
The minimum group consumer session timeout.
Dynamically configurable as group.consumer.min.session.timeout.ms.
group_consumer_session_timeout_max
duration
The maximum group consumer session timeout.
Dynamically configurable as group.consumer.max.session.timeout.ms.
shutdown_grace_period
duration
The grace period to allow clients before shutting down.
DataEnforcementConfig
Configuration of data enforcement policies applied to records.
schema_registries
list<SchemaRegistry
>
The schema registries used for data enforcement.
produce
list<DataEnforcementPolicy
>
Policies to attempt to apply to produce requests. The first policy that matches the topic will be used. If none match, no data enforcement will occur.
fetch
list<DataEnforcementPolicy
>
Policies to attempt to apply to fetch responses. The first policy that matches the topic will be used. If none match, no data enforcement will occur.
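A sketch tying registries and policies together; the registry name, URL, and actions are illustrative:

```yaml
data_enforcement:
  schema_registries:
    - name: csr
      confluent:
        url: https://registry.example.com
  produce:
    - schema_registry: csr
      topics:
        all: true                      # match every topic
      values:
        on_parse_error: REJECT_BATCH
  fetch:
    - schema_registry: csr
      topics:
        all: true
      values:
        on_parse_error: PASS_THROUGH   # never REJECT_BATCH on fetch
```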
Address
A network host and optional port pair.
host
string
A hostname or IP address to connect to.
port
uint32
The associated port. If unspecified, refer to the field documentation for default behavior.
MetricsConfig
Configuration for metrics.
exporter_type
The type of exporter to use.
address
string
The endpoint for the OTLP exporter, with a host name and an optional port number. If this is not set, it falls back to observability.exporter.address; if that is not set, it falls back to OpenTelemetry's default behavior: the host and port of OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, then OTEL_EXPORTER_OTLP_ENDPOINT, and finally localhost:4318 for OTLP_HTTP or localhost:4317 for OTLP_GRPC.
For OTLP_HTTP, metrics.path is appended to this address.
path
string
The URL path used by the OTLP_HTTP exporter; defaults to "/v1/metrics". It is appended to the host and port of the endpoint the exporter connects to.
insecure
bool
If set to true, TLS is disabled for the OTLP exporter.
omit_partition_attribute
bool
Omits metrics that depend on the kafka.topic.partition attribute, which may have high cardinality depending on the configuration; kafka.topic.partition.offset.high_water_mark is one example. For metrics that carry this attribute without depending on it, such as kafka.produce.record.size, only the attribute is omitted.
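As a sketch, an OTLP_HTTP metrics exporter pointed at a collector (endpoint illustrative):

```yaml
observability:
  metrics:
    exporter_type: OTLP_HTTP
    address: otel-collector.internal:4318
    path: /v1/metrics          # the default
    insecure: true             # plain HTTP to the collector
    omit_partition_attribute: true
```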
TracesConfig
Configuration for traces.
exporter_type
The type of exporter to use.
address
string
The endpoint for the OTLP exporter, with a host name and an optional port number. If this is not set, it falls back to observability.exporter.address; if that is not set, it falls back to OpenTelemetry's default behavior: the host and port of OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, then OTEL_EXPORTER_OTLP_ENDPOINT, and finally localhost:4318 for OTLP_HTTP or localhost:4317 for OTLP_GRPC.
For OTLP_HTTP, traces.path is appended to this address.
path
string
The URL path used by the OTLP_HTTP exporter; defaults to "/v1/traces". It is appended to the host and port of the endpoint the exporter connects to.
insecure
bool
If set to true, TLS is disabled for the OTLP exporter.
trace_ratio
float64
OpenTelemetry trace sample ratio, defaults to 1.
ExporterDefaults
Default configuration for metrics and traces exporters.
address
string
The default base address used by the OTLP_HTTP and OTLP_GRPC exporters, with a host name and an optional port number. For OTLP_HTTP, "/v1/{metrics, traces}" is appended to this address, unless the path is overridden by metrics.path or traces.path. If the port is unspecified, it defaults to 4317 for OTLP_GRPC and 4318 for OTLP_HTTP.
insecure
bool
If set to true, TLS is disabled for the OTLP exporter. This can be overridden by metrics.insecure or traces.insecure.
DataSource
Configuration values sourced from various locations.
path
string
A file path to the data relative to the current working directory. Trailing newlines are stripped from the file contents.
env_var
string
An environment variable containing the data.
string
string
An inline string of the data.
bytes
base64-bytes
An inline byte blob of the data.
encoding
The encoding of the data source value. Defaults to PLAINTEXT.
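The same value can therefore be supplied in several ways; a sketch using a hypothetical secret field named password:

```yaml
# One of the following, not all at once:
password:
  path: /var/run/secrets/csr-password   # from a file (trailing newline stripped)
# password:
#   env_var: CSR_PASSWORD               # from the environment
# password:
#   string: hunter2                     # inline; avoid for real secrets
# password:
#   bytes: aHVudGVyMg==                 # inline base64-encoded bytes
```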
TLSListenerConfig
TLSListenerConfig is TLS/SSL configuration options for servers. At least one certificate must be specified.
certificates
list<Certificate>
Certificates to present to the client. The first certificate compatible with the client's requirements is selected automatically.
client_auth
Declare the policy the server will follow for mutual TLS (mTLS).
client_cas
list<DataSource>
The PEM-encoded certificate authorities used by the server to validate the client certificates. This field cannot be empty if client_auth performs verification.
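A sketch of a listener enforcing mutual TLS (paths illustrative):

```yaml
tls:
  certificates:
    - chain:
        path: /etc/bufstream/tls/server.crt
      private_key:
        path: /etc/bufstream/tls/server.key
  client_auth: REQUIRE_AND_VERIFY_CERT
  client_cas:
    - path: /etc/bufstream/tls/client-ca.crt   # required when verifying
```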
SchemaRegistry
A single schema registry used in data enforcement.
name
string
Name of this registry, used to disambiguate multiple registries used across policies.
confluent
Confluent Schema Registry
DataEnforcementPolicy
A set of policies to apply data enforcement rules on records flowing into or out of Kafka.
topics
Apply these policies only if the topic of the record(s) matches. If no topics are specified, the policy will always be applied.
schema_registry
string (required)
The schema registry to use for retrieving schemas for this policy.
keys
The policy to apply to a record's key. If unset, enforcement will not occur.
values
The policy to apply to a record's value. If unset, enforcement will not occur.
Certificate
A certificate chain and private key pair.
chain
DataSource (required)
The PEM-encoded leaf certificate, which may contain intermediate certificates following the leaf certificate to form a certificate chain.
private_key
DataSource (required)
The PEM-encoded (unencrypted) private key of the certificate chain.
CSRConfig
Configuration for the Confluent Schema Registry (CSR) API.
url
string
Root URL (including protocol and any required path prefix) of the CSR API.
instance_name
string
Name of the CSR instance within the BSR. This name is used to disambiguate subjects of the same name within the same schema file. Used exclusively for schema coercion.
tls
TLS configuration. If unset and the url field specifies https, a default configuration is used.
basic_auth
Authenticate against the CSR API using basic auth credentials.
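A sketch of a CSR entry with basic auth sourced from the environment (URL and variable names illustrative):

```yaml
confluent:
  url: https://registry.example.com
  instance_name: my-csr-instance
  basic_auth:
    username:
      env_var: CSR_USERNAME
    password:
      env_var: CSR_PASSWORD
```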
StringMatcher
Provides match rules to be applied to string values.
invert
bool
Inverts the matching behavior (effectively "not").
all
bool
Matches all values; useful as a catch-all.
equal
string
Matches case-sensitively.
in
Matches case-sensitively any of the values in the set.
Element
Rules applied to either the key or value of a record.
name_strategy
The strategy used to associate this element with the subject name when looking up the schema.
coerce
bool
If the element is not wrapped in the schema registry's expected format and a schema is associated with it, setting this field to true will attempt to resolve a schema for the element and wrap it correctly.
on_internal_error
The action to perform for internal errors (e.g., unavailability of the schema registry). If unset, the default behavior is REJECT_BATCH in produce and PASS_THROUGH in fetch.
on_no_schema
The action to perform for elements that do not have a schema associated with them. If skip_parse is true, this action will apply if the message is not in the appropriate schema wire format. If unset, the default behavior is PASS_THROUGH.
skip_parse
bool
If true, will skip verifying that the schema applies to the element's contents. If set with coerce, coerced messages will be identified as the latest version of the element's schema and may be erroneous. Setting this field is mutually exclusive with validation and redaction.
on_parse_error
The action to perform for elements that fail to parse with their associated schema. Fetch policies should not REJECT_BATCH to avoid blocking consumers.
validation
If set, parsed messages will have semantic validation applied to them based on their schema.
redaction
If set, parsed messages will have the specified fields redacted. For produce, this will result in data loss.
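A sketch of a values element combining these rules (actions illustrative):

```yaml
values:
  name_strategy: TOPIC_NAME_STRATEGY
  coerce: true                     # wrap bare payloads when a schema resolves
  on_parse_error: FILTER_RECORD    # drop unparseable records
  validation:
    on_error: FILTER_RECORD
  redaction:
    debug_redact: true             # strip proto fields marked debug_redact
```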
TLSDialerConfig
TLSDialerConfig is TLS/SSL configuration options for clients. The empty value of this message is a valid configuration for most applications.
certificate
Certificate to present if client certificate verification is enabled on the server (i.e., mTLS).
insecure_skip_verify
bool
Controls whether a client verifies the server's certificate chain and host name. If true, the dialer accepts any certificate presented by the server and any host name in that certificate. In this mode, TLS is susceptible to machine-in-the-middle attacks and should only be used for testing.
root_cas
list<DataSource>
The PEM-encoded root certificate authorities used by the client to validate the server certificates. If empty, the host's root CA set is used.
BasicAuth
Basic Authentication username/password pair.
username
DataSource
(required)
The source of the basic auth username.
password
DataSource
(required)
The source of the basic auth password.
StringSet
Effectively a wrapped repeated string to accommodate usage in a oneof or to differentiate between a null and an empty list.
values
list<string>
ValidationPolicy
The semantic validation rules applied to parsed elements during data enforcement.
on_error
The action to perform if the element fails semantic validation defined in the schema. Fetch policies should not REJECT_BATCH to avoid blocking consumers.
RedactPolicy
The redaction rules applied to parsed elements during data enforcement.
fields
Strip fields with matching names.
debug_redact
bool
Strip fields from the element annotated with the debug_redact field option (proto only).
shallow
bool
By default, fields will be redacted recursively in the message. If shallow is set to true, only the top level fields will be evaluated.
Enums
Actor
ACTOR_WRITER
ACTOR_READER
ACTOR_SEQUENCER
ACTOR_CASHIER
ACTOR_CLEANER
ConnectHttpVersion
HTTP version options used by ConnectRPC clients.
CONNECT_HTTP_VERSION_1_1
HTTP/1.1
CONNECT_HTTP_VERSION_2_0
HTTP/2
Level
DEBUG
INFO
WARN
ERROR
Format
TEXT
JSON
Exporter
NONE
STDOUT
HTTP
HTTPS
PROMETHEUS
SensitiveInformationRedaction
Redact sensitive information, such as topic names, before adding it to metrics, traces, and logs.
UNSPECIFIED
NONE
This shows sensitive information as is. For example, topic names will be included as attributes in metrics.
OPAQUE
This shows sensitive information as opaque strings. For example, topic IDs (UUIDs) will be included, instead of topic names.
OMITTED
This omits sensitive information. For example, attributes including topic names will not be added to metrics. If a metric only makes sense with this attribute, it is omitted as well; kafka.topic.partition.offset.high_water_mark is one such example.
Provider
The provider options for data storage.
S3
AWS S3 or S3-compatible service (e.g., LocalStack)
GCS
GCP GCS service
LOCAL_DISK
Local, on-disk storage
This option is for debugging purposes and should only be used by clusters that share the same filesystem.
INLINE
Use metadata storage (e.g., in_memory or etcd).
This option should only be used for testing purposes.
FetchEagerOffsetStrategy
FETCH_EAGER_OFFSET_STRATEGY_FAKE
Return fake offsets when no data is available.
FETCH_EAGER_OFFSET_STRATEGY_CACHE
Return cached offsets when no data is available.
FETCH_EAGER_OFFSET_STRATEGY_LATEST
Return latest offsets when no data is available.
BalanceStrategy
Balance strategies for distributing client connections and partition assignments within the cluster.
BALANCE_STRATEGY_PARTITION
Balance based on a hash of the partition ID
BALANCE_STRATEGY_HOST
Balance based on a hash of the host name
BALANCE_STRATEGY_CLIENT_ID
Balance based on a hash of the client ID
ExporterType
NONE
STDOUT
OTLP_HTTP
OTLP_GRPC
PROMETHEUS
ExporterType
NONE
STDOUT
OTLP_HTTP
OTLP_GRPC
Encoding
PLAINTEXT
Value is treated as-is.
BASE64
Value is treated as standard RFC4648 (not URL) base64-encoded with '=' padding.
Type
NO_CERT
No client certificate will be requested during the handshake. If any certificates are sent, they will not be verified.
REQUEST_CERT
Server will request a client certificate during the handshake, but does not require that the client send any certificates. Any certificates sent will not be verified.
REQUIRE_CERT
Server requires clients to send a certificate during the handshake, but the certificate will not be verified.
VERIFY_CERT_IF_GIVEN
Server will request a client certificate during the handshake, but does not require that the client send any certificates. If the client does send a certificate, it must be valid.
REQUIRE_AND_VERIFY_CERT
Server will request and require clients to send a certificate during the handshake. The certificate is required to be valid.
SubjectNameStrategy
The strategy used to create the identifier (subject) used to lookup the schema of a record. Typically the strategy is derived from the topic name and which element (key or value) of the record is being deserialized.
TOPIC_NAME_STRATEGY
The default Confluent Schema Registry strategy, of the form "<topic name>-<key|value>" (e.g., "my-topic-value").
EnforcementAction
The action to perform when an error occurs.
PASS_THROUGH
Log and emit metrics on failure, but allow the record and its batch to pass through regardless. Useful for testing a new policy before rolling out to production.
REJECT_BATCH
Rejects the record batch containing the error, returning an error to the caller. This action should not be used with fetch responses, as rejecting batches on the fetch side will result in blocked consumers.
FILTER_RECORD
Filters out the record from the batch, while preserving the rest of the data. Note that this will result in data loss if used on the producer side. On the consumer side, invalid records will be skipped.