立即注册找回密码

QQ登录

只需一步,快速开始

微信登录

微信扫一扫,快速登录

手机动态码快速登录

手机号快速注册登录

搜索

图文播报

查看: 206|回复: 0

[讨论] PD 分离系列 - NVIDIA Dynamo 代码原理解析 (一)

[复制链接]
发表于 2025-4-13 06:07 | 显示全部楼层 |阅读模式

登陆有奖并可浏览互动!

您需要 登录 才可以下载或查看,没有账号?立即注册 微信登录 手机动态码快速登录

×
Why PD and Dynamo ?

大模型推理服务长期面临三重矛盾:首 Token 生成速度(TTFT)单 Token 生成时延(TPOT)资源利用率。传统架构将 Prefill(输入处理)与 Decoding(自回归生成)耦合在同一 GPU 上,导致两种负载互相掣肘:

  • 算力错配:Prefill 阶段需并行计算长序列,而 Decoding 阶段依赖高频次显存读写逐 Token 生成,前者吃满算力时后者被迫排队;
  • 资源浪费:Decoding 阶段 GPU 利用率常不足 30%,但 Prefill 突发流量可能挤占生成带宽,造成「饥饿式延迟」;
  • 弹性缺失:长短文本混合场景下,单卡难以动态分配 Prefill 与 Decoding 资源,被迫超配硬件。
PD 分离架构通过解耦计算图异构资源池化打破僵局:将 Prefill 阶段卸载到高算力设备,Decoding 交由大显存/高带宽集群,通过分布式 KV Cache 调度实现「术业专攻」。以 DistServe 论文为例,分离后单卡吞吐提升 2-4.5 倍,而月之暗面 Kimi 的 Mooncake 系统更通过全局 KV Cache 池化实现长文本推理成本下降 30%(上述数据大家自行甄别)。
关于更多的 PD 分离相关知识可以参考以下大佬的文章:
Mooncake阅读笔记:深入学习以Cache为中心的调度思想,谱写LLM服务降本增效新篇章 - 方佳瑞的文章 - 知乎 https://zhuanlan.zhihu.com/p/706097807
大模型推理分离架构五虎上将 - 手抓饼熊的文章 - 知乎 https://zhuanlan.zhihu.com/p/706218732
Mooncake (1): 在月之暗面做月饼,Kimi 以 KVCache 为中心的分离式推理架构 - ZHANG Mingxing的文章 - 知乎 https://zhuanlan.zhihu.com/p/705754254



PD 分离会盟


从学术界的 Splitwise、DistServe 到工业界的 Mooncake,PD 分离已成为头部厂商的「降本利器」。NVIDIA 推出的 Dynamo 框架进一步将这一理念工程化。为什么要选择介绍 Dynamo 呢?抛开其他原因,我觉得在框架之上搭一层 router 的这种形式,与我的想法不谋而合(不要脸)。早在思考speculative sampling和prompt compression的时候,就觉得可以抽离出一个专门的 router 来做这些事情,包括但不限于:判断 prompt 难易程度,tokenize流程, 处理 cache 的问题,判断直接使用 draft model or full model,负载均衡或者是调度。并且 @Byron Hsu  也于去年 11 月在SGLANG slack上 create 了 dev-router 的channel  。所以借Dynamo 发布的东风,鄙人也可以学习一下 router 的设计以及架构。
其实官方的 repo 已经提供了很多文档来细致介绍整体的架构设计以及实现细节,但是缺少从coding角度的说明。再加上核心代码是用 Rust 编写,存在一定的理解成本。因此本文会从代码流程的角度出发,将 Dynamo 拆解为三个部分来介绍一下 Dynamo 的具体实现,以及其中一些巧思。有不足或者理解错误之处,望各位大佬多多指出讨论。
正文

话不多说,我们从这张图开始。这是根据 Dynamo Repo 中的原始架构图修改而成的,标注出了 components 的一些核心功能以及要介绍的核心 funcation,可以配合之后的解释来更好的理解。



