溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

PostgreSQL中ReceiveXlogStream有什么作用

發布時間:2021-11-09 15:13:50 來源:億速云 閱讀:204 作者:iii 欄目:關系型數據庫

這篇文章主要介紹“PostgreSQL中ReceiveXlogStream有什么作用”,在日常操作中,相信很多人在PostgreSQL中ReceiveXlogStream有什么作用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”PostgreSQL中ReceiveXlogStream有什么作用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

本節簡單介紹了PostgreSQL的備份工具pg_basebackup源碼中實際執行備份邏輯的BaseBackup中對WAL數據進行備份的實現函數StartLogStreamer->LogStreamerMain及其主要的實現函數ReceiveXlogStream.

一、數據結構

logstreamer_param
WAL data streamer參數.

typedef struct
{
     ////后臺連接
    PGconn     *bgconn;
    //開始位置
    XLogRecPtr  startptr;
    //目錄或者tar文件,依賴于使用的模式
    char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */
    //系統標識符
    char       *sysidentifier;
    //時間線
    int         timeline;
} logstreamer_param;

StreamCtl
接收xlog流數據時的全局參數

/*
 * Global parameters when receiving xlog stream. For details about the individual fields,
 * see the function comment for ReceiveXlogStream().
 * 接收xlog流數據時的全局參數.
 * 每個域字段的詳細解釋,參見ReceiveXlogStream()函數注釋.
 */
typedef struct StreamCtl
{
    //streaming的開始位置
    XLogRecPtr  startpos;       /* Start position for streaming */
    //時間線
    TimeLineID  timeline;       /* Timeline to stream data from */
    //系統標識符
    char       *sysidentifier;  /* Validate this system identifier and
                                 * timeline */
    //standby超時信息
    int         standby_message_timeout;    /* Send status messages this often */
    //是否同步(寫入時是否馬上Flush WAL data)
    bool        synchronous;    /* Flush immediately WAL data on write */
    //在已歸檔的數據中標記segment為已完成
    bool        mark_done;      /* Mark segment as done in generated archive */
    //刷新到磁盤上以確保數據的一致性狀態(是否已刷新到磁盤上)
    bool        do_sync;        /* Flush to disk to ensure consistent state of
                                 * data */
    //在返回T時停止streaming
    stream_stop_callback stream_stop;   /* Stop streaming when returns true */
    //如有效,監測該socket中的輸入并檢查stream_stop()的返回
    pgsocket    stop_socket;    /* if valid, watch for input on this socket
                                 * and check stream_stop() when there is any */
    //如何寫WAL
    WalWriteMethod *walmethod;  /* How to write the WAL */
    //附加到部分接受文件的后綴
    char       *partial_suffix; /* Suffix appended to partially received files */
    //使用的replication slot,如無則為NULL
    char       *replication_slot;   /* Replication slot to use, or NULL */
} StreamCtl;

二、源碼解讀

LogStreamerMain
WAL流復制主函數,用于fork后的子進程調用

static int
LogStreamerMain(logstreamer_param *param)
{
    StreamCtl   stream;//接收xlog流數據時的全局參數
    in_log_streamer = true;
    //初始化StreamCtl結構體
    MemSet(&stream, 0, sizeof(stream));
    stream.startpos = param->startptr;
    stream.timeline = param->timeline;
    stream.sysidentifier = param->sysidentifier;
    stream.stream_stop = reached_end_position;
#ifndef WIN32
    stream.stop_socket = bgpipe[0];
#else
    stream.stop_socket = PGINVALID_SOCKET;
#endif
    stream.standby_message_timeout = standby_message_timeout;
    stream.synchronous = false;
    stream.do_sync = do_sync;
    stream.mark_done = true;
    stream.partial_suffix = NULL;
    stream.replication_slot = replication_slot;
    if (format == 'p')
        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
    else
        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
    //接收數據
    if (!ReceiveXlogStream(param->bgconn, &stream))
        /*
         * Any errors will already have been reported in the function process,
         * but we need to tell the parent that we didn't shutdown in a nice
         * way.
         * 在函數執行過程中出現的錯誤已通過警告的方式發出,
         * 但仍需要告知父進程不能優雅的關閉本進程.
         */
        return 1;
    if (!stream.walmethod->finish())
    {
        fprintf(stderr,
                _("%s: could not finish writing WAL files: %s\n"),
                progname, strerror(errno));
        return 1;
    }
    //結束連接
    PQfinish(param->bgconn);
    //普通文件格式
    if (format == 'p')
        FreeWalDirectoryMethod();
    else
        FreeWalTarMethod();
    //是否內存
    pg_free(stream.walmethod);
    return 0;
}

ReceiveXlogStream
在指定的開始位置接收log stream

/*
 * Receive a log stream starting at the specified position.
 * 在指定的開始位置接收log stream
 *
 * Individual parameters are passed through the StreamCtl structure.
 * 通過StreamCtl結構體傳遞參數.
 *
 * If sysidentifier is specified, validate that both the system
 * identifier and the timeline matches the specified ones
 * (by sending an extra IDENTIFY_SYSTEM command)
 * 如指定了系統標識符,驗證系統標識符和timeline是否匹配指定的信息.
 * (通過發送額外的IDENTIFY_SYSTEM命令)
 *
 * All received segments will be written to the directory
 * specified by basedir. This will also fetch any missing timeline history
 * files.
 * 所有接收到的segments會寫入到basedir中.
 * 這同時會提前所有缺失的timeline history文件.
 *
 * The stream_stop callback will be called every time data
 * is received, and whenever a segment is completed. If it returns
 * true, the streaming will stop and the function
 * return. As long as it returns false, streaming will continue
 * indefinitely.
 * stream_stop回調函數在每次接收到數據以及segment完成傳輸后調用.
 * 如返回T,streaming會停止,函數返回.
 * 如返回F,streaming會一直繼續.
 *
 * If stream_stop() checks for external input, stop_socket should be set to
 * the FD it checks.  This will allow such input to be detected promptly
 * rather than after standby_message_timeout (which might be indefinite).
 * Note that signals will interrupt waits for input as well, but that is
 * race-y since a signal received while busy won't interrupt the wait.
 * 如stream_stop()用于檢測額外的輸入,stop_socket變量應設置為該函數需檢查的FD.
 * 這會允許立即檢測此類輸入,而不是在standby_message_timeout之后(可能會無限循環).
 * 注意信號也會中斷輸入等待,但這是存在競爭的,因為在忙時接收到信號不會中斷等待.
 *
 * standby_message_timeout controls how often we send a message
 * back to the master letting it know our progress, in milliseconds.
 * Zero means no messages are sent.
 * This message will only contain the write location, and never
 * flush or replay.
 * standby_message_timeout控制發送進度消息回master的頻度,單位為ms.
 * 0意味著沒有消息會發送.
 * 該消息只保存寫入位置,永遠不會flush或replay.
 *
 * If 'partial_suffix' is not NULL, files are initially created with the
 * given suffix, and the suffix is removed once the file is finished. That
 * allows you to tell the difference between partial and completed files,
 * so that you can continue later where you left.
 * 如'partial_suffix'不為NULL,文件已通過給定的suffix創建,
 *   一旦文件完成傳輸,則suffix會被清除.
 * 這是部分和完整完成文件的異同,以便在離開后可以繼續.
 *
 * If 'synchronous' is true, the received WAL is flushed as soon as written,
 * otherwise only when the WAL file is closed.
 * 如'synchronous'為T,接收到的WAL會刷新為寫入,否則的話只會在WAL file關閉時才寫入.
 *
 * Note: The WAL location *must* be at a log segment start!
 * 注意:WAL位置必須是log segment的起始位置.
 */
