GPU-Accelerated Rainfall Flood Simulation with the SynxFlow Library (Python)
SynxFlow
Full Name and Overview
SynxFlow: Synergising High-Performance Hazard Simulation with Data Flow
This software can dynamically simulate flood inundation, landslide runout and debris flows using multiple CUDA-enabled GPUs. It also offers a user-friendly yet versatile Python interface that can be fully integrated into data science workflows, aiming to streamline and accelerate hazard risk assessment tasks.
GitHub Repository
Official Documentation
Official Paper
Advantages for Rainfall Flood Simulation
Compared with Landlab, which I introduced in my previous blog post, SynxFlow has the following advantages:
- GPU acceleration, with either one or several GPUs; a CPU mode is also supported. In my tests on roughly 4096*4096 tif data, a single RTX 3090 can run 4-5 tasks concurrently; with 5 tasks, VRAM and compute are essentially saturated and there is a risk of running out of GPU memory.
- Drainage network setup is relatively simple: a single mask can easily represent the locations and drainage capacity of sewers and similar structures.
- Configuring the start and end of a rainfall event is simpler than in Landlab.
- It is easy to assign different surface roughness, rainfall intensity and other properties to different regions of the raster.
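As a sketch of the last advantage: SynxFlow's rain mask assigns an integer index to every cell, and the rain source table carries one time column plus one intensity column per index (the same convention used in the code later in this post). Assuming two hypothetical rainfall regions, the inputs could look like this (grid size and intensities are made up; only the table layout matters):

```python
import numpy as np

# hypothetical 4x6 grid: left half gets index 0, right half index 1
rain_mask = np.zeros((4, 6))
rain_mask[:, 3:] = 1

# rain source table: column 0 = time (s), column 1 = intensity for
# index 0 (m/s), column 2 = intensity for index 1 (m/s)
mmhr = 1.0 / 3600000          # 1 mm/hr expressed in m/s
rain_source = np.array([[0,    100 * mmhr, 40 * mmhr],
                        [3600,          0,          0]])
# later: case_input.set_rainfall(rain_mask=rain_mask, rain_source=rain_source)
```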
Disadvantages for Rainfall Flood Simulation
Based on my experience, the main drawbacks are:
- Boundary drainage. This is not a problem with SynxFlow itself but one I face whenever I model floods with the shallow water equations; if you have a better solution, please leave me a comment. The issue is that the DEM is processed tile by tile, so the drainage capacity you define at a tile boundary is questionable. For example, if a depression is split between a left and a right DEM tile, how should the drainage capacity of the left tile's right edge and the right tile's left edge be computed? Prescribing a fixed outflow per hour contradicts the nature of a depression, but simply treating tile boundaries as non-draining guarantees some ponding near the edges. When the results of multiple DEM tiles are mosaicked on a map, visible water accumulation appears along every tile boundary, which does not match the real world.
- The precompiled SynxFlow package can be called from Python, but many intermediate files are cached to disk while the code runs. This means repeated disk I/O during the run, which reduces efficiency. For research use this is fine, but for real production use I recommend modifying the source and recompiling, adding a parameter to control the local caching behaviour.
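A common mitigation for the first drawback (not specific to SynxFlow) is to simulate each tile with an overlapping halo of neighbouring cells and discard the halo when mosaicking, so the artificial boundary lies outside the cells you keep. A minimal numpy sketch, with `simulate` standing in for a full SynxFlow run on one tile:

```python
import numpy as np

def simulate_tiled(dem, tile_size, halo, simulate):
    """Run `simulate` on overlapping tiles and keep only interior cells,
    so boundary artefacts fall inside the discarded halo."""
    out = np.zeros_like(dem, dtype=float)
    nrows, ncols = dem.shape
    for r in range(0, nrows, tile_size):
        for c in range(0, ncols, tile_size):
            r0, c0 = max(r - halo, 0), max(c - halo, 0)
            r1 = min(r + tile_size + halo, nrows)
            c1 = min(c + tile_size + halo, ncols)
            depth = simulate(dem[r0:r1, c0:c1])   # e.g. one SynxFlow case
            rr, cc = r - r0, c - c0               # offset of the interior
            h = min(tile_size, nrows - r)
            w = min(tile_size, ncols - c)
            out[r:r + h, c:c + w] = depth[rr:rr + h, cc:cc + w]
    return out
```

This only helps when the halo is wider than the reach of the boundary artefact, and water is still not exchanged between tiles during the run, so it is a mitigation rather than a fix.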
Purpose of This Post
Given user-supplied rainfall intensity, rainfall duration and drainage network information, compute the water accumulation in different regions based on an existing DEM.
Code and Explanation
In the code, task is a JSON object, convenient for API calls, defined as:
{"watershed_dem": "dem.tif", "starting_precip_mmhr": 142.0, "storm_duration": 60.0, "elapsed_time": 0.0, "model_run_time": 60.0, "GPU": 1}
The format requirements and units of each field are:
watershed_dem: path to the DEM; tif, tiff, asc and gz are all accepted, and you can restrict the formats via ext_list in the code below
starting_precip_mmhr: rainfall intensity, in mm/hr
storm_duration: rainfall duration, in seconds
elapsed_time: can be thought of as the start time, in seconds
model_run_time: total simulation run time, no less than storm_duration, in seconds. This is really the duration of the whole event; for example, the event may span 24 hours of which only 1 hour is rainy, hence model_run_time >= storm_duration
GPU: this parameter is only 0 or 1 and exists solely for compatibility with my previous blog post; it is unused in the code below
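Before handing a task to the simulation, it is worth checking the units and ordering constraints above. A small helper sketch (the function name is mine, not part of SynxFlow):

```python
def validate_task(task):
    """Sanity-check a task dict against the field rules listed above."""
    assert task['watershed_dem'].endswith(('.tif', '.tiff', '.asc', '.gz'))
    assert task['starting_precip_mmhr'] > 0                      # mm/hr
    assert 0 <= task['elapsed_time'] <= task['model_run_time']   # s
    assert task['storm_duration'] <= task['model_run_time']      # rain fits in the event
    assert task['GPU'] in (0, 1)
    return True

task = {'watershed_dem': 'dem.tif', 'starting_precip_mmhr': 142.0,
        'storm_duration': 60.0, 'elapsed_time': 0.0,
        'model_run_time': 60.0, 'GPU': 1}
```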
If you use the code below together with my previous blog post, do not worry about the try/except: that post already handles errors, and handling them here as well would actually prevent the error messages from being passed on to celery properly.
import os
import copy
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from synxflow import IO, flood
def synxflow_rainfall_simulation_gpu(task, ngpus=1, output_dir='/home/XXX/data/rainfall_simulation/'):
    process_time = {}
    ext_list = ['.tiff', '.tif', '.asc', '.gz']
    try:
        # print('processing the task: ' + str(task))
        watershed_dem = task['watershed_dem']
        # rainfall of starting_precip_mmhr mm/hr, lasting storm_duration seconds
        starting_precip_mmhr = task['starting_precip_mmhr']
        storm_duration = int(task['storm_duration'])  # s
        elapsed_time = int(task['elapsed_time'])  # s
        model_run_time = int(task['model_run_time'])  # s
        assert os.path.exists(watershed_dem), "input cannot be None"
        ext = os.path.splitext(watershed_dem)[1]
        dem_file_name = os.path.split(watershed_dem)[-1]
        case_dir = os.path.join(output_dir, os.path.splitext(dem_file_name)[0])
        output_tif_name = (os.path.splitext(dem_file_name)[0] + '-result-' + str(task['starting_precip_mmhr'])
                           + '-' + str(task['storm_duration']) + '-' + str(task['elapsed_time'])
                           + '-' + str(task['model_run_time']) + ext)
        assert ext in ext_list, "please check the input format, it can only be " + str(ext_list)
        print('read image')
        time1 = time.time()
        # step 1: load the DEM data
        DEM = IO.Raster(watershed_dem)
        # DEM.mapshow()  # plot the Raster object
        time2 = time.time()
        process_time['read_image'] = time2 - time1
        # step 2: initialise the input model
        # the algorithm preprocesses the data and caches it on disk
        case_folder = os.path.join(case_dir, 'flood_case', str(time1))  # define a case folder
        case_input = IO.InputModel(DEM, num_of_sections=ngpus, case_folder=case_folder)
        # step 3: set initial conditions
        case_input.set_initial_condition('h0', 0.0)  # set the initial flood water depth to 0.0
        # step 4: define boundary conditions
        # this is disadvantage 1 mentioned at the beginning of this post
        case_input.set_boundary_condition(outline_boundary='open')  # outline_boundary: (str) 'open'|'rigid';
        # the default outline boundary is open, with both h and hU set to zero
        # step 5: define the rain intensity and storm duration
        # 'rain_mask' gives an index (0, 1, 2, ...) to every grid cell; the index selects which column of
        # the rainfall intensity time series is used as the rainfall source term for that cell.
        # a configuration file could map different rainfall regions, e.g. region1 -> index 0,
        # region2 -> index 1, etc.
        # here I define only one region (index 0), so the mask below contains only the value 0
        rain_mask = np.zeros((DEM.header['nrows'], DEM.header['ncols']))
        ind = np.isin(DEM.array, [-32767, -9999])  # nodata cells
        if ind.sum() > 0:
            rain_mask[ind] = np.nan
        # rain_mask.mapshow()
        starting_precip_ms = starting_precip_mmhr / 3600000  # convert mm/hr to m/s
        if model_run_time == storm_duration:  # s
            rain_source_np = np.array([[0, starting_precip_ms],
                                       [model_run_time, starting_precip_ms]])
        else:
            rain_source_np = np.array([[0, starting_precip_ms],
                                       [storm_duration, 0],
                                       [model_run_time, 0]])
        case_input.set_rainfall(rain_mask=rain_mask, rain_source=rain_source_np)
        # step 6: set Manning's roughness coefficients, which can vary spatially
        landcover = copy.copy(DEM)
        # I define only a single landcover region, which is why I simply reuse the rain mask
        landcover.array = rain_mask
        # landcover.mapshow()
        case_input.set_landcover(landcover)
        case_input.set_grid_parameter(manning={'param_value': [0.035],
                                               'land_value': [0],
                                               'default_value': 0.035})  # set Manning coefficients
        # with 2 landcover types the parameters could look like this:
        # case_input.set_grid_parameter(manning={'param_value': [0.035, 0.1],
        #                                        'land_value': [0, 1],
        #                                        'default_value': 0.035})
        # step 7: define gauge points and runtime parameters
        # gauge points record time histories of the depth, e.g. downstream and upstream of a river;
        # not used in my use case
        case_input.set_gauges_position(np.array([[0, 0],
                                                 [DEM.header['nrows'] - 1,
                                                  DEM.header['ncols'] - 1]]))  # set gauge points
        # finally, set the runtime, e.g. run the simulation for 2 hours, write raster outputs
        # every 15 minutes and back up the simulation every half hour:
        # case_input.set_runtime([0, 7200, 900, 1800])  # (start, end, output, backup intervals)
        case_input.set_runtime([elapsed_time, model_run_time, int(model_run_time / 2),
                                int(model_run_time / 4)])  # set runtime parameters
        # step 8: write input files to disk and run the simulation on one or more GPUs
        case_input.write_input_files()
        if ngpus > 1:
            flood.run_mgpus(case_folder)
        else:
            flood.run(case_folder)
        # step 9: save the results
        case_output = IO.OutputModel(input_obj=case_input)
        # gauges_pos, times, values = case_output.read_gauges_file(file_tag='h')
        # lines = plt.plot(times, values)
        # plt.xlabel('time (s)')
        # plt.ylabel('depth (m)')
        # plt.legend(lines[:2], ['downstream', 'upstream'])
        # plt.show()
        max_depth_file_name = 'h_max_' + str(model_run_time)
        max_depth = case_output.read_grid_file(file_tag=max_depth_file_name)
        max_depth.write_tif(output_file=os.path.join(case_dir, output_tif_name), src_epsg=32640)
        # max_depth.mapshow()
        for key, value in process_time.items():
            print(key, '=', value)
        return True, os.path.join(case_dir, output_tif_name)
    except Exception as exc:
        return False, exc
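Once the max-depth GeoTIFF has been written, a typical follow-up step is classifying ponding severity per cell. A pure-numpy sketch on a depth array (the thresholds in metres are illustrative, and loading the tif back, e.g. via IO.Raster, is assumed to have happened already):

```python
import numpy as np

def ponding_stats(depth, thresholds=(0.15, 0.27, 0.6)):
    """Fraction of valid cells in each ponding class (depths in metres)."""
    d = depth[np.isfinite(depth)]
    return {
        'light':  float(np.mean((d >= thresholds[0]) & (d < thresholds[1]))),
        'medium': float(np.mean((d >= thresholds[1]) & (d < thresholds[2]))),
        'severe': float(np.mean(d >= thresholds[2])),
    }

depth = np.array([[0.0, 0.2],
                  [0.4, 1.0]])
stats = ponding_stats(depth)   # {'light': 0.25, 'medium': 0.25, 'severe': 0.25}
```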