并发

只要程序的不同部分可能在不同的时间或不同顺序执行,就会发生并发。在嵌入式环境中,这包括

  • 中断处理程序,它们在相关中断发生时运行,
  • 各种形式的多线程,其中您的微处理器定期在程序的不同部分之间切换,
  • 以及在某些系统中,多核微处理器,其中每个内核可以独立地同时运行程序的不同部分。

由于许多嵌入式程序需要处理中断,因此并发通常迟早会出现,这也是许多微妙且难以发现的错误可能发生的地方。幸运的是,Rust 提供了许多抽象和安全保证来帮助我们编写正确的代码。

无并发

对于嵌入式程序来说,最简单的并发是无并发:您的软件由一个单一的循环组成,该循环一直运行,并且根本没有中断。有时这完全适合手头的任务!通常,您的循环将读取一些输入,执行一些处理,并写入一些输出。

#[entry]
fn main() {
    let peripherals = setup_peripherals();
    loop {
        let inputs = read_inputs(&peripherals);
        let outputs = process(inputs);
        write_outputs(&peripherals, outputs);
    }
}

由于没有并发,因此无需担心在程序的不同部分之间共享数据或同步对外设的访问。如果您能够使用这种简单的方法,这可能是一个很好的解决方案。

全局可变数据

与非嵌入式 Rust 不同,我们通常不会有创建堆分配并将对该数据的引用传递到新创建的线程的奢侈。相反,我们的中断处理程序可能在任何时候被调用,并且必须知道如何访问我们正在使用的任何共享内存。在最低级别,这意味着我们必须拥有 *静态分配* 的可变内存,中断处理程序和主代码都可以引用它。

在 Rust 中,这种 static mut 变量始终不安全读取或写入,因为如果不特别注意,您可能会触发竞争条件,其中您对变量的访问在中途被也访问该变量的中断中断。

为了举例说明这种行为如何在您的代码中导致微妙的错误,请考虑一个嵌入式程序,它在每个一秒钟的时间段内计算某个输入信号的上升沿(频率计数器)

static mut COUNTER: u32 = 0;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // DANGER - Not actually safe! Could cause data races.
            unsafe { COUNTER += 1 };
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    unsafe { COUNTER = 0; }
}

每秒,定时器中断将计数器重置为 0。同时,主循环不断测量信号,并在看到从低到高的变化时递增计数器。我们必须使用 unsafe 来访问 COUNTER,因为它是一个 static mut,这意味着我们向编译器承诺我们不会导致任何未定义的行为。你能发现竞争条件吗?对 COUNTER 的递增 *不* 保证是原子的——事实上,在大多数嵌入式平台上,它将被拆分为加载、然后递增、然后存储。如果中断在加载之后但存储之前触发,则重置为 0 将在中断返回后被忽略——并且我们将在该时间段内计算两倍的转换次数。

临界区

那么,我们如何处理数据竞争呢?一个简单的方法是使用 *临界区*,这是一个中断被禁用的上下文。通过将对 mainCOUNTER 的访问包装在临界区中,我们可以确保定时器中断在我们完成递增 COUNTER 之前不会触发

static mut COUNTER: u32 = 0;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // New critical section ensures synchronised access to COUNTER
            cortex_m::interrupt::free(|_| {
                unsafe { COUNTER += 1 };
            });
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    unsafe { COUNTER = 0; }
}

在这个例子中,我们使用 cortex_m::interrupt::free,但其他平台将有类似的机制来在临界区中执行代码。这也与禁用中断、运行一些代码,然后重新启用中断相同。

请注意,我们不需要在定时器中断中放置临界区,原因有两个

  • 将 0 写入 COUNTER 不会受到竞争的影响,因为我们没有读取它
  • 它永远不会被 main 线程中断

如果 COUNTER 被多个可能 *抢占* 彼此的中断处理程序共享,那么每个中断处理程序可能也需要一个临界区。

