java kafka客户端何时设置的kafka消费者默认值
Kafka 为什么有些属性没有配置却能正常工作?那是因为 kafka-clients 为一部分消费者配置项设置了默认值,具体可以看 ConsumerConfig 类的静态代码块,如下所示:
// Static initializer that builds the shared CONFIG definition for the consumer.
// Each define(...) call registers one configuration key together with its type,
// a default value (where one is given), an optional validator, an importance
// level and a documentation string. The ConsumerConfig constructors pass this
// CONFIG object to the superclass constructor along with the user-supplied props.
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
Type.STRING,
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
in(ClientDnsLookup.DEFAULT.toString(),
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
// group id and group instance id both default to null
.define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
.define(GROUP_INSTANCE_ID_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
GROUP_INSTANCE_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
10000,
Importance.HIGH,
SESSION_TIMEOUT_MS_DOC)
.define(HEARTBEAT_INTERVAL_MS_CONFIG,
Type.INT,
3000,
Importance.HIGH,
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Collections.singletonList(RangeAssignor.class),
new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
Type.LONG,
5 * 60 * 1000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_MAX_AGE_DOC)
.define(ENABLE_AUTO_COMMIT_CONFIG,
Type.BOOLEAN,
true,
Importance.MEDIUM,
ENABLE_AUTO_COMMIT_DOC)
.define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
Type.INT,
5000,
atLeast(0),
Importance.LOW,
AUTO_COMMIT_INTERVAL_MS_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
Importance.LOW,
CommonClientConfigs.CLIENT_ID_DOC)
.define(CLIENT_RACK_CONFIG,
Type.STRING,
"",
Importance.LOW,
CommonClientConfigs.CLIENT_RACK_DOC)
.define(MAX_PARTITION_FETCH_BYTES_CONFIG,
Type.INT,
DEFAULT_MAX_PARTITION_FETCH_BYTES,
atLeast(0),
Importance.HIGH,
MAX_PARTITION_FETCH_BYTES_DOC)
.define(SEND_BUFFER_CONFIG,
Type.INT,
128 * 1024,
atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
Importance.MEDIUM,
CommonClientConfigs.SEND_BUFFER_DOC)
.define(RECEIVE_BUFFER_CONFIG,
Type.INT,
64 * 1024,
atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
Importance.MEDIUM,
CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(FETCH_MIN_BYTES_CONFIG,
Type.INT,
1,
atLeast(0),
Importance.HIGH,
FETCH_MIN_BYTES_DOC)
.define(FETCH_MAX_BYTES_CONFIG,
Type.INT,
DEFAULT_FETCH_MAX_BYTES,
atLeast(0),
Importance.MEDIUM,
FETCH_MAX_BYTES_DOC)
.define(FETCH_MAX_WAIT_MS_CONFIG,
Type.INT,
500,
atLeast(0),
Importance.LOW,
FETCH_MAX_WAIT_MS_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG,
Type.LONG,
50L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
Type.LONG,
1000L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
100L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(AUTO_OFFSET_RESET_CONFIG,
Type.STRING,
"latest", // default value; the accepted values are listed by in(...) on the next line
in("latest", "earliest", "none"),
Importance.MEDIUM,
AUTO_OFFSET_RESET_DOC)
.define(CHECK_CRCS_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
CHECK_CRCS_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG,
Type.INT,
2,
atLeast(1),
Importance.LOW,
CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
.define(METRICS_RECORDING_LEVEL_CONFIG,
Type.STRING,
Sensor.RecordingLevel.INFO.toString(),
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
Importance.LOW,
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
// No default value is passed for the key/value deserializer classes
// (presumably they must be configured explicitly - confirm against ConfigDef).
.define(KEY_DESERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
KEY_DESERIALIZER_CLASS_DOC)
.define(VALUE_DESERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
VALUE_DESERIALIZER_CLASS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
30000,
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
.define(DEFAULT_API_TIMEOUT_MS_CONFIG,
Type.INT,
60 * 1000,
atLeast(0),
Importance.MEDIUM,
CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
9 * 60 * 1000,
Importance.MEDIUM,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(MAX_POLL_RECORDS_CONFIG,
Type.INT,
500,
atLeast(1),
Importance.MEDIUM,
MAX_POLL_RECORDS_DOC)
.define(MAX_POLL_INTERVAL_MS_CONFIG,
Type.INT,
300000,
atLeast(1),
Importance.MEDIUM,
MAX_POLL_INTERVAL_MS_DOC)
.define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
Type.BOOLEAN,
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)
// The next two keys are registered through defineInternal(...) and carry no doc string.
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW)
.defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED,
Type.BOOLEAN,
false,
Importance.LOW)
.define(ISOLATION_LEVEL_CONFIG,
Type.STRING,
DEFAULT_ISOLATION_LEVEL,
in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
Importance.MEDIUM,
ISOLATION_LEVEL_DOC)
.define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,
Type.BOOLEAN,
DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
Importance.MEDIUM,
ALLOW_AUTO_CREATE_TOPICS_DOC)
// security support
.define(SECURITY_PROVIDERS_CONFIG,
Type.STRING,
null,
Importance.LOW,
SECURITY_PROVIDERS_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();
}
例如,auto.offset.reset 这个配置项的默认值就是 latest。再来看一下 ConsumerConfig 的几个构造方法:
/**
 * Creates a consumer configuration from the given properties.
 * The shared {@code CONFIG} definition is passed to the superclass
 * constructor together with {@code props}.
 *
 * @param props the consumer settings supplied by the caller
 */
public ConsumerConfig(Properties props) {
super(CONFIG, props);
}
/**
 * Creates a consumer configuration from the given map of settings.
 * The shared {@code CONFIG} definition is passed to the superclass
 * constructor together with {@code props}.
 *
 * @param props the consumer settings supplied by the caller, keyed by config name
 */
public ConsumerConfig(Map<String, Object> props) {
super(CONFIG, props);
}
是的,所有的 ConsumerConfig 构造方法都把上面的默认配置 CONFIG 传给了父类的构造方法。接下来的处理就是:如果显式配置了某个配置项,就使用显式配置的值;没有配置,则使用 CONFIG 里面的默认值。
PS:
上面的 CONFIG 除了定义各配置项的默认值之外,还为一些枚举类型的配置项定义了可选值,比如
auto.offset.reset 的可选值为 latest、earliest、none。