并发

当程序的不同部分可能在不同的时间或乱序执行时,就会发生并发。在嵌入式环境中,这包括

  • 中断处理程序,它会在关联的中断发生时运行,
  • 各种形式的多线程,其中微处理器会定期在程序的不同部分之间切换,
  • 在某些系统中,多核微处理器,其中每个核心可以同时独立运行程序的不同部分。

由于许多嵌入式程序需要处理中断,并发通常会迟早出现,这也是许多细微且难以调试的错误可能发生的地方。幸运的是,Rust 提供了一些抽象和安全保证,以帮助我们编写正确的代码。

没有并发

嵌入式程序最简单的并发方式是没有并发:您的软件由一个不断运行的单个主循环组成,并且根本没有中断。有时,这完全适合手头的问题!通常,您的循环会读取一些输入,执行一些处理,并写入一些输出。

#[entry]
fn main() {
    let peripherals = setup_peripherals();
    loop {
        let inputs = read_inputs(&peripherals);
        let outputs = process(inputs);
        write_outputs(&peripherals, outputs);
    }
}

由于没有并发,因此无需担心程序的不同部分之间共享数据或同步对外围设备的访问。如果可以使用这种简单的方法,那可能是一个很好的解决方案。

全局可变数据

与非嵌入式 Rust 不同,我们通常没有创建堆分配并将对该数据的引用传递到新创建的线程的奢侈条件。相反,我们的中断处理程序可能会随时被调用,并且必须知道如何访问我们正在使用的任何共享内存。在最低级别,这意味着我们必须具有静态分配的可变内存,中断处理程序和主代码都可以引用它。

在 Rust 中,这样的 static mut 变量总是可以安全地读取或写入的,因为如果不特别注意,您可能会触发竞争条件,即您对变量的访问在中途被中断访问该变量的中断打断。

为了说明此行为如何在代码中导致细微的错误,请考虑一个嵌入式程序,该程序计算每个一秒周期内某个输入信号的上升沿(频率计数器)。

static mut COUNTER: u32 = 0;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // DANGER - Not actually safe! Could cause data races.
            unsafe { COUNTER += 1 };
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    unsafe { COUNTER = 0; }
}

每一秒,定时器中断都会将计数器重置为 0。同时,主循环不断测量信号,并在看到从低到高的变化时增加计数器。我们必须使用 unsafe 来访问 COUNTER,因为它是 static mut,这意味着我们向编译器保证我们不会导致任何未定义的行为。您能发现竞争条件吗?COUNTER 上的增量不能保证是原子的 — 实际上,在大多数嵌入式平台上,它将被拆分为加载、然后增量、然后存储。如果中断在加载后但在存储之前触发,则中断返回后,重置为 0 将被忽略 — 并且该周期我们将计数两次转换。

临界区

那么,我们如何处理数据竞争呢?一种简单的方法是使用临界区,即禁用中断的上下文。通过将 main 中对 COUNTER 的访问包装在临界区中,我们可以确保在我们完成增加 COUNTER 之前,定时器中断不会触发。

static mut COUNTER: u32 = 0;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // New critical section ensures synchronised access to COUNTER
            cortex_m::interrupt::free(|_| {
                unsafe { COUNTER += 1 };
            });
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    unsafe { COUNTER = 0; }
}

在此示例中,我们使用 cortex_m::interrupt::free,但其他平台将具有类似的机制,用于在临界区中执行代码。这与禁用中断、运行一些代码,然后重新启用中断相同。

请注意,我们不需要在定时器中断内放置临界区,原因有两个

  • COUNTER 写入 0 不会受到竞争的影响,因为我们不读取它
  • 它永远不会被 main 线程中断

如果 COUNTER 由可能抢占彼此的多个中断处理程序共享,那么每个中断处理程序也可能需要一个临界区。

这解决了我们眼前的问题,但是我们仍然需要编写很多需要仔细推理的 unsafe 代码,而且我们可能会不必要地使用临界区。由于每个临界区都会暂时暂停中断处理,因此会产生一些额外的代码大小和更高的中断延迟和抖动(中断可能需要更长的时间才能处理,并且处理它们的时间会更加多变)的关联成本。这是否是一个问题取决于您的系统,但总的来说,我们希望避免它。

