Cephソース分析--MonitorDB

50353 ワード

1.概要
Ceph monitorの主な機能はpaxos分散プロトコルを使用してkey/valueデータベースの一貫性を維持することです(最も主要なのは各mapの一貫性であり、monitorにとってmonmapです).12.2.2バージョンで使用されたデータベースエンジンは元のleveldbからrocksdbに変わりました.
以前、monitorが配備されている間にcephを通過できるという疑問があった.confファイルmon_hostのipが取得するとmonmapは静的cephに従うかどうか.confファイルは一致していますか?paxosアルゴリズムを見て初めて、答えは否定的で、monmapファイルに依存していることが分かった.(mapファイルが変更されるたびに、多くのmonitorノードが同意し、多数またはすべての(quorum)ノードのmapファイルが一致し、mapファイルがインクリメンタル更新され、更新ごとに新しいversionが生成されるため、monノードがオフラインで再加入する場合、インクリメンタル部分の情報を同期する必要がある)
2.MonitorDBStoreクラス
モニタのデータベース整合性を維持するといえば、モニタDBStoreクラスがk/vデータベースの操作をどのように抽象化しているかを言わざるを得ない.MonitorDBStoreクラスの定義から、その具体的な実装を一歩一歩探知します.
//   op          key/value    
struct Op {
    uint8_t type;
    string prefix;//           ,               string key, endkey;//prefix+key          key
    bufferlist bl;

これらのopはトランザクションがencode,decodeされることを形成します.
//Transaction               (put erase compact)
 struct Transaction;
  typedef ceph::shared_ptr TransactionRef;
  struct Transaction {
    list ops;
    uint64_t bytes, keys;
    Transaction() : bytes(0), keys(0) {}
    enum {
      OP_PUT    = 1,//   key value
      OP_ERASE  = 2,//  
      OP_COMPACT = 3,//  ??
    };

サーバ間での転送のために、すべてのtransactionがencode,decodeを通過します.
    void encode(bufferlist& bl) const {
      ENCODE_START(2, 1, bl);
      ::encode(ops, bl);
      ::encode(bytes, bl);
      ::encode(keys, bl);
      ENCODE_FINISH(bl);
    }

    void decode(bufferlist::iterator& bl) {
      DECODE_START(2, bl);
      ::decode(ops, bl);
      if (struct_v >= 2) {
    ::decode(bytes, bl);
    ::decode(keys, bl);
      }
      DECODE_FINISH(bl);
    }

次はcephバックエンドの処理です.
int apply_transaction(MonitorDBStore::TransactionRef t) {
    KeyValueDB::Transaction dbt = db->get_transaction();
    ...
    //  transaction      op,      compact  key->value
        liststring, pair<string,string> > > compact;
        //
    for (list::const_iterator it = t->ops.begin();
     it != t->ops.end();
     ++it) {
      const Op& op = *it;
      switch (op.type) {
      case Transaction::OP_PUT:
    dbt->set(op.prefix, op.key, op.bl);
    break;
      case Transaction::OP_ERASE:
    dbt->rmkey(op.prefix, op.key);
    break;
      case Transaction::OP_COMPACT:
    compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
    break;
      default:
    derr << __func__ << " unknown op type " << op.type << dendl;
    ceph_abort();
    break;
      }
    }
    //submit value
    int r = db->submit_transaction_sync(dbt);
    if (r >= 0) {
      while (!compact.empty()) {
    if (compact.front().second.first == string() &&
        compact.front().second.second == string())
      db->compact_prefix_async(compact.front().first);
    else
      db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
    compact.pop_front();
      }
    } else {
      assert(0 == "failed to write to db");
    }
    return r;
  }

異なるprefix対応transaction間でqueue_を使用Transaction関数は非同期処理を行います.
  void queue_transaction(MonitorDBStore::TransactionRef t,
             Context *oncommit) {
    io_work.queue(new C_DoTransaction(this, t, oncommit));
  }

複数のprefixに対応するtransactionの取得はget_を通過するiterator関数ですが、各transactionに対応するkey valueはget関数を介します.他の関数はこれ以上説明されません.
3.MonitorDBStoreデータの変更
モニタ起動から、MonitorDbstoreで発生したkey/valueベースの一連の更新を見てみましょう.モニタが起動するたびにmonmapのサーバアドレスに従って他のモニタサーバに接続し、データを同期します.ここには2つの状況があります.1つはdbにmonmapが存在しない(mkfs再生成を実行する必要がある)ことであり、もう1つは、追加したmonノードがネットワークまたはその他の理由で異常になり、正常に回復した後の再起動である(dbから直接読み出す).すべてのノードが無から有になるのは明らかであるため、ここでの2つの状況は、単に保存した区分でmonmapを取得する方法にすぎない.注意点:monmapは重要であり、一旦発生すると、その後の起動は再構成されないため、構成の正確性を確保しなければならない.
次にmkfsの大まかな流れについて説明します.
if (mkfs) {

        // resolve public_network -> public_addr
    pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);

