立即注册找回密码

QQ登录

只需一步,快速开始

微信登录

微信扫一扫,快速登录

手机动态码快速登录

手机号快速注册登录

搜索

图文播报

查看: 286|回复: 0

[讨论] PD 分离系列 - NVIDIA Dynamo 代码原理解析 (二)

[复制链接]
发表于 2025-6-1 16:04 | 显示全部楼层 |阅读模式

登陆有奖并可浏览互动!

您需要 登录 才可以下载或查看,没有账号?立即注册 微信登录 手机动态码快速登录

×
Planner

在具体介绍之前,由于Dynamo 更新了planner, 在这个地方也给大家解释说明一下。这个 planner 主要的作用是实时来根据系统负载来动态扩缩容,通过 Prefill 队列大小以及 kv_load 负载来控制扩缩容的策略。
具体是调用 Circus 来监控和控制进程,通过 add_component 和 remove_component来从状态文件加载当前的系统配置,然后构建对应的命令来执行资源更新。另外 Planner还设置了一些保护机制,例如采用 blocking 的方式来扩缩容,每次只增加或减少一个工作节点,新增 decoding 节点之后的 3 个调整周期之内不变化等等。 具体的细节大家可以查阅 Planner 的源码,都不是特别复杂。不过现在 Planner 也是在持续更新,目前只支持 vLLM 的backend。  
现在我们接着上个系列来讲解Dynamo 的架构和实现,本文会着重介绍 Rust 部分对于 KV cache 的处理,对于前文不清楚的可以移步:
PD 分离系列 -  NVIDIA Dynamo 代码原理解析 (一) - SuriWu.NTS的文章 - 知乎
SuriWu.NTS:PD 分离系列 -  NVIDIA Dynamo 代码原理解析 (一)

其实在 PD 分离的架构中,比较关键的就是 P和 D 的调度,以及 KV cache 的传输和储存。本文会主要分析 Rust 代码中 KV_Router 和 KV部分。
KV_Router

/dynamo/lib/llm/src/kv_router.rs 下的 KV Router 的主要功能实际上和前文提到的是类似的,但是会多出来一些KV cache 的发布和订阅操作。下面的 scheduler 是具体执行 KV router 调度决策的核心代码,其实 KvScheduler::start里面部分的实现和之前 python 的版本是一致的,都采用了相似的计算逻辑,感兴趣的可以到dynamo/lib/llm/src/kv_router/scheduler.rs下看一下WorkerSelector的具体代码。

        pub const KV_EVENT_SUBJECT: &str = "kv_events";
        let metrics_aggregator =
            KvMetricsAggregator::new(component.clone(), cancellation_token.clone()).await;
        let indexer = KvIndexer::new(cancellation_token.clone(), block_size);
        let scheduler = KvScheduler::start(
            component.namespace().clone(),
            block_size,
            metrics_aggregator.endpoints_watcher(),
            selector,
        )
        .await?;

        // [gluo TODO] try subscribe_with_type::<RouterEvent>,
        // error checking below will be different.
        let mut kv_events_rx = component.subscribe(KV_EVENT_SUBJECT).await?;
        let kv_events_tx = indexer.event_sender();
我们着重介绍一下 Dynamo 里面的 publish 和 subscribe 机制(即发布和订阅)。其中会涉及到RouterEvent,KvCacheEvent,KvCacheEventData 等,这种设计能够跟踪分布式环境中的 KV 缓存状态,并支持高效的路由决策和缓存管理。我们现在来跟着整个调用的链路来过一遍整个Event 的流程:
假如我们有1 个 KvRouter,2 个工作节点 worker1 和 worker2
1. 用户发送一个请求 RouterRequest,包含token 序列[100,200,300,400]
pub struct RouterRequest {
    pub tokens: Vec<Token>,
}
2. KvRouter接收到RouterRequest之后,根据 token 来计算 block_hash
        let local_block_hashes: Vec<LocalBlockHash> = tokio::task::spawn_blocking(move || {
            Tokens::compute_block_hash(&request.tokens, block_size)
                .into_iter()
                .map(LocalBlockHash)
                .collect()
        })
