当前位置: 首页 > article >正文

postgres 客户端请求处理1——创建保存监听套接字

**瀚高数据库

目录

环境

文档用途

详细信息

环境

系统平台:Linux x86-64 Red Hat Enterprise Linux 7
版本:14

文档用途

了解pg创建监听套接字的过程,主要是TCP协议通信

详细信息

1.数据类型
socket提供的网络编程API会创建关于socket的文件描述符,在Linux上为整型,跟普通的文件描述符在类型上保持一致。

source: /src/include/port.h

/*

* Windows has enough specialized port stuff that we push most of it off into another file.
* Note: Some CYGWIN includes might #define WIN32.
*/
#if defined(WIN32) && !defined(__CYGWIN__)
#include "port/win32_port.h"
#endif

/* socket has a different definition on WIN32 */
#ifndef WIN32
typedef int pgsocket;

#define PGINVALID_SOCKET (-1)
#else

typedef SOCKET pgsocket;



source: /src/backend/postmaster/postmaster.c

/* The socket(s) we're listening to. */
#define MAXLISTEN 64
static pgsocket ListenSocket[MAXLISTEN];

2.初始化ListenSocket
在创建套接字之前,每一个监听套接字都初始化为PGINVALID_SOCKET,即-1。

source: /src/backend/postmaster/postmaster.c

/*

* Establish input sockets.
*
* First, mark them all closed, and set up an on_proc_exit function that's
* charged with closing the sockets again at postmaster shutdown.
*/
for (i = 0; i < MAXLISTEN; i++)
    ListenSocket[i] = PGINVALID_SOCKET;

3.初始化on_proc_exit

当postmaster shutdown时,在on_proc_exit注册的回调函数CloseSeverPorts将被调用。一般用来解决防止意外退出,一些系统资源得不到释放的问题。

Unix Domain Socket用来进行本机进程间通信,在退出时同时要移除文件系统的文件,比如/tmp/.s.PGSQL.5432,这是RemoveSocketFiles()的作用。

source: /src/backend/postmaster/postmaster.c

on_proc_exit(CloseServerPorts, 0);



source: /src/backend/postmaster/postmaster.c

/*
* on_proc_exit callback to close server's listen sockets
*/
static void
CloseServerPorts(int status, Datum arg)
{
    int i;

    /*
     * First, explicitly close all the socket FDs.  We used to just let this
     * happen implicitly at postmaster exit, but it's better to close them
     * before we remove the postmaster.pid lockfile; otherwise there's a race
     * condition if a new postmaster wants to re-use the TCP port number.
     */
    for (i = 0; i < MAXLISTEN; i++)
    {
        if (ListenSocket[i] != PGINVALID_SOCKET)
        {
            StreamClose(ListenSocket[i]); // Linux下就是close(ListenSocket[i]);
            ListenSocket[i] = PGINVALID_SOCKET;
        }
    }

    /*
     * Next, remove any filesystem entries for Unix sockets.  To avoid race
     * conditions against incoming postmasters, this must happen after closing
     * the sockets and before removing lock files.
     */
    RemoveSocketFiles();

    /*
     * We don't do anything about socket lock files here; those will be
     * removed in a later on_proc_exit callback.
     */
}



source: /src/backend/libpq/pqcomm.c

/*
* RemoveSocketFiles -- unlink socket files at postmaster shutdown
*/

void RemoveSocketFiles(void)
{
    ListCell   *l;

    /* Loop through all created sockets... */
    foreach(l, sock_paths)
    {
         char   *sock_path = (char *) lfirst(l);

         /* Ignore any error. */
         (void) unlink(sock_path);
    }
    /* Since we're about to exit, no need to reclaim storage */
    sock_paths = NIL;
}

4.解析ListenAddresses参数

* The TCP listen address(es) */

char       *ListenAddresses; // 用来存储postgresql.conf中的配置项listen_addresses的值。e.g. listen_addresses='127.0.0.1, 210.3.147.123',那么ListenAddresses="127.0.0.1, 210.3.147.123"。

需要两个函数InitializeGUCoptions()和SelectConfigFiles()将配置文件种的参数解析到全局变量ListenAddresses。

如果listenAddresses不为空,将进行如下处理:

