PostgreSQL源码解读(150)-PGTools#2(BaseBackup函数)
本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的函数BaseBackup.
创新互联建站是专业的威远网站建设公司,威远接单;提供成都网站制作、成都网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行威远网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
一、数据结构
option
使用工具时存储选项的数据结构
#ifndef HAVE_STRUCT_OPTION
/*
 * Descriptor for one command-line option of a tool.  This fallback
 * declaration (and the three argument-kind constants below) is compiled only
 * when HAVE_STRUCT_OPTION is not defined.
 */
struct option
{
const char *name; /* option name */
int has_arg; /* argument kind: no_argument, required_argument or
			  * optional_argument */
int *flag; /* NOTE(review): presumably follows the getopt_long()
			* convention (if non-NULL, receives val) -- confirm */
int val; /* value associated with the option; NOTE(review): presumably
		  * returned or stored through flag as in getopt_long() -- confirm */
};
/* Allowed values for option.has_arg */
#define no_argument 0
#define required_argument 1
#define optional_argument 2
#endif
/*
 * On OpenBSD and some versions of Solaris, opterr and friends are defined in
 * core libc rather than in a separate getopt module.  Define these variables
 * only if configure found they aren't there by default; otherwise, this
 * module and its callers will just use libc's variables.  (We assume that
 * testing opterr is sufficient for all of these.)
 */
#ifndef HAVE_INT_OPTERR
int opterr = 1, /* if error message should be printed */
optind = 1, /* index into parent argv vector */
optopt; /* character checked for validity */
char *optarg; /* argument associated with option */
#endif
/*
 * Character constants and the empty string used by the option parser.
 * NOTE(review): presumably BADCH is returned for an unrecognised option and
 * BADARG for a missing argument, as in the BSD getopt -- the parser itself is
 * not part of this excerpt, confirm there.
 */
#define BADCH (int)'?'
#define BADARG (int)':'
#define EMSG ""
pg_result
用于接收PQgetResult的返回结果.
/*
 * Result of a query as handed back through the PGresult pointer: the tuples
 * and their descriptors, the execution status, error details, and the
 * bookkeeping for the memory blocks that back all of it.
 */
struct pg_result
{
/* number of tuples (rows) */
int ntups;
/* number of attributes (columns) */
int numAttributes;
/* attribute descriptors */
PGresAttDesc *attDescs;
/* array of PGresTuple */
PGresAttValue **tuples; /* each PGresTuple is an array of
* PGresAttValue's */
int tupArrSize; /* allocated size of tuples array */
/* NOTE(review): presumably the number of entries in paramDescs -- confirm */
int numParameters;
/* parameter descriptors */
PGresParamDesc *paramDescs;
/* execution status, one of the ExecStatusType values */
ExecStatusType resultStatus;
char cmdStatus[CMDSTATUS_LEN]; /* cmd status from the query */
int binary; /* binary tuple values if binary == 1,
* otherwise text */
/*
 * These fields are copied from the originating PGconn, so that operations
 * on the PGresult don't have to reference the PGconn.
 */
/* notice hooks */
PGNoticeHooks noticeHooks;
PGEvent *events;
int nEvents;
int client_encoding; /* encoding id */
/*
 * Error information (all NULL if not an error result).  errMsg is the
 * "overall" error message returned by PQresultErrorMessage.  If we have
 * per-field info then it is stored in a linked list.
 */
char *errMsg; /* error message, or NULL if no error */
PGMessageField *errFields; /* message broken into fields */
char *errQuery; /* text of triggering query, if available */
/* All NULL attributes in the query result point to this null string */
char null_field[1];
/*
 * Space management information.  Note that attDescs and error stuff, if
 * not null, point into allocated blocks.  But tuples points to a
 * separately malloc'd block, so that we can realloc it.
 */
PGresult_data *curBlock; /* most recently allocated block */
int curOffset; /* start offset of free space in block */
int spaceLeft; /* number of free bytes remaining in block */
size_t memorySize; /* total space allocated for this PGresult */
};
/*
 * Data about a single parameter of a prepared statement.  Referenced from
 * struct pg_result through its paramDescs pointer.
 */
typedef struct pgresParamDesc
{
Oid typid; /* type id */
} PGresParamDesc;
/*
 * Execution status of a result; stored in pg_result.resultStatus and compared
 * against by callers such as BaseBackup (PGRES_TUPLES_OK, PGRES_COMMAND_OK).
 * Values are assigned sequentially starting from PGRES_EMPTY_QUERY = 0.
 */