    common_init_finish(g_ceph_context);

    bufferlist monmapbl, osdmapbl;
    std::string error;
    MonMap monmap;
// load or generate monmap
...
          try {
    monmap.decode(monmapbl);
    // always mark seed/mkfs monmap as epoch 0
    monmap.set_epoch(0);
      }
      ...
            ostringstream oss;
       //  ceph.conf     monmap  build init
      int err = monmap.build_initial(g_ceph_context, oss);
     ...
   }

具体的な実現過程は以下の通りである.
int MonMap::build_initial(CephContext *cct, ostream& errout)
{
  const md_config_t *conf = cct->_conf;
  ...
  // -m foo?  ceph.conf noname mon_host     mon ip
  if (!conf->mon_host.empty()) {
    int r = build_from_host_list(conf->mon_host, "noname-");
      // What monitors are in the config file?
  std::vector <std::string> sections;
  int ret = conf->get_all_sections(sections);
  if (ret) {
    errout << "Unable to find any monitors in the configuration "
         << "file, because there was an error listing the sections. error "
     << ret << std::endl;
    return -ENOENT;
  }
  std::vector <std::string> mon_names;
  for (std::vector <std::string>::const_iterator s = sections.begin();
       s != sections.end(); ++s) {
    if ((s->substr(0, 4) == "mon.") && (s->size() > 4)) {
      mon_names.push_back(s->substr(4));
    }
  }

  // Find an address for each monitor in the config file.
  for (std::vector <std::string>::const_iterator m = mon_names.begin();
       m != mon_names.end(); ++m) {
    std::vector <std::string> sections;
    std::string m_name("mon");
    m_name += ".";
    m_name += *m;
    sections.push_back(m_name);
    sections.push_back("mon");
    sections.push_back("global");
    std::string val;
    int res = conf->get_val_from_conf_file(sections, "mon addr", val, true);
    if (res) {
      errout << "failed to get an address for mon." << *m << ": error "
       << res << std::endl;
      continue;
    }
    entity_addr_t addr;
    if (!addr.parse(val.c_str())) {
      errout << "unable to parse address for mon." << *m
       << ": addr='" << val << "'" << std::endl;
      continue;
    }
    if (addr.get_port() == 0)
      addr.set_port(CEPH_MON_PORT);

    uint16_t priority = 0;
    if (!conf->get_val_from_conf_file(sections, "mon priority", val, false)) {
      try {
        priority = std::stoul(val);
      } catch (std::logic_error&) {
        errout << "unable to parse priority for mon." << *m
               << ": priority='" << val << "'" << std::endl;
        continue;
      }
    }
    // the make sure this mon isn't already in the map
    if (contains(addr))
      remove(get_name(addr));
    if (contains(*m))
      remove(*m);
    add(mon_info_t{*m, addr, priority});
  }

  if (size() == 0) {
    // no info found from conf options lets try use DNS SRV records
    string srv_name = conf->mon_dns_srv_name;
    string domain;
    // check if domain is also provided and extract it from srv_name
    size_t idx = srv_name.find("_");
    if (idx != string::npos) {
      domain = srv_name.substr(idx + 1);
      srv_name = srv_name.substr(0, idx);
    }

    map<string, DNSResolver::Record> records;
    if (DNSResolver::get_instance()->resolve_srv_hosts(cct, srv_name,
        DNSResolver::SRV_Protocol::TCP, domain, &records) != 0) {

      errout << "unable to get monitor info from DNS SRV with service name: " << 
       "ceph-mon" << std::endl;
    }
    else {
      for (const auto& record : records) {
        add(mon_info_t{record.first,
                       record.second.addr,
                       record.second.priority});
      }
    }
  }

  if (size() == 0) {
    errout << "no monitors specified to connect to." << std::endl;
    return -ENOENT;
  }
  created = ceph_clock_now();
  last_changed = created;
  return 0;
}

こうして新しいmonが誕生し、次にそのmonitorの情報を後で直接読み取るためにMonitorDBOStoreのdbに格納する
 MonitorDBStore store(g_conf->mon_data);
    ostringstream oss;
    //      rocksdb
    int r = store.create_and_open(oss);
    //  mon  
    Monitor mon(g_ceph_context, g_conf->name.get_id(), &store, 0, 0, &monmap);
        monmap
    MonitorDBStore *store = new MonitorDBStore(g_conf->mon_data);
  {
    ostringstream oss;
    err = store->open(oss);
    if (oss.tellp())
      derr << oss.str() << dendl;
    if (err < 0) {
      derr << "error opening mon data directory at '"
           << g_conf->mon_data << "': " << cpp_strerror(err) << dendl;
      prefork.exit(1);
    }
  }

  bufferlist magicbl;
  err = store->get(Monitor::MONITOR_NAME, "magic", magicbl);
  if (err || !magicbl.length()) {
    derr << "unable to read magic from mon data" << dendl;
    prefork.exit(1);
  }
  string magic(magicbl.c_str(), magicbl.length()-1);  // ignore trailing 
if (strcmp(magic.c_str(), CEPH_MON_ONDISK_MAGIC)) { derr << "mon fs magic '" << magic << "' != current '" << CEPH_MON_ONDISK_MAGIC << "'" << dendl; prefork.exit(1); } err = Monitor::check_features(store); if (err < 0) { derr << "error checking features: " << cpp_strerror(err) << dendl; prefork.exit(1); } // inject new monmap? if (!inject_monmap.empty()) { bufferlist bl; std::string error; int r = bl.read_file(inject_monmap.c_str(), &error); if (r) { derr << "unable to read monmap from " << inject_monmap << ": " << error << dendl; prefork.exit(1); } // get next version version_t v = store->get("monmap", "last_committed"); dout(0) << "last committed monmap epoch is " << v << ", injected map will be " << (v+1) << dendl; v++; // set the version MonMap tmp; tmp.decode(bl); if (tmp.get_epoch() != v) { dout(0) << "changing monmap epoch from " << tmp.get_epoch() << " to " << v << dendl; tmp.set_epoch(v); } bufferlist mapbl; tmp.encode(mapbl, CEPH_FEATURES_ALL); bufferlist final; ::encode(v, final); ::encode(mapbl, final); auto t(std::make_shared<:transaction>()); // save it t->put("monmap", v, mapbl); t->put("monmap", "latest", final); t->put("monmap", "last_committed", v); store->apply_transaction(t); dout(0) << "done." << dendl; prefork.exit(0); } // monmap? MonMap monmap; { // note that even if we don't find a viable monmap, we should go ahead // and try to build it up in the next if-else block. bufferlist mapbl; int err = obtain_monmap(*store, mapbl); if (err >= 0) { try { monmap.decode(mapbl); } catch (const buffer::error& e) { derr << "can't decode monmap: " << e.what() << dendl; } } else { derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl; } if (!extract_monmap.empty()) { int r = mapbl.write_file(extract_monmap.c_str()); if (r < 0) { r = -errno; derr << "error writing monmap to " << extract_monmap << ": " << cpp_strerror(r) << dendl; prefork.exit(1); } derr << "wrote monmap to " << extract_monmap << dendl; prefork.exit(0); } } // msg , 。

新しく加わったモニタノードがdbから直接取得されたわけではないことはご存じでしょう.のハハ次は私たちの非常に重要な一節で、monitorが起動したときのデータ同期です.
4.各ノードのmonmap同期.
モニタサーバが起動するたびにmonmapのサーバアドレスに従って他のモニタサーバに接続し、データを同期します.このプロセスをbootstrap()と言います.bootstrapの最初の目的は、他のサービスから欠落したpaxos logまたは全量レプリケーションデータベースを補完することであり、次に必要に応じて多数派を形成してpaxosクラスタを構築したり、既存の多数派に参加したりすることである.
int Monitor::init()
{
  ..
  bootstrap();//    ,     
  // add features of myself into feature_map
  session_map.feature_map.add_mon(con_self->get_features());
  return 0;
}

bootstrap()の具体的な実装を見てみましょう.
void Monitor::bootstrap()
{
  ..
    // reset
  state = STATE_PROBING;

  _reset();

  // sync store
  if (g_conf->mon_compact_on_bootstrap) {
    dout(10) << "bootstrap -- triggering compaction" << dendl;
    store->compact();
    dout(10) << "bootstrap -- finished compaction" << dendl;
  }

  // singleton monitor?
  if (monmap->size() == 1 && rank == 0) {
    win_standalone_election();
    return;
  }

  reset_probe_timeout();

  // i'm outside the quorum
  if (monmap->contains(name))
    outside_quorum.insert(name);

  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
                  monmap->get_inst(i));
  }
  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p) {
    if (*p != messenger->get_myaddr()) {
      entity_inst_t i;
      i.name = entity_name_t::MON(-1);
      i.addr = *p;
      //         mon     
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
    }
  }
}

