python读取ATL15.nc数据并保存为
ATL15数据是nc格式,分成了很多组,arcgis不能直接打开
分别提取delta_h.nc\lag1.nc\lag4.nc
import netCDF4 as nc
def save_group_to_new_nc(input_file_path, output_file_path, group_name):
"""
将指定组的数据保存为一个新的 .nc 文件。
:param input_file_path: 输入 .nc 文件路径
:param output_file_path: 输出 .nc 文件路径
:param group_name: 要保存的组名
"""
# 打开输入文件
with nc.Dataset(input_file_path, mode='r') as src:
# 检查组是否存在
if group_name not in src.groups:
print(f"组 {group_name} 不存在于文件中。")
return
# 创建输出文件
with nc.Dataset(output_file_path, mode='w', format='NETCDF4') as dst:
# 获取源组
src_group = src.groups[group_name]
# 复制组的属性
for attr_name in src_group.ncattrs():
dst.setncattr(attr_name, src_group.getncattr(attr_name))
# 复制组的维度
for dim_name, dim in src_group.dimensions.items():
dst.createDimension(dim_name, dim.size)
# 复制组的变量
for var_name, var in src_group.variables.items():
# 检查是否存在 _FillValue 属性
fill_value = var.getncattr('_FillValue') if '_FillValue' in var.ncattrs() else None
# 创建变量
new_var = dst.createVariable(var_name, var.dtype, var.dimensions, fill_value=fill_value)
# 复制变量的属性
for attr_name in var.ncattrs():
if attr_name != '_FillValue': # _FillValue 已经在创建变量时设置
new_var.setncattr(attr_name, var.getncattr(attr_name))
# 复制变量的数据
new_var[:] = var[:]
print(f"组 {group_name} 已成功保存到 {output_file_path}")
# 使用示例
input_file_path = r'' # 替换为你的输入 .nc 文件路径
output_file_path = r'' # 输出 .nc 文件路径
group_name = 'dhdt_lag4' # 要保存的组名
save_group_to_new_nc(input_file_path, output_file_path, group_name)
nc文件导出tif
因为nc文件保存着n个时间维度的数据,所以一个nc会导出n个tif
import netCDF4 as nc
import numpy as np
import rasterio
from rasterio.transform import from_origin
import os
def export_dhdt_to_geotiff(input_nc, output_folder):
"""
将 dhdt 变量的每个时间步导出为单独的 GeoTIFF 文件。
:param input_nc: 输入的 .nc 文件路径
:param output_folder: 输出文件夹路径
"""
# 打开 .nc 文件
dataset = nc.Dataset(input_nc, mode='r')
# 获取变量
dhdt = dataset.variables['delta_h'][:]
x = dataset.variables['x'][:]
y = dataset.variables['y'][:]
time = dataset.variables['time'][:]
# 获取地理信息
polar_stereo = dataset.variables['Polar_Stereographic']
crs = polar_stereo.crs_wkt
# 获取 GeoTransform 参数
geo_transform = (polar_stereo.GeoTransform if hasattr(polar_stereo, 'GeoTransform') else
(x[0], x[1] - x[0], 0, y[-1], 0, y[0] - y[1]))
# 遍历每个时间步
for t_index, t_value in enumerate(time):
# 创建 GeoTIFF 文件名
output_file = os.path.join(output_folder, f"delta_h_time_{t_index}.tif")
# 创建 GeoTIFF 文件
with rasterio.open(
output_file,
'w',
driver='GTiff',
height=dhdt.shape[1],
width=dhdt.shape[2],
count=1,
dtype=dhdt.dtype,
crs=crs,
transform=from_origin(-500, -500, x[1] - x[0], y[0] - y[1]),
nodata=polar_stereo._FillValue if hasattr(polar_stereo, '_FillValue') else np.nan
) as dst:
dst.write(dhdt[t_index, :, :], 1)
print(f"所有时间步的 dhdt 数据已成功导出到 {output_folder}")
# 使用示例
input_nc = r'' # 替换为你的输入 .nc 文件路径
output_folder = r'' # 替换为你的输出文件夹路径
export_dhdt_to_geotiff(input_nc, output_folder)
批量掩膜
对自己的研究区进行掩膜
import netCDF4 as nc
import numpy as np
import rasterio
import geopandas as gpd
from rasterio.transform import from_origin
import os
from osgeo import gdal
import traceback
def mask_gdal(cutline_path, filepath, outfile):
"""
GDAL掩膜提取
:param cutline_path: 掩膜范围路径(shp文件)
:param filepath: 要处理的tif文件路径
:param outfile: 输出文件路径
"""
try:
# 验证输入文件存在
if not all([os.path.exists(cutline_path), os.path.exists(filepath)]):
raise FileNotFoundError("输入文件不存在")
# 执行掩膜操作
dataset = gdal.Open(filepath)
if dataset is None:
raise RuntimeError(f"无法打开文件: {filepath}")
# 执行掩膜操作
result = gdal.Warp(
outfile,
dataset,
cutlineDSName=cutline_path,
cropToCutline=True,
dstNodata=np.nan,
)
# 显式释放资源
dataset = None
if result is None:
raise RuntimeError(f"掩膜操作失败: {filepath}")
print(f'成功: {os.path.basename(filepath)}')
return True
except Exception as e:
print(f"处理失败 {filepath}")
print(f"错误详情: {str(e)}")
print(traceback.format_exc())
return False
def mask_tif_files(input_dir, shapefile_path, output_dir):
"""
处理目录中的tif文件
:param input_dir: 输入目录
:param shapefile_path: 掩膜shp文件路径
:param output_dir: 输出目录
"""
# 输入验证
if not os.path.exists(shapefile_path):
raise FileNotFoundError(f"Shapefile不存在: {shapefile_path}")
# 遍历处理文件
processed = 0
errors = 0
for root, dirs, files in os.walk(input_dir):
for file in files:
if file.lower().endswith('.tif'):
# 构造输出路径(保留目录结构)
rel_path = os.path.relpath(root, input_dir)
out_dir = os.path.join(output_dir, rel_path)
os.makedirs(out_dir, exist_ok=True)
in_path = os.path.join(root, file)
out_path = os.path.join(out_dir, f"{os.path.splitext(file)[0]}_masked.tif")
# 执行处理
if mask_gdal(shapefile_path, in_path, out_path):
processed += 1
else:
errors += 1
print(f"\n处理完成!成功: {processed} 个,失败: {errors} 个")
# 使用示例
if __name__ == "__main__":
input_dir = r''
shapefile_path = r'aio\AOI 4.shp' # 需确保路径正确
output_dir = r''
# 添加路径存在性检查
if not os.path.exists(input_dir):
raise NotADirectoryError(f"输入目录不存在: {input_dir}")
mask_tif_files(input_dir, shapefile_path, output_dir)