if (ListenAddresses)

	{

		char	   *rawstring;

		List	   *elemlist;

		ListCell   *l;

		int			success = 0;



		/* Need a modifiable copy of ListenAddresses */

               // 在当前进程内存上下文分配一块空间,用于存放ListenAddresses的备份

		rawstring = pstrdup(ListenAddresses);



		/* Parse string into list of hostnames */

                // 将字符串根据分隔符解析到链表中

		if (!SplitGUCList(rawstring, ',', &elemlist))

		{

			/* syntax error in list */

			ereport(FATAL,

					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),

					 errmsg("invalid list syntax in parameter \"%s\"",

							"listen_addresses")));

		}

      //遍历链表,依次取出ip地址,使用StreamServerPort处理ip,主要是根据ip通过socket->bind->listen来创建1个监听套接字同时设置套接字的一些选项,尤其是解决快速重启服务端产生的端口占用问题。

		foreach(l, elemlist)

		{

			char	   *curhost = (char *) lfirst(l);



			if (strcmp(curhost, "*") == 0)

				status = StreamServerPort(AF_UNSPEC, NULL,

										  (unsigned short) PostPortNumber,

										  NULL,

										  ListenSocket, MAXLISTEN);

			else

				status = StreamServerPort(AF_UNSPEC, curhost,

										  (unsigned short) PostPortNumber,

										  NULL,

										  ListenSocket, MAXLISTEN);



			if (status == STATUS_OK)

			{

				success++;

				/* record the first successful host addr in lockfile */

				if (!listen_addr_saved)

				{

					AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);

					listen_addr_saved = true;

				}

			}

			else

				ereport(WARNING,

						(errmsg("could not create listen socket for \"%s\"",

								curhost)));

		}



		if (!success && elemlist != NIL)

			ereport(FATAL,

					(errmsg("could not create any TCP/IP sockets")));



		list_free(elemlist);

		pfree(rawstring);

	}

4.1 StreamServerPort

/*

 * Streams -- wrapper around Unix socket system calls

 *

 *

 *Stream functions are used for vanilla TCP connection protocol.

 */



协议如果是AF_UNIX,hostname应该为空,并且提供UnixSocketDir,比如/tmp;如果不是AF_UNIX,hostname如果为空,表示所有网卡,UnixSocketDir应该为空。



/*

 * StreamServerPort -- open a "listening" port to accept connections.

 *

 * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.

 * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be

 * specified.  For TCP ports, hostName is either NULL for all interfaces or

 * the interface to listen on, and unixSocketDir is ignored (can be NULL).

 *

 * Successfully opened sockets are added to the ListenSocket[] array (of

 * length MaxListen), at the first position that isn't PGINVALID_SOCKET.

 *

 * RETURNS: STATUS_OK or STATUS_ERROR

 */



int

StreamServerPort(int family, const char *hostName, unsigned short portNumber,

				 const char *unixSocketDir,

				 pgsocket ListenSocket[], int MaxListen)

