Pintos 实验分析 1.4

Pintos 最后的一个实验。

实验目的

  • 修改 Pintos 源码,实现对条件变量发出信号后,系统将唤醒等待该条件信号的优先级最高的线程。
  • 增添 Pintos 源码,实现使用整型数模拟实数四则运算。
  • 修改 Pintos 源码,实现统计每个线程的占用 CPU 的时间(recent_cpu),并根据占用 CPU 的时间、每个线程的 nice 值和系统的平均符合,实时更新所有线程的优先级,实现多级反馈调度。

实验过程

实验分析

条件变量及其顺序唤醒

条件变量(condition variable)是在互斥锁的保护下的利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作,等待和通知。

void
cond_wait (struct condition *cond, struct lock *lock)
{
  struct semaphore_elem waiter;
  sema_init (&waiter.semaphore, 0);
  list_push_back (&cond->waiters, &waiter.elem);
  lock_release (lock);
  sema_down (&waiter.semaphore);
  lock_acquire (lock);
}

等待操作,即线程等待被通知。 1. 首先会初始化一个信号量 waiter 为 0 2. 用 waiters 这个队列记录等待通知的线程 3. 释放拥有的互斥锁 4. 执行 P 操作,线程被休眠 5. 线程被通知后,会重新获得互斥锁,然后退出这个函数。

void
cond_signal (struct condition *cond, struct lock *lock UNUSED)
{
  if (!list_empty (&cond->waiters))
  {
    sema_up (&list_entry (list_pop_front (&cond->waiters), struct semaphore_elem, elem)->semaphore);
  }
}

通知操作,即通知一个线程唤醒。通过对等待队列里面的信号量进行 V 操作来唤醒等待的线程,按照 Pintos 的设计,需要唤醒优先级最高的线程。

所以在这里,我的设计思路是在唤醒之前,对整个队列按线程优先级进行排序,这样就可以保证唤醒的是优先级最高的线程。

首先需要编写比较函数,由于 waiters 队列里面的元素是信号量,需要通过信号量的 waiters 队列里面的元素找到对应线程的优先级。

bool
sema_cmp_by_priority (const struct list_elem *a, const struct list_elem *b, void *aux UNUSED)
{
  return list_entry (list_front (&(list_entry (a, struct semaphore_elem, elem)->semaphore.waiters)), struct thread, elem)->priority >
         list_entry (list_front (&(list_entry (b, struct semaphore_elem, elem)->semaphore.waiters)), struct thread, elem)->priority;
}

最后在 sema_up () 被调用之前加入 list_sort (&cond->waiters, sema_cmp_by_priority, NULL); 即可。

此外,通知唤醒还有 cond_broadcast () 函数,与 sema_up () 的区别是,前者是按优先级唤醒所有的等待线程,后者只唤醒优先级最高的一个线程。

另外我们可以注意到的是,为了保护条件变量的读写,执行上述三个函数,都需要在拿到互斥锁的前提下进行。

使用整型模拟实数运算

由于在多级反馈调度中需要用到实数运算,但 Pintos 本身不支持浮点类型,所以需要我们使用整形模拟实数类型。

由于我们用到的数据范围是一定的,为了代码实现上的简洁,我们并不是遵循 IEEE 定义的计算机记录实数的规范,而是使用固定位数的实数。我们将一个 32 bit 的 int 类型前 16 记录一个实数的整数部分,后 16 bit 记录一个实数的小数部分。在我的代码中,将这种类型称为 fix

显然我们可以通过位操作轻易的将 int 类型转为 fix 类型。

/* Parameter: int; Return type: fix; Conver int to fix */
#define F_CONST(A) ((fix)((A) << SHIFT_BIT))

也可以实现四则运算。

/* Parameter: fix, fix; Return type: fix; */
#define F_ADD(A, B) ((A) + (B))

/* Parameter: fix, int; Return type: fix; */
#define F_ADD_INT(A, B) ((A) + F_CONST(B))

/* Parameter: fix, fix; Return type: fix; */
#define F_SUB(A, B) ((A) - (B))

/* Parameter: fix, int; Return type: fix; */
#define F_SUB_INT(A, B) ((A) - F_CONST((B)))

