当前位置: 首页 > article >正文

Hasura 中间件API go操作示例

功能概述

这是一个用 Go 语言编写的 Hasura GraphQL 引擎管理工具,主要功能包括:

1. 自动发现和追踪数据库中的表和视图

2. 自动发现和追踪表之间的外键关系

3. 自动配置基础的访问权限

4. 提供数据库结构的统计信息

核心功能模块

1. 连接管理

  • TestHasuraConnection() - 测试 Hasura 连接是否正常
  • GetAllSources() - 获取所有配置的数据源

2. Schema 管理

  • GetDatabaseSchemas() - 获取数据库所有 schema 信息
  • GetDatabaseSources() - 获取数据源配置信息

3. 表和视图追踪

  • TrackUnTrackedObjects() - 自动追踪未追踪的表和视图
  • TrackTableOrView() - 追踪单个表或视图
  • GetUnTrackedTablesAndViews() - 获取未追踪的表和视图列表

4. 关系管理

  • TrackUnTrackedRelationships() - 自动追踪未追踪的外键关系
  • TrackRelationship() - 追踪单个外键关系

技术特点

  • 自动化管理
      ◦ 自动发现数据库对象
      ◦ 自动配置追踪关系
      ◦ 自动设置基础权限
  • 完善的错误处理
      ◦ 详细的错误信息
      ◦ 失败处理机制
      ◦ 统计信息输出
  • 模块化设计
      ◦ 功能模块清晰分离
      ◦ 代码复用性高
      ◦ 易于维护和扩展
  • 友好的输出
      ◦ 详细的进度信息
      ◦ 清晰的统计数据
      ◦ 格式化的结果展示

应用场景

1. 数据库初始化配置

2. 数据库结构变更后的同步

3. 新增数据源的快速接入

4. 数据库结构审计

使用建议

  • 首次配置 Hasura 时使用
  • 数据库结构发生变更后运行
  • 定期运行以确保配置同步
  • 用于检查数据库结构完整性

这个工具极大地简化了 Hasura GraphQL 引擎的配置过程,特别适合处理大型数据库的初始化配置和后续维护工作。

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"strings"
)

const (
	// Hasura connection settings.
	// NOTE(review): the original comment said these are read from environment
	// variables, but they are hard-coded literals here; edit them (or load them
	// from the environment) before pointing the tool at a real instance.
	hasuraEndpoint   = "http://127.0.0.1:8080/v1/graphql"  // GraphQL endpoint; replace with the real Hasura URL
	metadataEndpoint = "http://127.0.0.1:8080/v1/metadata" // metadata API endpoint; replace with the real URL
	adminSecret      = "HEllo"                        // sent as X-Hasura-Admin-Secret; replace with the real secret
)

// GraphQLRequest is the JSON body posted to the GraphQL endpoint.
type GraphQLRequest struct {
	// Query is the GraphQL document text, serialized as "query".
	Query     string                 `json:"query"`
	// Variables holds optional query variables; the field is omitted from the
	// JSON when the map is empty.
	Variables map[string]interface{} `json:"variables,omitempty"`
}

// TestHasuraConnection probes the GraphQL endpoint with a minimal
// introspection query sent under the admin secret.
//
// On a 200 OK answer it prints the raw response and returns nil. It returns an
// error when the request cannot be encoded, built or sent, when the body
// cannot be read, or when any other status code comes back.
func TestHasuraConnection() error {
	// Minimal introspection query used purely as a liveness probe.
	const healthCheck = `
		query HealthCheck {
			__schema {
				__typename
			}
		}
	`

	payload, err := json.Marshal(GraphQLRequest{Query: healthCheck})
	if err != nil {
		return fmt.Errorf("请求体编码失败: %v", err)
	}

	request, err := http.NewRequest(http.MethodPost, hasuraEndpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("创建请求失败: %v", err)
	}
	request.Header.Set("Content-Type", "application/json")
	request.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	response, err := new(http.Client).Do(request)
	if err != nil {
		return fmt.Errorf("请求失败: %v", err)
	}
	defer response.Body.Close()

	// The body is read before the status is inspected so that it can be quoted
	// in the error message for non-200 answers.
	raw, err := ioutil.ReadAll(response.Body)
	switch {
	case err != nil:
		return fmt.Errorf("读取响应失败: %v", err)
	case response.StatusCode != http.StatusOK:
		return fmt.Errorf("意外的状态码: %d, 响应内容: %s", response.StatusCode, raw)
	}

	fmt.Printf("连接成功! 响应: %s\n", raw)
	return nil
}

// TestQueryUsers sends a fixed sample query to the GraphQL endpoint and prints
// the raw response. Despite its name, the query selects id and role_name from
// the "role" root field for the row whose id equals 34.
//
// It returns an error when the request cannot be encoded, built or sent, when
// the body cannot be read, or when the status is not 200 OK.
func TestQueryUsers() error {
	const roleQuery = `
		query MyQuery {
  role(where: {id: {_eq: 34}}) {
    id
    role_name
  }
}
	`

	payload, err := json.Marshal(GraphQLRequest{Query: roleQuery})
	if err != nil {
		return fmt.Errorf("marshal request failed: %v", err)
	}

	request, err := http.NewRequest(http.MethodPost, hasuraEndpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("create request failed: %v", err)
	}
	request.Header.Set("Content-Type", "application/json")
	request.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	response, err := new(http.Client).Do(request)
	if err != nil {
		return fmt.Errorf("request failed: %v", err)
	}
	defer response.Body.Close()

	// Read first, then judge the status, so a failing answer can be quoted.
	raw, err := ioutil.ReadAll(response.Body)
	switch {
	case err != nil:
		return fmt.Errorf("read response failed: %v", err)
	case response.StatusCode != http.StatusOK:
		return fmt.Errorf("unexpected status code: %d, body: %s", response.StatusCode, raw)
	}

	fmt.Printf("查询用户成功! 响应: %s\n", raw)
	return nil
}

