# Ceph OSD op_shardedwq Analysis

## Overview

The Ceph OSD handles client OPs, snap trim, and scrub through the same work queue: OSD::op_shardedwq.

Studying this sharded work queue helps us tune the configuration parameters related to snap trim and scrub.

## Related data structures

Two data structures are mainly involved here:

  1. class PGQueueable
  2. class ShardedOpWQ

### class PGQueueable

This class wraps the requests queued for a PG; the operation types involved are:

  1. OpRequestRef
  2. PGSnapTrim
  3. PGScrub
```c++
class PGQueueable {
  typedef boost::variant<
    OpRequestRef,
    PGSnapTrim,
    PGScrub
    > QVariant;          // the three kinds of requests the queue carries

  QVariant qvariant;
  int cost;
  unsigned priority;
  utime_t start_time;
  entity_inst_t owner;

  struct RunVis : public boost::static_visitor<> {
    OSD *osd;
    PGRef &pg;
    ThreadPool::TPHandle &handle;
    RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
      : osd(osd), pg(pg), handle(handle) {}
    void operator()(OpRequestRef &op);
    void operator()(PGSnapTrim &op);
    void operator()(PGScrub &op);
  };

public:
  // cppcheck-suppress noExplicitConstructor
  PGQueueable(OpRequestRef op)        // wraps an OpRequest
    : qvariant(op), cost(op->get_req()->get_cost()),
      priority(op->get_req()->get_priority()),
      start_time(op->get_req()->get_recv_stamp()),
      owner(op->get_req()->get_source_inst())
    {}
  PGQueueable(                        // wraps a PGSnapTrim
    const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
    const entity_inst_t &owner)
    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
      owner(owner) {}
  PGQueueable(                        // wraps a PGScrub
    const PGScrub &op, int cost, unsigned priority, utime_t start_time,
    const entity_inst_t &owner)
    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
      owner(owner) {}
  ...
  void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
    RunVis v(osd, pg, handle);
    boost::apply_visitor(v, qvariant);
  }
  ...
};
```
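
run() hands the stored variant to the RunVis visitor, and the operator() overload that matches the stored type is invoked. For readers unfamiliar with this pattern, here is a minimal self-contained sketch of the same dispatch mechanism; it is not Ceph code, and the ClientOp/TrimWork/Runner names are made up for illustration:

```c++
#include <boost/variant.hpp>
#include <iostream>

// Hypothetical request types standing in for OpRequestRef / PGSnapTrim / PGScrub.
struct ClientOp { int id; };
struct TrimWork { unsigned epoch; };

using Request = boost::variant<ClientOp, TrimWork>;

// Plays the role of PGQueueable::RunVis: one operator() overload per variant type.
struct Runner : public boost::static_visitor<> {
  void operator()(const ClientOp &op) const { std::cout << "client op " << op.id << "\n"; }
  void operator()(const TrimWork &op) const { std::cout << "trim, epoch " << op.epoch << "\n"; }
};

int main() {
  Request r1 = ClientOp{42};
  Request r2 = TrimWork{7};
  Runner v;
  boost::apply_visitor(v, r1);  // dispatches to the ClientOp overload
  boost::apply_visitor(v, r2);  // dispatches to the TrimWork overload
}
```

In PGQueueable the same mechanism lets a single queue entry type carry three very different kinds of work while keeping the dispatch inside one run() call.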

### class ShardedOpWQ

This is the work queue class used by the OSD's shard worker threads; it processes the three kinds of PG operations wrapped by PGQueueable.

```c++
class OSD : public Dispatcher, public md_config_obs_t
{
...
  friend class PGQueueable;
  class ShardedOpWQ : public ShardedThreadPool::ShardedWQ< pair<PGRef, PGQueueable> > {

    struct ShardData {
      Mutex sdata_lock;
      Cond sdata_cond;
      Mutex sdata_op_ordering_lock;
      map<PG*, list<PGQueueable> > pg_for_processing;
      std::unique_ptr<OpQueue< pair<PGRef, PGQueueable>, entity_inst_t>> pqueue;
      ShardData(
        string lock_name, string ordering_lock,
        uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
        io_queue opqueue)
        : sdata_lock(lock_name.c_str(), false, true, false, cct),
          sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) {
        if (opqueue == weightedpriority) {
          pqueue = std::unique_ptr
            <WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
              new WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
                max_tok_per_prio, min_cost));
        } else if (opqueue == prioritized) {
          pqueue = std::unique_ptr
            <PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
              new PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
                max_tok_per_prio, min_cost));
        }
      }
    };

    vector<ShardData*> shard_list;
    OSD *osd;
    uint32_t num_shards;    // set from cct->_conf->osd_op_num_shards
    ...
    void _process(uint32_t thread_index, heartbeat_handle_d *hb);
    void _enqueue(pair<PGRef, PGQueueable> item);
    void _enqueue_front(pair<PGRef, PGQueueable> item);
    ...
  } op_shardedwq;
...
};
```

The thread pool backing op_shardedwq is osd_op_tp.

osd_op_tp is initialized in the OSD constructor's initializer list:

```c++
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
          cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
```

The relevant configuration options here are:

  1. osd_op_num_threads_per_shard, default value 2
  2. osd_op_num_shards, default value 5

With the defaults, osd_op_tp therefore contains 2 × 5 = 10 worker threads.

Each PG is mapped to one of the shards according to a fixed rule, and the threads bound to that shard then process all of that PG's requests.
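
The mapping itself is an implementation detail of ShardedOpWQ::_enqueue(); the essential point is that a value derived from the PG id, taken modulo the number of shards, selects the shard, so every request for a given PG is always served by that one shard's threads. Below is a minimal standalone sketch of that idea; it is not Ceph code, and FakePgId/shard_for are made-up names for illustration:

```c++
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for a PG id; in the OSD the value actually used is
// derived from the PG's placement seed.
struct FakePgId { uint32_t seed; };

// Pick the shard the way described above: PG id value modulo the shard count.
uint32_t shard_for(const FakePgId &pgid, uint32_t num_shards) {
  return pgid.seed % num_shards;
}

int main() {
  const uint32_t num_shards = 5;                        // osd_op_num_shards default
  std::vector<FakePgId> pgs = {{3}, {7}, {12}, {21}};
  for (const auto &pg : pgs)
    std::cout << "pg seed " << pg.seed << " -> shard "
              << shard_for(pg, num_shards) << "\n";     // prints shards 3, 2, 2, 1
}
```

The consequence that matters for the analysis later is that a PG cannot move to another shard: if that shard's worker threads are busy (or sleeping), the PG's requests simply wait.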

### The ShardedOpWQ processing function

The _process() function of this sharded work queue looks roughly as follows (abridged):

```c++
void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
{
  ...                                             // (elided) locate this thread's ShardData 'sdata', build 'tp_handle', wait for work
  pair<PGRef, PGQueueable> item = sdata->pqueue->dequeue();
  ...
  boost::optional<PGQueueable> op;
  (item.first)->lock_suspend_timeout(tp_handle);  // take the PG lock
  ...                                             // (elided) pop the PGQueueable to run for this PG into 'op'
  op->run(osd, item.first, tp_handle);            // dispatch by operation type
  ...
  (item.first)->unlock();                         // release the PG lock
}
```

As the excerpt shows, the PG lock is acquired before the actual handler is called, and released only after the handler returns.

Depending on the request type, OSD::ShardedOpWQ::_process() ends up calling one of three handlers:

  1. OSD::dequeue_op()
  2. ReplicatedPG::snap_trimmer()
  3. PG::scrub()

The dispatch functions for these three operation types are defined in osd/OSD.cc:

```c++
void PGQueueable::RunVis::operator()(OpRequestRef &op) {
  return osd->dequeue_op(pg, op, handle);
}

void PGQueueable::RunVis::operator()(PGSnapTrim &op) {
  return pg->snap_trimmer(op.epoch_queued);
}

void PGQueueable::RunVis::operator()(PGScrub &op) {
  return pg->scrub(op.epoch_queued, handle);
}
```

### The OSD op handler

```c++
/*
 * NOTE: dequeue called in worker thread, with pg lock
 */
void OSD::dequeue_op(
  PGRef pg, OpRequestRef op,
  ThreadPool::TPHandle &handle)
{
  ...
  op->mark_reached_pg();
  pg->do_request(op, handle);   // the PG-level op handler
  ...
}
```

### The snap trim handler

```c++
void ReplicatedPG::snap_trimmer(epoch_t queued)
{
  if (g_conf->osd_snap_trim_sleep > 0) {
    unlock();                   // release the pg lock
    utime_t t;
    t.set_from_double(g_conf->osd_snap_trim_sleep);
    t.sleep();                  // sleep for osd_snap_trim_sleep seconds
    lock();                     // re-acquire the pg lock
    dout(20) << __func__ << " slept for " << t << dendl;
  }
  ...
  if (is_primary()) {
    ...
    snap_trimmer_machine.process_event(SnapTrim());
    ...
  } else if (is_active() &&
             last_complete_ondisk.epoch > info.history.last_epoch_started) {
    // replica collection trimming
    snap_trimmer_machine.process_event(SnapTrim());
  }
  return;
}
```

### The PG scrub handler

```c++
/* Scrub:
 * PG_STATE_SCRUBBING is set when the scrub is queued
 *
 * scrub will be chunky if all OSDs in PG support chunky scrub
 * scrub will fail if OSDs are too old.
 */
void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
{
  if (g_conf->osd_scrub_sleep > 0 &&
      (scrubber.state == PG::Scrubber::NEW_CHUNK ||
       scrubber.state == PG::Scrubber::INACTIVE)) {
    dout(20) << __func__ << " state is INACTIVE|NEW_CHUNK, sleeping" << dendl;
    unlock();                   // release the pg lock
    utime_t t;
    t.set_from_double(g_conf->osd_scrub_sleep);
    handle.suspend_tp_timeout();
    t.sleep();                  // sleep for osd_scrub_sleep seconds
    handle.reset_tp_timeout();
    lock();                     // re-acquire the pg lock
    dout(20) << __func__ << " slept for " << t << dendl;
  }
  ...

  chunky_scrub(handle);
}
```

## Analysis

### Granularity of the Ceph PG lock

As OSD::ShardedOpWQ::_process() shows, a worker thread takes the PG lock before it even distinguishes what kind of PG request it is handling, and releases it just before returning. That is a fairly coarse-grained lock: if snap trim or scrub holds the PG lock for too long, the PG's normal I/O is affected.

The PG-related OP message types (all handled via OSD::dequeue_op()) are:

  • CEPH_MSG_OSD_OP
  • MSG_OSD_SUBOP
  • MSG_OSD_SUBOPREPLY
  • MSG_OSD_PG_BACKFILL
  • MSG_OSD_REP_SCRUB
  • MSG_OSD_PG_UPDATE_LOG_MISSING
  • MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY

### The osd_snap_trim_sleep and osd_scrub_sleep options

From the code above, once g_conf->osd_snap_trim_sleep and g_conf->osd_scrub_sleep are set to non-zero values, snap trim and scrub sleep for a fixed interval (not a random one) before each round of work. This reduces, to some degree, the impact of these two operations on the PG's normal I/O, since the PG lock is dropped while they sleep.

However, when osd_snap_trim_sleep or osd_scrub_sleep is non-zero, the worker thread itself sleeps: it does release the PG lock, but it still occupies one of the shard's worker threads, which is exactly the problem reported in this Ceph bug: http://tracker.ceph.com/issues/19497

Our current configuration is:

  1. osd_op_num_shards = 30
  2. osd_op_num_threads_per_shard = 2 // default value

So once one of a shard's two threads is tied up, only a single thread is left to serve that shard, and the normal I/O of all PGs mapped to that shard may be affected.

## References

http://blog.wjin.org/posts/ceph-scrub-mechanism.html