次はすべてOP_を受け取りますPROBEのノード、handle_によるprobeで定義されたopタイプは、関数handle_を使用します.probe_probeが処理を開始しました:
void Monitor::handle_probe_probe(MonOpRequestRef op)
{
...
  if (!is_probing() && !is_synchronizing()) {
    // If the probing mon is way ahead of us, we need to re-bootstrap.
    // Normally we capture this case when we initially bootstrap, but
    // it is possible we pass those checks (we overlap with
    // quorum-to-be) but fail to join a quorum before it moves past
    // us.  We need to be kicked back to bootstrap so we can
    // synchonize, not keep calling elections.
    if (paxos->get_version() + 1 < m->paxos_first_version) {
      dout(1) << " peer " << m->get_source_addr() << " has first_committed "
          << "ahead of us, re-bootstrapping" << dendl;
      bootstrap();
      //         ,    paxos         ,,       bootstrap        。
      goto out;

    }
  }
  //    ,  paxos   ,       last_commit,first_commit
  //     paxos     ,         last_pn,first_pn,     paxos   ,     。
  MMonProbe *r;
  r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
  r->name = name;
  r->quorum = quorum;
  monmap->encode(r->monmap_bl, m->get_connection()->get_features());
  r->paxos_first_version = paxos->get_first_committed();
  r->paxos_last_version = paxos->get_version();
  m->get_connection()->send_message(r);
...
 }