// HasuraMetadata is a typed view of the "sources" section of Hasura metadata:
// each source's name and the database_url found under
// configuration.connection_info.
// NOTE(review): nothing in the visible part of this file uses this type; the
// functions here decode metadata into map[string]interface{} instead.
type HasuraMetadata struct {
	Sources []struct {
		Name          string `json:"name"`
		Configuration struct {
			ConnectionInfo struct {
				DatabaseURL string `json:"database_url"`
			} `json:"connection_info"`
		} `json:"configuration"`
	} `json:"sources"`
}

// GetDatabaseSources exports the Hasura metadata and prints the name of each
// configured data source.
//
// It returns an error when the request cannot be encoded, built or sent, when
// the response cannot be read or parsed, when the status is not 200 OK, or
// when the response carries no metadata.sources array.
func GetDatabaseSources() error {
	reqBody := map[string]interface{}{
		"type":    "export_metadata",
		"args":    map[string]interface{}{},
		"version": 2,
	}

	jsonBody, err := json.Marshal(reqBody)
	if err != nil {
		return fmt.Errorf("请求编码失败: %w", err)
	}

	req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("创建请求失败: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("请求失败: %w", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read response failed: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return fmt.Errorf("parse metadata failed: %w", err)
	}

	// Every level uses a two-value assertion: a response without a "metadata"
	// object must produce an error, not a runtime panic.
	metadata, ok := result["metadata"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("在元数据中未找到数据源")
	}
	sources, ok := metadata["sources"].([]interface{})
	if !ok {
		return fmt.Errorf("在元数据中未找到数据源")
	}

	fmt.Println("\n数据库连接:")
	fmt.Println("--------------------")
	for _, source := range sources {
		sourceMap, ok := source.(map[string]interface{})
		if !ok {
			continue
		}
		// Entries without a string name are skipped silently, as before.
		if name, ok := sourceMap["name"].(string); ok {
			fmt.Printf("Name: %s\n", name)
			fmt.Println("--------------------")
		}
	}

	return nil
}

