当前位置: 首页 > article >正文

kamailio-ACC_JSON模块详解【后端语言go】

要确认 ACC_JSON 模块是否已经成功将计费信息推送到消息队列(MQueue),以及如何从队列中取值,可以按照以下步骤进行操作:


1. 确认 ACC_JSON 已推送到队列

1.1 配置 ACC_JSON

确保 ACC_JSON 模块已正确配置并启用。以下是一个示例配置:

loadmodule "acc_json.so"
modparam("acc_json", "log_flag", 1)  # 启用 JSON 记录
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)")  # 记录额外信息

route {
    if (method == "INVITE") {
        setflag(1);  # 设置计费标志
        t_relay();   # 转发请求
    }
}
1.2 检查 Kamailio 日志
  • 启动 Kamailio 并观察日志输出。
  • 如果 ACC_JSON 模块成功将数据推送到队列,日志中会显示类似以下内容:
    INFO: acc_json: JSON accounting data pushed to MQueue
    
1.3 检查消息队列
  • ACC_JSON 模块使用 Kamailio 的消息队列(MQueue)来存储 JSON 数据。
  • 默认情况下,消息队列的数据会存储在 Kamailio 的共享内存中。
  • 你可以使用 Kamailio 的 MI(Management Interface)RPC(Remote Procedure Call) 命令来检查队列状态。

2. 从队列中取值

2.1 使用 MI 命令

Kamailio 提供了 MI 命令来管理消息队列。以下是一些常用的 MI 命令:

2.1.1 检查队列状态
kamcmd mq.stats
  • 输出示例:
    {
        "queues": {
            "acc_json_queue": {
                "size": 10,          # 队列中当前的消息数量
                "max_size": 1000,    # 队列的最大容量
                "dropped": 0         # 丢弃的消息数量
            }
        }
    }
    
2.1.2 从队列中读取消息
kamcmd mq.read acc_json_queue
  • 输出示例:
    {
        "messages": [
            {
                "method": "INVITE",
                "from_tag": "abc123",
                "to_tag": "xyz456",
                "callid": "12345",
                "sip_code": "200",
                "sip_reason": "OK",
                "time": "2025-02-01 12:34:56",
                "ua": "SomeUserAgent/1.0",
                "uuid": "12345"
            },
            ...
        ]
    }
    
2.2 使用 RPC 命令

Kamailio 也支持通过 RPC 命令管理消息队列。以下是一些常用的 RPC 命令:

2.2.1 检查队列状态
kamctl rpc mq.stats
  • 输出示例:
    {
        "queues": {
            "acc_json_queue": {
                "size": 10,
                "max_size": 1000,
                "dropped": 0
            }
        }
    }
    
2.2.2 从队列中读取消息
kamctl rpc mq.read acc_json_queue
  • 输出示例:
    {
        "messages": [
            {
                "method": "INVITE",
                "from_tag": "abc123",
                "to_tag": "xyz456",
                "callid": "12345",
                "sip_code": "200",
                "sip_reason": "OK",
                "time": "2025-02-01 12:34:56",
                "ua": "SomeUserAgent/1.0",
                "uuid": "12345"
            },
            ...
        ]
    }
    

3. 自定义队列处理

如果默认的消息队列功能无法满足需求,可以通过以下方式自定义队列处理:

3.1 使用事件路由

Kamailio 支持通过事件路由(Event Route)处理消息队列中的数据。例如:

event_route[mq:acc_json_queue] {
    xlog("L_INFO", "Received JSON accounting data: $mqk($mqv)\n");
    # 在这里处理队列中的数据
}
3.2 编写外部脚本

可以编写外部脚本(如 Python、Shell 脚本)通过 Kamailio 的 MI 或 RPC 接口定期从队列中读取数据,并进行进一步处理。


4. 调试和验证

  • 调试日志:启用 Kamailio 的调试日志,观察 ACC_JSON 模块的行为:
    kamailio -dd -E -e
    
  • 队列监控:使用 MI 或 RPC 命令定期检查队列状态,确保数据被正确推送和读取。

总结

  • 通过 Kamailio 的 MIRPC 命令可以确认 ACC_JSON 是否成功将数据推送到队列,并从队列中读取数据。
  • 如果需要更复杂的处理,可以使用事件路由或编写外部脚本。
  • 确保 Kamailio 配置正确,并定期监控队列状态以避免数据丢失。

要实现 ACC_JSON 模块将计费信息推送到队列,并由 Go 语言从队列中取值并写入数据库,可以按照以下步骤操作:


1. Kamailio 配置

1.1 加载 ACC_JSON 模块

在 Kamailio 配置文件中加载 ACC_JSON 模块,并配置消息队列:

loadmodule "acc_json.so"

