在 Gemini 2.0 系列教程的最后几期里,我们搭建了一个自托管的实时语音与视频聊天机器人,并且给它装上了“功能调用”的翅膀——让它能调用外部工具和 API。这些应用的共同特点,就是响应快、互动自然、推理能力强,背后全靠 Gemini 2.0 的多模态实时 API 撑着。
这一期,我们来聊聊另一个非常实用的场景。相信不少朋友已经在 Google AI Studio 里体验过,并且被它的效果和交互体验惊艳到了——没错,就是构建一个能通过语音与你实时协作的屏幕共享助手。下面会详细拆解它的前后端架构设计与具体代码实现。
Google AI Studio 确实是体验 Gemini 2.0 多模态能力的好入口。在它的“实时流”功能里,“共享您的屏幕”模块已经支持了文本、音频和屏幕的同步互动。不过,要想做到真正的个性化定制,还是得拿到底层 API,自己动手造轮子。
好,话不多说,直接开干。
架构
先来整体看看这个应用的结构。
这套架构的思路,其实跟之前基本一致——核心就是两条双向的 WebSocket 连接:一条在客户端和服务器之间,另一条在服务器和 Gemini API 之间。服务器充当中间的“信使”,负责转发消息和管理实时数据流。具体来说,服务器端的代码跟我们之前开发的基础多模态聊天机器人那期几乎一模一样。所以,如果你已经看过那期内容,可以直接跳过这部分速览,直奔客户端的开发。

