小編給大家分享一下如何實現ceph SimpleMessenger模塊消息的接收,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
OSD服務端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函數
|- 358 void add_dispatcher_head(Dispatcher *d) {
|| 359 bool first = dispatchers.empty();
|| 360 dispatchers.push_front(d);
|| 361 if (d->ms_can_fast_dispatch_any())
|| 362 fast_dispatchers.push_front(d);
|| 363 if (first)
|| 364 ready(); //如果dispatcher list空,啟動SimpleMessenger::ready,不為空證明SimpleMessenger已經啟動了
|| 365 }在SimpleMessenger::ready()中,啟動DispatchQueue等待mqueue,如果綁定了端口就啟動 accepter接收線程
76 void SimpleMessenger::ready()
- 77 {
| 78 ldout(cct,10) << "ready " << get_myaddr() << dendl;
| 79 dispatch_queue.start(); //啟動DispatchQueue,等待mqueue
| 80
| 81 lock.Lock();
| 82 if (did_bind)
| 83 accepter.start();
| 84 lock.Unlock();
| 85 }Accepter是Thread的繼承類,Accepter::start()最終調用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe類中
void *Accepter::entry()
{
...
struct pollfd pfd;
pfd.fd = listen_sd;
pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done) {
int r = poll(&pfd, 1, -1);
if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
break;
// accept
entity_addr_t addr;
socklen_t slen = sizeof(addr.ss_addr());
int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
if (sd >= 0) {
errors = 0;
ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl;
msgr->add_accept_pipe(sd); //注冊一個pipe,啟動讀線程,從該sd中讀取數據
} else {
ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
if (++errors > 4)
break;
}
}
...
return 0;在SimpleMessenger::add_accept_pipe(int sd)中,申請一個Pipe類并把sd加入到Pipe中,開始Pipe::start_reader()
340 Pipe *SimpleMessenger::add_accept_pipe(int sd)
- 341 {
| 342 lock.Lock();
| 343 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
| 344 p->sd = sd;
| 345 p->pipe_lock.Lock();
| 346 p->start_reader();
| 347 p->pipe_lock.Unlock();
| 348 pipes.insert(p);
| 349 accepting_pipes.insert(p);
| 350 lock.Unlock();
| 351 return p;
| 352 }Pipe類內部有一個Reader和Writer線程類,Pipe::start_reader()啟動Pipe::Reader::entry(),最終啟動Pipe::reader函數
134 void Pipe::start_reader()
- 135 {
| 136 assert(pipe_lock.is_locked());
| 137 assert(!reader_running);
|- 138 if (reader_needs_join) {
|| 139 reader_thread.join();
|| 140 reader_needs_join = false;
|| 141 }
| 142 reader_running = true;
| 143 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
| 144 }|- 48 class Reader : public Thread {
|| 49 Pipe *pipe;
|| 50 public:
|| 51 explicit Reader(Pipe *p) : pipe(p) {}
|| 52 void *entry() { pipe->reader(); return 0; }
|| 53 } reader_thread;在Pipe::reader函數中根據tag接收不同類型的消息,如果是CEPH_MSGR_TAG_MSG類型消息調用read_message接收消息,并把消息加入到mqueue中
void Pipe::reader()
{
pipe_lock.Lock();
if (state == STATE_ACCEPTING) {
accept(); //第一次進入此函數處理
assert(pipe_lock.is_locked());
}
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
assert(pipe_lock.is_locked());
......
......
else if (tag == CEPH_MSGR_TAG_MSG) {
ldout(msgr->cct,20) << "reader got MSG" << dendl;
Message *m = 0;
int r = read_message(&m, auth_handler.get());
pipe_lock.Lock();
if (!m) {
if (r < 0)
fault(true);
continue;
}
......
......
......
// note last received message.
in_seq = m->get_seq();
cond.Signal(); // wake up writer, to ack this
ldout(msgr->cct,10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
<< dendl;
in_q->fast_preprocess(m); //mds 、mon不會進入此函數,預處理
if (delay_thread) {
utime_t release;
if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
release = m->get_recv_stamp();
release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
}
delay_thread->queue(release, m);
} else {
if (in_q->can_fast_dispatch(m)) {
reader_dispatching = true;
pipe_lock.Unlock();
in_q->fast_dispatch(m);
pipe_lock.Lock();
reader_dispatching = false;
if (state == STATE_CLOSED ||
notify_on_dispatch_done) { // there might be somebody waiting
notify_on_dispatch_done = false;
cond.Signal();
}
} else { //mds進入此else
in_q->enqueue(m, m->get_priority(), conn_id); //把接收到的messenger加入到mqueue中
}
}
}
......
......
}
// reap?
reader_running = false;
reader_needs_join = true;
unlock_maybe_reap();
ldout(msgr->cct,10) << "reader done" << dendl;
}在Pipe::DispatchQueue::enqueue函數中加入到mqueue中
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
Mutex::Locker l(lock);
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
add_arrival(m);
if (priority >= CEPH_MSG_PRIO_LOW) {
mqueue.enqueue_strict(
id, priority, QueueItem(m));
} else {
mqueue.enqueue(
id, priority, m->get_cost(), QueueItem(m));
}
cond.Signal(); //喚醒dispatch_queue.start() 啟動的dispatchThread,進入entry進行處理
}看完了這篇文章,相信你對“如何實現ceph SimpleMessenger模塊消息的接收”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。