值得注意的是,虽然临界区保证不会触发中断,但它不提供多核系统的独占保证!即使没有中断,另一个核心也可以愉快地访问与您的核心相同的内存。如果您正在使用多个核心,则需要更强的同步原语。

原子访问

在某些平台上,可以使用特殊的原子指令,这些指令提供有关读取-修改-写入操作的保证。特别是对于 Cortex-M:thumbv6(Cortex-M0、Cortex-M0+)仅提供原子加载和存储指令,而 thumbv7(Cortex-M3 及更高版本)提供完整的比较和交换 (CAS) 指令。这些 CAS 指令为强制禁用所有中断提供了一种替代方案:我们可以尝试增量,它在大多数情况下会成功,但如果它被中断,它会自动重试整个增量操作。这些原子操作即使在多个核心之间也是安全的。

use core::sync::atomic::{AtomicUsize, Ordering};

static COUNTER: AtomicUsize = AtomicUsize::new(0);

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // Use `fetch_add` to atomically add 1 to COUNTER
            COUNTER.fetch_add(1, Ordering::Relaxed);
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // Use `store` to write 0 directly to COUNTER
    COUNTER.store(0, Ordering::Relaxed)
}

这一次,COUNTER 是一个安全的 static 变量。由于 AtomicUsize 类型,COUNTER 可以从中断处理程序和主线程安全地修改,而无需禁用中断。如果可能,这是一个更好的解决方案 — 但您的平台可能不支持它。

关于 Ordering 的说明:这会影响编译器和硬件如何重新排列指令,并且还会对缓存可见性产生影响。假设目标是单核平台,Relaxed 在这种特定情况下就足够了,并且是最有效的选择。更严格的排序将导致编译器在原子操作周围发出内存屏障;根据您使用原子操作的目的,您可能需要也可能不需要它!原子模型的精确细节很复杂,最好在其他地方描述。

有关原子和排序的更多详细信息,请参阅 nomicon

抽象、Send 和 Sync

以上所有解决方案都不是特别令人满意。它们需要必须非常仔细检查且不符合人体工程学的 unsafe 块。当然,我们可以在 Rust 中做得更好!

我们可以将计数器抽象为一个安全接口,该接口可以在代码中的任何其他地方安全使用。对于此示例,我们将使用临界区计数器,但您可以使用原子执行非常相似的操作。

use core::cell::UnsafeCell;
use cortex_m::interrupt;

// Our counter is just a wrapper around UnsafeCell<u32>, which is the heart
// of interior mutability in Rust. By using interior mutability, we can have
// COUNTER be `static` instead of `static mut`, but still able to mutate
// its counter value.
struct CSCounter(UnsafeCell<u32>);

const CS_COUNTER_INIT: CSCounter = CSCounter(UnsafeCell::new(0));

impl CSCounter {
    pub fn reset(&self, _cs: &interrupt::CriticalSection) {
        // By requiring a CriticalSection be passed in, we know we must
        // be operating inside a CriticalSection, and so can confidently
        // use this unsafe block (required to call UnsafeCell::get).
        unsafe { *self.0.get() = 0 };
    }

    pub fn increment(&self, _cs: &interrupt::CriticalSection) {
        unsafe { *self.0.get() += 1 };
    }
}

// Required to allow static CSCounter. See explanation below.
unsafe impl Sync for CSCounter {}

// COUNTER is no longer `mut` as it uses interior mutability;
// therefore it also no longer requires unsafe blocks to access.
static COUNTER: CSCounter = CS_COUNTER_INIT;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // No unsafe here!
            interrupt::free(|cs| COUNTER.increment(cs));
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // We do need to enter a critical section here just to obtain a valid
    // cs token, even though we know no other interrupt could pre-empt
    // this one.
    interrupt::free(|cs| COUNTER.reset(cs));

    // We could use unsafe code to generate a fake CriticalSection if we
    // really wanted to, avoiding the overhead:
    // let cs = unsafe { interrupt::CriticalSection::new() };
}

