postgres是多进程的架构。每个sql客户端连接对应一个后端进程。主进程接收到客户端连接之后就会创建一个后端进程,入口函数为postgres.c下的 PostgresMain

在这个函数有有个循环,负责处理客户端的查询请求,这里先简要进行说明

  1. 清理和初始化。每次循环开始之前先清理一下上次执行遗留的一些状态并准备好处理新的查询
  2. 进入空闲状态则发送Ready消息给客户端,表示可以接受新的请求。这里在空闲的时候还可以做一些统计信息的更新,以及启用空闲连接超时定时器。
  3. 读取客户端消息。firstchar存储消息类型,消息体存在inputMessage
  4. 前面启动了空闲超时了的话,这里进行取消。
  5. 检查并处理信号或者中断(如cancel、超时、配置reload等)
  6. 如果发生了配置变更(SIGHUP信号),重新加载配置文件
  7. 根据firstchar的消息类型,将消息进行分发走对应的处理流程。现在我们只关注simple query这类消息就行。

overview

在本节我们直接关注循环的最后一步,处理客户端查询的逻辑就可以了。simple query被分发给exec_simple_query函数执行。

exec_simple_query(const char *query_string)
{
	// 开启事务
	start_xact_command();
	// 切换上下文
	oldcontext = MemoryContextSwitchTo(MessageContext);
	// 解析sql为查询描述。可能会有多条语句,所以解析的结果是一个列表。这里解析的结束是最原始的描述, RawStmt
	parsetree_list = pg_parse_query(query_string);
	// 消息处理完毕,重新切换会原来的上下文
	MemoryContextSwitchTo(oldcontext);
	
	// 遍历查询描述,依次处理查询
	foreach(parsetree_item, parsetree_list)
	{
		RawStmt    *parsetree = lfirst_node(RawStmt, parsetree_item);
		// 设置命令状态,这样通过ps命令就可以看到该进程再干什么了
		commandTag = CreateCommandTag(parsetree->stmt);
		cmdtagname = GetCommandTagNameAndLen(commandTag, &cmdtaglen);
		set_ps_display_with_len(cmdtagname, cmdtaglen);
		BeginCommand(commandTag, dest);
		
		// 中止事务状态检查,在该事务中只允许commit/abort
		if (IsAbortedTransactionBlockState() &&
	    !IsTransactionExitStmt(parsetree->stmt))
		    ereport(ERROR, ...);
		// 再次确保在事务当中
		start_xact_command();
		if (use_implicit_block)
		    BeginImplicitTransactionBlock();
		}
		// parse需要事务快照则设置事务快照
		if (analyze_requires_snapshot(parsetree))
		{
		    PushActiveSnapshot(GetTransactionSnapshot());
		    snapshot_set = true;
		}
		// 分析,重写
		// 分析:将原始的解析树转为查询树,进行语义分析
		// 应用规则系统,进行重写
		querytree_list = pg_analyze_and_rewrite_fixedparams(parsetree, query_string,
                                                   NULL, 0, NULL);
        // 规划:生成执行计划
		plantree_list = pg_plan_queries(querytree_list, query_string,
                               CURSOR_OPT_PARALLEL_OK, NULL);
        
		// 创建并启动portal。这是postgres执行任意查询的统一入口               
        portal = CreatePortal("", true, true);
		portal->visible = false;
 
		PortalDefineQuery(portal, NULL, query_string, commandTag, 
		                 plantree_list, NULL);
		PortalStart(portal, NULL, 0, InvalidSnapshot);
		
		// 准备好查询的接收
		format = 0; // TEXT 格式
		if (IsA(parsetree->stmt, FetchStmt)) {
		    // 处理 FETCH 的二进制格式
		}
		PortalSetResultFormat(portal, 1, &format);
		receiver = CreateDestReceiver(dest);
		// 运行portal
		(void) PortalRun(portal,
                FETCH_ALL,
                true,  /* always top level */
                receiver,
                receiver,
                &qc);
                
	    // 做一些清理工作
	    // 清理portal和事务
	    receiver->rDestroy(receiver);
		PortalDrop(portal, false);
 
		if (lnext(parsetree_list, parsetree_item) == NULL)
		{
		    if (use_implicit_block)
		        EndImplicitTransactionBlock();
		    finish_xact_command();
		}
		else if (IsA(parsetree->stmt, TransactionStmt))
		{
		    finish_xact_command();
		}
		else
		{
		    CommandCounterIncrement();
		    disable_statement_timeout();
		}
		// 完成消息发送
		EndCommand(&qc, dest, false);
		
		// 最终的清理和日志记录
		finish_xact_command();  // 关闭事务命令
 
		if (!parsetree_list)
		    NullCommand(dest);  // 空查询响应
		
		// 记录执行时间
		switch (check_log_duration(msec_str, was_logged)) {
		    case 1: /* 只记录时间 */
		    case 2: /* 记录时间和查询 */
		}
}