Fig1. 核心 components 结构图

Fronted

负责add 或者 remove model,启动 http 服务,这个不是太重要。大家可以自行看一下代码。
Processor

Processor 的核心逻辑比较简单,负责前后处理的流程。除开上图提到的初始化部分,其核心功能就是根据 router_mode(后续默认按照 “kv”模式)以及输入 tokens来确定 Worker 的选择方式。然后将请求调用到对应的 worker_client 执行推理操作。最后会接收返回数据并 yield。 由于其核心执行代码都是在其他 components中,因此我们只需要大致了解 Processor 的作用即可。 Processor 大致的 workflow 如下:
用户请求 → Processor接收raw_request → parse_raw_request进行tokenization → 选择合适的vLLMWorker(基于路由策略)→ 发请求到Worker → Worker执行模型推理 → 返回生成结果 → Processor处理和格式化响应 → 流式返回给用户

Worker


Worker 负责推理阶段,决策是否做 remote prefill,以及维护与inference engine的 connection。 初始化 worker,包括 engine, nats server,PrefillQueue以及 KvMetricsPublisher 等。这里着重聊一下 PrefillQueue 和 KvMetricsPublisher。
PrefillQueue(NATSQueue)是一个异步消息队列,可以灵活实现 push 或者 fetch。如果确定 request 需要在 remote 执行,会通过enqueue_task入队,然后在对应的 prefillworker 上执行dequeue_task出队,进行下一步流程。
KvMetricsPublisher是一个基于 KvMetrics 的 Publisher。Publish 是Dynamo 中的一个常见概念,会发布 Event 到指定的 subject,这样所有订阅了该 subject 的都会收到该 Event (后续会详细介绍 KvCacheEvent 的一系列逻辑)。其中 KVmetrics 包括的参数如下,但是最后能够用来判断 Worker 负载的只有KvMetricsAggregator里的指标:num_requests_waiting, gpu_cache_usgae以及gpu_prefix_cache_hit_rate。
        if self.engine_args.router == "kv":
            assert self.engine_client is not None, "engine_client was not initialized"
            self.engine_client.set_metrics_publisher(self.metrics_publisher)
            # Initially send dummy metrics to kick start,
            # vLLM will not update stat until forward pass is triggered
            self.metrics_publisher.publish(
                0,  # request_active_slots
                1024,  # request_total_slots
                0,  # kv_active_blocks
                1024,  # kv_total_blocks
                0,  # num_requests_waiting
                0.0,  # gpu_cache_usage_perc
                0.0,  # gpu_prefix_cache_hit_rate
            )
            task = asyncio.create_task(self.create_metrics_publisher_endpoint())
            task.add_done_callback(
                lambda _: print("metrics publisher endpoint created")
            )
然后是 generate 函数:在disaggregated_router的情况下(反之则always remote prefill),根据promts 的长度,prefix_hit_rate,以及 prefill_queue_size 来判断是否在远端做 prefill。这个算是worker 里面的核心判断逻辑,其代码如下:
def prefill_remote(
        self, prompt_length: int, prefix_hit_rate: float, queue_size: int
    ):
        absolute_prefill_length = int(prompt_length * (1 - prefix_hit_rate))
        # TODO: consider size of each request in the queue when making the decision
        decision = (
            absolute_prefill_length > self.max_local_prefill_length
            and queue_size < self.max_prefill_queue_size
        )
        vllm_logger.info(
            f&#34;Remote prefill: {decision} (prefill length: {absolute_prefill_length}/{prompt_length}, prefill queue size: {queue_size}/{self.max_prefill_queue_size})&#34;
        )
        return decision一句话总结就是:打开 remote prefill ,在prefill_queue 队列长度小于启动 yaml 设置的 max_prefill_queue_size 的情况下,绝对 prefill 长度(剔除 prefix hit rate 的占比)大于 yaml 设置的 max_local_prefill_length的时候,启用 remote prefill。 具体执行方式是将下列 callback 函数传递给RemotePrefillParams,然后交由对应的 engine 来处理。
    def get_remote_prefill_request_callback(self):
        # TODO: integrate prefill_queue to dynamo endpoint
        async def callback(request: RemotePrefillRequest):
            async with PrefillQueue.get_instance(
                nats_server=self._prefill_queue_nats_server,
                stream_name=self._prefill_queue_stream_name,
            ) as prefill_queue:
                await prefill_queue.enqueue_prefill_request(request)

        return callbackKV router

