当前位置: 首页 > article >正文

php实现kafka

kafka类:

<?php

class b2c_kafka
{
    public $broker_list;
    public $topic;
    public $group_id;
    protected $producer = null;
    protected $consumer = null;
    protected $receive_wait_time;
    protected $receive_wait_num;

    /**
     * 构造方法
     * @param object app
     */
    public function __construct()
    {
        $this->broker_list = 'kafka';
        $this->topic = 'local-cn';
        $this->group_id = ' kafka-map';
        $this->producer = null;
        $this->consumer = null;
        $this->receive_wait_time = 10;
        $this->receive_wait_num = 100;

    }
    /**
     * 获取生产者
     */
    public function Producer()
    {
        $conf = new \RdKafka\Conf();
        // $conf->set('bootstrap.servers', $this->broker_list);
        $conf->set('metadata.broker.list', $this->broker_list);

        // 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成
        // 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功
        // all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成
        $conf->set('acks', '0');

        //If you need to produce exactly once and want to keep the original produce order, uncomment the line below
        //$conf->set('enable.idempotence', 'true');

        $producer = new \RdKafka\Producer($conf);
        $this->producer = $producer;
        return $this;
    }
    /**
     * 发送消息
     *
     * @param string|array $msg
     * @param string $topic
     * @return void
     */
    public function SendMsg($msg = '', $topic = '')
    {
        if (empty($topic)) {
            $topic = $this->topic;
        }
        $producer = $this->producer;

        $topic = $producer->newTopic($topic);

        if (!is_array($msg)) {
            $msg = [$msg];
        }
        foreach ($msg as $value) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);
            $producer->poll(0);
        }
        for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {
            $result = $producer->flush(10000);            
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);
        }
    }

    /**
     * 获取消费者
     *
     * @param string $group_id
     * @return void
     */
    public function Consumer($group_id = '')
    {
        $conf = new \RdKafka\Conf();

        // Set a rebalance callback to log partition assignments (optional)
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(NULL);
                    break;

                default:
                    throw new \Exception($err);
            }
        });
        // Configure the group.id. All consumer with the same group.id will consume
        // different partitions.
        if (empty($group_id)) {
            $group_id = $this->group_id;
        }
        // 设置相同的group,防止一次消息被多次消费。
        // 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的
        $conf->set('group.id', $group_id);

        // Initial list of Kafka brokers
        // $conf->set('bootstrap.servers', $this->broker_list);
        $conf->set('metadata.broker.list', $this->broker_list);

        // Set where to start consuming messages when there is no initial offset in
        // offset store or the desired offset is out of range.
        // 'smallest': start from the beginning
        //earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费
        $conf->set('auto.offset.reset', 'earliest');
        // 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量
        // 自动提交分区消费的位置,手动可确保消息被消费
        $conf->set('enable.auto.commit', true);
        $conf->set('auto.commit.interval.ms', 1000);

        $consumer = new \RdKafka\KafkaConsumer($conf);

        $this->consumer = $consumer;
        return $this;
    }

    /**
     * 接收消息
     *
     * @param string $topic
     * @param array $callback
     * @return void
     */
    public function ReceiveMsg($topic = '', array $callback = [])
    {
        $consumer = $this->consumer;

        if (empty($topic)) {
            $topic = $this->topic;
        }
        if (!is_array($topic)) {
            $topic = [$topic];
        }
        // Subscribe to topic 'test'
        $consumer->subscribe($topic);

        echo "Waiting for partition assignment... (make take some time when\n";
        echo "quickly re-joining the group after leaving it.)\n";

        $i = 0;
        $msg_list = [];
        while (true) {
            $i++;
            if ($i > $this->receive_wait_time) {
                $i = 0;
                if (empty($msg_list)) {
                    continue;
                }
                if (!empty($callback)) {
                    call_user_func_array($callback, [$msg_list]);
                }
                $msg_list = [];
            }
            // 阻塞一秒钟
            $message = $consumer->consume(1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $msg_list[] = $message->payload;
                    if (count($msg_list) < $this->receive_wait_num) {
                        continue;
                    }
                    if (!empty($callback)) {
                        call_user_func_array($callback, [$msg_list]);
                    }
                    $i = 0;
                    $msg_list = [];
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    // echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }


}

调用:

$data['member_id']     = '121221';
$data['ip']            = $this->getIp();
$data['timestamp']     = $this->getMicrotime();
        
$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));

http://www.kler.cn/a/301748.html

相关文章:

  • 函数指针示例
  • MySQL中Flashback(闪回)技术
  • Oracle Instant Client 23.5安装配置完整教程
  • Qt / Qt Quick程序打包的一些坑 (四)
  • Android OpenGL ES详解——glTexImage2D方法
  • PostgreSQL TRUNCATE TABLE
  • 一篇文章,讲清SQL的 joins 语法
  • Java贪心算法每日一题——179.最大数
  • 【QT】Qt窗口
  • Pr:序列设置 - VR 视频
  • 【区块链 + 基层治理】社区防疫管理平台 | FISCO BCOS应用案例
  • 404 error when doing workload anlysis using locust on OpenAI API (GPT.35)
  • 【深度学习 Pytorch】深入浅出:使用PyTorch进行模型训练与GPU加速
  • 泛零售行业的营销自动化现状如何?
  • Vue3+vite使用i18n国际化
  • 军事目标无人机视角检测数据集 3500张 坦克 带标注voc
  • 剖析 MySQL 数据库连接池(C++版)
  • Docker简介在Centos和Ubuntu环境下安装Docker
  • 详细介绍 Redis 列表的应用场景
  • 【三刷C语言】各种注意事项
  • 常用Java API
  • c# resource en-US
  • 4.qml单例模式
  • 智能医学(四)——Elsevier特刊推荐
  • 科技之光,照亮未来之路“2024南京国际人工智能展会”
  • 系统架构设计师:数据库设计