这解决了我们的直接问题,但我们仍然留下了很多不安全的代码,我们需要仔细推理,并且我们可能在不必要地使用临界区。由于每个临界区都会暂时暂停中断处理,因此会产生一些额外的代码大小以及更高的中断延迟和抖动(中断可能需要更长时间才能处理,并且它们被处理的时间将更加可变)。这是否是一个问题取决于您的系统,但总的来说,我们希望避免它。

值得注意的是,虽然临界区保证不会触发中断,但它在多核系统上不提供排他性保证!另一个内核可以愉快地访问与您的内核相同的内存,即使没有中断。如果您使用多个内核,您将需要更强大的同步原语。

原子访问

在某些平台上,可以使用特殊的原子指令,这些指令提供关于读-修改-写操作的保证。特别是对于 Cortex-M:thumbv6(Cortex-M0、Cortex-M0+)只提供原子加载和存储指令,而 thumbv7(Cortex-M3 及更高版本)提供完整的比较和交换 (CAS) 指令。这些 CAS 指令为禁用所有中断的繁重方法提供了一种替代方案:我们可以尝试递增,它在大多数情况下都会成功,但如果它被中断,它会自动重试整个递增操作。这些原子操作即使跨多个内核也是安全的。

use core::sync::atomic::{AtomicUsize, Ordering};

static COUNTER: AtomicUsize = AtomicUsize::new(0);

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // Use `fetch_add` to atomically add 1 to COUNTER
            COUNTER.fetch_add(1, Ordering::Relaxed);
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // Use `store` to write 0 directly to COUNTER
    COUNTER.store(0, Ordering::Relaxed)
}

这次 COUNTER 是一个安全的 static 变量。由于 AtomicUsize 类型,COUNTER 可以安全地从中断处理程序和主线程修改,而无需禁用中断。在可能的情况下,这是一个更好的解决方案——但它可能不受您的平台支持。

关于 Ordering 的说明:它会影响编译器和硬件如何重新排序指令,并且还会对缓存可见性产生影响。假设目标是单核平台,Relaxed 就足够了,并且是这种特定情况下的最有效选择。更严格的排序会导致编译器在原子操作周围发出内存屏障;根据您使用原子操作的目的,您可能需要或不需要这样做!原子模型的精确细节很复杂,最好在其他地方描述。

有关原子和排序的更多详细信息,请参阅 nomicon

抽象、Send 和 Sync

以上所有解决方案都不是特别令人满意。它们需要 unsafe 代码块,这些代码块必须非常仔细地检查,并且不符合人体工程学。我们肯定可以在 Rust 中做得更好!

我们可以将计数器抽象成一个安全的接口,该接口可以在我们代码中的任何其他地方安全地使用。对于此示例,我们将使用临界区计数器,但您可以使用原子操作做类似的事情。

use core::cell::UnsafeCell;
use cortex_m::interrupt;

// Our counter is just a wrapper around UnsafeCell<u32>, which is the heart
// of interior mutability in Rust. By using interior mutability, we can have
// COUNTER be `static` instead of `static mut`, but still able to mutate
// its counter value.
struct CSCounter(UnsafeCell<u32>);

const CS_COUNTER_INIT: CSCounter = CSCounter(UnsafeCell::new(0));

impl CSCounter {
    pub fn reset(&self, _cs: &interrupt::CriticalSection) {
        // By requiring a CriticalSection be passed in, we know we must
        // be operating inside a CriticalSection, and so can confidently
        // use this unsafe block (required to call UnsafeCell::get).
        unsafe { *self.0.get() = 0 };
    }

    pub fn increment(&self, _cs: &interrupt::CriticalSection) {
        unsafe { *self.0.get() += 1 };
    }
}

// Required to allow static CSCounter. See explanation below.
unsafe impl Sync for CSCounter {}

// COUNTER is no longer `mut` as it uses interior mutability;
// therefore it also no longer requires unsafe blocks to access.
static COUNTER: CSCounter = CS_COUNTER_INIT;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // No unsafe here!
            interrupt::free(|cs| COUNTER.increment(cs));
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // We do need to enter a critical section here just to obtain a valid
    // cs token, even though we know no other interrupt could pre-empt
    // this one.
    interrupt::free(|cs| COUNTER.reset(cs));

    // We could use unsafe code to generate a fake CriticalSection if we
    // really wanted to, avoiding the overhead:
    // let cs = unsafe { interrupt::CriticalSection::new() };
}

