Hasura 中间件API go操作示例
功能概述
这是一个用 Go 语言编写的 Hasura GraphQL 引擎管理工具,主要功能包括:
- 自动发现和追踪数据库中的表和视图
- 自动发现和追踪表之间的外键关系
3. 自动配置基础的访问权限
4. 提供数据库结构的统计信息
核心功能模块
1. 连接管理
- TestHasuraConnection() - 测试 Hasura 连接是否正常
- GetAllSources() - 获取所有配置的数据源
2. Schema 管理
- GetDatabaseSchemas() - 获取数据库所有 schema 信息
- GetDatabaseSources() - 获取数据源配置信息
3. 表和视图追踪
- TrackUnTrackedObjects() - 自动追踪未追踪的表和视图
- TrackTableOrView() - 追踪单个表或视图
- GetUnTrackedTablesAndViews() - 获取未追踪的表和视图列表
4. 关系管理
- TrackUnTrackedRelationships() - 自动追踪未追踪的外键关系
- TrackRelationship() - 追踪单个外键关系
技术特点
- 自动化管理
- 自动发现数据库对象
- 自动配置追踪关系
- 自动设置基础权限
- 完善的错误处理
- 详细的错误信息
- 失败处理机制
- 统计信息输出
- 模块化设计
- 功能模块清晰分离
- 代码复用性高
- 易于维护和扩展
- 友好的输出
- 详细的进度信息
- 清晰的统计数据
- 格式化的结果展示
应用场景
1. 数据库初始化配置
- 数据库结构变更后的同步
- 新增数据源的快速接入
- 数据库结构审计
使用建议
- 首次配置 Hasura 时使用
- 数据库结构发生变更后运行
- 定期运行以确保配置同步
- 用于检查数据库结构完整性
这个工具极大地简化了 Hasura GraphQL 引擎的配置过程,特别适合处理大型数据库的初始化配置和后续维护工作。
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"
)
const (
// 从环境变量获取 Hasura 配置
hasuraEndpoint = "http://127.0.0.1:8080/v1/graphql" // 更新为实际的 Hasura 端点
metadataEndpoint = "http://127.0.0.1:8080/v1/metadata" // 更新为实际的 metadata 端点
adminSecret = "HEllo" // 更新为实际的 admin secret
)
// GraphQLRequest represents a GraphQL query request
type GraphQLRequest struct {
Query string `json:"query"`
Variables map[string]interface{} `json:"variables,omitempty"`
}
// TestHasuraConnection tests the connection to Hasura
func TestHasuraConnection() error {
// Simple health check query
query := `
query HealthCheck {
__schema {
__typename
}
}
`
// Create request body
reqBody := GraphQLRequest{
Query: query,
}
jsonBody, err := json.Marshal(reqBody)
if err != nil {
return fmt.Errorf("请求体编码失败: %v", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", hasuraEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("创建请求失败: %v", err)
}
// Add headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
// Send request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("请求失败: %v", err)
}
defer resp.Body.Close()
// Read response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应失败: %v", err)
}
// Check response status
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("意外的状态码: %d, 响应内容: %s", resp.StatusCode, string(body))
}
fmt.Printf("连接成功! 响应: %s\n", string(body))
return nil
}
// TestQueryUsers tests querying users from Hasura
func TestQueryUsers() error {
query := `
query MyQuery {
role(where: {id: {_eq: 34}}) {
id
role_name
}
}
`
reqBody := GraphQLRequest{
Query: query,
}
jsonBody, err := json.Marshal(reqBody)
if err != nil {
return fmt.Errorf("marshal request failed: %v", err)
}
req, err := http.NewRequest("POST", hasuraEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("request failed: %v", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read response failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
}
fmt.Printf("查询用户成功! 响应: %s\n", string(body))
return nil
}
// HasuraMetadata 结构体用于解析 Hasura 元数据
type HasuraMetadata struct {
Sources []struct {
Name string `json:"name"`
Configuration struct {
ConnectionInfo struct {
DatabaseURL string `json:"database_url"`
} `json:"connection_info"`
} `json:"configuration"`
} `json:"sources"`
}
func GetDatabaseSources() error {
reqBody := map[string]interface{}{
"type": "export_metadata",
"args": map[string]interface{}{},
"version": 2,
}
jsonBody, err := json.Marshal(reqBody)
if err != nil {
return fmt.Errorf("请求编码失败: %v", err)
}
req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("创建请求失败: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("请求失败: %v", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read response failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
}
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
return fmt.Errorf("parse metadata failed: %v", err)
}
// 获取 sources 数组
sources, ok := result["metadata"].(map[string]interface{})["sources"].([]interface{})
if !ok {
return fmt.Errorf("在元数据中未找到数据源")
}
fmt.Println("\n数据库连接:")
fmt.Println("--------------------")
for _, source := range sources {
if sourceMap, ok := source.(map[string]interface{}); ok {
if name, ok := sourceMap["name"].(string); ok {
fmt.Printf("Name: %s\n", name)
fmt.Println("--------------------")
}
}
}
return nil
}
// TrackTableOrView 追踪单个表或视图
func TrackTableOrView(sourceName, schema, objectName, objectType string) error {
// 构建 track 请求
trackQuery := map[string]interface{}{
"type": "pg_track_table", // 修改为正确的 API 端点
"args": map[string]interface{}{
"source": sourceName,
"table": map[string]interface{}{
"schema": schema,
"name": objectName,
},
"configuration": map[string]interface{}{
"custom_root_fields": map[string]interface{}{
"select": fmt.Sprintf("select_%s", objectName),
"select_by_pk": fmt.Sprintf("select_%s_by_pk", objectName),
"select_aggregate": fmt.Sprintf("select_%s_aggregate", objectName),
"insert": fmt.Sprintf("insert_%s", objectName),
"insert_one": fmt.Sprintf("insert_%s_one", objectName),
"update": fmt.Sprintf("update_%s", objectName),
"update_by_pk": fmt.Sprintf("update_%s_by_pk", objectName),
"delete": fmt.Sprintf("delete_%s", objectName),
"delete_by_pk": fmt.Sprintf("delete_%s_by_pk", objectName),
},
},
},
}
jsonBody, err := json.Marshal(trackQuery)
if err != nil {
return fmt.Errorf("追踪请求编码失败: %v", err)
}
// 打���请求内容以便调试
fmt.Printf("追踪 %s.%s 的请求:\n%s\n", schema, objectName, string(jsonBody))
req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("创建追踪请求失败: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("追踪请求失败: %v", err)
}
defer resp.Body.Close()
// 读取并打印响应内容
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read response failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("追踪失败,状态码 %d: %s", resp.StatusCode, string(body))
}
fmt.Printf("追踪响应: %s\n", string(body))
// 如果是视图,可能需要额外的配置
if objectType == "VIEW" {
// 为视图添加额外的配置
configQuery := map[string]interface{}{
"type": "pg_set_table_customization",
"args": map[string]interface{}{
"source": sourceName,
"table": map[string]interface{}{
"schema": schema,
"name": objectName,
},
"configuration": map[string]interface{}{
"identifier": objectName,
"custom_name": objectName,
},
},
}
jsonBody, err = json.Marshal(configQuery)
if err != nil {
return fmt.Errorf("marshal view config request failed: %v", err)
}
req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create view config request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("view config request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("view config failed with status %d: %s", resp.StatusCode, string(body))
}
}
// 添加权限(可选)
permissionQuery := map[string]interface{}{
"type": "pg_create_select_permission",
"args": map[string]interface{}{
"source": sourceName,
"table": map[string]interface{}{
"schema": schema,
"name": objectName,
},
"role": "user",
"permission": map[string]interface{}{
"columns": "*",
"filter": map[string]interface{}{},
"limit": 100,
},
},
}
jsonBody, err = json.Marshal(permissionQuery)
if err != nil {
return fmt.Errorf("marshal permission request failed: %v", err)
}
req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create permission request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("permission request failed: %v", err)
}
defer resp.Body.Close()
// 不要因为权限设置失败而中断整个过程
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Warning: Failed to set permissions for %s.%s: %s\n", schema, objectName, string(body))
}
return nil
}
// TrackUnTrackedObjects 追踪未追踪的表和视图
func TrackUnTrackedObjects(sourceName string) error {
// 1. 获取所有表和视图
objectsQuery := map[string]interface{}{
"type": "run_sql",
"args": map[string]interface{}{
"source": sourceName,
"sql": `
SELECT
table_schema,
table_name,
table_type
FROM information_schema.tables
WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
AND table_type IN ('BASE TABLE', 'VIEW')
ORDER BY table_schema, table_type, table_name;
`,
},
}
dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
jsonBody, err := json.Marshal(objectsQuery)
if err != nil {
return fmt.Errorf("marshal objects request failed: %v", err)
}
req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create objects request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("objects request failed: %v", err)
}
defer resp.Body.Close()
var result struct {
Result [][]string `json:"result"`
}
if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
return fmt.Errorf("decode objects response failed: %v", err)
}
// 2. 获取已追踪的对象
trackedQuery := map[string]interface{}{
"type": "export_metadata",
"args": map[string]interface{}{},
"version": 2,
}
jsonBody, err = json.Marshal(trackedQuery)
if err != nil {
return fmt.Errorf("marshal tracked request failed: %v", err)
}
req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create tracked request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("tracked request failed: %v", err)
}
defer resp.Body.Close()
var trackedResult map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&trackedResult); err != nil {
return fmt.Errorf("decode tracked response failed: %v", err)
}
fmt.Printf("\n正在追踪数据源 '%s' 中未追踪的对象:\n", sourceName)
fmt.Println("--------------------")
stats := struct {
TrackedTables int
TrackedViews int
UntrackedTables int
UntrackedViews int
}{}
// 3. 追踪未追踪的对象
if len(result.Result) > 1 { // 跳过标题行
for _, row := range result.Result[1:] {
schema := row[0]
objectName := row[1]
objectType := row[2]
// 检查对象是否已被追踪
isTracked := false
if sources, ok := trackedResult["metadata"].(map[string]interface{})["sources"].([]interface{}); ok {
for _, source := range sources {
if sourceMap, ok := source.(map[string]interface{}); ok {
if sourceTables, ok := sourceMap["tables"].([]interface{}); ok {
for _, sourceTable := range sourceTables {
if tableMap, ok := sourceTable.(map[string]interface{}); ok {
if tableInfo, ok := tableMap["table"].(map[string]interface{}); ok {
if tableInfo["name"] == objectName && tableInfo["schema"] == schema {
isTracked = true
if objectType == "BASE TABLE" {
stats.TrackedTables++
} else {
stats.TrackedViews++
}
break
}
}
}
}
}
}
}
}
if !isTracked {
fmt.Printf("追踪新的 %s: %s.%s\n", strings.ToLower(objectType), schema, objectName)
if err := TrackTableOrView(sourceName, schema, objectName, objectType); err != nil {
fmt.Printf("无法追踪 %s %s.%s: %v\n", strings.ToLower(objectType), schema, objectName, err)
continue
}
if objectType == "BASE TABLE" {
stats.UntrackedTables++
} else {
stats.UntrackedViews++
}
}
}
}
fmt.Println("--------------------")
fmt.Println("\n统计信息:")
fmt.Printf("表格:\n")
fmt.Printf(" - 已追踪: %d\n", stats.TrackedTables)
fmt.Printf(" - 新追踪: %d\n", stats.UntrackedTables)
fmt.Printf(" - 总计: %d\n", stats.TrackedTables+stats.UntrackedTables)
fmt.Printf("视图:\n")
fmt.Printf(" - 已追踪: %d\n", stats.TrackedViews)
fmt.Printf(" - 新追踪: %d\n", stats.UntrackedViews)
fmt.Printf(" - 总计: %d\n", stats.TrackedViews+stats.UntrackedViews)
fmt.Println("--------------------")
return nil
}
// GetDatabaseSchemas 获取所有数据库的 schema 信息
func GetDatabaseSchemas(sourceName string) error {
// 构建获取 schema 的查询
schemaQuery := map[string]interface{}{
"type": "run_sql",
"args": map[string]interface{}{
"source": sourceName,
"sql": `
SELECT
schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
ORDER BY schema_name;
`,
},
}
jsonBody, err := json.Marshal(schemaQuery)
if err != nil {
return fmt.Errorf("marshal schema request failed: %v", err)
}
// 发送请求到 data endpoint
dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create schema request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("schema request failed: %v", err)
}
defer resp.Body.Close()
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return fmt.Errorf("decode schema response failed: %v", err)
}
// 输出 schema 信息
fmt.Printf("\n数据源 '%s' 中的 Schema:\n", sourceName)
fmt.Println("--------------------")
if result["result"] != nil {
rows, ok := result["result"].([]interface{})
if !ok || len(rows) < 2 { // 第一行是列名
return fmt.Errorf("unexpected schema result format")
}
// 跳过第一行(列名),显示所有 schema
for _, row := range rows[1:] {
if schemaRow, ok := row.([]interface{}); ok && len(schemaRow) > 0 {
if schemaName, ok := schemaRow[0].(string); ok {
fmt.Printf("Schema: %s\n", schemaName)
}
}
}
}
fmt.Println("--------------------")
return nil
}
// GetUnTrackedTablesAndViews 获取未追踪的表和视图
func GetUnTrackedTablesAndViews(sourceName string) error {
// 1. 获取已追踪的对象
trackedQuery := map[string]interface{}{
"type": "export_metadata",
"args": map[string]interface{}{},
"version": 2,
}
jsonBody, err := json.Marshal(trackedQuery)
if err != nil {
return fmt.Errorf("marshal tracked request failed: %v", err)
}
req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create tracked request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("tracked request failed: %v", err)
}
defer resp.Body.Close()
// 2. 获取所有表和视图
allObjectsQuery := map[string]interface{}{
"type": "run_sql",
"args": map[string]interface{}{
"source": sourceName,
"sql": `
SELECT
table_schema,
table_name,
table_type
FROM information_schema.tables
WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
AND table_type IN ('BASE TABLE', 'VIEW')
ORDER BY table_schema, table_type, table_name;
`,
},
}
jsonBody, err = json.Marshal(allObjectsQuery)
if err != nil {
return fmt.Errorf("marshal objects request failed: %v", err)
}
dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
req, err = http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("create objects request failed: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("objects request failed: %v", err)
}
defer resp.Body.Close()
var result struct {
Result [][]string `json:"result"`
}
body, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Raw Response: %s\n", string(body))
if err = json.Unmarshal(body, &result); err != nil {
return fmt.Errorf("decode objects response failed: %v", err)
}
// 组织数据结构
type SchemaObjects struct {
Tables []string
Views []string
}
untracked := make(map[string]*SchemaObjects)
// 跳过标题行
if len(result.Result) > 1 {
for _, row := range result.Result[1:] {
schema := row[0]
objName := row[1]
objType := row[2]
if _, exists := untracked[schema]; !exists {
untracked[schema] = &SchemaObjects{
Tables: make([]string, 0),
Views: make([]string, 0),
}
}
if objType == "BASE TABLE" {
untracked[schema].Tables = append(untracked[schema].Tables, objName)
} else if objType == "VIEW" {
untracked[schema].Views = append(untracked[schema].Views, objName)
}
}
}
// 输出结果
fmt.Printf("\n数据源 '%s' 中未追踪的对象:\n", sourceName)
fmt.Println("--------------------")
for schema, objects := range untracked {
if len(objects.Tables) > 0 || len(objects.Views) > 0 {
fmt.Printf("\nSchema: %s\n", schema)
if len(objects.Tables) > 0 {
fmt.Println(" 未追踪的表格:")
for _, table := range objects.Tables {
fmt.Printf(" - %s\n", table)
}
}
if len(objects.Views) > 0 {
fmt.Println(" 未追踪的视图:")
for _, view := range objects.Views {
fmt.Printf(" - %s\n", view)
}
}
}
}
fmt.Println("--------------------")
return nil
}
// GetAllSources 获取所有数据源
func GetAllSources() ([]string, error) {
// 获取所有数据源
metadataQuery := map[string]interface{}{
"type": "export_metadata",
"args": map[string]interface{}{},
"version": 2,
}
jsonBody, err := json.Marshal(metadataQuery)
if err != nil {
return nil, fmt.Errorf("元数据请求编码失败: %v", err)
}
req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return nil, fmt.Errorf("创建元数据请求失败: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("元数据请求失败: %v", err)
}
defer resp.Body.Close()
var result map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解码元数据响应失败: %v", err)
}
var sources []string
if metadata, ok := result["metadata"].(map[string]interface{}); ok {
if sourcesList, ok := metadata["sources"].([]interface{}); ok {
for _, source := range sourcesList {
if sourceMap, ok := source.(map[string]interface{}); ok {
if name, ok := sourceMap["name"].(string); ok {
sources = append(sources, name)
}
}
}
}
}
return sources, nil
}
// TrackRelationship 追踪单个外键关系
func TrackRelationship(sourceName, schemaName, tableName, columnName, foreignSchemaName, foreignTableName, foreignColumnName, constraintName string) error {
// 创建对象关系
relationshipName := fmt.Sprintf("%s_%s_rel", tableName, columnName)
createRelationshipQuery := map[string]interface{}{
"type": "pg_create_object_relationship",
"args": map[string]interface{}{
"source": sourceName,
"table": map[string]interface{}{
"schema": schemaName,
"name": tableName,
},
"name": relationshipName,
"using": map[string]interface{}{
"foreign_key_constraint_on": columnName,
},
},
}
jsonBody, err := json.Marshal(createRelationshipQuery)
if err != nil {
return fmt.Errorf("关系请求编码失败: %v", err)
}
// 打印请求内容以便调试
fmt.Printf("创建关系请求:\n%s\n", string(jsonBody))
req, err := http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("创建关系请求失败: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("关系请求失败: %v", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应失败: %v", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("创建关系失败,状态码 %d: %s", resp.StatusCode, string(body))
}
fmt.Printf("成功创建关系: %s\n", relationshipName)
return nil
}
// TrackUnTrackedRelationships 追踪所有未追踪的外键关系
func TrackUnTrackedRelationships(sourceName string) error {
// 查询所有外键关系
relationshipQuery := map[string]interface{}{
"type": "run_sql",
"args": map[string]interface{}{
"source": sourceName,
"sql": `
SELECT
tc.table_schema as schema_name,
tc.constraint_name,
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_schema_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE
tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema NOT IN ('information_schema', 'pg_catalog')
ORDER BY
tc.table_schema,
tc.table_name,
tc.constraint_name;
`,
},
}
dataEndpoint := strings.Replace(metadataEndpoint, "/v1/metadata", "/v2/query", 1)
jsonBody, err := json.Marshal(relationshipQuery)
if err != nil {
return fmt.Errorf("关系请求编码失败: %v", err)
}
req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("创建关系请求失败: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("关系请求失败: %v", err)
}
defer resp.Body.Close()
var result struct {
Result [][]string `json:"result"`
}
if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
return fmt.Errorf("解码关系响应失败: %v", err)
}
// 获取已追踪的关系
trackedQuery := map[string]interface{}{
"type": "export_metadata",
"args": map[string]interface{}{},
"version": 2,
}
jsonBody, err = json.Marshal(trackedQuery)
if err != nil {
return fmt.Errorf("关系请求编码失败: %v", err)
}
req, err = http.NewRequest("POST", metadataEndpoint, bytes.NewBuffer(jsonBody))
if err != nil {
return fmt.Errorf("创建关系请求失败: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Hasura-Admin-Secret", adminSecret)
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("关系请求失败: %v", err)
}
defer resp.Body.Close()
var trackedResult map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&trackedResult); err != nil {
return fmt.Errorf("解码已追踪关系响应失败: %v", err)
}
fmt.Printf("\n追踪数据源 '%s' 中的外键关系:\n", sourceName)
fmt.Println("--------------------")
var trackedCount, untrackedCount, errorCount int
if len(result.Result) > 1 { // 跳过标题行
for _, row := range result.Result[1:] {
schemaName := row[0]
constraintName := row[1]
tableName := row[2]
columnName := row[3]
foreignSchemaName := row[4]
foreignTableName := row[5]
foreignColumnName := row[6]
// 检查关系是否已被追踪
isTracked := false
if sources, ok := trackedResult["metadata"].(map[string]interface{})["sources"].([]interface{}); ok {
for _, source := range sources {
if sourceMap, ok := source.(map[string]interface{}); ok {
if sourceName == sourceMap["name"] {
if tables, ok := sourceMap["tables"].([]interface{}); ok {
for _, table := range tables {
if tableMap, ok := table.(map[string]interface{}); ok {
if tableInfo, ok := tableMap["table"].(map[string]interface{}); ok {
if tableInfo["name"] == tableName && tableInfo["schema"] == schemaName {
if relationships, ok := tableMap["object_relationships"].([]interface{}); ok {
for _, rel := range relationships {
if relMap, ok := rel.(map[string]interface{}); ok {
if using, ok := relMap["using"].(map[string]interface{}); ok {
if fKeyConstraint, ok := using["foreign_key_constraint_on"].(string); ok {
if fKeyConstraint == columnName {
isTracked = true
trackedCount++
break
}
}
}
}
}
}
}
}
}
}
}
}
}
}
}
if !isTracked {
fmt.Printf("追踪关系: %s.%s.%s -> %s.%s.%s (约束: %s)\n",
schemaName, tableName, columnName,
foreignSchemaName, foreignTableName, foreignColumnName,
constraintName)
err := TrackRelationship(
sourceName,
schemaName,
tableName,
columnName,
foreignSchemaName,
foreignTableName,
foreignColumnName,
constraintName,
)
if err != nil {
fmt.Printf("追踪关系出错: %v\n", err)
errorCount++
} else {
untrackedCount++
}
}
}
}
fmt.Println("\n统计信息:")
fmt.Printf("- 已追踪的关系: %d\n", trackedCount)
fmt.Printf("- 新追踪的关系: %d\n", untrackedCount)
fmt.Printf("- 追踪失败: %d\n", errorCount)
fmt.Printf("- 关系总数: %d\n", trackedCount+untrackedCount+errorCount)
fmt.Println("--------------------")
return nil
}
func main() {
fmt.Println("测试 Hasura 连接...")
if err := TestHasuraConnection(); err != nil {
fmt.Printf("连接测试失败: %v\n", err)
os.Exit(1)
}
// 获取所有数据源
sources, err := GetAllSources()
if err != nil {
fmt.Printf("获取数据源失败: %v\n", err)
os.Exit(1)
}
fmt.Printf("\n发现 %d 个数据源\n", len(sources))
// 检查并追踪每个数据源中未追踪的对象
for _, source := range sources {
fmt.Printf("\n=== 处理数据源: %s ===\n", source)
// 显示未追踪的对象
if err := GetUnTrackedTablesAndViews(source); err != nil {
fmt.Printf("获取未追踪对象失败,数据源 %s: %v\n", source, err)
continue
}
// 追踪未追踪的对象
if err := TrackUnTrackedObjects(source); err != nil {
fmt.Printf("追踪对象失败,数据源 %s: %v\n", source, err)
continue
}
}
// 检查每个数据源中未追踪的关系
// 处理每个数据源
for _, source := range sources {
fmt.Printf("\n=== 处理数据源: %s ===\n", source)
// 显示未追踪的��系
if err := GetUnTrackedTablesAndViews(source); err != nil {
fmt.Printf("获取未追踪关系失败,数据源 %s: %v\n", source, err)
continue
}
// 追踪未追踪的关系
if err := TrackUnTrackedRelationships(source); err != nil {
fmt.Printf("追踪关系失败,数据源 %s: %v\n", source, err)
continue
}
}
fmt.Println("\n处理完成!")
}