MPI程序实例:自适应数值积分
目录
一、梯形积分公式
二、局部二分自适应区间加密
三、串行程序
四、基于简单区域分解的并行算法
上一节我们介绍了一个用梯形公式积分计算π的MPI程序实例,本节我们将对这个实例进行扩展,采用梯形公式结合自适应局部区间加密,计算一个函数在给定区间上的定积分达到指定精度。
一、梯形积分公式
相关理论在另一个专栏《常微分方程》中也介绍过,有兴趣的小伙伴可以参考:↓传送门↓
常微分方程_猿核试Bug愁的博客-CSDN博客https://blog.csdn.net/l_peanut/category_12628498.html?spm=1001.2014.3001.5482
设是定义在区间上的近似积分的梯形公式为:
(1)
梯形积分公式(1)的误差为
(2)
其中。为了提高计算精度,将积分区间分成n个小区间:,在每个小区间上应用梯形公式计算积分的近似值,然后对这些小区间的近似值求和来得到整个区间上的近似积分,该方法称为“复化梯形公式”。计算公式为:
(3)
当每个小区间的长度相等时,复化梯形公式变为:
(4)
其中为小区间长度(也称积分步长),。
用公式(4)计算积分的误差为:
(5)
其中为区间中的某个点。
二、局部二分自适应区间加密
从公式(2)中可知,梯形公式的计算误差与函数的二阶导数,即函数梯度的变化速度(曲线的弧度)有关。为达到在给定精度的前提下减少计算函数次数的目的,可以根据函数导数的变化情况在不同地方使用不同长度的积分步长。下面为局部二分自适应区间加密算法:
算法1:
(0)取当前计算区间为;
(1)用梯形公式(1)计算函数在当前计算区间上的积分的近似值;
(2)判断近似值的误差,如果误差满足要求,则该值即为该区间上的计算结果,该区间上的计算过程结束;
(3)否则将区间一分为二:令,分别对子区间和重复步骤1-3,直到它们的计算误差分别满足要求,然后将子区间和上的计算结果之和做为区间上的计算结果。
算法1是一个递归过程,为了判断当前区间上的计算结果是否满足精度要求,需要对误差进行估计。根据公式(2),区间上梯形公式的计算误差为:
(6)
其中为中的某个点。实际计算时,可以用该区间的中点近似代替来对误差进行估计,当区间长度较小时,它们的差别是很小的。然后再用二阶中心差商近似代替区间中点处的二阶导数值:
(7)
将公式(7)代入公式(6)得到下面的近似误差估计:
(8)
假设希望整个区间上的计算误差小于,则只需让每个小区间上的误差小于即可,其中h代表小区间的长度。对区间而言,,因此,可以通过下式来判断误差是否满足要求:
(9)
记为当前计算区间的长度,为梯形公式计算得到的积分近似值:
(10)
公式(9)中需要计算区间中点的函数值,可以利用它来计算一个比更为精确的近似值:
(11)
的误差近似等于的误差的1/4,因此如果用作为近似积分结果,则误差判断依据可以改为:
(12)
由公式(10)和(11)可知:
因此可以在程序中使用下面的误差判据:
(13)
即当一个区间上计算的值与将区间分半后计算的值间的误差小于该区间长度的3倍乘以时,便可以认为计算达到精度要求,并将作为该区间上的计算结果。导出的算法如下:
算法2:
梯形公式结合自适应逐次区间分半方法,计算函数在区间上的定积分值。当计算区间上的梯形积分公式不满足精度要求时,该函数将计算区间分半,并对自己进行递归调用来分别计算两个子区间上的积分值(递归过程在算法第12行)。
三、串行程序
根据算法2编写C语言程序,程序中增加了两个常量MAX_DEPTH和MACHINE_PREC,用于控制递归深度,避免出现过深的嵌套递归。
工程文件由:integration.h、integration.cpp、function.h、function.cpp、main.cpp组成。
(1)integration.h文件
double integration(double a, double fa, double b, double fb, double eps, double (*F)(double x));
(2)integration.cpp文件
#include <stdio.h>
#include <stdlib.h>
#include <cmath>
#include "integration.h"
#define MAX_DEPTH 1024 /*最大递归深度*/
#define MACHINE_PREC 1e-15 /*机器精度*/
//梯形公式递归计算积分值
double integration(double a, double fa, double b, double fb, double eps, double (*F)(double x))
{
static int depth = 0; /*当前递归深度*/
double fc, v, v0, h, xc;
++depth;
v = 0.0;
if(a != b)
{
xc = (a + b)*0.5;
h = b - a;
v = v0 = h*(fa + fb)*0.5;
if(xc != a && xc !=b)
{
double err;
fc = (*F)(xc);
v = (v0 + fc *h)*0.5;
err = fabs(v - v0);
if(err >= 3.0*h*eps && fabs(h) >= (1.0 + fabs(xc))*MACHINE_PREC)
{
v = integration(a, fa, xc, fc, eps, F) + integration(xc, fc, b, fb, eps, F);
}
}
}
--depth;
return v;
}
(3)function.h文件
extern int evaluation_count;
extern double RESULT;
double F(double x);
void gettime(double *cpu, double *wall);
(4)function.cpp文件
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
int evaluation_count = 0;
/*被积函数*/
double F(double x)
{
size_t i;
for(i = 0; i < 5000000; i++); /*为增加计算时间引入的空循环*/
evaluation_count++;
return 4.0/(1.0 + x*x);
}
/*积分精确值(用于检验结果)*/
double RESULT = 3.141592653589793;
void gettime(double *cpu, double *wall) /*返回当前CPU和墙上时间*/
{
struct timeval tv;
struct rusage ru;
if(cpu != NULL)
{
getrusage(RUSAGE_SELF, &ru);
*cpu = ru.ru_utime.tv_sec + (double)ru.ru_utime.tv_usec*1e-16;
}
if(wall != NULL)
{
gettimeofday(&tv, NULL);
*wall = tv.tv_sec +(double).tv.tv_usec*1e-6;
}
}
(5)main.cpp文件
#include <stdio.h>
#include <stdlib.h>
#include <cmath>
#include "function.h"
#include "integration.h"
int main(int argc, char *argv[])
{
double a = 0.0, b = 1.0;
double res, cpu0, cpu1, wall0, wall1;
if(argc != 2)
{
printf("Usage: %s epsilon.\n", argv[0]);
printf("Example: %s 1e-4.\n", argv[0]);
return 1;
}
gettime(&cpu0, &wall0);
res = integration(a, F(a), b, F(b), atof(argv[1])/(b - a), F);
gettime(&cpu1, &wall1);
printf("result = %0.16f, error = %.0.4le, cputime = %0.4lf, wtime = %0.4lf.\n", res, res - RESULT, cpu1 - cpu0, wall1 - wall0);
printf("%u function evaluations.\n", evaluation_count);
return 0;
}
编译并运行,结果如下:
下图为运行上例得到的最终积分区间,可知在x=0附件积分区间比较密集。
四、基于简单区域分解的并行算法
为了使用P个进程实现算法2的并行计算,最简单的方法是将积分区间等分成P个子区间,每个进程独立地调用integration函数计算一个子区间上的近似积分,然后对这些子区间上的近似积分值求和即可得到计算结果,只需修改串行程序的主程序main.cpp即可得到MPI并行程序。
/*递归方式自适应数值积分MPI主程序:均匀区间划分*/
#include <stdio.h>
#include <stdlib.h>
#include <cmath>
#include "mpi.h"
#include "function.h"
#include "integration.h"
int main(int argc, char *argv[])
{
int nprocs, myrank;
double a = 0.0, b = 1.0;
double eps, res, res0, a0, b0, cpu0, cpu1, wall0, wall1, wall2;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if(myrank == 0)
{
if(argc != 2)
{
printf("Usage: %s epsilon.\n", argv[0]);
printf("Example: %s 1e-4.\n", argv[0]);
MPI_Abort(MPI_COMM_WORLD, 1);
return 1;
}
eps = atof(argv[1]);
}
/*将eps广播给所有进程*/
MPI_Bcast(&eps, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
/*计算本进程的积分区间*/
a0 = a + (myrank + 0)*(b - a)/nprocs;
b0 = a + (myrank + 1)*(b - a)/nprocs;
/*开始时间*/
gettime(&cpu0, &wall0);
res0 = integration(a0, F(a0), b0, F(b0), eps/(b - a), F);
gettime(&cpu1, &wall1);
printf("\tRank = %d, # of evaluations = %u, cputime = %0.4lf, wtime = %0.4lf.\n", myrank, evalution_count, cpu1 - cpu0, wall1 - wall0);
MPI_Reduce(&res0, &res, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
gettime(NULL, &wall2);
if(myrank == 0)
{
printf("result = %0.16lf, error = %0.4le, wtime = %0.4lf.\n", res, res - RESULT, wall2 - wall0);
}
MPI_Finalize();
return 0;
}
编译及运行结果如下: