Java学习笔记(二) 面向对象—构造函数
Overview
缓存是为达到系统快速响应的一项关键技术,Ceph 作为一个复杂的分布式存储系统,有多种、多级缓存存在。缓存按照位置分为:
- 客户端缓存
- 服务端缓存
- 网络中缓存
按照部署方式分为:
- 单体缓存
- 缓存集群
- 分布式缓存
而Rados 网关缓存,也即RGW Cache 按照位置:作为Ceph client 可以归为客户端缓存,作为上层应用的服务端可以归为服务端缓存。而按照部署方式则为分布式缓存,因为Ceph 集群通常会存在多个RGW 实例,分布式缓存会涉及到缓存同步等问题。
RGW Cache 将对象存储的相关元数据存储在内部缓存中,用于提升性能。
RGW Cache 执行路径
前面已经提到,目前Ceph 中涉及RGW Cache 的配置参数有三个:
- rgw_cache_enabled: RGW Cache 开关,默认为true,即开启。
- rgw_cache_expiry_interval: 缓存数据的过期时间,默认900秒。
- rgw_cache_lru_size: RGW 缓存entries的最大数量,当缓存满后会根据LRU算法做缓存entries替换,entries size默认为10000。读请求较多的场景,适当大的参数配置可以带来更好的性能。
查看RGW cache 命中率:
[root@umstor14 build]# bin/ceph daemon out/radosgw.8000.asok perf dump|grep cache
"cache_hit": 336,
"cache_miss": 135,
ceph.conf 中配置参数rgw_cache_enabled。
rgw_main.cc 中,获得RGWRados *store:
int main() {
RGWRados *store =
RGWStoreManager::get_storage(g_ceph_context,
g_conf()->rgw_enable_gc_threads,
g_conf()->rgw_enable_lc_threads,
g_conf()->rgw_enable_bl_threads,
g_conf()->rgw_enable_quota_threads,
g_conf()->rgw_run_sync_thread,
g_conf().get_val<bool>("rgw_dynamic_resharding"),
g_conf()->rgw_cache_enabled); // 获取rgw_cache_enabled 的配置,决定是否开启缓存
}
调用路径如下:
RGWRados RGWStoreManager::RGWStoreManager::get_storage() ==>
RGWRados RGWStoreManager::init_storage_provider() ==>
int RGWRados::initialize(CephContext *_cct) ==>
int RGWRados::initialize()
/**
* Initialize the RADOS instance and prepare to do other ops
* Returns 0 on success, -ERR# on failure.
*/
int RGWRados::initialize()
{
int ret;
inject_notify_timeout_probability =
cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");
ret = init_svc(false); // 初始化包含svc_sysobj, sysobj_cache, svc_notify等的RGW Services
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
return ret;
}
host_id = svc.zone_utils->gen_host_id();
ret = init_rados(); //rados 相关上下文初始化
if (ret < 0)
return ret;
return init_complete(); // 初始化gc,lc,reshard 等线程
}
RGWRados *store的初始化中初始化RGW 服务:
int RGWRados::init_svc(bool raw) raw=false ==>
int RGWServices::init(CephContext cct, bool have_cache) ==>
int RGWServices::do_init(CephContext cct, bool have_cache, false) ==>
int RGWServices_Def::init(CephContext *cct, bool have_cache, false)
int RGWServices_Def::init(CephContext *cct,
bool have_cache,
bool raw)
{
finisher = std::make_unique<RGWSI_Finisher>(cct);
notify = std::make_unique<RGWSI_Notify>(cct);
rados = std::make_unique<RGWSI_RADOS>(cct);
zone = std::make_unique<RGWSI_Zone>(cct);
zone_utils = std::make_unique<RGWSI_ZoneUtils>(cct);
quota = std::make_unique<RGWSI_Quota>(cct);
sync_modules = std::make_unique<RGWSI_SyncModules>(cct);
sysobj = std::make_unique<RGWSI_SysObj>(cct);
sysobj_core = std::make_unique<RGWSI_SysObj_Core>(cct);
if (have_cache) {
sysobj_cache = std::make_unique<RGWSI_SysObj_Cache>(cct);
}
...
// 各类服务初始化
sysobj_core->core_init(rados.get(), zone.get());
if (have_cache) {
sysobj_cache->init(rados.get(), zone.get(), notify.get());
sysobj->init(rados.get(), sysobj_cache.get());
} else {
sysobj->init(rados.get(), sysobj_core.get());
}
...
//启动notify 服务
if (!raw) {
r = notify->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start notify service (" << cpp_strerror(-r) << dendl;
return r;
}
}
...
// 启动sysobj_core 服务
r = sysobj_core->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start sysobj_core service (" << cpp_strerror(-r) << dendl;
return r;
}
// 根据参数配置选择是否启动sysobj_cache 服务
if (have_cache) {
r = sysobj_cache->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start sysobj_cache service (" << cpp_strerror(-r) << dendl;
return r;
}
}
// 启动sysobj 服务
r = sysobj->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start sysobj service (" << cpp_strerror(-r) << dendl;
return r;
}
/* cache or core services will be started by sysobj */
return 0;
}
CacheProovider RGWSI_SysObj_Cache继承自RGWSI_SysObj_Core,而RGWSI_SysObj_Core 又是RGWServiceInstance的子类。
最终启动RGWSI_SysObj_Cache 服务。
int RGWServiceInstance::start() ==>
virtual int RGWServiceInstance::do_start() ==>
int RGWSI_SysObj_Cache::do_start()
子类RGWSI_SysObj_Cache::do_start()中
int RGWSI_SysObj_Cache::do_start()
{
int r = RGWSI_SysObj_Core::do_start(); // 目前并没做什么,return 0
if (r < 0) {
return r;
}
// 启动notify 服务,为了后面的不同实例间的缓存分发
r = notify_svc->start();
if (r < 0) {
return r;
}
assert(notify_svc->is_started());
cb.reset(new RGWSI_SysObj_Cache_CB(this)); // 初始化回调对象
// 注册包含回调函数的对象至notify_svc
// 通过notify_svc 的watch/notify 机制调用到已注册的回调函数 int RGWSI_SysObj_Cache::watch_cb()
notify_svc->register_watch_cb(cb.get());
return 0;
}
watch_cb()的调用路径是:
int RGWSI_Notify::watch_cb() ==>
int RGWSI_SysObj_Cache_CB::watch_cb() ==>
int RGWSI_SysObj_Cache::watch_cb()
RGW Cache 组织架构
一般的Cache 系统会有以下四个重要的概念:
- CachingProvider:定义了创建、配置、获取、管理和控制一个或多个CacheManager。一个应用可以访问多个CachingProvider。
- CacheManager:定义了创建、配置、获取、管理和控制一个或多个唯一命名的Cache,这些Cache 存在于CacheManager的上下文中。一个CacheManager仅被一个CachingProvider拥有。
- Cache:是一个类似于Map 的数据结构并临时存储以key 为索引的值。一个Cache 仅被一个CacheManager 拥有。
- Entry:是一个存储在Cache 中的key-value 对。
CachingProvider <>-----> CacheManager <>-----> Cache <>-----> Entry
RGW Cache 主要在以下源文件中实现:
- rgw_cache.h
- rgw_cache.cc
- svc_sys_obj_cache.h
- svc_sys_obj_cache.cc
类图结构如下:
根据各部分起到的作用,其中
- ObjectCache 就是
CacheManager
的角色,管理一个Cache(Map)
(即std::unordered_map<string, ObjectCacheEntry> cache_map)。 - RGWSI_SysObj_Cache 相当于
CachingProvider
,管理一个CacheManager
(即ObjectCache cache)。 - ObjectCacheEntry 当然就是
Entry
的角色。
CachingProvider
RGWSI_SysObj_Cache:
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
//......
RGWSI_Notify *notify_svc{nullptr};
ObjectCache cache; //
std::shared_ptr<RGWSI_SysObj_Cache_CB> cb;
};
关于Entry
ObjectCacheEntry
struct ObjectCacheEntry {
ObjectCacheInfo info; //包含缓存对象data、metadata及xattr
std::list<string>::iterator lru_iter;
uint64_t lru_promotion_ts;
uint64_t gen; //entry 的版本,初始为0,每次更新后加一
std::vector<pair<RGWChainedCache *, string> > chained_entries; //
ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
};
每个Entry 中包含对应Object 的缓存数据及相关信息,LRU信息,版本信息,chained_entries 等。
struct ObjectCacheInfo {
int status = 0;
uint32_t flags = 0; //?
uint64_t epoch = 0; //?
bufferlist data;
map<string, bufferlist> xattrs;
map<string, bufferlist> rm_xattrs; // 待移除xattrs
ObjectMetaInfo meta;
obj_version version = {};
ceph::coarse_mono_time time_added; //加入缓存的时间, 重新加入缓存的对象需要更新该时间
......
};
可以看到Cache 中包含了数据、元数据以及xattr等信息。
缓存管理
前面提到ObjectCache
充当了CacheManager
的角色,而RGWSI_SysObj_Cache
相当于CachingProvider
。
图像形态学提取边界和区域填充
基于LRU 的淘汰算法
LRU 是一类常见的缓存淘汰算法,在Ehcache,Redis等很多系统中都有实现或改进实现。
LRU(Least recently used,最近最少使用)算法根据数据的历访问记录来进行数据淘汰,其核心思想是:如果数据最近被访问过,那么将来被访问到的概率也很高。
- 而最近很少被使用的数据,很大概率下一次不再用到。
- 当缓存容量的满时候,优先淘汰最近很少使用的数据。
LRU 操作总结:
- 新数据直接插入到列表头部
- 缓存数据被命中,将数据移动到列表头部
- 缓存已满的时候,移除列表尾部数据。
CachingProvider
RGWSI_SysObj_Cache 作为CachingProvider,它负责对CacheManager ObjectCache的管理。
新的系统对象服务(system objects service)通过sysobj_core 用于核心的操作,这样可以在system objects service 上扩展cache service,以实现object cache,其在PR 24014中引入。
RGWSI_SysObj_Core 是系统对象的基本抽象:属性和方法,RGWSI_SysObj_Cache 继承自RGWSI_SysObj_Core,实现cache service 的扩展。
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
//......
RGWSI_Notify *notify_svc{nullptr};
ObjectCache cache; //
std::shared_ptr<RGWSI_SysObj_Cache_CB> cb;
protected:
void init(RGWSI_RADOS *_rados_svc,
RGWSI_Zone *_zone_svc,
RGWSI_Notify *_notify_svc) {
core_init(_rados_svc, _zone_svc);
notify_svc = _notify_svc;
}
int do_start() override;
int raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch,
map<string, bufferlist> *attrs, bufferlist *first_chunk,
RGWObjVersionTracker *objv_tracker) override;
int read(); //读操作
int get_attr(); // 获取xattr
int set_attrs(); // 设置xattr
int remove(); //移除缓存
int write();
int write_data(); //
int distribute_cache(); // 分发缓存,因为通常会有多个RGW 实例,需要将缓存在多个RGW 实例间同步,保证数据一致性。
int watch_cb(); // watch 回调函数
void set_enabled(bool status); // watch/notify 开关,用于分布式多RGW 实例的缓存同步
public:
// chain cache
bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
RGWChainedCache::Entry *chained_entry);
......
};
移除缓存remove()
int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
RGWObjVersionTracker *objv_tracker,
const rgw_raw_obj& obj)
{
rgw_pool pool;
string oid;
normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
string name = normal_name(pool, oid);
// 根据前面构成的标准cache name,调用CacheManager的bool ObjectCache::remove(const string& name) 执行缓存删除
cache.remove(name);
ObjectCacheInfo info;
// 向分布式系统中的其他RGW 实例分发缓存操作
int r = distribute_cache(name, obj, info, REMOVE_OBJ);
if (r < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
}
// 删除sysobj_core 对象
return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
}
具体的缓存删除操作由CacheManager ObjectCache 执行
bool ObjectCache::remove(const string& name)
{
RWLock::WLocker l(lock); // 第一步:获取写锁
if (!enabled) {
return false;
}
// 在cache map中找到指定缓存
auto iter = cache_map.find(name);
if (iter == cache_map.end())
return false;
ldout(cct, 10) << "removing " << name << " from cache" << dendl;
ObjectCacheEntry& entry = iter->second;
// 移除指定ObjectCacheEntry 关联的所有 chained_entries
for (auto& kv : entry.chained_entries) {
kv.first->invalidate(kv.second);
}
remove_lru(name, iter->second.lru_iter); // 更新lru
cache_map.erase(iter); // cache map 中移除该对象缓存
return true;
}
以缓存中最常见、最重要的操作read()为例分析:
int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
GetObjState& read_state,
RGWObjVersionTracker *objv_tracker,
const rgw_raw_obj& obj,
bufferlist *obl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
bool raw_attrs,
rgw_cache_entry_info *cache_info,
boost::optional<obj_version> refresh_version)
{
rgw_pool pool;
string oid;
// 若指定非开始处的offset 读取,则直接读取sysobj_core 对象
if (ofs != 0) {
return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
obj, obl, ofs, end, attrs, raw_attrs,
cache_info, refresh_version);
}
normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
string name = normal_name(pool, oid);
ObjectCacheInfo info;
uint32_t flags = (end != 0 ? CACHE_FLAG_DATA : 0);
if (objv_tracker)
flags |= CACHE_FLAG_OBJV;
if (attrs)
flags |= CACHE_FLAG_XATTRS;
// 获取指定name 的cache
if ((cache.get(name, info, flags, cache_info) == 0) &&
(!refresh_version || !info.version.compare(&(*refresh_version)))) {
if (info.status < 0)
return info.status;
bufferlist& bl = info.data;
bufferlist::iterator i = bl.begin();
obl->clear();
i.copy_all(*obl);
if (objv_tracker)
objv_tracker->read_version = info.version;
if (attrs) {
if (raw_attrs) {
*attrs = info.xattrs;
} else {
rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
}
}
return obl->length();
}
map<string, bufferlist> unfiltered_attrset;
int r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
obj, obl, ofs, end,
(attrs ? &unfiltered_attrset : nullptr),
true, /* cache unfiltered attrs */
cache_info,
refresh_version);
if (r < 0) {
// 未读到该对象时,将该对象加入cache
if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
info.status = r;
cache.put(name, info, cache_info);
}
return r;
}
if (obl->length() == end + 1) {
/* in this case, most likely object contains more data, we can't cache it */
flags &= ~CACHE_FLAG_DATA;
} else {
bufferptr p(r);
bufferlist& bl = info.data;
bl.clear();
bufferlist::iterator o = obl->begin();
o.copy_all(bl);
}
info.status = 0;
info.flags = flags;
if (objv_tracker) {
info.version = objv_tracker->read_version;
}
if (attrs) {
info.xattrs = std::move(unfiltered_attrset);
if (raw_attrs) {
*attrs = info.xattrs;
} else {
rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
}
}
cache.put(name, info, cache_info);
return r;
}
CacheManager
CacheManager ObjectCache 负责具体Cache Entries的管理:缓存获取,缓存移除,LRU 管理
class ObjectCache {
std::unordered_map<string, ObjectCacheEntry> cache_map;
std::list<string> lru; // LRU 列表
unsigned long lru_size; // LRU 表的大小
unsigned long lru_counter; // 当前LRU 数
unsigned long lru_window; // rgw_cache_lru_size 的一半大小
RWLock lock;
CephContext *cct;
vector<RGWChainedCache *> chained_cache;
bool enabled; // watch/notify 的开关
ceph::timespan expiry; // 缓存过期时间大小
};
缓存获取
int ObjectCache::get(const string& name, ObjectCacheInfo& info, uint32_t mask, rgw_cache_entry_info *cache_info)
{
RWLock::RLocker l(lock); // 第一步,先获取读锁
if (!enabled) {
return -ENOENT;
}
// 获取指定缓存
auto iter = cache_map.find(name);
if (iter == cache_map.end()) {
ldout(cct, 10) << "cache get: name=" << name << " : miss" << dendl;
if (perfcounter)
perfcounter->inc(l_rgw_cache_miss);
return -ENOENT;
}
// 缓存是否已经过期
// 过期缓存需要从cache map中移除,从LRU 表中移除
if (expiry.count() &&
(ceph::coarse_mono_clock::now() - iter->second.info.time_added) > expiry) {
ldout(cct, 10) << "cache get: name=" << name << " : expiry miss" << dendl;
lock.unlock();
lock.get_write(); // 由读锁转为写锁
// check that wasn't already removed by other thread
iter = cache_map.find(name);
if (iter != cache_map.end()) {
for (auto &kv : iter->second.chained_entries)
kv.first->invalidate(kv.second);
remove_lru(name, iter->second.lru_iter);
cache_map.erase(iter);
}
if(perfcounter)
perfcounter->inc(l_rgw_cache_miss);
return -ENOENT;
}
ObjectCacheEntry *entry = &iter->second;
// 当前entry 计数距离总计数lru_counter超过LRU 窗口大小,即当前entry 已经落在LRU 表后半段,这时才去更新entry LRU表
// [lru window](https://github.com/ceph/ceph/commit/a84cf15f64211c00bc6c95687ff4509d16b1f909)
if (lru_counter - entry->lru_promotion_ts > lru_window) {
ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter
<< " promotion_ts=" << entry->lru_promotion_ts << dendl;
lock.unlock();
lock.get_write(); /* promote lock to writer */
/* need to redo this because entry might have dropped off the cache */
iter = cache_map.find(name);
if (iter == cache_map.end()) {
ldout(cct, 10) << "lost race! cache get: name=" << name << " : miss" << dendl;
if(perfcounter) perfcounter->inc(l_rgw_cache_miss);
return -ENOENT;
}
entry = &iter->second;
/* check again, we might have lost a race here */
if (lru_counter - entry->lru_promotion_ts > lru_window) {
touch_lru(name, *entry, iter->second.lru_iter); // 更新缓存LRU
}
}
ObjectCacheInfo& src = iter->second.info;
if ((src.flags & mask) != mask) {
ldout(cct, 10) << "cache get: name=" << name << " : type miss (requested=0x"
<< std::hex << mask << ", cached=0x" << src.flags
<< std::dec << ")" << dendl;
if(perfcounter) perfcounter->inc(l_rgw_cache_miss);
return -ENOENT;
}
ldout(cct, 10) << "cache get: name=" << name << " : hit (requested=0x"
<< std::hex << mask << ", cached=0x" << src.flags
<< std::dec << ")" << dendl;
info = src;
if (cache_info) {
cache_info->cache_locator = name;
cache_info->gen = entry->gen;
}
if(perfcounter) perfcounter->inc(l_rgw_cache_hit);
return 0;
}
缓存添加
void ObjectCache::put(const string& name, ObjectCacheInfo& info, rgw_cache_entry_info *cache_info)
{
RWLock::WLocker l(lock);
if (!enabled) {
return;
}
ldout(cct, 10) << "cache put: name=" << name << " info.flags=0x"
<< std::hex << info.flags << std::dec << dendl;
auto [iter, inserted] = cache_map.emplace(name, ObjectCacheEntry{});
ObjectCacheEntry& entry = iter->second;
entry.info.time_added = ceph::coarse_mono_clock::now();
if (inserted) {
entry.lru_iter = lru.end();
}
ObjectCacheInfo& target = entry.info;
invalidate_lru(entry);
entry.chained_entries.clear();
entry.gen++;
touch_lru(name, entry, entry.lru_iter);
target.status = info.status;
if (info.status < 0) {
target.flags = 0;
target.xattrs.clear();
target.data.clear();
return;
}
if (cache_info) {
cache_info->cache_locator = name;
cache_info->gen = entry.gen;
}
target.flags |= info.flags;
if (info.flags & CACHE_FLAG_META)
target.meta = info.meta;
else if (!(info.flags & CACHE_FLAG_MODIFY_XATTRS))
target.flags &= ~CACHE_FLAG_META; // non-meta change should reset meta
if (info.flags & CACHE_FLAG_XATTRS) {
target.xattrs = info.xattrs;
map<string, bufferlist>::iterator iter;
for (iter = target.xattrs.begin(); iter != target.xattrs.end(); ++iter) {
ldout(cct, 10) << "updating xattr: name=" << iter->first << " bl.length()=" << iter->second.length() << dendl;
}
} else if (info.flags & CACHE_FLAG_MODIFY_XATTRS) {
map<string, bufferlist>::iterator iter;
for (iter = info.rm_xattrs.begin(); iter != info.rm_xattrs.end(); ++iter) {
ldout(cct, 10) << "removing xattr: name=" << iter->first << dendl;
target.xattrs.erase(iter->first);
}
for (iter = info.xattrs.begin(); iter != info.xattrs.end(); ++iter) {
ldout(cct, 10) << "appending xattr: name=" << iter->first << " bl.length()=" << iter->second.length() << dendl;
target.xattrs[iter->first] = iter->second;
}
}
if (info.flags & CACHE_FLAG_DATA)
target.data = info.data;
if (info.flags & CACHE_FLAG_OBJV)
target.version = info.version;
}
缓存移除
bool ObjectCache::remove(const string& name)
{
RWLock::WLocker l(lock); // 第一步,获取写锁
if (!enabled) {
return false;
}
auto iter = cache_map.find(name);
if (iter == cache_map.end())
return false;
ldout(cct, 10) << "removing " << name << " from cache" << dendl;
ObjectCacheEntry& entry = iter->second;
// 移除跟cache entry 关联的所有chained entries
for (auto& kv : entry.chained_entries) {
kv.first->invalidate(kv.second);
}
// 移除LRU 表中的cache object对应项
remove_lru(name, iter->second.lru_iter);
cache_map.erase(iter);
return true;
}
LRU 更新
LRU 表是一个双向列表 std:list<>,可支持表头插入、表尾插入。RGW Cache 实现在LRU 表头
std::list<string> lru;
LRU 移除
void ObjectCache::remove_lru(const string& name,
std::list<string>::iterator& lru_iter)
{
if (lru_iter == lru.end())//确定是否在LRU 表中
return;
lru.erase(lru_iter);// 移除该项
lru_size--; // LRU 当前size 减一
lru_iter = lru.end(); //将当前iter 置为无效
}
touch_lru 负责更新缓存项至LRU 表:
void ObjectCache::touch_lru(const string& name, ObjectCacheEntry& entry,
std::list<string>::iterator& lru_iter)
{
// 当前lru size 超过预设值rgw_cache_lru_size,需要先删除LRU 头
while (lru_size > (size_t)cct->_conf->rgw_cache_lru_size) {
auto iter = lru.begin(); // LRU 表尾项
if ((*iter).compare(name) == 0) { // 如果当前对象是LRU 是LRU 表尾项,不用立马显式删除,LRU 会根据rgw_cache_lru_size 自动不包含该项
/*
* if the entry we're touching happens to be at the lru end, don't remove it,
* lru shrinking can wait for next time
*/
break;
}
// 移除LRU 表尾项对应的对象缓存
auto map_iter = cache_map.find(*iter);
ldout(cct, 10) << "removing entry: name=" << *iter << " from cache LRU" << dendl;
if (map_iter != cache_map.end()) {
ObjectCacheEntry& entry = map_iter->second;
invalidate_lru(entry);
cache_map.erase(map_iter);
}
// 删除LRU 表尾项,并将当前LRU size 减一
lru.pop_front();
lru_size--;
}
if (lru_iter == lru.end()) { // lru_iter不在LRU 表中:插入当前项至LRU 表头(list 尾)
lru.push_back(name);
lru_size++;
lru_iter--;
ldout(cct, 10) << "adding " << name << " to cache LRU end" << dendl;
} else { // lru_iter在LRU 表中:移动至当前项至LRU 表头(list 尾)
ldout(cct, 10) << "moving " << name << " to cache LRU end" << dendl;
lru.erase(lru_iter);
lru.push_back(name);
lru_iter = lru.end();
--lru_iter;
}
lru_counter++;
entry.lru_promotion_ts = lru_counter; //
}
缓存一致性
RGW Cache 属于分布式缓存,通常会有多个RGW 实例,缓存需要在各个RGW 实例间分发,且需要保证缓存一致性。
RGW Cache的调用路径中已经给出,CachingProvider RGWSI_SysObj_Cache 会在服务启动do_start() 中start notify_svc,并注册watch_cb 函数。
notify_svc 这个服务的作用就是提供一种watch/notify 机制,以确保缓存一致性。
watch/notify 机制由librados提供。其中,notify rados object 存在default.rgw.control 池中。
[root@umstor14 build]# bin/rados ls -p default.rgw.control
notify.1
notify.6
notify.3
notify.7
notify.2
notify.4
notify.5
notify.0
[root@umstor14 build]# bin/rados -p default.rgw.control stat notify.1
default.rgw.control/notify.1 mtime 2020-01-10 18:59:13.000000, size 0
[root@umstor14 build]# bin/rados -p default.rgw.control stat notify.7
default.rgw.control/notify.7 mtime 2020-01-10 18:59:14.000000, size 0
notify_svc 服务的启动路径跟cache_svc 类似:
int RGWServiceInstance::start() ==>
virtual int RGWServiceInstance::do_start() ==>
int RGWSI_Notify::do_start()
do_start() 会初始化watch:
int RGWSI_Notify::init_watch()
{
num_watchers = cct->_conf->rgw_num_control_oids; // 有参数rgw_num_control_oids 配置,默认8个 watcher
bool compat_oid = (num_watchers == 0);
if (num_watchers <= 0)
num_watchers = 1;
watchers = new RGWWatcher *[num_watchers];
......
}
在cache op 之后,会执行cache 分发操作distribute_cache():
int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, const rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
{
RGWCacheNotifyInfo info;
info.op = op;
info.obj_info = obj_info;
info.obj = obj;
bufferlist bl;
encode(info, bl);
return notify_svc->distribute(normal_name, bl); // 利用notify_svc 分发
}
分发过程:
int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
{
// 选择一个notify obj
RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
<< " bl.length()=" << bl.length() << dendl;
// 执行分发
return robust_notify(notify_obj, bl);
}
分发细节会在RGW Services — Notify Service 中说明。
另外,在notify_svc 服务的watcher 的handle_notify()中调用已注册的回调函数。
watcher 收到notify的更新通知后,会更新本地缓存。
void RGWWatcher::handle_notify()
{
......
// 调用cache_svc 服务注册的回调函数
svc->watch_cb(notify_id, cookie, notifier_id, bl);
// 向通知者发送确认消息
bufferlist reply_bl; // empty reply payload
obj.notify_ack(notify_id, cookie, reply_bl);
......
}
回调函数中根据操作类型,利用CacheManager 完成cache 更新或移除:
int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl)
{
RGWCacheNotifyInfo info; //cache notify 信息,包含:操作、rgw raw object、obj cache info、offset等
try {
auto iter = bl.cbegin();
decode(info, iter);
} catch (buffer::end_of_buffer& err) {
ldout(cct, 0) << "ERROR: got bad notification" << dendl;
return -EIO;
} catch (buffer::error& err) {
ldout(cct, 0) << "ERROR: buffer::error" << dendl;
return -EIO;
}
rgw_pool pool;
string oid;
normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
string name = normal_name(pool, oid);
switch (info.op) {
case UPDATE_OBJ: //利用CacheManager 更新缓存
cache.put(name, info.obj_info, NULL);
break;
case REMOVE_OBJ: //利用CacheManager 移除缓存
cache.remove(name);
break;
default:
ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
return -EINVAL;
}
return 0;
}
Chained cache
Chained cache 让user info,bucket info 可以通过链接原生缓存,得以开启缓存。
Basically chains bucket info and user info caches to the raw metadata object cache.
binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
static RGWChainedCacheImpl<user_info_entry> uinfo_cache;
以user cache 为例,在开启RGW Cache后,优先从缓存中获取:
void rgw_user_init(RGWRados *store)
{
uinfo_cache.init(store->svc.cache);
user_meta_handler = new RGWUserMetadataHandler;
store->meta_mgr->register_handler(user_meta_handler);
}
int rgw_get_user_info_from_index(RGWRados * const store,
const string& key,
const rgw_pool& pool,
RGWUserInfo& info,
RGWObjVersionTracker * const objv_tracker,
real_time * const pmtime)
{
// 首选尝试获取缓存
if (auto e = uinfo_cache.find(key)) {
info = e->info;
if (objv_tracker)
*objv_tracker = e->objv_tracker;
if (pmtime)
*pmtime = e->mtime;
return 0;
}
......
// 未能从缓存中获取,直接从RADOS 集群中获取
// 获取到之后,更新uinfo 缓存
uinfo_cache.put(store->svc.cache, key, &e, { &cache_info });
.......
class RGWChainedCache {
public:
......
struct Entry {
RGWChainedCache *cache; // 关联cache
const string& key; // email/swift_name/access_key/bucket name
void *data; // 指向bucket_info_entry或user_info_entry
Entry(RGWChainedCache *_c, const string& _k, void *_d) : cache(_c), key(_k), data(_d) {}
};
};
通过sysobj_cache_svc 服务提供chain cache:
将chain_entry添加到chained cache,并和cache_info_entries 指向的ObjectCacheEntry相关联。
bool RGWChainedCache::put(RGWSI_SysObj_Cache *svc, const string& key, T *entry,
std::initializer_list<rgw_cache_entry_info *> cache_info_entries) {
if (!svc) {
return false;
}
Entry chain_entry(this, key, entry);
/* we need the svc cache to call us under its lock to maintain lock ordering */
return svc->chain_cache_entry(cache_info_entries, &chain_entry);
}
bool ObjectCache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info*> cache_info_entries, RGWChainedCache::Entry *chained_entry)
{
// 确认所有有效ObjectCacheEntry
......
// 将待添加entry添加到对应chain cache中
chained_entry->cache->chain_cb(chained_entry->key, chained_entry->data);
// 将chained entry关联到指定的所有有效的ObjectCacheEntry
for (auto entry : entries) {
entry->chained_entries.push_back(make_pair(chained_entry->cache,
chained_entry->key));
}
......
}
chained cache 依赖于ObjectCache,
更新ObjectCache的成员 vector<RGWChainedCache *> chained_cache:
void ObjectCache::chain_cache(RGWChainedCache *cache);
void ObjectCache::unchain_cache(RGWChainedCache *cache);
RGW Cache 优化方向
前面的测试系统的cache 命中率:”cache_hit”: 336,”cache_miss”: 135, 336/(336+135)*100% = 71%
缓存系统适合读多写少的场景。如何在这种场景下,提高RGW Cache 的命中率,以下方向可以考虑:
- 将缓存粒度设计的更细?
- 增大缓存容量(这个已经可以根据实际配置)
References
- 《深入分布式缓存》于君泽、曹洪伟、邱硕 机械工业出版社
- https://docs.ceph.com/docs/master/radosgw/config-ref/
- https://my.oschina.net/linuxhunter/blog/662801
- rgw: initial RGWRados refactoring work #24014
- rgw: update ObjectCacheInfo::time_added on overwrite
- rgw: add support for new watch/notify functionality
- rgw: an infrastructure for hooking into the raw cache
常用控件