プローブパケットを送信するモニタノードは、最大タイムアウト時間内にOP_を受信するREPLY、handle_が呼び出されますprobe_reply関数は次の処理を行います.
void Monitor::handle_probe_reply(MonOpRequestRef op)
{
  MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
  dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
  dout(10) << " monmap is " << *monmap << dendl;
  // discover name and addrs during probing or electing states.
  if (!is_probing() && !is_electing()) {
    return;
  }
  // newer map, or they've joined a quorum and we haven't?
  bufferlist mybl;
  monmap->encode(mybl, m->get_connection()->get_features());
  // make sure it's actually different; the checks below err toward
  // taking the other guy's map, which could cause us to loop.
  if (!mybl.contents_equal(m->monmap_bl)) {
    MonMap *newmap = new MonMap;
    newmap->decode(m->monmap_bl);
    if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
                   !has_ever_joined)) {
      dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
           << ", mine was " << monmap->get_epoch() << dendl;
      delete newmap;
      monmap->decode(m->monmap_bl);

      bootstrap();
      return;
    }
    delete newmap;
  }

  // rename peer?
  string peer_name = monmap->get_name(m->get_source_addr());
  if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
    dout(10) << " renaming peer " << m->get_source_addr() << " "
         << peer_name << " -> " << m->name << " in my monmap"
         << dendl;
    monmap->rename(peer_name, m->name);

    if (is_electing()) {
      bootstrap();
      return;
    }
  } else {
    dout(10) << " peer name is " << peer_name << dendl;
  }

  // new initial peer?
  if (monmap->get_epoch() == 0 &&
      monmap->contains(m->name) &&
      monmap->get_addr(m->name).is_blank_ip()) {
    dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
    monmap->set_addr(m->name, m->get_source_addr());

    bootstrap();
    return;
  }

  // end discover phase
  if (!is_probing()) {
    return;
  }

  assert(paxos != NULL);

  if (is_synchronizing()) {
    dout(10) << " currently syncing" << dendl;
    return;
  }

  entity_inst_t other = m->get_source_inst();
  //    1:           log         log       ,     ,          。
      if (paxos->get_version() < m->paxos_first_version &&
    m->paxos_first_version > 1) {  // no need to sync if we're 0 and they start at 1.
      dout(10) << " peer paxos first versions [" << m->paxos_first_version
           << "," << m->paxos_last_version << "]"
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, true);//    
      return;
    }
    //    2:      paxos_max_join_drift,       ,     log      ,           。

    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
      dout(10) << " peer paxos last version " << m->paxos_last_version
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, false);
      return;
    }
  }

  // is there an existing quorum?
  if (m->quorum.size()) {//        
    dout(10) << " existing quorum " << m->quorum << dendl;

    dout(10) << " peer paxos version " << m->paxos_last_version
             << " vs my version " << paxos->get_version()
             << " (ok)"
             << dendl;

    if (monmap->contains(name) &&
        !monmap->get_addr(name).is_blank_ip()) {
      //           ,   start_election          

      start_election();
    } else {
    //               ,     monmap   ,    
    //           
    if (monmap->contains(m->name)) {
      dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
      outside_quorum.insert(m->name);
    } else {
      dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
      m->put();
      return;
    }

    //              2F + 1 (    ),         
    //  ,             
    unsigned need = monmap->size() / 2 + 1;
    dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
    if (outside_quorum.size() >= need) {
      if (outside_quorum.count(name)) {
        dout(10) << " that's enough to form a new quorum, calling election" << dendl;
        start_election();
      } else {
        dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
      }
    } else {
      dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
    }
  }  }
}