/* Parameter: fix, int; Return type: fix; */
#define F_MULT_INT(A, B) ((A) * (B))

/* Parameter: fix, int; Return type: fix; */
#define F_DIV_INT(A, B) ((A) / (B))

但对于 fix 类型之间的乘除法,需要特别处理。

我们知道,右移 SHIFT_BIT 位,等于是乘上 $2^{SHIFT\\_BIT}$。所以对于两个 fix 类型相乘,我们只需要按照 int 类型一样相乘,然后在把额外的 $2^{SHIFT\\_BIT}$ 除掉即可。但是同时我们也会发现有一个问题,就是相乘之后会溢出,所以我们需要用 int64_t 这种 64 bit 长整形临时储存结果。

/* Parameter: fix, fix; Return type: fix; */
#define F_MULT(A, B) ((fix)((((int64_t) (A)) * (int64_t) (B)) >> SHIFT_BIT))

除法和乘法类似,只需要把额外的 $2^{SHIFT\\_BIT}$ 乘回去即可。但为了防止精度丢失,我们也是使用 int64_t 临时储存,先算乘法,再算除法。

/* Parameter: fix, fix; Return type: fix; */
#define F_DIV(A, B) ((fix)((((int64_t) (A)) << SHIFT_BIT) / (B)))

接下来还有一个重点是四舍五入的取整。在 C++ 编程的时候,我们知道需要把一个 double 类型 d 四舍五入转为 int 类型 i,只需要如下操作 d >= 0 ? (int) (d + 0.5) : (int) (d - 0.5)。在 fix 类型上,我们也可以按照上面的思路。

/* Parameter: fix; Return type: int; rounding to nearest; */
#define F_INT_N(A) ((A) >= 0 ? F_INT_Z((A) + (1 << (SHIFT_BIT - 1))) \
                             : F_INT_Z((A) - (1 << (SHIFT_BIT - 1))))
多级反馈调度的原因与实现

在我们之前的实验里面,调度都是单纯地基于优先级的抢占式调度,即是现在运行的线程始终是当前系统优先级的线程。这带来了一个问题如果有一个高优先级的线程需要大量 CPU 时间,其他线程就会一段时间内获取不到 CPU 时间,发生 starvation 现象。对于用户来说,就是系统的其他程序失去了响应。

接下来我们分析这种问题发生的原因。优先级是由用户或者程序自己设定的,而线程调度也只根据线程的优先级进行,这样实际上这个控制系统是缺乏反馈的。我们希望建立一种有反馈的控制机制,使得系统平均响应时间达到最低。同时我们留意到:

  1. 一些需要 IO 的线程需要极短的系统响应时间,而只会占用很少的 CPU 时间,它们希望尽快被调度;
  2. 一些 CPU 绑定的线程,需要大量的 CPU 时间,但对系统响应时间却没有太高要求。

结合上述的情况,多级反馈队列调度算法就被设计出来解决这个问题的。它的大体思想是记录线程占用的 CPU 时间的比例,随着占用 CPU 时间的增加,线程的优先级会下降,从而减少 starvation 现象的发生。此外,由于优先级被操作系统控制,多级反馈队列调度算法额外增加了 nice 值来允许用户一定程度上干预整个调度过程。

下面详细说说我是如何把多级反馈队列调度算法在 Pintos 中实现的。

  • 对与整个操作系统新增如下属性:

fix load_avg:代表当前系统的平均负载。在 Pintos 中,load_avgTIMER_FREQ 更新一次。load_avg 根据下面的公式进行更新。

$$load\avg = \frac{59\times load\avg+ready\_threads}{60}$$

  • 对于每一个线程新增如下属性:

  1. int nice:记录当前线程的 nice 值。nice 值表示该线程对于其它线程的友好程度,该值越大,代表该线程的优先级越容易下降而让其它线程运行更多的时间片。
  2. fix recent_cpu:记录当前线程的 CPU 占用情况。在每一个 timer_tick 中,如果该线程处于 RUNNING 状态,则 recent_cpu += 1。此外每 TIMER_FREQ 还需要进行一次所有线程的 recent_cpu 更新,recent_cpu 根据下面的公式进行更新。

