当前位置: 首页 > article >正文

路径规划——D*算法

路径规划——D*算法

D Star算法是一种用于动态环境下的算法,它可以在环境变化时快速更新路径。

算法原理

D Star算法是一种反向增量式搜索算法,反向即算法从目标点开始向起点逐步搜索;增量式搜索,即算法在搜索过程中会计算每一个节点的距离度量信息H(x),在动态环境中若出现障碍物无法继续沿预先路径搜索,算法会根据原先已经得到的每个点的距离度量信息在当前状态点进行路径再规划,无需从目标点进行重新规划。

算法流程:

1.起初,起点和终点的位置是已知的,环境中的障碍物未知或部分未知。初始化一个开放列表openlist(优先队列),将终点添加到该列表中,并将起点作为搜索目标。
2.在路径搜索过程中,D Star算法不使用启发信息,更像Dijkstra算法,由于是反向搜索,D Star算法计算每个节点到终点的成本代价,然后通过扩展节点来寻找从起点到终点的最优路径。在初次运行时,它使用Dijkstra算法生成一条从起点到终点的最短路径。
3.然而,当环境发生变化时,例如,检测到新的障碍物或路径变得不可通行时,要求算法可以动态更新路径。它通过回溯先前的路径,从遇到障碍物的节点开始重新计算新的路径。
4.在发生变化的区域重新执行D Star算法,从变化点开始,重新计算新的路径。D Star算法的一个关键特性是局部修复,即它只需要重新计算受到变化影响的部分路径,而不是整个路径。这使得算法在动态环境中能够高效地适应环境变化。
5.一旦路径更新完成,算法继续按照新的最短路径前进。每当环境再次变化时,重复上述步骤,直到达到终点。

D*算法有三个重要的函数:
1.process_state():该函数是用来降低openlist表中的某个节点x(state)的h(x)值或者传播节点x的h(x)值变更信息给邻居节点,让邻居节点进行相应变更的同时进行路径搜索查找的;
2.insert(x,val):该函数是用来修改节点x的状态以及h(x)值和k(x)值的;
3.modify_cost(x,y,val):该函数是用来修改节点x和y之间的移动代价cost(x,y),而且根据节点y的状态t(y)的情况,可能对节点y的h(y)值和状态t(y)进行修改的

函数process_state()是D*算法的核心,具体代码如下:

def process_state(self):
    node =self.min_state() # Find the node whose value of node.k is the smallest.
    self.searched.append(node.position)

    if node is None:
        return -1

    k_old = self.get_kmin() # Find the minimum node.k as k_old
    self.delete(node) # Delete the node from openlist, and modify node.t from 'OPEN' to 'CLOSE'

    neighbors, distances = self.get_neighbors(node)
    # RASIE State, its path cost may not be optimal
    if k_old < node.h:                                                                              
        for neighbor,distance in zip(neighbors,distances):                                          #
            if neighbor.h <= k_old and node.h > neighbor.h + distance:                              #
                node.parent = neighbor.position # Added obstacles, replanning, find better neighbor #
                node.h = neighbor.h + distance                                                      # 检查node的最优相邻状态,如果存在则降低 h 值

 	# LOWER State, its path cost is optimal since h ( X ) is equal to the old k_min .
    if k_old == node.h:
        for neighbor,distance in zip(neighbors,distances):
            if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance) \
            or (neighbor.parent != node.position and neighbor.h > node.h+distance):
                neighbor.parent = node.position # Searching commonly
                self.insert(neighbor,node.h+distance)
    else:
        for neighbor,distance in zip(neighbors,distances):
            if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance):
                neighbor.parent = node.position
                self.insert(neighbor,node.h+distance)
            else:
                if neighbor.parent != node.position and neighbor.h > node.h + distance:
                    self.insert(node,node.h)
                else:
                      if neighbor.parent != node.position and node.h > neighbor.h+distance \
                      and neighbor.t == 'CLOSED' and neighbor.h > k_old:
                          self.insert(neighbor,neighbor.h)

	return self.get_kmin()

下面解析此函数:

# RASIE State, its path cost may not be optimal
if k_old < node.h:                                                                              #
    for neighbor,distance in zip(neighbors,distances):                                          #
        if neighbor.h <= k_old and node.h > neighbor.h + distance:                              #
            node.parent = neighbor.position # Added obstacles, replanning, find better neighbor #
            node.h = neighbor.h + distance   

