RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
PostgreSQL源码解读(234)-查询#127(NOTIN实现#5)

本节简单解释了PostgreSQL NOT IN在执行时写入临时表空间的实现。

目前创新互联建站已为上1000+的企业提供了网站建设、域名、虚拟主机、网站改版维护、企业网站设计、洱源网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

测试数据如下:

[local]:5432 pg12@testdb=# select count(*) from tbl;
 count 
-------
     1
(1 row)
Time: 6.009 ms
[local]:5432 pg12@testdb=# select count(*) from t_big_null;
  count   
----------
 10000001
(1 row)
[local]:5432 pg12@testdb=#

一、数据结构

Tuplestorestate
Tuplestore相关操作的私有状态。


/*
 * Possible states of a Tuplestore object.  These denote the states that
 * persist between calls of Tuplestore routines.
 */
typedef enum
{
    TSS_INMEM,                    /* Tuples still fit in memory */
    TSS_WRITEFILE,                /* Writing to temp file */
    TSS_READFILE                /* Reading from temp file */
} TupStoreStatus;
/*
 * Private state of a Tuplestore operation.
 */
struct Tuplestorestate
{
    TupStoreStatus status;        /* 状态枚举值;enumerated value as shown above */
    int            eflags;            /* capability flags (OR of pointers' flags) */
    bool        backward;        /* store extra length words in file? */
    bool        interXact;        /* keep open through transactions? */
    bool        truncated;        /* tuplestore_trim has removed tuples? */
    int64        availMem;        /* remaining memory available, in bytes */
    int64        allowedMem;        /* total memory allowed, in bytes */
    int64        tuples;            /* number of tuples added */
    BufFile    *myfile;            /* underlying file, or NULL if none */
    MemoryContext context;        /* memory context for holding tuples */
    ResourceOwner resowner;        /* resowner for holding temp files */
    /*
     * These function pointers decouple the routines that must know what kind
     * of tuple we are handling from the routines that don't need to know it.
     * They are set up by the tuplestore_begin_xxx routines.
     *
     * (Although tuplestore.c currently only supports heap tuples, I've copied
     * this part of tuplesort.c so that extension to other kinds of objects
     * will be easy if it's ever needed.)
     *
     * Function to copy a supplied input tuple into palloc'd space. (NB: we
     * assume that a single pfree() is enough to release the tuple later, so
     * the representation must be "flat" in one palloc chunk.) state->availMem
     * must be decreased by the amount of space used.
     */
    void       *(*copytup) (Tuplestorestate *state, void *tup);
    /*
     * Function to write a stored tuple onto tape.  The representation of the
     * tuple on tape need not be the same as it is in memory; requirements on
     * the tape representation are given below.  After writing the tuple,
     * pfree() it, and increase state->availMem by the amount of memory space
     * thereby released.
     */
    void        (*writetup) (Tuplestorestate *state, void *tup);
    /*
     * Function to read a stored tuple from tape back into memory. 'len' is
     * the already-read length of the stored tuple.  Create and return a
     * palloc'd copy, and decrease state->availMem by the amount of memory
     * space consumed.
     */
    void       *(*readtup) (Tuplestorestate *state, unsigned int len);
    /*
     * This array holds pointers to tuples in memory if we are in state INMEM.
     * In states WRITEFILE and READFILE it's not used.
     *
     * When memtupdeleted > 0, the first memtupdeleted pointers are already
     * released due to a tuplestore_trim() operation, but we haven't expended
     * the effort to slide the remaining pointers down.  These unused pointers
     * are set to NULL to catch any invalid accesses.  Note that memtupcount
     * includes the deleted pointers.
     */
    void      **memtuples;        /* array of pointers to palloc'd tuples */
    int            memtupdeleted;    /* the first N slots are currently unused */
    int            memtupcount;    /* number of tuples currently present */
    int            memtupsize;        /* allocated length of memtuples array */
    bool        growmemtuples;    /* memtuples' growth still underway? */
    /*
     * These variables are used to keep track of the current positions.
     *
     * In state WRITEFILE, the current file seek position is the write point;
     * in state READFILE, the write position is remembered in writepos_xxx.
     * (The write position is the same as EOF, but since BufFileSeek doesn't
     * currently implement SEEK_END, we have to remember it explicitly.)
     */
    TSReadPointer *readptrs;    /* array of read pointers */
    int            activeptr;        /* index of the active read pointer */
    int            readptrcount;    /* number of pointers currently valid */
    int            readptrsize;    /* allocated length of readptrs array */
    int            writepos_file;    /* file# (valid if READFILE state) */
    off_t        writepos_offset;    /* offset (valid if READFILE state) */
};
#define COPYTUP(state,tup)    ((*(state)->copytup) (state, tup))
#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
#define READTUP(state,len)    ((*(state)->readtup) (state, len))
#define LACKMEM(state)        ((state)->availMem < 0)
#define USEMEM(state,amt)    ((state)->availMem -= (amt))
#define FREEMEM(state,amt)    ((state)->availMem += (amt))

