The design philosophy of ceph-rgw zipper (2)
Overview
Continuing from the previous article, this one walks through CreateBucket in detail to illustrate the design philosophy and how the interfaces are evolving.
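Before diving into the code path, here is a minimal, self-contained sketch of the layering idea behind zipper. These are not the real rgw_sal.h declarations; class and member names are simplified assumptions. The point is that op-layer code only talks to the abstract SAL interfaces (Driver/Bucket), and a concrete store such as RadosStore plugs in underneath, so the same CreateBucket logic can run on top of any backend.

#include <map>
#include <memory>
#include <string>
#include <iostream>

// --- abstract SAL layer (simplified stand-ins for rgw::sal::Driver/Bucket) ---
struct CreateParams {                        // small subset of the real CreateParams
  std::string owner;
  std::map<std::string, std::string> attrs;  // e.g. ACL / CORS blobs
};

class Bucket {
 public:
  virtual ~Bucket() = default;
  virtual int create(const CreateParams& params) = 0;
  virtual const std::string& get_name() const = 0;
};

class Driver {
 public:
  virtual ~Driver() = default;
  virtual std::unique_ptr<Bucket> get_bucket(const std::string& name) = 0;
};

// --- one concrete backend; the real RadosBucket::create calls into RGWRados ---
class RadosBucket : public Bucket {
  std::string name;
 public:
  explicit RadosBucket(std::string n) : name(std::move(n)) {}
  int create(const CreateParams& params) override {
    // the real backend would write the bucket index, instance and entrypoint
    // objects through RGWRados; here we only log the call
    std::cout << "rados create bucket " << name
              << " owner=" << params.owner << "\n";
    return 0;
  }
  const std::string& get_name() const override { return name; }
};

class RadosDriver : public Driver {
 public:
  std::unique_ptr<Bucket> get_bucket(const std::string& name) override {
    return std::make_unique<RadosBucket>(name);
  }
};

// --- op-layer code, written only against the abstract interfaces ---
int do_create_bucket(Driver& driver, const std::string& name,
                     const std::string& owner) {
  auto bucket = driver.get_bucket(name);
  CreateParams params;
  params.owner = owner;
  return bucket->create(params);  // the same call whether the backend is RADOS,
                                  // DBStore, or a filter stacked on top
}

int main() {
  RadosDriver driver;
  return do_create_bucket(driver, "my-bucket", "alice");
}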
1. Receiving the request and protocol handling
rgw_asio_frontend.cc
Main responsibilities: registering the connection handler and driving request processing.
void handle_connection(boost::asio::io_context& context,
                       RGWProcessEnv& env, Stream& stream,
                       timeout_timer& timeout, size_t header_limit,
                       parse_buffer& buffer, bool is_ssl,
                       SharedMutex& pause_mutex,
                       rgw::dmclock::Scheduler *scheduler,
                       const std::string& uri_prefix,
                       boost::system::error_code& ec,
                       spawn::yield_context yield)
{
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
  auto cct = env.driver->ctx();

  // read messages from the stream until eof
  for (;;) {
    // configure the parser
    rgw::asio::parser_type parser;
    parser.header_limit(header_limit);
    parser.body_limit(body_limit);
    timeout.start();
    // parse the header
    http::async_read_header(stream, buffer, parser, yield[ec]);
    timeout.cancel();
    if (ec == boost::asio::error::connection_reset ||
        ec == boost::asio::error::bad_descriptor ||
        ec == boost::asio::error::operation_aborted ||
#ifdef WITH_RADOSGW_BEAST_OPENSSL
        ec == ssl::error::stream_truncated ||
#endif
        ec == http::error::end_of_stream) {
      ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
      return;
    }
    auto& message = parser.get();
    bool expect_continue = (message[http::field::expect] == "100-continue");
    {
      // process the request
      RGWRequest req{env.driver->get_new_req_id()};
      StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
                           is_ssl, local_endpoint, remote_endpoint};
      auto real_client_io = rgw::io::add_reordering(
          rgw::io::add_buffering(cct,
              rgw::io::add_chunking(
                  rgw::io::add_conlen_controlling(
                      &real_client))));
      RGWRestfulIO client(cct, &real_client_io);
      optional_yield y = null_yield;
      process_request(env, &req, uri_prefix, &client, y,
                      scheduler, &user, &latency, &http_ret);
    }
    if (!parser.keep_alive()) {
      return;
    }
    // if we failed before reading the entire message, discard any remaining
    // bytes before reading the next
    while (!expect_continue && !parser.is_done()) {
      static std::array<char, 1024> discard_buffer;
      auto& body = parser.get().body();
      body.size = discard_buffer.size();
      body.data = discard_buffer.data();
      timeout.start();
      http::async_read_some(stream, buffer, parser, yield[ec]);
      timeout.cancel();
      if (ec == http::error::need_buffer) {
        continue;
      }
      if (ec == boost::asio::error::connection_reset) {
        return;
      }
      if (ec) {
        ldout(cct, 5) << "failed to discard unread message: "
                      << ec.message() << dendl;
        return;
      }
    }
  }
}
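handle_connection relies on Boost.Beast's incremental HTTP parser. The sketch below is a self-contained illustration (not RGW code): it applies the same header_limit/body_limit configuration and the same 100-continue check to a raw request string instead of a socket, so the parser setup in the frontend is easier to see in isolation.

#include <boost/asio/buffer.hpp>
#include <boost/beast/http.hpp>
#include <cstdint>
#include <iostream>
#include <limits>
#include <string>

namespace http = boost::beast::http;

int main() {
  // raw bytes standing in for what handle_connection reads from the stream
  std::string raw =
      "PUT /my-bucket HTTP/1.1\r\n"
      "Host: s3.example.com\r\n"
      "Expect: 100-continue\r\n"
      "Content-Length: 0\r\n"
      "\r\n";

  http::request_parser<http::string_body> parser;
  parser.header_limit(16 * 1024);  // bound the header size, as the frontend does
  parser.body_limit(std::numeric_limits<std::uint64_t>::max());  // body is read in pieces

  boost::beast::error_code ec;
  parser.put(boost::asio::buffer(raw), ec);
  if (ec && ec != http::error::need_more) {
    std::cerr << "failed to read header: " << ec.message() << "\n";
    return 1;
  }

  auto& message = parser.get();
  bool expect_continue = (message[http::field::expect] == "100-continue");
  std::cout << message.method_string() << " " << message.target()
            << " expect_continue=" << expect_continue << "\n";
  return 0;
}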
rgw_process.cc
Main responsibilities: request processing, including authentication, handler/op dispatch and execution, and returning the result to the client.
int process_request(const RGWProcessEnv& penv,
                    RGWRequest* const req,
                    const std::string& frontend_prefix,
                    RGWRestfulIO* const client_io,
                    optional_yield yield,
                    rgw::dmclock::Scheduler *scheduler,
                    string* user,
                    ceph::coarse_real_clock::duration* latency,
                    int* http_ret)
{
  int ret = client_io->init(g_ceph_context);
  dout(1) << "====== starting new request req=" << hex << req << dec
          << " =====" << dendl;
  perfcounter->inc(l_rgw_req);

  RGWEnv& rgw_env = client_io->get_env();
  req_state rstate(g_ceph_context, penv, &rgw_env, req->id);
  req_state *s = &rstate;
  rgw::sal::Driver* driver = penv.driver;

  RGWHandler_REST *handler = rest->get_handler(driver, s,
                                               *penv.auth_registry,
                                               frontend_prefix,
                                               client_io, &mgr, &init_error);
  ldpp_dout(s, 2) << "getting op " << s->op << dendl;
  op = handler->get_op();
  std::tie(ret, c) = schedule_request(scheduler, s, op);
  req->op = op;
  ldpp_dout(op, 10) << "op=" << typeid(*op).name() << dendl;
  s->op_type = op->get_type();

  try {
    ldpp_dout(op, 2) << "verifying requester" << dendl;
    ret = op->verify_requester(*penv.auth_registry, yield);
    ldpp_dout(op, 2) << "normalizing buckets and tenants" << dendl;
    ret = handler->postauth_init(yield);
    ret = rgw_process_authenticated(handler, op, req, s, yield, driver);
  } catch (const ceph::crypto::DigestException& e) {
    dout(0) << "authentication failed" << e.what() << dendl;
    abort_early(s, op, -ERR_INVALID_SECRET_KEY, handler, yield);
  }

done:
  try {
    client_io->complete_request();
  } catch (rgw::io::Exception& e) {
    dout(0) << "ERROR: client_io->complete_request() returned "
            << e.what() << dendl;
  }
  if (handler)
    handler->put_op(op);
  rest->put_handler(handler);

  const auto lat = s->time_elapsed();
  if (latency) {
    *latency = lat;
  }

  dout(1) << "====== req done req=" << hex << req << dec
          << " op status=" << op_ret
          << " http_status=" << s->err.http_ret
          << " latency=" << lat
          << " ======"
          << dendl;
  return (ret < 0 ? ret : s->err.ret);
} /* process_request */
The detailed handling of the op takes place in rgw_process_authenticated, which drives permission initialization, op init, permission and parameter verification, execute, and complete.
int rgw_process_authenticated(RGWHandler_REST * const handler,
                              RGWOp *& op,
                              RGWRequest * const req,
                              req_state * const s,
                              optional_yield y,
                              rgw::sal::Driver* driver,
                              const bool skip_retarget)
{
  ldpp_dout(op, 2) << "init permissions" << dendl;
  int ret = handler->init_permissions(op, y);

  ldpp_dout(op, 2) << "init op" << dendl;
  ret = op->init_processing(y);

  ldpp_dout(op, 2) << "verifying op mask" << dendl;
  ret = op->verify_op_mask();

  ldpp_dout(op, 2) << "verifying op permissions" << dendl;
  {
    auto span = tracing::rgw::tracer.add_span("verify_permission", s->trace);
    std::swap(span, s->trace);
    ret = op->verify_permission(y);
    std::swap(span, s->trace);
  }

  ldpp_dout(op, 2) << "verifying op params" << dendl;
  ret = op->verify_params();

  ldpp_dout(op, 2) << "executing" << dendl;
  {
    auto span = tracing::rgw::tracer.add_span("execute", s->trace);
    std::swap(span, s->trace);
    op->execute(y);
    std::swap(span, s->trace);
  }

  ldpp_dout(op, 2) << "completing" << dendl;
  op->complete();

  return 0;
}
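The pipeline above is the generic op contract: every RGWOp subclass supplies its own verify_permission/execute/complete, and this driver loop never needs to know which op it is running. A minimal, self-contained sketch of that pattern follows; the names are simplified assumptions, and the real RGWOp has many more hooks (init_processing, verify_params, and so on).

#include <iostream>

// simplified stand-in for the RGWOp contract driven by rgw_process_authenticated
class Op {
 public:
  virtual ~Op() = default;
  virtual int verify_permission() = 0;  // may fail with -EACCES, etc.
  virtual void execute() = 0;           // records the result in op_ret
  virtual void complete() {}            // send the response / final bookkeeping
  int op_ret = 0;
};

class CreateBucketOp : public Op {
 public:
  int verify_permission() override {
    // the real RGWCreateBucket evaluates IAM/ACL policies here
    return 0;
  }
  void execute() override {
    // the real op loads the bucket, fills CreateParams and calls
    // s->bucket->create(...); here we just pretend it succeeded
    std::cout << "creating bucket\n";
    op_ret = 0;
  }
};

// the generic driver loop, mirroring rgw_process_authenticated
int run_op(Op& op) {
  if (int ret = op.verify_permission(); ret < 0) {
    return ret;
  }
  op.execute();
  op.complete();
  return op.op_ret;
}

int main() {
  CreateBucketOp op;
  return run_op(op);
}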
rgw_op.cc
Here we skip the REST/Swift protocol dispatch and go straight to RGWCreateBucket::execute().
void RGWCreateBucket::execute(optional_yield y)
{
  const rgw::SiteConfig& site = *s->penv.site;
  const std::optional<RGWPeriod>& period = site.get_period();
  const RGWZoneGroup& my_zonegroup = site.get_zonegroup();

  /* Step 1: process the zonegroup info, work out the bucket's placement rule,
   * storage class, etc., and whether this zonegroup is the master */

  /* Step 2: read the bucket info; if the bucket already exists, handle that case */
  // read the bucket info if it exists
  op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
                               &s->bucket, y);

  /* Step 3: if the bucket does not exist, fill in the creation parameters */
  s->bucket_owner.id = s->user->get_id();
  s->bucket_owner.display_name = s->user->get_display_name();
  createparams.owner = s->user->get_id();

  buffer::list aclbl;
  policy.encode(aclbl);
  createparams.attrs[RGW_ATTR_ACL] = std::move(aclbl);

  if (has_cors) {
    buffer::list corsbl;
    cors_config.encode(corsbl);
    createparams.attrs[RGW_ATTR_CORS] = std::move(corsbl);
  }

  /* Step 4: create the bucket through the SAL layer */
  ldpp_dout(this, 10) << "user=" << s->user << " bucket=" << s->bucket << dendl;
  op_ret = s->bucket->create(this, createparams, y);

  /* Step 5: on failure, roll back */
  .....
}
2. Handling in the store (SAL) layer and in RADOS
int RadosBucket::create(const DoutPrefixProvider* dpp,
                        const CreateParams& params,
                        optional_yield y)
{
  rgw_bucket key = get_key();
  key.marker = params.marker;
  key.bucket_id = params.bucket_id;

  /* create the bucket; this calls down into the RGWRados flow in rgw_rados.cc */
  int ret = store->getRados()->create_bucket(
      dpp, y, key, params.owner, params.zonegroup_id,
      params.placement_rule, params.zone_placement, params.attrs,
      params.obj_lock_enabled, params.swift_ver_location,
      params.quota, params.creation_time, &bucket_version, info);

  /* link the bucket to its owner */
  ret = link(dpp, params.owner, y, false);
  if (ret && !existed && ret != -EEXIST) {
    /* if it exists (or previously existed), don't remove it! */
    ret = unlink(dpp, params.owner, y);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << ret
                        << dendl;
    }
  } else if (ret == -EEXIST || (ret == 0 && existed)) {
    ret = -ERR_BUCKET_EXISTS;
  }
  return ret;
}
int RGWRados::create_bucket(const DoutPrefixProvider* dpp,
                            optional_yield y,
                            const rgw_bucket& bucket,
                            const rgw_user& owner,
                            const std::string& zonegroup_id,
                            const rgw_placement_rule& placement_rule,
                            const RGWZonePlacementInfo* zone_placement,
                            const std::map<std::string, bufferlist>& attrs,
                            bool obj_lock_enabled,
                            const std::optional<std::string>& swift_ver_location,
                            const std::optional<RGWQuotaInfo>& quota,
                            std::optional<ceph::real_time> creation_time,
                            obj_version* pep_objv,
                            RGWBucketInfo& info)
{
  int ret = 0;
#define MAX_CREATE_RETRIES 20 /* need to bound retries */
  for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
    /* Step 1: initialize the bucket info: bucket/version ids, quota, creation time, etc. */

    /* Step 2: initialize the bucket index */
    if (zone_placement) {
      ret = svc.bi->init_index(dpp, info, info.layout.current_index);
      if (ret < 0) {
        return ret;
      }
    }

    /* Step 3: write the linked bucket info (instance + entrypoint) */
    constexpr bool exclusive = true;
    ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y);
    if (ret == -ECANCELED) {
      ret = -EEXIST;
    }
    ...
    return ret;
  }

  /* this is highly unlikely */
  ldpp_dout(dpp, 0) << "ERROR: could not create bucket, continuously raced with bucket creation and removal" << dendl;
  return -ENOENT;
}
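The MAX_CREATE_RETRIES loop exists because the exclusive, versioned metadata write can race with a concurrent create or remove of the same bucket: -ECANCELED from the version check means we lost the race. Below is a small self-contained sketch of that bounded-retry pattern; racing_write is a hypothetical stand-in, not RGW code, and the real loop also remaps the error to -EEXIST and re-reads the existing bucket info.

#include <cerrno>
#include <cstdio>

// stand-in for an exclusive, versioned metadata write that can be raced
int racing_write(int attempt) {
  // pretend the first attempt loses a race and the second succeeds
  return attempt == 0 ? -ECANCELED : 0;
}

constexpr int MAX_CREATE_RETRIES = 20;  // bound the retries, as create_bucket does

int create_with_retries() {
  for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
    int ret = racing_write(i);
    if (ret == -ECANCELED) {
      // someone else wrote the object first; re-read state and try again
      std::printf("raced on attempt %d, retrying\n", i);
      continue;
    }
    return ret;  // success or a hard error
  }
  // highly unlikely: kept racing with bucket creation and removal the whole time
  return -ENOENT;
}

int main() { return create_with_retries(); }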
The put_linked_bucket_info function
int RGWRados::put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, real_time mtime, obj_version *pep_objv,
                                     const map<string, bufferlist> *pattrs, bool create_entry_point,
                                     const DoutPrefixProvider *dpp, optional_yield y)
{
  bool create_head = !info.has_instance_obj || create_entry_point;

  /* Step 1: write the bucket instance object */
  int ret = put_bucket_instance_info(info, exclusive, mtime, pattrs, dpp, y);

  RGWBucketEntryPoint entry_point;
  entry_point.bucket = info.bucket;
  entry_point.owner = info.owner;
  entry_point.creation_time = info.creation_time;
  entry_point.linked = true;

  /* Step 2: store the bucket entrypoint object */
  ret = ctl.bucket->store_bucket_entrypoint_info(info.bucket, entry_point, y, dpp, RGWBucketCtl::Bucket::PutParams()
                                                 .set_exclusive(exclusive)
                                                 .set_objv_tracker(&ot)
                                                 .set_mtime(mtime));
  if (ret < 0)
    return ret;
  return 0;
}
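put_linked_bucket_info is where the two-level bucket metadata design becomes visible: a mutable bucket instance record (keyed by the bucket name plus its bucket_id) and a small entrypoint record (keyed by the bucket name alone) that points at the current instance. The sketch below illustrates that split with a toy in-memory store; the structs and the key formats are assumptions for illustration, not the exact RGW metadata keys.

#include <ctime>
#include <iostream>
#include <map>
#include <string>

// simplified stand-ins for RGWBucketInfo and RGWBucketEntryPoint
struct BucketInstanceInfo {
  std::string bucket_id;  // unique per incarnation of the bucket
  std::string owner;
  std::time_t creation_time;
};

struct BucketEntryPoint {
  std::string bucket_id;  // which instance the name currently points at
  std::string owner;
  bool linked = true;
};

// toy metadata store; the real records live as objects in the RGW metadata pool
std::map<std::string, BucketInstanceInfo> instances;
std::map<std::string, BucketEntryPoint> entrypoints;

int put_linked_bucket_info(const std::string& name,
                           const BucketInstanceInfo& info) {
  // step 1: write the instance record, keyed by "<name>:<bucket_id>"
  // (assumed key format, for illustration only)
  instances[name + ":" + info.bucket_id] = info;

  // step 2: write the entrypoint record, keyed by the bucket name,
  // so a lookup by name can find the current instance
  entrypoints[name] = BucketEntryPoint{info.bucket_id, info.owner, true};
  return 0;
}

int main() {
  put_linked_bucket_info("my-bucket",
                         {"fa1b...-4321", "alice", std::time(nullptr)});
  std::cout << "my-bucket -> " << entrypoints["my-bucket"].bucket_id << "\n";
  return 0;
}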
3. Handling in the SVC layer: creating the bucket index
int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,
                                        RGWBucketInfo& bucket_info,
                                        const rgw::bucket_index_layout_generation& idx_layout)
{
  librados::IoCtx index_pool;

  string dir_oid = dir_oid_prefix;
  int r = open_bucket_index_pool(dpp, bucket_info, &index_pool);
  if (r < 0) {
    return r;
  }

  dir_oid.append(bucket_info.bucket.bucket_id);

  map<int, string> bucket_objs;
  get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards, idx_layout.gen, &bucket_objs);

  return CLSRGWIssueBucketIndexInit(index_pool,
                                    bucket_objs,
                                    cct->_conf->rgw_bucket_index_max_aio)();
}
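init_index derives one RADOS object name per index shard from a common prefix plus the bucket_id, then issues the index-init calls against those objects. The self-contained sketch below shows that naming scheme; the ".dir." prefix and the "<oid>.<shard>" suffix reflect what get_bucket_index_objects produces for the first index generation, but treat the exact format as an assumption.

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// build the per-shard index object names, one per shard, as init_index does
std::map<int, std::string> bucket_index_objects(const std::string& bucket_id,
                                                uint32_t num_shards) {
  const std::string dir_oid = ".dir." + bucket_id;  // dir_oid_prefix + bucket_id
  std::map<int, std::string> objs;
  if (num_shards == 0) {
    objs[0] = dir_oid;  // unsharded bucket: a single index object
  } else {
    for (uint32_t i = 0; i < num_shards; ++i) {
      objs[i] = dir_oid + "." + std::to_string(i);
    }
  }
  return objs;
}

int main() {
  for (const auto& [shard, oid] : bucket_index_objects("fa1b...-4321", 4)) {
    std::cout << "shard " << shard << " -> " << oid << "\n";
  }
  return 0;
}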
4. Summary
At this point the bucket has been created. The other ops follow a similar path, and the overall structure does not change much between them. The diagram below shows the relationships among rgw_rados.h, rgw_sal_rados.h, rgw_service.h and the svc_module***.h files; it is rough and for reference only.