{

	pgsocket	fd;

	int			err;

	int			maxconn;

	int			ret;

	char		portNumberStr[32];

	const char *familyDesc;

	char		familyDescBuf[64];

	const char *addrDesc;

	char		addrBuf[NI_MAXHOST];

	char	   *service;

	struct addrinfo *addrs = NULL,

			   *addr;

	struct addrinfo hint;

	int			listen_index = 0;

	int			added = 0;

	char		unixSocketPath[MAXPGPATH];

#if !defined(WIN32) || defined(IPV6_V6ONLY)

	int			one = 1;

#endif



	/* Initialize hint structure */

        // 初始化结构体,给getddrinfo()使用的hints参数

	MemSet(&hint, 0, sizeof(hint));

	hint.ai_family = family;                                // 指定协议族,比如AF_INET, AF_INET6, AF_UNIX....

	hint.ai_flags = AI_PASSIVE;                       // 告诉getaddrinfo()返回适合用于被动连接(服务端)使用的套接字

	hint.ai_socktype = SOCK_STREAM;         // 数据类型是流数据,基本上现在就是TCP



	if (family == AF_UNIX)

	{

		/*

		 * Create unixSocketPath from portNumber and unixSocketDir and lock

		 * that file path

		 */

      // 构造用于Unix Domain Socket本机进程间通信的文件



{#define UNIXSOCK_PATH(path, port, sockdir) \

	   (AssertMacro(sockdir), \

		AssertMacro(*(sockdir) != '\0'), \

		snprintf(path, sizeof(path), "%s/.s.PGSQL.%d", \

				 (sockdir), (port)))

}

             // 文件的路径长度有有要求,有的系统是128,有的不是,甚至有的struct sockaddr_un除了协议族和一个字符串数组还包含别的成员,所以使用下面的宏保证兼容性,长度包括'\0',所以下边文件路径长度是(UNIXSOCK_PATH_BUFLEN - 1)。

{

#define UNIXSOCK_PATH_BUFLEN sizeof(((struct sockaddr_un *) NULL)->sun_path)

}

		UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);

		if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)

		{

			ereport(LOG,

					(errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",

							unixSocketPath,

							(int) (UNIXSOCK_PATH_BUFLEN - 1))));

			return STATUS_ERROR;

		}

                // 文件上锁只是简单的使用了文件系统提供的属性,在open一个文件时如果提供了O_CREAT | O_EXCL,即创建文件但是文件已经存在,会报错并设置errno。带来的问题时如果程序异常退出并且没有unlink该文件,会出现问题。

		if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)

			return STATUS_ERROR;

               // 

		service = unixSocketPath;

	}

	else

	{

		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);

		service = portNumberStr;

	}

        // 到这里service要么是unixSocketPath要么是portNumber

  // 下边要根据你提供的IP值,service类型,hint参数,将符合条件的结果放入到addrs结构体当中,这个addrs代表的是元素为struct addrinfo的链表并不是单个的结构体,除非返回的结果只有1个,可man 3 getaddrinfo查看

	ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);



	if (ret || !addrs)

	{

		if (hostName)

			ereport(LOG,

					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",

							hostName, service, gai_strerror(ret))));

		else

			ereport(LOG,

					(errmsg("could not translate service \"%s\" to address: %s",

							service, gai_strerror(ret))));

		if (addrs)

			pg_freeaddrinfo_all(hint.ai_family, addrs);

		return STATUS_ERROR;

	}

  // 遍历结果链表,做处理

	for (addr = addrs; addr; addr = addr->ai_next)

	{

		if (family != AF_UNIX && addr->ai_family == AF_UNIX)

		{

			/*

			 * Only set up a unix domain socket when they really asked for it.

			 * The service/port is different in that case.

			 */

			continue;

		}



		/* See if there is still room to add 1 more socket. */

		for (; listen_index < MaxListen; listen_index++)

		{

			if (ListenSocket[listen_index] == PGINVALID_SOCKET)

				break;

		}

		if (listen_index >= MaxListen)

		{

			ereport(LOG,

					(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",

							MaxListen)));

			break;

		}



		/* set up address family name for log messages */

		switch (addr->ai_family)

		{

			case AF_INET:

				familyDesc = _("IPv4");

				break;

			case AF_INET6:

				familyDesc = _("IPv6");

				break;

			case AF_UNIX:

				familyDesc = _("Unix");

				break;

			default:

				snprintf(familyDescBuf, sizeof(familyDescBuf),

						 _("unrecognized address family %d"),

						 addr->ai_family);

				familyDesc = familyDescBuf;

				break;

		}



		/* set up text form of address for log messages */

                // 根据address寻找主机名,记录在addrbuf中,打日志用的。

		if (addr->ai_family == AF_UNIX)

			addrDesc = unixSocketPath;

		else

		{

			pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,

							   addr->ai_addrlen,

							   addrBuf, sizeof(addrBuf),

							   NULL, 0,

							   NI_NUMERICHOST);

			addrDesc = addrBuf;

		}

     

     // 根据协议类型,流数据类型,创建监听套接字

		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)

		{

			ereport(LOG,

					(errcode_for_socket_access(),

			/* translator: first %s is IPv4, IPv6, or Unix */

					 errmsg("could not create %s socket for address \"%s\": %m",

							familyDesc, addrDesc)));

			continue;

		}



#ifndef WIN32

     // 与TIME_WAIT有关,主动关闭套接字一端在四次挥手期间为了保证能够重发报文(报文可能丢失)会占用一段时间的端口,这时候如果重启服务器可能会出现端口占用的情况,所以使用SO_REUSEADDR设置标志。

     // 在监听套接字设置的属性,连接套接字可以继承,所以在这儿设置也是有效的。

		/*

		 * Without the SO_REUSEADDR flag, a new postmaster can't be started

		 * right away after a stop or crash, giving "address already in use"

		 * error on TCP ports.

		 *

		 * On win32, however, this behavior only happens if the

		 * SO_EXCLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows

		 * multiple servers to listen on the same address, resulting in

		 * unpredictable behavior. With no flags at all, win32 behaves as Unix

		 * with SO_REUSEADDR.

		 */

		if (addr->ai_family != AF_UNIX)

		{

			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,

							(char *) &one, sizeof(one))) == -1)

			{

				ereport(LOG,

						(errcode_for_socket_access(),

				/* translator: third %s is IPv4, IPv6, or Unix */

						 errmsg("%s(%s) failed for %s address \"%s\": %m",

								"setsockopt", "SO_REUSEADDR",

								familyDesc, addrDesc)));

				closesocket(fd);

				continue;

			}

		}

#endif



