洪水灾害多智能体分布式模拟示例代码
1. 环境定义:支持灾害动态、地理数据和分布式架构
import numpy as np
import random
import matplotlib.pyplot as plt
# 新疆主要城市及邻接关系
XINJIANG_CITIES = {
'Urumqi': ['Changji', 'Shihezi'],
'Changji': ['Urumqi', 'Shihezi', 'Turpan'],
'Shihezi': ['Urumqi', 'Changji', 'Karamay'],
'Karamay': ['Shihezi'],
'Turpan': ['Changji']
}
CITY_COORDINATES = {
'Urumqi': (43.8256, 87.6168),
'Changji': (44.0169, 87.3082),
'Shihezi': (44.3024, 86.0369),
'Karamay': (45.5798, 84.8892),
'Turpan': (42.9513, 89.1895)
}
class XinjiangFloodEnvironment:
def __init__(self, cities, max_steps=50, disaster_spread_prob=0.3):
"""
新疆多城市洪水灾害环境。
参数:
- cities: 城市及邻接关系字典。
- max_steps: 每回合最大步数。
- disaster_spread_prob: 灾害扩散概率。
"""
self.cities = cities
self.city_list = list(cities.keys())
self.num_cities = len(self.city_list)
self.max_steps = max_steps
self.disaster_spread_prob = disaster_spread_prob
self.reset()
def reset(self):
""" 重置环境,初始化灾害和资源分布。 """
self.steps = 0
self.disaster_status = {city: 0 for city in self.city_list} # 灾害状态:0为无灾,1为灾害中
self.resource_status = {city: 5 for city in self.city_list} # 每城市初始资源
disaster_city = random.choice(self.city_list)
self.disaster_status[disaster_city] = 1 # 随机选择灾害城市
return self._get_state()
def _get_state(self):
""" 获取当前状态,包含灾害和资源信息。 """
return {
'disaster_status': self.disaster_status,
'resource_status': self.resource_status
}
def _spread_disaster(self):
""" 模拟灾害扩散。 """
new_disaster_status = self.disaster_status.copy()
for city, status in self.disaster_status.items():
if status == 1: # 当前城市有灾害
for neighbor in self.cities[city]:
if random.random() < self.disaster_spread_prob:
new_disaster_status[neighbor] = 1
self.disaster_status = new_disaster_status
def step(self, actions):
"""
执行动作。
参数:
- actions: 每个城市的动作字典 {city: allocated_resources}。
返回:
- 下一状态
- 总奖励
- 是否结束
"""
total_reward = 0
for city, allocated_resources in actions.items():
if self.disaster_status[city] == 1: # 如果该城市有灾害
if allocated_resources > 0:
self.resource_status[city] -= allocated_resources
if self.resource_status[city] < 0: # 资源不能为负
allocated_resources += self.resource_status[city]
self.resource_status[city] = 0
self.disaster_status[city] = 0 # 灾害解决
total_reward += 10 # 成功解决灾害
else:
total_reward -= 5 # 未响应灾害的惩罚
self.steps += 1
if self.steps >= self.max_steps or sum(self.disaster_status.values()) == 0:
return self._get_state(), total_reward, True # 所有灾害解决或步数结束
# 更新灾害状态(灾害扩散)
self._spread_disaster()
return self._get_state(), total_reward, False # 继续运行
def render(self):
""" 可视化当前环境状态。 """
disaster_cities = [city for city, status in self.disaster_status.items() if status == 1]
print(f"Step {self.steps}:")
print(f"Disaster Cities: {disaster_cities}")
print(f"Resource Status: {self.resource_status}")
plt.figure(figsize=(8, 8))
for city, (lat, lon) in CITY_COORDINATES.items():
plt.scatter(lon, lat, color='blue' if city in self.resource_status else 'red', s=100)
plt.text(lon, lat, city, fontsize=10)
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.title("Xinjiang Flood Simulation")
plt.show()
2. 分布式多智能体实现
class DistributedAgent:
def __init__(self, city, action_size):
"""
分布式智能体。
参数:
- city: 智能体负责的城市。
- action_size: 动作空间大小。
"""
self.city = city
self.action_size = action_size
self.epsilon = 1.0 # 探索概率
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
def act(self, state, available_resources):
""" 基于当前状态和资源选择动作。 """
if np.random.rand() <= self.epsilon:
return random.randint(0, available_resources) # 随机分配资源
return available_resources # 简化决策:全分配
def update_epsilon(self):
""" 衰减探索概率。 """
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
3. 动态奖励函数
def calculate_reward(disaster_map, response_time, resources_used, weights):
"""
计算动态奖励。
参数:
- disaster_map: 当前灾害状态。
- response_time: 当前响应时间。
- resources_used: 当前使用的资源量。
- weights: 奖励函数的权重 (dict)。
返回:
- reward: 总奖励值。
"""
covered_disasters = disaster_map.sum() # 未解决的灾害数量
return (
weights['covered'] * (1 - covered_disasters) -
weights['time'] * response_time -
weights['resources'] * resources_used
)
4. 主训练循环
if __name__ == "__main__":
env = XinjiangFloodEnvironment(XINJIANG_CITIES) # 初始化环境
agents = {city: DistributedAgent(city, 5) for city in XINJIANG_CITIES.keys()} # 每城市一个智能体
episodes = 100 # 训练轮次
for e in range(episodes):
state = env.reset() # 重置环境
total_reward = 0
while True:
actions = {}
for city, agent in agents.items():
available_resources = state['resource_status'][city]
actions[city] = agent.act(state, available_resources) # 每个智能体选择动作
next_state, reward, done = env.step(actions) # 执行动作
total_reward += reward
if done:
print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward}")
env.render()
break
state = next_state
# 更新每个智能体的探索概率
for agent in agents.values():
agent.update_epsilon()
功能扩展与总结
-
灾害动态扩展:
- 灾害通过邻接城市扩散。
- 动态更新灾害状态,提升模拟真实性。
-
多智能体分布式协作:
- 每个智能体管理自己城市的资源。
- 集中式奖励计算与全局决策评估。
-
动态奖励函数:
- 同时优化响应时间、资源成本和覆盖范围。
-
新疆地理数据支持:
- 模拟新疆主要城市及其邻接关系。
- 地理坐标可视化,帮助分析决策动态。