
GPU-Accelerated Rainfall Flood Simulation with the SynxFlow Library (Python)

SynxFlow

Full Name and Overview

SynxFlow: Synergising High-Performance Hazard Simulation with Data Flow
This software can dynamically simulate flood inundation, landslide runout and debris flows using multiple CUDA-enabled GPUs. It also offers a user-friendly yet versatile Python interface that can be fully integrated into data science workflows, aiming to streamline and accelerate hazard risk assessment tasks.
GitHub repository
Official documentation
Official paper

Advantages for Rainfall Flood Simulation

Compared with Landlab, which I introduced in my previous post, SynxFlow has the following advantages:

  1. GPU acceleration: it can compute on one or multiple GPUs, and also supports a CPU mode. In my tests, for tif data around 4096*4096, a single RTX 3090 can handle 4-5 tasks concurrently; at 5 tasks, VRAM and compute are essentially saturated and there is a risk of running out of GPU memory.
  2. Setting up the drainage network is relatively simple: a single mask is enough to represent the locations and drainage capacity of sewers and similar features.
  3. Configuring the start and end of a rainfall event is simpler than in Landlab.
  4. It is easy to assign different surface roughness, rainfall intensity, and similar properties to different regions of the raster.
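Point 4 can be sketched with plain NumPy: the rain mask assigns an integer region index to every cell, and each index selects one column of the rainfall time series (the full listing later in this post passes exactly these two arrays to `set_rainfall`). The domain size and intensities below are hypothetical.

```python
import numpy as np

# hypothetical 6x6 domain split into two rainfall regions:
# left half gets index 0, right half gets index 1
nrows, ncols = 6, 6
rain_mask = np.zeros((nrows, ncols), dtype=int)
rain_mask[:, ncols // 2:] = 1

# rain source: column 0 is time (s); columns 1..n are intensities (m/s)
# for mask indices 0, 1, ...; here region 0 gets 50 mm/h and region 1
# gets 100 mm/h for the first hour, after which both stop
mmhr_to_ms = 1.0 / 3.6e6  # mm/hour -> m/s
rain_source = np.array([
    [0.0,    50 * mmhr_to_ms, 100 * mmhr_to_ms],
    [3600.0, 0.0,             0.0],
])

# each cell's intensity at t=0 can be looked up through the mask
intensity_t0 = rain_source[0, 1:][rain_mask]
```

With SynxFlow, these two arrays would then go into `case_input.set_rainfall(rain_mask=rain_mask, rain_source=rain_source)`, as the single-region code below does.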

Disadvantages for Rainfall Flood Simulation

Based on my experience, the main drawbacks are:

  1. Boundary drainage. This is not a problem with SynxFlow itself, but one I run into whenever I model floods with the shallow-water equations (if you have a better solution, please leave me a comment). The DEM is processed tile by tile, so whatever drainage capacity you define along the edges of each raster is inherently questionable. For example, if a depression is split across two adjacent DEM tiles, how should the outflow be computed at the right edge of the left tile and the left edge of the right tile? Prescribing a fixed discharge per hour contradicts the very definition of a depression, but simply declaring the tile edges non-draining guarantees some degree of ponding along them. When the results from multiple DEM tiles are mosaicked onto a map, you see conspicuous ponding along every tile boundary, which does not match the real world.
  2. The precompiled package SynxFlow provides can be called from Python, but many intermediate files are cached to disk while the code runs. This means repeated disk I/O during a run, which reduces efficiency. That is fine for research use, but for real production use I recommend modifying the source and recompiling, adding a parameter to control local caching.
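A common mitigation for the boundary-ponding artefact in point 1 (a generic tiling trick, not a SynxFlow feature) is to simulate each tile with a halo of overlap borrowed from its neighbours and keep only the core cells when mosaicking, so the artificial boundary falls inside the discarded halo. A minimal sketch of the crop arithmetic, with hypothetical sizes:

```python
import numpy as np

def split_with_halo(dem, tile, halo):
    """Split a 2-D array into tile-sized blocks, each padded with up to
    `halo` extra cells of real neighbour data on every side (clipped at
    the array edge). Returns (padded tile, core origin, core slices)."""
    nrows, ncols = dem.shape
    tiles = []
    for r0 in range(0, nrows, tile):
        for c0 in range(0, ncols, tile):
            r1, c1 = min(r0 + tile, nrows), min(c0 + tile, ncols)
            rs, cs = max(r0 - halo, 0), max(c0 - halo, 0)
            re, ce = min(r1 + halo, nrows), min(c1 + halo, ncols)
            # slices that recover the core region inside the padded tile
            core = (slice(r0 - rs, r0 - rs + (r1 - r0)),
                    slice(c0 - cs, c0 - cs + (c1 - c0)))
            tiles.append((dem[rs:re, cs:ce], (r0, c0), core))
    return tiles

# simulate each padded tile, then mosaic only the core cells so the
# artificial boundary lies in the discarded halo
dem = np.arange(64.0).reshape(8, 8)   # hypothetical tiny DEM
mosaic = np.empty_like(dem)
for sub, (r0, c0), core in split_with_halo(dem, tile=4, halo=2):
    depth = sub  # placeholder: run the flood model on `sub` here
    h, w = depth[core].shape
    mosaic[r0:r0 + h, c0:c0 + w] = depth[core]
```

The ponding error does not vanish; it is pushed `halo` cells away from the region you keep, so a larger halo costs more compute but contaminates the core less.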

Purpose of This Post

The user supplies the rainfall intensity, rainfall duration, and drainage network information; given an existing DEM, the model outputs the ponding depth in different areas.

Code and Explanation

In the code, task is a JSON object, which is convenient for API calls. It is defined as:
{'watershed_dem': 'dem.tif', 'starting_precip_mmhr': 142.0, 'storm_duration': 60.0, 'elapsed_time': 0.0, 'model_run_time': 60.0, 'GPU': 1}
The format requirements and units of each field are:

watershed_dem: path to the DEM; tif, tiff, asc, and gz all work. You can restrict the accepted formats via ext_list in the code below.
starting_precip_mmhr: rainfall intensity, in mm/hour.
storm_duration: rainfall duration, in seconds.
elapsed_time: can be regarded as the start time, in seconds.
model_run_time: total simulation time, not less than storm_duration, in seconds. This is really the duration of the whole event: for example, a 24-hour event in which it rains for only 1 hour, hence model_run_time >= storm_duration.
GPU: takes only 0 or 1; it exists solely for compatibility with my previous post and is unused in the code below.
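These constraints are easy to check before building the model. Below is a small, hypothetical validator (not part of SynxFlow) that also performs the mm/hour to m/s conversion the simulation code applies later:

```python
import os

def validate_task(task, ext_list=('.tif', '.tiff', '.asc', '.gz')):
    """Sanity-check a task dict and return the rainfall rate in m/s."""
    ext = os.path.splitext(task['watershed_dem'])[1].lower()
    assert ext in ext_list, 'unsupported DEM format: ' + ext
    assert task['storm_duration'] > 0, 'storm_duration must be positive (s)'
    assert task['model_run_time'] >= task['storm_duration'], \
        'model_run_time must cover the whole storm'
    # mm/hour -> m/s: divide by 1000 (mm -> m) and by 3600 (hour -> s)
    return task['starting_precip_mmhr'] / 3.6e6

task = {'watershed_dem': 'dem.tif', 'starting_precip_mmhr': 142.0,
        'storm_duration': 60.0, 'elapsed_time': 0.0,
        'model_run_time': 60.0, 'GPU': 1}
rate_ms = validate_task(task)
```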
If you use the code below together with my previous post, don't bother with the try/except: that post already handles errors, and catching them here actually prevents the error message from being propagated to Celery properly.

import os
import copy
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from synxflow import IO, flood


def synxflow_rainfall_simulation_gpu(task, ngpus=1, output_dir='/home/XXX/data/rainfall_simulation/'):

    process_time = {}

    ext_list = ['.tiff', '.tif', '.asc', '.gz']


    try:

        # print ('processing the task: ' + str(task))
        watershed_dem = task['watershed_dem']

        # the intensity of starting_precip_mmhr mm/hr, with a duration of storm_duration seconds.
        starting_precip_mmhr = task['starting_precip_mmhr']
        storm_duration = int(task['storm_duration'])  # s
        elapsed_time = int(task['elapsed_time'])  # s
        model_run_time = int(task['model_run_time'])  # s

        assert os.path.exists(watershed_dem), "DEM file not found: " + str(watershed_dem)

        ext = os.path.splitext(watershed_dem)[1]

        dem_file_name = os.path.split(watershed_dem)[-1]

        case_dir = os.path.join(output_dir, os.path.splitext(dem_file_name)[0])

        output_tif_name = (os.path.splitext(dem_file_name)[0] + '-result-' + str(task['starting_precip_mmhr'])
                           + "-" + str(task['storm_duration']) + "-" + str(task['elapsed_time']) + "-" + str(
                    task['model_run_time']) + ext)

        assert ext in ext_list, "please check the input format, it can only be " + str(ext_list)

        print('read image')
        time1 = time.time()

        # step 1: load dem data
        DEM = IO.Raster(watershed_dem)
        # DEM.mapshow()  # plot the Raster object

        time2 = time.time()
        process_time['read_image'] = time2 - time1

        # step 2: initialize input model
        # here the algorithm is trying to preprocess data and cache them on your disk
        case_folder = os.path.join(case_dir, 'flood_case', str(time1))  # define a case folder
        case_input = IO.InputModel(DEM, num_of_sections=ngpus, case_folder=case_folder)

        # step 3: set initial conditions
        case_input.set_initial_condition('h0', 0.0)  # set initial flood water height to 0.0

        # step 4: define boundary conditions
        # here is the disadvantage-1 I mentioned at the beginning of the blog
        case_input.set_boundary_condition(outline_boundary='open')  # outline_boundary: (str) 'open'|'rigid',
        # default outline boundary is open and both h and hU are set as zero

        # step 5: define the rain intensity and storm duration
        # 'rain_mask' gives an index (0, 1, 2, ...) to every grid cell; the index decides which column
        # of the rainfall intensity time series is used as the rainfall source term for that cell

        # you can use a configuration file to define different rainfall regions, e.g. region1 -> index 0,
        # region2 -> index 1, etc. Here I define only one region (index 0), which is why the code below
        # generates a rain mask whose only value is 0
        rain_mask = np.zeros((DEM.header['nrows'], DEM.header['ncols']))
        ind = (DEM.array == -32767) | (DEM.array == -9999)  # mask nodata cells
        if ind.sum() > 0:
            rain_mask[ind] = np.nan
        # rain_mask.mapshow()

        starting_precip_ms = starting_precip_mmhr / 3600000
        if model_run_time == storm_duration:  # s
            rain_source_np = np.array([[0, starting_precip_ms],
                                       [model_run_time, starting_precip_ms]])
        else:
            rain_source_np = np.array([[0, starting_precip_ms],
                                       [storm_duration, 0],
                                       [model_run_time, 0]])

        case_input.set_rainfall(rain_mask=rain_mask, rain_source=rain_source_np)

        # step 6: set Manning's roughness coefficients, which can be spatially variable
        landcover = copy.copy(DEM)
        # here I define only one landcover region, which is why I just copy the rain mask
        landcover.array = rain_mask
        # landcover.mapshow()
        case_input.set_landcover(landcover)
        case_input.set_grid_parameter(manning={'param_value': [0.035],
                                               'land_value': [0],
                                               'default_value': 0.035})  # set Manning coefficients

        # if you define 2 landcover types, the parameters could look like this:
        # case_input.set_grid_parameter(manning={'param_value': [0.035, 0.1],
        #                                        'land_value': [0, 1],
        #                                        'default_value': 0.035})  # set Manning coefficients
        # step 7: define gauge points and runtime parameters
        # We want to see the time histories of the depth at both the downstream and upstream of the main river,
        # so we need to set up a gauging point in the model.
        # not used in my use case
        case_input.set_gauges_position(np.array([[0, 0],
                                                 [DEM.header['nrows'] - 1,
                                                  DEM.header['ncols'] - 1]]))  # set gauge points
        # The final thing we want to set up is the runtime. We want to run the simulation for 2 hours,
        # get the raster outputs for every 15 minutes and backup the simulation for every half an hour.
        # case_input.set_runtime([0, 7200, 900, 1800])  # set runtime parameters (start, end, output, backup intervals)
        case_input.set_runtime([elapsed_time, model_run_time, int(model_run_time / 2),
                                int(model_run_time / 4)])  # set runtime parameters (start, end, output, backup intervals)

        # step 8: cache input information and run simulation
        # GPU or not
        case_input.write_input_files()
        if ngpus > 1:
            flood.run_mgpus(case_folder)
        else:
            flood.run(case_folder)

        # step 9: save results
        case_output = IO.OutputModel(input_obj=case_input)
        # gauges_pos, times, values = case_output.read_gauges_file(file_tag='h')
        # lines = plt.plot(times, values)
        # plt.xlabel('time (s)')
        # plt.ylabel('depth (m)')
        # plt.legend(lines[:2], ['downstream', 'upstream'])
        # plt.show()
        max_depth_file_name = 'h_max_' + str(model_run_time)
        max_depth = case_output.read_grid_file(file_tag=max_depth_file_name)
        max_depth.write_tif(output_file=os.path.join(case_dir, output_tif_name), src_epsg=32640)
        # max_depth.mapshow()
        for key, value in process_time.items():
            print(key, "=", value)
        return True, os.path.join(case_dir, output_tif_name)
    except Exception as exc:
        return False, exc