从上面精简的流程代码中我们可以获得以下关键信息

  1. 一个simple query可以执行多条命令,并且所有命令都在事务下
  2. 默认一个simple query都在一个事务下,但是可以显示使用begin/commit/abort等命令来控制事务。可以看到每个循环开始执行之前会做事务的状态检查以及确认后续执行流程在一个有效事务当中。
  3. 用户提交的命令,要经过parse tree 、query tree、plan tree的成功转换,才能得到一个有效的执行计划。
  4. 所有命令的执行都有一个统一的门面,那就是Portal,从上面的代码可以简单讲其生命周期划分为create、start、run、drop等多个阶段,完成一个命令的执行。

由于这里我们主要介绍执行流程,所以接下来直接讲视角放到最终的执行计划和Portal上就好。 有关plan tree这里就先不多讲,总之就是一个树状结构,描述了命令执行的流程,有这个认识就可以了。下面来说明portal的每个生命周期的执行过程。

Portal create

portalmem.cCreatePortal函数

Portal
CreatePortal(const char *name, bool allowDup, bool dupSilent)
{
	Portal		portal;
 
	Assert(PointerIsValid(name));
 
	portal = GetPortalByName(name);
	// 省略一些检查代码
 
	/* make new portal structure */
	portal = (Portal) MemoryContextAllocZero(TopPortalContext, sizeof *portal);
 
	/* initialize portal context; typically it won't store much */
	portal->portalContext = AllocSetContextCreate(TopPortalContext,
												  "PortalContext",
												  ALLOCSET_SMALL_SIZES);
 
	/* create a resource owner for the portal */
	portal->resowner = ResourceOwnerCreate(CurTransactionResourceOwner,
										   "Portal");
 
	/* initialize portal fields that don't start off zero */
	portal->status = PORTAL_NEW;
	// 省略一些字段的初始化
 
	/* put portal in table (sets portal->name) */
	PortalHashTableInsert(portal, name);
 
	/* for named portals reuse portal->name copy */
	MemoryContextSetIdentifier(portal->portalContext, portal->name[0] ? portal->name : "<unnamed>");
 
	return portal;
}

上面的代码很直观,就是初始化portal以及上下文的状态。以一个崭新的portal对象来执行命令。

portal define query

portal是所有命令的统一执行入口。上面的create也是共用的初始化代码。当然要执行具体的一个命令,还需要这个命令相关的信息,就是define query这一步所做的事情

void
PortalDefineQuery(Portal portal,
				  const char *prepStmtName,
				  const char *sourceText,
				  CommandTag commandTag,
				  List *stmts,
				  CachedPlan *cplan)
{
	Assert(PortalIsValid(portal));
	Assert(portal->status == PORTAL_NEW);
 
	Assert(sourceText != NULL);
	Assert(commandTag != CMDTAG_UNKNOWN || stmts == NIL);
 
	portal->prepStmtName = prepStmtName;
	portal->sourceText = sourceText;
	portal->qc.commandTag = commandTag;
	portal->qc.nprocessed = 0;
	portal->commandTag = commandTag;
	portal->stmts = stmts;
	portal->cplan = cplan;
	portal->status = PORTAL_DEFINED;
}