# 配置 ACC_JSON
modparam("acc_json", "log_flag", 1)  # 启用 JSON 记录
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)")  # 记录额外信息

# 配置消息队列
modparam("mq", "mq_size", 1000)  # 设置队列大小
modparam("mq", "mq_name", "acc_json_queue")  # 设置队列名称

route {
    if (method == "INVITE") {
        setflag(1);  # 设置计费标志
        t_relay();   # 转发请求
    }
}
1.2 验证数据推送

启动 Kamailio 并验证数据是否成功推送到队列:

kamcmd mq.stats
  • 如果队列中有数据,说明 ACC_JSON 模块已成功推送。

2. Go 语言实现

Go 语言程序需要从 Kamailio 的消息队列中读取数据,并将其写入数据库。以下是详细实现思路和代码示例。

2.1 实现思路
  1. 连接 Kamailio:通过 Kamailio 的 RPC 接口连接到消息队列。
  2. 读取队列数据:定期从队列中读取 JSON 格式的计费信息。
  3. 解析 JSON 数据:将读取的 JSON 数据解析为 Go 结构体。
  4. 写入数据库:将解析后的数据写入数据库(如 MySQL、PostgreSQL 等)。
2.2 代码示例
2.2.1 安装依赖

首先,安装 Go 语言的相关依赖:

go get github.com/zero-os/gorpc        # Kamailio RPC 客户端
go get github.com/go-sql-driver/mysql  # MySQL 驱动
2.2.2 Go 代码实现

以下是一个完整的 Go 程序示例:

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/zero-os/gorpc"
	_ "github.com/go-sql-driver/mysql"
)

// 定义计费信息结构体
type AccountingRecord struct {
	Method     string `json:"method"`
	FromTag    string `json:"from_tag"`
	ToTag      string `json:"to_tag"`
	CallID     string `json:"callid"`
	SipCode    string `json:"sip_code"`
	SipReason  string `json:"sip_reason"`
	Time       string `json:"time"`
	UserAgent  string `json:"ua"`
	UUID       string `json:"uuid"`
}

// 数据库配置
const (
	dbDriver = "mysql"
	dbUser   = "root"
	dbPass   = "password"
	dbName   = "kamailio_acc"
)

func main() {
	// 连接 Kamailio RPC
	client := gorpc.NewClient("tcp", "127.0.0.1:2049") // Kamailio RPC 地址
	defer client.Close()

	// 连接数据库
	db, err := sql.Open(dbDriver, fmt.Sprintf("%s:%s@/%s", dbUser, dbPass, dbName))
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer db.Close()

	// 定期从队列中读取数据
	for {
		// 从队列中读取消息
		var result map[string]interface{}
		err := client.Call("mq.read", "acc_json_queue", &result)
		if err != nil {
			log.Printf("Failed to read from queue: %v", err)
			time.Sleep(5 * time.Second) // 等待 5 秒后重试
			continue
		}

		// 解析 JSON 数据
		messages, ok := result["messages"].([]interface{})
		if !ok {
			log.Println("No messages in queue")
			time.Sleep(5 * time.Second)
			continue
		}

		// 处理每条消息
		for _, msg := range messages {
			msgJSON, err := json.Marshal(msg)
			if err != nil {
				log.Printf("Failed to marshal message: %v", err)
				continue
			}

			var record AccountingRecord
			if err := json.Unmarshal(msgJSON, &record); err != nil {
				log.Printf("Failed to unmarshal message: %v", err)
				continue
			}

			// 将数据写入数据库
			if err := saveToDatabase(db, record); err != nil {
				log.Printf("Failed to save record to database: %v", err)
			}
		}

		time.Sleep(1 * time.Second) // 每隔 1 秒检查一次队列
	}
}

// 将数据写入数据库
func saveToDatabase(db *sql.DB, record AccountingRecord) error {
	query := `
		INSERT INTO acc_records 
		(method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid) 
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
	`
	_, err := db.Exec(query,
		record.Method,
		record.FromTag,
		record.ToTag,
		record.CallID,
		record.SipCode,
		record.SipReason,
		record.Time,
		record.UserAgent,
		record.UUID,
	)
	return err
}

3. 数据库表设计

在 MySQL 中创建用于存储计费信息的表:

CREATE TABLE acc_records (
    id INT AUTO_INCREMENT PRIMARY KEY,
    method VARCHAR(50) NOT NULL,
    from_tag VARCHAR(100) NOT NULL,
    to_tag VARCHAR(100) NOT NULL,
    callid VARCHAR(100) NOT NULL,
    sip_code VARCHAR(10) NOT NULL,
    sip_reason VARCHAR(100) NOT NULL,
    time DATETIME NOT NULL,
    ua VARCHAR(255),
    uuid VARCHAR(100)
);