// TrackTableOrView tracks one table or view in Hasura and then grants the
// "user" role a basic select permission on it.
//
// The metadata API is called up to three times, in this order:
//  1. pg_track_table, with custom root field names derived from objectName.
//  2. pg_set_table_customization, only when objectType is "VIEW".
//  3. pg_create_select_permission for role "user" (all columns, empty filter,
//     limit 100).
//
// A non-200 answer to step 1 or 2 is returned as an error. A non-200 answer to
// step 3 only prints a warning. Encoding, request-construction and transport
// failures in any step are returned as errors.
//
// NOTE(review): step 2 sends a configuration holding only "identifier" and
// "custom_name"; whether that replaces the root fields set in step 1 should be
// confirmed against the Hasura metadata API documentation.
func TrackTableOrView(sourceName, schema, objectName, objectType string) error {
	// Qualified table reference shared by all three payloads.
	tableRef := map[string]interface{}{
		"schema": schema,
		"name":   objectName,
	}

	// Root field names all follow "<operation>_<objectName>[_suffix]".
	rootFields := map[string]interface{}{
		"select":           "select_" + objectName,
		"select_by_pk":     "select_" + objectName + "_by_pk",
		"select_aggregate": "select_" + objectName + "_aggregate",
		"insert":           "insert_" + objectName,
		"insert_one":       "insert_" + objectName + "_one",
		"update":           "update_" + objectName,
		"update_by_pk":     "update_" + objectName + "_by_pk",
		"delete":           "delete_" + objectName,
		"delete_by_pk":     "delete_" + objectName + "_by_pk",
	}

	// --- Step 1: track the object -------------------------------------------
	trackPayload, err := json.Marshal(map[string]interface{}{
		"type": "pg_track_table",
		"args": map[string]interface{}{
			"source": sourceName,
			"table":  tableRef,
			"configuration": map[string]interface{}{
				"custom_root_fields": rootFields,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("追踪请求编码失败: %v", err)
	}

	// Echo the request body to make failures easier to debug.
	fmt.Printf("追踪 %s.%s 的请求:\n%s\n", schema, objectName, trackPayload)

	trackReq, err := http.NewRequest(http.MethodPost, metadataEndpoint, bytes.NewReader(trackPayload))
	if err != nil {
		return fmt.Errorf("创建追踪请求失败: %v", err)
	}
	trackReq.Header.Set("Content-Type", "application/json")
	trackReq.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	httpClient := &http.Client{}
	trackResp, err := httpClient.Do(trackReq)
	if err != nil {
		return fmt.Errorf("追踪请求失败: %v", err)
	}
	defer trackResp.Body.Close()

	trackBody, err := ioutil.ReadAll(trackResp.Body)
	if err != nil {
		return fmt.Errorf("read response failed: %v", err)
	}
	if trackResp.StatusCode != http.StatusOK {
		return fmt.Errorf("追踪失败,状态码 %d: %s", trackResp.StatusCode, trackBody)
	}
	fmt.Printf("追踪响应: %s\n", trackBody)

	// --- Step 2: extra customization, views only ----------------------------
	if objectType == "VIEW" {
		viewPayload, err := json.Marshal(map[string]interface{}{
			"type": "pg_set_table_customization",
			"args": map[string]interface{}{
				"source": sourceName,
				"table":  tableRef,
				"configuration": map[string]interface{}{
					"identifier":  objectName,
					"custom_name": objectName,
				},
			},
		})
		if err != nil {
			return fmt.Errorf("marshal view config request failed: %v", err)
		}

		viewReq, err := http.NewRequest(http.MethodPost, metadataEndpoint, bytes.NewReader(viewPayload))
		if err != nil {
			return fmt.Errorf("create view config request failed: %v", err)
		}
		viewReq.Header.Set("Content-Type", "application/json")
		viewReq.Header.Set("X-Hasura-Admin-Secret", adminSecret)

		viewResp, err := httpClient.Do(viewReq)
		if err != nil {
			return fmt.Errorf("view config request failed: %v", err)
		}
		defer viewResp.Body.Close()

		if viewResp.StatusCode != http.StatusOK {
			// Best-effort read: the body is only used to enrich the message.
			detail, _ := ioutil.ReadAll(viewResp.Body)
			return fmt.Errorf("view config failed with status %d: %s", viewResp.StatusCode, detail)
		}
	}

	// --- Step 3: select permission for role "user" --------------------------
	permPayload, err := json.Marshal(map[string]interface{}{
		"type": "pg_create_select_permission",
		"args": map[string]interface{}{
			"source": sourceName,
			"table":  tableRef,
			"role":   "user",
			"permission": map[string]interface{}{
				"columns": "*",
				"filter":  map[string]interface{}{},
				"limit":   100,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("marshal permission request failed: %v", err)
	}

	permReq, err := http.NewRequest(http.MethodPost, metadataEndpoint, bytes.NewReader(permPayload))
	if err != nil {
		return fmt.Errorf("create permission request failed: %v", err)
	}
	permReq.Header.Set("Content-Type", "application/json")
	permReq.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	permResp, err := httpClient.Do(permReq)
	if err != nil {
		return fmt.Errorf("permission request failed: %v", err)
	}
	defer permResp.Body.Close()

	if permResp.StatusCode == http.StatusOK {
		return nil
	}

	// A rejected permission must not abort tracking; report it and carry on.
	detail, _ := ioutil.ReadAll(permResp.Body)
	fmt.Printf("Warning: Failed to set permissions for %s.%s: %s\n", schema, objectName, detail)
	return nil
}

// trackedObjectKey identifies a table or view by schema and name.
type trackedObjectKey struct {
	schema string
	name   string
}

// collectTrackedObjects returns the set of tables and views that the exported
// metadata lists under the source named sourceName. Missing or malformed
// sections are skipped, so unusable input simply yields an empty set.
func collectTrackedObjects(exported map[string]interface{}, sourceName string) map[trackedObjectKey]struct{} {
	tracked := make(map[trackedObjectKey]struct{})

	// Failed assertions leave nil maps/slices, which are safe to index and range.
	metadata, _ := exported["metadata"].(map[string]interface{})
	sources, _ := metadata["sources"].([]interface{})
	for _, rawSource := range sources {
		source, ok := rawSource.(map[string]interface{})
		if !ok || source["name"] != sourceName {
			continue
		}
		tables, _ := source["tables"].([]interface{})
		for _, rawTable := range tables {
			entry, _ := rawTable.(map[string]interface{})
			ref, _ := entry["table"].(map[string]interface{})
			schema, okSchema := ref["schema"].(string)
			name, okName := ref["name"].(string)
			if okSchema && okName {
				tracked[trackedObjectKey{schema: schema, name: name}] = struct{}{}
			}
		}
	}
	return tracked
}

// TrackUnTrackedObjects tracks every table and view of the given source that
// Hasura does not track yet, then prints per-kind statistics.
//
// It lists all base tables and views outside the system schemas through
// run_sql, exports the metadata to learn which objects are already tracked in
// sourceName, and calls TrackTableOrView for the rest. A failure to track a
// single object is printed and skipped. An error is returned only when one of
// the two listing requests cannot be sent, answers with a non-200 status, or
// cannot be read or decoded.
func TrackUnTrackedObjects(sourceName string) error {
	// 1. List every table and view in the source.
	objectsQuery := map[string]interface{}{
		"type": "run_sql",
		"args": map[string]interface{}{
			"source": sourceName,
			"sql": `
				SELECT
					table_schema,
					table_name,
					table_type
				FROM information_schema.tables
				WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
				AND table_type IN ('BASE TABLE', 'VIEW')
				ORDER BY table_schema, table_type, table_name;
			`,
		},
	}

	// run_sql is served from /v2/query on the same host as the metadata API.
	dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
	jsonBody, err := json.Marshal(objectsQuery)
	if err != nil {
		return fmt.Errorf("marshal objects request failed: %w", err)
	}

	req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create objects request failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	objectsResp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("objects request failed: %w", err)
	}
	defer objectsResp.Body.Close()

	objectsBody, err := ioutil.ReadAll(objectsResp.Body)
	if err != nil {
		return fmt.Errorf("read objects response failed: %w", err)
	}
	// Without this check an error payload would decode to an empty result and
	// be reported as "nothing to track".
	if objectsResp.StatusCode != http.StatusOK {
		return fmt.Errorf("objects request failed with status %d: %s", objectsResp.StatusCode, objectsBody)
	}

	var result struct {
		Result [][]string `json:"result"`
	}
	if err = json.Unmarshal(objectsBody, &result); err != nil {
		return fmt.Errorf("decode objects response failed: %w", err)
	}

	// 2. Export the metadata to learn what is already tracked.
	trackedQuery := map[string]interface{}{
		"type":    "export_metadata",
		"args":    map[string]interface{}{},
		"version": 2,
	}

	jsonBody, err = json.Marshal(trackedQuery)
	if err != nil {
		return fmt.Errorf("marshal tracked request failed: %w", err)
	}

	req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create tracked request failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	trackedResp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("tracked request failed: %w", err)
	}
	defer trackedResp.Body.Close()

	trackedBody, err := ioutil.ReadAll(trackedResp.Body)
	if err != nil {
		return fmt.Errorf("read tracked response failed: %w", err)
	}
	if trackedResp.StatusCode != http.StatusOK {
		return fmt.Errorf("tracked request failed with status %d: %s", trackedResp.StatusCode, trackedBody)
	}

	var trackedResult map[string]interface{}
	if err = json.Unmarshal(trackedBody, &trackedResult); err != nil {
		return fmt.Errorf("decode tracked response failed: %w", err)
	}

	// Build the lookup once; only objects tracked in THIS source count.
	tracked := collectTrackedObjects(trackedResult, sourceName)

	fmt.Printf("\n正在追踪数据源 '%s' 中未追踪的对象:\n", sourceName)
	fmt.Println("--------------------")

	stats := struct {
		TrackedTables   int
		TrackedViews    int
		UntrackedTables int // tables newly tracked by this run
		UntrackedViews  int // views newly tracked by this run
	}{}

	// 3. Track whatever is missing. The first row holds the column names.
	rows := result.Result
	if len(rows) > 0 {
		rows = rows[1:]
	}
	for _, row := range rows {
		if len(row) < 3 {
			fmt.Printf("Warning: skipping malformed row: %v\n", row)
			continue
		}
		schema, objectName, objectType := row[0], row[1], row[2]
		isTable := objectType == "BASE TABLE"

		if _, ok := tracked[trackedObjectKey{schema: schema, name: objectName}]; ok {
			if isTable {
				stats.TrackedTables++
			} else {
				stats.TrackedViews++
			}
			continue
		}

		fmt.Printf("追踪新的 %s: %s.%s\n", strings.ToLower(objectType), schema, objectName)

		if err := TrackTableOrView(sourceName, schema, objectName, objectType); err != nil {
			fmt.Printf("无法追踪 %s %s.%s: %v\n", strings.ToLower(objectType), schema, objectName, err)
			continue
		}

		if isTable {
			stats.UntrackedTables++
		} else {
			stats.UntrackedViews++
		}
	}

	fmt.Println("--------------------")
	fmt.Println("\n统计信息:")
	fmt.Printf("表格:\n")
	fmt.Printf("  - 已追踪: %d\n", stats.TrackedTables)
	fmt.Printf("  - 新追踪: %d\n", stats.UntrackedTables)
	fmt.Printf("  - 总计: %d\n", stats.TrackedTables+stats.UntrackedTables)
	fmt.Printf("视图:\n")
	fmt.Printf("  - 已追踪: %d\n", stats.TrackedViews)
	fmt.Printf("  - 新追踪: %d\n", stats.UntrackedViews)
	fmt.Printf("  - 总计: %d\n", stats.TrackedViews+stats.UntrackedViews)
	fmt.Println("--------------------")

	return nil
}

// GetDatabaseSchemas prints the names of all schemas in the given source,
// excluding information_schema, pg_catalog and pg_toast.
//
// The list is obtained through a run_sql call. An error is returned when the
// request cannot be encoded, built or sent, when Hasura answers with a non-200
// status, when the response cannot be read or decoded, or when the "result"
// field is present but is not an array with a header row plus at least one
// data row.
func GetDatabaseSchemas(sourceName string) error {
	schemaQuery := map[string]interface{}{
		"type": "run_sql",
		"args": map[string]interface{}{
			"source": sourceName,
			"sql": `
				SELECT
					schema_name
				FROM information_schema.schemata
				WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
				ORDER BY schema_name;
			`,
		},
	}

	jsonBody, err := json.Marshal(schemaQuery)
	if err != nil {
		return fmt.Errorf("marshal schema request failed: %w", err)
	}

	// run_sql is served from /v2/query on the same host as the metadata API.
	dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
	req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create schema request failed: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("schema request failed: %w", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read schema response failed: %w", err)
	}
	// An error payload has no "result" field; without this check it would be
	// reported as an empty schema list with a nil error.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("schema request failed with status %d: %s", resp.StatusCode, body)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return fmt.Errorf("decode schema response failed: %w", err)
	}

	fmt.Printf("\n数据源 '%s' 中的 Schema:\n", sourceName)
	fmt.Println("--------------------")

	if result["result"] != nil {
		rows, ok := result["result"].([]interface{})
		if !ok || len(rows) < 2 { // the first row holds the column names
			return fmt.Errorf("unexpected schema result format")
		}

		// Skip the header row and print each schema name.
		for _, row := range rows[1:] {
			schemaRow, ok := row.([]interface{})
			if !ok || len(schemaRow) == 0 {
				continue
			}
			if schemaName, ok := schemaRow[0].(string); ok {
				fmt.Printf("Schema: %s\n", schemaName)
			}
		}
	}

	fmt.Println("--------------------")
	return nil
}

// untrackedObjectRef identifies a table or view by schema and name.
type untrackedObjectRef struct {
	schema string
	name   string
}

// listTrackedRefs returns the set of tables and views that the exported
// metadata lists under the source named sourceName. Missing or malformed
// sections are skipped, so unusable input simply yields an empty set.
func listTrackedRefs(exported map[string]interface{}, sourceName string) map[untrackedObjectRef]struct{} {
	tracked := make(map[untrackedObjectRef]struct{})

	// Failed assertions leave nil maps/slices, which are safe to index and range.
	metadata, _ := exported["metadata"].(map[string]interface{})
	sources, _ := metadata["sources"].([]interface{})
	for _, rawSource := range sources {
		source, ok := rawSource.(map[string]interface{})
		if !ok || source["name"] != sourceName {
			continue
		}
		tables, _ := source["tables"].([]interface{})
		for _, rawTable := range tables {
			entry, _ := rawTable.(map[string]interface{})
			ref, _ := entry["table"].(map[string]interface{})
			schema, okSchema := ref["schema"].(string)
			name, okName := ref["name"].(string)
			if okSchema && okName {
				tracked[untrackedObjectRef{schema: schema, name: name}] = struct{}{}
			}
		}
	}
	return tracked
}

// GetUnTrackedTablesAndViews prints, grouped by schema, the tables and views
// of the given source that Hasura does not track yet.
//
// It exports the metadata to learn which objects are tracked in sourceName,
// lists all base tables and views outside the system schemas through run_sql,
// and prints the difference. Schemas appear in the order the SQL query returns
// them. The raw run_sql response is also printed for debugging. An error is
// returned when either request cannot be sent, answers with a non-200 status,
// or cannot be read or decoded.
func GetUnTrackedTablesAndViews(sourceName string) error {
	// 1. Export the metadata to learn what is already tracked.
	trackedQuery := map[string]interface{}{
		"type":    "export_metadata",
		"args":    map[string]interface{}{},
		"version": 2,
	}

	jsonBody, err := json.Marshal(trackedQuery)
	if err != nil {
		return fmt.Errorf("marshal tracked request failed: %w", err)
	}

	req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create tracked request failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	trackedResp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("tracked request failed: %w", err)
	}
	defer trackedResp.Body.Close()

	trackedBody, err := ioutil.ReadAll(trackedResp.Body)
	if err != nil {
		return fmt.Errorf("read tracked response failed: %w", err)
	}
	if trackedResp.StatusCode != http.StatusOK {
		return fmt.Errorf("tracked request failed with status %d: %s", trackedResp.StatusCode, trackedBody)
	}

	var exported map[string]interface{}
	if err = json.Unmarshal(trackedBody, &exported); err != nil {
		return fmt.Errorf("decode tracked response failed: %w", err)
	}
	// The metadata used to be fetched and then ignored, which made every
	// object show up as "untracked". Build the lookup set from it instead.
	tracked := listTrackedRefs(exported, sourceName)

	// 2. List every table and view in the source.
	allObjectsQuery := map[string]interface{}{
		"type": "run_sql",
		"args": map[string]interface{}{
			"source": sourceName,
			"sql": `
				SELECT
					table_schema,
					table_name,
					table_type
				FROM information_schema.tables
				WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
				AND table_type IN ('BASE TABLE', 'VIEW')
				ORDER BY table_schema, table_type, table_name;
			`,
		},
	}

	jsonBody, err = json.Marshal(allObjectsQuery)
	if err != nil {
		return fmt.Errorf("marshal objects request failed: %w", err)
	}

	// run_sql is served from /v2/query on the same host as the metadata API.
	dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
	req, err = http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("create objects request failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	objectsResp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("objects request failed: %w", err)
	}
	defer objectsResp.Body.Close()

	objectsBody, err := ioutil.ReadAll(objectsResp.Body)
	if err != nil {
		return fmt.Errorf("read objects response failed: %w", err)
	}
	fmt.Printf("Raw Response: %s\n", string(objectsBody))

	if objectsResp.StatusCode != http.StatusOK {
		return fmt.Errorf("objects request failed with status %d: %s", objectsResp.StatusCode, objectsBody)
	}

	var result struct {
		Result [][]string `json:"result"`
	}
	if err = json.Unmarshal(objectsBody, &result); err != nil {
		return fmt.Errorf("decode objects response failed: %w", err)
	}

	// 3. Group the untracked objects by schema, remembering first-seen order so
	// the output does not depend on map iteration order.
	type schemaObjects struct {
		tables []string
		views  []string
	}
	bySchema := make(map[string]*schemaObjects)
	var schemaOrder []string

	rows := result.Result
	if len(rows) > 0 {
		rows = rows[1:] // the first row holds the column names
	}
	for _, row := range rows {
		if len(row) < 3 {
			fmt.Printf("Warning: skipping malformed row: %v\n", row)
			continue
		}
		schema, objName, objType := row[0], row[1], row[2]

		if objType != "BASE TABLE" && objType != "VIEW" {
			continue
		}
		if _, ok := tracked[untrackedObjectRef{schema: schema, name: objName}]; ok {
			continue
		}

		group, seen := bySchema[schema]
		if !seen {
			group = &schemaObjects{}
			bySchema[schema] = group
			schemaOrder = append(schemaOrder, schema)
		}
		if objType == "BASE TABLE" {
			group.tables = append(group.tables, objName)
		} else {
			group.views = append(group.views, objName)
		}
	}

	// 4. Print the result.
	fmt.Printf("\n数据源 '%s' 中未追踪的对象:\n", sourceName)
	fmt.Println("--------------------")

	for _, schema := range schemaOrder {
		group := bySchema[schema]
		fmt.Printf("\nSchema: %s\n", schema)
		if len(group.tables) > 0 {
			fmt.Println("  未追踪的表格:")
			for _, table := range group.tables {
				fmt.Printf("    - %s\n", table)
			}
		}
		if len(group.views) > 0 {
			fmt.Println("  未追踪的视图:")
			for _, view := range group.views {
				fmt.Printf("    - %s\n", view)
			}
		}
	}

	fmt.Println("--------------------")
	return nil
}

// GetAllSources exports the Hasura metadata and returns the names of all
// configured data sources, in the order they appear in the metadata.
//
// It returns a nil slice and an error when the request cannot be encoded,
// built or sent, when Hasura answers with a non-200 status, or when the
// response cannot be read or decoded. A well-formed response without a
// metadata.sources array yields a nil slice and a nil error.
func GetAllSources() ([]string, error) {
	metadataQuery := map[string]interface{}{
		"type":    "export_metadata",
		"args":    map[string]interface{}{},
		"version": 2,
	}

	jsonBody, err := json.Marshal(metadataQuery)
	if err != nil {
		return nil, fmt.Errorf("元数据请求编码失败: %w", err)
	}

	req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return nil, fmt.Errorf("创建元数据请求失败: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("元数据请求失败: %w", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("读取元数据响应失败: %w", err)
	}
	// An error payload decodes cleanly but has no "metadata" key; without this
	// check the caller would see zero sources and no error.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("元数据请求返回意外的状态码 %d: %s", resp.StatusCode, body)
	}

	var result map[string]interface{}
	if err = json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("解码元数据响应失败: %w", err)
	}

	// Failed assertions leave nil maps/slices, which are safe to index and range.
	metadata, _ := result["metadata"].(map[string]interface{})
	sourcesList, _ := metadata["sources"].([]interface{})

	var sources []string
	for _, source := range sourcesList {
		sourceMap, ok := source.(map[string]interface{})
		if !ok {
			continue
		}
		if name, ok := sourceMap["name"].(string); ok {
			sources = append(sources, name)
		}
	}

	return sources, nil
}

