共享內存是最高效的IPC機制,因為它不涉及進程之間的任何數據傳輸。這種高效率帶來的問題是,我們必須用i其他輔助手段來同步進程對共享內存的訪問,否則會產生競態條件。因此,共享內存通常和其他進程間通信方式一起使用。
linux共享內存的API都定義在sys/shm.h頭文件中,包括4個系統調用:shmget、shmat、shmdt和shmctl。
shmget系統調用
shmget系統調用創建一段新的共享內存,或者獲取一段已經存在的共享內存。定義如下:
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
和semget系統調用一樣,key參數是一個鍵值,用來標識一段全局唯一的共享內存。size參數指定共享內存的大小,單位是字節。如果是創建新的共享內存,則size值必須指定。如果是獲取已經存在的共享內存,則可以把size設置為0。
shmflg參數的使用和含義與semget系統調用的sem_flags參數相同。不過shmget支持兩個額外的標志:
- SHM_HUGETLB:類似于mmap的MAP_HUGETLB標志,系統將使用“大頁面”來為共享內存分配空間
- SHM_NORESERVE:類似于mmap的MAP_NORESERVE,不為共享內存保留交換分區(swap空間)。這樣,當物理內存不足的時候,對該共享內存執行寫操作將觸發SIGSEGV信號。
shmget成功時返回一個正整數值,它是共享內存的標識符。shmget失敗時返回-1,并設置errno。
如果shmget用于創建共享內存,則這段共享內存的所有自己都被初始化為0,與之關聯的內核數據結構shmid_ds將被創建并初始化。shmid_ds結構體的定義如下:
struct shmid_ds
{
struct ipc_prem shm_prem; /*共享內存的操作權限*/
size_t shm_segsz; /*共享內存大小,單位是字節*/
__time_t shm_atime; /*對這段內存最后一次調用shmat的時間*/
__time_t shm_dtime; /*對這段內存最后一次調用shmdt的時間*/
__time_t shm_ctime; /*對這段內存最后一次調用shmctl的時間*/
__pid_t shm_cpid; /*創建者PID*/
__pid_t shm_lpid; /*最后一次執行shmat或shmdt操作的進程PID*/
shmatt_t shm_nattach; /*目前關聯到此共享內存的進程數量*/
/*省略一下填充字段*/
}
shmget對shmid_ds結構體的初始化包括:
- 將shm_perm.cuid和shm_perm.uid設置為調用進程的有效用戶ID
- 將shm_perm.cgid和shm_perm.gid設置為調用進程的有效組ID
- 將shm_perm.mode的最低9位設置為shmflg參數的最低9位
- 將shm_segsz設置為size
- 將shm_lpid、shm_nattach、shm_atime、shm_dtime設置為0
- 將shm_ctime設置為當前的時間
shmat和shmdt調用
共享內存被創建/獲取之后,我們不能立即訪問它,而是需要先將它關聯到進程的地址空間中。使用完共享內存之后,我們也需要將它從進程地址空間中分離。這兩項任務分別由兩個系統調用實現:
#include <sys/shm.h>
void* shmat(int shm_id, const void *shm_addr, int shmflg);
int shmdt(const void* shm_addr);
其中,shm_id參數是由shmget調用返回的共享內存標識符。
shm_addr參數指定將共享內存關聯到進程的哪塊地址空間,最終的效果還受到shmflg參數的可選標志SHM_RND影響:
- 如果shm_addr為NULL,則被關聯的地址由操作系統選擇。這是推薦的做法,以確保代碼的可移植性。
- 如果shm_addr非空,并且SHM_RND標志未被設置,則共享內存被關聯到addr指定的地址處。
- 如果shm_addr非空,并且設置了SHM_RND標志,則被關聯的地址是[shm_addr - (shm_addr % SHMLBA)]。SHMLBA的含義是“段低端邊界地址倍數”(Segment Low Boundary Address Multiple),它必須是內存頁面大小PAGE_SIZE的整數倍。現在的Linux內核中,它等于一個內存頁大小。SHM_RND的含義是圓整(round),即將共享內存被關聯的地址向下圓整到離shm_addr最近的SHMLBA的整數被地址處。
除了SHM_RND標志外,shmflg參數還支持如下標志:
- SHM_RDONLY:進程僅能讀取共享內存中的內容。若沒有指定該標志,則進程可同時對共享內存進行讀寫操作(當然,這需要在創建共享內存的時候指定其讀寫權限)
- SHM_REMAP:如果地址shmaddr已經被關聯到一段共享內存上,則重新關聯
- SHM_EXEC:它指定對共享內存段的執行權限。對共享內存而言,執行權限實際上和讀權限是一樣的。
shmat成功時返回共享內存被關聯到的地址,失敗則返回(void*)-1并設置errno。shmat成功時,將修改內核數據結構shmid_ds的部分字段:
- 將shm_nattach加1
- 將shm_lpid設置為調用進程PID
- 將shm_atime設置為當前的時間
shmdt函數將關聯到的shm_addr處的共享內存從進程中分離。它成功時返回0,失敗則返回-1并設置errno。shmdt在成功調用時將修改內核數據結構shmid_ds的部分字段:
- 將shm_nattach減1
- 將shm_lpid設置為調用進程的PID
- 將shm_dtime設置為當前的時間
shmctl系統調用
shmctl系統調用控制共享內存的某些屬性。定義如下:
#include <sys/shm.h>
int shmctl(int shm_id, int command, struct shmid_ds* buf);
其中,shm_id參數是由shmget調用返回的共享內存標識符。command參數指定要執行的命令。shm_ctl支持的所有命令如下表:

shmctl成功時的返回值取決于command參數,如上表,失敗是返回-1,并設置errno。
共享內存的POSIX方法
mmap函數利用它的MAP_ANONYMOUS標志可以實現父、子進程之間的匿名內存共享。通過打開同一個文件按,mmap可以實現無關進程之間的內存共享。Linux提供了另外一種利用mmap在無關進程之間共享內存的方式。這種方式無需任何文件的支持,但它需要先使用如下函數創建或打開一個POSIX可移植操作系統接口(Portable Operating System Interface of UNIX,縮寫為POSIX)共享內存對象:
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_open(const char* name, int oflag, mode_t mode);
shm_open的使用方法與open系統調用完全相同。
name參數指定要創建/打開的內存。從可移植性的角度考慮,該參數應該使用“/somename”的格式:
以“/”開始,后接多個字符,且這些字符都不是“/”;
以“"結尾,長度不超過NAME_MAX(通常是255)
oflag參數指定創建方式。它可以是下列標志中的一個或者多個按位或:
- O_RDONLY:以只讀方式打開共享內存對象
- O_RDWR:以可讀、可寫方式打開內存共享對象
- O_CREAT:如果共享內存對象不存在,則創建之。此時mode參數的最低9位將指定該共享內存對象的訪問權限。共享內存對象被創建的時候,其初始長度為0。
- O_EXCL:和O_CREAT一起使用,如果由name指定的共享內存已經存在,則shm_open調用返回錯誤,否則就創建一個新的共享內存對象
- O_TRUNC:如果共享內存已經存在,則把它截斷,使其長度為0
shm_open調用成功時返回一個文件描述符。該文件描述符可用于后續的mmap調用,從而將共享內存關聯到調用進程。shm_open失敗時返回-1,并設置errno。
和打開的文件最后需要關閉一樣,由shm_open創建的共享內存對象使用完之后也需要被刪除。這個通過如下函數實現:
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_unlink(const char* name);
該函數將name參數指定的共享內存對象標記為等待刪除。當所有使用該共享內存對象的進程都使用ummap將它從進程中分離之后,系統將銷毀這個共享內存對象所占據的資源。
如果代碼中使用了上述POSIX共享內存函數,則編譯的時候需要指定鏈接選項-ltr。
共享內存實例
在I/O復用高級應用中我們介紹過過一個聊天室程序。下面我們將它修改為一個多進程服務器:一個子進程處理一個客戶連接。同時,我們將所有客戶socket連接的讀緩沖設計為一塊共享內存:
#include <sys/socket.h>
#include <.NETinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536
/*處理一個客戶連接必要的數據*/
struct client_data
{
sockaddr_in address ; /*客戶端socket地址*/
int connfd; /*socket文件描述符*/
pid_t pid; /*處理這個連接的子進程PID*/
int pipefd[2]; /*和父進程通信用的管道*/
};
static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char * share_mem = 0;
/*客戶連接數組。進程使用客戶連接的編號來索引這個數據,及可取得相關的客戶連接數據*/
client_data* users = 0;
/*子進程與客戶連接的映射關系,用進程PID來索引這個數組。即可取得該進程所處理的客戶鏈接的編號*/
int* sub_porcess = 0;
/*當前客戶數量*/
int user_count = 0;
bool stop_child = false;
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}
void addsig(int sig, void(*handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, '', sizeof(sa));
sa.sa_handler = handler;
if(restart){
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void del_resource()
{
close(sig_pipefd[0]);
close(sig_pipefd[1]);
close(listenfd);
close(epollfd);
shm_unlink(shm_name);
delete [] users;
delete [] sub_porcess;
}
/*停止一個子進程*/
void child_term_handler(int sig)
{
stop_child = true;
}
/*子進程運行的函數,
參數idx指出該子進程處理客戶鏈接的編號
users是保存連接數據的數組
參數share_mem指出共享內存的其實地址*/
int run_child(int idx, client_data* users, char* share_mem)
{
epoll_event events[MAX_EVENT_NUMBER];
/*子進程使用I/O復用技術來同時監聽兩個文件描述副:
客戶連接socket、
與父進程通信的管道文件描述符*/
int child_epollfd = epoll_create(5);
assert(child_epollfd != -1);
int connfd = users[idx].connfd;
addfd(child_epollfd, connfd);
int pipefd = users[idx].pipefd[1];
addfd(child_epollfd, pipefd);
int ret = 0;
/*子進程需要設置自己的信號處理函數*/
addsig(SIGTERM, child_term_handler, false);
while (!stop_child){
int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
if(number < 0 && errno != EINTR){
printf("epoll failuren");
break;
}
for(int i = 0; i < number; ++i)
{
int sockfd = events[i].data.fd;
/*本子進程負責的客戶連接有數據到達*/
if(sockfd == connfd && events[i].events & EPOLLIN){
memset(share_mem + idx* BUFFER_SIZE, '', BUFFER_SIZE);
/*將客戶數據讀取到對應的讀緩存中。
讀緩存是共享內存的一段,它開始去idx*BUffER_SIZE處,長度為BUFFER_SIZE
每個客戶連接的讀緩存是共享的*/
ret = recv(connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0);
if(ret < 0){
if(errno != EAGAIN){
stop_child = true;
}
}
else if(ret == 0){
stop_child = true;
}
else{
/*成功讀取客戶端數據后就通知主進程(通過管道)來處理*/
send(pipefd, (char*)&idx, sizeof(idx), 0);
}
}
/*主進程通知本進程(通過管道)將第client個客戶的數據發送到本進程負責的客戶端*/
else if(sockfd == pipefd && events[i].events & EPOLLIN){
int client = 0;
/*接受主進程發送過來的數據,即有客戶端數據到達的連接編號*/
ret = recv(sockfd, (char*)&client, sizeof(client), 0);
if(ret < 0){
if(errno != EAGAIN){
stop_child = true;
}
}
else if(ret = 0){
stop_child = true;
}
else{
send(connfd, share_mem + client*BUFFER_SIZE, BUFFER_SIZE, 0);
}
}
else{
continue;
}
}
}
close(connfd);
close(pipefd);
close(child_epollfd);
return 0;
}
int main(int argc, char const *argv[])
{
if(argc <= 2){
printf("usage: %s ip_address port_numbern", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
user_count = 0;
users = new client_data[USER_LIMIT+1];
sub_porcess = new int[PROCESS_LIMIT];
for(int i = 0; i < PROCESS_LIMIT; ++i){
sub_porcess[i] = -1;
}
epoll_event events[MAX_EVENT_NUMBER];
epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd);
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);
setnonblocking(sig_pipefd[1]);
addfd(epollfd, sig_pipefd[0]);
addsig(SIGCHLD, sig_handler);
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, SIG_IGN);
bool stop_server = false;
bool terminiate = false;
/*創建共享內存,作為所有客戶socket鏈接的讀緩存*/
shmfd = shm_open(shm_name, O_CREAT| O_RDWR, 0666);
assert(shmfd != -1);
ret = ftruncate(shmfd, USER_LIMIT* BUFFER_SIZE);
assert(ret != -1);
share_mem = (char*)mmap(NULL, USER_LIMIT*BUFFER_SIZE, PROT_READ|
PROT_WRITE, MAP_SHARED, shmfd, 0);
assert(share_mem != MAP_FAILED);
close(shmfd);
while(!stop_server){
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if(number < 0 && errno != EINTR){
printf("epoll failuren");
break;
}
for(int i=0; i < number; ++i){
int sockfd = events[i].data.fd;
/*新的客戶連接*/
if(sockfd == listenfd){
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address,
&client_addrlength);
if(connfd < 0){
printf("errno is %dn", errno);
continue;
}
if(user_count >= USER_LIMIT){
const char *info = "too many usersn";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}
/*保存第user_count個客戶連接的相關數據*/
users[user_count].address = client_address;
users[user_count].connfd = connfd;
/*在主進程和子進程間建立管道,以傳遞必要的數據*/
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
assert(ret != -1);
pid_t pid = fork();
if(pid < 0){
close(connfd);
continue;
}
else if (pid == 0){
close(epollfd);
close(listenfd);
close(users[user_count].pipefd[0]);
close(sig_pipefd[0]);
close(sig_pipefd[1]);
run_child(user_count, users, share_mem);
munmap((void*)share_mem, USER_LIMIT* BUFFER_SIZE);
exit(0);
}
else{
close(connfd);
close(users[user_count].pipefd[1]);
addfd(epollfd, users[user_count].pipefd[0]);
users[user_count].pid = pid;
/*記錄新的客戶連接在數組users中的索引值,
建立進程pid和該索引值之間的映射關系*/
sub_porcess[pid] = user_count;
user_count++;
}
}
/*處理信號事件*/
else if(sockfd == sig_pipefd[0] && events[i].events & EPOLLIN){
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if(ret == -1){
continue;
}
else if(ret == 0){
continue;
}
else{
for(int i = 0; i < ret; ++i){
switch (signals[i])
{
case SIGCHLD:{
/*子進程退出,表示有某個客戶端關閉了連接*/
pid_t pid;
int stat;
while((pid = waitpid(-1, &stat, WNOHANG)) > 0){
int del_user = sub_porcess[pid];
sub_porcess[pid] = -1;
if(del_user < 0 || del_user > USER_LIMIT){
continue;
}
epoll_ctl(epollfd, EPOLL_CTL_DEL,
users[del_user].pipefd[0],0);
close(users[del_user].pipefd[0]);
users[del_user] = users[--user_count];
sub_porcess[users[del_user].pid] = del_user;
}
if(terminiate && user_count == 0){
stop_server = true;
}
break;
}
case SIGTERM:
case SIGINT:{
/*結束服務程序*/
printf("kill all the child nown");
if(user_count == 0){
stop_server = true;
break;
}
for(int i = 0; i < user_count; ++i){
int pid = users[i].pid;
kill(pid, SIGTERM);
}
terminiate = true;
break;
}
default:
break;
}
}
}
}
/*某個紫禁城向父進程寫入了數據*/
else if(events[i].events &EPOLLIN){
int child = 0;
ret = recv(sockfd, (char*)&child, sizeof(child),0);
printf("read data from child process accross pipen");
if(ret == -1){
continue;
}
else if(ret == 0){
continue;
}
else{
/*向除負責處理第child和客戶鏈接的子進程之外的其他子進程發送消息,
通知有客戶要寫*/
for(int j = 0; j < user_count; ++j){
if(users[j].pipefd[0] != sockfd){
printf("send data to child accross pipen");
send(users[j].pipefd[0], (char*)&child, sizeof(child), 0);
}
}
}
}
}
}
del_resource();
return 0;
}