当前位置: 首页 > article >正文

Hasura 中间件API go操作示例


这是一个用 Go 语言编写的 Hasura GraphQL 引擎管理工具,主要功能包括:

  • 自动发现和追踪数据库中的表和视图
  • 自动发现和追踪表之间的外键关系

3. 自动配置基础的访问权限

4. 提供数据库结构的统计信息


1. 连接管理

  • TestHasuraConnection() - 测试 Hasura 连接是否正常
  • GetAllSources() - 获取所有配置的数据源

2. Schema 管理

  • GetDatabaseSchemas() - 获取数据库所有 schema 信息
  • GetDatabaseSources() - 获取数据源配置信息

3. 表和视图追踪

  • TrackUnTrackedObjects() - 自动追踪未追踪的表和视图
  • TrackTableOrView() - 追踪单个表或视图
  • GetUnTrackedTablesAndViews() - 获取未追踪的表和视图列表

4. 关系管理

  • TrackUnTrackedRelationships() - 自动追踪未追踪的外键关系
  • TrackRelationship() - 追踪单个外键关系


  • 自动化管理
  • 自动发现数据库对象
  • 自动配置追踪关系
  • 自动设置基础权限
  • 完善的错误处理
  • 详细的错误信息
  • 失败处理机制
  • 统计信息输出
  • 模块化设计
  • 功能模块清晰分离
  • 代码复用性高
  • 易于维护和扩展
  • 友好的输出
  • 详细的进度信息
  • 清晰的统计数据
  • 格式化的结果展示


1. 数据库初始化配置

  • 数据库结构变更后的同步
  • 新增数据源的快速接入
  • 数据库结构审计


  • 首次配置 Hasura 时使用
  • 数据库结构发生变更后运行
  • 定期运行以确保配置同步
  • 用于检查数据库结构完整性

这个工具极大地简化了 Hasura GraphQL 引擎的配置过程,特别适合处理大型数据库的初始化配置和后续维护工作。

package main

import (

const (
	// 从环境变量获取 Hasura 配置
	hasuraEndpoint   = ""  // 更新为实际的 Hasura 端点
	metadataEndpoint = "" // 更新为实际的 metadata 端点
	adminSecret      = "HEllo"                        // 更新为实际的 admin secret

// GraphQLRequest represents a GraphQL query request
type GraphQLRequest struct {
	Query     string                 `json:"query"`
	Variables map[string]interface{} `json:"variables,omitempty"`

// TestHasuraConnection tests the connection to Hasura
func TestHasuraConnection() error {
	// Simple health check query
	query := `
		query HealthCheck {
			__schema {

	// Create request body
	reqBody := GraphQLRequest{
		Query: query,
	jsonBody, err := json.Marshal(reqBody)
	if err != nil {
		return fmt.Errorf("请求体编码失败: %v", err)

	// Create HTTP request
	req, err := http.NewRequest("POST", hasuraEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("创建请求失败: %v", err)

	// Add headers
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	// Send request
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("请求失败: %v", err)
	defer resp.Body.Close()

	// Read response
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("读取响应失败: %v", err)

	// Check response status
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("意外的状态码: %d, 响应内容: %s", resp.StatusCode, string(body))

	fmt.Printf("连接成功! 响应: %s\n", string(body))
	return nil

// TestQueryUsers tests querying users from Hasura
func TestQueryUsers() error {
	query := `
		query MyQuery {
  role(where: {id: {_eq: 34}}) {

	reqBody := GraphQLRequest{
		Query: query,
	jsonBody, err := json.Marshal(reqBody)
	if err != nil {
		return fmt.Errorf("marshal request failed: %v", err)

	req, err := http.NewRequest("POST", hasuraEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create request failed: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("request failed: %v", err)
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read response failed: %v", err)

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))

	fmt.Printf("查询用户成功! 响应: %s\n", string(body))
	return nil

// HasuraMetadata 结构体用于解析 Hasura 元数据
type HasuraMetadata struct {
	Sources []struct {
		Name          string `json:"name"`
		Configuration struct {
			ConnectionInfo struct {
				DatabaseURL string `json:"database_url"`
			} `json:"connection_info"`
		} `json:"configuration"`
	} `json:"sources"`

func GetDatabaseSources() error {
	reqBody := map[string]interface{}{
		"type":    "export_metadata",
		"args":    map[string]interface{}{},
		"version": 2,

	jsonBody, err := json.Marshal(reqBody)
	if err != nil {
		return fmt.Errorf("请求编码失败: %v", err)

	req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("创建请求失败: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("请求失败: %v", err)
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read response failed: %v", err)

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return fmt.Errorf("parse metadata failed: %v", err)

	// 获取 sources 数组
	sources, ok := result["metadata"].(map[string]interface{})["sources"].([]interface{})
	if !ok {
		return fmt.Errorf("在元数据中未找到数据源")

	for _, source := range sources {
		if sourceMap, ok := source.(map[string]interface{}); ok {
			if name, ok := sourceMap["name"].(string); ok {
				fmt.Printf("Name: %s\n", name)

	return nil

// TrackTableOrView 追踪单个表或视图
func TrackTableOrView(sourceName, schema, objectName, objectType string) error {
	// 构建 track 请求
	trackQuery := map[string]interface{}{
		"type": "pg_track_table",  // 修改为正确的 API 端点
		"args": map[string]interface{}{
			"source": sourceName,
			"table": map[string]interface{}{
				"schema": schema,
				"name": objectName,
			"configuration": map[string]interface{}{
				"custom_root_fields": map[string]interface{}{
					"select": fmt.Sprintf("select_%s", objectName),
					"select_by_pk": fmt.Sprintf("select_%s_by_pk", objectName),
					"select_aggregate": fmt.Sprintf("select_%s_aggregate", objectName),
					"insert": fmt.Sprintf("insert_%s", objectName),
					"insert_one": fmt.Sprintf("insert_%s_one", objectName),
					"update": fmt.Sprintf("update_%s", objectName),
					"update_by_pk": fmt.Sprintf("update_%s_by_pk", objectName),
					"delete": fmt.Sprintf("delete_%s", objectName),
					"delete_by_pk": fmt.Sprintf("delete_%s_by_pk", objectName),

	jsonBody, err := json.Marshal(trackQuery)
	if err != nil {
		return fmt.Errorf("追踪请求编码失败: %v", err)

	// 打���请求内容以便调试
	fmt.Printf("追踪 %s.%s 的请求:\n%s\n", schema, objectName, string(jsonBody))

	req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("创建追踪请求失败: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("追踪请求失败: %v", err)
	defer resp.Body.Close()

	// 读取并打印响应内容
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read response failed: %v", err)

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("追踪失败,状态码 %d: %s", resp.StatusCode, string(body))

	fmt.Printf("追踪响应: %s\n", string(body))

	// 如果是视图,可能需要额外的配置
	if objectType == "VIEW" {
		// 为视图添加额外的配置
		configQuery := map[string]interface{}{
			"type": "pg_set_table_customization",
			"args": map[string]interface{}{
				"source": sourceName,
				"table": map[string]interface{}{
					"schema": schema,
					"name": objectName,
				"configuration": map[string]interface{}{
					"identifier": objectName,
					"custom_name": objectName,

		jsonBody, err = json.Marshal(configQuery)
		if err != nil {
			return fmt.Errorf("marshal view config request failed: %v", err)

		req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
		if err != nil {
			return fmt.Errorf("create view config request failed: %v", err)

		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

		resp, err = client.Do(req)
		if err != nil {
			return fmt.Errorf("view config request failed: %v", err)
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
			body, _ := ioutil.ReadAll(resp.Body)
			return fmt.Errorf("view config failed with status %d: %s", resp.StatusCode, string(body))

	// 添加权限(可选)
	permissionQuery := map[string]interface{}{
		"type": "pg_create_select_permission",
		"args": map[string]interface{}{
			"source": sourceName,
			"table": map[string]interface{}{
				"schema": schema,
				"name": objectName,
			"role": "user",
			"permission": map[string]interface{}{
				"columns": "*",
				"filter": map[string]interface{}{},
				"limit": 100,

	jsonBody, err = json.Marshal(permissionQuery)
	if err != nil {
		return fmt.Errorf("marshal permission request failed: %v", err)

	req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create permission request failed: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	resp, err = client.Do(req)
	if err != nil {
		return fmt.Errorf("permission request failed: %v", err)
	defer resp.Body.Close()

	// 不要因为权限设置失败而中断整个过程
	if resp.StatusCode != http.StatusOK {
		body, _ := ioutil.ReadAll(resp.Body)
		fmt.Printf("Warning: Failed to set permissions for %s.%s: %s\n", schema, objectName, string(body))

	return nil

// TrackUnTrackedObjects 追踪未追踪的表和视图
func TrackUnTrackedObjects(sourceName string) error {
	// 1. 获取所有表和视图
	objectsQuery := map[string]interface{}{
		"type": "run_sql",
		"args": map[string]interface{}{
			"source": sourceName,
			"sql": `
				FROM information_schema.tables 
				WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
				AND table_type IN ('BASE TABLE', 'VIEW')
				ORDER BY table_schema, table_type, table_name;

	dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
	jsonBody, err := json.Marshal(objectsQuery)
	if err != nil {
		return fmt.Errorf("marshal objects request failed: %v", err)

	req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create objects request failed: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("objects request failed: %v", err)
	defer resp.Body.Close()

	var result struct {
		Result [][]string `json:"result"`
	if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return fmt.Errorf("decode objects response failed: %v", err)

	// 2. 获取已追踪的对象
	trackedQuery := map[string]interface{}{
		"type": "export_metadata",
		"args": map[string]interface{}{},
		"version": 2,

	jsonBody, err = json.Marshal(trackedQuery)
	if err != nil {
		return fmt.Errorf("marshal tracked request failed: %v", err)

	req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create tracked request failed: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	resp, err = client.Do(req)
	if err != nil {
		return fmt.Errorf("tracked request failed: %v", err)
	defer resp.Body.Close()

	var trackedResult map[string]interface{}
	if err = json.NewDecoder(resp.Body).Decode(&trackedResult); err != nil {
		return fmt.Errorf("decode tracked response failed: %v", err)

	fmt.Printf("\n正在追踪数据源 '%s' 中未追踪的对象:\n", sourceName)

	stats := struct {
		TrackedTables   int
		TrackedViews    int
		UntrackedTables int
		UntrackedViews  int

	// 3. 追踪未追踪的对象
	if len(result.Result) > 1 { // 跳过标题行
		for _, row := range result.Result[1:] {
			schema := row[0]
			objectName := row[1]
			objectType := row[2]

			// 检查对象是否已被追踪
			isTracked := false
			if sources, ok := trackedResult["metadata"].(map[string]interface{})["sources"].([]interface{}); ok {
				for _, source := range sources {
					if sourceMap, ok := source.(map[string]interface{}); ok {
						if sourceTables, ok := sourceMap["tables"].([]interface{}); ok {
							for _, sourceTable := range sourceTables {
								if tableMap, ok := sourceTable.(map[string]interface{}); ok {
									if tableInfo, ok := tableMap["table"].(map[string]interface{}); ok {
										if tableInfo["name"] == objectName && tableInfo["schema"] == schema {
											isTracked = true
											if objectType == "BASE TABLE" {
											} else {

			if !isTracked {
				fmt.Printf("追踪新的 %s: %s.%s\n", strings.ToLower(objectType), schema, objectName)
				if err := TrackTableOrView(sourceName, schema, objectName, objectType); err != nil {
					fmt.Printf("无法追踪 %s %s.%s: %v\n", strings.ToLower(objectType), schema, objectName, err)

				if objectType == "BASE TABLE" {
				} else {

	fmt.Printf("  - 已追踪: %d\n", stats.TrackedTables)
	fmt.Printf("  - 新追踪: %d\n", stats.UntrackedTables)
	fmt.Printf("  - 总计: %d\n", stats.TrackedTables+stats.UntrackedTables)
	fmt.Printf("  - 已追踪: %d\n", stats.TrackedViews)
	fmt.Printf("  - 新追踪: %d\n", stats.UntrackedViews)
	fmt.Printf("  - 总计: %d\n", stats.TrackedViews+stats.UntrackedViews)
	return nil

// GetDatabaseSchemas 获取所有数据库的 schema 信息
func GetDatabaseSchemas(sourceName string) error {
	// 构建获取 schema 的查询
	schemaQuery := map[string]interface{}{
		"type": "run_sql",
		"args": map[string]interface{}{
			"source": sourceName,
			"sql": `
				FROM information_schema.schemata 
				WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
				ORDER BY schema_name;

	jsonBody, err := json.Marshal(schemaQuery)
	if err != nil {
		return fmt.Errorf("marshal schema request failed: %v", err)

	// 发送请求到 data endpoint
	dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
	req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create schema request failed: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("schema request failed: %v", err)
	defer resp.Body.Close()

	var result map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return fmt.Errorf("decode schema response failed: %v", err)

	// 输出 schema 信息
	fmt.Printf("\n数据源 '%s' 中的 Schema:\n", sourceName)

	if result["result"] != nil {
		rows, ok := result["result"].([]interface{})
		if !ok || len(rows) < 2 { // 第一行是列名
			return fmt.Errorf("unexpected schema result format")

		// 跳过第一行(列名),显示所有 schema
		for _, row := range rows[1:] {
			if schemaRow, ok := row.([]interface{}); ok && len(schemaRow) > 0 {
				if schemaName, ok := schemaRow[0].(string); ok {
					fmt.Printf("Schema: %s\n", schemaName)

	return nil

// GetUnTrackedTablesAndViews 获取未追踪的表和视图
func GetUnTrackedTablesAndViews(sourceName string) error {
	// 1. 获取已追踪的对象
	trackedQuery := map[string]interface{}{
		"type":    "export_metadata",
		"args":    map[string]interface{}{},
		"version": 2,

	jsonBody, err := json.Marshal(trackedQuery)
	if err != nil {
		return fmt.Errorf("marshal tracked request failed: %v", err)

	req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create tracked request failed: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("tracked request failed: %v", err)
	defer resp.Body.Close()

	// 2. 获取所有表和视图
	allObjectsQuery := map[string]interface{}{
		"type": "run_sql",
		"args": map[string]interface{}{
			"source": sourceName,
			"sql": `
				FROM information_schema.tables 
				WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
				AND table_type IN ('BASE TABLE', 'VIEW')
				ORDER BY table_schema, table_type, table_name;

	jsonBody, err = json.Marshal(allObjectsQuery)
	if err != nil {
		return fmt.Errorf("marshal objects request failed: %v", err)

	dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
	req, err = http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create objects request failed: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	resp, err = client.Do(req)
	if err != nil {
		return fmt.Errorf("objects request failed: %v", err)
	defer resp.Body.Close()

	var result struct {
		Result [][]string `json:"result"`

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Printf("Raw Response: %s\n", string(body))

	if err = json.Unmarshal(body, &result); err != nil {
		return fmt.Errorf("decode objects response failed: %v", err)

	// 组织数据结构
	type SchemaObjects struct {
		Tables []string
		Views  []string
	untracked := make(map[string]*SchemaObjects)

	// 跳过标题行
	if len(result.Result) > 1 {
		for _, row := range result.Result[1:] {
			schema := row[0]
			objName := row[1]
			objType := row[2]

			if _, exists := untracked[schema]; !exists {
				untracked[schema] = &SchemaObjects{
					Tables: make([]string, 0),
					Views:  make([]string, 0),

			if objType == "BASE TABLE" {
				untracked[schema].Tables = append(untracked[schema].Tables, objName)
			} else if objType == "VIEW" {
				untracked[schema].Views = append(untracked[schema].Views, objName)

	// 输出结果
	fmt.Printf("\n数据源 '%s' 中未追踪的对象:\n", sourceName)

	for schema, objects := range untracked {
		if len(objects.Tables) > 0 || len(objects.Views) > 0 {
			fmt.Printf("\nSchema: %s\n", schema)
			if len(objects.Tables) > 0 {
				fmt.Println("  未追踪的表格:")
				for _, table := range objects.Tables {
					fmt.Printf("    - %s\n", table)
			if len(objects.Views) > 0 {
				fmt.Println("  未追踪的视图:")
				for _, view := range objects.Views {
					fmt.Printf("    - %s\n", view)

	return nil

// GetAllSources 获取所有数据源
func GetAllSources() ([]string, error) {
	// 获取所有数据源
	metadataQuery := map[string]interface{}{
		"type": "export_metadata",
		"args": map[string]interface{}{},
		"version": 2,

	jsonBody, err := json.Marshal(metadataQuery)
	if err != nil {
		return nil, fmt.Errorf("元数据请求编码失败: %v", err)

	req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return nil, fmt.Errorf("创建元数据请求失败: %v", err)

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("元数据请求失败: %v", err)
	defer resp.Body.Close()

	var result map[string]interface{}
	if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("解码元数据响应失败: %v", err)

	var sources []string
	if metadata, ok := result["metadata"].(map[string]interface{}); ok {
		if sourcesList, ok := metadata["sources"].([]interface{}); ok {
			for _, source := range sourcesList {
				if sourceMap, ok := source.(map[string]interface{}); ok {
					if name, ok := sourceMap["name"].(string); ok {
						sources = append(sources, name)

	return sources, nil

// TrackRelationship 追踪单个外键关系
func TrackRelationship(sourceName, schemaName, tableName, columnName, foreignSchemaName, foreignTableName, foreignColumnName, constraintName string) error {
    // 创建对象关系
    relationshipName := fmt.Sprintf("%s_%s_rel", tableName, columnName)
    createRelationshipQuery := map[string]interface{}{
        "type": "pg_create_object_relationship",
        "args": map[string]interface{}{
            "source": sourceName,
            "table": map[string]interface{}{
                "schema": schemaName,
                "name":   tableName,
            "name": relationshipName,
            "using": map[string]interface{}{
                "foreign_key_constraint_on": columnName,

    jsonBody, err := json.Marshal(createRelationshipQuery)
    if err != nil {
        return fmt.Errorf("关系请求编码失败: %v", err)

    // 打印请求内容以便调试
    fmt.Printf("创建关系请求:\n%s\n", string(jsonBody))

    req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
    if err != nil {
        return fmt.Errorf("创建关系请求失败: %v", err)

    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("关系请求失败: %v", err)
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return fmt.Errorf("读取响应失败: %v", err)

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("创建关系失败,状态码 %d: %s", resp.StatusCode, string(body))

    fmt.Printf("成功创建关系: %s\n", relationshipName)
    return nil

// TrackUnTrackedRelationships 追踪所有未追踪的外键关系
func TrackUnTrackedRelationships(sourceName string) error {
    // 查询所有外键关系
    relationshipQuery := map[string]interface{}{
        "type": "run_sql",
        "args": map[string]interface{}{
            "source": sourceName,
            "sql": `
                    tc.table_schema as schema_name,
                    ccu.table_schema AS foreign_schema_name,
                    ccu.table_name AS foreign_table_name,
                    ccu.column_name AS foreign_column_name
                    information_schema.table_constraints AS tc
                    JOIN information_schema.key_column_usage AS kcu
                        ON tc.constraint_name = kcu.constraint_name
                        AND tc.table_schema = kcu.table_schema
                    JOIN information_schema.constraint_column_usage AS ccu
                        ON ccu.constraint_name = tc.constraint_name
                        AND ccu.table_schema = tc.table_schema
                    tc.constraint_type = 'FOREIGN KEY'
                    AND tc.table_schema NOT IN ('information_schema', 'pg_catalog')
                ORDER BY

    dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
    jsonBody, err := json.Marshal(relationshipQuery)
    if err != nil {
        return fmt.Errorf("关系请求编码失败: %v", err)

    req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
    if err != nil {
        return fmt.Errorf("创建关系请求失败: %v", err)

    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("关系请求失败: %v", err)
    defer resp.Body.Close()

    var result struct {
        Result [][]string `json:"result"`
    if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return fmt.Errorf("解码关系响应失败: %v", err)

    // 获取已追踪的关系
    trackedQuery := map[string]interface{}{
        "type": "export_metadata",
        "args": map[string]interface{}{},
        "version": 2,

    jsonBody, err = json.Marshal(trackedQuery)
    if err != nil {
        return fmt.Errorf("关系请求编码失败: %v", err)

    req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
    if err != nil {
        return fmt.Errorf("创建关系请求失败: %v", err)

    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

    resp, err = client.Do(req)
    if err != nil {
        return fmt.Errorf("关系请求失败: %v", err)
    defer resp.Body.Close()

    var trackedResult map[string]interface{}
    if err = json.NewDecoder(resp.Body).Decode(&trackedResult); err != nil {
        return fmt.Errorf("解码已追踪关系响应失败: %v", err)

    fmt.Printf("\n追踪数据源 '%s' 中的外键关系:\n", sourceName)

    var trackedCount, untrackedCount, errorCount int

    if len(result.Result) > 1 { // 跳过标题行
        for _, row := range result.Result[1:] {
            schemaName := row[0]
            constraintName := row[1]
            tableName := row[2]
            columnName := row[3]
            foreignSchemaName := row[4]
            foreignTableName := row[5]
            foreignColumnName := row[6]

            // 检查关系是否已被追踪
            isTracked := false
            if sources, ok := trackedResult["metadata"].(map[string]interface{})["sources"].([]interface{}); ok {
                for _, source := range sources {
                    if sourceMap, ok := source.(map[string]interface{}); ok {
                        if sourceName == sourceMap["name"] {
                            if tables, ok := sourceMap["tables"].([]interface{}); ok {
                                for _, table := range tables {
                                    if tableMap, ok := table.(map[string]interface{}); ok {
                                        if tableInfo, ok := tableMap["table"].(map[string]interface{}); ok {
                                            if tableInfo["name"] == tableName && tableInfo["schema"] == schemaName {
                                                if relationships, ok := tableMap["object_relationships"].([]interface{}); ok {
                                                    for _, rel := range relationships {
                                                        if relMap, ok := rel.(map[string]interface{}); ok {
                                                            if using, ok := relMap["using"].(map[string]interface{}); ok {
                                                                if fKeyConstraint, ok := using["foreign_key_constraint_on"].(string); ok {
                                                                    if fKeyConstraint == columnName {
                                                                        isTracked = true

            if !isTracked {
                fmt.Printf("追踪关系: %s.%s.%s -> %s.%s.%s (约束: %s)\n",
                    schemaName, tableName, columnName,
                    foreignSchemaName, foreignTableName, foreignColumnName,

                err := TrackRelationship(

                if err != nil {
                    fmt.Printf("追踪关系出错: %v\n", err)
                } else {

    fmt.Printf("- 已追踪的关系: %d\n", trackedCount)
    fmt.Printf("- 新追踪的关系: %d\n", untrackedCount)
    fmt.Printf("- 追踪失败: %d\n", errorCount)
    fmt.Printf("- 关系总数: %d\n", trackedCount+untrackedCount+errorCount)

    return nil

func main() {
	fmt.Println("测试 Hasura 连接...")
	if err := TestHasuraConnection(); err != nil {
		fmt.Printf("连接测试失败: %v\n", err)

	// 获取所有数据源
	sources, err := GetAllSources()
	if err != nil {
		fmt.Printf("获取数据源失败: %v\n", err)

	fmt.Printf("\n发现 %d 个数据源\n", len(sources))
	// 检查并追踪每个数据源中未追踪的对象
	for _, source := range sources {
		fmt.Printf("\n=== 处理数据源: %s ===\n", source)
		// 显示未追踪的对象
		if err := GetUnTrackedTablesAndViews(source); err != nil {
			fmt.Printf("获取未追踪对象失败,数据源 %s: %v\n", source, err)

		// 追踪未追踪的对象
		if err := TrackUnTrackedObjects(source); err != nil {
			fmt.Printf("追踪对象失败,数据源 %s: %v\n", source, err)

    // 检查每个数据源中未追踪的关系
    // 处理每个数据源
    for _, source := range sources {
        fmt.Printf("\n=== 处理数据源: %s ===\n", source)
        // 显示未追踪的��系
		if err := GetUnTrackedTablesAndViews(source); err != nil {
            fmt.Printf("获取未追踪关系失败,数据源 %s: %v\n", source, err)

        // 追踪未追踪的关系
        if err := TrackUnTrackedRelationships(source); err != nil {
            fmt.Printf("追踪关系失败,数据源 %s: %v\n", source, err)




  • 专为高性能汽车设计的Armv9架构的Neoverse V3AE CPU基础知识与软件编码特性解析
  • 管理系统前端框架开发案例学习
  • redis-stack redisSearch环境安装搭建
  • 记录一下,解决js内存溢出npm ERR! code ELIFECYCLEnpm ERR! errno 134 以及 errno 9009
  • 智创 AI 新视界 -- AI 引领下的未来社会变革预测(16 - 6)
  • DP协议:术语表
  • Vue 3初始化工程
  • 从模型到实际:人工智能项目落地的关键要素
  • 【深度学习】深刻理解BERT
  • 4.长度最小的子数组:
  • Text2SQL(NL2sql)对话数据库:设计、实现细节与挑战
  • 上传word表格识别出table表格 转为二维数组并显示(vue)
  • C# 中的异常处理:构建健壮和可靠的程序
  • 简单易懂讲解LVM
  • 从方向导数到梯度:深度学习中的关键数学概念详解
  • 在ARM Linux应用层下使用SPI驱动WS2812
  • 数据结构 (36)各种排序方法的综合比较
  • vbo总结和使用
  • Datawhale AI 冬令营 模型微调
  • 动态规划part01