$$recent\cpu=\frac{2\times load\avg\times recent\cpu}{2\times load\avg+1}+nice$$

最后我们根据上述统计得出的三个属性,每四个 timer_ticks 更新每一个线程的优先级。更新线程优先级根据下面的公式。

$$priority=PRI\MAX-(recent\cpu/4)-(nice\times 2)$$

下面谈谈具体的代码实现。

首先需要在 init_thread () 函数中初始化 nicerecent_cpu 为 0。

接着,把 thread_set_nice () 补充完整,即使我们在 mlfqs 中不需要优先级捐献,但还是需要实现优先级抢占,所以在修改完 nice 值之后需要重新调度一下。

void
thread_set_nice (int nice)
{
  thread_current ()->nice = nice;
  thread_update_priority (thread_current ());
  thread_yield ();
}

编写函数 thread_add_recent_cpu () 更新当前正在运行的线程的 recent_cpu 值,需要注意的是如果线程是 idle 状态则不需要更新了。

/* Increase recent_cpu of current thread. */
void
thread_add_recent_cpu (void)
{
  if (thread_current () != idle_thread)
    thread_current ()->recent_cpu = F_ADD_INT (thread_current ()->recent_cpu, 1);
}

编写 thread_update_recent_cpu () 更新所有线程的 recent_cpu,同样需要注意 idle 状态的线程不需要更新。

/* Update current thread recent_cpu. */
void
thread_update_recent_cpu (struct thread *t)
{
  if (t != idle_thread)
    t->recent_cpu = F_ADD_INT (F_MULT (F_DIV (F_MULT_INT (load_avg, 2), F_ADD_INT (F_MULT_INT (load_avg, 2), 1)), t->recent_cpu), t->nice);
}

更新操作系统 load_avg 的函数。

/* Update average load. */
void
thread_update_load_avg (void)
{
  load_avg = F_DIV_INT (F_ADD_INT (F_MULT_INT (load_avg, 59), thread_count_ready ()), 60);
}

由定时器触发中断会运行的函数是 timer_interrupt (),所以我们想上面的线程被调用,还需要在 timer_interrupt () 中“注册”一下上面的函数。

/* Timer interrupt handler. */
static void
timer_interrupt (struct intr_frame *args UNUSED)
{
  ticks++;
  thread_tick ();
  thread_foreach (thread_check_and_block, NULL);
  thread_add_recent_cpu ();
  if (ticks % TIMER_FREQ == 0)
  {
    thread_update_load_avg ();
    thread_foreach (thread_update_recent_cpu, NULL);
  }
  if (ticks % 4 == 0)
    thread_foreach (thread_update_priority, NULL);
}

由于 mlfqs 优先级由上面提到的三个属性确定,不需要考虑优先级捐献的问题,所以还是需要稍加判断。由于我 Pintos 设计中更新优先级只提供了唯一的一个接口 thread_update_priority (),所以只需要修改这个函数即可。

/* Update priority. */
void
thread_update_priority (struct thread *t)
{
  if (t == idle_thread) return;
  if (!thread_mlfqs)
  {
    int max_priority = PRI_MIN;
    if (!list_empty (&t->locks_holding))
    {
      list_sort (&t->locks_holding, lock_cmp_by_priority, NULL);
      if (list_entry (list_front (&t->locks_holding), struct lock, elem)->priority > max_priority)
        max_priority = list_entry (list_front (&t->locks_holding), struct lock, elem)->priority;
    }
    t->priority = max_priority > t->old_priority ? max_priority : t->old_priority;
  }
  else
    t->priority = PRI_MAX - F_INT_N (F_ADD_INT (F_DIV_INT (t->recent_cpu, 4), (2 * t->nice)));
}

到这里,在 Pintos 上已经实现了多级反馈调度。

测试解释

priority-condvar
thread_set_priority (PRI_MIN);
for (i = 0; i < 10; i++)
  {
    int priority = PRI_DEFAULT - (i + 7) % 10 - 1;
    char name[16];
    snprintf (name, sizeof name, "priority %d", priority);
    thread_create (name, priority, priority_condvar_thread, NULL);
  }

测试线程将自身优先级调到最低,然后创建了 10 个优先级各不相同的新线程。

