Kafka
Overview
The Kafka output can be used to send records to Apache Kafka, Confluent Platform, Confluent Cloud, Redpanda and Amazon Managed Streaming for Apache Kafka (Amazon MSK).
EF_OUTPUT_KAFKA_ENABLE
Specifies whether the Kafka output is enabled.
- Valid Values
true, false
- Default
false
EF_OUTPUT_KAFKA_BROKERS
A comma-separated list of brokers to which the collector will connect. Each broker is specified as an IP address or hostname followed by a port number.
- Example
192.0.2.11:9092,192.0.2.12:9092,broker1.example.com:9092
- Default
127.0.0.1:9092
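As a rough illustration of the expected format, the following Python sketch splits a broker list into host and port pairs. The function name `parse_brokers` is hypothetical and not part of ElastiFlow; the collector's own parsing may differ, and bracketed IPv6 addresses are not handled here.

```python
def parse_brokers(value: str) -> list[tuple[str, int]]:
    """Split a comma-separated 'host:port' list into (host, port) pairs."""
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


print(parse_brokers("192.0.2.11:9092,192.0.2.12:9092,broker1.example.com:9092"))
```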
EF_OUTPUT_KAFKA_VERSION
The version of Kafka to which the collector will assume it is connecting.
Kafka provides backward compatibility, so specifying an older version is acceptable. However, specifying a version newer than that of the brokers will likely cause issues.
- Default
1.0.0
EF_OUTPUT_KAFKA_CLIENT_ID
A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.
- Default
elastiflow-flowcoll
EF_OUTPUT_KAFKA_RACK_ID
A rack identifier. This can be any string value which indicates where the collector is physically located.
- Default
''
EF_OUTPUT_KAFKA_TIMEOUT
The amount of time, in seconds, that the client will wait for the connection to be established.
- Default
30
EF_OUTPUT_KAFKA_TOPIC
The Kafka topic to which messages will be produced.
- Default
elastiflow-flow-codex
EF_OUTPUT_KAFKA_PARTITION_KEY
Defines a field from the message which will be used to set the partitioning key.
- Default
flow.export.ip.addr
EF_OUTPUT_KAFKA_DROP_FIELDS
A comma-separated list of fields to remove from all records.
Fields are dropped after any output-specific fields have been added and after any schema conversion. This means that you should use the field names as you see them in the user interface.
- Valid Values
- any field names related to the enabled schema, comma-separated
- Example
flow.export.sysuptime,flow.export.version.ver,flow.start.sysuptime,flow.end.sysuptime,flow.seq_num
- Default
''
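The effect of this setting can be pictured with a minimal Python sketch. It assumes a flattened record held in a plain dictionary; the function name `drop_fields` and the sample values are hypothetical and only illustrate the idea, not the collector's implementation.

```python
def drop_fields(record: dict, drop_list: str) -> dict:
    """Return a copy of a flattened record without the listed field names."""
    names = {name.strip() for name in drop_list.split(",") if name.strip()}
    return {key: value for key, value in record.items() if key not in names}


# Hypothetical sample record; the values are made up for illustration.
record = {
    "flow.export.ip.addr": "192.0.2.1",
    "flow.export.sysuptime": 123456,
    "flow.seq_num": 42,
}
print(drop_fields(record, "flow.export.sysuptime,flow.seq_num"))
```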
EF_OUTPUT_KAFKA_ALLOWED_RECORD_TYPES
A comma-separated list of record types that the output will emit. This is particularly useful when used with multiple namespaced outputs, e.g. sending flow records to one datastore and telemetry to another.
- Valid Values
as_path_hop, flow_option, flow, ifa_hop, telemetry, metric
- Default
'as_path_hop,flow_option,flow,ifa_hop,telemetry,metric'
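Before deploying a customized value, it may help to check it against the valid record types listed above. The following Python sketch does that; `parse_record_types` is a hypothetical helper, not an ElastiFlow function.

```python
VALID_RECORD_TYPES = (
    "as_path_hop", "flow_option", "flow", "ifa_hop", "telemetry", "metric",
)


def parse_record_types(value: str) -> set[str]:
    """Parse a comma-separated record type list, rejecting unknown names."""
    requested = {item.strip() for item in value.split(",") if item.strip()}
    unknown = requested - set(VALID_RECORD_TYPES)
    if unknown:
        raise ValueError(f"unknown record types: {sorted(unknown)}")
    return requested


print(sorted(parse_record_types("flow,telemetry")))
```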
EF_OUTPUT_KAFKA_SASL_ENABLE
Specifies whether SASL-based authentication is used when connecting to the Kafka brokers. While there are multiple SASL authentication methods, the current implementation is limited to plaintext (SASL/PLAIN) authentication.
- Default
false
EF_OUTPUT_KAFKA_SASL_USERNAME
The authentication identity (authcid) to present for SASL/PLAIN authentication.
- Default
''
EF_OUTPUT_KAFKA_SASL_PASSWORD
The password to use for SASL/PLAIN authentication.
- Default
''
EF_OUTPUT_KAFKA_TLS_ENABLE
This setting is used to enable/disable TLS connections to Kafka.
- Valid Values
true, false
- Default
false
EF_OUTPUT_KAFKA_TLS_CA_CERT_FILEPATH
The path to the Certificate Authority (CA) certificate to use for connecting to the Kafka brokers.
- Default
''
EF_OUTPUT_KAFKA_TLS_CERT_FILEPATH
The path to the TLS certificate to use for connecting to the Kafka brokers.
- Default
''
EF_OUTPUT_KAFKA_TLS_KEY_FILEPATH
The path to the TLS key to use for connecting to the Kafka brokers.
- Default
''
EF_OUTPUT_KAFKA_TLS_SKIP_VERIFICATION
This setting is used to enable/disable TLS verification of the Kafka brokers to which the output is attempting to connect.
- Valid Values
true, false
- Default
false
EF_OUTPUT_KAFKA_PRODUCER_MAX_MESSAGE_BYTES
The maximum permitted size of a message (defaults to 1000000). Should be set equal to or smaller than the broker's message.max.bytes.
- Default
1000000
EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS
The level of acknowledgement reliability needed from the broker (defaults to 1 - WaitForLocal).
- Valid Values
0 - (NoResponse) Doesn't require any acknowledgement other than the TCP ACK that the message payload was received.
1 - (WaitForLocal) Waits for only the receiving broker to acknowledge commitment of the message.
-1 - (WaitForAll) Waits for the topic's minimum in-sync replicas to acknowledge commitment of the message. The minimum number of in-sync replicas is configured for the Kafka topic via the min.insync.replicas attribute.
- Default
1
EF_OUTPUT_KAFKA_PRODUCER_TIMEOUT
The maximum duration, in seconds, that the producer will wait for the RequiredAcks defined in EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS. This is only relevant when EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS is set to -1 (WaitForAll).
- Default
10
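The relationship between the acknowledgement level and the producer timeout can be summarized in a short Python sketch. The enum and the helper `producer_timeout_applies` are hypothetical names used only for illustration; the numeric values and their meanings are those documented for EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS.

```python
from enum import IntEnum


class RequiredAcks(IntEnum):
    NO_RESPONSE = 0      # only the TCP ACK is required
    WAIT_FOR_LOCAL = 1   # the receiving broker acknowledges
    WAIT_FOR_ALL = -1    # the topic's minimum in-sync replicas acknowledge


def producer_timeout_applies(acks: int) -> bool:
    """EF_OUTPUT_KAFKA_PRODUCER_TIMEOUT only matters for WaitForAll (-1)."""
    # RequiredAcks(acks) raises ValueError for values other than 0, 1, -1.
    return RequiredAcks(acks) is RequiredAcks.WAIT_FOR_ALL


print(producer_timeout_applies(-1), producer_timeout_applies(1))
```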
EF_OUTPUT_KAFKA_PRODUCER_COMPRESSION
The type of compression to use on messages.
Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed from 0 (none) to 3 (LZ4). Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of 0 (none), you should ensure that it is specifically set in your configuration.
- Valid Values
0 - none
1 - Gzip
2 - snappy
3 - LZ4
4 - ZSTD
- Default
3
EF_OUTPUT_KAFKA_PRODUCER_COMPRESSION_LEVEL
The level of compression to use on messages. The meaning of the value depends on the compression type used. The default value selects the compression codec's default level.
- Default
-1000
EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES
The best-effort number of bytes needed to trigger a flush.
EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES should not be set to a value larger than the message.max.bytes setting of the Kafka topic to which records are to be written.
- Default
1000000
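The size constraints described for EF_OUTPUT_KAFKA_PRODUCER_MAX_MESSAGE_BYTES and EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES can be checked ahead of time. The Python sketch below is one possible sanity check; `check_size_limits` is a hypothetical helper, and the `message_max_bytes` argument is a value you would look up yourself from the Kafka side.

```python
def check_size_limits(env: dict, message_max_bytes: int) -> list[str]:
    """Return a list of settings that exceed the Kafka message.max.bytes limit."""
    # Fall back to the documented defaults when a setting is not present.
    max_message = int(env.get("EF_OUTPUT_KAFKA_PRODUCER_MAX_MESSAGE_BYTES", "1000000"))
    flush_bytes = int(env.get("EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES", "1000000"))
    problems = []
    if max_message > message_max_bytes:
        problems.append("EF_OUTPUT_KAFKA_PRODUCER_MAX_MESSAGE_BYTES exceeds message.max.bytes")
    if flush_bytes > message_max_bytes:
        problems.append("EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES exceeds message.max.bytes")
    return problems


print(check_size_limits({"EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES": "2000000"}, 1000000))
```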
EF_OUTPUT_KAFKA_PRODUCER_FLUSH_MESSAGES
The best-effort number of messages needed to trigger a flush.
- Default
1024
EF_OUTPUT_KAFKA_PRODUCER_FLUSH_FREQUENCY
The best-effort frequency of flushes, in milliseconds.
Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed to 1000. Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of 500, you should ensure that it is specifically set in your configuration.
- Default
1000
EF_OUTPUT_KAFKA_PRODUCER_FLUSH_MAX_MESSAGES
The maximum number of messages the producer will send in a single broker request.
- Default
0 (unlimited)
EF_OUTPUT_KAFKA_PRODUCER_RETRY_MAX
The total number of times to retry sending a message.
- Default
3
EF_OUTPUT_KAFKA_PRODUCER_RETRY_BACKOFF
The period of time, in milliseconds, to wait for the Kafka cluster to settle between retries.
- Default
100
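The two retry settings together bound how long the producer spends waiting between attempts before giving up on a message. The following Python sketch works through that arithmetic, assuming a fixed wait before each retry and ignoring the time taken by the requests themselves; `max_backoff_wait_ms` is a hypothetical helper.

```python
def max_backoff_wait_ms(retry_max: int = 3, backoff_ms: int = 100) -> int:
    """Total time spent waiting between retries if every attempt fails."""
    return retry_max * backoff_ms


# With the documented defaults: 3 retries x 100 ms = 300 ms of backoff.
print(max_backoff_wait_ms())
```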
EF_OUTPUT_KAFKA_FLAT_RECORD_ENABLE
The Kafka output can send JSON records using nested or flattened field names. Historically only nested records were supported, which is why the default value for this setting was previously false. However, a record consisting of flattened field names reduces resource requirements and improves throughput.
Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed to true. Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of false, you should ensure that it is specifically set in your configuration.
- Valid Values
true, false
- Default
true
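The difference between the two record shapes can be illustrated with a small Python sketch that converts a nested record into flattened, dot-separated field names. This is only an assumption about how the two forms relate, based on field names such as flow.export.ip.addr used elsewhere on this page; the `flatten` function and the sample value are hypothetical.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Convert nested dictionaries into a single level of dotted field names."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))  # descend into nested objects
        else:
            flat[name] = value
    return flat


nested = {"flow": {"export": {"ip": {"addr": "192.0.2.1"}}}}
print(flatten(nested))  # {'flow.export.ip.addr': '192.0.2.1'}
```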
EF_OUTPUT_KAFKA_ECS_ENABLE
Specifies whether the data will be sent using Elastic Common Schema (ECS).
- Valid Values
true, false
- Default
false
EF_OUTPUT_KAFKA_TIMESTAMP_SOURCE
Determines the timestamp source to be used to set the @timestamp field. Usually end would be the best setting. However, in the case of poorly behaving or misconfigured devices, collect may be the better option.
Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed to collect. This will allow the collector to handle a wider variety of situations without additional configuration. If you wish to continue to use the old default setting of end, you should ensure that it is specifically set in your configuration.
- Valid Values
start - Use the timestamp from flow.start.timestamp. The flow start time indicated in the flow.
end - Use the timestamp from flow.end.timestamp. The flow end time (or last reported time).
export - Use the timestamp from flow.export.timestamp. The time from the flow record header.
collect - Use the timestamp from flow.collect.timestamp. The time that the collector processed the flow record.
- Default
collect
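The mapping from each valid value to its source field can be sketched in Python as follows. This assumes a flattened record held in a dictionary; `pick_timestamp` and the sample timestamps are hypothetical and serve only to illustrate the selection.

```python
TIMESTAMP_FIELDS = {
    "start": "flow.start.timestamp",
    "end": "flow.end.timestamp",
    "export": "flow.export.timestamp",
    "collect": "flow.collect.timestamp",
}


def pick_timestamp(record: dict, source: str = "collect"):
    """Return the value that would populate @timestamp for the given source."""
    if source not in TIMESTAMP_FIELDS:
        raise ValueError(f"invalid timestamp source: {source!r}")
    return record[TIMESTAMP_FIELDS[source]]


# Hypothetical sample record; the timestamps are made up for illustration.
record = {
    "flow.end.timestamp": "2024-01-01T00:00:05Z",
    "flow.collect.timestamp": "2024-01-01T00:00:07Z",
}
print(pick_timestamp(record), pick_timestamp(record, "end"))
```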