KV router 主要功能是根据 workers 的各项指标来获得分数,从而来选择使用哪一个 worker。由于这个地方会涉及到 Rust 里面的匹配计算逻辑,所以我会先梳理一下 KV router 的功能,然后再进入到 Rust 里面来着重介绍。 首先是初始化 router和 KV 组件,包括KVIndexer,KvMetricsAggregator 等。 这里的 KV 组件都是为了后面计算 Router 的负载和分数服务。
        kv_listener = self.runtime.namespace(&#34;dynamo&#34;).component(&#34;VllmWorker&#34;)
        await kv_listener.create_service()
        self.indexer = KvIndexer(kv_listener, self.args.block_size)
        self.metrics_aggregator = KvMetricsAggregator(kv_listener)
        print(&#34;KV Router initialized&#34;)然后是对应的 generate 函数:通过find_matches_for_request计算出的 scores + metrics + token_length 来计算出最佳的 worker_id 以及对应 prefix_hit_rate。
find_matches_for_request的具体实现很重要,我们先放在这里,走完后续_cost_funcation的流程之后回头再来看。
from dynamo.llm import AggregatedMetrics, KvIndexer, KvMetricsAggregator, OverlapScores

@dynamo_endpoint()
    async def generate(self, request: Tokens) -> AsyncIterator[WorkerId]:
        lora_id = 0
        try:
            scores = await self.indexer.find_matches_for_request(
                request.tokens, lora_id
            )
        except Exception as e:
            scores = {}
            vllm_logger.exception(f&#34;Error finding matches: {e}&#34;)

        metrics = await self.metrics_aggregator.get_metrics()
        worker_id, prefix_hit_rate = self._cost_function(
            scores, metrics, len(request.tokens)
        )

        vllm_logger.info(
            f&#34;Scheduling to worker_id: {worker_id} with estimated prefix hit rate: {prefix_hit_rate}&#34;
        )
        yield f&#34;{worker_id}_{prefix_hit_rate}&#34;cost_function 的核心计算逻辑如下:
其中 score 是匹配的 block 数量,worker_scores 是匹配的 tokens 数目/总的 token 长度。gpu_cache_usage 是metrics_aggregator中提取到的信息。normalized_waiting 是根据所有 workers 的 max waiting 进行的 normalized。最后再从worker_logits 里面拿到最大 logtis 对应的 worker_id (如果不止一个,则从中随机选取),并返回对应的worker_scores,作为prefix_hit_rate。
worker_scores[worker_id] = (
    score * self.indexer.block_size() / token_length
)
worker_logits[worker_id] = 2 * worker_scores - gpu_cache_usage - normalized_waiting
vllm_logger.info(
                f&#34;Formula for {worker_id}: {worker_logits[worker_id]:.3f} = 2.0 * {score:.3f} - {gpu_cache_usage:.3f} - {normalized_waiting:.3f}&#34;
            )
其实到上述为止,整个 python 接口的核心功能都已经介绍完了。这些逻辑和实现都是比较直观简单的,大家可以先结合上面的Fig.1 来梳理清楚整个结构,对于 Dynamo 的功能和逻辑有一个粗浅的认识。

<hr/>接下来我们以kv_router 中 find_matches_for_request 这个函数为 Rust 的切入点,介绍一下其背后的执行逻辑。

KvIndexer


KvIndexer 的代码逻辑在dynamo/lib/llm/src/kv_router/indexer.rs,主要负责管理KV储存,处理 events 以及match requests。在 kv_router 中调用的find_matches_for_request会根据 tokens 和 kv_block_size 来计算 sequences 的 hash 值,然后进行实际的匹配流程。 首先 tokens 会根据 kv_block_size 来进行分块(不够 kv_block_size 的元素会被 remove 掉),然后以小字节序字节顺序将浮点数的内存表示形式返回为字节数组(https://rustwiki.org/zh-CN/std/primitive.f32.html#method.to_le_bytes),并展平成一维数组。接着再计算 Hash 值(具体计算方式参考https://docs.rs/xxhash-rust/latest/xxhash_rust/xxh3/fn.xxh3_64_with_seed.html
async fn find_matches_for_request(
        &self,
        tokens: &[u32],
    ) -> Result<OverlapScores, KvRouterError> {
        log::debug!(
            &#34;Finding matches for request tokens: {:?} / len: {}&#34;,
            tokens,
            tokens.len()
        );
        let sequence = compute_block_hash_for_seq(tokens, self.kv_block_size);
        log::debug!(&#34;Computed sequence: {:?}&#34;, sequence);
        self.find_matches(sequence).await
    }
   
   
   pub fn compute_block_hash_for_seq(tokens: &[u32], kv_block_size: usize) -> Vec<LocalBlockHash> {
    tokens
        .chunks_exact(kv_block_size) // Split into chunks of kv_block_size elements
        .map(|chunk| {
            let bytes: Vec<u8> = chunk
                .iter()
                .flat_map(|&num| num.to_le_bytes()) // Convert each i32 to its little-endian bytes
                .collect();

            compute_block_hash(&Bytes::from(bytes)) // Convert the byte Vec to Bytes
        })
        .collect()
}
完成 hash 值的计算之后,会执行 find_matches 的逻辑。首先会创建一个一次性的异步通信通道,然后将 sequence(计算出的 hash 值),early_exit(是否提前退出),resp(发送通道) 打包成一个 req(MachRequest)。然后会将 req通过 match_tx 通道发送出去,然后异步等待 resp_rx 通道接收返回结果。注意上面提到发送接收通道 resp_tx 和 resp_rx,match_tx和 match_rx 都是一一对应的。这意味着存在一个后台监控器会接收任务,实际处理之后再 resend 给 find_matches。
async fn find_matches(
        &self,
        sequence: Vec<LocalBlockHash>,
    ) -> Result<OverlapScores, KvRouterError> {
        let (resp_tx, resp_rx) = oneshot::channel();
        let req = MatchRequest {
            sequence,
            early_exit: false,
            resp: resp_tx,
        };

        if let Err(e) = self.match_tx.send(req).await {
            log::error!(
                &#34;Failed to send match request: {:?}; the indexer maybe offline&#34;,
                e
            );
            return Err(KvRouterError::IndexerOffline);
        }

        resp_rx
            .await
            .map_err(|_| KvRouterError::IndexerDroppedRequest)
    }
KvIndexer 会创建一个单线程来依次处理 remove worker,find_matches,处理cancelled 请求以及 apply event 时间。核心的匹配代码在RadixTree下的 find_matches 中。借用一下 Dynamo 中的描述:
The `RadixTree` struct represents the main data structure, with nodes (`RadixBlock`) containing children and associated worker IDs.
RadixTree 的结构包括 root(根节点),lookup(查找表,包括两层 Hash表),expiration_duration(过期时间)。接下来我们进入它核心的 find_matches 逻辑:
整体流程会遍历 sequence 中的哈希块,并引用其下一个子节点的哈希块并进行匹配。如果子节点存在,那么会检测所有缓存该哈希块的workers_id,并更新 scores 值。接下来还会用rencent_uses(双向队列)检查一下 block 的访问时间,过期了的时间戳会被 pop_front 掉,新的access time 会被 push_back。同时也会把访问的频率记录下来(但目前还没有利用频率来判断)。最后如果启动了 early_exit(默认为 false),找到唯一匹配的worker就直接退出。
pub fn find_matches(&self, sequence: Vec<LocalBlockHash>, early_exit: bool) -> OverlapScores {
        let mut scores = OverlapScores::new();
        let mut current = self.root.clone();
        let now = Instant::now();
        for block_hash in sequence {
            let next_block = {
                let current_borrow = current.borrow();
                current_borrow.children.get(&block_hash).cloned()
            };

            if let Some(block) = next_block {
                scores.update_scores(&block.borrow().workers);

                if let Some(expiration_duration) = self.expiration_duration {
                    let mut block_mut = block.borrow_mut();

                    while let Some(access_time) = block_mut.recent_uses.front() {
                        if now.duration_since(*access_time) > expiration_duration {
                            block_mut.recent_uses.pop_front();
                        } else {
                            break;
                        }
                    }
                    scores.add_frequency(block_mut.recent_uses.len());
                    block_mut.recent_uses.push_back(now);
                }

                if early_exit && block.borrow().workers.len() == 1 {
                    break;
                }

                current = block;
            } else {
                break;
            }
        }

        scores
    }
上面的例子可能比较抽象,我这边举一个简单的例子来帮助大家理解:
`假设现在存在序列[“我爱北京城门”],并且kv_block_size=2,假如经过上述的 tokenized,分块,字节转化,哈希化之后成为序列:[[432, 265],[251,234],[673,654]]。然后进行匹配:`
`第一个块[432, 265],发现 workers = {1, 2, 3} ,意味着 worker 1,2,3 都缓存了这个块,scores 更新之后会变成 scores = { 1:1, 2:1, 3:1 }`
`第二个块[251,234],发现 workers = {1, 2},意味着只有 worker 1,2 缓存了这个块,scores 更新之后会变成 scores = { 1:2, 2:2, 3:1 }
第三个块[673,654],发现 workers = {1},意味着只有 worker 1 缓存了这个块,scores 更新之后会变成 scores = { 1:3, 2:2, 3:1 }`  
匹配统计完成之后,上述的 scores 就会被返回,进行 _cost_function 的计算。
上述例子的 scores 就是kv_router 代码中的返回值 scores,后续会被用来计算 cost_funcation 以及负载,从而来确定具体使用哪个 Worker。
        try:
            scores = await self.indexer.find_matches_for_request(
                request.tokens, lora_id
            )小结

本文以 python 接口为切入点,力图简洁的给大家介绍了 Dynamo 的组件功能,以及一些核心的逻辑。为了把整个功能的流程串起来,也附带介绍了一些简单的 Rust 代码。除了上述的组件以外,Nvidia 还提出了“Planner”,可以用来动态扩缩Prefill/Decode Worker,或者是实现两者的 role-switching。但是目前在 public repo 还没有实现,后续如果实现了(亦或是有其他关键更新),我也会在本文章中更新。希望大家多多关注!

后记


有些朋友可能会有疑问,KvIndexer 是怎么知道哪些 worker 缓存了哪些哈希块的信息?Fig.1 图中右上角的 vLLM 和 KvCacheEvent 部分具体承担了什么任务?Kv 的管理,传输以及通信是怎么处理的? 由于篇幅的原因,我会在后续的系列中给大家详细介绍。希望大家多多点赞收藏。
Dynamo 是一个设计很优秀的 PD 分离架构,虽然有一些复杂,但是也不失为一个好的学习样例,本文如果有任何错误或者遗漏之处,大家可以在评论区和谐讨论!

原文地址:https://zhuanlan.zhihu.com/p/1892956782365742153
楼主热帖
回复

使用道具 举报

发表回复

您需要登录后才可以回帖 登录 | 立即注册 微信登录 手机动态码快速登录

本版积分规则

关闭

官方推荐 上一条 /3 下一条

快速回复 返回列表 客服中心 搜索 官方QQ群 洽谈合作
快速回复返回顶部 返回列表