获取阿里云nacos注册接口状态
import json
import time
import nacos
def nacos_client():
SERVER_ADDRESSES = "http://127.0.0.1:8848"
NAMESPACE = "46c54160-9dd9-4f2187-b3a5-20a23cf01111a9e0"
ACCESS_KEY_ID = "LTAI5tRmcgKdACqN222323MDmpHo"
ACCESS_KEY_SECRET = "ZPQa6D121Qv3YIiLI8Ma21l13Z111W2uTYZnXC"
client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE, ak=ACCESS_KEY_ID, sk=ACCESS_KEY_SECRET)
return client
def get_service():
with open("service.json","r",encoding="utf-8") as file:
json_data = json.load(file)['wrshg']
return json_data
def list_naming_instance_def(client,service):
services = client.list_naming_instance(service_name=service)
data = services['hosts']
sum_all = len(data)
sum_list = []
for i in data:
sum_data = []
sum_data.append(service)
sum_data.append(i['clusterName'])
sum_data.append(i['ip'])
sum_data.append(i['port'])
sum_list.append(sum_data)
return sum_all,sum_list
def get_naming_instance_def(client,data_all):
Number_of=data_all[0]
data_sum = []
for data in data_all[1]:
services = client.get_naming_instance(service_name=data[0], ip=data[2], port=data[3],
cluster_name=data[1])
weight=services['weight']
data.append(weight)
data_sum.append(data)
return Number_of,data_sum
def service_check(data_sum,json_sum):
online = data_sum[0]
normal_online = json_sum['sum']
if online == normal_online:
for data1 in data_sum[1]:
service_status = data1[4]
if service_status == 1.0:
continue
elif service_status == 0.0:
print(data1)
print("权重异常")
elif online < normal_online:
print(json_sum['name'])
print("在线个数异常")
elif online > normal_online:
print(json_sum['name'])
print("在线个数大于正常个数,此服务需要更新在线个数")
json_data = get_service()
client=nacos_client()
for i in json_data:
print(i['name'])
data_all = list_naming_instance_def(client,i['name'])
data_sum = get_naming_instance_def(client,data_all)
service_check(data_sum,i)
time.sleep(1)
vim service.json
{
"wrshg": [
{
"name": "production-test-open-api",
"sum": 2
},
{
"name": "datacenter-internal-api",
"sum": 3
}
}