TSReadPointer
tuplestore读指针


/*
 * Possible states of a Tuplestore object.  These denote the states that
 * persist between calls of Tuplestore routines.
 */
typedef enum
{
    TSS_INMEM,                    /* Tuples still fit in memory */
    TSS_WRITEFILE,                /* Writing to temp file */
    TSS_READFILE                /* Reading from temp file */
} TupStoreStatus;
/*
 * State for a single read pointer.  If we are in state INMEM then all the
 * read pointers' "current" fields denote the read positions.  In state
 * WRITEFILE, the file/offset fields denote the read positions.  In state
 * READFILE, inactive read pointers have valid file/offset, but the active
 * read pointer implicitly has position equal to the temp file's seek position.
 *
 * Special case: if eof_reached is true, then the pointer's read position is
 * implicitly equal to the write position, and current/file/offset aren't
 * maintained.  This way we need not update all the read pointers each time
 * we write.
 */
typedef struct
{
    int            eflags;            /* capability flags */
    bool        eof_reached;    /* read has reached EOF */
    int            current;        /* next array index to read */
    int            file;            /* temp file# */
    off_t        offset;            /* byte offset in file */
} TSReadPointer;

BufFile
该数据结构表示包含一个或多个物理文件的buffered file(每一个通过fd.c管理的虚拟文件描述符进行访问)


/*
 * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
 * The reason is that we'd like large BufFiles to be spread across multiple
 * tablespaces when available.
 * BufFiles会拆分为以几个GB为单位的segments而不管RELSEG_SIZE的大小.
 * 原因是我们倾向于在可用时把很大的BufFiles在多个表空间中分布.
 */
#define MAX_PHYSICAL_FILESIZE    0x40000000
#define BUFFILE_SEG_SIZE        (MAX_PHYSICAL_FILESIZE / BLCKSZ)
/*
 * This data structure represents a buffered file that consists of one or
 * more physical files (each accessed through a virtual file descriptor
 * managed by fd.c).
 * 该数据结构表示包含一个或多个物理文件的buffered file(每一个通过fd.c管理的虚拟文件描述符进行访问)
 */
struct BufFile
{
    //集合中物理文件的数量
    int            numFiles;        /* number of physical files in set */
    /* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
    //------- 除了最后一个文件,其他文件的大小为MAX_PHYSICAL_FILESIZE
    //使用numFiles分配的数组
    File       *files;            /* palloc'd array with numFiles entries */
    //跨事务?
    bool        isInterXact;    /* keep open over transactions? */
    //脏数据?
    bool        dirty;            /* does buffer need to be written? */
    //是否只读?
    bool        readOnly;        /* has the file been set to read only? */
    //如共享,段文件的空间大小
    SharedFileSet *fileset;        /* space for segment files if shared */
    //如共享,该BufFile的名称
    const char *name;            /* name of this BufFile if shared */
    /*
     * resowner is the ResourceOwner to use for underlying temp files.  (We
     * don't need to remember the memory context we're using explicitly,
     * because after creation we only repalloc our arrays larger.)
     * 用于临时文件的ResourceOwner
     */
    ResourceOwner resowner;
    /*
     * "current pos" is position of start of buffer within the logical file.
     * Position as seen by user of BufFile is (curFile, curOffset + pos).
     * "current pos" 是逻辑文件中buffer的起始位置.
     * BufFile用户看到的位置是((curFile, curOffset + pos))
     */
    //文件索引,当前位置的第(0..n)部分
    int            curFile;        /* file index (0..n) part of current pos */
    //当前位置的偏移部分
    off_t        curOffset;        /* offset part of current pos */
    //buffer中的下一个R/W位置
    int            pos;            /* next read/write position in buffer */
    //buffer中的有效字节数
    int            nbytes;            /* total # of valid bytes in buffer */
    PGAlignedBlock buffer;
};

二、源码解读

tuplestore_puttupleslot
把接收到的tuple放到tuplestore中


