MemoryPool

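Memory is arguably the most important resource in a computation. Memory that is already held cannot be preempted, so poorly controlled memory usage can crash the application.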

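Memory is managed through implementations of the MemoryPool trait, which serves two purposes: tracking memory usage, and limiting how much memory a computation may use. DataFusion is a streaming query engine, so most operators need a fixed amount of memory that can be computed from the schema and the batch size. Operators such as sort, group-by and join are different: they buffer intermediate results, and their memory usage grows in proportion to the size of the input.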
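The "fixed size" case can be made concrete with a rough sketch that estimates a batch's memory from the schema and the batch size. The `ColumnType` enum and `estimate_batch_bytes` are hypothetical names used only for illustration; they are not DataFusion or Arrow APIs. The estimate also ignores validity bitmaps, buffer padding and variable-width columns.

```rust
/// Hypothetical fixed-width column types for this sketch (not Arrow's `DataType`).
#[derive(Clone, Copy)]
pub enum ColumnType {
    Int32,
    Int64,
    Float64,
}

impl ColumnType {
    /// Bytes needed per value.
    pub fn width(self) -> usize {
        match self {
            ColumnType::Int32 => 4,
            ColumnType::Int64 | ColumnType::Float64 => 8,
        }
    }
}

/// Rough size of one batch: rows * sum of column widths.
/// Ignores validity bitmaps, alignment padding and variable-width columns.
pub fn estimate_batch_bytes(schema: &[ColumnType], batch_size: usize) -> usize {
    schema.iter().map(|t| t.width() * batch_size).sum()
}
```

Take three fixed-width columns (4 + 8 + 8 bytes per row) and 8192 rows, which is DataFusion's default batch size. The estimate is then 163,840 bytes per batch, no matter how much data flows through the operator in total.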

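For simplicity, DataFusion does not track every allocation. Temporary memory that an operator uses while computing is usually assumed to be small and is ignored. Operators that can use large amounts of memory, however, are tracked. Because some memory goes untracked, it is best to leave headroom, for example about 10% of the available memory.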
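The headroom advice comes down to simple arithmetic. Here is a minimal sketch that uses a hypothetical helper, `pool_size_with_headroom`, which is not part of DataFusion:

```rust
/// Hypothetical helper: how large to make the pool if a fraction of the
/// machine's memory is kept back for allocations DataFusion does not track.
pub fn pool_size_with_headroom(total_memory: usize, headroom_fraction: f64) -> usize {
    assert!(
        (0.0..1.0).contains(&headroom_fraction),
        "headroom_fraction must be in [0, 1)"
    );
    // Truncate toward zero, like the `as usize` cast in `with_memory_limit`.
    (total_memory as f64 * (1.0 - headroom_fraction)) as usize
}
```

Keeping 10% back is the same as passing a memory_fraction of 0.9 to with_memory_limit.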

The MemoryPool trait looks like this:

pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// Registers a new [`MemoryConsumer`]
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}
 
    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}
 
    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);
 
    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
 
    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error the `allocation` will not be increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;
 
    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;
}

Several types are involved:

/// A memory-consuming object: the consumer's name plus a flag saying whether it can spill to disk.
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
}

There is also an intermediate type, SharedRegistration, which records which MemoryPool a MemoryConsumer is registered with.

#[derive(Debug)]
struct SharedRegistration {
    pool: Arc<dyn MemoryPool>,
    consumer: MemoryConsumer,
}

MemoryReservation, which records the memory actually being tracked, is linked to the pool through this intermediate object.

pub struct MemoryReservation {
    registration: Arc<SharedRegistration>,
    size: usize,
}

Registering a MemoryConsumer with a MemoryPool yields a MemoryReservation. During computation, that reservation is used to track and manage memory. Here is the corresponding MemoryConsumer method:

impl MemoryConsumer {
    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
        // Register with the MemoryPool
        pool.register(&self);
        // Create a new, empty MemoryReservation tied to this consumer
        MemoryReservation {
            registration: Arc::new(SharedRegistration {
                pool: Arc::clone(pool),
                consumer: self,
            }),
            size: 0,
        }
    }
}
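Below is a self-contained, std-only sketch that mirrors the structure above and shows how the three types fit together. It is a simplified re-implementation for illustration, not DataFusion's code. The trait omits try_grow and error types. CountingPool is a hypothetical unbounded pool that counts reserved bytes and registered consumers. In this sketch the two Drop impls drive the lifecycle. Dropping a reservation returns its bytes to the pool. Dropping the last reference to the registration then unregisters the consumer, which matches the trait's note that memory must be shrunk before unregister.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified pool trait (sketch): DataFusion-like method names, no try_grow, no errors.
pub trait MemoryPool: Send + Sync {
    fn register(&self, _consumer: &MemoryConsumer) {}
    fn unregister(&self, _consumer: &MemoryConsumer) {}
    fn grow(&self, reservation: &MemoryReservation, additional: usize);
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
    fn reserved(&self) -> usize;
}

#[derive(Debug, Clone)]
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
}

impl MemoryConsumer {
    pub fn new(name: impl Into<String>) -> Self {
        Self { name: name.into(), can_spill: false }
    }

    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Register with `pool` and return an empty reservation tied to it.
    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
        pool.register(&self);
        MemoryReservation {
            registration: Arc::new(SharedRegistration { pool: Arc::clone(pool), consumer: self }),
            size: 0,
        }
    }
}

/// Links a consumer to the pool it registered with.
struct SharedRegistration {
    pool: Arc<dyn MemoryPool>,
    consumer: MemoryConsumer,
}

impl Drop for SharedRegistration {
    fn drop(&mut self) {
        // Last owner is gone: tell the pool this consumer is unregistered.
        self.pool.unregister(&self.consumer);
    }
}

/// Tracks how many bytes one consumer currently holds.
pub struct MemoryReservation {
    registration: Arc<SharedRegistration>,
    size: usize,
}

impl MemoryReservation {
    pub fn size(&self) -> usize {
        self.size
    }

    pub fn grow(&mut self, additional: usize) {
        self.registration.pool.grow(self, additional);
        self.size += additional;
    }

    /// Panics if asked to release more than is currently reserved.
    pub fn shrink(&mut self, bytes: usize) {
        let new_size = self.size.checked_sub(bytes).expect("cannot shrink below zero");
        self.registration.pool.shrink(self, bytes);
        self.size = new_size;
    }
}

impl Drop for MemoryReservation {
    fn drop(&mut self) {
        // Release whatever is still reserved so the pool's total stays accurate.
        if self.size > 0 {
            self.registration.pool.shrink(self, self.size);
            self.size = 0;
        }
    }
}

/// Hypothetical unbounded pool that counts reserved bytes and live consumers.
#[derive(Default)]
pub struct CountingPool {
    used: AtomicUsize,
    consumers: AtomicUsize,
}

impl MemoryPool for CountingPool {
    fn register(&self, _consumer: &MemoryConsumer) {
        self.consumers.fetch_add(1, Ordering::Relaxed);
    }
    fn unregister(&self, _consumer: &MemoryConsumer) {
        self.consumers.fetch_sub(1, Ordering::Relaxed);
    }
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }
    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }
    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
```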

DataFusion ships with several MemoryPool implementations:

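  • UnboundedMemoryPool: no memory limit
  • GreedyMemoryPool: caps total memory at a fixed size and allocates first come, first served, so earlier requests are satisfied first until the limit is reached
  • FairSpillPool: also has a fixed total limit but allocates more fairly. The memory left after non-spillable reservations is split evenly among the spillable consumers, so no single spillable consumer can take more than its share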
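FairSpillPool's fairness rule can be illustrated with a little arithmetic. The sketch below is a simplification. The helper names are hypothetical, and it only models the check for spillable consumers. The real pool also manages non-spillable reservations and returns a DataFusionError rather than a bool.

```rust
/// Simplified illustration of FairSpillPool's fairness rule (not its real code):
/// memory held by non-spillable reservations is subtracted first, and the rest
/// is split evenly among the registered spillable consumers.
pub fn spillable_share(pool_size: usize, unspillable: usize, num_spill: usize) -> usize {
    let available = pool_size.saturating_sub(unspillable);
    // With no spillable consumers registered, there is nothing to divide.
    available.checked_div(num_spill).unwrap_or(available)
}

/// Can a spillable consumer that already holds `current` bytes grow by `additional`?
pub fn spillable_can_grow(
    pool_size: usize,
    unspillable: usize,
    num_spill: usize,
    current: usize,
    additional: usize,
) -> bool {
    current + additional <= spillable_share(pool_size, unspillable, num_spill)
}
```

Take a 1000-byte pool with 200 bytes held by non-spillable reservations and four spillable consumers. Each spillable consumer may hold at most 200 bytes.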

GreedyMemoryPool example

GreedyMemoryPool serves here as a simple example of a MemoryPool implementation.

pub struct GreedyMemoryPool {
    pool_size: usize,
    used: AtomicUsize,
}

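The struct is very simple: a memory limit plus the amount of memory already used. The trait methods are equally simple and just update `used`. Note that grow is infallible and never checks the limit; only try_grow enforces it.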

impl MemoryPool for GreedyMemoryPool {
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }
 
    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }
    // Try to reserve: succeed if enough space remains, otherwise fail and leave `used` unchanged
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                let new_used = used + additional;
                (new_used <= self.pool_size).then_some(new_used)
            })
            .map_err(|used| {
                insufficient_capacity_err(
                    reservation,
                    additional,
                    self.pool_size.saturating_sub(used),
                )
            })?;
        Ok(())
    }
 
    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
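To experiment outside DataFusion, here is a stand-alone version of the same first-come, first-served logic. It is only a sketch. Errors are plain Strings instead of DataFusionError, the name GreedyPool is hypothetical, and checked_add guards against overflow. fetch_update keeps retrying its compare-and-swap until it succeeds or the closure returns None, so concurrent callers cannot push `used` past the limit.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-alone greedy pool (sketch, not DataFusion's type).
pub struct GreedyPool {
    pool_size: usize,
    used: AtomicUsize,
}

impl GreedyPool {
    pub fn new(pool_size: usize) -> Self {
        Self { pool_size, used: AtomicUsize::new(0) }
    }

    /// First come, first served: succeed only if the new total fits.
    pub fn try_grow(&self, additional: usize) -> Result<(), String> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                used.checked_add(additional)
                    .filter(|&new_used| new_used <= self.pool_size)
            })
            .map(|_| ())
            .map_err(|used| {
                format!(
                    "Failed to allocate {additional} bytes, {} bytes remaining",
                    self.pool_size.saturating_sub(used)
                )
            })
    }

    pub fn shrink(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::Relaxed);
    }

    pub fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
```

Whoever asks first wins. Once a 60-byte request succeeds in a 100-byte pool, a 50-byte request fails until some memory is shrunk.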

TrackConsumersPool

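In practice, however, these implementations are not always used directly. For example, when a memory limit is configured through with_memory_limit, DataFusion wraps a GreedyMemoryPool in a TrackConsumersPool: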

    pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
        let pool_size = (max_memory as f64 * memory_fraction) as usize;
        self.with_memory_pool(Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(pool_size),
            NonZeroUsize::new(5).unwrap(),
        )))
    }

Naturally, TrackConsumersPool implements the MemoryPool trait as well.

pub struct TrackConsumersPool<I> {
    inner: I,
    top: NonZeroUsize,
    tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
}

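In addition to the inner pool, it records every registered MemoryConsumer together with the memory that consumer uses. Its MemoryPool implementation delegates each call to the inner pool and also keeps this per-consumer bookkeeping up to date. When try_grow fails with ResourcesExhausted, it appends the top memory consumers to the error message.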
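The per-consumer bookkeeping can be sketched with std types alone, under a few simplifying assumptions. Consumers are keyed by name rather than by MemoryConsumer. A plain u64 behind the Mutex replaces the AtomicU64. The inner pool is left out entirely. ConsumerTracker is a hypothetical name.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical per-consumer bookkeeping, like the HashMap kept by TrackConsumersPool.
#[derive(Default)]
pub struct ConsumerTracker {
    tracked: Mutex<HashMap<String, u64>>,
}

impl ConsumerTracker {
    /// Start at 0 bytes; re-registering keeps the previously tracked amount.
    pub fn register(&self, name: &str) {
        self.tracked.lock().unwrap().entry(name.to_string()).or_insert(0);
    }

    pub fn unregister(&self, name: &str) {
        self.tracked.lock().unwrap().remove(name);
    }

    /// Only registered consumers are updated (like `entry_ref(..).and_modify(..)`).
    pub fn grow(&self, name: &str, additional: u64) {
        if let Some(bytes) = self.tracked.lock().unwrap().get_mut(name) {
            *bytes += additional;
        }
    }

    pub fn shrink(&self, name: &str, released: u64) {
        if let Some(bytes) = self.tracked.lock().unwrap().get_mut(name) {
            // Saturating to keep the sketch panic-free.
            *bytes = bytes.saturating_sub(released);
        }
    }

    pub fn bytes(&self, name: &str) -> Option<u64> {
        self.tracked.lock().unwrap().get(name).copied()
    }
}
```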

impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
    fn register(&self, consumer: &MemoryConsumer) {
        self.inner.register(consumer);
		// Track the consumer starting at 0 bytes; if it was already tracked, carry over its previous bytes
        let mut guard = self.tracked_consumers.lock();
        if let Some(already_reserved) = guard.insert(consumer.clone(), Default::default())
        {
            guard.entry_ref(consumer).and_modify(|bytes| {
                bytes.fetch_add(
                    already_reserved.load(Ordering::Acquire),
                    Ordering::AcqRel,
                );
            });
        }
    }
 
    fn unregister(&self, consumer: &MemoryConsumer) {
        self.inner.unregister(consumer);
        // Stop tracking the consumer
        self.tracked_consumers.lock().remove(consumer);
    }
 
    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        self.inner.grow(reservation, additional);
        // Update this consumer's tracked bytes
        self.tracked_consumers
            .lock()
            .entry_ref(reservation.consumer())
            .and_modify(|bytes| {
                bytes.fetch_add(additional as u64, Ordering::AcqRel);
            });
    }
 
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        self.inner.shrink(reservation, shrink);
        // Update this consumer's tracked bytes
        self.tracked_consumers
            .lock()
            .entry_ref(reservation.consumer())
            .and_modify(|bytes| {
                bytes.fetch_sub(shrink as u64, Ordering::AcqRel);
            });
    }
 
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.inner
            .try_grow(reservation, additional)
            .map_err(|e| match e {
                DataFusionError::ResourcesExhausted(e) => {
                    // wrap OOM message in top consumers
                    DataFusionError::ResourcesExhausted(
                        provide_top_memory_consumers_to_error_msg(
                            e,
                            self.report_top(self.top.into()),
                        ),
                    )
                }
                _ => e,
            })?;
		// Reservation succeeded: update this consumer's tracked bytes
        self.tracked_consumers
            .lock()
            .entry_ref(reservation.consumer())
            .and_modify(|bytes| {
                bytes.fetch_add(additional as u64, Ordering::AcqRel);
            });
        Ok(())
    }
 
    fn reserved(&self) -> usize {
	    // Return the total reserved memory (delegated to the inner pool), not the free space
        self.inner.reserved()
    }
}
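report_top is what turns a bare out-of-memory error into one that names the biggest consumers. Here is a hypothetical sketch of that idea. The function names and the message format are invented for this example and are not DataFusion's.

```rust
use std::collections::HashMap;

/// Hypothetical: list the `top` largest consumers, largest first.
pub fn report_top(tracked: &HashMap<String, u64>, top: usize) -> String {
    let mut consumers: Vec<(&String, &u64)> = tracked.iter().collect();
    // Sort by bytes descending; break ties by name so the output is deterministic.
    consumers.sort_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));
    consumers
        .into_iter()
        .take(top)
        .map(|(name, bytes)| format!("  {name} consumed {bytes} bytes"))
        .collect::<Vec<_>>()
        .join(",\n")
}

/// Hypothetical: append the report to an out-of-memory message.
pub fn with_top_consumers(oom_msg: &str, tracked: &HashMap<String, u64>, top: usize) -> String {
    format!("{oom_msg}\nTop {top} memory consumers:\n{}", report_top(tracked, top))
}
```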

The memory pool is held by RuntimeEnv and lives for the lifetime of the DataFusion instance, so any code that needs to manage memory can get the pool from there.

pub struct RuntimeEnv {
    /// Runtime memory management
    pub memory_pool: Arc<dyn MemoryPool>,
    ...
}

For example, ExternalSorter uses it like this:

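// When the sorter is created: register a spillable consumer and keep the reservation
let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
    .with_can_spill(true)
    .register(&runtime.memory_pool);

// Later, when buffering a batch (`self.reservation` is the reservation created above):
// reserve memory first; if that fails, sort the buffered batches in memory to try to free memory
if self.reservation.try_grow(size).is_err() {
    let before = self.reservation.size();
    self.in_mem_sort().await?;
    // (the subsequent spill handling, which uses `before`, is omitted here)
}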
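The reserve-then-spill pattern can be tried without DataFusion by using a toy sorter. Everything in this sketch is hypothetical. The pool is reduced to a byte budget. "Spilling" moves a sorted run into a Vec that stands in for files on disk. The real ExternalSorter is async and decides when to spill differently.

```rust
/// Hypothetical toy sorter illustrating "try_grow, otherwise free memory and retry".
pub struct ToySorter {
    budget: usize,         // bytes this sorter may hold in memory
    reserved: usize,       // bytes currently reserved
    in_mem: Vec<Vec<i64>>, // buffered batches
    spills: Vec<Vec<i64>>, // sorted runs "written to disk"
}

impl ToySorter {
    pub fn new(budget: usize) -> Self {
        Self { budget, reserved: 0, in_mem: Vec::new(), spills: Vec::new() }
    }

    /// Stand-in for `MemoryReservation::try_grow`.
    fn try_grow(&mut self, additional: usize) -> Result<(), ()> {
        if self.reserved + additional <= self.budget {
            self.reserved += additional;
            Ok(())
        } else {
            Err(())
        }
    }

    /// Sort all buffered values into one run, "spill" it and release the memory.
    fn spill(&mut self) {
        let mut run: Vec<i64> = self.in_mem.drain(..).flatten().collect();
        if !run.is_empty() {
            run.sort_unstable();
            self.spills.push(run);
        }
        self.reserved = 0;
    }

    pub fn insert_batch(&mut self, batch: Vec<i64>) -> Result<(), String> {
        let size = batch.len() * std::mem::size_of::<i64>();
        // Reserve first; only buffer the batch once the reservation succeeds.
        if self.try_grow(size).is_err() {
            self.spill();
            self.try_grow(size)
                .map_err(|_| format!("batch of {size} bytes exceeds the {} byte budget", self.budget))?;
        }
        self.in_mem.push(batch);
        Ok(())
    }

    pub fn spills(&self) -> &[Vec<i64>] {
        &self.spills
    }
}
```

Consider a 64-byte budget. Two 4-value batches fill it exactly, so the third batch forces a spill of a sorted 8-value run. A single batch larger than the whole budget still fails after spilling.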