This design follows a producer/consumer pattern: the prefill instance (P, the producer) inserts the computed KV values into a buffer in a non-blocking way, while the decode instance (D, the consumer) fetches KV values from the buffer in a blocking way. The buffer is a double-ended queue (LookupBuffer); data between buffers moves through a pipe, which mainly handles remote transfer, and the pipe can be backed by PyNccl or Mooncake store.
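The sketch below illustrates that flow. The insert/drop_select names roughly follow vLLM's lookup-buffer interface, but the exact signatures and return layout here should be read as assumptions for illustration only:
# Prefill (P), producer side: non-blocking insert of the computed KV for a request.
lookup_buffer.insert(input_tokens, roi, key_cache, value_cache, hidden_states)

# Decode (D), consumer side: blocking fetch keyed by the same token prefix;
# the call returns the matching KV entry (return layout simplified here).
kv_entry = lookup_buffer.drop_select(input_tokens, roi)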
The current design is fairly simple, so deployment is straightforward. On the code side you only need to pass a KV-transfer config to the LLM; the P and D processes are almost identical, differing only in their rank information (see vllm/examples/offline_inference/disaggregated_prefill.py; a configuration sketch follows the snippet below):
# The same inputs are used for both P and D
prompts = [
"Hello, my name is",
"Hi, your name is",
"Tell me a very long story",
]
sampling_params = SamplingParams(temperature=0, top_p=0.95)
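A minimal configuration sketch in the spirit of that example. The KVTransferConfig fields and the model name below are assumptions and may differ across vLLM versions; in the real example the two instances run as separate processes:
from vllm import LLM
from vllm.config import KVTransferConfig

# Prefill instance (P): KV producer, rank 0 of a 2-way transfer group.
ktc_prefill = KVTransferConfig.from_cli(
    '{"kv_connector":"PyNcclConnector","kv_role":"kv_producer",'
    '"kv_rank":0,"kv_parallel_size":2}')
llm_prefill = LLM(model="meta-llama/Meta-Llama-3.1-8B-Instruct",  # example model
                  kv_transfer_config=ktc_prefill)
llm_prefill.generate(prompts, sampling_params)  # computes KV and pushes it into the buffer

# Decode instance (D): identical except for its role and rank.
ktc_decode = KVTransferConfig.from_cli(
    '{"kv_connector":"PyNcclConnector","kv_role":"kv_consumer",'
    '"kv_rank":1,"kv_parallel_size":2}')
llm_decode = LLM(model="meta-llama/Meta-Llama-3.1-8B-Instruct",
                 kv_transfer_config=ktc_decode)
outputs = llm_decode.generate(prompts, sampling_params)  # pulls KV, then decodes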
Pass the allocate_slots and free functions to the connector so that it can reserve GPU buffers for KV cache transfers. This lets the connector transparently inject external KV cache into the GPU as prefix-cache blocks, which the scheduler can then treat like ordinary prefix-cache blocks.
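A hypothetical wiring sketch for this handoff; register_block_allocator is an illustrative name, not actual vLLM API, while allocate_slots and free are the KV cache manager's existing hooks:
# Scheduler-side setup (illustrative only): hand the KV cache manager's
# allocation hooks to the connector so it can reserve blocks for incoming KV.
self.connector.register_block_allocator(
    allocate_slots=self.kv_cache_manager.allocate_slots,  # reserve GPU blocks for external KV
    free=self.kv_cache_manager.free,                      # release them on finish/failure
)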
Change point: get_computed_blocks
# In get_computed_blocks, we call the connector on the scheduler side (the "scheduler
# connector") to determine which tokens' KV cache needs to be loaded from the connector:
def get_computed_blocks(
        self, request: Request) -> tuple[list[KVCacheBlock], int]:
    ...
    # After querying the GPU prefix cache, let the connector extend the hit
    # with externally available KV blocks.
    computed_blocks, num_computed_tokens = \
        self.connector.get_external_prefix_cache_blocks(
            request,
            computed_blocks,
            num_computed_tokens,
        )
    return computed_blocks, num_computed_tokens
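For reference, the scheduler-side hook called above might look roughly like this; the class name and method body are assumptions inferred from the call site, not the actual implementation:
class SchedulerConnector:  # hypothetical skeleton
    def get_external_prefix_cache_blocks(
        self,
        request: "Request",
        computed_blocks: list["KVCacheBlock"],
        num_computed_tokens: int,
    ) -> tuple[list["KVCacheBlock"], int]:
        # Ask the external store how many additional prefix tokens it can serve
        # beyond the local GPU prefix-cache hit, allocate GPU blocks for them
        # (via the allocate_slots hook registered earlier), and record those
        # blocks as pending loads for the worker connector.
        ...
        return computed_blocks, num_computed_tokens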
Change point: scheduler_output
"""
Before returning the scheduler output, we call the scheduler connector to:
determine which tokens' KV cache needs to be saved to the connector,
and prepare the metadata that tells the worker connector which tokens' KV cache needs to be saved / loaded.
"""
scheduler_output = SchedulerOutput(
    scheduled_new_reqs=new_reqs_data,
    scheduled_cached_reqs=resumed_reqs_data + running_reqs_data,
    num_scheduled_tokens=num_scheduled_tokens,
    total_num_scheduled_tokens=total_num_scheduled_tokens,
    scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
    scheduled_encoder_inputs=scheduled_encoder_inputs,
    ...
)
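To make the docstring above concrete, one possible layout for that metadata is sketched below. The dataclass is purely illustrative (the real field names and block/token granularity are not specified here); it travels on scheduler_output.connector_meta and is consumed by parse_connector_meta on the worker side:
from dataclasses import dataclass, field

@dataclass
class ConnectorMeta:  # hypothetical layout, not the actual vLLM type
    # request_id -> GPU block ids whose KV must be loaded from the connector
    blocks_to_load: dict[str, list[int]] = field(default_factory=dict)
    # request_id -> GPU block ids whose KV must be saved to the connector
    blocks_to_save: dict[str, list[int]] = field(default_factory=dict)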
Related code changes on the worker side:
# In gpu_model_runner.py, prepare the worker connector's input in _prepare_inputs():
def _prepare_inputs(
    self,
    scheduler_output: "SchedulerOutput",
) -> tuple[FlashAttentionMetadata, torch.Tensor,
           Optional[SpecDecodeMetadata]]:
    # This will reset the state of the connector for the new step.
    self.connector.parse_connector_meta(scheduler_output.connector_meta)
    ...
# Run the decoder.
# Use persistent buffers for CUDA graphs.
with set_forward_context(attn_metadata, self.vllm_config, self.connector):
    hidden_states = self.model(
        input_ids=input_ids,
        positions=positions,
        intermediate_tensors=intermediate_tensors,
        inputs_embeds=inputs_embeds,
    )
Change point: forward_context.py / set_forward_context()
"""
In forward_context.py/set_forward_context(), asynchronously fire the KV cache load operations
layer by layer before model execution, and block until the KV cache save operations finish after model execution:
"""
connector.start_load_kv_async(static_forward_context)
try:
    yield
finally:
    connector.wait_for_save_kv()
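Putting the two hooks in place, the extended context manager could look roughly as follows; the connector-is-None guard and the position of the existing setup code are assumptions about how set_forward_context would be modified, not the actual implementation:
from contextlib import contextmanager

@contextmanager
def set_forward_context(attn_metadata, vllm_config, connector=None):
    ...  # build and install the ForwardContext as today
    # Assumed source of the per-layer attention modules referenced above.
    static_forward_context = vllm_config.compilation_config.static_forward_context
    if connector is not None:
        # Kick off layer-wise async loads so early layers can start consuming
        # remote KV while later layers are still arriving.
        connector.start_load_kv_async(static_forward_context)
    try:
        yield
    finally:
        if connector is not None:
            # Block until all layer-wise saves have drained, so KV blocks can
            # be safely reused by the scheduler afterwards.
            connector.wait_for_save_kv()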
Change point: vllm/attention/layer.py
# In vllm/attention/layer.py: check for load before attn and async save after attn:
forward_context: ForwardContext = get_forward_context()
attn_metadata = forward_context.attn_metadata
self_kv_cache = self.kv_cache[forward_context.virtual_engine]
forward_context.connector.wait_for_load_kv(self)
self.impl.forward(self,
                  query,
                  key,
                  value,
                  self_kv_cache,
                  attn_metadata,
                  output=output)
forward_context.connector.start_save_kv_async(self)
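The layer-wise connector methods used above are not defined in these snippets; the skeleton below only spells out their assumed semantics (class name and bodies are illustrative):
class WorkerConnector:  # hypothetical skeleton
    def wait_for_load_kv(self, layer) -> None:
        # Block until the async load for this attention layer's KV blocks has
        # landed in the paged KV cache, so the attention kernel reads valid data.
        ...

    def start_save_kv_async(self, layer) -> None:
        # Enqueue a non-blocking copy of this layer's freshly written KV blocks
        # toward the buffer/pipe; completion is awaited by wait_for_save_kv()
        # at the end of the forward pass (see set_forward_context above).
        ...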
Considerations for adapting this to the V1 engine
Solution (Woosuk):
The key to high-performance disaggregation is efficient KV transfer. Dynamo leverages NIXL to transfer the KV cache directly from the VRAM of the prefill engine to the VRAM of the decode engine. In addition, the KV transfer is non-blocking, allowing the GPU forward pass to serve other requests while the transfer is in flight.