
1. 问题背景:阻塞式操作与 Shiny 应用的响应性
在开发基于 shiny for python 的交互式应用时,我们经常需要处理一些耗时的操作,例如通过串口发送一系列指令来控制外部设备。如果这些操作直接放在 @reactive.effect 或 @reactive.event 装饰器修饰的函数内部,并且包含了阻塞式的循环或长时间的延迟(如 time.sleep() 或忙等待 while 循环),就会导致整个 shiny 应用的用户界面(ui)失去响应。
考虑一个控制流体泵的场景:用户点击“启动”按钮(p1),应用开始按照预设的流量曲线循环发送串口指令。如果用户希望在传输过程中随时点击“停止”按钮(p2)来中断传输,那么一个阻塞式的启动逻辑将无法满足需求。原始实现中,p1 按钮对应的 _ 函数内部包含一个 while 循环,每次发送指令后都会等待两秒。这意味着在循环完成之前,p2 按钮的点击事件将无法被 Shiny 应用的主事件循环及时捕获和处理,导致停止指令被排队,直到当前传输循环结束后才能执行。
原始的阻塞式代码示例(存在响应性问题):
import time
import serial
from shiny import reactive
# 假设 ser 已经初始化为串口对象
ser = serial.Serial("COM6", 115200)
@reactive.Effect
@reactive.event(input.p1)
def _():
y = yg.get() # 从 reactive value yg 获取电压数组
for e in y: # 遍历数组
msg = "1:1:"+str(e)+":100" # 格式化驱动电压消息
ser.write(bytes(msg,'utf-8')) # 发送消息
t0 = time.time() # 记录时间戳
while(((time.time()-t0)<=2)): # 忙等待,直到2秒后
pass
ser.write(bytes("0:1",'utf-8')) # 传输结束后停止泵
@reactive.Effect
@reactive.event(input.p2)
def _():
#print("1:0")
ser.write(bytes("0:1",'utf-8')) # 停止泵问题分析: 上述 input.p1 对应的 _ 函数内部的 for 循环和 while 忙等待是导致问题的根源。在 Shiny 应用中,所有 reactive.Effect 和 reactive.event 装饰器修饰的函数都在同一个主线程中执行。当一个函数长时间运行时,它会独占主线程,阻止其他事件(如 input.p2 的点击)被处理,从而导致 UI 卡顿和失去响应。
2. 解决方案:利用多线程实现非阻塞操作
为了解决主线程阻塞问题,我们可以将耗时操作从主线程中剥离,放到一个独立的后台线程中执行。Python 的 threading 模块提供了实现这一目标的工具,特别是 threading.Thread 用于创建新线程,以及 threading.Event 用于线程间的信号通信。
核心思路:
- 创建一个独立的函数,包含需要长时间运行的逻辑(如串口数据传输循环)。
- 使用 threading.Thread 将这个函数包装成一个新线程。
- 利用 threading.Event 对象作为信号量,实现主线程与子线程之间的通信。主线程可以在需要停止任务时设置 Event,子线程则周期性检查 Event 的状态以决定是否继续执行。
改进后的代码实现:
import serial
import time
import numpy as np
import threading as th
from shiny import App, ui, reactive
# 假设 ser 已经初始化
ser = serial.Serial("COM6", 115200)
# 定义一个全局的 Event 对象,用于线程间通信
sflag = th.Event()
# 辅助函数:发送串口消息
def transmit(e):
"""
根据给定的电压值 e 格式化消息并发送到串口。
"""
msg = "1:1:"+str(e)+":100"
# print(msg) # 调试用
ser.write(bytes(msg,'utf-8'))
# 后台线程执行的函数:定时发送数据
def rtimer(y, sflag):
"""
在独立线程中执行的函数,循环遍历数组 y 并发送数据。
每隔2秒发送一次,直到数组遍历完毕或 sflag 被设置。
"""
i = 0
while i < np.size(y) and not sflag.is_set():
transmit(y[i])
i += 1
time.sleep(2) # 使用 time.sleep() 在子线程中安全等待
# 循环结束后,如果不是因为 sflag 停止,则发送停止指令
# 但由于 p2 也会发送停止指令,此处可以根据实际需求调整
if not sflag.is_set(): # 如果是正常完成,而不是被中断
ser.write(bytes("0:1",'utf-8')) # 停止泵
# p1 按钮的响应函数:启动传输线程
@reactive.Effect()
@reactive.event(input.p1)
def start_pump_transmission():
"""
处理 p1 按钮点击事件,启动数据传输线程。
"""
y = yg.get() # 从 reactive value yg 获取数据
sflag.clear() # 启动前清除停止信号,确保线程可以运行
# 创建并启动新线程
timer_thread = th.Thread(target=rtimer, args=[y, sflag])
timer_thread.start()
# p2 按钮的响应函数:停止传输
@reactive.Effect()
@reactive.event(input.p2)
def stop_pump_transmission():
"""
处理 p2 按钮点击事件,设置停止信号并立即发送停止指令。
"""
sflag.set() # 设置停止信号,通知后台线程停止
ser.write(bytes("1:0",'utf-8')) # 立即发送停止泵的指令代码解释:
-
sflag = th.Event(): 创建一个 Event 对象,它包含一个内部标志,默认是 False。
- sflag.clear(): 将内部标志设置为 False。
- sflag.set(): 将内部标志设置为 True。
- sflag.is_set(): 检查内部标志是否为 True。
- transmit(e) 函数: 这是一个简单的辅助函数,用于格式化并发送串口消息。它与主线程或子线程的执行逻辑无关,因此可以被任一线程调用。
-
rtimer(y, sflag) 函数: 这是在独立线程中执行的核心逻辑。
- 它接收数据数组 y 和 sflag 作为参数。
- while i
- time.sleep(2): 在子线程中使用 time.sleep() 是安全的,因为它只会阻塞当前子线程,而不会阻塞主线程和 UI。
-
start_pump_transmission() (@reactive.event(input.p1)):
- 在启动新任务之前,调用 sflag.clear() 确保停止信号被清除,以便新线程能够正常运行。
- th.Thread(target=rtimer, args=[y, sflag]):创建一个新的线程实例,指定其目标函数为 rtimer,并将 y 和 sflag 作为参数传递给它。
- timer_thread.start():启动新线程。此时,rtimer 函数将在一个独立的后台线程中运行,而主线程则继续处理 Shiny 应用的 UI 事件。
-
stop_pump_transmission() (@reactive.event(input.p2)):
- sflag.set():当用户点击“停止”按钮时,主线程会立即执行此操作,设置 sflag 的内部标志为 True。
- 后台线程在下一次循环迭代时检查 sflag.is_set() 会发现标志已设置,从而跳出循环,实现任务的平滑终止。
- ser.write(bytes("1:0",'utf-8')):同时,主线程可以立即发送停止泵的串口指令,确保物理设备能尽快停止。
3. 优点与注意事项
优点:
- 保持 UI 响应性: 长时间运行的任务被移至后台线程,主线程不再被阻塞,Shiny 应用的 UI 保持流畅和响应。
- 即时中断: 用户可以随时点击“停止”按钮,后台任务会迅速响应停止信号并终止。
- 清晰的任务控制: threading.Event 提供了一种简单有效的线程间通信机制,用于控制后台任务的生命周期。
注意事项:
- 线程安全: 当多个线程访问和修改共享资源(如全局变量、数据库连接、串口对象)时,需要特别注意线程安全。在本例中,ser 对象在主线程和子线程中都被访问,但由于 transmit 函数和 stop_pump_transmission 函数是串行地对 ser 进行写操作(通常 ser.write 是原子操作或底层有锁),且 sflag 专门用于协调,因此风险较低。但在更复杂的场景中,可能需要使用 threading.Lock 来保护共享资源。
- 错误处理: 在后台线程中发生的异常不会自动传播到主线程。应在 rtimer 函数内部添加适当的 try-except 块来捕获和处理潜在的错误。
- 资源清理: 确保在应用关闭或任务结束后,正确关闭串口等资源。
- 替代方案:asyncio: 对于 I/O 密集型任务(如串口通信、网络请求),Python 的 asyncio 模块通常是比 threading 更现代、更高效的解决方案。然而,asyncio 需要整个应用架构都支持异步,如果现有代码是同步阻塞式的,使用 threading 可能是更直接的“打补丁”方式。Shiny for Python 本身是基于 asyncio 构建的,因此将同步阻塞任务放入线程是避免阻塞其事件循环的有效方法。
4. 总结
在 Shiny for Python 应用中,处理耗时或阻塞式操作的关键在于将其从主事件循环中分离。通过利用 Python 的 threading 模块,我们可以将这些任务放到独立的后台线程中执行,并使用 threading.Event 等机制进行线程间的有效通信,从而实现非阻塞的 UI 体验和对任务的精确控制。这种方法不仅解决了 UI 响应性问题,也使得应用能够更好地处理复杂的实时交互场景,如本例中对流体泵的即时启停控制。