/*
 * Accept one tuple and append it to the tuplestore.
 * 把接收到的tuple放到tuplestore中
 *
 * Note that the input tuple is always copied; the caller need not save it.
 * 要注意的是输入元组通常已被拷贝,调用者不需要存储该元组。
 *
 * If the active read pointer is currently "at EOF", it remains so (the read
 * pointer implicitly advances along with the write pointer); otherwise the
 * read pointer is unchanged.  Non-active read pointers do not move, which
 * means they are certain to not be "at EOF" immediately after puttuple.
 * This curious-seeming behavior is for the convenience of nodeMaterial.c and
 * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
 * steps.
 * 如果活动的读指针当前正处于EOF位置,那么仍会保留现状(读指针默认与写指针同步)。
 * 否则的话,读指针是不变的。非活动读指针不会移动,意味着在puttuple后没有马上就处于EOF状态下。
 * 这种看似奇怪的行为是便于nodeMaterial.c和nodeCtescan.c的处理,否则需要额外的指针重定位。
 *
 * tuplestore_puttupleslot() is a convenience routine to collect data from
 * a TupleTableSlot without an extra copy operation.
 * tuplestore_puttupleslot()例程不需要额外的拷贝动作从TupleTableSlot中收集数据。
 */
void
tuplestore_puttupleslot(Tuplestorestate *state,
                        TupleTableSlot *slot)
{
    MinimalTuple tuple;
    MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
    /*
     * Form a MinimalTuple in working memory
     * 在工作内存中组装MinimalTuple
     */
    tuple = ExecCopySlotMinimalTuple(slot);
    USEMEM(state, GetMemoryChunkSpace(tuple));
    tuplestore_puttuple_common(state, (void *) tuple);
    MemoryContextSwitchTo(oldcxt);
}

tuplestore_puttuple_common
tuplestore_puttupleslot函数的实现


static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
    TSReadPointer *readptr;
    int            i;
    ResourceOwner oldowner;
    state->tuples++;
    switch (state->status)
    {
        case TSS_INMEM:
            /*
             * Update read pointers as needed; see API spec above.
             * 需要时更新读指针
             */
            readptr = state->readptrs;
            for (i = 0; i < state->readptrcount; readptr++, i++)
            {
                if (readptr->eof_reached && i != state->activeptr)
                {
                    //已达末尾,且指针非活动,则设置相应的状态和位置
                    readptr->eof_reached = false;
                    readptr->current = state->memtupcount;
                }
            }
            /*
             * Grow the array as needed.  Note that we try to grow the array
             * when there is still one free slot remaining --- if we fail,
             * there'll still be room to store the incoming tuple, and then
             * we'll switch to tape-based operation.
             * 需要时扩展数组大小.
             * 注意:在仍有一个空闲slot剩余时尝试增大数组,如果失败仍有空间存储进入的元组,
             *      然后切换至tape-based操作.
             */
            if (state->memtupcount >= state->memtupsize - 1)
            {
                (void) grow_memtuples(state);
                Assert(state->memtupcount < state->memtupsize);
            }
            /* Stash the tuple in the in-memory array */
            //指向tuple
            state->memtuples[state->memtupcount++] = tuple;
            /*
             * Done if we still fit in available memory and have array slots.
             * 仍有可用内存并有数组slots,已完成所有工作,可返回.
             */
            if (state->memtupcount < state->memtupsize && !LACKMEM(state))
                return;
            //否则的话,需要落盘
            /*
             * Nope; time to switch to tape-based operation.  Make sure that
             * the temp file(s) are created in suitable temp tablespaces.
             * 切换至tape-base操作.
             * 确保临时文件在合适的temp表空间中创建.
             */
            PrepareTempTablespaces();
            /* associate the file with the store's resource owner */
            //关联文件与存储资源宿主
            oldowner = CurrentResourceOwner;
            CurrentResourceOwner = state->resowner;
            state->myfile = BufFileCreateTemp(state->interXact);
            CurrentResourceOwner = oldowner;
            /*
             * Freeze the decision about whether trailing length words will be
             * used.  We can't change this choice once data is on tape, even
             * though callers might drop the requirement.
             * 关于是否使用结尾长度字需要"冻结"此决定.
             * 一旦数据落盘就不能改变此选择,即使调用者可能会放弃此要求.
             */
            state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
            state->status = TSS_WRITEFILE;
            dumptuples(state);
            break;
        case TSS_WRITEFILE:
            /*
             * Update read pointers as needed; see API spec above. Note:
             * BufFileTell is quite cheap, so not worth trying to avoid
             * multiple calls.
             * 需要时更新读指针.
             * 注意:BufFileTell执行效率很高,因此不值得尝试避免循环多次调用.
             */
            readptr = state->readptrs;
            for (i = 0; i < state->readptrcount; readptr++, i++)
            {
                if (readptr->eof_reached && i != state->activeptr)
                {
                    readptr->eof_reached = false;
                    BufFileTell(state->myfile,
                                &readptr->file,
                                &readptr->offset);
                }
            }
            //#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
            WRITETUP(state, tuple);
            break;
        case TSS_READFILE:
            /*
             * Switch from reading to writing.
             * 从读切换至写.
             */
            if (!state->readptrs[state->activeptr].eof_reached)
                BufFileTell(state->myfile,
                            &state->readptrs[state->activeptr].file,
                            &state->readptrs[state->activeptr].offset);
            if (BufFileSeek(state->myfile,
                            state->writepos_file, state->writepos_offset,
                            SEEK_SET) != 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not seek in tuplestore temporary file: %m")));
            state->status = TSS_WRITEFILE;
            /*
             * Update read pointers as needed; see API spec above.
             * 需要时更新读指针.
             */
            readptr = state->readptrs;
            for (i = 0; i < state->readptrcount; readptr++, i++)
            {
                if (readptr->eof_reached && i != state->activeptr)
                {
                    readptr->eof_reached = false;
                    readptr->file = state->writepos_file;
                    readptr->offset = state->writepos_offset;
                }
            }
            //#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
            WRITETUP(state, tuple);
            break;
        default:
            elog(ERROR, "invalid tuplestore state");
            break;
    }
}
void
BufFileTell(BufFile *file, int *fileno, off_t *offset)
{
    *fileno = file->curFile;
    *offset = file->curOffset + file->pos;
}