我们将 unsafe 代码移到了我们精心设计的抽象内部,现在我们的应用程序代码不包含任何 unsafe 代码块。

此设计要求应用程序传入一个 CriticalSection 令牌:这些令牌只能由 interrupt::free 安全地生成,因此通过要求传入一个令牌,我们确保我们在临界区内操作,而无需实际执行锁定操作。此保证由编译器静态提供:与 cs 相关的任何运行时开销都不会出现。如果我们有多个计数器,它们都可以使用相同的 cs,而无需多个嵌套的临界区。

这也引出了 Rust 中并发的一个重要主题:SendSync 特性。为了总结 Rust 手册,当一个类型可以安全地移动到另一个线程时,它就是 Send,而当它可以安全地在多个线程之间共享时,它就是 Sync。在嵌入式环境中,我们将中断视为在与应用程序代码不同的线程中执行,因此由中断和主代码访问的变量必须是 Sync。

对于 Rust 中的大多数类型,这两个特性都是由编译器自动为您推导的。但是,因为 CSCounter 包含一个 UnsafeCell,所以它不是 Sync,因此我们不能创建一个 static CSCounterstatic 变量必须是 Sync,因为它们可以被多个线程访问。

为了告诉编译器我们已经注意到了 CSCounter 实际上是线程安全的,我们显式地实现了 Sync 特性。与之前使用临界区一样,这在单核平台上才是安全的:在多核平台上,您需要采取更多措施来确保安全。

互斥锁

我们创建了一个针对计数器问题的有用抽象,但还有许多用于并发的常见抽象。

其中一个同步原语是互斥锁,简称互斥。这些结构确保对变量(例如我们的计数器)的独占访问。一个线程可以尝试锁定(或获取)互斥锁,并且要么立即成功,要么阻塞等待锁定被获取,要么返回一个错误,表明互斥锁无法被锁定。当该线程持有锁定时,它被授予对受保护数据的访问权限。当线程完成时,它解锁(或释放)互斥锁,允许另一个线程锁定它。在 Rust 中,我们通常使用 Drop 特性来实现解锁,以确保在互斥锁超出范围时始终释放它。

在中断处理程序中使用互斥锁可能很棘手:中断处理程序通常不允许阻塞,并且如果它阻塞等待主线程释放锁定,这将是灾难性的,因为我们随后会死锁(主线程永远不会释放锁定,因为执行会一直停留在中断处理程序中)。死锁不被认为是不安全的:即使在安全的 Rust 中也是可能的。

为了完全避免这种行为,我们可以实现一个需要临界区才能锁定的互斥锁,就像我们的计数器示例一样。只要临界区必须持续到锁定结束,我们就可以确定我们对包装的变量具有独占访问权限,甚至不需要跟踪互斥锁的锁定/解锁状态。

这实际上是在 cortex_m crate 中为我们完成的!我们可以使用它来编写我们的计数器

use core::cell::Cell;
use cortex_m::interrupt::Mutex;

static COUNTER: Mutex<Cell<u32>> = Mutex::new(Cell::new(0));

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            interrupt::free(|cs|
                COUNTER.borrow(cs).set(COUNTER.borrow(cs).get() + 1));
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // We still need to enter a critical section here to satisfy the Mutex.
    interrupt::free(|cs| COUNTER.borrow(cs).set(0));
}

我们现在使用 Cell,它与它的兄弟 RefCell 一起用于提供安全的内部可变性。我们已经看到了 UnsafeCell,它是 Rust 中内部可变性的底层:它允许您获取多个对它的值的可变引用,但只能使用不安全的代码。Cell 就像一个 UnsafeCell,但它提供了一个安全的接口:它只允许获取当前值的副本或替换它,而不是获取引用,并且因为它不是 Sync,所以它不能在多个线程之间共享。这些约束意味着它使用起来是安全的,但我们不能直接在 static 变量中使用它,因为 static 必须是 Sync。