“RASIE”状态,其路径代价可能不是最优的,可以通过重新计算邻居节点来寻找最优路径;主要是在障碍物更新之后发挥作用。
当前访问的节点受到了新增障碍物的影响(比如,当前节点的父节点是新增障碍物),向h值越小的方向扩展,即从当前节点向靠近目标节点的方向扩展,并更新当前节点的父节点以更新路径.

# LOWER State, its path cost is optimal since h ( X ) is equal to the old k_min .
if k_old == node.h:
    for neighbor,distance in zip(neighbors,distances):
        if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance) \
        or (neighbor.parent != node.position and neighbor.h > node.h+distance):
            neighbor.parent = node.position # Searching commonly
            self.insert(neighbor,node.h+distance)

“LOWER”状态,其路径代价是最优的,正常搜索即可。
当前访问的节点还没有受新增障碍物的影响, 如果邻居节点neighbor的状态是"NEW"即未访问过, 或者邻居节点neighbor的父节点是当前节点node并且邻居节点的h值 不等于 当前节点的h值加上两节点间的距离,或者邻居节点neighbor的父节点不是当前节点node并且邻居节点的h值 大于 当前节点的h值加上两节点间的距离;那么!设邻居节点的父节点为当前节点,将neighbor节点放入openlist中进行访问搜索,并将其h值更新为当前节点的h值加上两节点间的距离.

else:
    for neighbor,distance in zip(neighbors,distances):
        if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance):
            neighbor.parent = node.position
            self.insert(neighbor,node.h+distance)
			// 如果邻居节点neighbor的状态是"NEW"即未访问过,
            // 或者邻居节点neighbor的父节点是当前节点node并且邻居节点的h值 大于 当前节点的h值加上两节点间的距离
            // 那么!设邻居节点的父节点为当前节点,并且将邻居节点的h值更新为当前节点的h值加上两节点间的距离,
            // 将neighbor节点放入openlist中进行访问搜索,也就是将采用从当前节点node经过neighbor节点的路径(此路径不一定是最终选择的路径)
        else:
			if neighbor.parent != node.position and neighbor.h > node.h + distance:
               	// 如果邻居节点neighbor的父节点不是当前访问节点node,
			    // 并且邻居节点的h值 大于 当前节点node的h值加上两节点间的距离,那么邻居节点不是“好点”,
			    // 为了更小的代价,便不应该从node经过neighbor,将node加入到openlist中重新访问搜索
				self.insert(node,node.h)
			else:
			     if neighbor.parent != node.position and node.h > neighbor.h+distance \
			     and neighbor.t == 'CLOSED' and neighbor.h > k_old:
			         self.insert(neighbor,neighbor.h)
	                 // 如果邻居节点neighbor的父节点不是当前访问节点node,并且邻居节点是个“好点”,
				     // 并且邻居节点的状态是“已访问”,同时邻居节点的h值 大于 k_old,这时node应该是新增的障碍物,
				     // 这时应该添加neighbor到openlist中,对其进行访问搜索

其他的情况,如果邻居是新的或其父节点是当前节点但代价不匹配,则更新父节点并插入开放列表。如果邻居的父节点不是当前节点且代价不匹配,则可能需要插入当前节点或邻居节点进行进一步处理。

函数**process_state()**理解起来是有一定难度的,逻辑思路有一定的复杂性,大家可以结合具体案例慢慢理解。

D*论文链接:https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=351061

算法实现

"""
    Filename: dStar.py
    Description: Plan path using Dynamic A* Algorithm
    Author: Benxiaogu:https://github.com/Benxiaogu
    Date: 2024-08-29
"""

import math
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time


class Node:
    """
    Class for D* nodes.

    Parameters:
        position (tuple): current coordinate
        parent (tuple): coordinate of parent node
        t (str): state of node, including `NEW` `OPEN` and `CLOSED`
                    'NEW': the node was never put into openlist;
                    'OPEN': the node is in openlist
                    'CLOSED': the node is no longer in openlist
        h (float): cost from goal to current node
        k (float): minimum cost from goal to current node in history
    """
    def __init__(self, position, parent, t, h, k) -> None:
        self.position = position
        self.parent = parent
        self.t = t
        self.h = h
        self.k = k

    def __add__(self, node):
        return Node((self.x + node.x, self.y + node.y), 
                     self.parent, self.t, self.h + node.h, self.k)
    
    def __eq__(self, other) -> bool:
        return self.position == other.position
        
    def __lt__(self,other):
        return self.k+self.h < other.k+other.h or (self.k+self.h==other.k+other.h and self.h<other.h)