我们已将 unsafe 代码移到我们精心计划的抽象内部,现在我们的应用程序代码不包含任何 unsafe 块。

此设计要求应用程序传入一个 CriticalSection 令牌:这些令牌只能由 interrupt::free 安全地生成,因此通过要求传入一个令牌,我们确保我们在临界区内操作,而无需实际自己执行锁。此保证由编译器静态提供:不会有任何与 cs 关联的运行时开销。如果我们有多个计数器,则可以为它们全部提供相同的 cs,而无需多个嵌套的临界区。

这也引出了 Rust 中并发的一个重要主题:SendSync 特性。总结 Rust 书籍,当一个类型可以安全地移动到另一个线程时,它就是 Send,而当它可以安全地在多个线程之间共享时,它就是 Sync。在嵌入式环境中,我们认为中断是在与应用程序代码不同的线程中执行的,因此中断和主代码都访问的变量必须是 Sync。

对于 Rust 中的大多数类型,编译器会自动为您派生这两个特性。但是,由于 CSCounter 包含 UnsafeCell,因此它不是 Sync,因此我们无法创建 static CSCounterstatic 变量必须是 Sync,因为它们可以被多个线程访问。

为了告诉编译器我们已经注意到了 CSCounter 实际上可以在线程之间安全地共享,我们显式实现了 Sync 特性。与之前使用临界区一样,这仅在单核平台上是安全的:对于多核,您需要采取更多措施来确保安全。

互斥锁

我们为我们的计数器问题创建了一个有用的特定抽象,但是有许多用于并发的常见抽象。

一种这样的同步原语是互斥锁,它是互斥的缩写。这些结构确保对变量(例如我们的计数器)的独占访问。线程可以尝试锁定(或获取)互斥锁,并且要么立即成功,要么阻塞等待获取锁,要么返回无法锁定互斥锁的错误。当该线程持有锁时,它将被授予对受保护数据的访问权限。当线程完成时,它会解锁(或释放)互斥锁,允许另一个线程锁定它。在 Rust 中,我们通常使用 Drop 特性来实现解锁,以确保它在互斥锁超出范围时始终被释放。

在中断处理程序中使用互斥锁可能会很棘手:中断处理程序通常不可接受阻塞,并且阻塞等待主线程释放锁会尤其灾难性,因为那样我们会死锁(主线程将永远不会释放锁,因为执行会停留在中断处理程序中)。死锁不被认为是不安全的:即使在安全的 Rust 中也可能发生。

为了完全避免这种行为,我们可以实现一个需要临界区锁定的互斥锁,就像我们的计数器示例一样。只要临界区必须持续与锁定时间一样长,我们就可以确保我们对包装的变量拥有独占访问权限,而无需甚至跟踪互斥锁的锁定/解锁状态。

实际上,cortex_m crate 为我们完成了此操作!我们可以使用它来编写我们的计数器

use core::cell::Cell;
use cortex_m::interrupt::Mutex;

static COUNTER: Mutex<Cell<u32>> = Mutex::new(Cell::new(0));

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            interrupt::free(|cs|
                COUNTER.borrow(cs).set(COUNTER.borrow(cs).get() + 1));
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // We still need to enter a critical section here to satisfy the Mutex.
    interrupt::free(|cs| COUNTER.borrow(cs).set(0));
}

我们现在使用 Cell,它与其兄弟 RefCell 一起用于提供安全的内部可变性。我们已经了解了 UnsafeCell,它是 Rust 中内部可变性的最底层:它允许你获取对其值的多个可变引用,但只能通过不安全的代码来实现。 Cell 类似于 UnsafeCell,但它提供了安全的接口:它只允许获取当前值的副本或替换它,而不是获取引用,并且由于它不是 Sync 的,因此不能在线程之间共享。这些约束意味着可以安全地使用它,但我们不能直接在 static 变量中使用它,因为 static 必须是 Sync 的。