那么上面的例子为什么有效呢?Mutex<T> 为任何是 Send 的 T(例如 Cell)实现了 Sync。它可以安全地做到这一点,因为它只在临界区内提供对内容的访问。因此,我们能够获得一个没有不安全代码的安全计数器!

这对于像计数器的 u32 这样的简单类型来说很棒,但对于更复杂的类型(不是 Copy)呢?在嵌入式环境中,一个非常常见的例子是外设结构,它通常不是 Copy。为此,我们可以转向 RefCell

共享外设

使用 svd2rust 和类似抽象生成的设备 crate 通过强制外设结构一次只能存在一个实例来提供对外设的安全访问。这确保了安全性,但使得从主线程和中断处理程序访问外设变得困难。

为了安全地共享外设访问,我们可以使用之前看到的 Mutex。我们还需要使用 RefCell,它使用运行时检查来确保一次只发出一个对外设的引用。这比普通的 Cell 有更高的开销,但由于我们发出的是引用而不是副本,因此我们必须确保一次只存在一个引用。

最后,我们还需要考虑如何在主代码中初始化外设后将其移动到共享变量中。为此,我们可以使用 Option 类型,将其初始化为 None,然后将其设置为外设实例。

use core::cell::RefCell;
use cortex_m::interrupt::{self, Mutex};
use stm32f4::stm32f405;

static MY_GPIO: Mutex<RefCell<Option<stm32f405::GPIOA>>> =
    Mutex::new(RefCell::new(None));

#[entry]
fn main() -> ! {
    // Obtain the peripheral singletons and configure it.
    // This example is from an svd2rust-generated crate, but
    // most embedded device crates will be similar.
    let dp = stm32f405::Peripherals::take().unwrap();
    let gpioa = &dp.GPIOA;

    // Some sort of configuration function.
    // Assume it sets PA0 to an input and PA1 to an output.
    configure_gpio(gpioa);

    // Store the GPIOA in the mutex, moving it.
    interrupt::free(|cs| MY_GPIO.borrow(cs).replace(Some(dp.GPIOA)));
    // We can no longer use `gpioa` or `dp.GPIOA`, and instead have to
    // access it via the mutex.

    // Be careful to enable the interrupt only after setting MY_GPIO:
    // otherwise the interrupt might fire while it still contains None,
    // and as-written (with `unwrap()`), it would panic.
    set_timer_1hz();
    let mut last_state = false;
    loop {
        // We'll now read state as a digital input, via the mutex
        let state = interrupt::free(|cs| {
            let gpioa = MY_GPIO.borrow(cs).borrow();
            gpioa.as_ref().unwrap().idr.read().idr0().bit_is_set()
        });

        if state && !last_state {
            // Set PA1 high if we've seen a rising edge on PA0.
            interrupt::free(|cs| {
                let gpioa = MY_GPIO.borrow(cs).borrow();
                gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().set_bit());
            });
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // This time in the interrupt we'll just clear PA0.
    interrupt::free(|cs| {
        // We can use `unwrap()` because we know the interrupt wasn't enabled
        // until after MY_GPIO was set; otherwise we should handle the potential
        // for a None value.
        let gpioa = MY_GPIO.borrow(cs).borrow();
        gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().clear_bit());
    });
}

这些内容很多,让我们分解一下重要的行。

static MY_GPIO: Mutex<RefCell<Option<stm32f405::GPIOA>>> =
    Mutex::new(RefCell::new(None));

我们的共享变量现在是一个围绕 RefCellMutex,它包含一个 OptionMutex 确保我们只在临界区内有访问权限,因此即使普通的 RefCell 不是 Sync,它也会使变量成为 Sync。RefCell 为我们提供了内部可变性,并带有引用,我们将在使用 GPIOA 时需要它。Option 允许我们将此变量初始化为空,然后才实际将变量移入。我们不能静态地访问外设单例,只能在运行时访问,因此这是必需的。

interrupt::free(|cs| MY_GPIO.borrow(cs).replace(Some(dp.GPIOA)));

