postgres 客户端请求处理1——创建保存监听套接字
**瀚高数据库**
目录
环境
文档用途
详细信息
环境
系统平台:Linux x86-64 Red Hat Enterprise Linux 7
版本:14
文档用途
了解pg创建监听套接字的过程,主要是TCP协议通信
详细信息
1.数据类型
socket网络编程API创建的套接字同样用文件描述符表示,在Linux上是整型(int),与普通文件描述符的类型一致。PostgreSQL用pgsocket类型对其进行封装,在Windows上pgsocket则是SOCKET类型。
source: /src/include/port.h
/*
 * Windows has enough specialized port stuff that we push most of it off into another file.
 * Note: Some CYGWIN includes might #define WIN32.
 */
#if defined(WIN32) && !defined(__CYGWIN__)
#include "port/win32_port.h"
#endif

/*
 * socket has a different definition on WIN32.
 *
 * pgsocket is the portable socket-descriptor type: a plain int file
 * descriptor on non-Windows systems, a Winsock SOCKET handle on Windows.
 * PGINVALID_SOCKET is the matching "no socket" value for each platform.
 */
#ifndef WIN32
typedef int pgsocket;
#define PGINVALID_SOCKET (-1)
#else
typedef SOCKET pgsocket;
#define PGINVALID_SOCKET INVALID_SOCKET
#endif
source: /src/backend/postmaster/postmaster.c
/* The socket(s) we're listening to. */
/*
 * MAXLISTEN bounds how many listening sockets the postmaster keeps.  Each
 * slot of ListenSocket[] holds one listening descriptor, or PGINVALID_SOCKET
 * when the slot is unused.
 */
#define MAXLISTEN 64
static pgsocket ListenSocket[MAXLISTEN];
2.初始化ListenSocket
在创建套接字之前,ListenSocket数组中的每一个元素都被初始化为PGINVALID_SOCKET(在非Windows平台上即-1),表示该位置尚未被使用。
source: /src/backend/postmaster/postmaster.c
/*
 * Establish input sockets.
 *
 * Start by marking every slot of ListenSocket[] as closed
 * (PGINVALID_SOCKET).  An on_proc_exit callback is then registered that
 * closes whatever sockets get opened once the postmaster shuts down.
 */
for (i = 0; i < MAXLISTEN; i++)
{
    ListenSocket[i] = PGINVALID_SOCKET;
}
3.注册on_proc_exit回调
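当postmaster退出(shutdown)时,通过on_proc_exit注册的回调函数CloseServerPorts会被调用。这种机制用来保证进程退出时,套接字等系统资源能够被可靠地释放。CloseServerPorts先显式关闭所有监听套接字,再删除Unix域套接字文件,最后才由其他回调删除锁文件。这样,新启动的postmaster在重用同一TCP端口时就不会与正在退出的postmaster产生竞争。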
Unix Domain Socket用来进行本机进程间通信,在退出时同时要移除文件系统的文件,比如/tmp/.s.PGSQL.5432,这是RemoveSocketFiles()的作用。
source: /src/backend/postmaster/postmaster.c
/*
 * Register CloseServerPorts() to run at process exit, so that the listen
 * sockets are closed and the Unix-socket files removed when the postmaster
 * shuts down.  The second argument is the Datum handed to the callback,
 * which CloseServerPorts() does not use.
 */
on_proc_exit(CloseServerPorts, 0);
source: /src/backend/postmaster/postmaster.c
/*
 * on_proc_exit callback to close server's listen sockets
 *
 * Closes every open entry of ListenSocket[] (resetting it to
 * PGINVALID_SOCKET) and then removes the Unix-socket files.  Neither
 * status nor arg is used.
 */
static void
CloseServerPorts(int status, Datum arg)
{
    pgsocket   *sock;

    /*
     * Close the socket FDs explicitly rather than letting process exit do
     * it: they must be closed before the postmaster.pid lockfile is
     * removed, or a new postmaster that wants to re-use the TCP port number
     * could race with us.
     */
    for (sock = ListenSocket; sock < ListenSocket + MAXLISTEN; sock++)
    {
        if (*sock == PGINVALID_SOCKET)
            continue;

        StreamClose(*sock);     /* on Linux this is effectively close(fd) */
        *sock = PGINVALID_SOCKET;
    }

    /*
     * Only now remove the filesystem entries for Unix sockets.  This must
     * happen after the sockets are closed and before the lock files are
     * removed, to avoid races against incoming postmasters.
     */
    RemoveSocketFiles();

    /*
     * Socket lock files are deliberately left alone here; a later
     * on_proc_exit callback takes care of them.
     */
}
source: /src/backend/libpq/pqcomm.c
/*
 * RemoveSocketFiles -- unlink socket files at postmaster shutdown
 *
 * Unlinks every path recorded in sock_paths, ignoring failures, and then
 * forgets the list.
 */
void
RemoveSocketFiles(void)
{
    ListCell   *cell;

    foreach(cell, sock_paths)
    {
        /* Best effort: any error from unlink() is deliberately ignored. */
        (void) unlink((char *) lfirst(cell));
    }

    /*
     * The process is about to exit, so the list storage is not reclaimed;
     * just drop the reference.
     */
    sock_paths = NIL;
}
4.解析ListenAddresses参数
/*
 * The TCP listen address(es).
 *
 * Holds the raw value of the listen_addresses setting from postgresql.conf,
 * e.g. listen_addresses = '127.0.0.1, 210.3.147.123' yields
 * ListenAddresses = "127.0.0.1, 210.3.147.123".  The string is later split
 * on ',' into individual host names.
 */
char       *ListenAddresses;
需要通过InitializeGUCOptions()和SelectConfigFiles()两个函数,将配置文件中的参数解析到全局变量ListenAddresses中。
如果ListenAddresses不为空,将进行如下处理:
if (ListenAddresses)
{
    char       *rawstring;
    List       *elemlist;
    ListCell   *cell;
    int         success = 0;

    /*
     * The list parser needs a modifiable string, so work on a copy of
     * ListenAddresses allocated in the current memory context.
     */
    rawstring = pstrdup(ListenAddresses);

    /* Split the comma-separated string into a list of host names. */
    if (!SplitGUCList(rawstring, ',', &elemlist))
    {
        /* syntax error in list */
        ereport(FATAL,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("invalid list syntax in parameter \"%s\"",
                        "listen_addresses")));
    }

    /*
     * For each entry, StreamServerPort() does socket()/bind()/listen() on
     * every address the host resolves to.  "*" means all interfaces, which
     * is requested by passing a NULL host name.
     */
    foreach(cell, elemlist)
    {
        char       *curhost = (char *) lfirst(cell);
        const char *bindhost = (strcmp(curhost, "*") == 0) ? NULL : curhost;

        status = StreamServerPort(AF_UNSPEC, bindhost,
                                  (unsigned short) PostPortNumber,
                                  NULL,
                                  ListenSocket, MAXLISTEN);
        if (status != STATUS_OK)
        {
            ereport(WARNING,
                    (errmsg("could not create listen socket for \"%s\"",
                            curhost)));
        }
        else
        {
            success++;
            /* record the first successful host addr in lockfile */
            if (!listen_addr_saved)
            {
                AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);
                listen_addr_saved = true;
            }
        }
    }

    /* Addresses were requested but none of them could be opened. */
    if (!success && elemlist != NIL)
        ereport(FATAL,
                (errmsg("could not create any TCP/IP sockets")));

    list_free(elemlist);
    pfree(rawstring);
}
4.1 StreamServerPort
/*
 * Streams -- wrapper around Unix socket system calls
 *
 *
 * Stream functions are used for vanilla TCP connection protocol.
 */
参数family应为AF_UNIX或AF_UNSPEC。如果是AF_UNIX,hostName应该为NULL,并且必须提供unixSocketDir(比如/tmp)。如果是TCP,hostName为NULL表示监听所有网卡,否则表示要监听的接口地址;此时unixSocketDir会被忽略(可以为NULL)。
/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
 * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
 * specified. For TCP ports, hostName is either NULL for all interfaces or
 * the interface to listen on, and unixSocketDir is ignored (can be NULL).
 *
 * For every address that hostName/service resolves to, a socket is created,
 * bound and put into listening state.
 *
 * Successfully opened sockets are added to the ListenSocket[] array (of
 * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
 *
 * RETURNS: STATUS_OK if at least one socket was added, else STATUS_ERROR
 */
int
StreamServerPort(int family, const char *hostName, unsigned short portNumber,
                 const char *unixSocketDir,
                 pgsocket ListenSocket[], int MaxListen)
{
    pgsocket    fd;
    int         err;
    int         maxconn;
    int         ret;
    char        portNumberStr[32];
    const char *familyDesc;
    char        familyDescBuf[64];
    const char *addrDesc;
    char        addrBuf[NI_MAXHOST];
    char       *service;
    struct addrinfo *addrs = NULL,
               *addr;
    struct addrinfo hint;
    int         listen_index = 0;
    int         added = 0;
    char        unixSocketPath[MAXPGPATH];
#if !defined(WIN32) || defined(IPV6_V6ONLY)
    int         one = 1;
#endif

    /*
     * Initialize the hints handed to getaddrinfo(): the requested address
     * family (AF_INET, AF_INET6, AF_UNIX, or AF_UNSPEC for any), AI_PASSIVE
     * because the results will be passed to bind() on the server side, and
     * stream sockets (TCP, or Unix-domain stream sockets).
     */
    MemSet(&hint, 0, sizeof(hint));
    hint.ai_family = family;
    hint.ai_flags = AI_PASSIVE;
    hint.ai_socktype = SOCK_STREAM;

    if (family == AF_UNIX)
    {
        /*
         * Create unixSocketPath from portNumber and unixSocketDir and lock
         * that file path
         */

        /*
         * UNIXSOCK_PATH writes "<sockdir>/.s.PGSQL.<port>" (for example
         * /tmp/.s.PGSQL.5432) into a char array.  UNIXSOCK_PATH_BUFLEN is
         * the size of sockaddr_un.sun_path, which differs between
         * platforms; it includes the terminating '\0', so the longest
         * usable path is UNIXSOCK_PATH_BUFLEN - 1 bytes.  Upstream
         * PostgreSQL provides both in libpq/pqcomm.h; the guarded
         * definitions below are identical and apply only if they are
         * missing.
         */
#ifndef UNIXSOCK_PATH
#define UNIXSOCK_PATH(path, port, sockdir) \
       (AssertMacro(sockdir), \
        AssertMacro(*(sockdir) != '\0'), \
        snprintf(path, sizeof(path), "%s/.s.PGSQL.%d", \
                 (sockdir), (port)))
#endif
#ifndef UNIXSOCK_PATH_BUFLEN
#define UNIXSOCK_PATH_BUFLEN sizeof(((struct sockaddr_un *) NULL)->sun_path)
#endif

        UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
        if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
        {
            ereport(LOG,
                    (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
                            unixSocketPath,
                            (int) (UNIXSOCK_PATH_BUFLEN - 1))));
            return STATUS_ERROR;
        }

        /*
         * Lock the socket path before using it.  NOTE(review): the locking
         * is done inside Lock_AF_UNIX(), not shown here; presumably it
         * creates a lock file with open(O_CREAT | O_EXCL), which fails if
         * the file already exists -- confirm against Lock_AF_UNIX().
         */
        if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
            return STATUS_ERROR;

        service = unixSocketPath;
    }
    else
    {
        snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
        service = portNumberStr;
    }

    /*
     * service is now either the Unix socket path or the decimal port
     * number.  Resolve hostName/service into addrs, a linked list of
     * candidate addresses chained through ai_next (see getaddrinfo(3)).
     */
    ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
    if (ret || !addrs)
    {
        if (hostName)
            ereport(LOG,
                    (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
                            hostName, service, gai_strerror(ret))));
        else
            ereport(LOG,
                    (errmsg("could not translate service \"%s\" to address: %s",
                            service, gai_strerror(ret))));
        if (addrs)
            pg_freeaddrinfo_all(hint.ai_family, addrs);
        return STATUS_ERROR;
    }

    /* Try to open a listening socket for each resolved address. */
    for (addr = addrs; addr; addr = addr->ai_next)
    {
        if (family != AF_UNIX && addr->ai_family == AF_UNIX)
        {
            /*
             * Only set up a unix domain socket when they really asked for it.
             * The service/port is different in that case.
             */
            continue;
        }

        /* See if there is still room to add 1 more socket. */
        for (; listen_index < MaxListen; listen_index++)
        {
            if (ListenSocket[listen_index] == PGINVALID_SOCKET)
                break;
        }
        if (listen_index >= MaxListen)
        {
            ereport(LOG,
                    (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
                            MaxListen)));
            break;
        }

        /* set up address family name for log messages */
        switch (addr->ai_family)
        {
            case AF_INET:
                familyDesc = _("IPv4");
                break;
            case AF_INET6:
                familyDesc = _("IPv6");
                break;
            case AF_UNIX:
                familyDesc = _("Unix");
                break;
            default:
                snprintf(familyDescBuf, sizeof(familyDescBuf),
                         _("unrecognized address family %d"),
                         addr->ai_family);
                familyDesc = familyDescBuf;
                break;
        }

        /*
         * set up text form of address for log messages: the socket path for
         * Unix sockets, otherwise the numeric host (NI_NUMERICHOST)
         */
        if (addr->ai_family == AF_UNIX)
            addrDesc = unixSocketPath;
        else
        {
            pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
                               addr->ai_addrlen,
                               addrBuf, sizeof(addrBuf),
                               NULL, 0,
                               NI_NUMERICHOST);
            addrDesc = addrBuf;
        }

        /* Create a stream socket of this address's family. */
        if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
        {
            ereport(LOG,
                    (errcode_for_socket_access(),
                    /* translator: first %s is IPv4, IPv6, or Unix */
                     errmsg("could not create %s socket for address \"%s\": %m",
                            familyDesc, addrDesc)));
            continue;
        }

#ifndef WIN32

        /*
         * Without the SO_REUSEADDR flag, a new postmaster can't be started
         * right away after a stop or crash, giving "address already in use"
         * error on TCP ports.  (The side that actively closed a TCP
         * connection keeps the port in TIME_WAIT for a while.)
         *
         * On win32, however, this behavior only happens if the
         * SO_EXCLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows
         * multiple servers to listen on the same address, resulting in
         * unpredictable behavior. With no flags at all, win32 behaves as Unix
         * with SO_REUSEADDR.
         */
        if (addr->ai_family != AF_UNIX)
        {
            if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
                            (char *) &one, sizeof(one))) == -1)
            {
                ereport(LOG,
                        (errcode_for_socket_access(),
                        /* translator: third %s is IPv4, IPv6, or Unix */
                         errmsg("%s(%s) failed for %s address \"%s\": %m",
                                "setsockopt", "SO_REUSEADDR",
                                familyDesc, addrDesc)));
                closesocket(fd);
                continue;
            }
        }
#endif

#ifdef IPV6_V6ONLY

        /*
         * Restrict IPv6 sockets to IPv6 traffic.  Without this option an
         * IPv6 socket may also accept IPv4 connections as IPv4-mapped
         * addresses (::ffff:a.b.c.d).
         */
        if (addr->ai_family == AF_INET6)
        {
            if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
                           (char *) &one, sizeof(one)) == -1)
            {
                ereport(LOG,
                        (errcode_for_socket_access(),
                        /* translator: third %s is IPv4, IPv6, or Unix */
                         errmsg("%s(%s) failed for %s address \"%s\": %m",
                                "setsockopt", "IPV6_V6ONLY",
                                familyDesc, addrDesc)));
                closesocket(fd);
                continue;
            }
        }
#endif

        /*
         * Note: This might fail on some OS's, like Linux older than
         * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
         * ipv4 addresses to ipv6. It will show ::ffff:ipv4 for all ipv4
         * connections.
         */

        /* Bind the server address to the socket. */
        err = bind(fd, addr->ai_addr, addr->ai_addrlen);
        if (err < 0)
        {
            int         saved_errno = errno;

            ereport(LOG,
                    (errcode_for_socket_access(),
                    /* translator: first %s is IPv4, IPv6, or Unix */
                     errmsg("could not bind %s address \"%s\": %m",
                            familyDesc, addrDesc),
                     saved_errno == EADDRINUSE ?
                     (addr->ai_family == AF_UNIX ?
                      errhint("Is another postmaster already running on port %d?",
                              (int) portNumber) :
                      errhint("Is another postmaster already running on port %d?"
                              " If not, wait a few seconds and retry.",
                              (int) portNumber)) : 0));
            closesocket(fd);
            continue;
        }

        /*
         * Post-bind setup of the Unix socket file; if it fails, give up on
         * any remaining addresses as well (break, not continue).
         */
        if (addr->ai_family == AF_UNIX)
        {
            if (Setup_AF_UNIX(service) != STATUS_OK)
            {
                closesocket(fd);
                break;
            }
        }

        /*
         * Select appropriate accept-queue length limit. It seems reasonable
         * to use a value similar to the maximum number of child processes
         * that the postmaster will permit.
         */
        maxconn = MaxConnections * 2;

        /* Start listening; incoming connections are later taken with accept(). */
        err = listen(fd, maxconn);
        if (err < 0)
        {
            ereport(LOG,
                    (errcode_for_socket_access(),
                    /* translator: first %s is IPv4, IPv6, or Unix */
                     errmsg("could not listen on %s address \"%s\": %m",
                            familyDesc, addrDesc)));
            closesocket(fd);
            continue;
        }

        if (addr->ai_family == AF_UNIX)
            ereport(LOG,
                    (errmsg("listening on Unix socket \"%s\"",
                            addrDesc)));
        else
            ereport(LOG,
            /* translator: first %s is IPv4 or IPv6 */
                    (errmsg("listening on %s address \"%s\", port %d",
                            familyDesc, addrDesc, (int) portNumber)));

        /* Record the listening socket in the first free slot found above. */
        ListenSocket[listen_index] = fd;
        added++;
    }

    pg_freeaddrinfo_all(hint.ai_family, addrs);

    if (!added)
        return STATUS_ERROR;

    return STATUS_OK;
}