class DStar:
    def __init__(self,grid,start,goal,board_size) -> None:
        self.grid = grid
        self.start = Node(start,None,'NEW',float('inf'),float('inf'))
        self.goal = Node(goal,None,'NEW',0,float('inf'))
        self.board_size = board_size
        self.path = []
        self.open = []
        self.searched = [] # Used to record nodes that are searched

        grid_map = []
        for i in range(len(self.grid)):
            for j in range(len(self.grid[0])):
                grid_map.append((i,j))

        self.map = {n: Node(n, None, 'NEW', float('inf'), float('inf')) for n in grid_map}
        self.map[self.goal.position] = self.goal
        self.map[self.start.position] = self.start
        self.insert(self.goal,0)

        self.fig, self.ax = plt.subplots()

    def run(self):
        cost = self.plan()
        self.fig.canvas.mpl_connect('button_press_event', self.update)
        self.visualize_grid(cost)

    def plan(self):
        while True:
            self.process_state()
            if self.start.t == 'CLOSED':
                break
        self.searched.clear()
        # Find the goal
        pathnode = self.start
        cost = 0
        self.path.append(pathnode.position)
        while pathnode != self.goal:
            nodeparent = self.map[pathnode.parent]
            cost += self.cost(pathnode, nodeparent)
            pathnode = nodeparent
            self.path.append(pathnode.position)
            
        return cost

    class Neighbor:
        def __init__(self,direction,cost):
            self.direction = direction
            self.cost = cost
    
    def get_neighbors(self, current_node):
        neighbors = []
        distances = []
        node = current_node.position
        nexts = [self.Neighbor((-1, 0),1), self.Neighbor((0, 1),1), self.Neighbor((0, -1),1), self.Neighbor((1,0),1),
                self.Neighbor((-1,1),math.sqrt(2)), self.Neighbor((1,1),math.sqrt(2)),self.Neighbor((1, -1),math.sqrt(2)), self.Neighbor((-1,-1),math.sqrt(2))]
        for next in nexts:
            neighbor = (node[0] + next.direction[0], node[1] + next.direction[1])
            neighborNode = self.map[neighbor]
            if not self.isCollision(node,neighbor):
                neighbors.append(neighborNode)
                distances.append(next.cost)

        return neighbors,distances
    
    def isCollision(self,node1,node2):
        """
            Check if there will be a collision from node1 to node2
        """
        if self.board_size <= node1[0] < len(self.grid)-self.board_size and self.board_size <= node1[1] < len(self.grid[0])-self.board_size \
                and self.board_size <= node2[0] < len(self.grid)-self.board_size and self.board_size <= node2[1] < len(self.grid[0])-self.board_size:
            if self.grid[node1[0]][node1[1]] == 0 and self.grid[node2[0]][node2[1]] == 0:
                direction1 = node1[0]-node2[0]
                direction2 = node1[1]-node2[1]
                if direction1 != 0 and direction2 != 0:  # 对角线方向
                    if self.grid[node1[0]][node2[1]] == 0 and self.grid[node2[0]][node1[1]] == 0:
                        return False
                else:
                    return False

        return True

    def process_state(self):
        """
            Compute optimal path costs to the goal
        """
        node =self.min_state() # Find the node whose value of node.k is the smallest.
        self.searched.append(node.position)

        if node is None:
            return -1

        k_old = self.get_kmin() # Find the minimum node.k as k_old
        self.delete(node) # Delete the node from openlist, and modify node.t from 'OPEN' to 'CLOSE'
        
        neighbors, distances = self.get_neighbors(node)
        # RASIE State, its path cost may not be optimal
        if k_old < node.h:                                                                              #
            for neighbor,distance in zip(neighbors,distances):                                          #
                if neighbor.h <= k_old and node.h > neighbor.h + distance:                              #
                    node.parent = neighbor.position # Added obstacles, replanning, find better neighbor #
                    node.h = neighbor.h + distance                                                      # 检查node的最优相邻状态,如果存在则降低 h 值
                   
        # LOWER State, its path cost is optimal since h ( X ) is equal to the old k_min .
        if k_old == node.h:
            for neighbor,distance in zip(neighbors,distances):
                if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance) \
                    or (neighbor.parent != node.position and neighbor.h > node.h+distance):
                    neighbor.parent = node.position # Searching commonly
                    self.insert(neighbor,node.h+distance)
                   
        else:
            for neighbor,distance in zip(neighbors,distances):
                if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance):
                    neighbor.parent = node.position
                    self.insert(neighbor,node.h+distance)
                else:
                    if neighbor.parent != node.position and neighbor.h > node.h + distance:
                        self.insert(node,node.h)
                    else:
                        if neighbor.parent != node.position and node.h > neighbor.h+distance \
                            and neighbor.t == 'CLOSED' and neighbor.h > k_old:
                            self.insert(neighbor,neighbor.h)
            

        return self.get_kmin()

    def min_state(self):
        """
            Return the node who possesses the minimum value of k.
        """
        if not self.open:
            return None
        min_state = min(self.open, key=lambda x: x.k)
        return min_state

    def get_kmin(self):
        """
            Return the minimum value of k.
        """
        if not self.open:
            return -1
        k_min = min([x.k for x in self.open])
        return k_min

    def delete(self, node) -> None:
        """
            Remove node from openlist, and change node.t from 'OPEN' to 'CLOSED'
        """
        if node.t == 'OPEN':
            node.t = 'CLOSED'
        self.open.remove(node)

    def insert(self, node: Node, h_new: float) -> None:
        """
            Modify the state of node as well as the node.h value and node.k value.
        """
        if node.t == 'NEW':         node.k = h_new
        elif node.t == 'OPEN':      node.k = min(node.k, h_new)
        elif node.t == 'CLOSED':    node.k = min(node.h, h_new)
        node.h = h_new
        node.t = 'OPEN'
        self.open.append(node)

    def modify_cost(self,node,node_parent):
        """
            Change the cost and enter affected nodes on the OPEN list.
        """
        if node.t == 'CLOSED':
            self.insert(node,node_parent.h+self.cost(node,node_parent))
        while True:
            k_min = self.process_state()
            if k_min >= node.h:
                break

    def cost(self, node1: Node, node2: Node) -> float:
        """
            Return Euclidean distance if there is no collision from node1 to node2 otherwise 'inf'
        """
        if self.isCollision(node1.position, node2.position):
            return float("inf")
        return math.hypot(node2.position[0] - node1.position[0], node2.position[1] - node1.position[1])

    def update(self, event) -> None:
        """
            Update obstacles and replan from affected node.
        """
        x, y = int(event.xdata), int(event.ydata)
        if y < self.board_size or y > len(self.grid) -self.board_size- 1 or x < self.board_size or x > len(self.grid[0])-self.board_size - 1:
            print("Please choose right area!")
        else:
            self.searched.clear()
            if self.grid[y][x] == 0:
                # update obstacles
                self.grid[y][x] = 1

                node = self.start
                cost = 0
                self.path.clear()
                while node != self.goal:
                    node_parent = self.map[node.parent]
                    if self.isCollision(node.position, node_parent.position): 
                        # when meeting collisions, call modify_cost() function
                        self.modify_cost(node,node_parent)
                        continue
                    self.path.append(node.position)
                    cost += self.cost(node, node_parent)
                    node = node_parent
                self.path.append(self.goal.position)

                plt.cla()
                self.visualize_grid(cost)

    def visualize_grid(self, cost):
        ...

