Pintos 实验分析 1.1

正确处理线程的休眠与唤醒。

实验目的

修改 Pintos 源码,正确处理线程的休眠与唤醒,使线程休眠时 CPU 不会忙等待,唤醒时能按优先级唤醒。

实验过程

实验分析

线程的调度由操作系统负责,而操作系统进行调度需要获得 CPU 时间,而这个是通过向 CPU 发送中断来获得的。在 pintos 中,这个中断是由 Timer 这个定时器定时产生中断。

在这里,处理线程休眠的函数是 timer_sleep (),我先分析一下这个函数是如何实现原始休眠的。

void
timer_sleep (int64_t ticks)
{
  int64_t start = timer_ticks ();

  ASSERT (intr_get_level () == INTR_ON);
  while (timer_elapsed (start) < ticks)
    thread_yield ();
}
  1. 首先,start 记录了进入这个函数的当前时间。
  2. 利用 timer_elapsed () 判断当前时间和 start 的差值,换言之就是从 start 开始流逝的时间。
  3. 判断当前流逝的时间是否超过给定的 ticks,如果没有,执行 thread_yield (),让线程休眠。

然后,接下来我再分析 thread_yield () 这个函数究竟干了些什么。

void
thread_yield (void)
{
  struct thread *cur = thread_current ();
  enum intr_level old_level;

  ASSERT (!intr_context ());

  old_level = intr_disable ();
  if (cur != idle_thread)
    list_push_back (&ready_list, &cur->elem);
  cur->status = THREAD_READY;
  schedule ();
  intr_set_level (old_level);
}
  1. 获取当前线程并关闭中断。
  2. 如果当前线程不是 idle_thread 状态的线程,将这个线程丢到 ready_list 的末尾,然后执行 schedule ()
  3. 最后恢复之前的中断状态。

这里其实比较清楚整个流程了,但是不妨在看看 schedule () 干了什么,说不定还能看得懂。

static void
schedule (void)
{
  struct thread *cur = running_thread ();
  struct thread *next = next_thread_to_run ();
  struct thread *prev = NULL;

  ASSERT (intr_get_level () == INTR_OFF);
  ASSERT (cur->status != THREAD_RUNNING);
  ASSERT (is_thread (next));

  if (cur != next)
    prev = switch_threads (cur, next);
  thread_schedule_tail (prev);
}

函数再往下看就开始接近底层,也看不太懂了,但大概来说 schedule () 就是切换到了下一个线程。

好了,综合上面几个函数,可以看到 thread_yield () 就是把当前线程丢回到 ready_list 里面,然后切换到下一个线程。再往上一层,timer_sleep () 就是在循环里面不断将线程丢回 ready_list 不让它执行来实现休眠。这很类似是轮询的方式,即使线程休眠 CPU 也不能空闲,即是忙等待

轮询的方式实现下来是很低效的,接下来,我们需要设计一种优雅的方式实现线程休眠。

整个线程休眠的实现方式最大的问题在于 timer_sleep () 里面使用了轮询的机制而占用了 CPU 资源,由于休眠和中断的最小时间单位都是 tick,而且注意到有 thread_block () 这个函数,可以把当前线程 block 掉,不让它进入调度。所以我们完全可以把线程 block 掉来实现休眠,然后每一次中断检查一次是否有线程需要唤醒即可。

实现方法

void
timer_sleep (int64_t ticks)
{
  ASSERT (intr_get_level () == INTR_ON);
  if (ticks > 0)
  {
    enum intr_level old_level = intr_disable ();
    struct thread *current_thread = thread_current ();
    current_thread->ticks_blocked = ticks;
    thread_block ();
    intr_set_level (old_level);
  }
}
  1. 重写 timer_sleep (),不调用 thread_yield (),而是调用 thread_block ()
  2. 修改线程的结构体,加入成员变量 ticks_blocked,记录线程剩余休眠时间,在线程创建的时候初始化为 0。
  3. 每次中断将每个被 block 的线程剩余时间减一,判断如果有线程完成休眠则 thread_unblock ()
  4. 需要注意的是,这里需要判断传入参数 ticks 是否合法,若 ticks 不是正数则不执行函数。