bool
ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
    char        query[128];
    char        slotcmd[128];
    PGresult   *res;
    XLogRecPtr  stoppos;
    /*
     * The caller should've checked the server version already, but doesn't do
     * any harm to check it here too.
     * 調用者已完成版本校驗,但這里重復校驗并沒有什么問題.
     */
    if (!CheckServerVersionForStreaming(conn))
        return false;
    /*
     * Decide whether we want to report the flush position. If we report the
     * flush position, the primary will know what WAL we'll possibly
     * re-request, and it can then remove older WAL safely. We must always do
     * that when we are using slots.
     * 確定是否需要報告flush位置.
     * 如果我們報告了flush位置,主服務器將會知道可能重復請求的WAL file,
     *   這樣可以安全的移除更老的WAL.
     * 如使用slots,應經常執行該操作.
     *
     * Reporting the flush position makes one eligible as a synchronous
     * replica. People shouldn't include generic names in
     * synchronous_standby_names, but we've protected them against it so far,
     * so let's continue to do so unless specifically requested.
     * 報告flush位置使其符合同步副本的條件.
     * DBA不應該在synchronous_standby_names中包含常規的名稱,但我們截止目前位置已很好的保護了它們,
     *   因此可以繼續這樣執行除非特別請求.
     */
    if (stream->replication_slot != NULL)
    {
        //存在slot
        reportFlushPosition = true;
        sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
    }
    else
    {
        if (stream->synchronous)
            reportFlushPosition = true;//同步
        else
            reportFlushPosition = false;//異步
        slotcmd[0] = 0;//ASCII 0
    }
    if (stream->sysidentifier != NULL)
    {
        //系統標識符不為NULL
        /* Validate system identifier hasn't changed */
        //驗證系統標識符沒有改變
        //發送IDENTIFY_SYSTEM命令
        res = PQexec(conn, "IDENTIFY_SYSTEM");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr,
                    _("%s: could not send replication command \"%s\": %s"),
                    progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
            PQclear(res);
            return false;
        }
        if (PQntuples(res) != 1 || PQnfields(res) < 3)
        {
            fprintf(stderr,
                    _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d r more fields\n"),
                    progname, PQntuples(res), PQnfields(res), 1, 3);
            PQclear(res);
            return false;
        }
        if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
        {
            fprintf(stderr,
                    _("%s: system identifier does not match between base backup and streaming onnection\n"),
                    progname);
            PQclear(res);
            return false;
        }
        if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
        {
            fprintf(stderr,
                    _("%s: starting timeline %u is not present in the server\n"),
                    progname, stream->timeline);
            PQclear(res);
            return false;
        }
        PQclear(res);
    }
    /*
     * initialize flush position to starting point, it's the caller's
     * responsibility that that's sane.
     * 初始化flush位置為開始點,這是調用者的責任.
     */
    lastFlushPosition = stream->startpos;
    while (1)
    {
        /*
         * Fetch the timeline history file for this timeline, if we don't have
         * it already. When streaming log to tar, this will always return
         * false, as we are never streaming into an existing file and
         * therefore there can be no pre-existing timeline history file.
         * 為該timeline提前timeline history,如我們已不需要.
         * 如streaming日志為tar格式,這通常會返回F,這如同從來沒有streaming到已存在的文件中,
         *   因此沒有已存在的timeline history文件.
         */
        if (!existsTimeLineHistoryFile(stream))
        {
            //如不存在history文件
            snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
            //發送TIMELINE_HISTORY命令
            res = PQexec(conn, query);
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
            {
                /* FIXME: we might send it ok, but get an error */
                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                        progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
                PQclear(res);
                return false;
            }
            /*
             * The response to TIMELINE_HISTORY is a single row result set
             * with two fields: filename and content
             * TIMELINE_HISTORY的響應是一個單行結果集,有兩個字段:filename和content
             */
            if (PQnfields(res) != 2 || PQntuples(res) != 1)
            {
                fprintf(stderr,
                        _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d ields, expected %d rows and %d fields\n"),
                        progname, PQntuples(res), PQnfields(res), 1, 2);
            }
            /* Write the history file to disk */
            //寫入history文件到磁盤上
            writeTimeLineHistoryFile(stream,
                                     PQgetvalue(res, 0, 0),
                                     PQgetvalue(res, 0, 1));
            PQclear(res);
        }
        /*
         * Before we start streaming from the requested location, check if the
         * callback tells us to stop here.
         * 從請求的位置開始streaming前,檢查回調函數告訴我們在哪停止
         */
        if (stream->stream_stop(stream->startpos, stream->timeline, false))
            return true;
        /* Initiate the replication stream at specified location */
        //在指定的位置初始化復制流
        snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
                 slotcmd,
                 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
                 stream->timeline);
        //發送START_REPLICATION命令
        res = PQexec(conn, query);
        if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
            fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                    progname, "START_REPLICATION", PQresultErrorMessage(res));
            PQclear(res);
            return false;
        }
        PQclear(res);
        /* Stream the WAL */
        //流化WAL
        res = HandleCopyStream(conn, stream, &stoppos);
        if (res == NULL)
            goto error;
        /*
         * Streaming finished.
         *
         * There are two possible reasons for that: a controlled shutdown, or
         * we reached the end of the current timeline. In case of
         * end-of-timeline, the server sends a result set after Copy has
         * finished, containing information about the next timeline. Read
         * that, and restart streaming from the next timeline. In case of
         * controlled shutdown, stop here.
         * Streaming完成.
         * 這里有兩個可能的原因:可控的shutdown或者到達了當前時間線的末尾.
         * 在end-of-timeline這種情況下,服務器在Copy完成后發送結果集,
         *   含有關于下一個時間線的相關信息.
         * 讀取這些信息,在下一個時間線開始重新啟動streaming.
         * 如為可控的關閉,可以停止了.
         */
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
            /*
             * End-of-timeline. Read the next timeline's ID and starting
             * position. Usually, the starting position will match the end of
             * the previous timeline, but there are corner cases like if the
             * server had sent us half of a WAL record, when it was promoted.
             * The new timeline will begin at the end of the last complete
             * record in that case, overlapping the partial WAL record on the
             * old timeline.
             * 這是End-of-timeline的情況.
             * 讀取下一個時間線ID和開始位置.通常來說,開始位置將匹配先前時間線的末尾,
             *   但會存在特殊的情況比如服務器已經傳輸了WAL Record的一部分.
             * 這種情況下,新的時間線會在上次已完成的記錄末尾開始,與舊時間線的部分WAL Record重疊.
             */
            uint32      newtimeline;//新的時間線
            bool        parsed;//是否解析
            //讀取結果集的末尾
            parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
            PQclear(res);
            if (!parsed)
                goto error;
            /* Sanity check the values the server gave us */
            //執行校驗和堅持
            if (newtimeline <= stream->timeline)
            {
                //新的時間線不可能小于等于stream中的時間線
                fprintf(stderr,
                        _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
                        progname, newtimeline, stream->timeline);
                goto error;
            }
            if (stream->startpos > stoppos)
            {
                //開始位置大于結束位置
                fprintf(stderr,
                        _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline u to begin at %X/%X\n"),
                        progname,
                        stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
                        newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
                goto error;
            }
            /* Read the final result, which should be CommandComplete. */
            //讀取最后的結果,應為命令結束
            res = PQgetResult(conn);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
                fprintf(stderr,
                        _("%s: unexpected termination of replication stream: %s"),
                        progname, PQresultErrorMessage(res));
                PQclear(res);
                goto error;
            }
            PQclear(res);
            /*
             * Loop back to start streaming from the new timeline. Always
             * start streaming at the beginning of a segment.
             * 從新時間線開始循環,通常會在segment的開始出開始streaming
             */
            stream->timeline = newtimeline;
            stream->startpos = stream->startpos -
                XLogSegmentOffset(stream->startpos, WalSegSz);
            continue;//繼續循環
        }
        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
            PQclear(res);
            /*
             * End of replication (ie. controlled shut down of the server).
             * replication完成(比如服務器關閉了復制)
             *
             * Check if the callback thinks it's OK to stop here. If not,
             * complain.
             * 檢查是否回調函數認為在這里停止就OK了,如果不是,則報警.
             */
            if (stream->stream_stop(stoppos, stream->timeline, false))
                return true;
            else
            {
                fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
                        progname);
                goto error;
            }
        }
        else
        {
            /* Server returned an error. */
            //返回錯誤
            fprintf(stderr,
                    _("%s: unexpected termination of replication stream: %s"),
                    progname, PQresultErrorMessage(res));
            PQclear(res);
            goto error;
        }
    }
