LogicMonitor Data Publisher for Kafka Receiver
Last updated - 04 September, 2025
LogicMonitor Data Publisher is an integrated service in the Collector that extracts real-time DataSource metrics and sends them to a Kafka receiver (a third-party destination) for further analytics. This feature enables the Collector to push metrics to other data sinks in parallel with transmitting the data to the LogicMonitor portal.
When the Kafka receiver is integrated with the Collector, LogicMonitor Data Publisher automatically shares the metrics. The following steps outline the service workflow:
- Enable and configure the Kafka receiver in the agent.conf settings.
- Restart the Collector to start the LogicMonitor Data Publisher service.
- The LogicMonitor Data Publisher collects and converts metrics into a standard OTLP-formatted JSON string.
- The Data Publisher sends the formatted data to the Kafka receiver.
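As a minimal sketch, the workflow above assumes agent.conf entries like the following (the broker addresses and topic name are placeholder values):

```properties
# Enable the LogicMonitor Data Publisher service
enable.collector.publisher=true
# Comma-separated list of Kafka broker host:port pairs (placeholders)
kafka.broker.server.urls=kafka-broker1:9092,kafka-broker2:9092
# Kafka topic to which metrics are published (placeholder)
kafka.topic.name=lm-metrics
```

After saving these properties, restart the Collector so the Data Publisher service starts.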
Considerations for Kafka Cluster Settings
The recommended Kafka cluster configuration is comparable to an AWS EC2 m5.2xlarge instance. The configuration details are as follows:
Hardware Settings
| Configuration | Recommended Value |
| --- | --- |
| CPU Cores | 8 |
| Memory (RAM) | 32 GB |
Kafka Cluster Settings
The recommended values for a Kafka cluster are as follows:
| Configuration | Recommended Value | Description |
| --- | --- | --- |
| Number of brokers in a cluster | 3 | Multiple brokers help avoid data loss; if one broker is down, the other brokers in the cluster continue serving data. |
| Replication factor | 3 | Matches the broker count. The same topic is created on every broker, and the published data is replicated to each of them. |
| Retention period | 6 hours | The duration for which data stays in a Kafka broker. The data is purged after the retention period is over. Although the appropriate value depends on the consumer, setting this configuration is recommended for effective memory usage. |
| Partition limit per broker | 2000 | In Kafka topics, data is stored in partitions. This is the maximum number of partitions for each broker. |
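As a configuration sketch, the recommended replication factor and retention period map to topic settings such as the following (the topic name, partition count, and bootstrap address are illustrative, and the commands assume access to a running cluster):

```shell
# Create a topic with the recommended replication factor of 3.
# Keep the partition count within the 2000-partitions-per-broker limit.
kafka-topics.sh --create \
  --topic lm-metrics \
  --partitions 50 \
  --replication-factor 3 \
  --bootstrap-server kafka-broker1:9092

# Set the recommended 6-hour retention period (21,600,000 ms) on the topic.
kafka-configs.sh --alter \
  --entity-type topics --entity-name lm-metrics \
  --add-config retention.ms=21600000 \
  --bootstrap-server kafka-broker1:9092
```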
Network Settings
- Throughput Calculation—To estimate the total throughput of the Kafka cluster, consider the rate at which data is produced and consumed. Based on this, you can determine the required network bandwidth between the Collector and the Kafka cluster.
- Replication Factor—Based on the recommended replication factor, ensure that the network bandwidth accommodates the replication traffic between brokers.
- Producer and Consumer Configurations—To reduce the network overhead, tune Kafka producers and consumers to batch messages. For more information, see Kafka Property Configurations.
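For example, batching on the publisher side can be tuned through the agent.conf properties below. The linger and compression values shown are the documented defaults; the batch size is expressed in bytes on the assumption that the property maps directly to the underlying Kafka `batch.size` setting, which takes bytes:

```properties
# Wait up to 5000 ms to accumulate a batch before sending (default)
kafka.linger.ms=5000
# Batch up to 50 KB of records per request (assumes the value is in bytes)
kafka.batch.size=51200
# Compress batches to reduce network overhead (default)
kafka.compression.type=snappy
```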
Security Settings
The LogicMonitor Data Publisher supports both plain text (noAuth) and SSL (Auth) modes. However, to strengthen security, it is recommended that you use the SSL (Auth) mode. The security-related configuration properties are as follows:
agent.publisher.enable.auth
kafka.ssl.truststore.name
kafka.ssl.truststore.password
kafka.ssl.keystore.name
kafka.ssl.keystore.password
kafka.ssl.key.password
For more information, see Kafka Property Configurations.
Requirements for Using LogicMonitor Data Publisher for Kafka Receiver
To use LogicMonitor Data Publisher for Kafka receiver, you need the following:
- Support for Kafka client version 3.8.1 or later.
- EA Collector 37.300 or later installed on your machine.
- LogicMonitor Data Publisher service enabled (enable.collector.publisher=true) in the agent.conf settings.
- Strong network connectivity between the Collector and Kafka.
- Kafka broker hosted URLs
- Kafka topic name
- The metrics.proto version v1.0.0 to read and convert metrics in the OTLP-formatted JSON string. For more information, see metrics.proto GitHub documentation.
Kafka Property Configurations
In the agent.conf settings, configure the following properties:
| Property | Description |
| --- | --- |
| enable.collector.publisher | (Mandatory) To enable the LogicMonitor Data Publisher service, set the value to true. By default, the value is set to false. |
| kafka.broker.server.urls | (Mandatory) A comma-separated list of host:port pairs used to establish the initial connection with the Kafka cluster. Example—host1:port1,host2:port2, and so on. |
| kafka.topic.name | (Mandatory) The Kafka topic name on which data is published. |
| agent.publisher.enable.auth | By default, LogicMonitor Data Publisher sends data in the noAuth mode (that is, in plain text). To enable the Auth mode, set the property to true. When enabled, LogicMonitor Data Publisher switches to the SSL mode. |
| kafka.ssl.truststore.name | The Kafka truststore name. You must add the specific certificates to the publisherCerts folder in the Agent root directory. The default value is kafka.producer.truststore.jks. |
| kafka.ssl.truststore.password | The truststore password for the kafka.ssl.truststore.name property. The value of this sensitive property is encrypted in the agent.conf settings. |
| kafka.ssl.keystore.name | The Kafka keystore name. You must add the specific certificates to the publisherCerts folder in the Agent root directory. The default value is kafka.producer.keystore.jks. |
| kafka.ssl.keystore.password | The keystore password for the kafka.ssl.keystore.name property. The value of this sensitive property is encrypted in the agent.conf settings. |
| kafka.ssl.key.password | The Kafka ssl.key.password. The value of this sensitive property is encrypted in the agent.conf settings. |
| kafka.linger.ms | The equivalent of the Kafka Producer ProducerConfig.LINGER_MS_CONFIG. By default, the value is set to 5000 milliseconds. |
| kafka.batch.size | The equivalent of the Kafka Producer ProducerConfig.BATCH_SIZE_CONFIG. By default, the value is set to 50 KB. |
| kafka.max.in.flight.requests.per.connection | The equivalent of the Kafka Producer ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION. By default, the value is set to 1. |
| kafka.enable.idempotence | The equivalent of the Kafka Producer ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG. By default, the value is set to true. |
| kafka.acks | The equivalent of the Kafka Producer ProducerConfig.ACKS_CONFIG. By default, the value is set to all. |
| kafka.retries | The equivalent of the Kafka Producer ProducerConfig.RETRIES_CONFIG. By default, the value is set to 1. |
| kafka.max.block.ms | The equivalent of the Kafka Producer ProducerConfig.MAX_BLOCK_MS_CONFIG. By default, the value is set to 3000 milliseconds. |
| kafka.compression.type | The equivalent of the Kafka Producer ProducerConfig.COMPRESSION_TYPE_CONFIG. By default, the value is set to snappy. |
| enable.kafka.key.value.data | Kafka Producer provides a feature to send data in key-value format. To limit disk usage on the Kafka broker, the publisher by default sends only the message without any key. To send data in the key-value format, set the property to true. Note: The key is a string in the format HostName$DataSourceName$InstanceName. |
| kafka.send.data.in.String | To send data in a string-serialized format, set the property to true. By default, the data is sent in the ByteArray format. |
| collector.publisher.device.props | Enables Data Publisher to send the device properties under the "resource" section of the metrics. Sensitive properties such as snmp.community and wmi.pass are not sent. By default, 5 device properties are supported, with a maximum limit of 10 device properties. |
Authentication for Kafka Receiver
By default, LogicMonitor Data Publisher sends data in the noAuth mode. To enable the Auth mode, perform the following steps:
- Add the default kafka.producer.truststore.jks and kafka.producer.keystore.jks certificates to the publisherCerts directory at the location where the Collector is installed. Instead of the default certificates, you can also add truststore and keystore certificates with a different name.
- In LogicMonitor, navigate to Settings > Collectors.
- Under the Collectors tab, select the Collector that you want to configure.
- Select More, and then select Collector Configuration.
- In the agent.conf settings, set values for the following properties:
agent.publisher.enable.auth
kafka.ssl.truststore.password
kafka.ssl.keystore.password
kafka.ssl.key.password
kafka.ssl.truststore.name
kafka.ssl.keystore.name
Note: If you add the truststore and keystore certificates with a different name, specify the names in the kafka.ssl.truststore.name and kafka.ssl.keystore.name properties in the agent.conf settings. For more information, see Kafka Property Configurations.
- Restart the Collector.
LogicMonitor Data Publisher switches to the Auth mode.
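For example, the Auth-mode section of the agent.conf settings could look like the following (the certificate names shown are the defaults; the password values are placeholders and are stored encrypted by the Collector):

```properties
# Switch the Data Publisher from plain text (noAuth) to SSL (Auth)
agent.publisher.enable.auth=true
# Truststore and keystore files placed in the publisherCerts directory
kafka.ssl.truststore.name=kafka.producer.truststore.jks
kafka.ssl.keystore.name=kafka.producer.keystore.jks
# Sensitive values; encrypted in the agent.conf settings after saving
kafka.ssl.truststore.password=<truststore-password>
kafka.ssl.keystore.password=<keystore-password>
kafka.ssl.key.password=<key-password>
```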
LogicMonitor Data Publisher Collection and Conversion for Kafka Receiver
LogicMonitor Data Publisher collects the metrics, adds metadata details, and converts the result into an OTLP-formatted JSON string.
Kafka Receiver Data Model
LogicMonitor Data Publisher converts metrics into an OTLP-formatted JSON string. OTLP is a standard protocol for transmitting telemetry data in observability and monitoring systems. The OTLP converter is a gRPC service in LogicMonitor Data Publisher that implements protobuf (based on metrics.proto version 1.0.0) to convert Collector metrics into an OTLP-formatted JSON string. Metrics in OTLP consist of one or more time series, where each time series represents a set of related datapoints over time.
The following is an example of OTLP-formatted metrics in JSON format for the LogicMonitor_Collector_ThreadCPUUsage datasource of an SNMP instance.
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "hostName",
"value": {
"stringValue": "127.0.0.1"
}
},
{
"key": "hostId",
"value": {
"stringValue": "1017594"
}
},
{
"key": "devicePropKey",
"value": {
"stringValue": "devicePropValue"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "LogicMonitor_Collector_ThreadCPUUsage",
"attributes": [
{
"key": "collector",
"value": {
"stringValue": "jmx"
}
},
{
"key": "epoch",
"value": {
"stringValue": "1715263558360"
}
},
{
"key": "datasourceId",
"value": {
"stringValue": "128265135"
}
},
{
"key": "datasourceInstanceId",
"value": {
"stringValue": "367542931"
}
}
]
},
"metrics": [
{
"name": "CpuUsage",
"sum": {
"dataPoints": [
{
"startTimeUnixNano": "1715263558360000000",
"timeUnixNano": "1715263558360000000",
"asDouble": 0,
"attributes": [
{
"key": "dataSourceInstanceName",
"value": {
"stringValue": "LogicMonitor_Collector_ThreadCPUUsage-netscan-propsdetection"
}
},
{
"key": "datapointid",
"value": {
"stringValue": "197642"
}
},
{
"key": "wildValue",
"value": {
"stringValue": "netscan-propsdetection"
}
},
{
"key": "wildAlias",
"value": {
"stringValue": "netscan-propsdetection"
}
}
]
}
],
"aggregationTemporality": "AGGREGATION_TEMPORALITY_DELTA",
"isMonotonic": true
}
},
{
"name": "ProcessorCount",
"gauge": {
"dataPoints": [
{
"startTimeUnixNano": "1715263558360000000",
"timeUnixNano": "1715263558360000000",
"asDouble": 10,
"attributes": [
{
"key": "dataSourceInstanceName",
"value": {
"stringValue": "LogicMonitor_Collector_ThreadCPUUsage-netscan-propsdetection"
}
},
{
"key": "datapointid",
"value": {
"stringValue": "197643"
}
},
{
"key": "wildValue",
"value": {
"stringValue": "netscan-propsdetection"
}
},
{
"key": "wildAlias",
"value": {
"stringValue": "netscan-propsdetection"
}
}
]
}
]
}
},
{
"name": "RunnableThreadCnt",
"gauge": {
"dataPoints": [
{
"startTimeUnixNano": "1715263558360000000",
"timeUnixNano": "1715263558360000000",
"asDouble": 0,
"attributes": [
{
"key": "dataSourceInstanceName",
"value": {
"stringValue": "LogicMonitor_Collector_ThreadCPUUsage-netscan-propsdetection"
}
},
{
"key": "datapointid",
"value": {
"stringValue": "197644"
}
},
{
"key": "wildValue",
"value": {
"stringValue": "netscan-propsdetection"
}
},
{
"key": "wildAlias",
"value": {
"stringValue": "netscan-propsdetection"
}
}
]
}
]
}
},
{
"name": "ThreadCnt",
"gauge": {
"dataPoints": [
{
"startTimeUnixNano": "1715263558360000000",
"timeUnixNano": "1715263558360000000",
"asDouble": 0,
"attributes": [
{
"key": "dataSourceInstanceName",
"value": {
"stringValue": "LogicMonitor_Collector_ThreadCPUUsage-netscan-propsdetection"
}
},
{
"key": "datapointid",
"value": {
"stringValue": "197645"
}
},
{
"key": "wildValue",
"value": {
"stringValue": "netscan-propsdetection"
}
},
{
"key": "wildAlias",
"value": {
"stringValue": "netscan-propsdetection"
}
}
]
}
]
}
}
]
}
]
}
]
}
The resourceMetrics object consists of the following:
- Resource—The metadata of the device from which metrics are collected.
- ScopeMetrics—Contains scope and metrics.
- Scope—The metadata of the datasource and instances for which metrics are collected.
- Metrics—The actual datapoints of the datasource instances that are retrieved from the device.
Note: Raw data has two types of datapoints: normal and complex. LogicMonitor Data Publisher can only send normal datapoints in metrics data.
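A consumer of this payload can walk the resourceMetrics hierarchy to recover individual datapoints. The following Python sketch is a hypothetical consumer-side helper (not part of the Collector) that parses a trimmed version of the example above:

```python
import json

# Trimmed OTLP-formatted payload, following the structure shown above.
payload = json.loads("""
{
  "resourceMetrics": [{
    "resource": {"attributes": [
      {"key": "hostName", "value": {"stringValue": "127.0.0.1"}}
    ]},
    "scopeMetrics": [{
      "scope": {"name": "LogicMonitor_Collector_ThreadCPUUsage", "attributes": []},
      "metrics": [
        {"name": "ProcessorCount", "gauge": {"dataPoints": [
          {"timeUnixNano": "1715263558360000000", "asDouble": 10, "attributes": []}
        ]}}
      ]
    }]
  }]
}
""")

def attr(attributes, key):
    """Return the stringValue of an OTLP attribute by key, or None."""
    for a in attributes:
        if a["key"] == key:
            return a["value"]["stringValue"]
    return None

def extract_datapoints(payload):
    """Yield (host, scope, metric, value) tuples from resourceMetrics."""
    for rm in payload["resourceMetrics"]:
        host = attr(rm["resource"]["attributes"], "hostName")
        for sm in rm["scopeMetrics"]:
            scope = sm["scope"]["name"]
            for metric in sm["metrics"]:
                # A metric carries its datapoints under "gauge" or "sum".
                body = metric.get("gauge") or metric.get("sum")
                for dp in body["dataPoints"]:
                    yield host, scope, metric["name"], dp["asDouble"]

rows = list(extract_datapoints(payload))
print(rows)
# [('127.0.0.1', 'LogicMonitor_Collector_ThreadCPUUsage', 'ProcessorCount', 10)]
```

The same traversal applies to the full example payload; only the number of scope and datapoint entries differs.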
Kafka Receiver Metadata Details
The OTLP-formatted JSON string contains data collected for a single poll along with the following metadata:
- Host name or Device name
- DataSource name
- Instance name
- Polling interval
- Epoch details
- DataPoint name
LogicMonitor Data Publisher for Kafka Receiver Performance Monitoring
The LogicMonitor Data Publisher datasource monitors and provides real-time performance metrics, and tracks the following datapoints:
| Datapoint | Description |
| --- | --- |
| CountOfDataEnqueued | Count of data enqueued for publishing. |
| CountOfDataDequeued | Count of data dequeued for publishing. |
| SizeOfBigQueue | Size of the queue in which data persists. |
| KafkaRequestCount | Number of Kafka requests. |
| CountofSuccessfulRequestsToKafka | Number of successful requests to Kafka. |
| CountOfRequestsFailedDuetoAuthError | Number of requests that failed due to authentication errors, if Auth is enabled. |
| CountOfRequestsFailedDuetoNetworkErrors | Number of requests that failed due to network errors. |
| CountofRequestsfailedDueToKafkaError | Number of messages that failed due to Kafka errors. |
| TimeTakenforDequeueAndConversion | Time taken to dequeue data from the queue and convert it to an OTLP-formatted JSON string. |
| SizeOfDataPublishedinBytes | Size (in bytes) of the data published to the Kafka receiver. |
Note: If the connection with the Kafka broker fails, LogicMonitor Data Publisher can store data for up to 30 minutes.
Data Volume Estimation for Kafka Receiver
On average, a single record is approximately 25 KB. The amount of data that is sent is calculated based on the following four factors:
- Number of collectors with LogicMonitor Data Publisher service enabled
- Number of devices
- Count of DataSource instances
- Polling period
Example of Estimating Data Published for Kafka Receiver
Assume that a Collector has the following monitoring setup:
| Factors | Assumed Value |
| --- | --- |
| Single record size | 25 KB |
| Number of devices | 10 |
| Number of DataSources | 10 |
| Number of DataSource instances per device (considering 5 instances per DataSource) | 5 instances × 10 DataSources = 50 DataSource instances |
| Total instances for 10 devices | 50 DataSource instances × 10 devices = 500 instances |
| Average polling interval | 5 minutes |
If each instance represents a single record, the size of data published on an average per polling interval is calculated as follows:
500 instances × 25 KB/instance = 12500 KB/5 mins
Thus, LogicMonitor Data Publisher publishes data for the DataSource instances in the monitoring setup as follows:
| Polling Interval | Published Data Volume |
| --- | --- |
| 5 minutes | 12500 KB |
| 1 minute | 2500 KB |
| 1 second | 41.67 KB |
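The estimates above can be reproduced with a short calculation (values taken from the example setup; the per-second figure is rounded to two decimals):

```python
# Assumed monitoring setup, as in the example above.
record_size_kb = 25          # average size of a single record
instances_per_device = 50    # 5 instances x 10 DataSources
devices = 10
total_instances = instances_per_device * devices  # 500 instances

# Data published per 5-minute polling interval.
per_poll_kb = total_instances * record_size_kb
print(per_poll_kb)                  # KB per 5 minutes
print(per_poll_kb / 5)              # KB per minute
print(round(per_poll_kb / 300, 2))  # KB per second
```

This yields 12500 KB per 5-minute interval, 2500 KB per minute, and 41.67 KB per second, matching the table above.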