static void
timer_interrupt (struct intr_frame *args UNUSED)
{
  ticks++;
  thread_tick ();
  thread_foreach (thread_check_and_block, NULL);
}
void
thread_check_and_block (struct thread *t, void *aux UNUSED)
{
  if (t->status == THREAD_BLOCKED && t->ticks_blocked > 0)
  {
      t->ticks_blocked--;
      if (t->ticks_blocked == 0)
      {
          thread_unblock(t);
      }
  }
}

在每次中断的时候使用 thread_foreach () 遍历所有线程,对所有被 block 的线程减少休眠时间,对需要唤醒的线程唤醒。

这里需要注意的是对线程 unblock 需要在 if (t->status == THREAD_BLOCKED && t->ticks_blocked > 0) 这个语句里面。而下面是一个错误示范:

void
thread_check_and_block (struct thread *t, void *aux UNUSED)
{
  if (t->status == THREAD_BLOCKED && t->ticks_blocked > 0)
  {
      t->ticks_blocked--;
  }
  if (t->ticks_blocked == 0)
  {
      thread_unblock(t);
  }
}

这样是会在实际运行程序的时候 kernel panic 的。因为线程被 block 不全是来自于 timer_sleep () 的,有可能是由于等待获取锁,等待信号量等的原因被 block,如果在这里唤醒了不是被 timer_sleep () block 的线程,是会在断言处退出,引发 kernel panic。而上面正确的做法,可以保证 unblock 的线程都是由于 timer_sleep () 而休眠的。

言归正传,直到这里,我们处理完了线程的休眠和唤醒,在每次中断,操作系统遍历检查所有线程后,就可以释放 CPU 资源了,解决了 CPU 的忙等待。

然而直到现在 alarm_priority 这一个 test 还不能通过,因为这个测试要求线程根据优先级进行调度,而我们现在的实现是根据 FCFS 调度。

通过这个测试,先要了解一下 pintos 中队列是怎样实现的。

/* List element. */
struct list_elem
  {
    struct list_elem *prev;     /* Previous list element. */
    struct list_elem *next;     /* Next list element. */
  };
/* List. */
struct list
  {
    struct list_elem head;      /* List head. */
    struct list_elem tail;      /* List tail. */
  };

这里队列的实现就是很简单的一个双向链表,而每个结点只记录它的相邻元素的地址,此外什么都没有。这里有一个疑问了,结点的内容是空的,那是怎么样通过结点找到该结点代表的元素呢?

#define list_entry(LIST_ELEM, STRUCT, MEMBER)           \
        ((STRUCT *) ((uint8_t *) &(LIST_ELEM)->next     \
                     - offsetof (STRUCT, MEMBER.next)))

可以看到这里有一个宏定义,大概实现是计算 MEMBERSTRUCT 中的位置,再根据这个偏移量把 list_elem 的地址移动到包含它的 STRUCT 的地址,再将这个指针通过类型转换转换为 STRUCT 类型的指针。

可以想到,如果每次调度的时候都遍历一次链表,找出优先级最高的线程,再取出链表,这样实现起来很复杂,效率也不高。所以换一个思路,如果我们把线程插入链表的时候按顺序插入,不就保证直接取出队列头部的时候就是有序的了吗?

当然这样类似是插入排序,效率看似并不高效。但仔细一想,每次插入,最坏情况是遍历一遍,最好情况是直接放到尾部,时间复杂度为 $O(n)$ ,而每次扔到链表尾部,再快排,时间复杂度最低都是 $O(nlogn)$。

然后我们发现有 void list_insert_ordered (struct list *, struct list_elem *, list_less_func *, void *aux); 这样一个函数。顾名思义,利用这个函数插入将使链表有序。

我们需要自行编写比较函数,利用上面提到的宏定义,需要注意的是在 pintos 把 priority 数值大的认为是优先级高。

