Ceph PG Lock and RWState

Overview

During RGW stress testing, ops frequently show the following event in their tracking information: "event": "waiting for rw locks".

When the RGW load is heavy, this waiting event appears many times per op and drives op latency up. So we need to answer two questions: why does the "waiting for rw locks" event occur, and how does it relate to the PG lock?

PG Lock

From the earlier analysis of the Ceph OSD op_shardedwq we know that:

  1. the OSD's osd_op_tp acquires the PG lock before it starts processing an OpRequest;
  2. the OSD's osd_op_tp releases the PG lock after OSD::dequeue_op() returns (see the sketch below).
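
The toy sketch below illustrates this locking pattern. It is a heavily simplified stand-in, not the real OSD::ShardedOpWQ code (the struct names Op/PG and the function process_one are made up for illustration); its only purpose is to show that the PG lock brackets the whole dequeue_op() call.

#include <mutex>

// Toy stand-ins for OpRequest and PG, only to illustrate the locking pattern.
struct Op {};
struct PG {
  std::mutex pg_lock;             // the per-PG mutex ("PG lock")
  void lock()   { pg_lock.lock(); }
  void unlock() { pg_lock.unlock(); }
};

void dequeue_op(PG &pg, Op &op) {
  // do_request() / do_op() / execute_ctx() ... all run here, under the PG lock
}

// What an osd_op_tp worker conceptually does for each queued op:
void process_one(PG &pg, Op &op) {
  pg.lock();             // (1) take the PG lock before handling the op
  dequeue_op(pg, op);    // assemble the transaction, hand it to FileStore
  pg.unlock();           // (2) release the PG lock right after dequeue_op() returns
}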

The call chain under OSD::dequeue_op() is as follows:

OSD::dequeue_op()
|-- ReplicatedPG::do_request()
    |-- ReplicatedPG::do_op()                         // when the op type is CEPH_MSG_OSD_OP
        |-- ReplicatedPG::execute_ctx()               // the RW lock is taken before this call; on failure the op is requeued
            |-- ReplicatedPG::issue_repop()           // write case; reads are handled in ReplicatedPG::prepare_transaction()
                |-- ReplicatedBackend::submit_transaction()
                    |-- ReplicatedBackend::issue_op() // send the rep ops to the replicas
                    |-- FileStore::queue_transactions() // with a journal, the transaction goes onto the journal's work queue

From the call chain above we can see that, after taking the PG lock, the osd_op_tp worker thread only assembles the op into a transaction, hands it to the FileStore work queue, and then returns and releases the PG lock. At that point the data may not yet have been written to the journal or the disk, i.e. it may not be committed/applied yet. So even though operations on the same object are serialized by the PG lock while they are being prepared, the actual read/write of the object data can still be in flight after the PG lock has been released.
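
To make this "return before the data is durable" point concrete, here is a minimal toy model of the queue-and-callback pattern (it is not the real FileStore/ObjectStore API; ToyStore and its members are invented for illustration): queueing a transaction only hands it off and returns, while the commit/apply callbacks fire later from the store's own threads.

#include <functional>
#include <queue>

// Toy journaled store: queue_transactions() returns as soon as the
// transaction is enqueued; on_commit/on_apply run later, once the data
// has actually been journaled and applied.
struct ToyStore {
  struct Transaction {
    std::function<void()> on_commit;  // journal write finished
    std::function<void()> on_apply;   // applied to the backing store
  };
  std::queue<Transaction> pending;

  void queue_transactions(Transaction t) {
    pending.push(std::move(t));  // hand off to the store's own threads...
    // ...and return: nothing is durable yet at this point.
  }
};

// The osd_op_tp thread, conceptually: by the time the PG lock is dropped,
// only the hand-off has happened; commit/apply are still in flight.
void submit_write(ToyStore &store) {
  store.queue_transactions({ /*on_commit*/ []{}, /*on_apply*/ []{} });
  // The PG lock can be released here even though the write is not durable yet.
}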

This is why the per-object read/write lock exists: struct RWState inside struct ObjectContext. It provides mutual exclusion between reads and writes on the same object, while still allowing multiple concurrent reads or multiple concurrent writes on that object (see the RWState analysis below).

As for multiple concurrent writes: since the PG lock guarantees that they are submitted in order, they do not cause data consistency problems; whether writes already handed to FileStore are then executed concurrently or merged is left for further analysis.

RWState

Definition of RWState

struct ObjectContext {
  ...
  struct RWState {
    enum State {
      RWNONE,   // initial state, set when count == 0
      RWREAD,   // read lock
      RWWRITE,  // write lock
      RWEXCL,   // exclusive lock
    };
    ...
    list<OpRequestRef> waiters;  ///< ops waiting on state change
    int count;                   ///< number of readers or writers
    State state:4;               ///< rw state
    /// if set, restart backfill when we can get a read lock
    bool recovery_read_marker:1;
    /// if set, requeue snaptrim on lock release
    bool snaptrimmer_write_marker:1;
    ...
    bool get_read(OpRequestRef op) {
      if (get_read_lock()) {
        return true;
      } // else
      waiters.push_back(op);
      return false;
    }
    /// this function adjusts the counts if necessary
    bool get_read_lock() {  // take the read lock
      // don't starve anybody!
      if (!waiters.empty()) {
        return false;
      }
      switch (state) {
      case RWNONE:
        assert(count == 0);
        state = RWREAD;
        // fall through
      case RWREAD:          // multiple concurrent readers are allowed
        count++;
        return true;
      case RWWRITE:
        return false;
      case RWEXCL:
        return false;
      default:
        assert(0 == "unhandled case");
        return false;
      }
    }

    bool get_write(OpRequestRef op, bool greedy=false) {
      if (get_write_lock(greedy)) {
        return true;
      } // else
      if (op)
        waiters.push_back(op);
      return false;
    }
    bool get_write_lock(bool greedy=false) {  // take the write lock
      if (!greedy) {
        // don't starve anybody!
        if (!waiters.empty() ||
            recovery_read_marker) {
          return false;
        }
      }
      switch (state) {
      case RWNONE:
        assert(count == 0);
        state = RWWRITE;
        // fall through
      case RWWRITE:         // multiple concurrent writers are allowed
        count++;
        return true;
      case RWREAD:
        return false;
      case RWEXCL:
        return false;
      default:
        assert(0 == "unhandled case");
        return false;
      }
    }
    bool get_excl_lock() {  // take the exclusive lock
      switch (state) {
      case RWNONE:
        assert(count == 0);
        state = RWEXCL;
        count = 1;
        return true;
      case RWWRITE:
        return false;
      case RWREAD:
        return false;
      case RWEXCL:
        return false;
      default:
        assert(0 == "unhandled case");
        return false;
      }
    }
    bool get_excl(OpRequestRef op) {
      if (get_excl_lock()) {
        return true;
      } // else
      if (op)
        waiters.push_back(op);
      return false;
    }
    ...
  } rwstate;
  ...
};

From the definition above we can see that the RW lock allows multiple concurrent readers as well as multiple concurrent writers.
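
As a quick illustration of these semantics, the standalone toy below (a stripped-down reimplementation of the state machine above, not the Ceph struct itself) shows two writers sharing the lock while a reader has to wait, which is exactly the situation that produces "waiting for rw locks".

#include <cassert>
#include <string>
#include <vector>

// Stripped-down model of ObjectContext::RWState: same rules as above
// (many readers OR many writers, never both; waiters are never starved).
struct ToyRWState {
  enum State { NONE, READ, WRITE } state = NONE;
  int count = 0;
  std::vector<std::string> waiters;   // ops waiting for a state change

  bool get_read(const std::string &op) {
    if (waiters.empty() && (state == NONE || state == READ)) {
      state = READ; count++; return true;
    }
    waiters.push_back(op); return false;
  }
  bool get_write(const std::string &op) {
    if (waiters.empty() && (state == NONE || state == WRITE)) {
      state = WRITE; count++; return true;
    }
    waiters.push_back(op); return false;
  }
};

int main() {
  ToyRWState rw;
  assert(rw.get_write("write-1"));   // first write: NONE -> WRITE, count = 1
  assert(rw.get_write("write-2"));   // concurrent write allowed, count = 2
  assert(!rw.get_read("read-1"));    // read blocked -> "waiting for rw locks"
  assert(rw.waiters.size() == 1);    // read-1 waits until the writes release the lock
  return 0;
}
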
In do_op(), the rw locks are requested according to the op type:

/** do_op - do an op
 * pg lock will be held (if multithreaded)
 * osd_lock NOT held.
 */
void ReplicatedPG::do_op(OpRequestRef& op)
{
  ...
  } else if (!get_rw_locks(write_ordered, ctx)) {   // failed to take the rw locks
    dout(20) << __func__ << " waiting for rw locks " << dendl;
    op->mark_delayed("waiting for rw locks");
    close_op_ctx(ctx);   // clean up the op ctx; the op is requeued once the lock is released
    return;
  }
  ...
}

-- get_rw_locks [ReplicatedPG]
/**
 * Grabs locks for OpContext, should be cleaned up in close_op_ctx
 *
 * @param ctx [in,out] ctx to get locks for
 * @return true on success, false if we are queued
 */
bool get_rw_locks(bool write_ordered, OpContext *ctx) {
  /* If snapset_obc, !obc->obs->exists and we will always take the
   * snapdir lock *before* the head lock. Since all callers will do
   * this (read or write) if we get the first we will be guaranteed
   * to get the second.
   */
  if (write_ordered && ctx->op->may_read()) {   // pick the lock type from the op's flags
    ctx->lock_type = ObjectContext::RWState::RWEXCL;
  } else if (write_ordered) {
    ctx->lock_type = ObjectContext::RWState::RWWRITE;
  } else {
    assert(ctx->op->may_read());
    ctx->lock_type = ObjectContext::RWState::RWREAD;
  }
  if (ctx->snapset_obc) {
    assert(!ctx->obc->obs.exists);
    if (!ctx->lock_manager.get_lock_type(
          ctx->lock_type,
          ctx->snapset_obc->obs.oi.soid,
          ctx->snapset_obc,
          ctx->op)) {
      ctx->lock_type = ObjectContext::RWState::RWNONE;
      return false;
    }
  }
  if (ctx->lock_manager.get_lock_type(   // try to take the lock
        ctx->lock_type,
        ctx->obc->obs.oi.soid,
        ctx->obc,
        ctx->op)) {
    return true;
  } else {
    assert(!ctx->snapset_obc);
    ctx->lock_type = ObjectContext::RWState::RWNONE;
    return false;
  }
}
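
The branch at the top of get_rw_locks() is just a mapping from the op's read/write flags to a lock type. Restated as a standalone helper for clarity (the enum and function name below are illustrative, not Ceph code):

// write-ordered op that also reads -> EXCL
// write-ordered op                 -> WRITE
// read-only op                     -> READ
enum class LockType { NONE, READ, WRITE, EXCL };

LockType pick_lock_type(bool write_ordered, bool may_read) {
  if (write_ordered && may_read) return LockType::EXCL;
  if (write_ordered)             return LockType::WRITE;
  return LockType::READ;   // read-only op
}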

The call chain under close_op_ctx() is as follows:

osd/ReplicatedPG.h
-- close_op_ctx [ReplicatedPG]
/**
 * Cleans up OpContext
 *
 * @param ctx [in] ctx to clean up
 */
void close_op_ctx(OpContext *ctx) {
  release_object_locks(ctx->lock_manager);   // release the object rw locks held by this ctx
  ctx->op_t.reset();
  for (auto p = ctx->on_finish.begin();
       p != ctx->on_finish.end();
       ctx->on_finish.erase(p++)) {
    (*p)();
  }
  delete ctx;
}

-- release_object_locks [ReplicatedPG]
/**
 * Releases locks
 *
 * @param manager [in] manager with locks to release
 */
void release_object_locks(
  ObcLockManager &lock_manager) {
  list<pair<hobject_t, list<OpRequestRef> > > to_req;
  bool requeue_recovery = false;
  bool requeue_snaptrim = false;
  lock_manager.put_locks(   // collects the ops waiting on the released locks into to_req
    &to_req,
    &requeue_recovery,
    &requeue_snaptrim);
  if (requeue_recovery)
    osd->recovery_wq.queue(this);
  if (requeue_snaptrim)
    queue_snap_trim();

  if (!to_req.empty()) {   // there are ops waiting on the released locks
    // requeue at front of scrub blocking queue if we are blocked by scrub
    for (auto &&p: to_req) {
      if (scrubber.write_blocked_by_scrub(
            p.first.get_head(),
            get_sort_bitwise())) {
        waiting_for_active.splice(
          waiting_for_active.begin(),
          p.second,
          p.second.begin(),
          p.second.end());
      } else {
        requeue_ops(p.second);   // put the waiting requests back into the queue
      }
    }
  }
}


osd/osd_types.h
-- put_locks [ObcLockManager]
void put_locks(
  list<pair<hobject_t, list<OpRequestRef> > > *to_requeue,
  bool *requeue_recovery,
  bool *requeue_snaptrimmer) {
  for (auto p: locks) {
    list<OpRequestRef> _to_requeue;
    p.second.obc->put_lock_type(
      p.second.type,
      &_to_requeue,
      requeue_recovery,
      requeue_snaptrimmer);
    if (to_requeue) {
      to_requeue->push_back(
        make_pair(
          p.second.obc->obs.oi.soid,
          std::move(_to_requeue)));
    }
  }
  locks.clear();
}

-- put_lock_type [ObjectContext]
void put_lock_type(
  ObjectContext::RWState::State type,
  list<OpRequestRef> *to_wake,
  bool *requeue_recovery,
  bool *requeue_snaptrimmer) {
  switch (type) {
  case ObjectContext::RWState::RWWRITE:
    return put_write(to_wake, requeue_recovery, requeue_snaptrimmer);
  case ObjectContext::RWState::RWREAD:
    return put_read(to_wake);
  case ObjectContext::RWState::RWEXCL:
    return put_excl(to_wake, requeue_recovery, requeue_snaptrimmer);
  default:
    assert(0 == "invalid lock type");
  }
}
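
put_read/put_write/put_excl themselves are not quoted here. Conceptually (a sketch based on the RWState fields shown earlier and reusing the ToyRWState toy from above, not the verbatim Ceph source), releasing a lock decrements count and, once the last holder is gone, resets the state and hands the queued waiters back to the caller; that is where the list of ops to requeue comes from. The real put_write additionally handles the recovery/snap-trim markers via the two bool* out-parameters.

// Conceptual sketch only, using the ToyRWState defined earlier in this article.
void put_write_sketch(ToyRWState &rw, std::vector<std::string> *to_wake) {
  assert(rw.state == ToyRWState::WRITE && rw.count > 0);
  if (--rw.count == 0) {
    rw.state = ToyRWState::NONE;   // lock fully released
    to_wake->swap(rw.waiters);     // waiters go back to the caller,
  }                                // who requeues them (requeue_ops)
}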

The places in the code that call close_op_ctx() are:

 1    166  osd/ReplicatedPG.cc  <<finish>>
           ctx->pg->close_op_ctx(ctx);
 2   2099  osd/ReplicatedPG.cc  <<do_op>>
           close_op_ctx(ctx);
 3   2105  osd/ReplicatedPG.cc  <<do_op>>
           close_op_ctx(ctx);
 4   2134  osd/ReplicatedPG.cc  <<do_op>>
           close_op_ctx(ctx);
 5   2970  osd/ReplicatedPG.cc  <<execute_ctx>>
           close_op_ctx(ctx);
 6   3112  osd/ReplicatedPG.cc  <<reply_ctx>>
           close_op_ctx(ctx);
 7   3119  osd/ReplicatedPG.cc  <<reply_ctx>>
           close_op_ctx(ctx);
 8   3422  osd/ReplicatedPG.cc  <<trim_object>>
           close_op_ctx(ctx.release());
 9   3430  osd/ReplicatedPG.cc  <<trim_object>>
           close_op_ctx(ctx.release());
10   6888  osd/ReplicatedPG.cc  <<complete_read_ctx>>
           close_op_ctx(ctx);
11   8174  osd/ReplicatedPG.cc  <<try_flush_mark_clean>>
           close_op_ctx(ctx.release());
12   8180  osd/ReplicatedPG.cc  <<try_flush_mark_clean>>
           close_op_ctx(ctx.release());
13  10153  osd/ReplicatedPG.cc  <<on_change>>
           close_op_ctx(i->second);
14  12183  osd/ReplicatedPG.cc  <<agent_maybe_evict>>
           close_op_ctx(ctx.release());

So the conclusion is: whether an op fails with an error, loses the rw-lock race, or completes normally, close_op_ctx() is eventually called, and it requeues the ops that are waiting on the object's RWState lock.
