Skip to main content
Version: 6.4

Kafka

Overview

The Kafka output can be used to send records to Apache Kafka, Confluent Platform, Confluent Cloud, Redpanda and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

EF_OUTPUT_KAFKA_ENABLE

Specifies whether the Kafka output is enabled.

  • Valid Values
    • true, false
  • Default
    • false

EF_OUTPUT_KAFKA_BROKERS

A comma-separated list of brokers, IP address and port number, to which the collector is to connect.

  • Example
    • 192.0.2.11:9092,192.0.2.12:9092,192.0.2.13:9092
  • Default
    • 127.0.0.1:9092

EF_OUTPUT_KAFKA_VERSION

The version of Kafka to which the collector will assume it is connecting.

danger

Kafka provides backwards-compatibility, so specifying an older version is OK. However specifying a version greater than the brokers will likely cause issues issues.

  • Default
    • 1.0.0

EF_OUTPUT_KAFKA_CLIENT_ID

A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.

  • Default
    • elastiflow-flowcoll

EF_OUTPUT_KAFKA_RACK_ID

A rack identifier. This can be any string value which indicates where the collector is physically located.

  • Default
    • ''

EF_OUTPUT_KAFKA_TIMEOUT

The amount of time, in seconds, that the client will wait for the connection to be established.

  • Default
    • 30

EF_OUTPUT_KAFKA_TOPIC

The Kafka topic to which messages will be produced.

  • Default
    • elastiflow-flow-codex

EF_OUTPUT_KAFKA_PARTITION_KEY

Defines a field from the message which will be used to set the partitioning key.

  • Default
    • flow.export.ip.addr

EF_OUTPUT_KAFKA_DROP_FIELDS

This setting allows for a comma-separated list of fields that are to be removed from all records.

note

Fields are dropped after any output specific fields have been added and after any schema conversion. This means that you should use the field names as you see them in the user interface.

  • Valid Values
    • any field names related to the enabled schema, comma-separated
  • Example
    • flow.export.sysuptime,flow.export.version.ver,flow.start.sysuptime,flow.end.sysuptime,flow.seq_num
  • Default
    • ''

EF_OUTPUT_KAFKA_ALLOWED_RECORD_TYPES

This setting allows for a comma-separated list of record types that the output will send will emit. This is particularly useful when used with multiple namespaced outputs, e.g. sending flow records to one datastore and telemetry to another.

  • Valid Values
    • as_path_hop, flow_option, flow , telemetry
  • Default
    • 'as_path_hop,flow_option,flow,telemetry'

EF_OUTPUT_KAFKA_SASL_ENABLE

Specifies whether SASL based authentication is used when connecting to the Kafka brokers. While there are multiple SASL authentication methods the current implementation is limited to plaintext (SASL/PLAIN) authentication.

  • Default
    • false

EF_OUTPUT_KAFKA_SASL_USERNAME

The authentication identity (authcid) to present for SASL/PLAIN authentication.

  • Default
    • ''

EF_OUTPUT_KAFKA_SASL_PASSWORD

The password to use for SASL/PLAIN authentication.

  • Default
    • ''

EF_OUTPUT_KAFKA_TLS_ENABLE

This setting is used to enable/disable TLS connections to Kafka.

  • Valid Values
    • true, false
  • Default
    • false

EF_OUTPUT_KAFKA_TLS_CA_CERT_FILEPATH

The path to the Certificate Authority (CA) certificate to use for connecting to the Kafka brokers.

  • Default
    • ''

EF_OUTPUT_KAFKA_TLS_CERT_FILEPATH

The path to the TLS certificate to use for connecting to the Kafka brokers.

  • Default
    • ''

EF_OUTPUT_KAFKA_TLS_KEY_FILEPATH

The path to the TLS key to use for connecting to the Kafka brokers.

  • Default
    • ''

EF_OUTPUT_KAFKA_TLS_SKIP_VERIFICATION

This setting is used to enable/disable TLS verification of the Kafka brokers to which the output is attempting to connect.

  • Valid Values
    • true, false
  • Default
    • false

EF_OUTPUT_KAFKA_PRODUCER_MAX_MESSAGE_BYTES

The maximum permitted size of a message (defaults to 1000000). Should be set equal to or smaller than the broker's message.max.bytes.

  • Default
    • 1000000

EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS

The level of acknowledgement reliability needed from the broker (defaults to 1 - WaitForLocal).

  • Valid Values
    • 0 - (NoResponse) Doesn't require any acknowledgement other than the TCP ACK that the message payload was received.
    • 1 - (WaitForLocal) Waits for only the receiving broker to acknowledge commitment of the message.
    • -1 - (WaitForAll) Waits for the topic's minimum in-sync replicas to acknowledge commitment of the message. The minimum number of in-sync replicas is configured for Kafka topic via the min.insync.replicas attribute.
  • Default
    • 1

EF_OUTPUT_KAFKA_PRODUCER_TIMEOUT

The maximum duration, in seconds, that the producer will wait for the RequiredAcks defined in EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS. This is only relevant when EF_OUTPUT_KAFKA_PRODUCER_REQUIRED_ACKS is set to -1 (WaitForAll).

  • Default
    • 10

EF_OUTPUT_KAFKA_PRODUCER_COMPRESSION

The type of compression to use on messages (defaults to no compression).

info

Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed to 3 (LZ4). Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of 0 (none), you should ensure that it is specifically set in your configuration.

  • Valid Values
    • 0 - none
    • 1 - Gzip
    • 2 - snappy
    • 3 - LZ4
    • 4 - ZSTD
  • Default
    • 3

EF_OUTPUT_KAFKA_PRODUCER_COMPRESSION_LEVEL

The level of compression to use on messages. The meaning depends on the actual compression type used and defaults to the compression codecs default level.

  • Default
    • -1000

EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES

The best-effort number of bytes needed to trigger a flush.

danger

EF_OUTPUT_KAFKA_PRODUCER_FLUSH_BYTES should not be set to a value larger than the message.max.bytes setting of the Kafka topic to which records are to be written.

  • Default
    • 1000000

EF_OUTPUT_KAFKA_PRODUCER_FLUSH_MESSAGES

The best-effort number of messages needed to trigger a flush.

  • Default
    • 1024

EF_OUTPUT_KAFKA_PRODUCER_FLUSH_FREQUENCY

The best-effort frequency of flushes, in milliseconds.

info

Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed to 1000. Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of 500, you should ensure that it is specifically set in your configuration.

  • Default
    • 1000

EF_OUTPUT_KAFKA_PRODUCER_FLUSH_MAX_MESSAGES

The maximum number of messages the producer will send in a single broker request.

  • Default
    • 0 (unlimited)

EF_OUTPUT_KAFKA_PRODUCER_RETRY_MAX

The total number of times to retry sending a message.

  • Default
    • 3

EF_OUTPUT_KAFKA_PRODUCER_RETRY_BACKOFF

The period of time, in milliseconds, to wait for the Kafka cluster to settle between retries.

  • Default
    • 100

EF_OUTPUT_KAFKA_FLAT_RECORD_ENABLE

The Kafka output can send JSON records using nested or flattened field names. Historically only nested records were supported, which is why the default value for this setting is false. However a record consisting of flattened fields names reduces resource requirements and improves throughput.

info

Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed to true. Performance testing has shown that this change can improve throughput. If you wish to continue to use the old default setting of false, you should ensure that it is specifically set in your configuration.

  • Valid Values
    • true, false
  • Default
    • true

EF_OUTPUT_KAFKA_ECS_ENABLE

Specifies whether the data will be sent using Elastic Common Schema (ECS).

  • Valid Values
    • true, false
  • Default
    • false

EF_OUTPUT_KAFKA_TIMESTAMP_SOURCE

Determines the timestamp source to be used to set the @timestamp field. Usually end would be the best setting. However, in the case of poorly behaving or misconfigured devices, collect may be the better option.

info

Beginning with ElastiFlow 6.3.0 the Kafka output's default value for this setting was changed to collect. This will allow the collector to handle a wider variety of situations without additional configuration. If you wish to continue to use the old default setting of end, you should ensure that it is specifically set in your configuration.

  • Valid Values
    • start - Use the timestamp from flow.start.timestamp. The flow start time indicated in the flow.
    • end - Use the timestamp from flow.end.timestamp. The flow end time (or last reported time).
    • export - Use the timestamp from flow.export.timestamp. The time from the flow record header.
    • collect - Use the timestamp from flow.collect.timestamp. The time that the collector processed the flow record.
  • Default
    • collect