error:
    if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
        fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
                progname, current_walfile_name, stream->walmethod->getlasterror());
    walfile = NULL;
    return false;
}
/*
 * The main loop of ReceiveXlogStream. Handles the COPY stream after
 * initiating streaming with the START_REPLICATION command.
 * ReceiveXlogStream中的主循環實現函數.
 * 在使用START_REPLICATION命令初始化streaming后處理COPY stream.
 *
 * If the COPY ends (not necessarily successfully) due a message from the
 * server, returns a PGresult and sets *stoppos to the last byte written.
 * On any other sort of error, returns NULL.
 * 如COPY由于服務器端的原因終止,返回PGresult并設置*stoppos為最后寫入的字節.
 * 如出現錯誤,則返回NULL.
 */
static PGresult *
HandleCopyStream(PGconn *conn, StreamCtl *stream,
                 XLogRecPtr *stoppos)
{
    char       *copybuf = NULL;
    TimestampTz last_status = -1;
    XLogRecPtr  blockpos = stream->startpos;
    still_sending = true;
    while (1)
    {
        //循環處理
        int         r;
        TimestampTz now;//時間戳
        long        sleeptime;
        /*
         * Check if we should continue streaming, or abort at this point.
         * 檢查我們是否應該繼續streaming,或者在當前就退出
         */
        if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
            goto error;
        now = feGetCurrentTimestamp();
        /*
         * If synchronous option is true, issue sync command as soon as there
         * are WAL data which has not been flushed yet.
         * 如同步選項為T,只要存在未flushed的WAL data,馬上執行sync命令.
         */
        if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
        {
            if (stream->walmethod->sync(walfile) != 0)
            {
                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
                        progname, current_walfile_name, stream->walmethod->getlasterror());
                goto error;
            }
            lastFlushPosition = blockpos;
            /*
             * Send feedback so that the server sees the latest WAL locations
             * immediately.
             * 發送反饋以便服務器馬上可看到最后的WAL位置.
             */
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }
        /*
         * Potentially send a status message to the master
         * 可能向主服務器發送狀態消息
         */
        if (still_sending && stream->standby_message_timeout > 0 &&
            feTimestampDifferenceExceeds(last_status, now,
                                         stream->standby_message_timeout))
        {
            /* Time to send feedback! */
            //是時候發送反饋了.
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }
        /*
         * Calculate how long send/receive loops should sleep
         * 計算send/receive循環應該睡眠多長時間
         */
        sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                 last_status);
        //拷貝stream中接收到的內容
        r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ?buf);
        while (r != 0)
        {
            if (r == -1)
                goto error;//出錯
            if (r == -2)
            {
                //已完結或出錯
                PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
                if (res == NULL)
                    goto error;
                else
                    return res;
            }
            /* Check the message type. */
            //檢查消息類型
            if (copybuf[0] == 'k')
            {
                if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                         &last_status))
                    goto error;
            }
            else if (copybuf[0] == 'w')
            {
                if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
                    goto error;
                /*
                 * Check if we should continue streaming, or abort at this
                 * point.
                 * 檢查我們是否應該繼續streaming或者在此就停止
                 */
                if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
                    goto error;
            }
            else
            {
                fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
                        progname, copybuf[0]);
                goto error;
            }
            /*
             * Process the received data, and any subsequent data we can read
             * without blocking.
             * 處理接收到的數據,后續的數據可以無阻塞的讀取.
             */
            r = CopyStreamReceive(conn, 0, stream->stop_socket, ?buf);
        }
    }
error:
    if (copybuf != NULL)
        PQfreemem(copybuf);
    return NULL;
}
/*
 * Check if we should continue streaming, or abort at this point.
 */
static bool
CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
                    XLogRecPtr *stoppos)
{
    if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    {
        if (!close_walfile(stream, blockpos))
        {
            /* Potential error message is written by close_walfile */
            return false;
        }
        if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
        {
            fprintf(stderr, _("%s: could not send copy-end packet: %s"),
                    progname, PQerrorMessage(conn));
            return false;
        }
        still_sending = false;
    }
    return true;
}
/*
 * Receive CopyData message available from XLOG stream, blocking for
 * maximum of 'timeout' ms.
 * 接收從XLOG stream中可用的CopyData消息,如超出最大的'timeout'毫秒,需要阻塞.
 *
 * If data was received, returns the length of the data. *buffer is set to
 * point to a buffer holding the received message. The buffer is only valid
 * until the next CopyStreamReceive call.
 * 如接收到數據,則返回數據的大小.
 * 變量*buffer設置為指向含有接收到消息的buffer.buffer在下一個CopyStreamReceive調用才會生效.
 *
 * Returns 0 if no data was available within timeout, or if wait was
 * interrupted by signal or stop_socket input.
 * -1 on error. -2 if the server ended the COPY.
 * 如在timeout時間內沒有數據返回,或者如果因為信號等待/stop_socket輸入中斷,則返回0.
 * -1:表示出現錯誤.-2表示服務器完成了COPY
 */