代码讲解 — 服务器
服务器是用 Python 写的,主要干两件事:处理客户端的 WebSocket 连接,以及管理 Gemini API 的连接。
开始之前,你需要装好 WebSockets 和 google-genai 这两个库。然后,为模型 gemini-2.0-flash_exp 配置好 API 密钥,并用 API 版本 v1alpha 创建一个 Gemini 客户端。
#### pip install --upgrade google-genai==0.3.0##
import asyncio
import json
import os
import websockets
from google import genai
import base64
### 从环境中加载 API 密钥
os.environ['GOOGLE_API_KEY'] = ''
MODEL = "gemini-2.0-flash-exp" # 使用您的模型 ID
client = genai.Client(
http_options={
'api_version': 'v1alpha',
}
)
在代码的末尾,我们定义了一个 websockets.serve 函数,让服务器在指定端口上跑起来。每个来自客户端的 WebSocket 连接,都会触发名为 gemini_session_handler 的处理程序。
async def main() -> None:
async with websockets.serve(gemini_session_handler, "localhost", 9083):
print("正在运行 websocket 服务器 localhost:9083...")
await asyncio.Future() # 保持服务器无限运行
if __name__ == "__main__":
asyncio.run(main())
在 gemini_session_handler 这个函数里,我们用 client.aio.live.connect() 跟 Gemini API 建联。配置数据来自客户端发来的第一条消息,里面包含了 response_modalities,再加上我们自己设置的 system_instruction,用来告诉模型:“你现在是个屏幕共享助手。”
随后,处理程序就专心干转发的活儿了:
send_to_gemini函数负责接住客户端发来的消息,把里面的音频和图像数据提取出来,发给 Gemini API。receive_from_gemini函数则监听 Gemini API 的响应,把文本或音频数据拆包,再送回给客户端。
为了做到真正的实时交互和随时打断,所有这些任务都在两个并行的线程里异步处理。具体看代码:
async def gemini_session_handler(client_websocket: websockets.WebSocketServerProtocol):
"""在 websocket 会话中处理与 Gemini API 的交互。
参数:
client_websocket: 与客户端的 websocket 连接。
"""
try:
config_message = await client_websocket.recv()
config_data = json.loads(config_message)
config = config_data.get("setup", {})
config["system_instruction"] = """您是屏幕共享会话的有用助手。您的角色是:
1) 分析并描述共享屏幕上的内容
2) 回答有关共享内容的问题
3) 提供与所显示内容相关的信息和背景
4) 协助处理与屏幕共享相关的技术问题
5) 保持专业和乐于助人的语气。专注于简洁明了地回答。"""
async with client.aio.live.connect(model=MODEL, config=config) as session:
print("已连接到 Gemini API")
async def send_to_gemini():
"""将来自客户端 websocket 的消息发送到 Gemini API。"""
try:
async for message in client_websocket:
try:
data = json.loads(message)
if "realtime_input" in data:
for chunk in data["realtime_input"]["media_chunks"]:
if chunk["mime_type"] == "audio/pcm":
await session.send({"mime_type": "audio/pcm", "data": chunk["data"]})
elif chunk["mime_type"] == "image/jpeg":
await session.send({"mime_type": "image/jpeg", "data": chunk["data"]})
except Exception as e:
print(f"发送到 Gemini 时出错: {e}")
print("客户端连接关闭(发送)")
except Exception as e:
print(f"发送到 Gemini 时出错: {e}")
finally:
print("send_to_gemini 关闭")
async def receive_from_gemini():
"""接收来自 Gemini API 的响应并将其转发给客户端,循环直到回合完成。"""
try:
while True:
try:
print("从 Gemini 接收")
async for response in session.receive():
if response.server_content is None:
print(f'未处理的服务器消息! - {response}')
continue
model_turn = response.server_content.model_turn
if model_turn:
for part in model_turn.parts:
if hasattr(part, 'text') and part.text is not None:
await client_websocket.send(json.dumps({"text": part.text}))
elif hasattr(part, 'inline_data') and part.inline_data is not None:
print("音频 mime_type:", part.inline_data.mime_type)
base64_audio = base64.b64encode(part.inline_data.data).decode('utf-8')
await client_websocket.send(json.dumps({
"audio": base64_audio,
}))
print("音频已接收")
if response.server_content.turn_complete:
print('\n<回合完成>')
except websockets.exceptions.ConnectionClosedOK:
print("客户端连接正常关闭(接收)")
break # 如果连接关闭,则退出循环
except Exception as e:
print(f"接收来自 Gemini 时出错: {e}")
break
except Exception as e:
print(f"接收来自 Gemini 时出错: {e}")
finally:
print("Gemini 连接关闭(接收)")
# 启动发送循环
send_task = asyncio.create_task(send_to_gemini())
# 将接收循环作为后台任务启动
receive_task = asyncio.create_task(receive_from_gemini())
await asyncio.gather(send_task, receive_task)
except Exception as e:
print(f"Gemini 会话中间出错: {e}")
finally:
print("Gemini 会话关闭。")
代码讲解 — 客户端
接着看客户端的 HTML 和 Ja vaScript。这部分主要关注跟上一期教程前端代码的差异点。原始代码和修改后的代码都可以在 GitHub 仓库里找到。
图像处理
先来说说这个版本里最重要的新功能——用 startScreenShare 函数,替换了之前调用网络摄像头的逻辑,改成了屏幕共享。
startScreenShare
async function startScreenShare() {
try {
stream = await na vigator.mediaDevices.getDisplayMedia({
video: {
width: { max: 640 },
height: { max: 480 },
},
});
video.srcObject = stream;
await new Promise(resolve => {
video.onloadedmetadata = () => {
console.log("video loaded metadata");
resolve();
}
});
} catch (err) {
console.error("Error accessing the screen: ", err);
}
}
这个异步函数用 na vigator.mediaDevices.getDisplayMedia() 方法来获取屏幕捕获流。拿到流之后,把它设成 HTML 视频元素的源,然后等视频的元数据加载完成,确保后续操作能安全地访问视频尺寸。
接下来是 captureImage() 函数,它负责定期抓取视频帧,并转换成 base64 编码的数据,方便传回服务器。
captureImage
function captureImage() {
if (stream && video.videoWidth > 0 && video.videoHeight > 0 && context) {
canvas.width = 640;
canvas.height = 480;
context.drawImage(video, 0, 0, canvas.width, canvas.height);
const imageData = canvas.toDataURL("image/jpeg").split(",")[1].trim();
currentFrameB64 = imageData;
}
else {
console.log("no stream or video metadata not loaded");
}
}
这个函数里加了对 stream 的检查,并且在调用 drawImage() 之前,先确认视频的元数据已经加载好了。宽度和高度现在固定为 640x480,然后直接把视频帧转成 JPEG 格式的 base64 字符串,准备发给服务器。
这两个函数定义好之后,初始化屏幕共享和 WebSocket 连接的方式如下:
window.addEventListener("load", async () => {
await startScreenShare();
setInterval(captureImage, 3000);
connect();
});
页面加载时,先调用 startScreenShare 启动屏幕的初始视频流,然后设置一个每三秒执行一次 captureImage 的定时器——当然,这个时间间隔完全可以根据你的屏幕操作频率调小一点,以获得更及时的更新。最后,调用 WebSocket 的连接函数,这部分跟之前基本一样。
音频处理
initializeAudioContext
async function initializeAudioContext() {
if (initialized) return;
audioInputContext = new (window.AudioContext ||
window.webkitAudioContext)({
sampleRate: 24000
});
await audioInputContext.audioWorklet.addModule("pcm-processor.js");
workletNode = new AudioWorkletNode(audioInputContext, "pcm-processor");
workletNode.connect(audioInputContext.destination);
initialized = true;
}
音频这部分,我们展示了音频工作单元的初始化函数。它跟之前版本一样,用的采样率是 24000,以及同一个 pcm-processor.js 文件里的 PCM 处理逻辑。
sendVoiceMessage
function sendVoiceMessage(b64PCM) {
if (webSocket == null) {
console.log("websocket not initialized");
return;
}
payload = {
realtime_input: {
media_chunks: [{
mime_type: "audio/pcm",
data: b64PCM,
},
{
mime_type: "image/jpeg",
data: currentFrameB64,
},
],
},
};
webSocket.send(JSON.stringify(payload));
console.log("sent: ", payload);
}
sendVoiceMessage() 函数的作用,是把音频和图像的 base64 数据打包在一起,一起发给服务器。
receiveMessage
function receiveMessage(event) {
const messageData = JSON.parse(event.data);
const response = new Response(messageData);
if (response.text) {
displayMessage("GEMINI: " + response.text);
}
if (response.audioData) {
injestAudioChuckToPlay(response.audioData);
}
}
客户端从服务器收到 JSON 格式的消息后,解析出来,根据内容决定是显示文本还是播放音频。
sendInitialSetupMessage
function sendInitialSetupMessage() {
console.log("sending setup message");
setup_client_message = {
setup: {
generation_config: { response_modalities: ["AUDIO"] },
},
};
webSocket.send(JSON.stringify(setup_client_message));
}
千万别忘了在客户端的第一条配置消息里,选好你想要的响应方式。这里我们选的是 AUDIO,你也可以换成 TEXT,直接在网页上看文本输出。但有一点要特别注意:这个参数虽然看起来是列表形式,但目前只支持单一方式。如果你同时传了文本和音频,模型就会直接报错。
到这一步,我们已经成功地把输入源从网络摄像头切换到了屏幕共享。这样一来,就能捕获屏幕上的画面流发给 Gemini,并获得相应的回应了。
运行应用程序
这种屏幕共享助手的应用场景其实很广,帮你记笔记、浏览网页、甚至玩游戏都不在话下。对我个人来说,最有趣的是用它辅助日常的论文研究工作。
那么,开始动手吧!
首先,运行 Python 文件来启动服务器。WebSocket 服务器会在代码里定义的 8093 端口上监听。
接着,用下面的命令启动客户端:
python -m http.server
现在,在本地 8000 端口访问服务器就能看到效果了。