4. 运行程序

  1. 启动 Kamailio,确保 ACC_JSON 模块正常运行。
  2. 运行 Go 程序:
    go run main.go
    
  3. 程序会定期从 Kamailio 的消息队列中读取数据,并将其写入数据库。

5. 验证结果

  • 检查数据库中的 acc_records 表,确认数据是否成功写入。
  • 观察 Go 程序的日志输出,确保没有错误。

总结

通过以上步骤,可以实现 ACC_JSON 模块将计费信息推送到队列,并由 Go 语言程序从队列中读取数据并写入数据库。这种方法适用于需要将 Kamailio 的计费信息与外部系统集成的场景。

Gin框架代码实际
如果使用 Gin 框架,可以将 Go 程序组织成一个更结构化的 Web 服务,同时保留从 Kamailio 消息队列中读取数据并写入数据库的功能。以下是重新整理的代码示例:


1. 项目结构

kamailio-acc/
├── main.go
├── handlers/
│   └── acc_handler.go
├── models/
│   └── acc_model.go
├── services/
│   └── queue_service.go
├── config/
│   └── config.go
└── go.mod

2. 代码实现

2.1 main.go

主程序入口,初始化 Gin 框架并启动服务。

package main

import (
	"kamailio-acc/config"
	"kamailio-acc/handlers"
	"kamailio-acc/services"
	"log"
	"time"

	"github.com/gin-gonic/gin"
)

func main() {
	// 加载配置
	cfg, err := config.LoadConfig()
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// 初始化数据库
	db, err := config.InitDB(cfg)
	if err != nil {
		log.Fatalf("Failed to initialize database: %v", err)
	}

	// 初始化 Kamailio RPC 客户端
	client := services.NewKamailioClient(cfg.KamailioRPCAddr)
	defer client.Close()

	// 启动队列监听服务
	go services.StartQueueListener(client, db)

	// 初始化 Gin 框架
	r := gin.Default()

	// 注册路由
	handlers.RegisterRoutes(r, db)

	// 启动 Web 服务
	if err := r.Run(cfg.ServerAddr); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

2.2 config/config.go

配置文件加载和数据库初始化。

package config

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
	"github.com/spf13/viper"
)

type Config struct {
	ServerAddr      string `mapstructure:"SERVER_ADDR"`
	KamailioRPCAddr string `mapstructure:"KAMAILIO_RPC_ADDR"`
	DBDriver        string `mapstructure:"DB_DRIVER"`
	DBUser          string `mapstructure:"DB_USER"`
	DBPassword      string `mapstructure:"DB_PASSWORD"`
	DBName          string `mapstructure:"DB_NAME"`
}

func LoadConfig() (*Config, error) {
	viper.SetConfigFile(".env")
	if err := viper.ReadInConfig(); err != nil {
		return nil, fmt.Errorf("failed to read config file: %v", err)
	}

	var cfg Config
	if err := viper.Unmarshal(&cfg); err != nil {
		return nil, fmt.Errorf("failed to unmarshal config: %v", err)
	}

	return &cfg, nil
}

func InitDB(cfg *Config) (*sql.DB, error) {
	dsn := fmt.Sprintf("%s:%s@/%s", cfg.DBUser, cfg.DBPassword, cfg.DBName)
	db, err := sql.Open(cfg.DBDriver, dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %v", err)
	}

	if err := db.Ping(); err != nil {
		return nil, fmt.Errorf("failed to ping database: %v", err)
	}

	log.Println("Connected to database")
	return db, nil
}

2.3 models/acc_model.go

定义数据模型和数据库操作方法。

package models

import (
	"database/sql"
	"log"
)

type AccountingRecord struct {
	Method    string `json:"method"`
	FromTag   string `json:"from_tag"`
	ToTag     string `json:"to_tag"`
	CallID    string `json:"callid"`
	SipCode   string `json:"sip_code"`
	SipReason string `json:"sip_reason"`
	Time      string `json:"time"`
	UserAgent string `json:"ua"`
	UUID      string `json:"uuid"`
}

func SaveRecord(db *sql.DB, record AccountingRecord) error {
	query := `
		INSERT INTO acc_records 
		(method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid) 
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
	`
	_, err := db.Exec(query,
		record.Method,
		record.FromTag,
		record.ToTag,
		record.CallID,
		record.SipCode,
		record.SipReason,
		record.Time,
		record.UserAgent,
		record.UUID,
	)
	if err != nil {
		log.Printf("Failed to save record: %v", err)
		return err
	}

	log.Printf("Saved record: %+v", record)
	return nil
}

2.4 services/queue_service.go

从 Kamailio 消息队列中读取数据的服务。

package services

import (
	"encoding/json"
	"kamailio-acc/models"
	"log"
	"time"

	"github.com/zero-os/gorpc"
)