就是简单的赋值操作,将要执行的命令相关的内容赋值给portal中对应的字段。

Portal start

源码在pquery.cPortalStart函数当中。负责确认portal的执行策略,并根据策略做对应的环境准备工作。设置好快照和执行器

  • 环境检查和切换。
Assert(PortalIsValid(portal));
Assert(portal->status == PORTAL_DEFINED);
 
// 保存当前全局状态
saveActivePortal = ActivePortal;
saveResourceOwner = CurrentResourceOwner;
savePortalContext = PortalContext;
 
// 设置 Portal 相关的全局状态
ActivePortal = portal;
if (portal->resowner)
    CurrentResourceOwner = portal->resowner;
PortalContext = portal->portalContext;
oldContext = MemoryContextSwitchTo(PortalContext);
  • 根据执行语句选择策略
portal->portalParams = params;
portal->strategy = ChoosePortalStrategy(portal->stmts);
  • 根据策略初始化执行环境。
    • PORTAL_ONE_SELECT:单个select,可以流式执行
    • PORTAL_ONE_RETURNING:单个带RETURING的DML,需要收集所有结果
    • PORTAL_ONE_MOD_WITH:单个带修改CTE的查询
    • PORTAL_UTIL_SELECT:返回结果的工具命令
    • PORTAL_MULTI_QUERY:多语句或其他复杂情况
	switch (portal->strategy)
		{
			case PORTAL_ONE_SELECT:
 
				/* Must set snapshot before starting executor. */
				if (snapshot)
					PushActiveSnapshot(snapshot);
				else
					PushActiveSnapshot(GetTransactionSnapshot());
 
				/*
				 * We could remember the snapshot in portal->portalSnapshot,
				 * but presently there seems no need to, as this code path
				 * cannot be used for non-atomic execution.  Hence there can't
				 * be any commit/abort that might destroy the snapshot.  Since
				 * we don't do that, there's also no need to force a
				 * non-default nesting level for the snapshot.
				 */
 
				/*
				 * Create QueryDesc in portal's context; for the moment, set
				 * the destination to DestNone.
				 */
				queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
											portal->sourceText,
											GetActiveSnapshot(),
											InvalidSnapshot,
											None_Receiver,
											params,
											portal->queryEnv,
											0);
 
				/*
				 * If it's a scrollable cursor, executor needs to support
				 * REWIND and backwards scan, as well as whatever the caller
				 * might've asked for.
				 */
				if (portal->cursorOptions & CURSOR_OPT_SCROLL)
					myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
				else
					myeflags = eflags;
 
				/*
				 * Call ExecutorStart to prepare the plan for execution
				 */
				ExecutorStart(queryDesc, myeflags);
 
				/*
				 * This tells PortalCleanup to shut down the executor
				 */
				portal->queryDesc = queryDesc;
 
				/*
				 * Remember tuple descriptor (computed by ExecutorStart)
				 */
				portal->tupDesc = queryDesc->tupDesc;
 
				/*
				 * Reset cursor position data to "start of query"
				 */
				portal->atStart = true;
				portal->atEnd = false;	/* allow fetches */
				portal->portalPos = 0;
 
				PopActiveSnapshot();
				break;
 
			case PORTAL_ONE_RETURNING:
			case PORTAL_ONE_MOD_WITH:
 
				/*
				 * We don't start the executor until we are told to run the
				 * portal.  We do need to set up the result tupdesc.
				 */
				{
					PlannedStmt *pstmt;
 
					pstmt = PortalGetPrimaryStmt(portal);
					portal->tupDesc =
						ExecCleanTypeFromTL(pstmt->planTree->targetlist);
				}
 
				/*
				 * Reset cursor position data to "start of query"
				 */
				portal->atStart = true;
				portal->atEnd = false;	/* allow fetches */
				portal->portalPos = 0;
				break;
 
			case PORTAL_UTIL_SELECT:
 
				/*
				 * We don't set snapshot here, because PortalRunUtility will
				 * take care of it if needed.
				 */
				{
					PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
 
					Assert(pstmt->commandType == CMD_UTILITY);
					portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
				}
 
				/*
				 * Reset cursor position data to "start of query"
				 */
				portal->atStart = true;
				portal->atEnd = false;	/* allow fetches */
				portal->portalPos = 0;
				break;
 
			case PORTAL_MULTI_QUERY:
				/* Need do nothing now */
				portal->tupDesc = NULL;
				break;
		}

