基于JMX实现消息队列监控
引言
实时监控中间件保障集群的可用性是极其重要的,本篇文章以JAVA语音基于JMX来实现对Kafka消息队列的监控
实现
添加依赖
在 pom.xml
中添加以下依赖:
<dependencies>
<!-- Kafka 客户端依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<!-- JMX 依赖 -->
<dependency>
<groupId>javax.management</groupId>
<artifactId>javax.management-api</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies>
Java实现
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.Set;
public class KafkaPerformanceMonitor {
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi";
public static void main(String[] args) {
try {
// 连接到 JMX 服务器
JMXServiceURL url = new JMXServiceURL(JMX_URL);
JMXConnector connector = JMXConnectorFactory.connect(url);
MBeanServerConnection mbeanServerConnection = connector.getMBeanServerConnection();
// 获取所有 MBean 的名称
Set<ObjectName> mbeanNames = mbeanServerConnection.queryNames(null, null);
// 遍历 MBean 名称,查找 Kafka 相关的 MBean
for (ObjectName mbeanName : mbeanNames) {
if (mbeanName.getDomain().startsWith("kafka")) {
System.out.println("Found Kafka MBean: " + mbeanName);
// 获取 MBean 的属性
Set<String> attributeNames = mbeanServerConnection.getMBeanInfo(mbeanName).getAttributes().stream()
.map(attr -> attr.getName())
.collect(java.util.stream.Collectors.toSet());
for (String attributeName : attributeNames) {
try {
Object attributeValue = mbeanServerConnection.getAttribute(mbeanName, attributeName);
System.out.println(" Attribute: " + attributeName + " = " + attributeValue);
} catch (Exception e) {
System.err.println("Error getting attribute " + attributeName + " for MBean " + mbeanName + ": " + e.getMessage());
}
}
}
}
// 关闭连接
connector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- JMX 连接:通过
JMXServiceURL
和JMXConnectorFactory
连接到 Kafka 的 JMX 服务器。这里假设 Kafka 的 JMX 服务运行在本地的9999
端口,你需要根据实际情况修改JMX_URL
。 - 获取 MBean 信息:使用
MBeanServerConnection
查询所有的 MBean 名称,并筛选出 Kafka 相关的 MBean。 - 获取属性值:对于每个 Kafka 相关的 MBean,获取其所有属性的名称,并尝试获取属性的值进行打印。
- 异常处理:在获取属性值时,可能会抛出异常,需要进行异常处理。