type KamailioClient struct {
	client *gorpc.Client
}

func NewKamailioClient(addr string) *KamailioClient {
	return &KamailioClient{
		client: gorpc.NewClient("tcp", addr),
	}
}

func (kc *KamailioClient) Close() {
	kc.client.Close()
}

func (kc *KamailioClient) ReadQueue(queueName string) ([]models.AccountingRecord, error) {
	var result map[string]interface{}
	if err := kc.client.Call("mq.read", queueName, &result); err != nil {
		return nil, err
	}

	messages, ok := result["messages"].([]interface{})
	if !ok {
		return nil, nil
	}

	var records []models.AccountingRecord
	for _, msg := range messages {
		msgJSON, err := json.Marshal(msg)
		if err != nil {
			log.Printf("Failed to marshal message: %v", err)
			continue
		}

		var record models.AccountingRecord
		if err := json.Unmarshal(msgJSON, &record); err != nil {
			log.Printf("Failed to unmarshal message: %v", err)
			continue
		}

		records = append(records, record)
	}

	return records, nil
}

func StartQueueListener(client *KamailioClient, db *sql.DB) {
	for {
		records, err := client.ReadQueue("acc_json_queue")
		if err != nil {
			log.Printf("Failed to read from queue: %v", err)
			time.Sleep(5 * time.Second)
			continue
		}

		for _, record := range records {
			if err := models.SaveRecord(db, record); err != nil {
				log.Printf("Failed to save record: %v", err)
			}
		}

		time.Sleep(1 * time.Second)
	}
}

2.5 handlers/acc_handler.go

定义 HTTP 路由和处理程序。

package handlers

import (
	"database/sql"
	"net/http"

	"github.com/gin-gonic/gin"
	"kamailio-acc/models"
)

func RegisterRoutes(r *gin.Engine, db *sql.DB) {
	r.GET("/records", func(c *gin.Context) {
		// 示例:从数据库中获取所有记录
		var records []models.AccountingRecord
		rows, err := db.Query("SELECT * FROM acc_records")
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
		defer rows.Close()

		for rows.Next() {
			var record models.AccountingRecord
			if err := rows.Scan(
				&record.Method,
				&record.FromTag,
				&record.ToTag,
				&record.CallID,
				&record.SipCode,
				&record.SipReason,
				&record.Time,
				&record.UserAgent,
				&record.UUID,
			); err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
				return
			}
			records = append(records, record)
		}

		c.JSON(http.StatusOK, records)
	})
}

3. 配置文件 .env

SERVER_ADDR=:8080
KAMAILIO_RPC_ADDR=127.0.0.1:2049
DB_DRIVER=mysql
DB_USER=root
DB_PASSWORD=password
DB_NAME=kamailio_acc

4. 运行程序

  1. 启动 Kamailio,确保 ACC_JSON 模块正常运行。
  2. 运行 Go 程序:
    go run main.go
    
  3. 访问 http://localhost:8080/records,查看数据库中的计费记录。

总结

通过 Gin 框架,将 Kamailio 的计费信息处理逻辑组织成一个结构化的 Web 服务。Go 程序从 Kamailio 的消息队列中读取数据并写入数据库,同时提供 HTTP 接口供外部系统查询数据。


http://www.kler.cn/a/528403.html

相关文章:

  • x86-64数据传输指令
  • 自制虚拟机(C/C++)(二、分析引导扇区,虚拟机读二进制文件img软盘)
  • Hot100之矩阵
  • Java数据结构和算法(一)
  • Java中对消息序列化和反序列化并且加入到Spring消息容器中
  • Leetcode:541
  • 数据结构(1)——算法时间复杂度与空间复杂度
  • 4. 劲舞团python解法——2024年省赛蓝桥杯真题
  • Kafka分区策略实现
  • 有没有个性化的UML图例
  • DevEco Studio的previewer一直转圈缓冲无法打开
  • 高性能消息队列Disruptor
  • 使用 Spring JDBC 进行数据库操作:深入解析 JdbcTemplate
  • dfs枚举问题
  • 【深度学习】softmax回归的从零开始实现
  • 想学习JAVA编程,请问应该如何去学习?
  • 深度学习之“线性代数”
  • DeepSeek超越ChatGPT的能力及部分核心原理
  • 算法【多重背包】
  • 【贪心算法篇】:“贪心”之旅--算法练习题中的智慧与策略(一)
  • 记7(激活函数+多层神经网络+梯度下降法及其优化
  • Unity3D仿星露谷物语开发26之创建场景控制管理器
  • 蓝桥杯刷题DAY1:前缀和
  • 项目练习:重写若依后端报错cannot be cast to com.xxx.model.LoginUser
  • C++ Primer 自定义数据结构
  • Linux-CentOS的yum源