#ifdef IPV6_V6ONLY

            // 只使用IPv6,不再支持IPv4,没这个选项IPv6和IPv4都是支持的。

		if (addr->ai_family == AF_INET6)

		{

			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,

						   (char *) &one, sizeof(one)) == -1)

			{

				ereport(LOG,

						(errcode_for_socket_access(),

				/* translator: third %s is IPv4, IPv6, or Unix */

						 errmsg("%s(%s) failed for %s address \"%s\": %m",

								"setsockopt", "IPV6_V6ONLY",

								familyDesc, addrDesc)));

				closesocket(fd);

				continue;

			}

		}

#endif



		/*

		 * Note: This might fail on some OS's, like Linux older than

		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map

		 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4

		 * connections.

		 */

                // 下边就是常规操作,bind将服务端地址绑定到套接字,并且listen创建监听队列,因为之后要用accept从全连接队列取客户端请求,最终把fd记录到全局数组中。

		err = bind(fd, addr->ai_addr, addr->ai_addrlen);

		if (err < 0)

		{

			int			saved_errno = errno;



			ereport(LOG,

					(errcode_for_socket_access(),

			/* translator: first %s is IPv4, IPv6, or Unix */

					 errmsg("could not bind %s address \"%s\": %m",

							familyDesc, addrDesc),

					 saved_errno == EADDRINUSE ?

					 (addr->ai_family == AF_UNIX ?

					  errhint("Is another postmaster already running on port %d?",

							  (int) portNumber) :

					  errhint("Is another postmaster already running on port %d?"

							  " If not, wait a few seconds and retry.",

							  (int) portNumber)) : 0));

			closesocket(fd);

			continue;

		}



		if (addr->ai_family == AF_UNIX)

		{

			if (Setup_AF_UNIX(service) != STATUS_OK)

			{

				closesocket(fd);

				break;

			}

		}



		/*

		 * Select appropriate accept-queue length limit.  It seems reasonable

		 * to use a value similar to the maximum number of child processes

		 * that the postmaster will permit.

		 */

		maxconn = MaxConnections * 2;



		err = listen(fd, maxconn);

		if (err < 0)

		{

			ereport(LOG,

					(errcode_for_socket_access(),

			/* translator: first %s is IPv4, IPv6, or Unix */

					 errmsg("could not listen on %s address \"%s\": %m",

							familyDesc, addrDesc)));

			closesocket(fd);

			continue;

		}



		if (addr->ai_family == AF_UNIX)

			ereport(LOG,

					(errmsg("listening on Unix socket \"%s\"",

							addrDesc)));

		else

			ereport(LOG,

			/* translator: first %s is IPv4 or IPv6 */

					(errmsg("listening on %s address \"%s\", port %d",

							familyDesc, addrDesc, (int) portNumber)));



		ListenSocket[listen_index] = fd;

		added++;

	}



	pg_freeaddrinfo_all(hint.ai_family, addrs);



	if (!added)

		return STATUS_ERROR;



	return STATUS_OK;

}

http://www.kler.cn/a/272924.html

相关文章:

  • 【前端】框架-构建-包管理-语言-语法-生态工具
  • 具身导航如何利用取之不尽的网络视频资源!RoomTour3D:基于几何感知的视频-指令训练调优
  • 【NLP】语言模型的发展历程 (1)
  • 编译pytorch——cuda-toolkit-nvcc
  • JavaScript系列(28)--模块化开发详解
  • Android 调用系统服务接口获取屏幕投影(需要android.uid.system)
  • JDBC的概念
  • C语言选择语句概览
  • 用python写网络爬虫:2.urllib库的基本用法
  • Android14之报错:error:add its name to the whitelist(一百九十四)
  • ✅技术社区—通过Canal框架实现MySQL与ElasticSearch的数据同步
  • 机器学习-绪论
  • SSH远程连接断开后,程序继续运行
  • 10:00面试,10:06就出来了,问的问题有点变态。。。
  • 【黑马程序员】Python高阶
  • VS Code安装Live Server插件搭建web网页结合内网穿透实现公网访问
  • matlab FR共轭梯度法求解无约束问题
  • 深度学习-2.8模型拟合概念和欠拟合模型、过拟合调整策略
  • vulhub中GIT-SHELL 沙盒绕过漏洞复现(CVE-2017-8386)
  • 寒假作业Day 13
  • 【C语言】打印闰年
  • 疯狂 META:Aavegotchi 新一季稀有度挖矿来了!
  • 【Linux网络编程七】网络序列化和反序列化(网络版本计算器)
  • 信息检索(十三):On Complementarity Objectives for Hybrid Retrieval
  • 基于单片机的灭火机器人设计
  • C 练习实例77-指向指针的指针-二维数组