# Kafka

The Kafka output can be used to send records to [Apache Kafka](https://kafka.apache.org), [Confluent Platform](https://www.confluent.io/product/confluent-platform/), [Confluent Cloud](https://www.confluent.io/confluent-cloud/), [Redpanda](https://redpanda.com) and [Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://aws.amazon.com/msk/).

## EF\_OUTPUT\_KAFKA\_ENABLE

Specifies whether the Kafka output is enabled.

* Valid Values
  * `true`, `false`
* Default
  * `false`

## EF\_OUTPUT\_KAFKA\_POOL\_SIZE

Specifies the number of Kafka output workers to start. An output worker is an instance of a Kafka producer.

* Default
  * Based on the number of licensed flows or devices, depending on the collector.

## EF\_OUTPUT\_KAFKA\_BROKERS

A comma-separated list of the Kafka brokers to which the collector is to connect. Each entry consists of a hostname or IP address and a port number, in the form `host:port`.

* Example
  * `192.0.2.11:9092,192.0.2.12:9092,broker1.example.com:9092`
* Default
  * `127.0.0.1:9092`
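
As an illustration of the expected format, the following minimal sketch splits such a value into host and port pairs. The `parse_brokers` helper is hypothetical and is not part of the collector; it also does not handle bracketed IPv6 literals.

```python
def parse_brokers(value: str) -> list[tuple[str, int]]:
    """Split a comma-separated broker list into (host, port) pairs."""
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host:
            raise ValueError(f"broker {entry!r} is missing a host or port")
        brokers.append((host, int(port)))
    return brokers


brokers = parse_brokers("192.0.2.11:9092,192.0.2.12:9092,broker1.example.com:9092")
```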

## EF\_OUTPUT\_KAFKA\_VERSION

The version of Kafka to which the collector will assume it is connecting.

{% hint style="danger" %}
Kafka provides backwards compatibility, so specifying an older version is OK. However, specifying a version newer than the version the brokers are running will likely cause issues.
{% endhint %}

* Default
  * `1.0.0`

## EF\_OUTPUT\_KAFKA\_CLIENT\_ID

A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.

* Default
  * `elastiflow`

## EF\_OUTPUT\_KAFKA\_RACK\_ID

A rack identifier. This can be any string value that indicates where the collector is physically located.

* Default
  * `''`

## EF\_OUTPUT\_KAFKA\_TIMEOUT

The amount of time, in seconds, that the client will wait for the connection to be established.

* Default
  * `30`

## EF\_OUTPUT\_KAFKA\_TOPIC

The Kafka topic to which messages will be produced.

* Default
  * `elastiflow-flow-codex`

## EF\_OUTPUT\_KAFKA\_PARTITION\_KEY

Defines a field from the message which will be used to set the partitioning key.

* Default
  * `flow.export.ip.addr`
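
A partitioning key is typically hashed to choose a partition, so records that share a key value land on the same partition. The sketch below illustrates the idea only. The hash function actually used by the collector is not stated here, so CRC32 is a stand-in, and `pick_partition` is a hypothetical helper.

```python
import zlib


def pick_partition(record: dict, key_field: str, num_partitions: int) -> int:
    """Map the value of key_field to a partition index (CRC32 is a stand-in hash)."""
    key = str(record.get(key_field, ""))
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Two records from the same exporter share a key, so they share a partition.
first = {"flow.export.ip.addr": "192.0.2.1", "flow.seq_num": 1}
second = {"flow.export.ip.addr": "192.0.2.1", "flow.seq_num": 2}
same = pick_partition(first, "flow.export.ip.addr", 6) == pick_partition(
    second, "flow.export.ip.addr", 6
)
```

With the default key, all records from one exporting device would be kept together, which preserves their relative order within that partition.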

## EF\_OUTPUT\_KAFKA\_DROP\_FIELDS

A comma-separated list of fields to remove from all records.

{% hint style="info" %}
Fields are dropped after any output-specific fields have been added and after any schema conversion. This means that you should use the field names as you see them in the user interface.
{% endhint %}

* Valid Values
  * any field names related to the enabled schema, comma-separated
* Example
  * `flow.export.sysuptime,flow.export.version.ver,flow.start.sysuptime,flow.end.sysuptime,flow.seq_num`
* Default
  * `''`
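
The effect of this setting can be pictured with a short sketch, assuming a record with flattened field names. The `drop_fields` helper is hypothetical and only mirrors the behavior described above.

```python
def drop_fields(record: dict, drop_setting: str) -> dict:
    """Return a copy of record without the fields named in the comma-separated setting."""
    to_drop = {name.strip() for name in drop_setting.split(",") if name.strip()}
    return {name: value for name, value in record.items() if name not in to_drop}


record = {
    "flow.export.ip.addr": "192.0.2.1",
    "flow.export.sysuptime": 123456,
    "flow.seq_num": 42,
}
trimmed = drop_fields(record, "flow.export.sysuptime,flow.seq_num")
```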

## EF\_OUTPUT\_KAFKA\_ALLOWED\_RECORD\_TYPES

A comma-separated list of the record types that the output will emit. This is particularly useful when used with multiple namespaced outputs, e.g. sending flow records to one datastore and telemetry to another.

* Valid Values
  * `as_path_hop`, `flow_option`, `flow`, `ifa_hop`, `telemetry`, `metric`, `log`
* Default
  * `'as_path_hop,flow_option,flow,ifa_hop,telemetry,metric'`
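
The filtering can be sketched as a simple membership test. `is_allowed` is a hypothetical helper used only to illustrate how the comma-separated value is interpreted.

```python
DEFAULT_ALLOWED = "as_path_hop,flow_option,flow,ifa_hop,telemetry,metric"


def is_allowed(record_type: str, allowed_setting: str = DEFAULT_ALLOWED) -> bool:
    """Return True if record_type appears in the comma-separated setting."""
    allowed = {name.strip() for name in allowed_setting.split(",") if name.strip()}
    return record_type in allowed


# An output restricted to telemetry ignores flow records.
telemetry_only = "telemetry"
results = (is_allowed("telemetry", telemetry_only), is_allowed("flow", telemetry_only))
```

Note that the default value listed above does not include `log`, so under this sketch log records would only be emitted if `log` were added explicitly.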

## EF\_OUTPUT\_KAFKA\_SASL\_ENABLE

Specifies whether SASL-based authentication is used when connecting to the Kafka brokers. While there are multiple SASL authentication methods, the current implementation is limited to plaintext (SASL/PLAIN) authentication.

* Default
  * `false`

## EF\_OUTPUT\_KAFKA\_SASL\_USERNAME

The authentication identity (authcid) to present for SASL/PLAIN authentication.

* Default
  * `''`

## EF\_OUTPUT\_KAFKA\_SASL\_PASSWORD

The password to use for SASL/PLAIN authentication.

* Default
  * `''`

## EF\_OUTPUT\_KAFKA\_TLS\_ENABLE

This setting is used to enable/disable TLS connections to Kafka.

* Valid Values
  * `true`, `false`
* Default
  * `false`

## EF\_OUTPUT\_KAFKA\_TLS\_CA\_CERT\_FILEPATH

The path to the Certificate Authority (CA) certificate to use for connecting to the Kafka brokers.

* Default
  * `''`

## EF\_OUTPUT\_KAFKA\_TLS\_CERT\_FILEPATH

The path to the TLS certificate to use for connecting to the Kafka brokers.

* Default
  * `''`

## EF\_OUTPUT\_KAFKA\_TLS\_SKIP\_VERIFICATION

Specifies whether TLS verification of the Kafka brokers to which the output is attempting to connect is skipped. When set to `true`, the brokers' certificates are not verified.

* Valid Values
  * `true`, `false`
* Default
  * `false`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_MAX\_MESSAGE\_BYTES

The maximum permitted size of a message, in bytes. Should be set equal to or smaller than the broker's `message.max.bytes`.

* Valid Values
  * `0` (unlimited)
  * `>=1`
* Default
  * `1048576`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_REQUIRED\_ACKS

The level of acknowledgement reliability needed from the broker (defaults to 1 - WaitForLocal).

* Valid Values
  * `0` - (NoResponse) Doesn't require any acknowledgement other than the TCP ACK that the message payload was received.
  * `1` - (WaitForLocal) Waits for only the receiving broker to acknowledge commitment of the message.
  * `-1` - (WaitForAll) Waits for the topic's minimum in-sync replicas to acknowledge commitment of the message. The minimum number of in-sync replicas is configured for a Kafka topic via the `min.insync.replicas` attribute.
* Default
  * `1`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_TIMEOUT

The maximum duration, in seconds, that the producer will wait for the RequiredAcks defined in `EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS`. This is only relevant when `EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS` is set to `-1` (WaitForAll).

* Default
  * `10`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_COMPRESSION

The type of compression to use on messages.

{% hint style="info" %}
Beginning with ElastiFlow `6.3.0` the Kafka output's default value for this setting was changed to `3` (LZ4). Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of `0` (none), you should ensure that it is specifically set in your configuration.
{% endhint %}

* Valid Values
  * `0` - none
  * `1` - Gzip
  * `2` - snappy
  * `3` - LZ4
  * `4` - ZSTD
* Default
  * `3`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_COMPRESSION\_LEVEL

The level of compression to use on messages. The meaning depends on the actual compression type used and defaults to the compression codec's default level.

* Default
  * `-1000` (use the codec's default level)

## EF\_OUTPUT\_KAFKA\_PRODUCER\_FLUSH\_BYTES

The best-effort number of bytes needed to trigger a flush. This setting is per broker. If no partition key is set, i.e. messages are distributed to partitions round-robin, each broker will receive approximately the same number of bytes. If a broker hosts two partitions of the topic, its messages will be split between the two partitions. If a broker hosts only one partition of the topic, that partition will receive all of the messages produced to the broker, which is approximately twice the volume received by each partition on a broker with two partitions.

{% hint style="danger" %}
`EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES` should not be set to a value larger than the maximum message size accepted for the Kafka topic to which records are to be written, i.e. the broker's `message.max.bytes` setting or the topic-level `max.message.bytes` override.
{% endhint %}

* Valid Values
  * `0` (unlimited)
  * `>=1`
* Default
  * `1048576`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_FLUSH\_MESSAGES

The best-effort number of messages needed to trigger a flush. This setting is per broker. If no partition key is set, i.e. messages are distributed to partitions round-robin, each broker will receive approximately the same number of messages. If a broker hosts two partitions of the topic, its messages will be split between the two partitions. If a broker hosts only one partition of the topic, that partition will receive all of the messages produced to the broker, which is approximately twice as many as each partition on a broker with two partitions receives.

* Valid Values
  * `0` (unlimited)
  * `>=1`
* Default
  * `1024`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_FLUSH\_FREQUENCY

The best-effort frequency of flushes, in milliseconds. This setting is per partition. If multiple partitions for the same topic are on the same broker, messages will be flushed to each partition at this interval. If no partition key is set, i.e. messages are distributed to partitions round-robin, a broker with two partitions will receive approximately twice as many messages as a broker with only one partition.

{% hint style="danger" %}
Beginning with ElastiFlow `7.17.0` the Kafka output's default value for this setting was changed to `2000`. Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of `1000`, you should ensure that it is specifically set in your configuration.
{% endhint %}

* Valid Values
  * `0` (unlimited)
  * `>=1`
* Default
  * `2000`
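
The three flush triggers described above (bytes, messages and frequency) can be thought of as independent thresholds, any one of which causes the pending batch to be sent. The sketch below models that idea only. The `FlushPolicy` and `Batch` classes are hypothetical, and treating `0` as "this trigger is disabled" is an assumption about how the `0` (unlimited) value behaves.

```python
from dataclasses import dataclass, field


@dataclass
class FlushPolicy:
    flush_bytes: int = 1048576       # EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES
    flush_messages: int = 1024       # EF_OUTPUT_KAFKA_PRODUCER_FLUSH_MESSAGES
    flush_frequency_ms: int = 2000   # EF_OUTPUT_KAFKA_PRODUCER_FLUSH_FREQUENCY


@dataclass
class Batch:
    policy: FlushPolicy
    opened_at_ms: int = 0
    messages: list = field(default_factory=list)
    size_bytes: int = 0

    def add(self, payload: bytes, now_ms: int) -> None:
        if not self.messages:
            self.opened_at_ms = now_ms  # the timer starts with the first message
        self.messages.append(payload)
        self.size_bytes += len(payload)

    def should_flush(self, now_ms: int) -> bool:
        if not self.messages:
            return False
        p = self.policy
        # Assumption: a value of 0 disables the corresponding trigger.
        if p.flush_bytes > 0 and self.size_bytes >= p.flush_bytes:
            return True
        if p.flush_messages > 0 and len(self.messages) >= p.flush_messages:
            return True
        if p.flush_frequency_ms > 0 and now_ms - self.opened_at_ms >= p.flush_frequency_ms:
            return True
        return False
```

Under this model, a lightly loaded output is flushed by the frequency timer, while a busy one reaches the byte or message threshold first. Raising the thresholds generally trades latency for larger, more efficient requests.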

## EF\_OUTPUT\_KAFKA\_PRODUCER\_FLUSH\_MAX\_MESSAGES

The maximum number of messages the producer will send in a single broker request.

* Valid Values
  * `0` (unlimited)
  * `>=1`
* Default
  * `0`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_RETRY\_MAX

The total number of times to retry sending a message.

* Default
  * `3`

## EF\_OUTPUT\_KAFKA\_PRODUCER\_RETRY\_BACKOFF

The period of time, in milliseconds, to wait for the Kafka cluster to settle between retries.

* Default
  * `100`
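
Together, the retry count and the backoff bound how long a failed send is retried. The following sketch shows one way such a loop could behave. It assumes that `EF_OUTPUT_KAFKA_PRODUCER_RETRY_MAX` counts retries after the initial attempt, and `send_with_retry` is a hypothetical helper rather than the collector's implementation.

```python
import time


def send_with_retry(send, message, retry_max=3, backoff_ms=100, sleep=time.sleep):
    """Call send(message), retrying up to retry_max times with a fixed backoff."""
    attempts = 0
    while True:
        try:
            return send(message)
        except ConnectionError:
            if attempts >= retry_max:
                raise  # retries exhausted, surface the last error
            attempts += 1
            sleep(backoff_ms / 1000)  # give the cluster time to settle
```

With the defaults, this model waits at most 3 x 100 ms = 300 ms in backoff, in addition to the time taken by the requests themselves.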

## EF\_OUTPUT\_KAFKA\_FLAT\_RECORD\_ENABLE

The Kafka output can send JSON records using nested or flattened field names. Historically only nested records were supported, which is why the default value for this setting was originally `false`. However, a record consisting of flattened field names reduces resource requirements and improves throughput.

{% hint style="info" %}
Beginning with ElastiFlow `6.3.0` the Kafka output's default value for this setting was changed to `true`. Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of `false`, you should ensure that it is specifically set in your configuration.
{% endhint %}

* Valid Values
  * `true`, `false`
* Default
  * `true`

## EF\_OUTPUT\_KAFKA\_ECS\_ENABLE

Specifies whether the data will be sent using Elastic Common Schema (ECS).

* Valid Values
  * `true`, `false`
* Default
  * `false`

## EF\_OUTPUT\_KAFKA\_TIMESTAMP\_SOURCE

Determines the timestamp source to be used to set the `@timestamp` field. Usually `end` would be the best setting. However, in the case of poorly behaving or misconfigured devices, `collect` may be the better option.

{% hint style="info" %}
Beginning with ElastiFlow `6.3.0` the Kafka output's default value for this setting was changed to `collect`. This will allow the collector to handle a wider variety of situations without additional configuration. If you wish to continue to use the old default setting of `end`, you should ensure that it is specifically set in your configuration.
{% endhint %}

* Valid Values
  * `start` - Use the timestamp from `flow.start.timestamp`. The flow start time indicated in the flow.
  * `end` - Use the timestamp from `flow.end.timestamp`. The flow end time (or last reported time).
  * `export` - Use the timestamp from `flow.export.timestamp`. The time from the flow record header.
  * `collect` - Use the timestamp from `flow.collect.timestamp`. The time that the collector processed the flow record.
* Default
  * `collect`
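
The mapping from this setting to the `@timestamp` field can be sketched as a simple lookup. The `set_timestamp` helper and the sample values are hypothetical. The field names are those listed above.

```python
TIMESTAMP_FIELDS = {
    "start": "flow.start.timestamp",
    "end": "flow.end.timestamp",
    "export": "flow.export.timestamp",
    "collect": "flow.collect.timestamp",
}


def set_timestamp(record: dict, source: str = "collect") -> dict:
    """Return a copy of record with @timestamp taken from the chosen source field."""
    source_field = TIMESTAMP_FIELDS[source]
    out = dict(record)
    out["@timestamp"] = record[source_field]
    return out


record = {
    "flow.end.timestamp": "2024-01-01T00:00:05Z",
    "flow.collect.timestamp": "2024-01-01T00:00:07Z",
}
stamped = set_timestamp(record, "collect")
```

A device with an incorrect clock would report a misleading `flow.end.timestamp`, which is the kind of situation where `collect` is the safer choice.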

