kafka的kafka查看消费者组组该怎么删除

点击阅读原文
明天提醒我
我要该,理由是:
关闭理由:
删除理由:
忽略理由:
推广(招聘、广告、SEO 等)方面的内容
与已有问题重复(请编辑该提问指向已有相同问题)
答非所问,不符合答题要求
宜作评论而非答案
带有人身攻击、辱骂、仇恨等违反条款的内容
无法获得确切结果的问题
非开发直接相关的问题
非技术提问的讨论型问题
其他原因(请补充说明)您还没有登录,快捷通道只有在登录后才能使用。 还没有帐号? 赶紧
1共10页10) ? 10 :
location='/detail/327190.html?page='+page+'';}">10) ? 10 :
location='/detail/327190.html?page='+page+'';">Go
部署kafka常见问答
在线时间0小时
&&发表于: 05-01
问题导读1、如何对Kafka Broker上持久化的数据进行加密2、Kafka是否支持跨数据中心的可用性3、Kafka支持哪些类型的数据转换4、Kafka支持哪些类型的数据转换5、如何通过Kafka发送大消息或者超大负荷量?6、Kafka是否支持MQTT或JMS协议?是否应当为Kafka Broker使用 固态硬盘 (SSD)实际上使用SSD盘并不能显著地改善 Kafka 的性能,主要有两个原因:[Plain Text] 纯文本查看 复制代码* Kafka写磁盘是异步的,不是同步的。就是说,除了启动、停止之外,Kafka的任何操作都不会去等待磁盘同步(sync)完成;而磁盘同步(disk syncs)总是在后台完成的。这就是为什么Kafka消息至少复制到三个副本是至关重要的,因为一旦单个副本崩溃,这个副本就会丢失数据无法同步写到磁盘。* 每一个Kafka Partition被存储为一个串行的WAL(Write Ahead Log)日志文件。因此,除了极少数的数据查询,Kafka中的磁盘读写都是串行的。现代的操作系统已经对串行读写做了大量的优化工作。如何对Kafka Broker上持久化的数据进行加密目前,Kafka不提供任何机制对Broker上持久化的数据进行加密。用户可以自己对写入到Kafka的数据进行加密,即是,生产者(Producers)在写Kafka之前加密数据,消费者(Consumers)能解密收到的消息。这就要求生产者(Producers)把加密协议(protocols)和密钥(keys)分享给消费者(Consumers)。另外一种选择,就是使用软件提供的文件系统级别的加密,例如Cloudera Navigator Encrypt。Cloudera Navigator Encrypt是Cloudera企业版(Cloudera Enterprise)的一部分,在应用程序和文件系统之间提供了一个透明的加密层。Apache Zookeeper正成为Kafka集群的一个痛点(pain point),真的吗?Kafka高级消费者(high-level consumer)的早期版本(0.8.1或更早)使用Zookeeper来维护读的偏移量(offsets,主要是Topic的每个Partition的读偏移量)。如果有大量生产者(consumers)同时从Kafka中读数据,对Kafka的读写负载可能就会超出它的容量,Zookeeper就变成一个瓶颈(bottleneck)。当然,这仅仅出现在一些很极端的案例中(extreme cases),即有成百上千个消费者(consumers)在使用同一个Zookeeper集群来管理偏移量(offset)。不过,这个问题已经在Kafka当前的版本(0.8.2)中解决。从版本0.8.2开始,高级消费者(high-level consumer)能够使用Kafka自己来管理偏移量(offsets)。本质上讲,它使用一个单独的Kafka Topic来管理最近的读偏移量(read offsets),因此偏移量管理(offset management)不再要求Zookeeper必须存在。然后,用户将不得不面临选择是用Kafka还是Zookeeper来管理偏移量(offsets),由消费者(consumer)配置参数 offsets.storage 决定。Cloudera强烈推荐使用Kafka来存储偏移量。当然,为了保证向后兼容性,你可以继续选择使用Zookeeper存储偏移量。(例如,你可能有一个监控平台需要从Zookeeper中读取偏移量信息。) 假如你不得不使用Zookeeper进行偏移量(offset)管理,我们推荐你为Kafka集群使用一个专用的Zookeeper集群。假如一个专用的Zookeeper集群仍然有性能瓶颈,你依然可以通过在Zookeeper节点上使用固态硬盘(SSD)来解决问题。Kafka是否支持跨数据中心的可用性Kafka跨数据中心可用性的推荐解决方案是使用MirrorMaker(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId= ) 。在你的每一个数据中心都搭建一个Kafka集群,在Kafka集群之间使用MirrorMaker来完成近实时的数据复制。使用MirrorMaker的架构模式是为每一个”逻辑”的topic在每一个数据中心创建一个topic:例如,在逻辑上你有一个”clicks”的topic,那么你实际上有”DC1.clicks”和“DC2.clicks”两个topic(DC1和DC2指得是你的数据中心)。DC1向DC1.clicks中写数据,DC2向DC2.clicks中写数据。MirrorMaker将复制所有的DC1 topics到DC2,并且复制所有的DC2 topics到DC1。现在每个DC上的应用程序都能够访问写入到两个DC的事件。这个应用程序能够合并信息和处理相应的冲突。另一种更复杂的模式是在每一个DC都搭建本地和聚合Kafka集群。这个模式已经被Linkedin使用,Linkedin Kafka运维团队已经在这篇Blog(/kafka/running-kafka-scale )中有详细的描述(参见“Tiers and Aggregation”)。Kafka支持哪些类型的数据转换(data transformation)数据流过的Kafka的时候,Kafka并不能进行数据转换。为了处理数据转换,我们推荐如下方法:[Plain Text] 纯文本查看 复制代码* 对于简单事件处理,使用Flume Kafka integration([url=/blog/20]/blog/20[/url] ... or-event-processing ),并且写一个简单的Apache Flume Interceptor。* 对于复杂(事件)处理,使用Apache Spark Streaming从Kafka中读数据和处理数据。 在这两种情况下,被转换或者处理的数据可被写会到新的Kafka Topic中,或者直接传送到数据的最终消费者(Consumer)那里。对于实时事件处理模式更全面的描述,看看这篇文章(/blog/2015/06/architectural-patterns-for-near-real-time-data-processing-with-apache-hadoop/ )。如何通过Kafka发送大消息或者超大负荷量?Cloudera的性能测试表明Kafka达到最大吞吐量的消息大小为10K左右。更大的消息将导致吞吐量下降。然后,在一些情况下,用户需要发送比10K大的多的消息。如果消息负荷大小是每100s处理MB级别,我们推荐探索以下选择:[Plain Text] 纯文本查看 复制代码* 如果可以使用共享存储(HDFS、S3、NAS),那么将超负载放在共享存储上,仅用Kafka发送负载数据位置的消息。* 对于大消息,在写入Kafka之前将消息拆分成更小的部分,使用消息Key确保所有的拆分部分都写入到同一个partition中,以便于它们能被同一个消息着(Consumer)消费的到,在消费的时候将拆分部分重新组装成一个大消息。在通过Kafka发送大消息时,请记住以下几点:压缩配置[Plain Text] 纯文本查看 复制代码* Kafka生产者(Producers)能够压缩消息。通过配置参数compression.codec确保压缩已经开启。有效的选项为"gzip"和"snappy"。Broker配置[Plain Text] 纯文本查看 复制代码* message.max.bytes (default: 1000000): Broker能够接受的最大消息。增加这个值以便于匹配你的最大消息。* log.segment.bytes (default: 1GB): Kafka数据文件的大小。确保它至少大于一条消息。默认情况下已经够用,一般最大的消息不会超过1G大小。* replica.fetch.max.bytes (default: 1MB): Broker间复制的最大的数据大小。这个值必须大于message.max.bytes,否则一个Broker接受到消息但是会复制失败,从而导致潜在的数据丢失。Consumer配置[Plain Text] 纯文本查看 复制代码* fetch.message.max.bytes (default: 1MB): Consumer所读消息的最大大小。这个值应该大于或者等于Broker配置的message.max.bytes的值。其他方面的考虑:[Plain Text] 纯文本查看 复制代码* Broker需要针对复制为每一个partition分配一个replica.fetch.max.bytes大小的缓存区。需要计算确认( partition的数量 * 最大消息的大小 )不会超过可用的内存,否则就会引发OOMs(内存溢出异常)。* Consumers有同样的问题,因子参数为 fetch.message.max.bytes :确认每一个partition的消费者针对最大的消息有足够可用的内存。* 大消息可能引发更长时间的垃圾回收停顿(garbage collection pauses)(brokers需要申请更大块的内存)。注意观察GC日志和服务器日志。假如发现长时间的GC停顿导致Kafka丢失了Zookeeper session,你可能需要为zookeeper.session.timeout.ms配置更长的timeout值。Kafka是否支持MQTT或JMS协议目前,Kafka针对上述协议不提供直接支持。但是,用户可以自己编写Adaptors从MQTT或者JMS中读取数据,然后写入到Kafka中。来源:navigating
弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率
稳定可靠、可弹性伸缩的在线数据库服务,全球最受欢迎的开源数据库之一
1共10页10) ? 10 :
location='/detail/327190.html?page='+page+'';}">10) ? 10 :
location='/detail/327190.html?page='+page+'';">Go
访问内容超出本站范围,不能确定是否安全
限100 字节
如果您在写长篇帖子又不马上发表,建议存为草稿
您目前还是游客,请
验证问题: 阿里云官网域名是什么? 正确答案:
&回复后跳转到最后一页
开发者论坛为你提供“部署kafka常见问答”的内容,论坛中还有更多关于
的内容供你使用,该内容是网友上传,与开发者论坛无关,如果需要删除请联系zixun-group@,工作人员会在5个工作日内回复您。kafka的消费者组该怎么删除? - 知乎10被浏览3974分享邀请回答1添加评论分享收藏感谢收起阅读(3155)
kafka最初是被LinkedIn设计用来处理log的分布式消息系统,因此它的着眼点不在数据的安全性(log偶尔丢几条无所谓),换句话说kafka并不能完全保证数据不丢失。
尽管kafka官网声称能够保证at-least-once,但如果consumer进程数小于partition_num,这个结论不一定成立。
考虑这样一个case,partiton_num=2,启动一个consumer进程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。
如果mit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。
如果mit.enable=false,假设consumer的两个fetcher各自拿了一条数据,并且由两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset,这里需要着重说明的是,当手动执行commit的时候,实际上是对这个consumer进程所占有的所有partition进行commit,kafka暂时还没有提供更细粒度的commit方式,也就是说,即使t2没有处理完partition2的数据,offset也被t1提交掉了。如果这时consumer crash掉,t2正在处理的这条数据就丢失了。
如果希望能够严格的不丢数据,解决办法有两个:
手动commit offset,并针对partition_num启同样数目的consumer进程,这样就能保证一个consumer进程占有一个partition,commit offset的时候不会影响别的partition的offset。但这个方法比较局限,因为partition和consumer进程的数目必须严格对应。
另一个方法同样需要手动commit offset,另外在consumer端再将所有fetch到的数据缓存到queue里,当把queue里所有的数据处理完之后,再批量提交offset,这样就能保证只有处理完的数据才被commit。当然这只是基本思路,实际上操作起来不是这么简单,具体做法以后我再另开一篇。
阅读排行榜6153人阅读
kafka(3)
运维kafka时,会有时需要删除无效或已下线的consumer group,若设置的offset保存位置是zookeepr,则数据都在zookeeper中,可自行删除,方式就是删除zookeeper相应节点。
kafka的offset若想按照生产时间进行恢复,事实严重不准,具体可参见:&
恢复步骤:
1. 获取该consumer group消费topic的各partition该时间的offset
2. 在zookeeper中修改这些offset
进行操作期间需要暂停该group对topic的消费,恢复offset后再重启,否则修改不生效。
ResetOffsetOperator.resetOffset(topic, group, null, whichTime);
ZookeeperOperator zookeeperOperator = new ZookeeperOperator();
zookeeperOperator.deleteUselessConsumer(&test1&);
public class ZookeeperOperator implements Watcher {
private ZooKeeper zooKeeper =
private static final Logger log = LoggerFactory
.getLogger(ZookeeperOperator.class);
public ZookeeperOperator() throws IOException {
this(KafkaConf.ZOOKEEPERHOST);
public ZookeeperOperator(String zookeeperHost) throws IOException {
zooKeeper = new ZooKeeper(zookeeperHost, 5000, ZookeeperOperator.this);
public void close() {
zooKeeper.close();
} catch (InterruptedException e) {
log.error(&Failed to close zookeeper:&, e);
* 读取指定节点下孩子节点数目
* @param path 节点path
private int readChildren(String path) {
return zooKeeper.getChildren(path, false, null).size();
} catch (KeeperException e) {
log.error(&Read error,KeeperException:& + path, e);
} catch (InterruptedException e) {
log.error(&Read error,InterruptedException:& + path, e);
private List&String& getChildrenList(String path) {
return zooKeeper.getChildren(path, false, null);
} catch (KeeperException e) {
log.error(&Read error,KeeperException:& + path, e);
} catch (InterruptedException e) {
log.error(&Read error,InterruptedException:& + path, e);
private boolean setData(String path, String data) {
zooKeeper.setData(path, data.getBytes(), -1);
} catch (KeeperException e) {
log.error(&Set error,KeeperException:& + path + & data:& + data, e);
} catch (InterruptedException e) {
log.error(&Set error,InterruptedException:& + path + & data:& + data, e);
private boolean deleteData(String path) {
zooKeeper.delete(path, -1);
} catch (InterruptedException e) {
log.error(&delete error,InterruptedException:& + path, e);
} catch (KeeperException e) {
log.error(&delete error,KeeperException:& + path, e);
private boolean recursivelyDeleteData(String path) {
List&String& childList = getChildrenList(path);
if (childList == null) {
} else if (childList.isEmpty()) {
deleteData(path);
for (String childName : childList) {
String childPath = path + &/& + childN
List&String& grandChildList = getChildrenList(childPath);
if (grandChildList == null) {
} else if (grandChildList.isEmpty()) {
deleteData(childPath);
recursivelyDeleteData(childPath);
deleteData(path);
private String getData(String path) {
return new String(zooKeeper.getData(path, false, null));
} catch (KeeperException e) {
log.error(&Read error,KeeperException:& + path, e);
return &&;
} catch (InterruptedException e) {
log.error(&Read error,InterruptedException:& + path, e);
return &&;
* 读取指定节点下孩子节点数目
* @param topic kafka topic 名称
public int readTopicChildren(String topic) {
StringBuilder sb = new StringBuilder().append(&/brokers/topics/&)
.append(topic).append(&/partitions&);
return readChildren(sb.toString());
public boolean setTopicGroupOffset(String topic, String group,
String partition, String data) {
StringBuilder sb = new StringBuilder().append(&/consumers/&).append(group)
.append(&/offsets/&).append(topic).append(&/&).append(partition);
return setData(sb.toString(), data);
public String getTopicGroupOffset(String topic, String group,
String partition) {
StringBuilder sb = new StringBuilder().append(&/consumers/&).append(group)
.append(&/offsets/&).append(topic).append(&/&).append(partition);
System.out.println(sb.toString());
return getData(sb.toString());
public boolean deleteUselessConsumer(String topic, String group) {
if (topic.endsWith(&-1&)) {
StringBuilder sb = new StringBuilder().append(&/consumers/&)
.append(group);
return recursivelyDeleteData(sb.toString());
StringBuilder sb = new StringBuilder().append(&/consumers/&).append(group)
.append(&/offsets/&).append(topic);
return recursivelyDeleteData(sb.toString());
public boolean deleteUselessLikeConsumer(String topic, String group) {
String path = &/consumers&;
List&String& childList = getChildrenList(path);
int success = 0;
int count = 0;
for (String child : childList) {
if (child.startsWith(group)) {
if (deleteUselessConsumer(topic, child)) {
success++;
if (success == count) {
public boolean deleteUselessLikeConsumer(String group) {
return deleteUselessLikeConsumer(&-1&, group);
public boolean deleteUselessConsumer(String group) {
return deleteUselessConsumer(&-1&, group);
public void process(WatchedEvent event) {
(&Receive Event:& + event.getState());
}public class ResetOffsetOperator {
public static boolean resetOffset(final String topic, String group,
Properties properties, long whichTime) throws IOException {
if (StringUtils.isBlank(topic) || StringUtils.isBlank(group)) {
System.err.println(&topic or group can not be null or empty!&);
System.exit(2);
if (properties == null) {
properties = new Properties();
properties.setProperty(&zookeeper.connect&, KafkaConf.ZOOKEEPERHOST);
properties.setProperty(&group.id&, group);
// zookeeper最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
properties.setProperty(&zookeeper.session.timeout.ms&, &10000&);
// ZooKeeper集群中leader和follower之间的同步时间
properties.setProperty(&zookeeper.sync.time.ms&, &2000&);
// 自动提交offset到zookeeper的时间间隔
properties.setProperty(&mit.interval.ms&, &3000&);
// 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anything else:抛出异常
properties.setProperty(&auto.offset.reset&, &largest&);
ZookeeperOperator zookeeper = new ZookeeperOperator(
properties.getProperty(&zookeeper.connect&));
GroupOperator groupOperator = new GroupOperator(topic, group, whichTime,
zookeeper);
return groupOperator.retryResetGroupOffset();
}public class GroupOperator {
private static final Logger LOG = LoggerFactory
.getLogger(GroupOperator.class);
private List&String& m_replicaB
private ZookeeperO
private List&String&
private long whichT
private int partitionN
private int retryNum = 5;
* @param topic
* @param whichTime timestamp(13位)/-1(latest)/-2(earliest)
* @throws java.io.IOException
public GroupOperator(String topic, String group, long whichTime,
ZookeeperOperator zookeeper)
throws IOException {
this.topic =
this.group =
this.whichTime = whichT
m_replicaBrokers = new ArrayList&String&();
seeds = KafkaConf.getBrokerHost();
port = Integer.parseInt(KafkaConf.BROKERPORT);
this.zookeeper =
partitionNum = zookeeper.readTopicChildren(topic);
* 将zookeeper中该group对应该topic下的所有分区的offset恢复为所希望的时间
public boolean resetGroupOffset() {
List&Long& offsetList = new ArrayList&Long&();
for (int partition = 0; partition & partitionN partition++) {
long offset = getOffset(partition);
if (offset == -1) {
LOG.error(&Failed to get offset of & + group + & with partition:&
+ partition);
offsetList.add(offset);
for (int partition = 0; partition & partitionN partition++) {
boolean isSuccess = zookeeper
.setTopicGroupOffset(topic, group, String.valueOf(partition),
String.valueOf(offsetList.get(partition)));
if (!isSuccess) {
LOG.error(&Failed to reset offset of topic:& + topic + & group:& + group
+ & partition& + partition + & value:& + offsetList.get(partition));
public boolean retryResetGroupOffset() {
for (int retry = 0; retry & retryN retry++) {
if (resetGroupOffset()) {
* 获取该partition要恢复时间的offset
* @param partition
public long getOffset(int partition) {
// find the meta data about the topic and partition we are interested in
PartitionMetadata metadata = findLeader(partition);
if (metadata == null) {
LOG.error(&Can't find metadata for Topic and Partition. Exiting&);
return -1;
if (metadata.leader() == null) {
LOG.error(&Can't find Leader for Topic and Partition. Exiting&);
return -1;
String leadBroker = metadata.leader().host();
String clientName = &Client_& + topic + &_& +
SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000,
64 * 1024, clientName);
long readOffset = getAssignedOffset(consumer, partition, clientName);
if (consumer != null) {
consumer.close();
return readO
* Finding the Lead Broker for a Topic and Partition
* @param a_partition 分区id,从0开始
private PartitionMetadata findLeader(int a_partition) {
PartitionMetadata returnMetaData =
for (String seed : seeds) {
SimpleConsumer consumer =
consumer = new SimpleConsumer(seed, port,
&leaderLookup&);
List&String& topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List&TopicMetadata& metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData =
} catch (Exception e) {
LOG.error(&Error communicating with Broker [& + seed
+ &] to find Leader for [& + topic + &, & + a_partition
+ &] Reason: & + e);
} finally {
if (consumer != null)
consumer.close();
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
return returnMetaD
public long getAssignedOffset(SimpleConsumer consumer, int partition,
String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map&TopicAndPartition, PartitionOffsetRequestInfo& requestInfo = new HashMap&TopicAndPartition, PartitionOffsetRequestInfo&();
requestInfo
.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println(
&Error fetching data Offset Data the Broker. Reason: & + response
.errorCode(topic, partition));
return -1;
long[] offsets = response.offsets(topic, partition);
return offsets[0];
& & & & & & & & &&
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:26540次
排名:千里之外
原创:18篇
(12)(1)(1)(4)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'}

我要回帖

更多关于 kafka 消费者组 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信