Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> Memcached 源碼分析--網絡模型流程分析

Memcached 源碼分析--網絡模型流程分析

編輯:關於Android編程

一、功能介紹

Memcached 是一個高性能的分布式內存對象緩存系統,用於動態Web應用以減輕數據庫負載。
它通過在內存中緩存數據和對象來減少讀取數據庫的次數,從而提高動態、數據庫驅動網站的速度。
Memcached基於一個存儲鍵/值對的hashmap。其守護進程(daemon)是用C寫的,但是客戶端可以用任何語言來編寫,
並通過memcached協議與守護進程通信。

memcached缺乏認證以及安全管制,這代表應該將memcached服務器放置在防火牆後。

memcached的API使用32位元的循環冗余校驗(CRC-32)計算鍵值後,將資料分散在不同的機器上。
當表格滿了以後,接下來新增的資料會以LRU機制替換掉。

memcached是一款非常普及的服務器端緩存軟件,memcached主要是基於Libevent庫進行開發的

memcached服務器端並沒有分布式功能、各個memcached不會互相通信以共享信息、完全取決於客戶端的實現達到分布式。

研究版本:memcached-1.4.31

二、網絡線程模型

主要涉及兩個主要文件:memcached.c 和thread.c文件

主線程主要用於監聽accpet客戶端的Socket連接,而工作線程主要用於接管具體的客戶端連接。
主線程和工作線程之間主要通過基於Libevent的pipe的讀寫事件來監聽,當有連接連接上來的時候,主線程會將連接交個某一個工作線程去接管,後期客戶端和服務端的讀寫工作都會在這個工作線程中進行。
工作線程也是基於Libevent的事件的,當有讀或者寫的事件進來的時候,就會觸發事件的回調函數。

分析後的整體流程圖解:

\

 

主線程負責客戶端的套接字監聽、當有新的連接到來時則均衡選擇工作線程、工作線程負責讀寫並處理相應的命令。

   

main函數@memcached.c 文件了解整個構建邏輯

int main (int argc, char **argv) {
	/* handle SIGINT and SIGTERM */
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);

    /* init settings */
    settings_init();

    /* Run regardless of initializing it later */
    init_lru_crawler();
    init_lru_maintainer();

	hash_init(hash_type);
	
	/* initialize main thread libevent instance */
    main_base = event_init();

	/* start up worker threads if MT mode */
	// 這個方法主要用來創建工作線程,默認會創建8個工作線程
    memcached_thread_init(settings.num_threads);

	// 根據啟動配置可以創建TCP/UDP協議套接字
	server_sockets(settings.port, tcp_transport, portnumber_file);
	server_sockets(settings.udpport, udp_transport, portnumber_file);

    /* enter the event loop */
    // 進入主線程的事件循環
	event_base_loop(main_base, 0);	
}

 

先看主線程的監聽工作:


server_sockets(settings.port, tcp_transport, portnumber_file); -> server_socket(settings.inter, port, transport, portnumber_file); /** * Create a socket and bind it to a specific port number */ static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file) { // socket的bind、listen、setopt 等操作創建socket server socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); fcntl(sfd, F_SETFL, flags | O_NONBLOCK); setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); bind(sfd, next->ai_addr, next->ai_addrlen); // state的類型為:conn_listening 監聽類型套接字、那麼在哪裡進行accept. listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1,transport, main_base); } conn *conn_new(const int sfd, enum conn_states init_state,...){ // 這裡首先會將監聽套接字加入到libevent體系中,並設定event_handler事件回調函數 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event); c->ev_flags = event_flags; event_add(&c->event, 0); } void event_handler(const int fd, const short which, void *arg) { drive_machine(c); // 異步fd編程最重要的狀態機 } // static void drive_machine(conn *c) { while (!stop) { case conn_listening: // 調用accept接受客戶端的socket連接代碼 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); // 客戶端用socket連接上來,則會調用這個分發邏輯的函數、分發到工作線程接管具體的讀寫操作 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, c->transport); break; case conn_read: IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); break; case conn_parse_cmd : try_read_command(c); break; case conn_write: add_iov(c, c->wcurr, c->wbytes); break; ... } }
dispatch_conn_new函數@thread.c
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags...){
	// 申請一塊CQ_ITEM的內存塊,用於存儲連接的基本信息
	CQ_ITEM *item = cqi_new();
	
	// 通過求余數的方法來得到當前的連接需要哪個工作線程接管、且記錄每次最後一次使用的工作線程
	// 通過最後記錄就可以讓工作線程進入一個輪詢,保證了每個工作線程處理的連接數的平衡-相同連接數
	int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid;
    last_thread = tid;

	item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
	// 工作線程的隊列中放入CQ_ITEM
    cq_push(thread->new_conn_queue, item);
	
	// 向工作線程的pipe中寫入數據,只為喚醒工作線程
	buf[0] = 'c';
	write(thread->notify_send_fd, buf, 1);
}