那么,为什么上面的例子能工作呢? Mutex<T> 为任何 Send 的 T(例如 Cell)实现了 Sync。它可以安全地做到这一点,因为它只在临界区期间提供对其内容的访问。因此,我们能够获得一个安全的计数器,而无需任何不安全的代码!

这对于像我们计数器的 u32 这样的简单类型来说很棒,但是对于更复杂且不是 Copy 的类型呢?嵌入式环境中一个非常常见的例子是外围设备结构体,它通常不是 Copy 的。为此,我们可以转向 RefCell

共享外围设备

使用 svd2rust 和类似抽象生成的设备 crate 通过强制同一时间只能存在外围设备结构体的一个实例来提供对外围设备的安全访问。这确保了安全性,但使得从主线程和中断处理程序访问外围设备变得困难。

为了安全地共享外围设备访问,我们可以使用之前看到的 Mutex。我们还需要使用 RefCell,它使用运行时检查来确保一次只发出对外围设备的单个引用。这比普通的 Cell 有更多的开销,但由于我们发出的是引用而不是副本,我们必须确保一次只有一个存在。

最后,我们还必须考虑如何在主代码中初始化后将外围设备移动到共享变量中。为此,我们可以使用 Option 类型,将其初始化为 None,并在以后设置为外围设备的实例。

use core::cell::RefCell;
use cortex_m::interrupt::{self, Mutex};
use stm32f4::stm32f405;

static MY_GPIO: Mutex<RefCell<Option<stm32f405::GPIOA>>> =
    Mutex::new(RefCell::new(None));

#[entry]
fn main() -> ! {
    // Obtain the peripheral singletons and configure it.
    // This example is from an svd2rust-generated crate, but
    // most embedded device crates will be similar.
    let dp = stm32f405::Peripherals::take().unwrap();
    let gpioa = &dp.GPIOA;

    // Some sort of configuration function.
    // Assume it sets PA0 to an input and PA1 to an output.
    configure_gpio(gpioa);

    // Store the GPIOA in the mutex, moving it.
    interrupt::free(|cs| MY_GPIO.borrow(cs).replace(Some(dp.GPIOA)));
    // We can no longer use `gpioa` or `dp.GPIOA`, and instead have to
    // access it via the mutex.

    // Be careful to enable the interrupt only after setting MY_GPIO:
    // otherwise the interrupt might fire while it still contains None,
    // and as-written (with `unwrap()`), it would panic.
    set_timer_1hz();
    let mut last_state = false;
    loop {
        // We'll now read state as a digital input, via the mutex
        let state = interrupt::free(|cs| {
            let gpioa = MY_GPIO.borrow(cs).borrow();
            gpioa.as_ref().unwrap().idr.read().idr0().bit_is_set()
        });

        if state && !last_state {
            // Set PA1 high if we've seen a rising edge on PA0.
            interrupt::free(|cs| {
                let gpioa = MY_GPIO.borrow(cs).borrow();
                gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().set_bit());
            });
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // This time in the interrupt we'll just clear PA0.
    interrupt::free(|cs| {
        // We can use `unwrap()` because we know the interrupt wasn't enabled
        // until after MY_GPIO was set; otherwise we should handle the potential
        // for a None value.
        let gpioa = MY_GPIO.borrow(cs).borrow();
        gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().clear_bit());
    });
}

这需要理解很多内容,所以让我们分解一下重要的行。

static MY_GPIO: Mutex<RefCell<Option<stm32f405::GPIOA>>> =
    Mutex::new(RefCell::new(None));

我们的共享变量现在是一个围绕 RefCellMutex,它包含一个 OptionMutex 确保我们只在临界区期间有访问权限,因此即使普通的 RefCell 不是 Sync 的,它也使该变量成为 Sync 的。 RefCell 为我们提供了带引用的内部可变性,我们需要它来使用我们的 GPIOAOption 让我们将此变量初始化为空,并且稍后才能实际将变量移入。我们不能静态地访问外围设备单例,只能在运行时访问,所以这是必需的。

