实验目的
修改 Pintos 源码,实现优先级捐赠机制,即涉及到锁的情况下,线程调度仍然遵循优先级最高的线程如果就绪则应该让其先运行。
实验过程
实验分析
优先级捐赠机制的基本思想
我们可以考虑下面这种情况
- 线程 A 持有锁,优先级为 1。
- 线程 B 优先级为 2。
- 线程 C 想获取锁,优先级为 3。
按我们当前的代码逻辑,整个系统会是这样运行的:
- 线程 C 先运行,需要获取锁,然后被休眠。
- 线程 B 运行,一段时间后,运行完毕。
- 线程 A 运行,运行一段时间,释放锁,C 被唤醒。
- 线程 C 运行,一段时间后,运行完毕。
- 线程 A 继续运行,一段时间后,运行完毕。
可以看到,在有加锁的情况下,线程调度并不符合优先级高的先运行,例如在上面的例子,优先级为 2 的线程 B 就优先于优先级为 3 的线程 C 运行。而根据 Pintos 的设计,C 应该优于 B 运行,但锁是不根据优先级进行抢占,所以我们需要修改 Pintos 源码使之按如下循序运行:
- 线程 C 先运行,需要获取锁,然后被休眠。
- 线程 A 运行,运行一段时间,释放锁,C 被唤醒。
- 线程 C 运行,一段时间后,运行完毕。
- 线程 B 运行,一段时间后,运行完毕。
- 线程 A 继续运行,一段时间后,运行完毕。
这就是优先级捐赠机制的最基本思想,线程 C 的优先级捐赠给了 A,让 A 先运行,释放锁后再恢复优先级。
而实际上,实际情况比上述情况复杂很多,我们可以在下面的测试分析看到。
测试解释
可以看到测试均有断言 ASSERT (!thread_mlfqs);
,表明以下测试不适合 MLFQS(多级反馈调度队列),因为该调度算法不单取决于优先级。
priority-donate-one
lock_init (&lock);
lock_acquire (&lock);
thread_create ("acquire1", PRI_DEFAULT + 1, acquire1_thread_func, &lock);
msg ("This thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 1, thread_get_priority ());
thread_create ("acquire2", PRI_DEFAULT + 2, acquire2_thread_func, &lock);
msg ("This thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 2, thread_get_priority ());
lock_release (&lock);
msg ("acquire2, acquire1 must already have finished, in that order.");
msg ("This should be the last line before finishing this test.");
这个测试是最基础的优先级捐赠。主线程拥有锁,创建了两个优先级更高的线程去获取锁,然后通过检查主线程的优先级,来判断优先级捐赠是否成功。
测试流程和上面 优先级捐赠机制的基本思想 提到基本一样,这里不再重复了。
priority-donate-multiple
lock_acquire (&a);
lock_acquire (&b);
thread_create ("a", PRI_DEFAULT + 1, a_thread_func, &a);
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 1, thread_get_priority ());
thread_create ("b", PRI_DEFAULT + 2, b_thread_func, &b);
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 2, thread_get_priority ());
lock_release (&b);
msg ("Thread b should have just finished.");
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 1, thread_get_priority ());
lock_release (&a);
msg ("Thread a should have just finished.");
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT, thread_get_priority ());
主线程持有锁 a、b,创建了两个优先级更高的线程去分别获取锁 a 和锁 b,然后分别释放锁 b 和锁 a,然后通过检查主线程的优先级,来判断优先级捐赠是否成功。这里主要是测试持有多个锁,进行多次优先级捐赠的时候,需要使用优先级最高的捐赠;释放锁的时候,恢复原来的优先级,需要考虑是否仍有捐赠的情况。
如果优先级捐赠正确,整个流程应该是这样的:
- 主线程以
PRI_DEFAULT
优先级运行,拥有锁 a、b,创建了优先级为PRI_DEFAULT + 1
的线程 a,a 进行抢占。 - a 线程需要获取锁 a,捐献
PRI_DEFAULT + 1
优先级给主线程,然后被休眠。 - 主线程创建优先级为
PRI_DEFAULT + 2
的 b 线程,b 进行抢占。 - b 线程需要获取锁 b,捐献
PRI_DEFAULT + 2
优先级给主线程,然后被休眠。 - 主线程释放锁 b,主线程优先级恢复到 a 线程捐献的优先级
PRI_DEFAULT + 1
。 - b 线程运行,退出。
- 主线程释放锁 a,恢复原有优先级。
- a 线程运行,退出。
- 主线程运行,退出。
priority-donate-multiple2
lock_acquire (&a);
lock_acquire (&b);
thread_create ("a", PRI_DEFAULT + 3, a_thread_func, &a);
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 3, thread_get_priority ());
thread_create ("c", PRI_DEFAULT + 1, c_thread_func, NULL);
thread_create ("b", PRI_DEFAULT + 5, b_thread_func, &b);
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 5, thread_get_priority ());
lock_release (&a);
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 5, thread_get_priority ());
lock_release (&b);
msg ("Threads b, a, c should have just finished, in that order.");
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT, thread_get_priority ());
这个测试与上面的 priority-donate-multiple
十分相似,不同的是释放锁的先后顺序不同了,并加入了与锁无关但优先级的比主线程高的线程。
如果优先级捐赠正确,整个流程应该是这样的:
- 主线程以
PRI_DEFAULT
优先级运行,拥有锁 a、b,创建了优先级为PRI_DEFAULT + 3
的线程 a,a 进行抢占。 - a 线程需要获取锁 a,捐献
PRI_DEFAULT + 3
优先级给主线程,然后被休眠。 - 创建优先级为
PRI_DEFAULT + 1
的 c 线程,不运行。 - 创建优先级为
PRI_DEFAULT + 5
的 b 线程,b 进行抢占。 - b 线程需要获取锁 b,捐献
PRI_DEFAULT + 5
优先级给主线程,然后被休眠。 - 主线程释放锁 a,但由于 b 线程捐献的优先级更高,主线程优先级不改变。
- 主线程释放锁 b,恢复原有优先级。
- b 线程运行,退出。
- a 线程运行,退出。
- c 线程运行,退出。
- 主线程运行,退出。
priority-donate-nest
lock_acquire (&a);
locks.a = &a;
locks.b = &b;
thread_create ("medium", PRI_DEFAULT + 1, medium_thread_func, &locks);
thread_yield ();
msg ("Low thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 1, thread_get_priority ());
thread_create ("high", PRI_DEFAULT + 2, high_thread_func, &b);
thread_yield ();
msg ("Low thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 2, thread_get_priority ());
lock_release (&a);
thread_yield ();
msg ("Medium thread should just have finished.");
msg ("Low thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT, thread_get_priority ());
主线程优先级为 PRI_DEFAULT
,拥有锁 a。创建了优先级为 PRI_DEFAULT + 1
的线程 medium,线程 medium 拥有锁 b,需要获取锁 a。随后主线程创建优先级为 PRI_DEFAULT + 2
的线程 high,线程 high 需要获取锁 b。
通过这个测试需要注意优先级捐赠需要递归进行,即 high 线程捐赠给 medium 线程时,也需要给 low 线程捐赠。捐赠结束的时候,被捐赠过的线程都需要恢复优先级。
priority-donate-sema
- 主线程函数
thread_create ("low", PRI_DEFAULT + 1, l_thread_func, &ls);
thread_create ("med", PRI_DEFAULT + 3, m_thread_func, &ls);
thread_create ("high", PRI_DEFAULT + 5, h_thread_func, &ls);
sema_up (&ls.sema);
msg ("Main thread finished.");
static void
l_thread_func (void *ls_)
{
struct lock_and_sema *ls = ls_;
lock_acquire (&ls->lock);
msg ("Thread L acquired lock.");
sema_down (&ls->sema);
msg ("Thread L downed semaphore.");
lock_release (&ls->lock);
msg ("Thread L finished.");
}
static void
m_thread_func (void *ls_)
{
struct lock_and_sema *ls = ls_;
sema_down (&ls->sema);
msg ("Thread M finished.");
}
static void
h_thread_func (void *ls_)
{
struct lock_and_sema *ls = ls_;
lock_acquire (&ls->lock);
msg ("Thread H acquired lock.");
sema_up (&ls->sema);
lock_release (&ls->lock);
msg ("Thread H finished.");
}
主线程创建了三个线程:
- low 线程优先级为
PRI_DEFAULT + 1
,拥有锁,对信号量进行了 P 操作,被休眠。 - med 线程优先级为
PRI_DEFAULT + 3
,对信号量进行了 P 操作,被休眠。 - high 线程优先级为
PRI_DEFAULT + 5
,获取锁,对 low 线程捐献了优先级,被休眠。
此时主线程对信号量进行 V 操作,需要唤醒其他进行了 P 操作被休眠的线程。由于 low 线程被捐献了优先级,所以它需要优于 med 线程被唤醒,并考虑抢占的情况。这个是通过这个测试的关键。
priority-donate-lower
lock_acquire (&lock);
thread_create ("acquire", PRI_DEFAULT + 10, acquire_thread_func, &lock);
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 10, thread_get_priority ());
msg ("Lowering base priority...");
thread_set_priority (PRI_DEFAULT - 10);
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT + 10, thread_get_priority ());
lock_release (&lock);
msg ("acquire must already have finished.");
msg ("Main thread should have priority %d. Actual priority: %d.",
PRI_DEFAULT - 10, thread_get_priority ());
这个测试主要是检测在优先级捐献的过程中进行了 thread_set_priority
修改优先级,线程调度是否正确。
下面分析一下这个测试具体流程
- 主线程拥有锁,创建了优先级为
PRI_DEFAULT + 10
的 acquire 线程,进行抢占。 - acquire 线程尝试获取锁,进行优先级捐献,被休眠。
- 主线程通过
thread_set_priority (PRI_DEFAULT - 10);
降低优先级,但由于被捐献的优先级更高,线程仍然继续运行。 - 主线程释放锁,恢复到
PRI_DEFAULT - 10
优先级。 - acquire 线程退出。
- 主线程退出。
想要通过这个测试,需要注意的是 thread_set_priority
修改的是线程自带的优先级,但我们判断线程优先级的时候始终要取被捐献的优先级和自带优先级之中最高的一个。
priority-donate-chain
thread_set_priority (PRI_MIN);
for (i = 0; i < NESTING_DEPTH - 1; i++)
lock_init (&locks[i]);
lock_acquire (&locks[0]);
msg ("%s got lock.", thread_name ());
for (i = 1; i < NESTING_DEPTH; i++)
{
char name[16];
int thread_priority;
snprintf (name, sizeof name, "thread %d", i);
thread_priority = PRI_MIN + i * 3;
lock_pairs[i].first = i < NESTING_DEPTH - 1 ? locks + i: NULL;
lock_pairs[i].second = locks + i - 1;
thread_create (name, thread_priority, donor_thread_func, lock_pairs + i);
msg ("%s should have priority %d. Actual priority: %d.",
thread_name (), thread_priority, thread_get_priority ());
snprintf (name, sizeof name, "interloper %d", i);
thread_create (name, thread_priority - 1, interloper_thread_func, NULL);
}
lock_release (&locks[0]);
msg ("%s finishing with priority %d.", thread_name (),
thread_get_priority ());
这个测试创建了 15 个线程,其中 8 个线程链式等待获取锁,进行链式的优先级捐赠;另外七个线程拥有不同的优先级,穿插其中。测试架构如下图:
通过这个测试,需要正确完成优先级递归捐赠,优先级还原和优先级抢占功能。
代码设计
乍看之下完成优先级捐赠需要考虑十分多情况,特别是多种情况叠加的时候,而实际上我没有去单独考虑每种情况怎么解决,这样设计起来很复杂,也容易遗漏情况。一种更优的设计思路是设计一个优先级更新逻辑,该逻辑可以根据当前状况更新线程的优先级。
这里先说明我新添加的数据结构:
在 struct lock
中新增
int priority; /* Priority of the lock. */
priority
为锁的优先级,该优先级在锁没有被获取时候为PRI_MIN
,被获取后取决于等待获取锁的所有线程优先级的最大值。
在 struct thread
中新增
int old_priority; /* Old priority. */
struct list locks_holding; /* Locks that the thread is holding. */
struct lock *lock_waiting; /* The lock that the thread is waiting for. */
old_priority
记录线程自己的优先级,不受捐献的优先级影响。locks_holding
记录拥有锁的列表。lock_waiting
记录需要等待获取的锁。
然后我在 thread.c
文件中编写了如下函数,来更新当前线程的优先级。
/* Update priority. */
void
thread_check_priority (struct thread *t)
{
int max_priority = PRI_MIN;
if (!list_empty (&t->locks_holding))
{
list_sort (&t->locks_holding, lock_cmp_by_priority, NULL);
if (list_entry (list_front (&t->locks_holding), struct lock, elem)->priority > max_priority)
max_priority = list_entry (list_front (&t->locks_holding), struct lock, elem)->priority;
}
if (max_priority > t->old_priority)
t->priority = max_priority;
else
t->priority = t->old_priority;
list_sort (&ready_list, thread_cmp_by_priority, NULL);
}
伪代码
int lock_max_priority
if (!locks_holding_list.isEmpty())
sort_by_priority(locks_holding_list)
lock_max_priority = locks_holding_list.front.priority
thread.priority = max(lock_max_priority, thread.old_priority);
这个函数是这次实验的重点之一,它根据当前线程拥有所有的锁和线程自带的优先级来更新线程的优先级,即始终选择它们之中的最大值。所以我只要保证线程 locks_holding
更改时这个函数被调用,就可以保证线程的优先级是正确的。
修改 lock_acquire
函数
void
lock_acquire (struct lock *lock)
{
ASSERT (lock != NULL);
ASSERT (!intr_context ());
ASSERT (!lock_held_by_current_thread (lock));
if (lock->holder != NULL)
{
thread_current ()->lock_waiting = lock;
struct lock *iterator_lock = lock;
while (iterator_lock != NULL &&
thread_current ()->priority > iterator_lock->priority)
{
iterator_lock->priority = thread_current ()->priority;
thread_check_priority (iterator_lock->holder);
iterator_lock = iterator_lock->holder->lock_waiting;
}
}
sema_down (&lock->semaphore);
thread_current ()->lock_waiting = NULL;
list_insert_ordered (&thread_current ()->locks_holding, &lock->elem, lock_cmp_by_priority, NULL);
lock->holder = thread_current ();
thread_check_priority (thread_current ());
}
这部分代码是处理嵌套的情况。
if (lock->holder != NULL)
{
thread_current ()->lock_waiting = lock;
struct lock *iterator_lock = lock;
while (iterator_lock != NULL &&
thread_current ()->priority > iterator_lock->priority)
{
iterator_lock->priority = thread_current ()->priority;
thread_check_priority (iterator_lock->holder);
iterator_lock = iterator_lock->holder->lock_waiting;
}
}
另外需要注意的是成功获取锁后要检查优先级,因为有可能锁的优先级高于线程原来的优先级。
伪代码
priority_donate = thread_current.priority
while (lock != NULL && priority_donate > lock.priority)
lock.priority = priority_donate
thread_check_priority (lock.holder)
lock = lock.holder.lock_waiting
在 lock_release
函数新增下列语句
list_remove (&lock->elem);
thread_check_priority (thread_current ());
lock->priority = PRI_MIN;
lock->holder = NULL;
修改 thread_set_priority
函数
void
thread_set_priority (int new_priority)
{
thread_current ()->old_priority = new_priority;
thread_check_priority (thread_current ());
thread_yield ();
}
直到这里,除了一些初始化函数没有列出外,单独关于锁的优先级抢占已经完成。
在这个设计里面,最重要的两个思想是:
- 维护锁的优先级正确
- 通过拥有锁的列表维护线程优先级正确
我们先看第一条,可以考虑以下问题 锁的优先级什么时候会发生改变?
答案是 只有新线程需要获取锁,锁的优先级才有可能发生改变。
所以只需要在 lock_acquire
函数维护好锁的优先级即可。
再考虑第二条,线程的优先级什么时候会发生改变?
在获取锁前,获取锁后,释放锁后,手动修改优先级四个情况下,线程的优先级均有可能发生改变,需要调用 thread_check_priority
更新优先级。
最后再考虑有信号量的情况,V 操作的时候,先对等待列表里面的线程按优先级排序,这样唤醒队首的线程就是优先级最高的。
这里不使用顺序插入的原因是顺序插入后,一旦有优先级改变,还需要重新调整位置,代码量比较大。所以这里在取出前进行排序是最快捷的方法。
void
sema_up (struct semaphore *sema)
{
enum intr_level old_level;
ASSERT (sema != NULL);
old_level = intr_disable ();
if (!list_empty (&sema->waiters))
{
list_sort (&sema->waiters, thread_cmp_by_priority, NULL);
thread_unblock (list_entry (list_pop_front (&sema->waiters), struct thread, elem));
}
sema->value++;
thread_yield ();
intr_set_level (old_level);
}
回答问题
如何解决嵌套捐赠的问题?
首先在锁的结构体增加 holder
指针,记录这个锁的拥有者。
接着我们可以用一个循环来完成整个链的优先级更新:
if (lock->holder != NULL)
{
thread_current ()->lock_waiting = lock;
struct lock *iterator_lock = lock;
while (iterator_lock != NULL &&
thread_current ()->priority > iterator_lock->priority)
{
iterator_lock->priority = thread_current ()->priority;
thread_check_priority (iterator_lock->holder);
iterator_lock = iterator_lock->holder->lock_waiting;
}
}
其中的 thread_check_priority
函数完成的是线程优先级的更新:
/* Update priority. */
void
thread_check_priority (struct thread *t)
{
int max_priority = PRI_MIN;
if (!list_empty (&t->locks_holding))
{
list_sort (&t->locks_holding, lock_cmp_by_priority, NULL);
if (list_entry (list_front (&t->locks_holding), struct lock, elem)->priority > max_priority)
max_priority = list_entry (list_front (&t->locks_holding), struct lock, elem)->priority;
}
if (max_priority > t->old_priority)
t->priority = max_priority;
else
t->priority = t->old_priority;
list_sort (&ready_list, thread_cmp_by_priority, NULL);
}
总的来说,一个线程接受捐献后需要检查这个线程等待的锁的优先级,先更新这个锁的优先级,再更新这个锁拥有者的优先级,按这样一直迭代下去。
如何解决一个线程占有了多个锁的问题?
在线程结构体中增加 struct list locks_holding
记录拥有锁的列表,通过有序插入和列表排序,始终保证按锁的优先级进行排序,这样 thread_check_priority
更新优先级的时候,只需要考虑队首即可。释放锁的时候,在列表中删除锁,然后通过同样的方法更新一次优先级即可。
实验感想
- 这次实验最大的收获,除了是亲自完成了复杂的优先级捐献,获得通过测试的成就感外,还学会了一直解决问题的方法。例如在本次的优先级捐献实验中,我们分析了很多种情况,每种情况我们都有不同的解决方法。但一旦情况多起来,一个一个问题单独解决可能会很麻烦,也很容易遗漏。这时候不妨跳出这些问题的局促,在另一个角度思考,也许会有一个更加通用的方法,在另一个维度上去解决这个问题。例如这次这么复杂的情况,其实不单独考虑这么多问题,抓住两点就好,维护好锁的优先级和线程优先级。
- 完成现在的实验,也要考虑到之前实验的情况,例如在信号量处理的时候,会容易忘记优先级抢占的情况。
- 考虑问题要全面。例如我在解决 维护好锁的优先级 这个问题上,就需要考虑另外一个问题,锁的优先级在什么情况下会被改变?这时候就需要完成地考虑整个系统的所有过程,否则容易遗漏情况。
- 七分思考,三分做。这句话实在是太对了,把过程弄清楚,解决方案想清楚,做好草稿,再开始写代码也不迟。缺乏思考的代码都是废代码。