static void
priority_condvar_thread (void *aux UNUSED)
{
  msg ("Thread %s starting.", thread_name ());
  lock_acquire (&lock);
  cond_wait (&condition, &lock);
  msg ("Thread %s woke up.", thread_name ());
  lock_release (&lock);
}

10 个新线程均需要等待条件变量 condition 被通知而休眠。

for (i = 0; i < 10; i++)
  {
    lock_acquire (&lock);
    msg ("Signaling...");
    cond_signal (&condition, &lock);
    lock_release (&lock);
  }

对条件变量执行通知操作,此时优先级最高的线程应该被唤醒。如果需要通过这个测试,这 10 个线程需要按照优先级从高到低被唤醒。

mlfqs-load-1

这个测试是检测单个线程运行时系统负载的情况,由于是只有一个线程运行,系统负载会从 0 向 1 逼近。

start_time = timer_ticks ();
for (;;)
  {
    load_avg = thread_get_load_avg ();
    ASSERT (load_avg >= 0);
    elapsed = timer_elapsed (start_time) / TIMER_FREQ;
    if (load_avg > 100)
      fail ("load average is %d.%02d "
            "but should be between 0 and 1 (after %d seconds)",
            load_avg / 100, load_avg % 100, elapsed);
    else if (load_avg > 50)
      break;
    else if (elapsed > 45)
      fail ("load average stayed below 0.5 for more than 45 seconds");
  }

上面这段代码是一个循环,进行三个判断:

  1. 如果系统负载超过 1,证明代码逻辑出错,测试不通过。
  2. 如果系统负载超过 0.5,则退出循环,测试继续。
  3. 如果测试结束的时候系统负载没有超过 0.5,证明代码逻辑出错,测试不通过。
if (elapsed < 38)
  fail ("load average took only %d seconds to rise above 0.5", elapsed);

如果在 38 秒之前系统负载上升到 0.5 也是不正常的,说明代码逻辑出错,测试不通过。

msg ("sleeping for another 10 seconds, please wait...");
timer_sleep (TIMER_FREQ * 10);

这里开始进程休眠,此时没有线程在运行,系统负载应该开始下降。

load_avg = thread_get_load_avg ();
if (load_avg < 0)
  fail ("load average fell below 0");
if (load_avg > 50)
  fail ("load average stayed above 0.5 for more than 10 seconds");
msg ("load average fell back below 0.5 (to %d.%02d)",
     load_avg / 100, load_avg % 100);

休眠结束,如果此时系统负载下降到 0 - 0.5 之间,则通过测试。

mlfqs-load-60

与上面的测试类似,改为 60 个线程的情况,下面我们看看被创建的 60 个线程是如何运行的。

static void
load_thread (void *aux UNUSED)
{
  int64_t sleep_time = 10 * TIMER_FREQ;
  int64_t spin_time = sleep_time + 60 * TIMER_FREQ;
  int64_t exit_time = spin_time + 60 * TIMER_FREQ;

  thread_set_nice (20);
  timer_sleep (sleep_time - timer_elapsed (start_time));
  while (timer_elapsed (start_time) < spin_time)
    continue;
  timer_sleep (exit_time - timer_elapsed (start_time));
}

可以清晰地看到,所有线程均会先休眠 10 秒,然后进入死循环 60 秒,最后再休眠 60 秒。所以整个系统的负载应该是先爬升,再下降。

mlfqs-load-avg

创建了 60 个线程,第 i 个线程休眠 10 + i 秒,然后运行 60 秒,最后休眠直到整个测试的总时间 120 秒结束。系统负载应该也是先爬升再下降。

上面的三个 mlfqs-load-* 系列测试都是测试 load-avg 计算是否正确,如果计算部分和统计部分的代码逻辑没有错误,应该就可以顺利通过的。

此外,在测试文件中,作者提醒到,如果是唯独 mlfqs-load-avg 这个测试无法通过,可能是 interrupt handler timer_interrupt () 运行时间过长。这样会导致测试主线程没有足够的时间打印信息而又被调度。

mlfqs-recent-1

这个测试是用于测试 recent_cpu 的计算是否正确。