三、跟踪分析

执行SQL:

[local]:5432 pg12@testdb=# select * from tbl a where a.id not in (select b.id from t_big_null b);

启动gdb,进入断点

(gdb) b tuplestore_puttupleslot
Breakpoint 1 at 0xab9134: file tuplestore.c, line 712.
(gdb) c
Continuing.
Breakpoint 1, tuplestore_puttupleslot (state=0x1efec78, slot=0x1efd4e0) at tuplestore.c:712
712        MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
(gdb)

输入参数

(gdb) n
717        tuple = ExecCopySlotMinimalTuple(slot);
(gdb) 
718        USEMEM(state, GetMemoryChunkSpace(tuple));
(gdb) 
720        tuplestore_puttuple_common(state, (void *) tuple);
(gdb) p *state
$1 = {status = TSS_INMEM, eflags = 2, backward = false, interXact = false, truncated = false, 
  availMem = 4177840, allowedMem = 4194304, tuples = 0, myfile = 0x0, context = 0x1efce00, resowner = 0x1e5d308, 
  copytup = 0xaba7bd , writetup = 0xaba811 , readtup = 0xaba9d9 , 
  memtuples = 0x1f18ed0, memtupdeleted = 0, memtupcount = 0, memtupsize = 2048, growmemtuples = true, 
  readptrs = 0x1f056a0, activeptr = 0, readptrcount = 1, readptrsize = 8, writepos_file = 0, writepos_offset = 0}
(gdb) p *slot
$2 = {type = T_TupleTableSlot, tts_flags = 16, tts_nvalid = 0, tts_ops = 0xc3e780 , 
  tts_tupleDescriptor = 0x7f16f33f5378, tts_values = 0x1efd550, tts_isnull = 0x1efd558, tts_mcxt = 0x1efce00, 
  tts_tid = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 1}, tts_tableOid = 49155}
(gdb) p slot->tts_values[0]
$3 = 0
(gdb)

进入tuplestore_puttuple_common

(gdb) step
tuplestore_puttuple_common (state=0x1efec78, tuple=0x1f05ce8) at tuplestore.c:771
771        state->tuples++;
(gdb)

当前状态TSS_INMEM

(gdb) p state->status
$4 = TSS_INMEM
(gdb)

如需要,更新读指针(无需更新)

(gdb) n
773        switch (state->status)
(gdb) 
780                readptr = state->readptrs;
(gdb) 
781                for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb) p *readptr
$5 = {eflags = 2, eof_reached = true, current = 0, file = 2139062143, offset = 9187201950435737471}
(gdb) n
783                    if (readptr->eof_reached && i != state->activeptr)
(gdb) p state->readptrcount
$6 = 1
(gdb) p state->activeptr
$7 = 0
(gdb) n
781                for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb)