typedef enum
{
PGRES_EMPTY_QUERY = 0, /* empty query string was executed */
PGRES_COMMAND_OK, /* a query command that doesn't return
* anything was executed properly by the
* backend */
PGRES_TUPLES_OK, /* a query command that returns tuples was
* executed properly by the backend, PGresult
* contains the result tuples */
PGRES_COPY_OUT, /* Copy Out data transfer in progress */
PGRES_COPY_IN, /* Copy In data transfer in progress */
PGRES_BAD_RESPONSE, /* an unexpected response was recv'd from the
* backend */
PGRES_NONFATAL_ERROR, /* notice or warning message */
PGRES_FATAL_ERROR, /* query failed */
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE /* single tuple from larger resultset */
} ExecStatusType;
/*
 * One storage block of a PGresult (see pg_result.curBlock).  The union lets
 * the start of a block be read either as the link to the next block or as
 * raw bytes.
 */
typedef union pgresult_data PGresult_data;
union pgresult_data
{
PGresult_data *next; /* link to next block, or NULL */
char space[1]; /* dummy for accessing block as bytes */
};
二、源码解读
BaseBackup,实际执行备份的函数.
主要逻辑是通过libpq接口向服务器端发送备份请求(BASE_BACKUP命令),随后依次接收WAL起始位置、表空间列表(头部信息)、各表空间的数据以及WAL结束位置.
/*
 * BaseBackup
 *
 * Perform one base backup over the already-open connection `conn`.
 *
 * Steps, in the order the code runs them:
 *   1. check that the server version is within the supported range, and, if
 *      WAL streaming was requested, that the server supports it;
 *   2. optionally generate the recovery configuration contents;
 *   3. run IDENTIFY_SYSTEM to obtain the system identifier and timeline;
 *   4. send the BASE_BACKUP command and read the first result (WAL start
 *      position and, when present, the start timeline);
 *   5. read the header result (one row per tablespace), sum the sizes for
 *      progress reporting and verify tablespace directories in plain format;
 *   6. start the background WAL streamer if requested, then receive every
 *      tablespace;
 *   7. read the WAL end position and the final command status;
 *   8. hand the end position to the background streamer and wait for it;
 *   9. close the connection and, if requested, sync the backup to disk.
 *
 * Takes no arguments and returns nothing; all settings come from file-level
 * variables (conn, includewal, format, basedir, verbose, ...).  Every failure
 * is reported on stderr and terminates the program with exit(1).
 */
static void
BaseBackup(void)
{
PGresult *res;
char *sysidentifier;
TimeLineID latesttli;
TimeLineID starttli;
char *basebkp;
char escaped_label[MAXPGPATH];
char *maxrate_clause = NULL;
int i;
char xlogstart[64];
char xlogend[64];
int minServerMajor,
maxServerMajor;
int serverVersion,
serverMajor;
/* The database connection must already have been established */
Assert(conn != NULL);
/*
 * Check server version. BASE_BACKUP command was introduced in 9.1, so we
 * can't work with servers older than 9.1.
 */
minServerMajor = 901;
maxServerMajor = PG_VERSION_NUM / 100;
serverVersion = PQserverVersion(conn);
serverMajor = serverVersion / 100;
if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
{
const char *serverver = PQparameterStatus(conn, "server_version");
fprintf(stderr, _("%s: incompatible server version %s\n"),
progname, serverver ? serverver : "'unknown'");
exit(1);
}
/*
 * If WAL streaming was requested, also check that the server is new
 * enough for that.
 */
if (includewal == STREAM_WAL && !CheckServerVersionForStreaming(conn))
{
/*
 * Error message already written in CheckServerVersionForStreaming(),
 * but add a hint about using -X none.
 */
fprintf(stderr, _("HINT: use -X none or -X fetch to disable log streaming\n"));
exit(1);
}
/*
 * Build contents of configuration file if requested
 */
if (writerecoveryconf)
GenerateRecoveryConf(conn);
/*
 * Run IDENTIFY_SYSTEM so we can get the timeline
 */
if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
exit(1);
/*
 * Start the actual backup
 */
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
if (maxrate > 0)
maxrate_clause = psprintf("MAX_RATE %u", maxrate);
if (verbose)
/* verbose mode: full status message */
fprintf(stderr,
_("%s: initiating base backup, waiting for checkpoint to complete\n"),
progname);
if (showprogress && !verbose)
{
/* progress-only mode: short message, overwritten later on a terminal */
fprintf(stderr, "waiting for checkpoint");
if (isatty(fileno(stderr)))
fprintf(stderr, "\r");
else
fprintf(stderr, "\n");
}
/*
 * Assemble the BASE_BACKUP command.  Each keyword is emitted only when the
 * corresponding setting asks for it; note that NOWAIT is sent whenever WAL
 * is included, and NOVERIFY_CHECKSUMS when checksum verification is off.
 */
basebkp =
psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
includewal == FETCH_WAL ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal == NO_WAL ? "" : "NOWAIT",
maxrate_clause ? maxrate_clause : "",
format == 't' ? "TABLESPACE_MAP" : "",
verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
/* Submit the command; its results are collected with PQgetResult() below */
if (PQsendQuery(conn, basebkp) == 0)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "BASE_BACKUP", PQerrorMessage(conn));
exit(1);
}
/*
 * Get the starting WAL location
 */
