kamailio-ACC_JSON模块详解【后端语言go】
要确认 ACC_JSON 模块是否已经成功将计费信息推送到消息队列(MQueue),以及如何从队列中取值,可以按照以下步骤进行操作:
1. 确认 ACC_JSON 已推送到队列
1.1 配置 ACC_JSON
确保 ACC_JSON 模块已正确配置并启用。以下是一个示例配置:
loadmodule "acc_json.so"
modparam("acc_json", "log_flag", 1) # 启用 JSON 记录
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)") # 记录额外信息
route {
if (method == "INVITE") {
setflag(1); # 设置计费标志
t_relay(); # 转发请求
}
}
1.2 检查 Kamailio 日志
- 启动 Kamailio 并观察日志输出。
- 如果 ACC_JSON 模块成功将数据推送到队列,日志中会显示类似以下内容:
INFO: acc_json: JSON accounting data pushed to MQueue
1.3 检查消息队列
- ACC_JSON 模块使用 Kamailio 的消息队列(MQueue)来存储 JSON 数据。
- 默认情况下,消息队列的数据会存储在 Kamailio 的共享内存中。
- 你可以使用 Kamailio 的 MI(Management Interface) 或 RPC(Remote Procedure Call) 命令来检查队列状态。
2. 从队列中取值
2.1 使用 MI 命令
Kamailio 提供了 MI 命令来管理消息队列。以下是一些常用的 MI 命令:
2.1.1 检查队列状态
kamcmd mq.stats
- 输出示例:
{ "queues": { "acc_json_queue": { "size": 10, # 队列中当前的消息数量 "max_size": 1000, # 队列的最大容量 "dropped": 0 # 丢弃的消息数量 } } }
2.1.2 从队列中读取消息
kamcmd mq.read acc_json_queue
- 输出示例:
{ "messages": [ { "method": "INVITE", "from_tag": "abc123", "to_tag": "xyz456", "callid": "12345", "sip_code": "200", "sip_reason": "OK", "time": "2025-02-01 12:34:56", "ua": "SomeUserAgent/1.0", "uuid": "12345" }, ... ] }
2.2 使用 RPC 命令
Kamailio 也支持通过 RPC 命令管理消息队列。以下是一些常用的 RPC 命令:
2.2.1 检查队列状态
kamctl rpc mq.stats
- 输出示例:
{ "queues": { "acc_json_queue": { "size": 10, "max_size": 1000, "dropped": 0 } } }
2.2.2 从队列中读取消息
kamctl rpc mq.read acc_json_queue
- 输出示例:
{ "messages": [ { "method": "INVITE", "from_tag": "abc123", "to_tag": "xyz456", "callid": "12345", "sip_code": "200", "sip_reason": "OK", "time": "2025-02-01 12:34:56", "ua": "SomeUserAgent/1.0", "uuid": "12345" }, ... ] }
3. 自定义队列处理
如果默认的消息队列功能无法满足需求,可以通过以下方式自定义队列处理:
3.1 使用事件路由
Kamailio 支持通过事件路由(Event Route)处理消息队列中的数据。例如:
event_route[mq:acc_json_queue] {
xlog("L_INFO", "Received JSON accounting data: $mqk($mqv)\n");
# 在这里处理队列中的数据
}
3.2 编写外部脚本
可以编写外部脚本(如 Python、Shell 脚本)通过 Kamailio 的 MI 或 RPC 接口定期从队列中读取数据,并进行进一步处理。
4. 调试和验证
- 调试日志:启用 Kamailio 的调试日志,观察 ACC_JSON 模块的行为:
kamailio -dd -E -e
- 队列监控:使用 MI 或 RPC 命令定期检查队列状态,确保数据被正确推送和读取。
总结
- 通过 Kamailio 的 MI 或 RPC 命令可以确认 ACC_JSON 是否成功将数据推送到队列,并从队列中读取数据。
- 如果需要更复杂的处理,可以使用事件路由或编写外部脚本。
- 确保 Kamailio 配置正确,并定期监控队列状态以避免数据丢失。
要实现 ACC_JSON 模块将计费信息推送到队列,并由 Go 语言从队列中取值并写入数据库,可以按照以下步骤操作:
1. Kamailio 配置
1.1 加载 ACC_JSON 模块
在 Kamailio 配置文件中加载 ACC_JSON 模块,并配置消息队列:
loadmodule "acc_json.so"
# 配置 ACC_JSON
modparam("acc_json", "log_flag", 1) # 启用 JSON 记录
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)") # 记录额外信息
# 配置消息队列
modparam("mq", "mq_size", 1000) # 设置队列大小
modparam("mq", "mq_name", "acc_json_queue") # 设置队列名称
route {
if (method == "INVITE") {
setflag(1); # 设置计费标志
t_relay(); # 转发请求
}
}
1.2 验证数据推送
启动 Kamailio 并验证数据是否成功推送到队列:
kamcmd mq.stats
- 如果队列中有数据,说明 ACC_JSON 模块已成功推送。
2. Go 语言实现
Go 语言程序需要从 Kamailio 的消息队列中读取数据,并将其写入数据库。以下是详细实现思路和代码示例。
2.1 实现思路
- 连接 Kamailio:通过 Kamailio 的 RPC 接口连接到消息队列。
- 读取队列数据:定期从队列中读取 JSON 格式的计费信息。
- 解析 JSON 数据:将读取的 JSON 数据解析为 Go 结构体。
- 写入数据库:将解析后的数据写入数据库(如 MySQL、PostgreSQL 等)。
2.2 代码示例
2.2.1 安装依赖
首先,安装 Go 语言的相关依赖:
go get github.com/zero-os/gorpc # Kamailio RPC 客户端
go get github.com/go-sql-driver/mysql # MySQL 驱动
2.2.2 Go 代码实现
以下是一个完整的 Go 程序示例:
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
"github.com/zero-os/gorpc"
_ "github.com/go-sql-driver/mysql"
)
// 定义计费信息结构体
type AccountingRecord struct {
Method string `json:"method"`
FromTag string `json:"from_tag"`
ToTag string `json:"to_tag"`
CallID string `json:"callid"`
SipCode string `json:"sip_code"`
SipReason string `json:"sip_reason"`
Time string `json:"time"`
UserAgent string `json:"ua"`
UUID string `json:"uuid"`
}
// 数据库配置
const (
dbDriver = "mysql"
dbUser = "root"
dbPass = "password"
dbName = "kamailio_acc"
)
func main() {
// 连接 Kamailio RPC
client := gorpc.NewClient("tcp", "127.0.0.1:2049") // Kamailio RPC 地址
defer client.Close()
// 连接数据库
db, err := sql.Open(dbDriver, fmt.Sprintf("%s:%s@/%s", dbUser, dbPass, dbName))
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
// 定期从队列中读取数据
for {
// 从队列中读取消息
var result map[string]interface{}
err := client.Call("mq.read", "acc_json_queue", &result)
if err != nil {
log.Printf("Failed to read from queue: %v", err)
time.Sleep(5 * time.Second) // 等待 5 秒后重试
continue
}
// 解析 JSON 数据
messages, ok := result["messages"].([]interface{})
if !ok {
log.Println("No messages in queue")
time.Sleep(5 * time.Second)
continue
}
// 处理每条消息
for _, msg := range messages {
msgJSON, err := json.Marshal(msg)
if err != nil {
log.Printf("Failed to marshal message: %v", err)
continue
}
var record AccountingRecord
if err := json.Unmarshal(msgJSON, &record); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
continue
}
// 将数据写入数据库
if err := saveToDatabase(db, record); err != nil {
log.Printf("Failed to save record to database: %v", err)
}
}
time.Sleep(1 * time.Second) // 每隔 1 秒检查一次队列
}
}
// 将数据写入数据库
func saveToDatabase(db *sql.DB, record AccountingRecord) error {
query := `
INSERT INTO acc_records
(method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := db.Exec(query,
record.Method,
record.FromTag,
record.ToTag,
record.CallID,
record.SipCode,
record.SipReason,
record.Time,
record.UserAgent,
record.UUID,
)
return err
}
3. 数据库表设计
在 MySQL 中创建用于存储计费信息的表:
CREATE TABLE acc_records (
id INT AUTO_INCREMENT PRIMARY KEY,
method VARCHAR(50) NOT NULL,
from_tag VARCHAR(100) NOT NULL,
to_tag VARCHAR(100) NOT NULL,
callid VARCHAR(100) NOT NULL,
sip_code VARCHAR(10) NOT NULL,
sip_reason VARCHAR(100) NOT NULL,
time DATETIME NOT NULL,
ua VARCHAR(255),
uuid VARCHAR(100)
);
4. 运行程序
- 启动 Kamailio,确保 ACC_JSON 模块正常运行。
- 运行 Go 程序:
go run main.go
- 程序会定期从 Kamailio 的消息队列中读取数据,并将其写入数据库。
5. 验证结果
- 检查数据库中的
acc_records
表,确认数据是否成功写入。 - 观察 Go 程序的日志输出,确保没有错误。
总结
通过以上步骤,可以实现 ACC_JSON 模块将计费信息推送到队列,并由 Go 语言程序从队列中读取数据并写入数据库。这种方法适用于需要将 Kamailio 的计费信息与外部系统集成的场景。
Gin框架代码实际
如果使用 Gin 框架,可以将 Go 程序组织成一个更结构化的 Web 服务,同时保留从 Kamailio 消息队列中读取数据并写入数据库的功能。以下是重新整理的代码示例:
1. 项目结构
kamailio-acc/
├── main.go
├── handlers/
│ └── acc_handler.go
├── models/
│ └── acc_model.go
├── services/
│ └── queue_service.go
├── config/
│ └── config.go
└── go.mod
2. 代码实现
2.1 main.go
主程序入口,初始化 Gin 框架并启动服务。
package main
import (
"kamailio-acc/config"
"kamailio-acc/handlers"
"kamailio-acc/services"
"log"
"time"
"github.com/gin-gonic/gin"
)
func main() {
// 加载配置
cfg, err := config.LoadConfig()
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// 初始化数据库
db, err := config.InitDB(cfg)
if err != nil {
log.Fatalf("Failed to initialize database: %v", err)
}
// 初始化 Kamailio RPC 客户端
client := services.NewKamailioClient(cfg.KamailioRPCAddr)
defer client.Close()
// 启动队列监听服务
go services.StartQueueListener(client, db)
// 初始化 Gin 框架
r := gin.Default()
// 注册路由
handlers.RegisterRoutes(r, db)
// 启动 Web 服务
if err := r.Run(cfg.ServerAddr); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
2.2 config/config.go
配置文件加载和数据库初始化。
package config
import (
"database/sql"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
"github.com/spf13/viper"
)
type Config struct {
ServerAddr string `mapstructure:"SERVER_ADDR"`
KamailioRPCAddr string `mapstructure:"KAMAILIO_RPC_ADDR"`
DBDriver string `mapstructure:"DB_DRIVER"`
DBUser string `mapstructure:"DB_USER"`
DBPassword string `mapstructure:"DB_PASSWORD"`
DBName string `mapstructure:"DB_NAME"`
}
func LoadConfig() (*Config, error) {
viper.SetConfigFile(".env")
if err := viper.ReadInConfig(); err != nil {
return nil, fmt.Errorf("failed to read config file: %v", err)
}
var cfg Config
if err := viper.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("failed to unmarshal config: %v", err)
}
return &cfg, nil
}
func InitDB(cfg *Config) (*sql.DB, error) {
dsn := fmt.Sprintf("%s:%s@/%s", cfg.DBUser, cfg.DBPassword, cfg.DBName)
db, err := sql.Open(cfg.DBDriver, dsn)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %v", err)
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping database: %v", err)
}
log.Println("Connected to database")
return db, nil
}
2.3 models/acc_model.go
定义数据模型和数据库操作方法。
package models
import (
"database/sql"
"log"
)
type AccountingRecord struct {
Method string `json:"method"`
FromTag string `json:"from_tag"`
ToTag string `json:"to_tag"`
CallID string `json:"callid"`
SipCode string `json:"sip_code"`
SipReason string `json:"sip_reason"`
Time string `json:"time"`
UserAgent string `json:"ua"`
UUID string `json:"uuid"`
}
func SaveRecord(db *sql.DB, record AccountingRecord) error {
query := `
INSERT INTO acc_records
(method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := db.Exec(query,
record.Method,
record.FromTag,
record.ToTag,
record.CallID,
record.SipCode,
record.SipReason,
record.Time,
record.UserAgent,
record.UUID,
)
if err != nil {
log.Printf("Failed to save record: %v", err)
return err
}
log.Printf("Saved record: %+v", record)
return nil
}
2.4 services/queue_service.go
从 Kamailio 消息队列中读取数据的服务。
package services
import (
"encoding/json"
"kamailio-acc/models"
"log"
"time"
"github.com/zero-os/gorpc"
)
type KamailioClient struct {
client *gorpc.Client
}
func NewKamailioClient(addr string) *KamailioClient {
return &KamailioClient{
client: gorpc.NewClient("tcp", addr),
}
}
func (kc *KamailioClient) Close() {
kc.client.Close()
}
func (kc *KamailioClient) ReadQueue(queueName string) ([]models.AccountingRecord, error) {
var result map[string]interface{}
if err := kc.client.Call("mq.read", queueName, &result); err != nil {
return nil, err
}
messages, ok := result["messages"].([]interface{})
if !ok {
return nil, nil
}
var records []models.AccountingRecord
for _, msg := range messages {
msgJSON, err := json.Marshal(msg)
if err != nil {
log.Printf("Failed to marshal message: %v", err)
continue
}
var record models.AccountingRecord
if err := json.Unmarshal(msgJSON, &record); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
continue
}
records = append(records, record)
}
return records, nil
}
func StartQueueListener(client *KamailioClient, db *sql.DB) {
for {
records, err := client.ReadQueue("acc_json_queue")
if err != nil {
log.Printf("Failed to read from queue: %v", err)
time.Sleep(5 * time.Second)
continue
}
for _, record := range records {
if err := models.SaveRecord(db, record); err != nil {
log.Printf("Failed to save record: %v", err)
}
}
time.Sleep(1 * time.Second)
}
}
2.5 handlers/acc_handler.go
定义 HTTP 路由和处理程序。
package handlers
import (
"database/sql"
"net/http"
"github.com/gin-gonic/gin"
"kamailio-acc/models"
)
func RegisterRoutes(r *gin.Engine, db *sql.DB) {
r.GET("/records", func(c *gin.Context) {
// 示例:从数据库中获取所有记录
var records []models.AccountingRecord
rows, err := db.Query("SELECT * FROM acc_records")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
defer rows.Close()
for rows.Next() {
var record models.AccountingRecord
if err := rows.Scan(
&record.Method,
&record.FromTag,
&record.ToTag,
&record.CallID,
&record.SipCode,
&record.SipReason,
&record.Time,
&record.UserAgent,
&record.UUID,
); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
records = append(records, record)
}
c.JSON(http.StatusOK, records)
})
}
3. 配置文件 .env
SERVER_ADDR=:8080
KAMAILIO_RPC_ADDR=127.0.0.1:2049
DB_DRIVER=mysql
DB_USER=root
DB_PASSWORD=password
DB_NAME=kamailio_acc
4. 运行程序
- 启动 Kamailio,确保 ACC_JSON 模块正常运行。
- 运行 Go 程序:
go run main.go
- 访问
http://localhost:8080/records
,查看数据库中的计费记录。
总结
通过 Gin 框架,将 Kamailio 的计费信息处理逻辑组织成一个结构化的 Web 服务。Go 程序从 Kamailio 的消息队列中读取数据并写入数据库,同时提供 HTTP 接口供外部系统查询数据。