static int
CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                  char **buffer)
{
    char       *copybuf = NULL;
    int         rawlen;
    if (*buffer != NULL)
        PQfreemem(*buffer);
    *buffer = NULL;
    /* Try to receive a CopyData message */
    rawlen = PQgetCopyData(conn, ?buf, 1);
    if (rawlen == 0)
    {
        int         ret;
        /*
         * No data available.  Wait for some to appear, but not longer than
         * the specified timeout, so that we can ping the server.  Also stop
         * waiting if input appears on stop_socket.
         */
        ret = CopyStreamPoll(conn, timeout, stop_socket);
        if (ret <= 0)
            return ret;
        /* Now there is actually data on the socket */
        if (PQconsumeInput(conn) == 0)
        {
            fprintf(stderr,
                    _("%s: could not receive data from WAL stream: %s"),
                    progname, PQerrorMessage(conn));
            return -1;
        }
        /* Now that we've consumed some input, try again */
        rawlen = PQgetCopyData(conn, ?buf, 1);
        if (rawlen == 0)
            return 0;
    }
    if (rawlen == -1)           /* end-of-streaming or error */
        return -2;
    if (rawlen == -2)
    {
        fprintf(stderr, _("%s: could not read COPY data: %s"),
                progname, PQerrorMessage(conn));
        return -1;
    }
    /* Return received messages to caller */
    *buffer = copybuf;
    return rawlen;
}

三、跟蹤分析

備份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

啟動gdb跟蹤(跟蹤fork的子進程)

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/xdb/pg11.2/bin/pg_basebackup...done.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) set follow-fork-mode child
(gdb) b LogStreamerMain
Breakpoint 1 at 0x403c51: file pg_basebackup.c, line 490.
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password: 
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/5A000028 on timeline 16
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_1604"
[New process 2036]
[Thread debugging using libthread_db enabled]backup/backup_label          )
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7ffff7fe7840 (LWP 2036)]
Breakpoint 1, LogStreamerMain (param=0x629db0) at pg_basebackup.c:490
490     in_log_streamer = true;
305153/305153 kB (100%), 1/1 tablespace                                          )
pg_basebackup: write-ahead log end point: 0/5A0000F8
pg_basebackup: waiting for background process to finish streaming ...
(gdb)

輸入參數