次に、データの同期セクションを示します.
void Monitor::sync_start(entity_inst_t &other, bool full)
{
  dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;

  assert(state == STATE_PROBING ||
     state == STATE_SYNCHRONIZING);
  state = STATE_SYNCHRONIZING;

  // make sure are not a provider for anyone!
  sync_reset_provider();

  sync_full = full;

  if (sync_full) {
    // stash key state, and mark that we are syncing
    auto t(std::make_shared<MonitorDBStore::Transaction>());
    sync_stash_critical_state(t);
    t->put("mon_sync", "in_sync", 1);

    sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
    dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
         << sync_last_committed_floor << dendl;
    t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);

    store->apply_transaction(t);//     。。。

    assert(g_conf->mon_sync_requester_kill_at != 1);

    // clear the underlying store
    set<string> targets = get_sync_targets_names();
    dout(10) << __func__ << " clearing prefixes " << targets << dendl;
    store->clear(targets);

    // make sure paxos knows it has been reset.  this prevents a
    // bootstrap and then different probe reply order from possibly
    // deciding a partial or no sync is needed.
    paxos->init();

    assert(g_conf->mon_sync_requester_kill_at != 2);
  }

  // assume 'other' as the leader. We will update the leader once we receive
  // a reply to the sync start.
  sync_provider = other;

  sync_reset_timeout();

  MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
  if (!sync_full)
    m->last_committed = paxos->get_version();
  messenger->send_message(m, sync_provider);
}

...monmapの各種操作を未完に羅列する.stop ceph-mon id=*mon-2を停止し、停止しないとmonmapのdbを操作できない.monmap:ceph-mon-i ID-FOO–extract-monmap/tmp/monmap 3をエクスポートする.monmap:monmaptool–print-f/tmp/monmap 4を表示します.mon-3 monmaptool-rm mon-3-f/tmp/monmap 5を削除します.注入monmap ceph-mon-i ID–inject-monmap/tmp/monmap