do
  {
    msg ("Sleeping 10 seconds to allow recent_cpu to decay, please wait...");
    start_time = timer_ticks ();
    timer_sleep (DIV_ROUND_UP (start_time, TIMER_FREQ) - start_time
                 + 10 * TIMER_FREQ);
  }
while (thread_get_recent_cpu () > 700);

让测试线程休眠,直到 recent_cpu 下降到 7 以下,这个是为了别的测试调用这个测试而设计的。

start_time = timer_ticks ();
for (;;)
  {
    int elapsed = timer_elapsed (start_time);
    if (elapsed % (TIMER_FREQ * 2) == 0 && elapsed > last_elapsed)
      {
        int recent_cpu = thread_get_recent_cpu ();
        int load_avg = thread_get_load_avg ();
        int elapsed_seconds = elapsed / TIMER_FREQ;
        msg ("After %d seconds, recent_cpu is %d.%02d, load_avg is %d.%02d.",
             elapsed_seconds,
             recent_cpu / 100, recent_cpu % 100,
             load_avg / 100, load_avg % 100);
        if (elapsed_seconds >= 180)
          break;
      }
    last_elapsed = elapsed;
  }

整个测试只有一个线程,if (elapsed % (TIMER_FREQ * 2) == 0 && elapsed > last_elapsed) 代表每两个 TIMER_FREQ 为一次周期进行一次输出,输出 recent_cpu 的值和 load_avg,整个测试进行 180 秒后退出。如果 recent_cpu 的计算没有出错,测试应该就可以通过了。

mlfqs-fair-2 & mlfqs-fair-20 & mlfqs-nice-2 & mlfqs-nice-10

这四个测试都是测试 nice 值是否正确生效,他们都是共用同一套测试代码,所以先分析测试代码。

static void
test_mlfqs_fair (int thread_cnt, int nice_min, int nice_step)
{
  struct thread_info info[MAX_THREAD_CNT];
  int64_t start_time;
  int nice;
  int i;
  thread_set_nice (-20);
  start_time = timer_ticks ();
  msg ("Starting %d threads...", thread_cnt);
  nice = nice_min;
  for (i = 0; i < thread_cnt; i++)
    {
      struct thread_info *ti = &info[i];
      char name[16];
      ti->start_time = start_time;
      ti->tick_count = 0;
      ti->nice = nice;
      snprintf(name, sizeof name, "load %d", i);
      thread_create (name, PRI_DEFAULT, load_thread, ti);
      nice += nice_step;
    }
  msg ("Starting threads took %"PRId64" ticks.", timer_elapsed (start_time));
  msg ("Sleeping 40 seconds to let threads run, please wait...");
  timer_sleep (40 * TIMER_FREQ);
  for (i = 0; i < thread_cnt; i++)
    msg ("Thread %d received %d ticks.", i, info[i].tick_count);
}

这个是测试的主线程,他会创建 thread_cnt 个新线程,从 nice_minnice_step 为间隔调整它们的 nice 值,最后输出每个线程获得的 CPU 时间片。其中 thread_set_nice (-20); 保证了测试主线程获得最多的 CPU 时间,保证所有新线程在休眠结束前都被成功创建。

static void
load_thread (void *ti_)
{
  struct thread_info *ti = ti_;
  int64_t sleep_time = 5 * TIMER_FREQ;
  int64_t spin_time = sleep_time + 30 * TIMER_FREQ;
  int64_t last_time = 0;

  thread_set_nice (ti->nice);
  timer_sleep (sleep_time - timer_elapsed (ti->start_time));
  while (timer_elapsed (ti->start_time) < spin_time)
    {
      int64_t cur_time = timer_ticks ();
      if (cur_time != last_time)
        ti->tick_count++;
      last_time = cur_time;
    }
}

这个是被创建的线程运行的函数,下面分析一下重点的语句:

  1. thread_set_nice (ti->nice); 线程会将自己的 nice 值调到预设值。
  2. timer_sleep (sleep_time - timer_elapsed (ti->start_time)); 是 Pintos 测试中常用的休眠方法,这个可以使得不是同时被创建的线程同时休眠结束。
  3. 每个线程不算上睡眠时间会运行 30 * TIMER_FREQ 时间,然后它们通过 ti->tick_count++; 统计获得的 CPU 时间片。

