背景
简单来说,HAMI-Device-Plugin 旨在替代默认的 Nvidia-Device-Plugin,其核心任务是接管虚拟化 GPU(vGPU)设备的发现与管理,为上游的 vGPU 资源调度提供完整的数据支撑。如果你之前接触过 Kubernetes 环境下的 GPU 管理,应该知道原生 Device Plugin 在处理虚拟化 GPU 时往往力不从心,而 HAMI 正是为弥补这一短板而生。
![[AI-INFRA] HAMI VGPU 系列02:HAMI-Device-Plugin](/uploadfile/2026/0601/0e84389115a782e79852f130f1d1dccf.webp)
在探究 Device-Plugin 时,有几个关键方法值得重点关注:
- Register:将插件注册到 Kubelet,其中的
ResourceName参数至关重要,Kubelet 后续正是依靠这个名字向集群上报资源信息。 - RegisterInAnnotation:向 Kubernetes Node 写入 annotation,这些数据会直接供给 HAMI 调度逻辑使用。
- ListAndWatch:感知 GPU 设备状态并上报给 Kubelet。
- Allocate:为容器分配具体的 GPU 资源。
Register
程序的入口位于 cmd/device-plugin/nvidia/main.go,启动后依次执行 start → startPlugins。来看一段关键代码:
func startPlugins(c *cli.Context, o *options) ([]plugin.Interface, bool, error) {
// ... something before
started := 0
for _, p := range plugins {
if len(p.Devices()) == 0 { continue }
if err := p.Start(o.kubeletSocket); err != nil {
klog.Errorf("Failed to start plugin: %v", err)
return plugins, true, nil
}
started++
}
// ... something after
}
这里最核心的就是调用了 p.Start(o.kubeletSocket),它会触发后续完整的 Register 流程。参数 o.kubeletSocket 的默认值为 /var/lib/kubelet/device-plugins/kubelet.sock,通过命令行参数解析传入:
&cli.StringFlag{
Name: "kubelet-socket",
Value: kubeletdevicepluginv1beta1.KubeletSocket,
Usage: "specify the socket for communicating with the kubelet; if this is empty, no connection with the kubelet is attempted",
Destination: &o.kubeletSocket,
EnvVars: []string{"KUBELET_SOCKET"},
},
实际上,启动 gRPC 服务并向 Kubelet 注册的具体逻辑是在 pkg 侧实现的,cmd 框架仅负责调用抽象接口。理解这一点,有助于你更好地把握代码的层次结构。
// Start starts the gRPC server, registers the device plugin with the Kubelet,
// and starts the device healthchecks.
func (plugin *NvidiaDevicePlugin) Start(kubeletSocket string) error {
// ...
// 1. 启动 gRPC 服务
err = plugin.Serve()
// ...
// 2. 向 kubelet 注册
err = plugin.Register(kubeletSocket)
// ...
}
// plugin.Register(kubeletSocket)
func (plugin *NvidiaDevicePlugin) Register(kubeletSocket string) error {
if kubeletSocket == "" {
klog.Info("Skipping registration with Kubelet")
return nil
}
conn, err := plugin.dial(kubeletSocket, 5*time.Second)
if err != nil { return err }
defer conn.Close()
client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
// 关键点: 上报 ResourceName
reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
Version: kubeletdevicepluginv1beta1.Version,
Endpoint: path.Base(plugin.socket),
ResourceName: string(plugin.rm.Resource()),
Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
GetPreferredAllocationA vailable: false,
},
}
_, err = client.Register(context.Background(), reqt)
if err != nil { return err }
return nil
}
另外,HAMI-Device-Plugin 还会持续监听 kubelet socket 的变动:一旦发现 socket 被重新创建(比如 Kubelet 重启),它会自动重启整套 plugin,再次走一遍 Serve() → Register() 流程。
func start(c *cli.Context, o *options) error {
kubeletSocketDir := filepath.Dir(o.kubeletSocket)
watcher, err := watch.Files(kubeletSocketDir)
// ...
case event := <-watcher.Events:
if o.kubeletSocket != "" && event.Name == o.kubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
klog.Infof("inotify: %s created, restarting.", o.kubeletSocket)
goto restart
}
}
RegisterInAnnotation
当 gRPC 服务启动并成功注册到 Kubelet 后,插件会启动一个 goroutine 来执行 WatchAndRegister,这是 RegisterInAnnotation 的真正入口。
func (plugin *NvidiaDevicePlugin) Start(kubeletSocket string) error {
plugin.initialize()
// ...
go func() {
plugin.WatchAndRegister(plugin.disableWatchAndRegister, plugin.ackDisableWatchAndRegister)
}()
if deviceSupportMig {
plugin.ApplyMigTemplate()
}
return nil
}
WatchAndRegister 内部不断循环,按设定的间隔去调用 RegisterInAnnotation:
func (plugin *NvidiaDevicePlugin) WatchAndRegister(disableNVML <-chan bool, ackDisableWatchAndRegister chan<- bool) {
klog.Info("Starting WatchAndRegister")
errorSleepInterval := time.Second * 5
successSleepInterval := time.Second * 30
var disableWatchAndRegister bool
for {
select {
case disable := <-disableNVML:
if disable {
klog.Info("Received disableNVML signal, stopping WatchAndRegister")
disableWatchAndRegister = true
} else {
klog.Info("Received enableNVML signal, resuming WatchAndRegister")
disableWatchAndRegister = false
}
default:
}
if disableWatchAndRegister {
klog.Info("WatchAndRegister is disabled by disableWatchAndRegister signal, sleep a success interval")
ackDisableWatchAndRegister <- true
time.Sleep(successSleepInterval)
continue
}
// 执行
err := plugin.RegisterInAnnotation()
if err != nil {
klog.Errorf("Failed to register annotation: %v", err)
klog.Infof("Retrying in %v seconds...", errorSleepInterval)
time.Sleep(errorSleepInterval)
} else {
klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
time.Sleep(successSleepInterval)
}
}
}
现在来看 RegisterInAnnotation 的具体实现:
func (plugin *NvidiaDevicePlugin) RegisterInAnnotation() error {
// 1. 获取 hami.io/node-nvidia-register Annotation 的数据源
devices := plugin.getAPIDevices()
klog.InfoS("start working on the devices", "devices", devices)
annos := make(map[string]string)
node, err := util.GetNode(util.NodeName)
if err != nil { return err }
encodeddevices := device.MarshalNodeDevices(*devices)
if encodeddevices == plugin.deviceCache {
return nil
}
plugin.deviceCache = encodeddevices
var data []byte
if os.Getenv("ENABLE_TOPOLOGY_SCORE") == "true" {
// 2. 获取 hami.io/node-nvidia-score Annotation 的数据源
gpuScore, err := nvidia.CalculateGPUScore(device.GetDevicesUUIDList(*devices))
if err != nil { return err }
data, err = json.Marshal(gpuScore)
if err != nil { return err }
}
annos[nvidia.RegisterAnnos] = encodeddevices
if len(data) > 0 {
annos[nvidia.RegisterGPUPairScore] = string(data)
}
// 直接和 kube-apiserver 进行通信,而不是使用的传统的 device plugin 上报流程
err = util.PatchNodeAnnotations(node, annos)
if err != nil { klog.Errorln("patch node error", err.Error()) }
return err
}
该方法主要完成了三项任务:
- 将本节点 GPU 资源清单写入 annotation
hami.io/node-nvidia-register。例如:
hami.io/node-nvidia-register: '[{"id":"GPU-******-****-****-********","count":4,"devmem":15360,"devcore":100,"type":"NVIDIA-TeslaT4","numa":1,"mode":"hami-core","health":true,"devicepairscore":{}}]'
- 如果启用了拓扑分数计算,还会把 GPU 拓扑亲和分数写入
hami.io/node-nvidia-score:
[{"uuid": "GPU-0","score": {"GPU-1": 200,"GPU-2": 100}}]
- 利用
util.PatchNodeAnnotations直接与 kube-apiserver 通信,而非走传统的 Device Plugin 上报流程。传统方式仅能上报资源总量,而 HAMI 的调度需要更细粒度的信息——比如单卡剩余显存/核心能否满足分配、NUMA 是否匹配、多卡组合的拓扑 Score 是否存在更优节点。通过 annotation 上报,调度器就能获得这些精细数据,从而做出更合理的决策。
ListAndWatch
当 HAMI-Device-Plugin 启动自己的 gRPC Server 后,Kubelet 便可调用标准的 Device-Plugin 接口 ListAndWatch 来获取设备信息。
// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{
Devices: plugin.apiDevices(),
})
for {
select {
case <-plugin.stop:
return nil
case d := <-plugin.health:
d.Health = kubeletdevicepluginv1beta1.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{
Devices: plugin.apiDevices(),
})
}
}
}
返回给 Kubelet 的信息包括:
- 设备 ID
- 设备健康状态
- 设备 Topology 拓扑信息
Kubelet 拿到这些数据后,后续在调用 Allocate 时只需知道“把哪块 GPU 设备分配给哪个 Container”即可,具体分配细节由 Device Plugin 完成。
Allocate
分配流程分为两个阶段:硬件分配和 CUDA 劫持。
硬件分配
HAMI-Scheduler 会提前在 Pod 的 annotation 中写入调度结果,Device Plugin 在 Allocate() 中依赖以下关键 annotation:
hami.io/vgpu-nodehami.io/bind-phase=allocatinghami.io/bind-timehami.io/vgpu-devices-to-allocatehami.io/vgpu-devices-allocated
Allocate() 的解析方式:先通过 GetPendingPod() 找到当前 Pod,再通过 GetNextDeviceRequest() 从 annotation 中找出当前应处理的 Container,最后返回对应的环境变量、挂载、设备信息给 Kubelet。Kubelet 会将分配记录持久化到本地的 checkpoint 文件中(以 PodUID + ContainerName + ResourceName 为维度)。
CUDA 劫持
为了限制显存和算力,Device Plugin 会注入两个关键的环境变量:CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT。同时,它会将 libvgpu.so 挂载到容器内的 /usr/local/vgpu/libvgpu.so 路径,并通过 /etc/ld.so.preload 强制加载。
这里有两个实现细节值得注意:
hami.io/vgpu-devices-to-allocate由 HAMI-Scheduler 写入,Kubelet 调用 Device Plugin 的Allocate成功后,Device Plugin 会逐个擦除这些条目;而hami.io/vgpu-devices-allocated由 Scheduler 写入后保持不变。- 生产环境实测发现,即使容器镜像内的
LD_PRELOAD被覆盖,CUDA 劫持依然不受影响。原因是libvgpu.so不走LD_PRELOAD,而是通过/etc/ld.so.preload进行全局加载。Linux 动态链接器ld.so的加载顺序是:
进程启动
↓
ld.so 读取 /etc/ld.so.preload ← 第一步,文件级,不可绕过
↓
ld.so 读取 LD_PRELOAD 环境变量 ← 第二步,可被覆盖
↓
正常加载依赖库
所以即便用户把 LD_PRELOAD 改了,/etc/ld.so.preload 里的 libvgpu.so 依然会被强制加载。相关代码片段如下:
for _, val := range currentCtr.Env {
if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
t, _ := strconv.ParseBool(val.Value)
if !t { continue }
found = true
break
}
}
if !found {
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
ContainerPath: "/etc/ld.so.preload",
HostPath: hostHookPath + "/vgpu/ld.so.preload",
ReadOnly: true,
})
}
下面是一个成功分配 Pod 的 Annotation 示例,帮助我们更直观地理解整个分配过程:
hami.io/bind-phase: successhami.io/bind-time: 1776156519(由 HAMI-Scheduler 写入后不变)hami.io/vgpu-devices-allocated: GPU-f49304b4-45f4-1c78-6bd9-bb970575182e,NVIDIA,7680,50:;(同一 Container 内不同设备用冒号分隔,不同 Container 之间用分号分隔)hami.io/vgpu-devices-to-allocate: ;(由 Scheduler 初始化后再由 Device Plugin 擦除)hami.io/vgpu-node: svr30187hw1288hami.io/vgpu-time: 1776156519
流程总结:
- HAMI-Scheduler 调度后,
hami.io/bind-phase变为allocating。 - HAMI-Device-Plugin 执行
Allocate后,将bind-phase更新为success或failed。
FAQ
为什么 Device-Plugin 按照 Container 维度分配 GPU 设备?
要回答这个问题,先看一个生产环境中的实际证据——宿主机上的 checkpoint 文件内容:
cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint
输出(简化格式):
{
"Data": {
"PodDeviceEntries": [
{"PodUID":"...", "ContainerName":"aitraining", "ResourceName":"nvidia.com/gpu", "DeviceIDs":{"-1":["GPU-..."]}},
{"PodUID":"...", "ContainerName":"querynode", "ResourceName":"cloud.ctrip.com/ip", "DeviceIDs":{"-1":["svr30187hw1288-..."]}}
]
},
"Checksum": ...
}
原理其实不难理解。我们回顾一下容器环境的 GPU 设备挂载链路:docker run -it --gpu all ... 启动后,由 Docker/Containerd 构建容器 Runtime Bundle(包括 Runtime Spec 和 rootfs),然后 nvidia-container-runtime-hook 判断容器是否需要 GPU 设备,并将 GPU 设备、驱动库等依赖描述注入到 Runtime Bundle 中,最后由 runc 完成容器的创建和启动。
在这个过程中,Kubelet 调用 Device Plugin 的 Allocate 实际上是按 Container 维度进行的——每个 Container 都是一个独立的 PodDeviceEntry,里面记录了该 Container 被分配到的 GPU 设备 ID。这也是为什么 Device Plugin 必须按 Container 维度返回分配结果。
