基于 GDAL 的 RPC 信息处理及影像校正相关操作实现
本文主要围绕遥感影像的 RPC(Rational Polynomial Coefficients,有理多项式系数)信息处理及地理校正相关操作展开。通过 Python 语言,利用gdal、spectral等库实现了多个功能函数,包括从 HDR 文件中读取并提取 RPC 信息、将 HDR 文件中的 RPC 信息转换并设置到栅格数据集中以进行地理校正并输出为 GeoTIFF 格式,还涵盖了对.rpc文件的解析、向 TIFF 影像写入 RPC 域信息以及进行 RPC 校正等操作,详细介绍了各功能函数的具体实现逻辑及相关参数设置情况,为遥感影像基于 RPC 信息的处理及校正提供了一套可行的代码实现方案。
import numpy as np
from osgeo import gdal
import spectral
def read_hdr_file(hdr_file):
"""
使用 spectral 读取 HDR 文件,并提取其中的元数据
"""
hdr = spectral.io.envi.read_envi_header(hdr_file)
return hdr
def extract_rpc_info(rpc_list):
"""
格式化 RPC 信息
"""
rpc_metadata = {
'LINE_OFF': float(rpc_list[0]),
'SAMP_OFF': float(rpc_list[1]),
'LAT_OFF': float(rpc_list[2]),
'LONG_OFF': float(rpc_list[3]),
'HEIGHT_OFF': float(rpc_list[4]),
'LINE_SCALE': float(rpc_list[5]),
'SAMP_SCALE': float(rpc_list[6]),
'LAT_SCALE': float(rpc_list[7]),
'LONG_SCALE': float(rpc_list[8]),
'HEIGHT_SCALE': float(rpc_list[9]),
'LINE_NUM_COEFF': list(map(float, rpc_list[10:30])),
'LINE_DEN_COEFF': list(map(float, rpc_list[30:50])),
'SAMP_NUM_COEFF': list(map(float, rpc_list[50:70])),
'SAMP_DEN_COEFF': list(map(float, rpc_list[70:90])),
}
return rpc_metadata
# keys = ['ERR_BIAS', 'ERR_RAND', 'LINE_OFF', 'SAMP_OFF', 'LAT_OFF', 'LONG_OFF',
# 'HEIGHT_OFF', 'LINE_SCALE', 'SAMP_SCALE', 'LAT_SCALE',
# 'LONG_SCALE', 'HEIGHT_SCALE', 'LINE_NUM_COEFF', 'LINE_DEN_COEFF',
# 'SAMP_NUM_COEFF', 'SAMP_DEN_COEFF']
def convert_hdr_rpc_to_geotiff(raster_file, hdr_file, output_tiff):
# 打开栅格数据集
raster_dataset = gdal.Open(raster_file)
# 使用 spectral 读取并提取 HDR 文件中的元数据
metadata = read_hdr_file(hdr_file)
rpc_list = metadata.get('rpc info', '')
if not rpc_list:
raise ValueError("HDR 文件中没有找到有效的 RPC 信息")
# 提取并格式化 RPC 信息
rpc_metadata = extract_rpc_info(rpc_list)
# 设置 RPC 元数据
for key, value in rpc_metadata.items():
if isinstance(value, list):
raster_dataset.SetMetadataItem(key, ' '.join(map(str, value)), "RPC")
else:
raster_dataset.SetMetadataItem(key, str(value), "RPC")
# 创建 GeoTIFF 驱动
driver = gdal.GetDriverByName("GTiff")
# 使用 RPC 变换进行地理校正
warp_options = gdal.WarpOptions(dstSRS='EPSG:4326', rpc=True, resampleAlg='bilinear')
output_dataset = gdal.Warp(output_tiff, raster_dataset, options=warp_options)
# 清理资源
output_dataset = None
raster_dataset = None
print(f"导出完成:{output_tiff}")
if __name__ == "__main__":
# 输入栅格文件和 HDR 文件路径
raster_file = r'JL1GF02F_PMS2_202112141423144_rad.dat'
hdr_file = r'JL1GF02F_PMS2_20211214142314_rad.hdr'
output_tiff = r'JL1GF02F_PMS2_20211214142314_rad.tif'
# 调用函数进行转换
convert_hdr_rpc_to_geotiff(raster_file, hdr_file, output_tiff)
def write_rpc_to_tiff(inputpath,ap = True,outpath = None):
rpc_file = inputpath.replace('tiff', 'rpb')
rpc_dict = parse_rpc_file(rpc_file)
if ap:
# 可修改读取
dataset = gdal.Open(inputpath,gdal.GA_Update)
# 向tif影像写入rpc域信息
# 注意,这里虽然写入了RPC域信息,但实际影像还没有进行实际的RPC校正
# 尽管有些RS/GIS能加载RPC域信息,并进行动态校正
for k in rpc_dict.keys():
dataset.SetMetadataItem(k, rpc_dict[k], 'RPC')
dataset.FlushCache()
del dataset
else:
dataset = gdal.Open(inputpath,gdal.GA_Update)
tiff_driver= gdal.GetDriverByName('Gtiff')
out_ds = tiff_driver.CreateCopy(outpath, dataset, 1)
for k in rpc_dict.keys():
out_ds.SetMetadataItem(k, rpc_dict[k], 'RPC')
out_ds.FlushCache()
del out_ds,dataset
def rpc_correction(inputpath,corrtiff,dem_tif_file = None):
## 设置rpc校正的参数
# 原图像和输出影像缺失值设置为0,输出影像坐标系为WGS84(EPSG:4326), 重采样方法为双线性插值(bilinear,还有最邻近‘near’、三次卷积‘cubic’等可选)
# 注意DEM的覆盖范围要比原影像的范围大,此外,DEM不能有缺失值,有缺失值会报错
# 通常DEM在水域是没有值的(即缺失值的情况),因此需要将其填充设置为0,否则在RPC校正时会报错
# 这里使用的DEM是填充0值后的SRTM V4.1 3秒弧度的DEM(分辨率为90m)
# RPC_DEMINTERPOLATION=bilinear 表示对DEM重采样使用双线性插值算法
# 如果要修改输出的坐标系,则要修改dstSRS参数值,使用该坐标系统的EPSG代码
# 可以在网址https://spatialreference.org/ref/epsg/32650/ 查询得到EPSG代码
write_rpc_to_tiff(inputpath,ap = True,outpath = None)
if dem_tif_file is None:
wo = gdal.WarpOptions(srcNodata=0, dstNodata=0, dstSRS='EPSG:4326', resampleAlg='bilinear',
format='Gtiff',rpc=True, warpOptions=["INIT_DEST=NO_DATA"])
wr = gdal.Warp(corrtiff, inputpath, options=wo)
print("RPC_GEOcorr>>>")
else:
wo = gdal.WarpOptions(srcNodata=0, dstNodata=0, dstSRS='EPSG:4326', resampleAlg='bilinear', format='ENVI',rpc=True, warpOptions=["INIT_DEST=NO_DATA"],
transformerOptions=["RPC_DEM=%s"%(dem_tif_file), "RPC_DEMINTERPOLATION=bilinear"])
wr = gdal.Warp(corrtiff, inputpath, options=wo)
print("RPC_GEOcorr>>>")
## 对于全海域的影像或者不使用DEM校正的话,可以将transformerOptions有关的RPC DEM关键字删掉
## 即将上面gdal.WarpOptions注释掉,将下面的语句取消注释,无DEM时,影像范围的高程默认全为0
del wr
def parse_rpc_file(rpc_file):
# rpc_file:.rpc文件的绝对路径
# rpc_dict:符号RPC域下的16个关键字的字典
# 参考网址:http://geotiff.maptools.org/rpc_prop.html;
# https://www.osgeo.cn/gdal/development/rfc/rfc22_rpc.html
rpc_dict = {}
with open(rpc_file) as f:
text = f.read()
# .rpc文件中的RPC关键词
words = ['errBias', 'errRand', 'lineOffset', 'sampOffset', 'latOffset',
'longOffset', 'heightOffset', 'lineScale', 'sampScale', 'latScale',
'longScale', 'heightScale', 'lineNumCoef', 'lineDenCoef','sampNumCoef', 'sampDenCoef',]
# GDAL库对应的RPC关键词
keys = ['ERR_BIAS', 'ERR_RAND', 'LINE_OFF', 'SAMP_OFF', 'LAT_OFF', 'LONG_OFF',
'HEIGHT_OFF', 'LINE_SCALE', 'SAMP_SCALE', 'LAT_SCALE',
'LONG_SCALE', 'HEIGHT_SCALE', 'LINE_NUM_COEFF', 'LINE_DEN_COEFF',
'SAMP_NUM_COEFF', 'SAMP_DEN_COEFF']
for old, new in zip(words, keys):
text = text.replace(old, new)
# 以‘;\n’作为分隔符
text_list = text.split(';\n')
# 删掉无用的行
text_list = text_list[3:-2]
#
text_list[0] = text_list[0].split('\n')[1]
# 去掉制表符、换行符、空格
text_list = [item.strip('\t').replace('\n', '').replace(' ', '') for item in text_list]
for item in text_list:
# 去掉‘=’
key, value = item.split('=')
# 去掉多余的括号‘(’,‘)’
if '(' in value:
value = value.replace('(', '').replace(')', '')
rpc_dict[key] = value
for key in keys[:12]:
# 为正数添加符号‘+’
if not rpc_dict[key].startswith('-'):
rpc_dict[key] = '+' + rpc_dict[key]
# 为归一化项和误差标志添加单位
if key in ['LAT_OFF', 'LONG_OFF', 'LAT_SCALE', 'LONG_SCALE']:
rpc_dict[key] = rpc_dict[key] + ' degrees'
if key in ['LINE_OFF', 'SAMP_OFF', 'LINE_SCALE', 'SAMP_SCALE']:
rpc_dict[key] = rpc_dict[key] + ' pixels'
if key in ['ERR_BIAS', 'ERR_RAND', 'HEIGHT_OFF', 'HEIGHT_SCALE']:
rpc_dict[key] = rpc_dict[key] + ' meters'
# 处理有理函数项
for key in keys[-4:]:
values = []
for item in rpc_dict[key].split(','):
#print(item)
if not item.startswith('-'):
values.append('+'+item)
else:
values.append(item)
rpc_dict[key] = ' '.join(values)
return rpc_dict
rpc_correction(inputpath,corrtiff,dem_tif_file = None)