/* first result of the command */
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not initiate base backup: %s"),
progname, PQerrorMessage(conn));
exit(1);
}
/*
 * Exactly one row is expected.  NOTE(review): the message also reports the
 * field count, but only the row count is actually checked here.
 */
if (PQntuples(res) != 1)
{
fprintf(stderr,
_("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 2);
exit(1);
}
/* column 0 holds the WAL start position */
strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart));
if (verbose)
fprintf(stderr, _("%s: checkpoint completed\n"), progname);
/*
 * 9.3 and later sends the TLI of the starting point. With older servers,
 * assume it's the same as the latest timeline reported by
 * IDENTIFY_SYSTEM.
 */
if (PQnfields(res) >= 2)
starttli = atoi(PQgetvalue(res, 0, 1));
else
starttli = latesttli;
PQclear(res);
MemSet(xlogend, 0, sizeof(xlogend));
if (verbose && includewal != NO_WAL)
fprintf(stderr, _("%s: write-ahead log start point: %s on timeline %u\n"),
progname, xlogstart, starttli);
/*
 * Get the header
 */
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not get backup header: %s"),
progname, PQerrorMessage(conn));
exit(1);
}
if (PQntuples(res) < 1)
{
fprintf(stderr, _("%s: no data returned from server\n"), progname);
exit(1);
}
/*
 * Sum up the total size, for progress reporting
 */
totalsize = totaldone = 0;
tablespacecount = PQntuples(res);
for (i = 0; i < PQntuples(res); i++)
{
/* NOTE(review): atol() gives no error or overflow reporting */
totalsize += atol(PQgetvalue(res, i, 2));
/*
 * Verify tablespace directories are empty. Don't bother with the
 * first one since it can be relocated, and it will be checked before
 * we do anything anyway.
 */
if (format == 'p' && !PQgetisnull(res, i, 1))
{
char *path = unconstify(char *, get_tablespace_mapping(PQgetvalue(res, i, 1)));
verify_dir_is_empty_or_create(path, &made_tablespace_dirs, &found_tablespace_dirs);
}
}
/*
 * When writing to stdout, require a single tablespace
 */
if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
{
fprintf(stderr,
_("%s: can only write single tablespace to stdout, database has %d\n"),
progname, PQntuples(res));
exit(1);
}
/*
 * If we're streaming WAL, start the streaming session before we start
 * receiving the actual data chunks.
 */
if (includewal == STREAM_WAL)
{
if (verbose)
fprintf(stderr, _("%s: starting background WAL receiver\n"),
progname);
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
/*
 * Start receiving chunks
 */
for (i = 0; i < PQntuples(res); i++) /* one iteration per tablespace */
{
if (format == 't')
/* tar format */
ReceiveTarFile(conn, res, i);
else
/* plain format */
ReceiveAndUnpackTarFile(conn, res, i);
} /* Loop over all tablespaces */
if (showprogress)
{
progress_report(PQntuples(res), NULL, true);
if (isatty(fileno(stderr)))
fprintf(stderr, "\n"); /* Need to move to next line */
}
PQclear(res);
/*
 * Get the stop position
 */
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr,
_("%s: could not get write-ahead log end position from server: %s"),
progname, PQerrorMessage(conn));
exit(1);
}
if (PQntuples(res) != 1)
{
fprintf(stderr,
_("%s: no write-ahead log end position returned from server\n"),
progname);
exit(1);
}
strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
if (verbose && includewal != NO_WAL)
fprintf(stderr, _("%s: write-ahead log end point: %s\n"), progname, xlogend);
PQclear(res);
/*
 * Final result: the completion status of the BASE_BACKUP command.
 * NOTE(review): both error branches fall through to exit(1), so
 * checksum_failure is set immediately before the process terminates.
 */
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
if (sqlstate &&
strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
{
fprintf(stderr, _("%s: checksum error occurred\n"),
progname);
checksum_failure = true;
}
else
{
fprintf(stderr, _("%s: final receive failed: %s"),
progname, PQerrorMessage(conn));
}
exit(1);
}
/* A background WAL streamer was started: tell it where to stop and wait */
if (bgchild > 0)
{
#ifndef WIN32
int status;
pid_t r;
#else
DWORD status;
/*
 * get a pointer sized version of bgchild to avoid warnings about
 * casting to a different size on WIN64.
 */
intptr_t bgchild_handle = bgchild;
uint32 hi,
lo;
#endif
if (verbose)
fprintf(stderr,
_("%s: waiting for background process to finish streaming ...\n"), progname);
#ifndef WIN32
/* Non-Windows: pass the end position to the child through the pipe */
if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
{
fprintf(stderr,
_("%s: could not send command to background pipe: %s\n"),
progname, strerror(errno));
exit(1);
}
/* Just wait for the background process to exit */
r = waitpid(bgchild, &status, 0);
if (r == (pid_t) -1)
{
fprintf(stderr, _("%s: could not wait for child process: %s\n"),
progname, strerror(errno));
exit(1);
}
if (r != bgchild)
{
fprintf(stderr, _("%s: child %d died, expected %d\n"),
progname, (int) r, (int) bgchild);
exit(1);
}
if (status != 0)
{
fprintf(stderr, "%s: %s\n",
progname, wait_result_to_str(status));
exit(1);
}
/* Exited normally, we're happy! */
#else /* WIN32 */
/*
 * On Windows, since we are in the same process, we can just store the
 * value directly in the variable, and then set the flag that says
 * it's there.
 */
if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse write-ahead log location \"%s\"\n"),
progname, xlogend);
exit(1);
}
xlogendptr = ((uint64) hi) << 32 | lo;
InterlockedIncrement(&has_xlogendptr);
/* First wait for the thread to exit */
if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
WAIT_OBJECT_0)
{
_dosmaperr(GetLastError());
fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
progname, strerror(errno));
exit(1);
}
if (GetExitCodeThread((HANDLE) bgchild_handle, &status) == 0)
{
_dosmaperr(GetLastError());
fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
progname, strerror(errno));
exit(1);
}
if (status != 0)
{
fprintf(stderr, _("%s: child thread exited with error %u\n"),
progname, (unsigned int) status);
exit(1);
}
/* Exited normally, we're happy */
#endif
}
/* Free the configuration file contents */
destroyPQExpBuffer(recoveryconfcontents);
/*
 * End of copy data. Final result is already checked inside the loop.
 */