interrupt::free(|cs| MY_GPIO.borrow(cs).replace(Some(dp.GPIOA)));

在临界区内,我们可以调用 mutex 上的 borrow(),这将为我们提供对 RefCell 的引用。然后,我们调用 replace() 将新值移入 RefCell

interrupt::free(|cs| {
    let gpioa = MY_GPIO.borrow(cs).borrow();
    gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().set_bit());
});

最后,我们以安全且并发的方式使用 MY_GPIO。临界区阻止中断像往常一样触发,并让我们借用 mutex。然后 RefCell 为我们提供一个 &Option<GPIOA>,并跟踪它被借用多长时间 - 一旦该引用超出范围,RefCell 将被更新以指示它不再被借用。

由于我们无法将 GPIOA 移出 &Option,我们需要使用 as_ref() 将其转换为 &Option<&GPIOA>,最后我们可以 unwrap() 来获取允许我们修改外围设备的 &GPIOA

如果我们需要对共享资源的可变引用,则应使用 borrow_mutderef_mut。以下代码显示了使用 TIM2 定时器的示例。

use core::cell::RefCell;
use core::ops::DerefMut;
use cortex_m::interrupt::{self, Mutex};
use cortex_m::asm::wfi;
use stm32f4::stm32f405;

static G_TIM: Mutex<RefCell<Option<Timer<stm32::TIM2>>>> =
	Mutex::new(RefCell::new(None));

#[entry]
fn main() -> ! {
    let mut cp = cm::Peripherals::take().unwrap();
    let dp = stm32f405::Peripherals::take().unwrap();

    // Some sort of timer configuration function.
    // Assume it configures the TIM2 timer, its NVIC interrupt,
    // and finally starts the timer.
    let tim = configure_timer_interrupt(&mut cp, dp);

    interrupt::free(|cs| {
        G_TIM.borrow(cs).replace(Some(tim));
    });

    loop {
        wfi();
    }
}

#[interrupt]
fn timer() {
    interrupt::free(|cs| {
        if let Some(ref mut tim)) =  G_TIM.borrow(cs).borrow_mut().deref_mut() {
            tim.start(1.hz());
        }
    });
}

哇!这是安全的,但也有点笨拙。还有其他我们可以做的事情吗?

RTIC

一种替代方案是 RTIC 框架,它是 Real Time Interrupt-driven Concurrency 的缩写。它强制执行静态优先级并跟踪对 static mut 变量(“资源”)的访问,以静态地确保始终安全地访问共享资源,而无需始终进入临界区和使用引用计数(如 RefCell 中)的开销。这具有许多优点,例如保证没有死锁,并提供极低的时间和内存开销。

该框架还包括其他功能,如消息传递,这减少了对显式共享状态的需求,以及在给定时间安排任务运行的能力,这可用于实现定期任务。查看 文档 以获取更多信息!

实时操作系统

嵌入式并发的另一个常见模型是实时操作系统(RTOS)。虽然目前在 Rust 中探索较少,但它们在传统的嵌入式开发中得到了广泛使用。开源示例包括 FreeRTOSChibiOS。这些 RTOS 提供对运行多个应用程序线程的支持,CPU 在线程放弃控制(称为协同多任务处理)或基于常规计时器或中断(抢占式多任务处理)时在它们之间切换。 RTOS 通常提供互斥锁和其他同步原语,并且经常与硬件功能(例如 DMA 引擎)互操作。

在撰写本文时,没有太多 Rust RTOS 示例可以参考,但这是一个有趣的领域,所以请关注这方面!

多核

在嵌入式处理器中使用两个或多个内核变得越来越普遍,这为并发增加了额外的复杂性。所有使用临界区的示例(包括 cortex_m::interrupt::Mutex)都假设唯一其他执行线程是中断线程,但在多核系统上,情况不再如此。相反,我们需要为多核(也称为 SMP,即对称多处理)设计的同步原语。

这些通常使用我们之前看到的原子指令,因为处理系统将确保在所有内核上保持原子性。

详细介绍这些主题目前超出了本书的范围,但一般模式与单核情况相同。