3. KvRouter 调用 KVIndexer 来查找匹配的工作节点
// KvRouter::generate
let overlap_scores = self.indexer.find_matches(local_block_hashes).await?;
4. KvRouter调用 KvScheduler 来选择工作节点
let worker_id = self.scheduler.schedule(overlap_scores, isl_tokens).await?;5. 由于是第一次调用,没有匹配信息,Scheduler 会根据负载和资源使用情况选择 worker,假如选择了 worker1
let response = RouterResponse { worker_id };
6. 将请求发送到 worker1 ,在其中处理请求,创建并发布缓存 Event到 KV_EVENT_SUBJECT
    def enqueue_stored_event(self, parent: Optional[PrefixCachingBlock],
                             block: PrefixCachingBlock):
        token_ids_arr = (ctypes.c_uint32 *
                         len(block.token_ids))(*block.token_ids)
        num_block_tokens = (ctypes.c_size_t * 1)(len(block.token_ids))
        block_hash = (ctypes.c_uint64 * 1)(block.content_hash)
        parent_hash = ((ctypes.c_uint64 * 1)(parent.content_hash)
                       if parent is not None else None)

        # Publish the event
        result = self.lib.dynamo_kv_event_publish_stored(
            self.event_id_counter,  # uint64_t event_id
            token_ids_arr,  # const uint32_t *token_ids
            num_block_tokens,  # const uintptr_t *num_block_tokens
            block_hash,  # const uint64_t *block_ids
            1,  # uintptr_t num_blocks
            parent_hash,  # const uint64_t *parent_hash
            0,  # uint64_t lora_id
        )

        let kv_cache_event = KvCacheEvent {
            event_id: 1,
            data: KvCacheEventData::Stored(KvCacheStoreData {
                parent_hash: None,
                blocks: vec![KvCacheStoredBlockData {
                    block_hash: ExternalSequenceBlockHash(0),
                    tokens_hash: LocalBlockHash(13226331709069118873),
                }],
            }),
        };
7. KvRouter 订阅 KV_EVENT_SUBJECT接受缓存事件,反序列化 RouterEvent并转发给 KvIndexer 处理,更新 RadixTree
                let event: RouterEvent = match serde_json::from_slice(&event.payload) {
                    Ok(event) => {
                        tracing::debug!(&#34;received kv event: {:?}&#34;, event);
                        event
                    }
                    Err(e) => {
                        tracing::warn!(&#34;Failed to deserialize RouterEvent: {:?}&#34;, e);
                        // Choosing warn and continue to process other events from other workers
                        // A bad event likely signals a problem with a worker, but potentially other workers are still healthy
                        continue;
                    }
                };
                if let Err(e) = kv_events_tx.send(event).await {
                    tracing::trace!(&#34;failed to send kv event to indexer; shutting down: {:?}&#34;, e);
                }
            }
        });
8. 后续请求会按照上述流程,计算 hash 之后查找匹配的 Worker ID,执行上述流程循环。整体的架构图如下。


到这个地方大家可以看到,RadixTree 是 KV Router 的一个比较核心的组件,作为 KvIndexer 的一部分,维护 token block hash 和 woker 之间的映射,通过事件机制来与分布式的节点保持同步。这样中心化索引的设计简化了 Router 的决策逻辑,并且 Worker 节点不需要知道 RadixTree 的存在,只需要专注计算任务,发布缓存事件即可。
下面这一部分会涉及到 kv 的复制,管理,保存等等,也是 dynamo 里面关于 kv 的核心组件。我会按照 Rust 里的代码结构来给大家逐一介绍理解。
KV Layers

KV layers 实现了 KV 缓存的层级结构、存储管理和数据传输功能。这个文件构建了整个 KV 缓存系统的基础架构



  • 初始化 KVCache 并分配内存(在此之前会初始化Layers info来确定 KV缓存的大小),包括模型的结构参数以及 KVCache 的Layout 类型和 block 大小。接着会在 GPU 和 CPU 上分配储存,GPU 用来访问处理,CPU可以用来储存不常用的 KVCache
        let model_details = KvModelDetailsBuilder::default()
            .number_of_layers(number_of_layers)
            .number_of_heads(number_of_heads)
            .head_size(head_size)
            .dtype(DType::F32) // Use F32 for easier validation
            .build()?;

        let block_details = KvBlockDetailsBuilder::default()
            .layout(layout.clone())
            .block_size(block_size)
            .tp_size(1)
            .tp_rank(0)
            .model_details(model_details)
            .build()?;

        // Create the storage blocks
        let h_blocks = KvBlockStorage::allocate(
            number_of_cpu_blocks,
            block_details.clone(),
            StorageType::Pinned,
        )?;

        let d_blocks = KvBlockStorage::allocate(
            number_of_gpu_blocks,
            block_details.clone(),
            StorageType::Device(device.clone()),
        )?;
        