PQclear(res);
PQfinish(conn);
conn = NULL;
/*
 * Make data persistent on disk once backup is completed. For tar format
 * once syncing the parent directory is fine, each tar file created per
 * tablespace has been already synced. In plain format, all the data of
 * the base directory is synced, taking into account all the tablespaces.
 * Errors are not considered fatal.
 */
if (do_sync)
{
if (verbose)
fprintf(stderr,
_("%s: syncing data to disk ...\n"), progname);
if (format == 't')
{
if (strcmp(basedir, "-") != 0)
(void) fsync_fname(basedir, true, progname);
}
else
{
(void) fsync_pgdata(basedir, progname, serverVersion);
}
}
if (verbose)
fprintf(stderr, _("%s: base backup completed\n"), progname);
}
/*
 * PQgetvalue:
 *	Fetch the value stored at row 'tup_num', field 'field_num' of a result.
 *
 * The (tup_num, field_num) pair is first validated with
 * check_tuple_field_number(); when that check fails NULL is returned,
 * otherwise the 'value' pointer kept in res->tuples for that cell.
 */
char *
PQgetvalue(const PGresult *res, int tup_num, int field_num)
{
	if (check_tuple_field_number(res, tup_num, field_num))
		return res->tuples[tup_num][field_num].value;

	return NULL;
}
三、跟踪分析
备份命令
pg_basebackup -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R
启动gdb跟踪
[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b BaseBackup
(gdb) set args -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R
(gdb) r
Starting program: /appdb/atlasdb/pg11.2/bin/pg_basebackup -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Breakpoint 1, BaseBackup () at pg_basebackup.c:1740
1740 char *maxrate_clause = NULL;
(gdb)
连接pg_conn结构体
(gdb) n
1749 Assert(conn != NULL);
(gdb) p *conn
$1 = {pghost = 0x6282d0 "localhost", pghostaddr = 0x0, pgport = 0x6282f0 "5432", pgtty = 0x628310 "",
connect_timeout = 0x0, client_encoding_initial = 0x0, pgoptions = 0x628330 "", appname = 0x0,
fbappname = 0x628350 "pg_basebackup", dbName = 0x6282b0 "replication", replication = 0x6283d0 "true",
pguser = 0x628290 "xdb", pgpass = 0x0, pgpassfile = 0x628d30 "/home/xdb/.pgpass", keepalives = 0x0,
keepalives_idle = 0x0, keepalives_interval = 0x0, keepalives_count = 0x0, sslmode = 0x628370 "prefer",
sslcompression = 0x628390 "0", sslkey = 0x0, sslcert = 0x0, sslrootcert = 0x0, sslcrl = 0x0, requirepeer = 0x0,
krbsrvname = 0x6283b0 "postgres", target_session_attrs = 0x6283f0 "any", Pfdebug = 0x0, noticeHooks = {
noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
eventArraySize = 0, status = CONNECTION_OK, asyncStatus = PGASYNC_IDLE, xactStatus = PQTRANS_IDLE,
queryclass = PGQUERY_SIMPLE, last_query = 0x61f1c0 "SHOW wal_segment_size", last_sqlstate = "\000\000\000\000\000",
options_valid = true, nonblocking = false, singleRowMode = false, copy_is_binary = 0 '\000', copy_already_done = 0,
notifyHead = 0x0, notifyTail = 0x0, nconnhost = 1, whichhost = 0, connhost = 0x627a50, sock = 7, laddr = {addr = {
ss_family = 10, __ss_padding = "\307\326", '\000' , "\001", '\000' ,
__ss_align = 0}, salen = 28}, raddr = {addr = {ss_family = 10,
__ss_padding = "\025\070", '\000' , "\001", '\000' , __ss_align = 0},
salen = 28}, pversion = 196608, sversion = 110002, auth_req_received = true, password_needed = false,
sigpipe_so = false, sigpipe_flag = true, try_next_addr = false, try_next_host = false, addr_cur = 0x0,
setenv_state = SETENV_STATE_IDLE, next_eo = 0x0, send_appname = true, be_pid = 1435, be_key = -828773845,
pstatus = 0x629570, client_encoding = 0, std_strings = true, verbosity = PQERRORS_DEFAULT,
show_context = PQSHOW_CONTEXT_ERRORS, lobjfuncs = 0x0, inBuffer = 0x61f600 "T", inBufSize = 16384, inStart = 75,
inCursor = 75, inEnd = 75, outBuffer = 0x623610 "Q", outBufSize = 16384, outCount = 0, outMsgStart = 1, outMsgEnd = 27,
rowBuf = 0x627620, rowBufLen = 32, result = 0x0, next_result = 0x0, sasl_state = 0x0, ssl_in_use = false,
allow_ssl_try = false, wait_ssl_try = false, ssl = 0x0, peer = 0x0, engine = 0x0, gctx = 0x0, gtarg_nam = 0x0,
errorMessage = {data = 0x627830 "", len = 0, maxlen = 256}, workBuffer = {data = 0x627940 "SELECT", len = 6,
maxlen = 256}, addrlist = 0x0, addrlist_family = 0}
(gdb)
判断服务器版本,是否支持BASE_BACKUP命令
(gdb) n
1755 minServerMajor = 901;
(gdb)
1756 maxServerMajor = PG_VERSION_NUM / 100;
(gdb) p PG_VERSION_NUM
$2 = 110002
(gdb) n
1757 serverVersion = PQserverVersion(conn);
(gdb)
1758 serverMajor = serverVersion / 100;
(gdb)
1759 if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
(gdb) p serverVersion
$3 = 110002
(gdb) n
判断服务器是否支持WAL streaming
(gdb) n
1772 if (includewal == STREAM_WAL && !CheckServerVersionForStreaming(conn))
(gdb)
如需要,生成recovery.conf文件
1785 if (writerecoveryconf)
(gdb) p includewal
$4 = STREAM_WAL
(gdb) p writerecoveryconf
$5 = true
(gdb) n
1786 GenerateRecoveryConf(conn);
(gdb)
获取系统标识符
1791 if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
(gdb)
1797 PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
(gdb) p sysidentifier
$6 = 0x6292d0 "6662151435832250464"
(gdb) p *sysidentifier
$7 = 54 '6'
(gdb) p latesttli
$8 = 1
(gdb)
开始实际的备份工作
(gdb) p escaped_label
$9 = "pg_basebackup base backup\000\000\000\001", '\000' , "\"`-\360\377\177\000\000\000\000\000\000\377\177\000\000` u\367\377\177\000\000\000\001\000\000\000\000\000\000\001\000\000\000\002\000\000\000]VA\000\000\000\000\000@\335\377\377\377\177\000\000b\343\377\377\377\177", '\000' , "\343\377\377\377\177\000\000B\335\377\377\377\177\000\000\370Ǹ\367\377\177\000\000\220\325\227\367\377\177\000\000\000\341\377\377\377\177\000\000\000\000\000\000\000\000\000\000\371D"...
(gdb) p label
$10 = 0x412610 "pg_basebackup base backup"
(gdb) p i
$11 = 0
(gdb)
构造backup命令
(gdb) n
1802 if (verbose)
(gdb)
1807 if (showprogress && !verbose)
(gdb)
1809 fprintf(stderr, "waiting for checkpoint");
(gdb)
waiting for checkpoint1810 if (isatty(fileno(stderr)))
(gdb)
1811 fprintf(stderr, "\r");
(gdb)
1817 psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1824 format == 't' ? "TABLESPACE_MAP" : "",
(gdb)
1817 psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1822 includewal == NO_WAL ? "" : "NOWAIT",
(gdb)
1817 psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1820 includewal == FETCH_WAL ? "WAL" : "",
(gdb)
1817 psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1816 basebkp =
(gdb)
1827 if (PQsendQuery(conn, basebkp) == 0)
(gdb)
(gdb) p basebkp
$12 = 0x6291f0 "BASE_BACKUP LABEL 'pg_basebackup base backup' PROGRESS NOWAIT "
(gdb)
发送命令到服务器端,获取执行结果
(gdb) n
1837 res = PQgetResult(conn);
(gdb)
1838 if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb)
返回结果
(gdb) p *res
$13 = {ntups = 1, numAttributes = 2, attDescs = 0x629688, tuples = 0x629e90, tupArrSize = 128, numParameters = 0,
paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK,
cmdStatus = "SELECT\000\000\000\000\000\000\000\000\000\000\027\000\000\000\004\000\000\000\377\377\377\377:\226b", '\000' , "\031\000\000\000\377\377\377\377\377\377\377\377B\226b", binary = 0, noticeHooks = {
noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x629680,
curOffset = 133, spaceLeft = 1915}
(gdb)
(gdb) p *res->attDescs
$14 = {name = 0x6296c8 "recptr", tableid = 0, columnid = 0, format = 0, typid = 25, typlen = -1, atttypmod = 0}
(gdb) p *res->tuples
$15 = (PGresAttValue *) 0x6296d8
(gdb) p **res->tuples
$16 = {len = 10, value = 0x6296f8 "1/57000028"}
(gdb) p *res->tuples[2]
Cannot access memory at address 0x0
(gdb) p *res->tuples[0]
$17 = {len = 10, value = 0x6296f8 "1/57000028"}
(gdb) p *res->tuples[1]
Cannot access memory at address 0x15171
(gdb)
判断ntuples,获取WAL start位置
(gdb) n
1844 if (PQntuples(res) != 1)
(gdb)
1852 strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart));
(gdb) p PQgetvalue(res, 0, 0)
$18 = 0x6296f8 "1/57000028"
(gdb) p xlogstart
$19 = " `-\360\377\177\000\000\353\340\377\377\377\177\000\000\360\340a", '\000' , "\360\337\377\377\377\177", '\000'
(gdb) n
1854 if (verbose)
(gdb) p xlogstart
$20 = "1/57000028\000\377\377\177\000\000\360\340a", '\000' , "\360\337\377\377\377\177", '\000'
(gdb)
获取时间线timeline
(gdb) n
1862 if (PQnfields(res) >= 2)
(gdb) p PQnfields(res)
$21 = 2
(gdb) n
1863 starttli = atoi(PQgetvalue(res, 0, 1));
(gdb)
1866 PQclear(res);
(gdb) p atoi(PQgetvalue(res, 0, 1))
$22 = 1
(gdb) p res->tuples[1]
$23 = (PGresAttValue *) 0x15171
(gdb) p res->tuples[0]
$24 = (PGresAttValue *) 0x6296d8
(gdb)
(gdb) n
1867 MemSet(xlogend, 0, sizeof(xlogend));
(gdb)
1869 if (verbose && includewal != NO_WAL)
(gdb) p xlogend
$25 = '\000'
Get the header
(gdb) n
1876 res = PQgetResult(conn);
(gdb) n
1877 if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) p *res
$26 = {ntups = 1, numAttributes = 3, attDescs = 0x629688, tuples = 0x629e90, tupArrSize = 128, numParameters = 0,
paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK,
cmdStatus = "SELECT\000\000\000\000\000\000\000\000\000\000\027\000\000\000\004\000\000\000\377\377\377\377:\226b", '\000' , "\031\000\000\000\377\377\377\377\377\377\377\377B\226b", binary = 0, noticeHooks = {
noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x629680,
curOffset = 183, spaceLeft = 1865}
统计总大小,用于进度报告
(gdb)
1892 totalsize = totaldone = 0;
(gdb) n
1893 tablespacecount = PQntuples(res);
(gdb)
1894 for (i = 0; i < PQntuples(res); i++)
(gdb) p tablespacecount
$29 = 1
(gdb) n
1896 totalsize += atol(PQgetvalue(res, i, 2));
(gdb) p PQgetvalue(res, i, 2)
$30 = 0x629730 "445480"
(gdb) p atol(PQgetvalue(res, i, 2))
$31 = 445480
(gdb) n
1903 if (format == 'p' && !PQgetisnull(res, i, 1))
(gdb)
1894 for (i = 0; i < PQntuples(res); i++)
(gdb) p res->tuples[0][0]
$33 = {len = -1, value = 0x629658 ""}
(gdb) p res->tuples[0][1]
$34 = {len = -1, value = 0x629658 ""}
(gdb) p res->tuples[0][2]
$35 = {len = 6, value = 0x629730 "445480"}
(gdb)
开始接收实际的数据chunks前,开始streaming session.
(gdb) n
1914 if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
(gdb)
1926 if (includewal == STREAM_WAL)
(gdb)
1928 if (verbose)
(gdb)
1931 StartLogStreamer(xlogstart, starttli, sysidentifier);
(gdb) n
Detaching after fork from child process 1511.
1937 for (i = 0; i < PQntuples(res); i++)
(gdb)
查看操作系统中的日志目录
[xdb@localhost backup]$ ll
total 0
drwx------. 3 xdb xdb 60 Mar 15 15:46 pg_wal
[xdb@localhost backup]$ ll ./pg_wal/
total 16384
-rw-------. 1 xdb xdb 16777216 Mar 15 15:46 000000010000000100000057
drwx------. 2 xdb xdb 6 Mar 15 15:46 archive_status
[xdb@localhost backup]$
Start receiving chunks,开始接收chunks
(gdb) n
Detaching after fork from child process 1511.
1937 for (i = 0; i < PQntuples(res); i++)
(gdb) n
1939 if (format == 't')
(gdb)
1942 ReceiveAndUnpackTarFile(conn, res, i);
(gdb)
193789/445489 kB
(gdb) for (i = 0; i < PQntuples(res); i++)
查看操作系统中的备份目录
[xdb@localhost backup]$ ll
total 56
-rw-------. 1 xdb xdb 226 Mar 15 15:47 backup_label
drwx------. 6 xdb xdb 58 Mar 15 15:48 base
drwx------. 2 xdb xdb 4096 Mar 15 15:48 global
drwx------. 2 xdb xdb 6 Mar 15 15:47 pg_commit_ts
drwx------. 2 xdb xdb 6 Mar 15 15:47 pg_dynshmem
-rw-------. 1 xdb xdb 4513 Mar 15 15:48 pg_hba.conf
-rw-------. 1 xdb xdb 1636 Mar 15 15:48 pg_ident.conf
drwx------. 4 xdb xdb 68 Mar 15 15:48 pg_logical
drwx------. 4 xdb xdb 36 Mar 15 15:47 pg_multixact
drwx------. 2 xdb xdb 6 Mar 15 15:47 pg_notify
drwx------. 2 xdb xdb 6 Mar 15 15:48 pg_replslot
drwx------. 2 xdb xdb 6 Mar 15 15:47 pg_serial
drwx------. 2 xdb xdb 6 Mar 15 15:47 pg_snapshots
drwx------. 2 xdb xdb 6 Mar 15 15:48 pg_stat
drwx------. 2 xdb xdb 6 Mar 15 15:48 pg_stat_tmp
drwx------. 2 xdb xdb 6 Mar 15 15:47 pg_subtrans
drwx------. 2 xdb xdb 6 Mar 15 15:48 pg_tblspc
drwx------. 2 xdb xdb 6 Mar 15 15:47 pg_twophase
-rw-------. 1 xdb xdb 3 Mar 15 15:48 PG_VERSION
drwx------. 3 xdb xdb 92 Mar 15 15:48 pg_wal
drwx------. 2 xdb xdb 18 Mar 15 15:48 pg_xact
-rw-------. 1 xdb xdb 88 Mar 15 15:48 postgresql.auto.conf
-rw-------. 1 xdb xdb 23812 Mar 15 15:48 postgresql.conf
-rw-------. 1 xdb xdb 183 Mar 15 15:48 recovery.conf
[xdb@localhost backup]$
显示进度
(gdb) n
1945 if (showprogress)
(gdb)
1947 progress_report(PQntuples(res), NULL, true);
(gdb)
194889/445489 kB (100%),if (isatty(fileno(stderr)))
(gdb)
(gdb) n
1949 fprintf(stderr, "\n"); /* Need to move to next line */
(gdb)
1952 PQclear(res);
(gdb)
Get the stop position
(gdb)
1957 res = PQgetResult(conn);
(gdb) n
1958 if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) p *res
$36 = {ntups = 1, numAttributes = 2, attDescs = 0x6295a8, tuples = 0x629db0, tupArrSize = 128, numParameters = 0,
paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK,
cmdStatus = "SELECT", '\000' , "\300!", , binary = 0, noticeHooks = {
noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x6295a0,
curOffset = 133, spaceLeft = 1915}
(gdb)
(gdb) n
1965 if (PQntuples(res) != 1)
(gdb)
1972 strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
(gdb) p xlogend
$37 = '\000'
(gdb) p *xlogend
$38 = 0 '\000'
(gdb) n
1973 if (verbose && includewal != NO_WAL)
(gdb)
1975 PQclear(res);
(gdb)
COMMAND is OK
(gdb)
1977 res = PQgetResult(conn);
(gdb)
1978 if (PQresultStatus(res) != PGRES_COMMAND_OK)
(gdb) p *res
$39 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0,
resultStatus = PGRES_COMMAND_OK, cmdStatus = "SELECT", '\000' , "\300!", ,
binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0, curOffset = 0,
spaceLeft = 0}
(gdb)
善后工作,如在备份结束后,持久化数据在磁盘上等
(gdb) n
1997 if (bgchild > 0)
(gdb)
2014 if (verbose)
(gdb)
2019 if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
(gdb)
2028 r = waitpid(bgchild, &status, 0);
(gdb) p bgchild
$40 = 1511
(gdb) n
2029 if (r == -1)
(gdb)
2035 if (r != bgchild)
(gdb) p r
$41 = 1511
(gdb) n
2041 if (!WIFEXITED(status))
(gdb)
2047 if (WEXITSTATUS(status) != 0)
(gdb)
2098 destroyPQExpBuffer(recoveryconfcontents);
(gdb)
2103 PQclear(res);
(gdb)
2104 PQfinish(conn);
(gdb)
2113 if (do_sync)
(gdb)
2115 if (format == 't')
(gdb)
2122 (void) fsync_pgdata(basedir, progname, serverVersion);
(gdb)
2126 if (verbose)
(gdb)
2128 }
(gdb)
main (argc=12, argv=0x7fffffffe4b8) at pg_basebackup.c:2534
2534 success = true;
(gdb)
再次启动跟踪,监控后台数据库的活动.
进入BaseBackup函数
Breakpoint 1, BaseBackup () at pg_basebackup.c:1740
1740 char *maxrate_clause = NULL;
(gdb)
数据库活动
15:56:25 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
[local] xdb@testdb-# where backend_type not in ('walwriter', 'checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid |
datname |
pid | 1566
usesysid | 10
usename | xdb
application_name | pg_basebackup
client_addr | ::1
client_hostname |
client_port | 51162
backend_start | 2019-03-15 15:56:13.82013+08
xact_start |
query_start |
state_change | 2019-03-15 15:56:13.821507+08
wait_event_type | Client
wait_event | ClientRead
state | idle
backend_xid |
backend_xmin |
query |
backend_type | walsender
开启WAL streaming
1931 StartLogStreamer(xlogstart, starttli, sysidentifier);
(gdb)
Detaching after fork from child process 1602.
1937 for (i = 0; i < PQntuples(res); i++)
(gdb)
数据库活动
16:01:25 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
where backend_type not in ('walwriter', 'checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid |
datname |
pid | 1566
usesysid | 10
usename | xdb
application_name | pg_basebackup
client_addr | ::1
client_hostname |
client_port | 51162
backend_start | 2019-03-15 15:56:13.82013+08
xact_start |
query_start |
state_change | 2019-03-15 16:00:47.345326+08
wait_event_type | Client
wait_event | ClientWrite
state | active
backend_xid |
backend_xmin |
query |
backend_type | walsender
-[ RECORD 2 ]----+------------------------------
datid |
datname |
pid | 1601
usesysid | 10
usename | xdb
application_name | pg_basebackup
client_addr | ::1
client_hostname |
client_port | 51164
backend_start | 2019-03-15 16:01:47.150434+08
xact_start |
query_start |
state_change | 2019-03-15 16:01:47.159234+08
wait_event_type | Activity
wait_event | WalSenderMain
state | active
backend_xid |
backend_xmin |
query |
backend_type | walsender
16:01:56 (xdb@[local]:5432)testdb=#
16:01:56 (xdb@[local]:5432)testdb=
拷贝数据
(gdb)
1942 ReceiveAndUnpackTarFile(conn, res, i);
(gdb)
193789/445489 kBfor (i = 0; i < PQntuples(res); i++)
(gdb)
1945 if (showprogress)
(gdb)
数据库活动
...
-[ RECORD 3 ]----+------------------------------
datid |
datname |
pid | 1352
usesysid |
usename |
application_name |
client_addr |
client_hostname |
client_port |
backend_start | 2019-03-15 15:03:01.923092+08
xact_start |
query_start |
state_change |
wait_event_type | Activity
wait_event | WalWriterMain
state |
backend_xid |
backend_xmin |
query |
backend_type | walwriter
执行善后工作
2113 if (do_sync)
(gdb)
2115 if (format == 't')
(gdb)
2122 (void) fsync_pgdata(basedir, progname, serverVersion);
(gdb)
2126 if (verbose)
(gdb)
2128 }
(gdb)
数据库活动
16:05:01 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
where backend_type not in ('checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid |
datname |
pid | 1352
usesysid |
usename |
application_name |
client_addr |
client_hostname |
client_port |
backend_start | 2019-03-15 15:03:01.923092+08
xact_start |
query_start |
state_change |
wait_event_type | Activity
wait_event | WalWriterMain
state |
backend_xid |
backend_xmin |
query |
backend_type | walwriter
DONE!
四、参考资料
PG Source Code
本文题目:PostgreSQL源码解读(150)-PGTools#2(BaseBackup函数)
转载自:http://pwwzsj.com/article/jcjsjp.html