Similar to Spark, DataFusion uses a Session as the single point of state management while a program runs.
Its behavior is defined by the Session trait:
```rust
#[async_trait]
pub trait Session: Send + Sync {
    /// Return the session ID
    fn session_id(&self) -> &str;
    /// Return the [`SessionConfig`]
    fn config(&self) -> &SessionConfig;
    /// Return the [`ConfigOptions`]
    fn config_options(&self) -> &ConfigOptions {
        self.config().options()
    }
    /// Create a physical plan from the given logical plan
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    /// Create a physical expression from `expr`, resolved against `df_schema`
    fn create_physical_expr(
        &self,
        expr: Expr,
        df_schema: &DFSchema,
    ) -> Result<Arc<dyn PhysicalExpr>>;
    /// Return reference to scalar_functions
    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>>;
    /// Return reference to aggregate_functions
    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>>;
    /// Return reference to window functions
    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>>;
    /// Return the runtime env
    fn runtime_env(&self) -> &Arc<RuntimeEnv>;
    /// Return the execution properties
    fn execution_props(&self) -> &ExecutionProps;
    /// Return `self` as `Any`, so callers can downcast to the concrete type
    fn as_any(&self) -> &dyn Any;
    /// Return the table options
    fn table_options(&self) -> &TableOptions;
    /// Return the table options combined with the extensions in the session config
    fn default_table_options(&self) -> TableOptions {
        self.table_options()
            .combine_with_session_config(self.config_options())
    }
    /// Returns a mutable reference to [`TableOptions`]
    fn table_options_mut(&mut self) -> &mut TableOptions;
    /// Get a new TaskContext to run in this session
    fn task_ctx(&self) -> Arc<TaskContext>;
}
```

External callers use the SessionStore struct as the single entry point. It is essentially a thin wrapper around a weak reference to an object that implements Session:
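```rust
type SessionRefLock = Arc<Mutex<Option<Weak<RwLock<dyn Session>>>>>;
/// The state store that stores the reference of the runtime session state.
#[derive(Debug)]
pub struct SessionStore {
    session: SessionRefLock,
}
```

The struct that actually implements the Session trait is SessionState. It holds all the environment and state information a session needs to run computations, which makes it one of the core structures.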
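The relationship between the trait, a SessionState-style implementor, and SessionStore's weak handle can be modeled in std-only Rust. This is a minimal sketch under stated assumptions, not DataFusion's implementation. ConfigOptions, SessionConfig and Session are cut-down stand-ins for DataFusion's types. MiniSessionState is a hypothetical substitute for SessionState. The methods set_session and get_session are made-up names for illustration. std::sync::Mutex is used in place of whatever mutex DataFusion actually uses.

```rust
use std::sync::{Arc, Mutex, RwLock, Weak};

// Hypothetical, heavily simplified stand-ins for DataFusion's config types.
struct ConfigOptions {
    batch_size: usize,
}

struct SessionConfig {
    options: ConfigOptions,
}

impl SessionConfig {
    fn options(&self) -> &ConfigOptions {
        &self.options
    }
}

// Trimmed Session trait: implementors supply `config`, and
// `config_options` is inherited as a default method.
trait Session: Send + Sync {
    fn session_id(&self) -> &str;
    fn config(&self) -> &SessionConfig;
    fn config_options(&self) -> &ConfigOptions {
        self.config().options()
    }
}

// Hypothetical implementor playing the role of SessionState.
struct MiniSessionState {
    id: String,
    config: SessionConfig,
}

impl Session for MiniSessionState {
    fn session_id(&self) -> &str {
        &self.id
    }
    fn config(&self) -> &SessionConfig {
        &self.config
    }
}

// Same shape as the alias above, but built on std::sync::Mutex.
type SessionRefLock = Arc<Mutex<Option<Weak<RwLock<dyn Session>>>>>;

struct SessionStore {
    session: SessionRefLock,
}

impl SessionStore {
    fn new() -> Self {
        Self { session: Arc::new(Mutex::new(None)) }
    }

    // Hypothetical setter: remember a weak handle to the live session.
    fn set_session(&self, state: Weak<RwLock<dyn Session>>) {
        *self.session.lock().unwrap() = Some(state);
    }

    // Hypothetical getter: upgrade the handle if the session is still alive.
    fn get_session(&self) -> Option<Arc<RwLock<dyn Session>>> {
        self.session.lock().unwrap().as_ref().and_then(|w| w.upgrade())
    }
}

fn main() {
    let state: Arc<RwLock<dyn Session>> = Arc::new(RwLock::new(MiniSessionState {
        id: "session-1".to_string(),
        config: SessionConfig { options: ConfigOptions { batch_size: 8192 } },
    }));
    let store = SessionStore::new();
    store.set_session(Arc::downgrade(&state));

    if let Some(s) = store.get_session() {
        let guard = s.read().unwrap();
        // prints "session-1 batch_size=8192"
        println!("{} batch_size={}", guard.session_id(), guard.config_options().batch_size);
    }

    // The store holds only a Weak, so dropping the last Arc frees the session.
    drop(state);
    // prints "released: true"
    println!("released: {}", store.get_session().is_none());
}
```

Because the store keeps only a Weak, it never keeps a session alive by itself. Once the last owning Arc is dropped, the getter returns None instead of a dangling reference.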
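The methods defined on the Session trait reveal several particularly important pieces of runtime state, reached through runtime_env() and task_ctx():
- RuntimeEnv
- TaskContext

RuntimeEnv is mainly responsible for managing resources while the program runs and is covered in the RuntimeEnv section. This section instead uses TaskContext as the entry point for a closer look at how compute tasks are executed.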
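The source does not show the body of task_ctx(). A plausible reading of its signature, returning a fresh Arc<TaskContext>, is that it snapshots the session state for one task while sharing the heavyweight resources. The std-only sketch below models that assumption. RuntimeEnv, ScalarUdf, TaskContext and MiniSessionState here are hypothetical simplified types, not DataFusion's definitions. Its fields loosely mirror the getters the Session trait exposes (session_id, config, scalar_functions, runtime_env).

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for RuntimeEnv: resources shared by every task.
struct RuntimeEnv {
    memory_limit_bytes: usize,
}

// Hypothetical placeholder for a registered scalar UDF.
struct ScalarUdf {
    name: String,
}

// Hypothetical, simplified TaskContext: a per-task snapshot of session state.
struct TaskContext {
    session_id: String,
    batch_size: usize,
    scalar_functions: HashMap<String, Arc<ScalarUdf>>,
    runtime: Arc<RuntimeEnv>,
}

// Hypothetical session state holding the pieces the Session trait exposes.
struct MiniSessionState {
    session_id: String,
    batch_size: usize,
    scalar_functions: HashMap<String, Arc<ScalarUdf>>,
    runtime: Arc<RuntimeEnv>,
}

impl MiniSessionState {
    // Plays the role of Session::task_ctx: copy plain values and clone the
    // Arcs, so each task gets its own context while sharing RuntimeEnv and UDFs.
    fn task_ctx(&self) -> Arc<TaskContext> {
        Arc::new(TaskContext {
            session_id: self.session_id.clone(),
            batch_size: self.batch_size,
            scalar_functions: self.scalar_functions.clone(),
            runtime: Arc::clone(&self.runtime),
        })
    }
}

fn main() {
    let mut scalar_functions = HashMap::new();
    scalar_functions.insert(
        "my_udf".to_string(),
        Arc::new(ScalarUdf { name: "my_udf".to_string() }),
    );
    let state = MiniSessionState {
        session_id: "session-1".to_string(),
        batch_size: 8192,
        scalar_functions,
        runtime: Arc::new(RuntimeEnv { memory_limit_bytes: 1 << 30 }),
    };

    let ctx1 = state.task_ctx();
    let ctx2 = state.task_ctx();
    println!("{} batch_size={}", ctx1.session_id, ctx1.batch_size);
    // Two distinct contexts...
    println!("distinct contexts: {}", !Arc::ptr_eq(&ctx1, &ctx2));
    // ...pointing at one RuntimeEnv (1 owner in state + 2 in the contexts = 3).
    println!(
        "shared runtime: {}, refs: {}",
        Arc::ptr_eq(&ctx1.runtime, &ctx2.runtime),
        Arc::strong_count(&state.runtime)
    );
}
```

Under this assumption, creating a TaskContext is cheap. Only small values are copied, and the resource manager is shared through reference counting rather than duplicated per task.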