2. 这里可以实现卸载部分指定 KVCache block 到 CPU 上,实现灵活的数据搬运策略。copy_blocks_to是一个同步阻塞操作,复杂简单场景的数据搬运
// 从 GPU 复制特定块到 CPU
let gpu_layer = gpu_blocks.layer(0)?;  // 获取GPU存储的第0层 (read only)
let mut cpu_layer = cpu_blocks.layer_mut(0)?;  // 获取CPU存储的第0层(read or write)

gpu_layer.copy_blocks_to(&[0, 1, 2], &mut cpu_layer, &[0, 1, 2])?;
3. 创建 h2d 和 d2h 的 BlockMap,包括源数据以及目标数据的 layer pointer 相关信息。然后创建 CopyStream,并设置对应的 layer pointer 信息。指定传输所有 Blocks,然后调用trigger_all_layers触发所有layer 的异步传输,如果需要的话用sync_stream来等待传输结束。这样的设计允许计算和传输 overlap,来提升整体的吞吐。
        let h2d_block_map = CopyStreamBlockMap::new(&h_blocks, &d_blocks).unwrap();
        let d2h_block_map = CopyStreamBlockMap::new(&d_blocks, &h_blocks).unwrap();

        let mut copy_stream = CopyStream::new(number_of_layers, number_of_gpu_blocks).unwrap();

        // block list 0..64 as i32
        let mut block_list: Vec<i32> = (0..number_of_gpu_blocks).map(|x| x as i32).collect();

        block_list.shuffle(&mut rng);
        let src_block_ids = block_list.clone();

        block_list.shuffle(&mut rng);
        let dst_block_ids = block_list.clone();
        
        // Select the appropriate block map based on direction
        if is_h2d {
            copy_stream.prepare_block_map(h2d_block_map).unwrap();
        } else {
            copy_stream.prepare_block_map(d2h_block_map).unwrap();
        }

        copy_stream
            .prepare_block_ids(src_block_ids, dst_block_ids)
            .unwrap();

        let timer = Instant::now();
        copy_stream.trigger_all_layers().unwrap();
        copy_stream.sync_stream().unwrap();
        for _ in 0..iterations {
            copy_stream.trigger_all_layers().unwrap();
            copy_stream.reuse().unwrap();
        }
        copy_stream.sync_stream().unwrap();
4. 另外 Dynamo 提供了一个Tensor 重排的操作,用来重新组织Tensor 维度
        copy_stream.scatter_copy_layer(
            0,
            &dims,
            elem_size,
            block_dim_idx,
            src_tp_size,
            dst_tp_size,
        )?;

KV Managers/Reserved/Reuse

Kv Managers的结构比较简单,主要是通过KvStorageManager来定义AvailableBlocks,inflight_blocks (ReservedBlocks)以及 block_size,从而管理 GPU 或 CPU 里面的 KV Block 的分配和重用。



  • 当新的推理请求到来时,首先为输入的 tokens 序列准备 KV cache 缓存,将 tokens 转化为固定大小的 block,然后尝试匹配inflight_blocks。