综合来说,这套测试框架就是创建一系列线程,它们可以有相同或者不同的 nice 值,最后测试通过统计所有线程获得的 CPU 时间片来判断 nice 值的设置是否生效。

void
test_mlfqs_fair_2 (void)
{
  test_mlfqs_fair (2, 0, 0);
}

void
test_mlfqs_fair_20 (void)
{
  test_mlfqs_fair (20, 0, 0);
}

上述两个测试创建了 2 个和 20 个 nice 值相同的线程,理论上 nice 值相同的线程运行相同的函数,获得的 CPU 时间片是一样的,而实际测试中,由于程序运行的不确定性,一定范围内的波动是允许的。

void
test_mlfqs_nice_2 (void)
{
  test_mlfqs_fair (2, 0, 5);
}

void
test_mlfqs_nice_10 (void)
{
  test_mlfqs_fair (10, 0, 1);
}

上述两个测试创建了多个 nice 值不相同的线程,运行相同的函数。nice 值更高的线程应该获得更少的 CPU 时间片。

mlfqs-block

这个测试检测被 block 的线程是否能正确更新 recent_cpu

void
test_mlfqs_block (void)
{
  lock_acquire (&lock);
  thread_create ("block", PRI_DEFAULT, block_thread, &lock);
  timer_sleep (25 * TIMER_FREQ);
  while (timer_elapsed (start_time) < 5 * TIMER_FREQ)
    continue;
  lock_release (&lock);
}

主线程持有锁之后,创建了一个新线程,然后休眠 25 秒,运行 5 秒,然后释放锁。

static void
block_thread (void *lock_)
{
  start_time = timer_ticks ();
  while (timer_elapsed (start_time) < 20 * TIMER_FREQ)
    continue;
  lock_acquire (lock);
}

新创建的线程运行 20 秒之后,尝试获取锁。此时主线程还有 10 秒才释放锁,所以会休眠 10 秒。此时 recent_cpu 应该下降。由于释放锁前运行了 5 秒,主线程的 recent_cpu 会比新创建的线程高,从而新创建的线程应该先被调度获取锁。

回答问题

信号量,锁,条件变量的异同点。

  • 信号量是线程同步的一个工具,而锁和条件变量都是对信号量的一种封装。信号量有一个属性代表资源可用值,而且所有线程都可以对信号量进行 P 和 V 操作。
  • 锁是互斥的,是二元量,只能同时被一个线程拥有,被同一个线程释放,其余线程只能被阻塞等待。用于保护只能被单独访问的资源。
  • 条件变量和锁类似,都是二元量,而且默认都是为 false。有两种操作,一是 wait,说明该线程需要等待该变量为 true 才可以继续运行;另一种操作是通知操作,即是使条件变量为真,唤醒等待的线程(一个或者多个)。两个操作都需要在拥有锁的情况下进行。

多级反馈调度为什么不需要优先级捐赠?

  • 多级反馈调度本来的设计就不是让优先级高的线程运行完毕再让低优先级的运行。
  • 在以往的调度方式中,优先级是程序或者人为设定的,而在多级反馈调度是通过统计 CPU 时间得出的,是操作系统进行反馈调度的手段,使得系统响应时间缩短,捐赠优先级会破坏这种反馈调度,使得多级反馈调度失去意义。

多级反馈调度为什么能避免饥饿现象?

多级反馈队列调度算法的大体思想是记录线程占用的 CPU 时间的比例,随着占用 CPU 时间的增加,线程的优先级会下降;而没有没有被调度到的线程占用 CPU 时间很低,甚至为零,线程的优先级会上升,继而被调度运行,这样一来饥饿现象就可以避免了。

实验感想

  • 信号量是线程同步的一个工具,而锁和条件变量都是对信号量的一种封装,使得在实际编程中更容易使用。
  • 调度控制系统一般需要做成闭环控制,这样系统一般易于趋于稳定,例如是多级反馈调度算法。
  • 将整形数人为划分为两部分,可以模拟实数运算,而且可以利用宏定义封装一下算式,但宏定义是编译时展开的,所以需要注意运算顺序,多加括号。