(gdb) n
492     MemSet(&stream, 0, sizeof(stream));
(gdb) p *param
$1 = {bgconn = 0x62a280, startptr = 1509949440, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

設置StreamCtl結構體

(gdb) n
493     stream.startpos = param->startptr;
(gdb) 
494     stream.timeline = param->timeline;
(gdb) 
495     stream.sysidentifier = param->sysidentifier;
(gdb) 
496     stream.stream_stop = reached_end_position;
(gdb) 
498     stream.stop_socket = bgpipe[0];
(gdb) 
502     stream.standby_message_timeout = standby_message_timeout;
(gdb) 
503     stream.synchronous = false;
(gdb) 
504     stream.do_sync = do_sync;
(gdb) 
505     stream.mark_done = true;
(gdb) 
506     stream.partial_suffix = NULL;
(gdb) 
507     stream.replication_slot = replication_slot;
(gdb) 
509     if (format == 'p')
(gdb) 
510         stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
(gdb)

進入ReceiveXlogStream函數

(gdb) 
514     if (!ReceiveXlogStream(param->bgconn, &stream))
(gdb) step
ReceiveXlogStream (conn=0x62a280, stream=0x7fffffffda30) at receivelog.c:458
458     if (!CheckServerVersionForStreaming(conn))
(gdb) 
(gdb) n
472     if (stream->replication_slot != NULL)
(gdb) p *stream
$2 = {startpos = 1509949440, timeline = 16, sysidentifier = 0x61f1a0 "6666964067616600474", 
  standby_message_timeout = 10000, synchronous = false, mark_done = true, do_sync = true, 
  stream_stop = 0x403953 <reached_end_position>, stop_socket = 8, walmethod = 0x632b10, partial_suffix = 0x0, 
  replication_slot = 0x62a1e0 "pg_basebackup_1604"}
(gdb)

判斷系統標識符和時間線

(gdb) n
474         reportFlushPosition = true;
(gdb) 
475         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
(gdb) 
486     if (stream->sysidentifier != NULL)
(gdb) 
489         res = PQexec(conn, "IDENTIFY_SYSTEM");
(gdb) 
490         if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) 
498         if (PQntuples(res) != 1 || PQnfields(res) < 3)
(gdb) 
506         if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
(gdb) p PQgetvalue(res, 0, 0)
$3 = 0x633500 "6666964067616600474"
(gdb) n
514         if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
(gdb) 
522         PQclear(res);
(gdb) p PQgetvalue(res, 0, 1)
$4 = 0x633514 "16"
(gdb)

不存在時間線history文件,生成history文件

(gdb) n
529     lastFlushPosition = stream->startpos;
(gdb) 
539         if (!existsTimeLineHistoryFile(stream))
(gdb) 
541             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
(gdb) 
542             res = PQexec(conn, query);
(gdb) 
543             if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) 
556             if (PQnfields(res) != 2 || PQntuples(res) != 1)
(gdb) 
564             writeTimeLineHistoryFile(stream,
(gdb) 
568             PQclear(res);
(gdb)

調用START_REPLICATION命令初始化

(gdb) 
575         if (stream->stream_stop(stream->startpos, stream->timeline, false))
(gdb) n
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb) 
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb) 
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
583         res = PQexec(conn, query);
(gdb) 
584         if (PQresultStatus(res) != PGRES_COPY_BOTH)
(gdb) 
591         PQclear(res);
(gdb)

執行命令,處理stream WAL,完成調用

595         if (res == NULL)
(gdb) p *res
$5 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0, 
  resultStatus = PGRES_COMMAND_OK, 
  cmdStatus = "START_STREAMING\000\000\000\000\000\270\027u\367\377\177\000\000P/c\000\000\000\000\000CT\000\000\001", '\000' <repeats 19 times>, "\200\000\000", binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eaa4 <defaultNoticeReceiver>, 
    noticeRecArg = 0x0, noticeProc = 0x7ffff7b9eaf9 <defaultNoticeProcessor>, noticeProcArg = 0x0}, events = 0x0, 
  nEvents = 0, client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0, 
  curOffset = 0, spaceLeft = 0}
(gdb) n
608         if (PQresultStatus(res) == PGRES_TUPLES_OK)
(gdb) 
666         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
(gdb) 
668             PQclear(res);
(gdb) 
676             if (stream->stream_stop(stoppos, stream->timeline, false))
(gdb) 
677                 return true;
(gdb) 
702 }
(gdb) 
LogStreamerMain (param=0x629db0) at pg_basebackup.c:523
523     if (!stream.walmethod->finish())
(gdb)

到此,關于“PostgreSQL中ReceiveXlogStream有什么作用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女