PG逻辑订阅功能
目录标题
- 一、复制标识、存储过程
- 排查处理过程
- 步骤1:确认表的复制标识设置
- 步骤2:设置复制标识
- 步骤3:检查存储过程的逻辑
- 步骤4:重新尝试执行存储过程
- 步骤5:验证表结构
- 总结
- 二、相关视图
- 分析查询结果
- 查询`pg_stat_subscription`
- 查询`pg_stat_replication`
- 查询`pg_replication_slot`
- 查询`pg_subscription_rel`
- 总结
- 建议
- 三、逻辑订阅功能
- 1. 确保逻辑复制是启用的
- 2. 创建一个复制插槽
- 3. 创建一个订阅
- 4. 查看订阅状态
- 5. 查看复制插槽状态
- 6. 管理订阅的同步状态
- 7. 处理订阅数据
- 8. 取消订阅
- 注意事项:
一、复制标识、存储过程
排查处理过程
步骤1:确认表的复制标识设置
-
查询当前表的复制标识设置:
SHOW replica_identity ON t_my_table;
-
如果返回结果不是
FULL
或DEFAULT
:- 说明表没有正确的复制标识,需要设置复制标识。
步骤2:设置复制标识
-
设置复制标识:
ALTER TABLE t_my_table REPLICA IDENTITY FULL;
-
验证设置是否成功:
SHOW replica_identity ON t_my_table;
步骤3:检查存储过程的逻辑
-
查看存储过程
p_altercolumn
的定义:CREATE OR REPLACE FUNCTION p_altercolumn(p_table character varying, p_column character varying, p_datatype character varying, p_isnotnull character varying, p_defaultvalue character varying, p_others character varying) RETURNS void AS $$ BEGIN -- 逻辑处理,例如: EXECUTE format('ALTER TABLE %I ALTER COLUMN %I TYPE %s', p_table, p_column, p_datatype); IF p_isnotnull = 'NOT NULL' THEN EXECUTE format('ALTER TABLE %I ALTER COLUMN %I SET NOT NULL', p_table, p_column); END IF; IF p_defaultvalue <> 'NULL' THEN EXECUTE format('ALTER TABLE %I ALTER COLUMN %I SET DEFAULT %s', p_table, p_column, p_defaultvalue); END IF; -- 其他逻辑 END; $$ LANGUAGE plpgsql;
-
确认存储过程能够处理各种数据类型和约束。
步骤4:重新尝试执行存储过程
-
调用存储过程:
CALL p_altercolumn('t_my_table', 'fid', 'BIGINT', 'NOT NULL', '1111', '0');
-
检查执行结果:
- 确认存储过程执行没有错误。
步骤5:验证表结构
-
查询表结构:
\d t_my_table
-
确认列
fid
的数据类型和约束:- 数据类型应该是
BIGINT
。 - 应该设置为
NOT NULL
。
- 数据类型应该是
总结
通过以上步骤,我们可以确保:
- 表的复制标识设置正确。
- 存储过程逻辑正确。
- 存储过程能够成功执行。
如果在执行过程中遇到任何错误,请记录错误信息并进一步排查。例如:
-
检查日志:
- 分析数据库日志,查看是否有更多详细信息。
-
检查权限:
- 确保执行存储过程的用户具有足够的权限。
-
检查依赖:
- 确保所有依赖的服务和组件都正常运行。
通过这些步骤,应该能够解决更新表时遇到的问题。如果还有其他问题,请提供更多信息。
二、相关视图
分析查询结果
查询pg_stat_subscription
- 目的:获取数据库订阅的状态。
- 结果:
(0 rows)
,表明数据库中没有活跃的订阅。
查询pg_stat_replication
- 目的:获取流复制的状态信息。
- 结果:返回了多个流复制进程的信息。
- 示例信息:
pid
:16394,表示复制进程的进程ID。usename
:未提供具体值,表示执行复制的数据库角色用户名。application_name
:PostgreSQL JDBC
,表明复制客户端使用的是PostgreSQL JDBC驱动连接。client_addr
:245.0.0.9
,表示复制客户端的IP地址。
查询pg_replication_slot
- 目的:获取复制插槽的状态。
- 结果:返回了复制插槽的详细信息。
- 示例信息:
slot_name
:dbz
,表示复制插槽的名称。plugin
:decoderbufs
,表示使用的解码插件,用于逻辑解码。
查询pg_subscription_rel
- 目的:获取订阅关联的关系状态。
- 结果:
(0 rows)
,表明没有订阅关系信息。
总结
- 订阅信息:数据库中没有配置订阅,
pg_stat_subscription
和pg_subscription_rel
查询均返回空结果。 - 流复制状态:
pg_stat_replication
查询显示存在活跃的流复制进程,表明数据库复制正在运行。 - 复制插槽:
pg_replication_slot
查询显示了复制插槽的详细信息,表明复制插槽已正确配置。
建议
- 检查订阅配置:如果预期数据库中应该有订阅配置,需要检查为什么没有订阅信息。可能需要配置订阅或检查订阅是否被删除。
- 监控复制状态:继续监控
pg_stat_replication
以确保流复制的健康状态。 - 检查复制插槽使用情况:如果需要使用逻辑解码,检查
pg_replication_slot
以确保插槽被正确使用。
三、逻辑订阅功能
PostgreSQL的逻辑订阅功能允许你订阅一个或多个表的变化,这样当这些表发生变化时,你可以得到通知。订阅可以用于多种用途,比如数据同步、审计日志、缓存更新等。
以下是设置和使用PostgreSQL订阅的一般步骤:
1. 确保逻辑复制是启用的
在PostgreSQL的配置文件(通常是postgresql.conf
)中,确保以下参数被设置:
wal_level = logical
max_replication_slots = 1 # 或更多,取决于你的需要
max_wal_senders = 1 # 或更多
2. 创建一个复制插槽
复制插槽是用于逻辑复制的WAL(Write-Ahead Logging)流的持久化存储区。可以使用以下命令创建一个:
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
这里my_slot
是新插槽的名字,pgoutput
是输出插件的名字,用于逻辑解码。
3. 创建一个订阅
你可以使用以下命令创建一个订阅:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=myhost user=myuser password=mypass dbname=mydb'
PUBLICATION my_publication
WITH (copy_data = true);
这里:
my_subscription
是新订阅的名字。CONNECTION
指定了发布服务器的连接参数。my_publication
是发布服务器上的一个发布名称,它定义了哪些更改应该被复制。WITH (copy_data = true)
表示初始同步会复制表中现有的数据。
4. 查看订阅状态
你可以使用以下命令查看订阅的状态:
SELECT * FROM pg_stat_subscription;
5. 查看复制插槽状态
SELECT * FROM pg_replication_slots;
6. 管理订阅的同步状态
你可以使用以下命令查看订阅的同步状态:
SELECT * FROM pg_subscription_rel;
7. 处理订阅数据
订阅创建后,你可以连接到订阅数据库并从pg_subscription_rel
视图中读取更改。你还可以创建一个逻辑解码的程序来读取my_slot
中的更改。
8. 取消订阅
如果不再需要订阅,可以删除它:
DROP SUBSCRIPTION my_subscription;
删除订阅后,系统会自动释放相关的复制插槽。
注意事项:
- 确保发布服务器和订阅服务器的PostgreSQL版本兼容。
- 订阅服务器需要能够连接到发布服务器。
- 根据需要调整复制插槽和WAL发送者的数量。
- 订阅可能需要一些额外的配置,比如设置
pg_hba.conf
文件中的访问权限。