GuangchaoSun's Blog

第12章 并发编程

学习目标

  • 理解并行的几个层次:进程、事件、线程
  • 了解同步问题产生的背景,以及信号量的工作机制
  • 理解线程安全和一致性问题

并行方法

现代操作系统提供了三种基本的构造并发程序的方法:

  • 进程
    • 每个逻辑流都是一个进程,由内核来调度和维护。
    • 因为进程有独立虚拟地址空间,想要和其他流通信,控制流必须使用某种显式的进程间通信(interprocess communication,IPC)机制。
  • I/O多路复用
    • 应用程序在一个进程的上下文显式地调度它们自己的逻辑流
    • 逻辑流被模型化为状态机,数据到达文件描述符后,主程序显式地从一个状态转化到另一个状态。
    • 因为程序是一个单独的进程,所以所有的流都共享一个地址空间。
  • 线程
    • 线程是运行在一个单一进程上下文中的逻辑流,由内核进行调度。
    • 像进程一样由内核进行调度,像I/O多路复用流一样共用一个虚拟地址空间

基于进程

为每个客户端建立一个单独的进程,是建立连接之后才开始并行,连接的建立还是串行的。
下面的是一个基于进程的并发echo服务器。父进程派生一个子进程来处理每个新的连接请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include "csapp.h"
void echo(int connfd)
void sigchld_handler(int sig)
{
while(waitpid(-1, 0, WNOHANG) > 0)//判断条件成立时,跳过
;
return;
//Reap all zombie children(回收所有僵尸子线程)
}
int main(int argc, char **argv){
int listenfd, connfd, port;
socklen_t clientlen = sizeof(struct sockaddr_in);
struct sockaddr_in clientaddr;
if(argc != 2){
fprintf(stderr, "usage : %s <port>\n",argv[0]);
exit(0);
}
port = atoi(argv[1]);//字符串转化为整型
Singnal(SIGCHLD,sigchld_handler);
listenfd = Open_listenfd(port);
while(1){
connfd = Accept(listenfd, (SA*) &clientaddr, &clientlen);
if (Fork() == 0)
{
Close(listenfd); /*Child close its listening socket*/
echo(connfd); /*Child services client*/
Close(connfd); /*Child closes connection with client*/
exit(0); /*Child exits*/
}
Close(connfd); /*Parents closes connected socket*/
}
}

整个执行模型中:

  • 每个客户端由独立子进程处理
    • 必须回收僵尸进程,来避免严重的内存泄漏
  • 不同进程之间不共享数据
  • 父进程和子进程都有listenfdconnfd,所以在子进程中需要关闭listened
    • 内核会保存每个socket的计数,在 fork 之后refcnt(connfd) = 2,所以在父进程中需要关闭connfd,这样在子进程结束后引用计数才会为零

关于进程的劣势:

独立的地址空间使得进程共享状态信息比较困难。为了共享信息必须通过显式的IPC(进程间通信)机制。因为进程控制和IPC的开销很高,所以会比较慢。

基于I/O多路复用

基本思路:使用select函数,要求内核挂起进程,只有在一个或多个I/O事件发生后,才将控制返回给应用程序。
优点:

  • 给予了更多对程序的控制
  • 在流之间共享数据变得容易

劣势:

  • 代码的逻辑复杂度比较高
  • 难以进行精确度比较高的并行
  • 无法发挥多核处理器的全部性能

基于线程

线程 (thread) 就是运行在进程上下文中的逻辑流。每个线程都有自己的上下文 (thread context)。

  • 上下文:
    • 整数线程ID (Thread ID, TID)
    • 栈、栈指针
    • 程序计数器
    • 通用目的寄存器
    • 条件码

进程和线程的差别已经被说了太多次,这里简单提一下。相同点在于,它们都有自己的逻辑控制流,可以并行,都需要进行上下文切换。不同点在于,线程共享代码和数据(进程通常不会),线程开销比较小(创建和回收)。

Posix Threads

Pthreads是在C程序中处理线程的一个标准接口。Pthreads定义了一套C语言的类型、函数和常量,它以pthread.h头文件和一个线程库实现。

Pthreads API中大致有100个函数调用,全部以pthread_开头,分为四类:

  1. 线程管理,例如创建线程,等待 (join) 线程,查询线程
  2. Mutex:创建、摧毁、锁定、解锁、设置属性等操作
  3. 条件变量(Condition Variable):创建、摧毁、等待、通知、设置与查询属性等操作
  4. 使用了读写锁的线程间的同步管理