在临界区内,我们可以对互斥锁调用 borrow(),这将为我们提供对 RefCell 的引用。然后,我们调用 replace() 将我们的新值移动到 RefCell 中。

interrupt::free(|cs| {
    let gpioa = MY_GPIO.borrow(cs).borrow();
    gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().set_bit());
});

最后,我们以安全且并发的方式使用 MY_GPIO。临界区阻止中断像往常一样触发,并允许我们借用互斥锁。然后,RefCell 为我们提供一个 &Option<GPIOA>,并跟踪它被借用的时间长度 - 一旦该引用超出范围,RefCell 将被更新以表明它不再被借用。

由于我们不能将 GPIOA&Option 中移出,因此我们需要使用 as_ref() 将其转换为 &Option<&GPIOA>,最后我们可以 unwrap() 来获取 &GPIOA,这让我们可以修改外设。

如果我们需要对共享资源的可变引用,那么应该使用 borrow_mutderef_mut。以下代码展示了使用 TIM2 定时器的示例。

use core::cell::RefCell;
use core::ops::DerefMut;
use cortex_m::interrupt::{self, Mutex};
use cortex_m::asm::wfi;
use stm32f4::stm32f405;

static G_TIM: Mutex<RefCell<Option<Timer<stm32::TIM2>>>> =
	Mutex::new(RefCell::new(None));

#[entry]
fn main() -> ! {
    let mut cp = cm::Peripherals::take().unwrap();
    let dp = stm32f405::Peripherals::take().unwrap();

    // Some sort of timer configuration function.
    // Assume it configures the TIM2 timer, its NVIC interrupt,
    // and finally starts the timer.
    let tim = configure_timer_interrupt(&mut cp, dp);

    interrupt::free(|cs| {
        G_TIM.borrow(cs).replace(Some(tim));
    });

    loop {
        wfi();
    }
}

#[interrupt]
fn timer() {
    interrupt::free(|cs| {
        if let Some(ref mut tim)) =  G_TIM.borrow(cs).borrow_mut().deref_mut() {
            tim.start(1.hz());
        }
    });
}

哇!这是安全的,但也有些笨拙。我们还能做些什么呢?

RTIC

一种替代方案是 RTIC 框架,它是实时中断驱动并发的缩写。它强制执行静态优先级并跟踪对 static mut 变量(“资源”)的访问,以静态地确保始终安全地访问共享资源,而无需始终进入临界区和使用引用计数(如 RefCell)的开销。这有许多优点,例如保证没有死锁,并提供极低的时延和内存开销。

该框架还包括其他功能,例如消息传递,这减少了对显式共享状态的需求,以及在给定时间安排任务运行的能力,这可以用来实现周期性任务。查看 文档以获取更多信息!

实时操作系统

嵌入式并发的另一种常见模型是实时操作系统 (RTOS)。虽然目前在 Rust 中的探索较少,但它们在传统的嵌入式开发中被广泛使用。开源示例包括 FreeRTOSChibiOS。这些 RTOS 提供对运行多个应用程序线程的支持,CPU 在这些线程之间切换,要么是当线程让出控制权时(称为协作式多任务),要么是基于定期计时器或中断时(抢占式多任务)。RTOS 通常提供互斥锁和其他同步原语,并且通常与硬件功能(如 DMA 引擎)互操作。

在撰写本文时,没有多少 Rust RTOS 示例可以指出,但这是一个有趣的领域,所以请关注这个领域!

多个内核

在嵌入式处理器中拥有两个或多个内核变得越来越普遍,这为并发增加了额外的复杂性。所有使用临界区的示例(包括 cortex_m::interrupt::Mutex)都假设唯一的其他执行线程是中断线程,但在多核系统上,情况不再如此。相反,我们将需要为多个内核设计的同步原语(也称为 SMP,即对称多处理)。

这些通常使用我们之前看到的原子指令,因为处理系统将确保在所有内核上保持原子性。

详细介绍这些主题目前超出了本书的范围,但总体模式与单核情况相同。