let seq = tokens.into_sequence(self.block_size);
        let (blocks, tail_block) = seq.into_parts();
        log::debug!(
            &#34;request translates to {} blocks; remaining tokens: {}&#34;,
            blocks.len(),
            tail_block.tokens().len()
        );

        // first match blocks to inflight blocks
        let mut inflight_blocks = self.inflight_blocks.match_token_blocks(&blocks)?;
        log::debug!(&#34;matched {} inflight blocks&#34;, inflight_blocks.len());
2. 对于没有匹配到的 block,尝试从 available_block 中去匹配,将匹配到的 block 注册为inflight_blocks
let unmatched_blocks = &blocks[inflight_blocks.len()..];
        let unmatched_hashes = unmatched_blocks
            .iter()
            .map(|b| b.sequence_hash())
            .collect::<Vec<_>>();

        // match the remaining blocks to freed gpu blocks (available_blocks)
        let unregistered_blocks = self.available_blocks.match_blocks(unmatched_hashes).await?;
        log::debug!(&#34;matched {} freed blocks&#34;, unregistered_blocks.len());

        // the blocks from the freed blocks pool must be registered as inflight blocks
        // todo - we might have to register the list of unregistered blocks as a single transaction
        for block in unregistered_blocks {
            inflight_blocks.push(self.inflight_blocks.register(block)?);
        }
3. 对于上述操作中没有匹配的Blocks,需要分配新的储存空间,从可用池中获取足够数量的块,将未匹配的 tokens 和可用Blocks 关联。
let mut blocks_to_reuse = self
            .available_blocks
            .take_blocks(remaining_blocks.len() as u32 + 1)
            .await?;

        if blocks_to_reuse.len() != remaining_blocks.len() + 1 {
            raise!(
                &#34;expected {} blocks, got {}&#34;,
                remaining_blocks.len() + 1,
                blocks_to_reuse.len()
            );
        }

        // update the blocks_to_reuse with the token block from remaining_blocks
        let complete_prefill_blocks: Vec<UniqueBlock> = remaining_blocks
            .into_iter()
            .map(|b| {
                let mut block = blocks_to_reuse.pop().unwrap();
                block.update_token_block(b);
                block
            })
            .collect();

        assert_eq!(blocks_to_reuse.len(), 1);
        let tail_kv_block = blocks_to_reuse.pop().unwrap();
KV Storage

该部分代码主要实现了一个底层的内存管理组件,提供高效灵活的方案。通过固定内存、张量视图、异步传输的组合,简化了相对复杂的内存管理任务,使上层代码能够专注于模型推理逻辑。它解决了例如跨设备内存管理,多维数据操作,高性能数据传输等等问题。 接下来会结合 Rust 中给出的样例来展示一下 KV Storage 的核心功能。

  • 首先创建一个 cudaContext 上下文,绑定 cudaStream。然后创建一个固定大小的 pinned 内存块,以及引用该内存块的 Tensor View 。在避免额外内存复制的同时,可以按照行列 index 简单的访问数据。
        // Initialize CUDA
        let context = CudaContext::new(0).unwrap();
        let stream = context.default_stream();

        // Create a host tensor with f32 elements (6 elements)
        let pinned_storage = OwnedStorage::create_pinned_array(6 * 4).unwrap();

        // Create a host tensor view
        let shape = [2, 3];
        let mut host_view = TensorView::<_, 2>::new(&pinned_storage, shape, 4).unwrap();
2. 重新定义一个包含 6 个 f32 值的数组,按照行优先的顺序填入 Tensor View。并在 GPU 上创建一个内存块,以及Tensor View。然后将数据从 CPU 传输到 GPU
        // Set some values
        let values = [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0];
        for i in 0..2 {
            for j in 0..3 {
                host_view
                    .set_element::<f32>(&[i, j], values[i * 3 + j])
                    .unwrap();
            }
        }

        // Create a device tensor
        let device_storage = OwnedStorage::create_device_array(6 * 4, context.clone()).unwrap();
        let mut device_view = TensorView::<_, 2>::new(&device_storage, shape, 4).unwrap();
        // Copy from host to device using h2d method
        host_view.h2d(&mut device_view, &stream).unwrap();
3. 创建另一个固定内存块和 Tensor View,用于接收从 GPU 返回的数据流。这里分离输入和输出的内存,防止堵塞竞争。
// Create another host tensor for receiving data back
        let pinned_storage2 = OwnedStorage::create_pinned_array(6 * 4).unwrap();
        let mut host_view2 = TensorView::<_, 2>::new(&pinned_storage2, shape, 4).unwrap();

        // Copy from device to host using d2h method
        device_view.d2h(&mut host_view2, &stream).unwrap();
        stream.synchronize().unwrap();4. 检查确保整个传输过程保持数据的完整性
        // Verify the data was correctly transferred
        for i in 0..2 {
            for j in 0..3 {
                assert_eq!(
                    host_view2.get_element::<f32>(&[i, j]).unwrap(),
                    values[i * 3 + j]
                );
            }
        }
原文地址:https://zhuanlan.zhihu.com/p/1911201145034110047
楼主热帖
回复

使用道具 举报

发表回复

您需要登录后才可以回帖 登录 | 立即注册 微信登录 手机动态码快速登录

本版积分规则

关闭

官方推荐 上一条 /3 下一条

快速回复 返回列表 客服中心 搜索 官方QQ群 洽谈合作
快速回复返回顶部 返回列表