bool
cmp_by_priority (const struct list_elem *a, const struct list_elem *b, void *aux UNUSED)
{
  return list_entry(a, struct thread, elem)->priority >
         list_entry(b, struct thread, elem)->priority;
}

然后我们分别将需要插入 ready_list 的创建线程的 init_thread () 函数、把线程从 running 队列扔回 ready 队列的 thread_yield () 函数,线程解除休眠的 thread_unblock ()函数进行修改,把 list_push_back() 替换为 list_insert_ordered ()

这样就可以通过最后一个 test 了。

这里在补充一下上面没有提到的关闭中断的问题。

enum intr_level old_level;
old_level = intr_disable ();
// do something...
intr_set_level (old_level);

我们用上面这种形式来关闭中断,具体代码由于是内联汇编,看不懂就不解释了。但这不妨外我们去理解这个用法。intr_disable (); 关闭中断,并将之前中断的状态作为返回值,使得我们处理完原子操作之后可以恢复之前中断的状态。

为何要关闭中断呢?

因为在中断打开的时候,我们的程序可能不是连续执行的,有可能执行到一半的时候,来了个中断,CPU 执行别的程序去了。此时我们的数据可能会被更改,但被保存现场却不会随之而更改,这样就可能导致数据出错。最简单的方法屏蔽中断,让 CPU 不响应中断。

测试解释

可以看到除了 alarm-zero & alarm-negative 两个测试之外,均有断言 ASSERT (!thread_mlfqs);,表明以下测试不适合 MLFQS(多级反馈调度队列),因为该调度算法不单取决于优先级和线程入队时间。

alarm-single & alarm-mutiple

这两个测试的测试内容大体是相同的,只是测试的次数不同。

首先,测试函数调用了如下代码创建线程,线程的 duration 取决于测试函数给的参数,代表被唤醒的次数。可以看到线程运行的函数和优先级都是一样的,只是最后的参数 t 不同,此参数随着线程被创建的次数而增大。

for (i = 0; i < thread_cnt; i++)
    {
      struct sleep_thread *t = threads + i;
      char name[16];

      t->test = &test;
      t->id = i;
      t->duration = (i + 1) * 10;
      t->iterations = 0;

      snprintf (name, sizeof name, "thread %d", i);
      thread_create (name, PRI_DEFAULT, sleeper, t);
    }

可以看到这个函数让当前线程休眠了 iterations 次。而在 timer_sleep (sleep_until - timer_ticks ());sleep_until 随着 iterationsduration 的增大而增大。

而在线程的输出之前,获取了锁,这样是防止用户代码出错时导致输出乱码,输出完成释放锁,避免死锁。

int64_t sleep_until = test->start + i * t->duration;
      timer_sleep (sleep_until - timer_ticks ());
      lock_acquire (&test->output_lock);
      *test->output_pos++ = t->id;
      lock_release (&test->output_lock);

搞明白线程怎么运行了,再看看测试是如何判断代码是否正确的。

很简单就是判断顺序唤醒时线程的 iterationsduration 不递减即可。

new_prod = ++t->iterations * t->duration;

      msg ("thread %d: duration=%d, iteration=%d, product=%d",
           t->id, t->duration, t->iterations, new_prod);

      if (new_prod >= product)
        product = new_prod;
      else
        fail ("thread %d woke up out of order (%d > %d)!",
              t->id, product, new_prod);

综上所述,只要唤醒顺序正确即可通过测试,例如休眠 20 ticks 的线程不会在休眠 10 ticks 的线程之前醒来。

alarm-simultaneous

整个测试就是调用了 test_sleep (3, 5); 这个函数。这个函数是测试了什么呢?我们继续往下看。

for (i = 0; i < thread_cnt; i++)
    {
      char name[16];
      snprintf (name, sizeof name, "thread %d", i);
      thread_create (name, PRI_DEFAULT, sleeper, &test);
    }