如需要,扩展数组(实际不需要)

(gdb) 
796                if (state->memtupcount >= state->memtupsize - 1)
(gdb) p state->memtupcount
$8 = 0
(gdb) p state->memtupsize - 1
$9 = 2047
(gdb) n
803                state->memtuples[state->memtupcount++] = tuple;
(gdb)

放入到内存中,返回

(gdb) n
808                if (state->memtupcount < state->memtupsize && !LACKMEM(state))
(gdb) 
809                    return;
(gdb)

退出函数

(gdb) 
892    }
(gdb) 
tuplestore_puttupleslot (state=0x1efec78, slot=0x1efd4e0) at tuplestore.c:722
722        MemoryContextSwitchTo(oldcxt);
(gdb) 
723    }
(gdb) 
ExecMaterial (pstate=0x1efd1b8) at nodeMaterial.c:149
149            ExecCopySlot(slot, outerslot);
(gdb)

使用ignore N遍后,state->status状态变为TSS_WRITEFILE

(gdb) ignore 4 4194303
Will ignore next 4194303 crossings of breakpoint 4.
(gdb) c
Continuing.
Breakpoint 3, tuplestore_puttuple_common (state=0x160ba38, tuple=0x7f2cd90cc0b0) at tuplestore.c:771
771        state->tuples++;
(gdb) 
...
tuplestore_puttupleslot (state=0x160ba38, slot=0x160a2a0) at tuplestore.c:722
722        MemoryContextSwitchTo(oldcxt);
(gdb) c
Continuing.
Breakpoint 3, tuplestore_puttuple_common (state=0x160ba38, tuple=0x7f2cd90cc0e8) at tuplestore.c:771
771        state->tuples++;
(gdb) p *state
$9 = {status = TSS_WRITEFILE, eflags = 2, backward = false, interXact = false, truncated = false, 
  availMem = 3669944, allowedMem = 4194304, tuples = 4192545, myfile = 0x162ad80, context = 0x1609bc0, 
  resowner = 0x1579170, copytup = 0xaba7bd , writetup = 0xaba811 , 
  readtup = 0xaba9d9 , memtuples = 0x7f2cd914a050, memtupdeleted = 0, memtupcount = 0, 
  memtupsize = 65535, growmemtuples = false, readptrs = 0x1627590, activeptr = 0, readptrcount = 1, 
  readptrsize = 8, writepos_file = 0, writepos_offset = 0}
(gdb) n
773        switch (state->status)
(gdb) 
841                readptr = state->readptrs;
(gdb) 
842                for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb) 
844                    if (readptr->eof_reached && i != state->activeptr)
(gdb) 
842                for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb) 
853                WRITETUP(state, tuple);
(gdb) 
854                break;
(gdb) p *state->myfile
$10 = {numFiles = 1, files = 0x7f2cd934c008, isInterXact = false, dirty = true, readOnly = false, fileset = 0x0, 
  name = 0x0, resowner = 0x1579170, curFile = 0, curOffset = 58687488, pos = 8156, nbytes = 8156, buffer = {
    data = "\000\t\030\000\335\366?\000\016\000\000\000\001\000\000\t\030\000\336\366?\000\016\000\000\000\001\000\000\t\030\000\337\366?\000\016\000\000\000\001\000\000\t\030\000\340\366?\000\016\000\000\000\001\000\000\t\030\000\341\366?\000\016\000\000\000\001\000\000\t\030\000\342\366?\000\016\000\000\000\001\000\000\t\030\000\343\366?\000\016\000\000\000\001\000\000\t\030\000\344\366?\000\016\000\000\000\001\000\000\t\030\000\345\366?\000\016\000\000\000\001\000\000\t\030\000\346\366?\000\016\000\000\000\001\000\000\t\030\000\347\366?\000\016\000\000\000\001\000\000\t\030\000\350\366?\000\016\000\000\000\001\000\000\t\030\000\351\366?\000\016\000\000\000\001\000\000\t\030\000\352\366?\000\016\000\000\000\001\000\000\t\030\000"..., force_align_d = 1.7780737478550286e-307, 
    force_align_i64 = 18004352582551808}}
...

DONE

四、参考资料

N/A


新闻标题:PostgreSQL源码解读(234)-查询#127(NOTIN实现#5)
分享网址:http://lswzjz.com/article/jjisis.html