在这里插入图片描述
完整代码:https://github.com/Benxiaogu/PathPlanning/tree/main/D_Star


http://www.kler.cn/a/298988.html

相关文章:

  • YOLO-World:Real-Time Open-Vocabulary Object Detection
  • springboot463学生信息管理系统论文(论文+源码)_kaic
  • 京准电钟:电厂自控NTP时间同步服务器技术方案
  • SQL进阶技巧:如何计算商品需求与到货队列表进出计划?
  • 【网络安全】用 Frida 修改软件为你所用
  • 彻底认识和理解探索分布式网络编程中的SSL安全通信机制
  • ubuntu内核升级后的问题修复
  • AMP网站如何适配提升SEO效果?
  • 编曲术语:编曲术语中英对照表
  • 安宝特方案 | 医疗AR眼镜,重新定义远程会诊体验
  • Unity Apple Vision Pro 开发(五):PolySpatial 2.0 导入方式
  • 【我的Android进阶之旅】解决CardView四个圆角有白边的问题
  • kkFileView PDF Image Mode Preview BUG
  • 【安全漏洞】Apache Tomcat 高危漏洞版本
  • 断点回归模型
  • 银行安全用电典型产品解决方案介绍-安科瑞-叶西平
  • gdb 调试带有 fork 的进程,如何在父进程和子进程之间切换?
  • SpringBoot2:请求处理原理分析-接口参数解析原理(argumentResolvers)
  • 第11章 32位x86处理器编程架构
  • Leetcode3266. K 次乘运算后的最终数组 II
  • 【Nacos】健康检查与环境隔离
  • 【数据结构】2——二叉树遍历
  • 用hiredis连接redis
  • 如何优化谷歌排名更有效?
  • Linux之Shell命令
  • 【YouTube采集】按搜索关键词批量爬取视频数据,并封装成exe界面软件!