// TrackRelationship creates an object relationship on schemaName.tableName
// that follows the foreign key held in columnName. The relationship is named
// "<tableName>_<columnName>_rel".
//
// The foreign-side arguments and constraintName are accepted but are not used
// when building the request. An error is returned when the request cannot be
// encoded, built or sent, when the response cannot be read, or when Hasura
// answers with anything other than 200 OK.
func TrackRelationship(sourceName, schemaName, tableName, columnName, foreignSchemaName, foreignTableName, foreignColumnName, constraintName string) error {
	relName := tableName + "_" + columnName + "_rel"

	payload, err := json.Marshal(map[string]interface{}{
		"type": "pg_create_object_relationship",
		"args": map[string]interface{}{
			"source": sourceName,
			"name":   relName,
			"table": map[string]interface{}{
				"schema": schemaName,
				"name":   tableName,
			},
			"using": map[string]interface{}{
				"foreign_key_constraint_on": columnName,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("关系请求编码失败: %v", err)
	}

	// Echo the request body to make failures easier to debug.
	fmt.Printf("创建关系请求:\n%s\n", payload)

	request, err := http.NewRequest(http.MethodPost, metadataEndpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("创建关系请求失败: %v", err)
	}
	request.Header.Set("Content-Type", "application/json")
	request.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	response, err := new(http.Client).Do(request)
	if err != nil {
		return fmt.Errorf("关系请求失败: %v", err)
	}
	defer response.Body.Close()

	// Read first, then judge the status, so a rejection can be quoted.
	raw, err := ioutil.ReadAll(response.Body)
	switch {
	case err != nil:
		return fmt.Errorf("读取响应失败: %v", err)
	case response.StatusCode != http.StatusOK:
		return fmt.Errorf("创建关系失败,状态码 %d: %s", response.StatusCode, raw)
	}

	fmt.Printf("成功创建关系: %s\n", relName)
	return nil
}

// foreignKeyColumn identifies a foreign-key column by schema, table and column.
type foreignKeyColumn struct {
	schema string
	table  string
	column string
}

// trackedForeignKeys returns the set of columns that already back an object
// relationship in the source named sourceName. Only relationships whose
// using.foreign_key_constraint_on value is a plain string are recognised.
// Missing or malformed sections are skipped.
func trackedForeignKeys(exported map[string]interface{}, sourceName string) map[foreignKeyColumn]struct{} {
	tracked := make(map[foreignKeyColumn]struct{})

	// Failed assertions leave nil maps/slices, which are safe to index and range.
	metadata, _ := exported["metadata"].(map[string]interface{})
	sources, _ := metadata["sources"].([]interface{})
	for _, rawSource := range sources {
		source, ok := rawSource.(map[string]interface{})
		if !ok || source["name"] != sourceName {
			continue
		}
		tables, _ := source["tables"].([]interface{})
		for _, rawTable := range tables {
			entry, _ := rawTable.(map[string]interface{})
			ref, _ := entry["table"].(map[string]interface{})
			schema, okSchema := ref["schema"].(string)
			table, okTable := ref["name"].(string)
			if !okSchema || !okTable {
				continue
			}
			relationships, _ := entry["object_relationships"].([]interface{})
			for _, rawRel := range relationships {
				rel, _ := rawRel.(map[string]interface{})
				using, _ := rel["using"].(map[string]interface{})
				if column, ok := using["foreign_key_constraint_on"].(string); ok {
					tracked[foreignKeyColumn{schema: schema, table: table, column: column}] = struct{}{}
				}
			}
		}
	}
	return tracked
}

// TrackUnTrackedRelationships creates an object relationship for every
// foreign-key column of the given source that does not have one yet, then
// prints statistics.
//
// Foreign keys are listed through run_sql against information_schema; the
// exported metadata tells which columns already back an object relationship.
// A failure to create a single relationship is printed and counted, not
// returned. An error is returned only when one of the two listing requests
// cannot be sent, answers with a non-200 status, or cannot be read or decoded.
func TrackUnTrackedRelationships(sourceName string) error {
	// 1. List every foreign-key column outside the system schemas.
	relationshipQuery := map[string]interface{}{
		"type": "run_sql",
		"args": map[string]interface{}{
			"source": sourceName,
			"sql": `
                SELECT
                    tc.table_schema as schema_name,
                    tc.constraint_name,
                    tc.table_name,
                    kcu.column_name,
                    ccu.table_schema AS foreign_schema_name,
                    ccu.table_name AS foreign_table_name,
                    ccu.column_name AS foreign_column_name
                FROM
                    information_schema.table_constraints AS tc
                    JOIN information_schema.key_column_usage AS kcu
                        ON tc.constraint_name = kcu.constraint_name
                        AND tc.table_schema = kcu.table_schema
                    JOIN information_schema.constraint_column_usage AS ccu
                        ON ccu.constraint_name = tc.constraint_name
                        AND ccu.table_schema = tc.table_schema
                WHERE
                    tc.constraint_type = 'FOREIGN KEY'
                    AND tc.table_schema NOT IN ('information_schema', 'pg_catalog')
                ORDER BY
                    tc.table_schema,
                    tc.table_name,
                    tc.constraint_name;
            `,
		},
	}

	// run_sql is served from /v2/query on the same host as the metadata API.
	dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
	jsonBody, err := json.Marshal(relationshipQuery)
	if err != nil {
		return fmt.Errorf("关系请求编码失败: %w", err)
	}

	req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("创建关系请求失败: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	client := &http.Client{}
	fkResp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("关系请求失败: %w", err)
	}
	defer fkResp.Body.Close()

	fkBody, err := ioutil.ReadAll(fkResp.Body)
	if err != nil {
		return fmt.Errorf("读取关系响应失败: %w", err)
	}
	// Without this check an error payload would decode to an empty result and
	// be reported as "no foreign keys".
	if fkResp.StatusCode != http.StatusOK {
		return fmt.Errorf("关系请求返回意外的状态码 %d: %s", fkResp.StatusCode, fkBody)
	}

	var result struct {
		Result [][]string `json:"result"`
	}
	if err = json.Unmarshal(fkBody, &result); err != nil {
		return fmt.Errorf("解码关系响应失败: %w", err)
	}

	// 2. Export the metadata to learn which relationships already exist.
	trackedQuery := map[string]interface{}{
		"type":    "export_metadata",
		"args":    map[string]interface{}{},
		"version": 2,
	}

	jsonBody, err = json.Marshal(trackedQuery)
	if err != nil {
		return fmt.Errorf("关系请求编码失败: %w", err)
	}

	req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return fmt.Errorf("创建关系请求失败: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Hasura-Admin-Secret", adminSecret)

	metaResp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("关系请求失败: %w", err)
	}
	defer metaResp.Body.Close()

	metaBody, err := ioutil.ReadAll(metaResp.Body)
	if err != nil {
		return fmt.Errorf("读取已追踪关系响应失败: %w", err)
	}
	if metaResp.StatusCode != http.StatusOK {
		return fmt.Errorf("已追踪关系请求返回意外的状态码 %d: %s", metaResp.StatusCode, metaBody)
	}

	var trackedResult map[string]interface{}
	if err = json.Unmarshal(metaBody, &trackedResult); err != nil {
		return fmt.Errorf("解码已追踪关系响应失败: %w", err)
	}

	// Build the lookup once instead of rescanning the metadata for every row.
	tracked := trackedForeignKeys(trackedResult, sourceName)

	fmt.Printf("\n追踪数据源 '%s' 中的外键关系:\n", sourceName)
	fmt.Println("--------------------")

	var trackedCount, untrackedCount, errorCount int

	// 3. Create whatever is missing. The first row holds the column names.
	rows := result.Result
	if len(rows) > 0 {
		rows = rows[1:]
	}
	for _, row := range rows {
		if len(row) < 7 {
			fmt.Printf("Warning: skipping malformed row: %v\n", row)
			continue
		}
		schemaName, constraintName, tableName, columnName := row[0], row[1], row[2], row[3]
		foreignSchemaName, foreignTableName, foreignColumnName := row[4], row[5], row[6]

		if _, ok := tracked[foreignKeyColumn{schema: schemaName, table: tableName, column: columnName}]; ok {
			trackedCount++
			continue
		}

		fmt.Printf("追踪关系: %s.%s.%s -> %s.%s.%s (约束: %s)\n",
			schemaName, tableName, columnName,
			foreignSchemaName, foreignTableName, foreignColumnName,
			constraintName)

		err := TrackRelationship(
			sourceName,
			schemaName,
			tableName,
			columnName,
			foreignSchemaName,
			foreignTableName,
			foreignColumnName,
			constraintName,
		)
		if err != nil {
			fmt.Printf("追踪关系出错: %v\n", err)
			errorCount++
			continue
		}
		untrackedCount++
	}

	fmt.Println("\n统计信息:")
	fmt.Printf("- 已追踪的关系: %d\n", trackedCount)
	fmt.Printf("- 新追踪的关系: %d\n", untrackedCount)
	fmt.Printf("- 追踪失败: %d\n", errorCount)
	fmt.Printf("- 关系总数: %d\n", trackedCount+untrackedCount+errorCount)
	fmt.Println("--------------------")

	return nil
}




// main checks connectivity, discovers every data source, and then makes two
// passes over the sources: the first lists and tracks untracked tables and
// views, the second tracks untracked foreign-key relationships.
//
// A failed connection test or source discovery exits with status 1. A failure
// while processing one source is printed and the loop moves on to the next.
func main() {
	fmt.Println("测试 Hasura 连接...")
	connErr := TestHasuraConnection()
	if connErr != nil {
		fmt.Printf("连接测试失败: %v\n", connErr)
		os.Exit(1)
	}

	// Discover all data sources.
	sources, listErr := GetAllSources()
	if listErr != nil {
		fmt.Printf("获取数据源失败: %v\n", listErr)
		os.Exit(1)
	}

	fmt.Printf("\n发现 %d 个数据源\n", len(sources))

	// Pass 1: tables and views.
	for i := 0; i < len(sources); i++ {
		name := sources[i]
		fmt.Printf("\n=== 处理数据源: %s ===\n", name)

		// Show what is untracked before changing anything.
		err := GetUnTrackedTablesAndViews(name)
		if err != nil {
			fmt.Printf("获取未追踪对象失败,数据源 %s: %v\n", name, err)
			continue
		}

		// Track the untracked objects.
		err = TrackUnTrackedObjects(name)
		if err != nil {
			fmt.Printf("追踪对象失败,数据源 %s: %v\n", name, err)
		}
	}

	// Pass 2: foreign-key relationships.
	for i := 0; i < len(sources); i++ {
		name := sources[i]
		fmt.Printf("\n=== 处理数据源: %s ===\n", name)

		// NOTE(review): this lists tables and views again although the message
		// below speaks of relationships; it looks copied from pass 1 — confirm
		// whether a relationship listing was intended.
		err := GetUnTrackedTablesAndViews(name)
		if err != nil {
			fmt.Printf("获取未追踪关系失败,数据源 %s: %v\n", name, err)
			continue
		}

		// Track the untracked relationships.
		err = TrackUnTrackedRelationships(name)
		if err != nil {
			fmt.Printf("追踪关系失败,数据源 %s: %v\n", name, err)
		}
	}

	fmt.Println("\n处理完成!")
}



http://www.kler.cn/a/429395.html

相关文章:

  • SpringBoot集成Mqtt服务实现消费发布和接收消费
  • 南京地铁路线和站点2021-2030含规划线路shp数据arcmap地铁图坐标wgs84无偏移内容分析测评
  • SpringBoot之OriginTrackedPropertiesLoader类源码学习
  • React 第三方状态管理库相关 -- Redux MobX 篇
  • Mysql--实战篇--数据库设计(范式和反范式,数据表设计原则)
  • 基于R计算皮尔逊相关系数
  • 专为高性能汽车设计的Armv9架构的Neoverse V3AE CPU基础知识与软件编码特性解析
  • 管理系统前端框架开发案例学习
  • redis-stack redisSearch环境安装搭建
  • 记录一下,解决js内存溢出npm ERR! code ELIFECYCLEnpm ERR! errno 134 以及 errno 9009
  • 智创 AI 新视界 -- AI 引领下的未来社会变革预测(16 - 6)
  • DP协议:术语表
  • Vue 3初始化工程
  • 从模型到实际:人工智能项目落地的关键要素
  • 【深度学习】深刻理解BERT
  • 4.长度最小的子数组:
  • Text2SQL(NL2sql)对话数据库:设计、实现细节与挑战
  • 上传word表格识别出table表格 转为二维数组并显示(vue)
  • C# 中的异常处理:构建健壮和可靠的程序
  • 简单易懂讲解LVM
  • 从方向导数到梯度:深度学习中的关键数学概念详解
  • 在ARM Linux应用层下使用SPI驱动WS2812
  • 数据结构 (36)各种排序方法的综合比较
  • vbo总结和使用
  • Datawhale AI 冬令营 模型微调
  • 动态规划part01