当前位置: 首页 > article >正文

Win10安装kafka并用C#调用

 kafka安装

jdk、kafka版本如下,zookeeper使用kafka自带版本

安装包下载位置:https://download.csdn.net/download/henreash/90087368 (赚点csdn下载资源分)

安装jdk后,解压kafka压缩包,修改配置文件:

kafka_2.13-3.9.0\config\zookeeper.properties内修改:dataDir=D:/Kafka/zookeeper/data

kafka_2.13-3.9.0\config\server.properties内修改:log.dirs=D:/Kafka/kafka-logs

在目录内创建批处理文件,启动zookeeper和kafka:

kfk.cmd内容:call bin/windows/kafka-server-start.bat config/server.properties

zk.bat内容:call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

双击zk.bat、kfk.cmd启动zookeeper和kafka

kafka-manager安装

将kafka-manager2解压到d盘,注意目录结构不能太深,否则启动报错。

修改config\application.conf,修改节点kafka-manager.zkhosts指向zookeeper

 kafka-manager.zkhosts="192.168.0.109:2181"

创建start.bat批处理文件,内容:.\bin\kafka-manager.bat

双击启动kafka-manager。

打开浏览器,输入地址:http://localhost:9000/

点击Cluster菜单,创建一个默认Cluster Test001;

kafka环境配置完毕。

C#调用

创建C#8项目,在nuget中下载Confluent.Kafka(2.6.1),如下代码进行消息发布和订阅。

       private async void simpleButton2_Click(object sender, EventArgs e)
       {
           var config = new ProducerConfig 
           { 
               BootstrapServers = "localhost:9092"
           };
           using (var producer = new ProducerBuilder<Null, string>(config).Build())
           {
               var dr = await producer.ProduceAsync("test-topic", new Message<Null, string> { Value = "hello,world" });
               Debug.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
           }

       }

       private void simpleButton3_Click(object sender, EventArgs e)
       {
           Task.Factory.StartNew(() => {
               var config = new ConsumerConfig
               {
                   GroupId = "test-consumer-group",
                   BootstrapServers = "localhost:9092",
                   AutoOffsetReset = AutoOffsetReset.Earliest
               };
               var cts = new CancellationTokenSource();
               using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
               {
                   consumer.Subscribe("test-topic");
                   
                   try
                   {
                       while (true)
                       {
                           try
                           {
                               var cr = consumer.Consume(cts.Token);
                               Debug.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                           }
                           catch (Exception ex)
                           {
                               Console.WriteLine(ex);
                           }
                       }
                   }
                   catch (Exception ex2)
                   {
                       Console.WriteLine(ex2);
                   }
               }
           });

       }

执行后kafka-manager界面如下图:


http://www.kler.cn/a/428327.html

相关文章:

  • 除了基本的事件绑定,鸿蒙的ArkUI
  • Ansible fetch模块详解:轻松从远程主机抓取文件
  • 解释 RESTful API,以及如何使用它构建 web 应用程序
  • leetcode 121. 买卖股票的最佳时机
  • 嵌入式 工程配置
  • Mysql面试题----为什么B+树比B树更适合实现数据库索引
  • Qt源码阅读(六) ⏱️QTimer
  • 【成功解决】:VS2019(Visual Studio 2019)遇到E2870问题:此配置中不支持 128 位浮点类型
  • 【计算机网络】实验13:运输层端口
  • k8s中镜像导出的报错 not found
  • 【Django】在view中调用channel来主动进行websocket通信
  • 什么是数据架构?
  • 卸载windows
  • 第四节、电机定角度转动【51单片机-TB6600驱动器-步进电机教程】
  • 深入解析二叉树算法
  • 开源之夏 2024 KubeSphere 社区项目总结
  • 注意力机制介绍
  • Windows 中将某个安装文件安装到指定目录
  • 机器学习之Nemenyi检验
  • 模型优化与迁移学习
  • [NSSRound#7 Team]ec_RCE
  • 海外的bug-hunters,不一样的403bypass
  • DR、HIS、PACS的交互,以及与其他软件系统之间的交互
  • Python学习(一)—— 编程环境安装
  • 动手学深度学习-线性神经网络-1线性回归
  • 项目搭建:springboot,mybatis, maven