使用线程的方式重写echo服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include "csapp.h"
void echo(int connfd);
void *thread(void *vargp);
int main(int argc, char **argv)
{
int listenfd, *connfdp, port;
socklen_t clientlen = sizeof(struct sockaddr_in);
struct sockaddr_in clientaddr;
pthread_t tid;
if(argc != 2){
fprintf(stderr, "usage : %s <port>\n",argv[0]);
exit(0);
}
port = atoi(argv[1]);
listenfd = Open_listenfd(port);
while(1){
connfdp = Malloc(sizeof(int));
*connfdp = Accept(listenfd, (SA *)&clientaddr, &clientlen);
Pthread_create(&tid, NULL, thread, connfdp);
}
}
void *thread(void *vargp){
int connfd = *((int *)vargp);
Pthread_detach(pthread_self());
// detach 之后不用显式 join,会在执行完毕后自动回收
Free(vargp);
echo(connfd);
Close(connfd);
return NULL;
}

同步

共享变量

什么是 Shared Variable (共享变量)?

A variable x is shared if and only if multiple threads reference some instance of x

线程存储器模型:

  • 每个线程和其他线程一起共享进程上下文的剩余部分
    • Code,data,heap and shared library segments of the process virtual address space
    • Open files and installed handlers
  • 每个线程都有自己独立的线程上下文(线程ID、栈、栈指针、程序计数器、条件码和通用寄存器值)
  • 多个线程在一个单独的上下文中运行

将变量映射到存储器:

  • 全局变量:定义在函数之外的变量
    • 虚拟存储器的读/写区域只包含每个全局变量的一个实例,任何线程都可以引用
  • 本地自动变量:定义在函数内部没有static属性的变量
    • 每个线程的栈都包含它自己的所有本地变量的实例
  • 本地静态变量:定义在函数内部并且有static属性的变量
    • 虚拟存储器的读/写区域只包含在程序中声明的每个本地静态变量的一个实例

用信号量同步线程(*)

信号量

为了解决经典的同步不同执行的问题,Edsger Dijkstra 提出了信号量(semaphore)的解决方法。计数信号量具备两种操作动作称为 V(signal()) 和P((wait()) 。V 操作会增加信号量 S 的数值,P操作会减少。运作方式:

  1. 初始化,给它一个非负的整数值。
  2. 运行 P,信号量 S 的值将被减少。企图进入临界区块的进程,需要先运行 P。当信号量 S 减为负值时,进程会被挡住,不能继续;当信号量 S 不为负时,进程可以获准进入临界区块。
  3. 运行 V,信号量 S 的值会被增加。结束离开临界区块的进程,将会运行V。当信号量 S 不为负值时,先前被挡住的其他进程,将可获准进入临界区块。

加上互斥信号量使得计数器程序可以正确运行:

1
2
3
4
5
6
7
8
9
10
11
12
//先定义信号量
volatile long cnt = 0;
sem_t mutex; //Semaphore that protects counter
Sem_init(&mutex, 0, 1);
//在线程中用P和V包围关键操作
for(i = 0; i < niters; i++)
{
P(&mutex);//加锁
cnt++;
V(&mutex);//解锁
}

在使用线程时,脑中需要有一个清晰的分享变量的概念,共享变量需要互斥访问,而Semaphores是一个基础的机制。

生产者-消费者问题

信号量的作用:

  • 提供互斥访问
  • 调度对共享资源的访问

具体的同步模型:

  • 生产者等待空的slot(槽位),把 item 存储到buffer,并通知消费者
  • 消费者等待 item,从 buffer 中移除 item,并通知生产者。

主要用于

  • 多媒体处理
    • 生产者编码视频帧
    • 消费者解码并在屏幕上呈现出来
    • 缓冲区是为了减少视频流的抖动,而这种抖动是由各个帧的编码和解码时与数据相关的差异引起的
  • 图形用户接口设计
    • 生产者检测到鼠标和键盘事件,插入到缓冲区中
    • 消费者以基于优先级的方式读取这些事件,并显示在屏幕上

读者-写者问题

是互斥问题的通用描述,具体为:

  • 读者线程只读取对象
  • 写者线程修改对象
  • 写者对于对象的访问是互斥的
  • 多个读者可以同时读取对象

常见的应用场景:

  • 在线订票系统
  • 多线程缓存web代理

线程安全

线程安全的定义:

A function is thread-safe iff it will always prodece correct results when called repeatedly from multiple concurrent threads

主要有4类线程不安全的函数

  • 不保护共享变量的函数
    • 解决方法:使用P和V semaphore 操作
    • 问题:同步操作会影响性能
  • 保持跨越多个调用的状态的函数
    • 解决方法:把状态当做传入函数
  • 返回指向静态变量的指针的函数
    • 解决办法1:重写函数,传地址用以保存
    • 解决方法2:lock-and-copy
  • 调用线程不安全函数的函数
    • 解决方法:只调用线程安全的函数

可重入性:

A function is reentrant iff it accesses no shared variables when called by multiple threads

死锁:
互斥锁加锁顺序规则:如果对于程序中每对互斥锁(s,t),给所有的锁分配一个全序,每个线程按照这个顺序来请求锁,并且按照逆序来释放,那么这个程序就是无死锁的。

参考资料

  1. 并行计算
  2. 竞争条件
  3. 死锁
  4. Posix Thread
  5. POSIX thread (pthread) libraries