Portal run

源码在pquery.cPortalRun函数中。用于执行Portal当中的查询并返回结果。

  • 前置检查和初始化
Assert(PortalIsValid(portal));
TRACE_POSTGRESQL_QUERY_EXECUTE_START();
// 初始化空完成状态数据
if (qc)
    InitializeQueryCompletion(qc);
// 记录统计信息
if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
    ResetUsage();
// 标记 Portal 为活动状态
MarkPortalActive(portal);
  • 保存并切换上下文
// 保存当前全局状态
saveTopTransactionResourceOwner = TopTransactionResourceOwner;
saveTopTransactionContext = TopTransactionContext;
saveActivePortal = ActivePortal;
saveResourceOwner = CurrentResourceOwner;
savePortalContext = PortalContext;
saveMemoryContext = CurrentMemoryContext;
 
// 设置 Portal 相关的全局状态
ActivePortal = portal;
if (portal->resowner)
    CurrentResourceOwner = portal->resowner;
PortalContext = portal->portalContext;
MemoryContextSwitchTo(PortalContext);
  • 按照策略执行不同的逻辑
case PORTAL_ONE_SELECT:
case PORTAL_ONE_RETURNING:
case PORTAL_ONE_MOD_WITH:
case PORTAL_UTIL_SELECT:
 
    // 对于非 PORTAL_ONE_SELECT,如果还没有运行,先将结果存储到 tuplestore
    // 比如带returning的更新语句,必须需要让更新语句执行完成才能获取到更新的结果
    if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
        FillPortalStore(portal, isTopLevel);
 
    // 获取期望的结果部分
    nprocessed = PortalRunSelect(portal, true, count, dest);
 
    // 如果 Portal 包含命令标签且调用者提供了存储指针,复制并更新行数
    if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
    {
        CopyQueryCompletion(qc, &portal->qc);
        qc->nprocessed = nprocessed;
    }
 
    // 标记 Portal 不再活动
    portal->status = PORTAL_READY;
 
    // 由于是前向获取,当 atEnd 为 true 时表示完成
    result = portal->atEnd;
    break;
 
case PORTAL_MULTI_QUERY:
    PortalRunMulti(portal, isTopLevel, false, dest, altdest, qc);
    
    // 防止 Portal 的命令被重新执行
    MarkPortalDone(portal);
    
    // RunMulti 结束时总是完成
    result = true;
    break;
  • 清理还原上下文
PG_CATCH();
{
    // 未捕获的错误:标记 Portal 失败
    MarkPortalFailed(portal);
 
    // 恢复全局变量并传播错误
    if (saveMemoryContext == saveTopTransactionContext)
        MemoryContextSwitchTo(TopTransactionContext);
    else
        MemoryContextSwitchTo(saveMemoryContext);
    ActivePortal = saveActivePortal;
    // ... 恢复其他全局状态
    
    PG_RE_THROW();
}

Portal drop

portalmem.cPortalDrop函数当中。逻辑倒是也没有什么特别好讲的,就是将Portal里面的资源全部进行释放。