可以看到,这里创建了 thread_cnt 个优先级相同的线程,均是执行 sleeper () 这个函数。然后我们再看看这个函数是干什么的。

static void
sleeper (void *test_)
{
  struct sleep_test *test = test_;
  int i;

  /* Make sure we're at the beginning of a timer tick. */
  timer_sleep (1);

  for (i = 1; i <= test->iterations; i++)
    {
      int64_t sleep_until = test->start + i * 10;
      timer_sleep (sleep_until - timer_ticks ());
      *test->output_pos++ = timer_ticks () - test->start;
      thread_yield ();
    }
}

可以看到这个函数让当前线程休眠了 iterations 次。而 timer_sleep (sleep_until - timer_ticks ()); 这个则是整个测试的重点,sleep_until 是一个常数,timer_ticks () 返回的是系统启动后到当前的时间。所以 sleep_until - timer_ticks () 则是一个随时间减少的值,这样,每个线程休眠后是同时醒来的。

要通过这个测试,必须每个线程同时唤醒,而且是按着被创建的顺序被唤醒。

alarm-priority

sema_init (&wait_sema, 0); 在最开始,初始化信号量为 0。

我们先看看这 10 个线程是怎样被产生的。

for (i = 0; i < 10; i++)
    {
      int priority = PRI_DEFAULT - (i + 5) % 10 - 1;
      char name[16];
      snprintf (name, sizeof name, "priority %d", priority);
      thread_create (name, priority, alarm_priority_thread, NULL);
    }

在这里,i 从 0 到 9,(i + 5) % 10 则是一个先递减再递增的序列,所以看到线程被产生时的优先级是乱序的。

接着通过 thread_set_priority (PRI_MIN); 将当前线程设置到最低优先级,这样在调度的时候,这个线程会被最后唤醒。

for (i = 0; i < 10; i++)
    sema_down (&wait_sema);

最后进行了 10 次 P 操作,这个操作是原子操作,接下来解释一下 P 操作干了什么。

void
sema_down (struct semaphore *sema)
{
  enum intr_level old_level;

  ASSERT (sema != NULL);
  ASSERT (!intr_context ());

  old_level = intr_disable ();
  while (sema->value == 0)
    {
      list_push_back (&sema->waiters, &thread_current ()->elem);
      thread_block ();
    }
  sema->value--;
  intr_set_level (old_level);
}

在 P 操作中,如果 value == 0 就把当前线程 block 掉,而且每次 P 操作将 value 减一。现在测试函数被 block 了,我们看看被产生的 10 个线程究竟干了些什么。

static void
alarm_priority_thread (void *aux UNUSED)
{
  /* Busy-wait until the current time changes. */
  int64_t start_time = timer_ticks ();
  while (timer_elapsed (start_time) == 0)
    continue;

  /* Now we know we're at the very beginning of a timer tick, so
     we can call timer_sleep() without worrying about races
     between checking the time and a timer interrupt. */
  timer_sleep (wake_time - timer_ticks ());

  /* Print a message on wake-up. */
  msg ("Thread %s woke up.", thread_name ());

  sema_up (&wait_sema);
}

首先,函数一开始就是进入一个忙等待,直到 timer_elapsed (start_time) != 0 程序再往下执行,这个语句保证了往下执行时刚好是中断的结束,即保证了此时测试函数已执行完 P 操作被休眠。

而同样地在 timer_sleep (wake_time - timer_ticks ()); 中,wake_time 是一个常数,timer_ticks () 返回的是系统启动后到当前的时间。所以 wake_time - timer_ticks () 则是一个随时间减少的值,这样每个线程休眠后是同时醒来的。

在最后,进行了 V 操作,这个操作是原子操作。

void
sema_up (struct semaphore *sema)
{
  enum intr_level old_level;

  ASSERT (sema != NULL);

  old_level = intr_disable ();
  if (!list_empty (&sema->waiters))
    thread_unblock (list_entry (list_pop_front (&sema->waiters),
                                struct thread, elem));
  sema->value++;
  intr_set_level (old_level);
}