当前位置: 首页 > article >正文

使用 Temporal 管理和调度 Couchbase SQL 脚本的实际例子

场景描述

目标:使用 Temporal 管理和调度一组 Couchbase SQL 脚本来完成以下任务:

同步数据:从其他数据源同步数据到 Couchbase。

执行数据聚合:运行统计 SQL 查询。

清理过期数据:定期清理 Couchbase 中过期或无效的数据。

提供任务失败的自动重试、定时调度、任务状态跟踪。

实现步骤

  1. Couchbase SQL 脚本准备

sync_data.sql:

INSERT INTO bucket-name (KEY, VALUE)
SELECT META().id, new_data.*
FROM source-bucket new_data
WHERE META().id NOT IN (SELECT RAW META().id FROM bucket-name);

aggregate_data.sql:

SELECT category, COUNT(*) AS count
FROM bucket-name
WHERE type = “product”
GROUP BY category;

cleanup_expired_data.sql:

DELETE FROM bucket-name
WHERE expiration_date < NOW_STR();

  1. 安装依赖

确保安装了以下库:

pip install temporalio couchbase

  1. Temporal Workflow 和 Activity 实现

Temporal 的核心是 Workflow(描述流程)和 Activity(执行具体任务)。

Activity 实现

创建一个 Activity,用来执行 SQL 脚本。

from couchbase.cluster import Cluster, ClusterOptions

from couchbase.auth import PasswordAuthenticator

Couchbase Activity

class CouchbaseActivities:
def init(self, couchbase_host, username, password):
self.cluster = Cluster(
f’couchbase://{couchbase_host}',
ClusterOptions(PasswordAuthenticator(username, password))
)
self.query_service = self.cluster.query_indexes()

def execute_sql(self, sql_file_path):
    with open(sql_file_path, 'r') as file:
        query = file.read()
    result = self.query_service.query(query)
    print(f"Executed SQL from {sql_file_path}: {result}")
    return result

Workflow 定义

定义一个 Workflow,描述任务的执行顺序。

from temporalio import workflow

@workflow.defn

class CouchbaseWorkflow:
@workflow.run
async def run(self):
activities = workflow.ActivityStub(CouchbaseActivities)

    # 1. 同步数据
    await activities.execute_sql('/path/to/sync_data.sql')

    # 2. 数据聚合
    await activities.execute_sql('/path/to/aggregate_data.sql')

    # 3. 清理过期数据
    await activities.execute_sql('/path/to/cleanup_expired_data.sql')

Worker 实现

将 Workflow 和 Activity 注册到 Temporal Worker。

from temporalio.worker import Worker

from couchbase_activities import CouchbaseActivities

from couchbase_workflow import CouchbaseWorkflow

async def main():
worker = Worker(
host=“localhost:7233”, # Temporal 服务地址
task_queue=“couchbase_task_queue”,
workflows=[CouchbaseWorkflow],
activities=[CouchbaseActivities(“localhost”, “username”, “password”)]
)
await worker.run()

if name == “main”:
import asyncio
asyncio.run(main())

  1. Workflow 启动代码

使用 Temporal 客户端启动 Workflow。

from temporalio.client import Client

async def main():
client = await Client.connect(“localhost:7233”)

# 启动 Workflow
handle = await client.start_workflow(
    CouchbaseWorkflow.run,
    id="couchbase_sql_workflow",
    task_queue="couchbase_task_queue",
)
print(f"Started workflow with ID: {handle.id}")

if name == “main”:
import asyncio
asyncio.run(main())

Temporal 的特性应用

任务调度:

Temporal 支持定时任务。可以通过 Temporal.schedule 定义定时运行的 Workflow。

自动重试:

每个 Activity 都可以配置重试策略。

from temporalio import activity
@activity.defn(retry_policy=activity.RetryPolicy(max_attempts=3))
async def execute_sql(sql_file_path):

任务依赖:

Workflow 中通过顺序执行 Activity 描述任务依赖关系。

可观察性:

使用 Temporal Web 界面查看 Workflow 和 Activity 的运行状态、历史和日志。

使用 Temporal 的优势

高可靠性:即使 Worker 崩溃,Workflow 的状态也能持久化并恢复。

灵活调度:支持定时任务和动态控制流程。

自动重试:内置的失败重试和错误处理机制。

开发简便:通过 Python SDK,快速实现分布式任务调度和管理。

通过 Temporal,Couchbase SQL 脚本的执行不仅具备高可用性和自动化,还可以轻松应对复杂的业务逻辑需求。


http://www.kler.cn/a/429228.html

相关文章:

  • 技术晋升读书笔记—华为研发
  • 128.最长连续序列
  • Java21 正则表达式
  • mac 安装 node
  • mac配置 iTerm2 使用lrzsz与服务器传输文件
  • 基于微信小程序的摄影竞赛系统设计与实现(LW+源码+讲解)
  • React第十四节useState使用详解差异
  • MongoDB 建模调优change stream实战
  • 【html网页页面010】html+css制作茶品牌文创网页制作含视频元素(7页面附效果及源码)
  • 华为TaurusDB与GaussDB:信创改造的“降本提效”之路
  • npm、yarn、pnpm 设置最新国内镜像源(附官方镜像源和最新阿里源),以及 nrm 的使用教程
  • 数据结构--树和二叉树
  • HTML区块
  • 【C/C++】关于链表插入函数可能的错误
  • Rust隐式返回(最后一个表达式后不加分号)与Rust显式返回(Rust return)(Rust隐示返回、Rust显示返回)
  • 理解多模态大语言模型,主流技术与最新模型简介
  • 【Linux探索学习】第二十弹——基础IO:深入理解C语言文件I/O与Linux操作系统中的文件操作
  • PyTorch基本使用-张量的索引操作
  • 【docker集群应用】Docker + consul的容器服务更新与发现
  • Google BERT入门(3)Transformer的自注意力机制的理解(下)
  • vue router 和route 区别
  • 【MySQL】——用一文解决@基础函数group by
  • 同步数据至ES时,数据丢失问题处理
  • 为什么 JavaScript 中的 `eval` 被禁止使用?
  • 运维工程师.云计算工程师.服务器操作集锦
  • Sui 集成 Phantom,生态迎来全新里程碑