總結一下主線程工作:
1、memcached首先在主線程中會創建main_base,memcached的主線程的主要工作就是監聽和接收listen和accpet新進入的連接。
2、當用戶有連接進來的時候,main thread主線程會通過libevent驅動在狀態機中accept新的連接。
3、main thread主線程會通過求余的方式選擇一個worker thread工作線程。

主線程處程流程圖:

 

\

 

再看工作線程的工作:

在main中調用 memcached_thread_init 創建工作線程的函數:


/* * Initializes the thread subsystem, creating various worker threads. * * nthreads Number of worker event handler threads to spawn */ void memcached_thread_init(int nthreads) { for (i = 0; i < nthreads; i++) { int fds[2]; // 創建pipe,主要用於主線程和工作線程之間的通信 if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } // 每個線程的LIBEVENT_THREAD基本數據結構 threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; // 主要是創建每個線程自己的libevent的event_base setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ stats_state.reserved_fds += 5; } /* Create threads after we've done all the libevent setup. */ for (i = 0; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } } /* * Set up a thread's information. */ static void setup_thread(LIBEVENT_THREAD *me) { /* Listen for notifications from other threads */ event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); event_add(&me->notify_event, 0) ; // 初始化一個工作隊列 me->new_conn_queue = malloc(sizeof(struct conn_queue)); cq_init(me->new_conn_queue); }
在些注冊回調函數 thread_libevent_process 用於數據讀事件監控。[email protected] 就是調用 pthread_create 創建線程、執行worker_libevent函數實體。
/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
	register_thread_initialized();

	// 每個線程中都會有自己獨立的event_base和事件的循環機制、獨立處理自己接管的連接。
    event_base_loop(me->base, 0);
}
創建工作線程的工作結束後、那麼工作線程如何配合主線程工作的。主線程接收到accept新的連接後,
調用 dispatch_conn_new 分配一個工作線程並往隊列中寫入數據,利用管道喚醒工作線程函數 thread_libevent_process。

/* * Processes an incoming "handle a new connection" item. This is called when * input arrives on the libevent wakeup pipe. */ static void thread_libevent_process(int fd, short which, void *arg) { // 主線程中如果有新的連接,會向其中一個線程的pipe中寫入1 read(fd, buf, 1); //從工作線程的隊列中獲取一個CQ_ITEM連接信息、如果item不為空,則需要進行連接後的接管。 item = cq_pop(me->new_conn_queue); if(item != NULL){ //conn_new這個方法非常重要,主要是創建socket的讀寫等監聽事件。 //init_state 為初始化的類型,主要在drive_machine中通過這個狀態類判斷處理類型 conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); ... } }
又通過conn_new函數介入到event_handler->drive_machine狀態機中,如此就形成由數據事件驅動的完整運行態。

總結一下工作線程工作:
1、memcached啟動的時候會初始化N個worker thread工作線程、默認是8個可配置。
2、worker thread工作線程和main thread主線程之間主要通過pipe來進行通信。
3、main thread會將當前用戶的連接信息放入一個CQ_ITEM,並且將CQ_ITEM放入這個線程的conn_queue處理隊列,然後主線程會通過寫入pipe的方式來通知worker thread工作線程。
4、工作線程得到主線程通知,從自已的conn_queue隊列中取得一條連接信息進行處理,創建libevent的socket讀寫事件
5、工作線程會監聽用戶的socket,如果用戶有消息傳遞過來,則會進行消息解析和處理,返回相應的結果。

工作線程數據處理流程:

 

\

 

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved