实验目的
修改 Pintos 源码,正确处理线程的休眠与唤醒,使线程休眠时 CPU 不会忙等待,唤醒时能按优先级唤醒。
实验过程
实验分析
线程的调度由操作系统负责,而操作系统进行调度需要获得 CPU 时间,而这个是通过向 CPU 发送中断来获得的。在 pintos 中,这个中断是由 Timer
这个定时器定时产生中断。
在这里,处理线程休眠的函数是 timer_sleep ()
,我先分析一下这个函数是如何实现原始休眠的。
void
timer_sleep (int64_t ticks)
{
int64_t start = timer_ticks ();
ASSERT (intr_get_level () == INTR_ON);
while (timer_elapsed (start) < ticks)
thread_yield ();
}
- 首先,
start
记录了进入这个函数的当前时间。 - 利用
timer_elapsed ()
判断当前时间和start
的差值,换言之就是从start
开始流逝的时间。 - 判断当前流逝的时间是否超过给定的
ticks
,如果没有,执行thread_yield ()
,让线程休眠。
然后,接下来我再分析 thread_yield ()
这个函数究竟干了些什么。
void
thread_yield (void)
{
struct thread *cur = thread_current ();
enum intr_level old_level;
ASSERT (!intr_context ());
old_level = intr_disable ();
if (cur != idle_thread)
list_push_back (&ready_list, &cur->elem);
cur->status = THREAD_READY;
schedule ();
intr_set_level (old_level);
}
- 获取当前线程并关闭中断。
- 如果当前线程不是
idle_thread
状态的线程,将这个线程丢到ready_list
的末尾,然后执行schedule ()
。 - 最后恢复之前的中断状态。
这里其实比较清楚整个流程了,但是不妨在看看 schedule ()
干了什么,说不定还能看得懂。
static void
schedule (void)
{
struct thread *cur = running_thread ();
struct thread *next = next_thread_to_run ();
struct thread *prev = NULL;
ASSERT (intr_get_level () == INTR_OFF);
ASSERT (cur->status != THREAD_RUNNING);
ASSERT (is_thread (next));
if (cur != next)
prev = switch_threads (cur, next);
thread_schedule_tail (prev);
}
函数再往下看就开始接近底层,也看不太懂了,但大概来说 schedule ()
就是切换到了下一个线程。
好了,综合上面几个函数,可以看到 thread_yield ()
就是把当前线程丢回到 ready_list
里面,然后切换到下一个线程。再往上一层,timer_sleep ()
就是在循环里面不断将线程丢回 ready_list
不让它执行来实现休眠。这很类似是轮询的方式,即使线程休眠 CPU 也不能空闲,即是忙等待。
轮询的方式实现下来是很低效的,接下来,我们需要设计一种优雅的方式实现线程休眠。
整个线程休眠的实现方式最大的问题在于 timer_sleep ()
里面使用了轮询的机制而占用了 CPU 资源,由于休眠和中断的最小时间单位都是 tick
,而且注意到有 thread_block ()
这个函数,可以把当前线程 block 掉,不让它进入调度。所以我们完全可以把线程 block 掉来实现休眠,然后每一次中断检查一次是否有线程需要唤醒即可。
实现方法
void
timer_sleep (int64_t ticks)
{
ASSERT (intr_get_level () == INTR_ON);
if (ticks > 0)
{
enum intr_level old_level = intr_disable ();
struct thread *current_thread = thread_current ();
current_thread->ticks_blocked = ticks;
thread_block ();
intr_set_level (old_level);
}
}
- 重写
timer_sleep ()
,不调用thread_yield ()
,而是调用thread_block ()
。 - 修改线程的结构体,加入成员变量
ticks_blocked
,记录线程剩余休眠时间,在线程创建的时候初始化为 0。 - 每次中断将每个被 block 的线程剩余时间减一,判断如果有线程完成休眠则
thread_unblock ()
。 - 需要注意的是,这里需要判断传入参数
ticks
是否合法,若ticks
不是正数则不执行函数。
static void
timer_interrupt (struct intr_frame *args UNUSED)
{
ticks++;
thread_tick ();
thread_foreach (thread_check_and_block, NULL);
}
void
thread_check_and_block (struct thread *t, void *aux UNUSED)
{
if (t->status == THREAD_BLOCKED && t->ticks_blocked > 0)
{
t->ticks_blocked--;
if (t->ticks_blocked == 0)
{
thread_unblock(t);
}
}
}
在每次中断的时候使用 thread_foreach ()
遍历所有线程,对所有被 block 的线程减少休眠时间,对需要唤醒的线程唤醒。
这里需要注意的是对线程 unblock 需要在 if (t->status == THREAD_BLOCKED && t->ticks_blocked > 0)
这个语句里面。而下面是一个错误示范:
void
thread_check_and_block (struct thread *t, void *aux UNUSED)
{
if (t->status == THREAD_BLOCKED && t->ticks_blocked > 0)
{
t->ticks_blocked--;
}
if (t->ticks_blocked == 0)
{
thread_unblock(t);
}
}
这样是会在实际运行程序的时候 kernel panic 的。因为线程被 block 不全是来自于 timer_sleep ()
的,有可能是由于等待获取锁,等待信号量等的原因被 block,如果在这里唤醒了不是被 timer_sleep ()
block 的线程,是会在断言处退出,引发 kernel panic。而上面正确的做法,可以保证 unblock 的线程都是由于 timer_sleep ()
而休眠的。
言归正传,直到这里,我们处理完了线程的休眠和唤醒,在每次中断,操作系统遍历检查所有线程后,就可以释放 CPU 资源了,解决了 CPU 的忙等待。
然而直到现在 alarm_priority
这一个 test 还不能通过,因为这个测试要求线程根据优先级进行调度,而我们现在的实现是根据 FCFS 调度。
通过这个测试,先要了解一下 pintos 中队列是怎样实现的。
/* List element. */
struct list_elem
{
struct list_elem *prev; /* Previous list element. */
struct list_elem *next; /* Next list element. */
};
/* List. */
struct list
{
struct list_elem head; /* List head. */
struct list_elem tail; /* List tail. */
};
这里队列的实现就是很简单的一个双向链表,而每个结点只记录它的相邻元素的地址,此外什么都没有。这里有一个疑问了,结点的内容是空的,那是怎么样通过结点找到该结点代表的元素呢?
#define list_entry(LIST_ELEM, STRUCT, MEMBER) \
((STRUCT *) ((uint8_t *) &(LIST_ELEM)->next \
- offsetof (STRUCT, MEMBER.next)))
可以看到这里有一个宏定义,大概实现是计算 MEMBER
在 STRUCT
中的位置,再根据这个偏移量把 list_elem
的地址移动到包含它的 STRUCT
的地址,再将这个指针通过类型转换转换为 STRUCT
类型的指针。
可以想到,如果每次调度的时候都遍历一次链表,找出优先级最高的线程,再取出链表,这样实现起来很复杂,效率也不高。所以换一个思路,如果我们把线程插入链表的时候按顺序插入,不就保证直接取出队列头部的时候就是有序的了吗?
当然这样类似是插入排序,效率看似并不高效。但仔细一想,每次插入,最坏情况是遍历一遍,最好情况是直接放到尾部,时间复杂度为 $O(n)$ ,而每次扔到链表尾部,再快排,时间复杂度最低都是 $O(nlogn)$。
然后我们发现有 void list_insert_ordered (struct list *, struct list_elem *, list_less_func *, void *aux);
这样一个函数。顾名思义,利用这个函数插入将使链表有序。
我们需要自行编写比较函数,利用上面提到的宏定义,需要注意的是在 pintos 把 priority
数值大的认为是优先级高。
bool
cmp_by_priority (const struct list_elem *a, const struct list_elem *b, void *aux UNUSED)
{
return list_entry(a, struct thread, elem)->priority >
list_entry(b, struct thread, elem)->priority;
}
然后我们分别将需要插入 ready_list
的创建线程的 init_thread ()
函数、把线程从 running 队列扔回 ready 队列的 thread_yield ()
函数,线程解除休眠的 thread_unblock ()
函数进行修改,把 list_push_back()
替换为 list_insert_ordered ()
。
这样就可以通过最后一个 test 了。
这里在补充一下上面没有提到的关闭中断的问题。
enum intr_level old_level;
old_level = intr_disable ();
// do something...
intr_set_level (old_level);
我们用上面这种形式来关闭中断,具体代码由于是内联汇编,看不懂就不解释了。但这不妨外我们去理解这个用法。intr_disable ();
关闭中断,并将之前中断的状态作为返回值,使得我们处理完原子操作之后可以恢复之前中断的状态。
为何要关闭中断呢?
因为在中断打开的时候,我们的程序可能不是连续执行的,有可能执行到一半的时候,来了个中断,CPU 执行别的程序去了。此时我们的数据可能会被更改,但被保存现场却不会随之而更改,这样就可能导致数据出错。最简单的方法屏蔽中断,让 CPU 不响应中断。
测试解释
可以看到除了 alarm-zero & alarm-negative 两个测试之外,均有断言 ASSERT (!thread_mlfqs);
,表明以下测试不适合 MLFQS(多级反馈调度队列),因为该调度算法不单取决于优先级和线程入队时间。
alarm-single & alarm-mutiple
这两个测试的测试内容大体是相同的,只是测试的次数不同。
首先,测试函数调用了如下代码创建线程,线程的 duration
取决于测试函数给的参数,代表被唤醒的次数。可以看到线程运行的函数和优先级都是一样的,只是最后的参数 t
不同,此参数随着线程被创建的次数而增大。
for (i = 0; i < thread_cnt; i++)
{
struct sleep_thread *t = threads + i;
char name[16];
t->test = &test;
t->id = i;
t->duration = (i + 1) * 10;
t->iterations = 0;
snprintf (name, sizeof name, "thread %d", i);
thread_create (name, PRI_DEFAULT, sleeper, t);
}
可以看到这个函数让当前线程休眠了 iterations
次。而在 timer_sleep (sleep_until - timer_ticks ());
中 sleep_until
随着 iterations
和 duration
的增大而增大。
而在线程的输出之前,获取了锁,这样是防止用户代码出错时导致输出乱码,输出完成释放锁,避免死锁。
int64_t sleep_until = test->start + i * t->duration;
timer_sleep (sleep_until - timer_ticks ());
lock_acquire (&test->output_lock);
*test->output_pos++ = t->id;
lock_release (&test->output_lock);
搞明白线程怎么运行了,再看看测试是如何判断代码是否正确的。
很简单就是判断顺序唤醒时线程的 iterations
和 duration
不递减即可。
new_prod = ++t->iterations * t->duration;
msg ("thread %d: duration=%d, iteration=%d, product=%d",
t->id, t->duration, t->iterations, new_prod);
if (new_prod >= product)
product = new_prod;
else
fail ("thread %d woke up out of order (%d > %d)!",
t->id, product, new_prod);
综上所述,只要唤醒顺序正确即可通过测试,例如休眠 20 ticks 的线程不会在休眠 10 ticks 的线程之前醒来。
alarm-simultaneous
整个测试就是调用了 test_sleep (3, 5);
这个函数。这个函数是测试了什么呢?我们继续往下看。
for (i = 0; i < thread_cnt; i++)
{
char name[16];
snprintf (name, sizeof name, "thread %d", i);
thread_create (name, PRI_DEFAULT, sleeper, &test);
}
可以看到,这里创建了 thread_cnt
个优先级相同的线程,均是执行 sleeper ()
这个函数。然后我们再看看这个函数是干什么的。
static void
sleeper (void *test_)
{
struct sleep_test *test = test_;
int i;
/* Make sure we're at the beginning of a timer tick. */
timer_sleep (1);
for (i = 1; i <= test->iterations; i++)
{
int64_t sleep_until = test->start + i * 10;
timer_sleep (sleep_until - timer_ticks ());
*test->output_pos++ = timer_ticks () - test->start;
thread_yield ();
}
}
可以看到这个函数让当前线程休眠了 iterations
次。而 timer_sleep (sleep_until - timer_ticks ());
这个则是整个测试的重点,sleep_until
是一个常数,timer_ticks ()
返回的是系统启动后到当前的时间。所以 sleep_until - timer_ticks ()
则是一个随时间减少的值,这样,每个线程休眠后是同时醒来的。
要通过这个测试,必须每个线程同时唤醒,而且是按着被创建的顺序被唤醒。
alarm-priority
sema_init (&wait_sema, 0);
在最开始,初始化信号量为 0。
我们先看看这 10 个线程是怎样被产生的。
for (i = 0; i < 10; i++)
{
int priority = PRI_DEFAULT - (i + 5) % 10 - 1;
char name[16];
snprintf (name, sizeof name, "priority %d", priority);
thread_create (name, priority, alarm_priority_thread, NULL);
}
在这里,i
从 0 到 9,(i + 5) % 10
则是一个先递减再递增的序列,所以看到线程被产生时的优先级是乱序的。
接着通过 thread_set_priority (PRI_MIN);
将当前线程设置到最低优先级,这样在调度的时候,这个线程会被最后唤醒。
for (i = 0; i < 10; i++)
sema_down (&wait_sema);
最后进行了 10 次 P 操作,这个操作是原子操作,接下来解释一下 P 操作干了什么。
void
sema_down (struct semaphore *sema)
{
enum intr_level old_level;
ASSERT (sema != NULL);
ASSERT (!intr_context ());
old_level = intr_disable ();
while (sema->value == 0)
{
list_push_back (&sema->waiters, &thread_current ()->elem);
thread_block ();
}
sema->value--;
intr_set_level (old_level);
}
在 P 操作中,如果 value == 0
就把当前线程 block 掉,而且每次 P 操作将 value 减一。现在测试函数被 block 了,我们看看被产生的 10 个线程究竟干了些什么。
static void
alarm_priority_thread (void *aux UNUSED)
{
/* Busy-wait until the current time changes. */
int64_t start_time = timer_ticks ();
while (timer_elapsed (start_time) == 0)
continue;
/* Now we know we're at the very beginning of a timer tick, so
we can call timer_sleep() without worrying about races
between checking the time and a timer interrupt. */
timer_sleep (wake_time - timer_ticks ());
/* Print a message on wake-up. */
msg ("Thread %s woke up.", thread_name ());
sema_up (&wait_sema);
}
首先,函数一开始就是进入一个忙等待,直到 timer_elapsed (start_time) != 0
程序再往下执行,这个语句保证了往下执行时刚好是中断的结束,即保证了此时测试函数已执行完 P 操作被休眠。
而同样地在 timer_sleep (wake_time - timer_ticks ());
中,wake_time
是一个常数,timer_ticks ()
返回的是系统启动后到当前的时间。所以 wake_time - timer_ticks ()
则是一个随时间减少的值,这样每个线程休眠后是同时醒来的。
在最后,进行了 V 操作,这个操作是原子操作。
void
sema_up (struct semaphore *sema)
{
enum intr_level old_level;
ASSERT (sema != NULL);
old_level = intr_disable ();
if (!list_empty (&sema->waiters))
thread_unblock (list_entry (list_pop_front (&sema->waiters),
struct thread, elem));
sema->value++;
intr_set_level (old_level);
}