RocksDBソース分析-インタフェースの下のデータ構造

24154 ワード

RocksDBソース分析-インタフェースの下のデータ構造
RocksDBは非常にポピュラーなKVデータベースであり、LSM-Treeデータベースの典型的な代表であり、多くの分散データベースNewSQL、図データベースはRocksDBをベースストレージエンジンとして使用しており、RocksDBは安定性や性能などの面で優れている.
HugeGraph図データベースの下部にもRocksDBがバックエンドストレージとしてサポートされており、HugeGraphはJava言語、RocksDBはC++言語で記述されており、幸い公式にJava JNIインタフェースが直接使用できるようになった.RocksDBの機能は非常にフォーカスしており、キー値ペアにアクセスするためにMapを1つずつ提供していると簡単に理解できるので、コアインタフェースは基本的にput、get、scanなどであり、使用は比較的簡単です.しかし、簡単なインタフェースの下には、非常に複雑な内部構造が含まれており、本稿では、そのインタフェースの下のいくつかのコア構造を分析する.
最も頻繁に使用されるRocksDBインタフェース:
  • RocksDB:データベースインスタンス、すべての操作のエントリ
  • ColumnFamilyHandle:CF記述子、類似ファイル記述子、簡単にはMapのポインタ
  • と理解できる
  • RocksIterator:クエリー反復器、scanクエリーの操作インタフェース
  • まずいくつかの問題を見てみましょう.
  • Iterator、ColumnFamilyHandleの背後にあるのはどのようにMemTable、ImmMemTable、Manifest、SSTなどを組織したのですか?
  • CFで指定されたkey範囲の値を検索するには、ファイルの場所にどのようにナビゲートしますか?
  • Iteratorのライフサイクルはどのように管理されますか?CF closeの後、Iteratorはどのようにして解放されずに使用できますか?

  • 重点クラス構造とその関係:ColumnFamilyHandle
    ColumnFamilyHandle 

    ColumnFamilyHandle(ColumnFamilyHandle)は、CF(テーブルTableのような)の記述子であり、CFの作成やデータベースのオープン時から各CFのHandleを取得することができ、テーブルに対する操作は、put、get、scanなどのColumnFamilyHandle記述子で行う必要がある、例:rocksdb.put(cfHandle, key, value).
    ColumnFamilyHandleは、次のコードの例で取得できます.
    cfHandle=RocksDB.createCF()
    または
    cfHandles=RocksDB.open(cfNames) 
    ColumnFamilyHandle下層のColumnFamilyDataでは、memtable、immutables、および現在のバージョン番号、SSTsファイル情報などのCFのメタデータをSuperVersionで管理し、すべてのColumnFamilyDataはdbインスタンスのColumnFamilySetに配置されます.
    重点クラス構造とその関係:Iterator
    Iterator  children  ---+ MemTableIterator memtable
                                        + Arena arena              + bool valid                                        + MergerMinIterHeap minHeap             + MemTableIterator immutables
                                        + uint64_t sv_number       + IterKey saved_key                                 + InternalIterator current              + BlockBasedTableIterator level 0
                                                                   + string saved_value                                                                        + LevelIterator level 1~n
                                                                   + SequenceNumber sequence
                                                                   + iterate_lower_bound、iterate_upper_bound、prefix_start_key
                                                                   + user_comparator、merge_operator、prefix_extractor
                                                                   + LocalStatistics local_stats
    

    クエリの場合、最外層はRocksDBを使用する.NewIterator(cfHandle)によりIteratorが得られ、さらにIteratorにより指定されたCFのデータがクエリされ、ポイント・チャートget操作によりkeyに基づいてvalueが得られるほか、他のすべてのクエリはIteratorに基づいており、全表スキャン、範囲検索(より大きい、より小さい、区間)、接頭辞検索などが含まれる.Iteratorは内容とライフサイクルが複雑で、読み取りパスにはRocksDBの重要な概念の大部分が含まれています.
    最外層反復器の構築:RocksDB.新Iterator(cfHandle)呼び出しスタック:
    ArenaWrappedDBIter::Init 0x7feefb8f5c00, allow_refresh_=1
    ArenaWrappedDBIter::Init()
     0   librocksdbjni3300438414871377681.jnilib 0x0000000121dc8236 _ZN7rocksdb18ArenaWrappedDBIter4InitEPNS_3EnvERKNS_11ReadOptionsERKNS_18ImmutableCFOptionsERKyyyPNS_12ReadCallbackEbb + 214
     1   librocksdbjni3300438414871377681.jnilib 0x0000000121dc85ba _ZN7rocksdb25NewArenaWrappedDbIteratorEPNS_3EnvERKNS_11ReadOptionsERKNS_18ImmutableCFOptionsERKyyyPNS_12ReadCallbackEPNS_6DBImplEPNS_16ColumnFamilyDataEbb + 266
     2   librocksdbjni3300438414871377681.jnilib 0x0000000121d640f9 _ZN7rocksdb6DBImpl11NewIteratorERKNS_11ReadOptionsEPNS_18ColumnFamilyHandleE + 617
     3   librocksdbjni3300438414871377681.jnilib 0x0000000121c8757e Java_org_rocksdb_RocksDB_iteratorCF__JJ + 78
    

    RocksDB.NewIterator()は、ArenaWrappedDBIterオブジェクトを返します.ArenaWrappedDBIterはハウジングに相当します.このDBIterには、現在のkey&valueのような状態変数が多数含まれています.内部反復器InternalIteratorもあります.DBIterの役割は、クエリーを最下位のInternalIteratorに転送することです.InternalIteratorが返すKVは元のバイナリデータであり、DBIterはデータを取得した後に意味のある内容として解析し、バージョン番号sequence(末尾8-1バイト)、操作タイプtype(末尾1バイト、通常のValue Key、削除操作Delete Key、連結操作Merge Keyなどを含む)、実際のユーザKeyの内容、たとえば、Delete Keyは次のKeyをスキップして読み込む必要があり、Merge Keyは新しい古い値をマージし、処理が完了してから結果を返す必要があります.
    ここで、Arenaは、DBITEとその内部のInternalIteratorを格納するために使用され、メモリの断片化を防止するために使用され、DBITEには多くのメンバーが含まれており、Arenaは、各メンバーが小さなメモリを申請するのではなく、すべてのメンバーを格納するために大きなスペースを申請した.
    さらに、ArenaWrappedDBIterは、反復器Refreshのために追加された情報ColumnFamilyData cfd_、DBImpl db_impl_ 、ReadOptions read_options_,Refreshとは、SuperVersionNumberが反復器の作成時のバージョンよりも更新された場合に、内部DBTERとInternalIteratorを再作成する必要があることを意味します.詳細は、方法ArenaWrappedDBTer::Refresh()を参照してください.
    詳細なKVフォーマットはdb/memtable.cc/MemTable::Add():internal_key_size(varint) + internal_key(user_key+sequence+type) + value_size(varint) + value.上層部にとってuser_keyは、実際のユーザーデータの末尾にtimestampが含まれている可能性があります.
    WriteBatchレイヤフォーマットはdb/write_を参照batch.cc/WriteBatchInternal::Put():tag(type) + cf_id(varint) + key_and_timestamp_size(varint) + key_data + timestamp + value_size(varint) + value_data.
    注意TTLが有効になっている場合、DBWithTTLImpl::Write()には、timestampがvalueの後に追加された4バイトであり、TTLのフィルタはTtlCompactionFilterを参照してください.
    詳細については、DBImpl::WriteImpl()->WriteBatchInternal::InsertInto()->WriteBatch::Iterate()->WriteBatchInternal::Iterate()->MemTable Inserter::PutCFImpl()->MemTable::Add()を参照してください.
    MergingIteratorは万象を網羅した反復器であり、InternalIteratorの一種であり、下層の様々なタイプのサブ反復器がMergingIteratorに配置され、memtable、immutables、SSTsを含むInternalIteratorは、1つのvector集合によって保持され、最小スタックminHeapによってpickのどの字反復器のKVを最適化する.
    重点コードの概要:
  • InternalIterator:DBImpl::NewInternalIterator()を構築します.コードの詳細は末尾を参照してください.
  • MergingIteratorサブ反復器から次のキー値:MergingIterator::SeekToFirst()&Next()を読み込むように選択し、コードの詳細は末尾を参照してください.
  • 反復器解析データメソッド:DBTER::FindNextUserEntryInternal()コード詳細は末尾を参照.

  • 最初の質問に答えてください.
    質問1、Iterator、ColumnFamilyHandleの背後にあるのは、MemTable、ImmMemTable、Manifest、SSTなどをどのように組織しているのか.
    上の分析から見ればほぼはっきりしているはずだ.
    質問2.あるCFで指定されたkey範囲の値を検索するには、どのようにしてファイルのある場所にナビゲートしますか?
    ArenaWrappedDBIter::Seek(const Slice&target)メソッドからずっと下に追えばいい、MergingIterator::Seek(const Slice&target)の場合、すべてのサブ反復器に対してSeekを1回行い、key順にサブ反復器を最小スタックに入れ、最小keyのサブ反復器に戻り、ArenaWrappedDBIter::Next()で次のkeyを取得する場合、前回の最小反復器の値を取り出し、最小keyのサブ反復器を戻し、上界までループ往復します.
    では、サブ反復器のSeekはどのように完成しましたか?
  • メモリ内のMemTableIteratorのSeekは、SkipListテーブルを例にとると、SkipList Rep::Iterator::Seek()によってSkipListに対応するノードが見つかります.
  • level 0 SSTファイルのSeekは、BlockBasedTable:Iterator::Seek()/PlainTable:Iterator::Seek()で見つかります.BlockBasedTableはSSTのデフォルトフォーマットです.BlockBasedTable内部はSSTのBlockインデックスIndexIterator::Seek()でファイル内部の大まかな位置(どのBlock、1つのBlockを運ぶと4 Kサイズ)をすばやく特定します.最終的にBlock内でBlockIter::Seek()を介してkeyに対応する特定のEntryを2点で検索する.
  • level 1~n SSTファイルのSeekは、各層に1つのLevelIteratorがあり、1層の複数のSSTファイルに対して、その内容はすべて順序付けされており、LevelIterator::Seek()はkeyに対応する当該層ファイルを先に見つけ、あるSSTファイルのBlockBasedTable Iteratorを返し、BlockBasedTable Iterator::Seek()を呼び出し、次のフローは上記level 0での分析と類似している.

  • 質問3、Iteratorのライフサイクルはどのように管理されていますか?CF closeの後、Iteratorはどのようにして解放されずに使用できますか?
    ColumnFamilyData構造には、ColumnFamilyHandleを呼び出すときにrefs参照カウントがある.close()CF記述子を解放すると,下層のColumnFamilyDataへの参照が1だけ減少し,refs=0を参照した場合にのみ真に解放される(コード参照構造関数~ColumnFamilyHandleImpl()である).
     
    キー構造
    重要構造:ColumnFamilyData
    コードパス:rocksdb/db/column_family.cc
    // This class keeps all the data that a column family needs.
    // Most methods require DB mutex held, unless otherwise noted
    class ColumnFamilyData {
      uint32_t id_;
      const std::string name_;
      Version* dummy_versions_;  // Head of circular doubly-linked list of versions.
      Version* current_;         // == dummy_versions->prev_
    
      std::atomic refs_;      // outstanding references to ColumnFamilyData
      std::atomic initialized_;
      std::atomic dropped_;  // true if client dropped it
    
      const InternalKeyComparator internal_comparator_;
      std::vector<:unique_ptr>>
          int_tbl_prop_collector_factories_;
    
      const ColumnFamilyOptions initial_cf_options_;
      const ImmutableCFOptions ioptions_;
      MutableCFOptions mutable_cf_options_;
    
      const bool is_delete_range_supported_;
    
      std::unique_ptr table_cache_;
    
      std::unique_ptr internal_stats_;
    
      WriteBufferManager* write_buffer_manager_;
    
      MemTable* mem_;
      MemTableList imm_;
      SuperVersion* super_version_;
    
      // An ordinal representing the current SuperVersion. Updated by
      // InstallSuperVersion(), i.e. incremented every time super_version_
      // changes.
      std::atomic super_version_number_;
    
      // Thread's local copy of SuperVersion pointer
      // This needs to be destructed before mutex_
      std::unique_ptr local_sv_;
    
      // pointers for a circular linked list. we use it to support iterations over
      // all column families that are alive (note: dropped column families can also
      // be alive as long as client holds a reference)
      ColumnFamilyData* next_;
      ColumnFamilyData* prev_;
    
      // This is the earliest log file number that contains data from this
      // Column Family. All earlier log files must be ignored and not
      // recovered from
      uint64_t log_number_;
    
      std::atomic flush_reason_;
    
      // An object that keeps all the compaction stats
      // and picks the next compaction
      std::unique_ptr compaction_picker_;
    
      ColumnFamilySet* column_family_set_;
    
      std::unique_ptr write_controller_token_;
    
      // If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
      bool queued_for_flush_;
    
      // If true --> this ColumnFamily is currently present in
      // DBImpl::compaction_queue_
      bool queued_for_compaction_;
    
      uint64_t prev_compaction_needed_bytes_;
    
      // if the database was opened with 2pc enabled
      bool allow_2pc_;
    
      // Memtable id to track flush.
      std::atomic last_memtable_id_;
    
      // Directories corresponding to cf_paths.
      std::vector<:unique_ptr>> data_dirs_;
    };

    重要構造:ArenaWrappedDBTER
    コードパス:rocksdb/db/db_iter.cc(rocksdb/db/db_impl.cc ArenaWrappedDBIter* DBImpl::NewIteratorImpl() <= Iterator* DBImpl::NewIterator())
    // A wrapper iterator which wraps DB Iterator and the arena, with which the DB
    // iterator is supposed be allocated. This class is used as an entry point of
    // a iterator hierarchy whose memory can be allocated inline. In that way,
    // accessing the iterator tree can be more cache friendly. It is also faster
    // to allocate.
    class ArenaWrappedDBIter : public Iterator {
      DBIter* db_iter_;
      Arena arena_;
      uint64_t sv_number_;
      ColumnFamilyData* cfd_ = nullptr;
      DBImpl* db_impl_ = nullptr;
      ReadOptions read_options_;
      ReadCallback* read_callback_;
      bool allow_blob_ = false;
      bool allow_refresh_ = true;
    };
    ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                                ColumnFamilyData* cfd,
                                                SequenceNumber snapshot,
                                                ReadCallback* read_callback,
                                                bool allow_blob,
                                                bool allow_refresh) {
      // Try to generate a DB iterator tree in continuous memory area to be
      // cache friendly. Here is an example of result:
      // +-------------------------------+
      // |                               |
      // | ArenaWrappedDBIter            |
      // |  +                            |
      // |  +---> Inner Iterator   ------------+
      // |  |                            |     |
      // |  |    +-- -- -- -- -- -- -- --+     |
      // |  +--- | Arena                 |     |
      // |       |                       |     |
      // |          Allocated Memory:    |     |
      // |       |   +-------------------+     |
      // |       |   | DBIter            |  iter_  ------------+
      // |       |   |                   |     |
      // |       |   +-------------------+     |
      // |       |   | MergingIterator   | child iter1  ------------+
      // |       |   |  |                |          |
      // |           |  +->child iter2  ----------+ |
      // |       |   |  |                |        | |
      // |       |   |  +->child iter3  --------+ | |
      // |           |                   |      | | |
      // |       |   +-------------------+      | | |
      // |       |   | Iterator1         | 

     
    詳細コード
    InternalIteratorの構築:DBImpl::NewInternalIterator():
    InternalIterator* DBImpl::NewInternalIterator(
        const ReadOptions& read_options, ColumnFamilyData* cfd,
        SuperVersion* super_version, Arena* arena,
        RangeDelAggregator* range_del_agg) {
      InternalIterator* internal_iter;
      assert(arena != nullptr);
      assert(range_del_agg != nullptr);
      // Need to create internal iterator from the arena.
      MergeIteratorBuilder merge_iter_builder(
          &cfd->internal_comparator(), arena,
          !read_options.total_order_seek &&
              cfd->ioptions()->prefix_extractor != nullptr);
      // Collect iterator for mutable mem
      merge_iter_builder.AddIterator(
          super_version->mem->NewIterator(read_options, arena));
      std::unique_ptr range_del_iter;
      Status s;
      if (!read_options.ignore_range_deletions) {
        range_del_iter.reset(
            super_version->mem->NewRangeTombstoneIterator(read_options));
        s = range_del_agg->AddTombstones(std::move(range_del_iter));
      }
      // Collect all needed child iterators for immutable memtables
      if (s.ok()) {
        super_version->imm->AddIterators(read_options, &merge_iter_builder);
        if (!read_options.ignore_range_deletions) {
          s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                             range_del_agg);
        }
      }
      TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
      if (s.ok()) {
        // Collect iterators for files in L0 - Ln
        if (read_options.read_tier != kMemtableTier) {
          super_version->current->AddIterators(read_options, env_options_,
                                               &merge_iter_builder, range_del_agg);
        }
        internal_iter = merge_iter_builder.Finish();
        IterState* cleanup =
            new IterState(this, &mutex_, super_version,
                          read_options.background_purge_on_iterator_cleanup);
        internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
    
        return internal_iter;
      } else {
        CleanupSuperVersion(super_version);
      }
      return NewErrorInternalIterator(s, arena);
    }

     
    MergingIteratorサブ反復器から次のキーの読み込みを選択し、最小スタック加速pick:MergingIterator::SeekToFirst()&Next()を使用します.
    virtual void SeekToFirst() override {
        ClearHeaps();
        status_ = Status::OK();
        for (auto& child : children_) {
          child.SeekToFirst();
          if (child.Valid()) {
            assert(child.status().ok());
            minHeap_.push(&child);
          } else {
            considerStatus(child.status());
          }
        }
        direction_ = kForward;
        current_ = CurrentForward();
      }
      
      IteratorWrapper* CurrentForward() const {
        assert(direction_ == kForward);
        return !minHeap_.empty() ? minHeap_.top() : nullptr;
      }
      
      virtual void Next() override {
        assert(Valid());
    
        // Ensure that all children are positioned after key().
        // If we are moving in the forward direction, it is already
        // true for all of the non-current children since current_ is
        // the smallest child and key() == current_->key().
        if (direction_ != kForward) {
          SwitchToForward();
          // The loop advanced all non-current children to be > key() so current_
          // should still be strictly the smallest key.
          assert(current_ == CurrentForward());
        }
    
        // For the heap modifications below to be correct, current_ must be the
        // current top of the heap.
        assert(current_ == CurrentForward());
    
        // as the current points to the current record. move the iterator forward.
        current_->Next();
        if (current_->Valid()) {
          // current is still valid after the Next() call above.  Call
          // replace_top() to restore the heap property.  When the same child
          // iterator yields a sequence of keys, this is cheap.
          assert(current_->status().ok());
          minHeap_.replace_top(current_);
        } else {
          // current stopped being valid, remove it from the heap.
          considerStatus(current_->status());
          minHeap_.pop();
        }
        current_ = CurrentForward();
      }

     
    反復解析データメソッド:DBTER::FindNextUserEntryInternal():
    bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
      // Loop until we hit an acceptable entry to yield
      assert(iter_->Valid());
      assert(status_.ok());
      assert(direction_ == kForward);
      current_entry_is_merged_ = false;
    
      // How many times in a row we have skipped an entry with user key less than
      // or equal to saved_key_. We could skip these entries either because
      // sequence numbers were too high or because skipping = true.
      // What saved_key_ contains throughout this method:
      //  - if skipping        : saved_key_ contains the key that we need to skip,
      //                         and we haven't seen any keys greater than that,
      //  - if num_skipped > 0 : saved_key_ contains the key that we have skipped
      //                         num_skipped times, and we haven't seen any keys
      //                         greater than that,
      //  - none of the above  : saved_key_ can contain anything, it doesn't matter.
      uint64_t num_skipped = 0;
    
      is_blob_ = false;
    
      do {
        if (!ParseKey(&ikey_)) {
          return false;
        }
    
        if (iterate_upper_bound_ != nullptr &&
            user_comparator_->Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
          break;
        }
    
        if (prefix_extractor_ && prefix_check &&
            prefix_extractor_->Transform(ikey_.user_key)
                    .compare(prefix_start_key_) != 0) {
          break;
        }
    
        if (TooManyInternalKeysSkipped()) {
          return false;
        }
    
        if (IsVisible(ikey_.sequence)) {
          if (skipping && user_comparator_->Compare(ikey_.user_key,
                                                    saved_key_.GetUserKey()) <= 0) {
            num_skipped++;  // skip this entry
            PERF_COUNTER_ADD(internal_key_skipped_count, 1);
          } else {
            num_skipped = 0;
            switch (ikey_.type) {
              case kTypeDeletion:
              case kTypeSingleDeletion:
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
                // if iterartor specified start_seqnum we
                // 1) return internal key, including the type
                // 2) return ikey only if ikey.seqnum >= start_seqnum_
                // note that if deletion seqnum is < start_seqnum_ we
                // just skip it like in normal iterator.
                if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_)  {
                  saved_key_.SetInternalKey(ikey_);
                  valid_ = true;
                  return true;
                } else {
                  saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
                  skipping = true;
                  PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
                }
                break;
              case kTypeValue:
              case kTypeBlobIndex:
                if (start_seqnum_ > 0) {
                  // we are taking incremental snapshot here
                  // incremental snapshots aren't supported on DB with range deletes
                  assert(!(
                    (ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0)
                  ));
                  if (ikey_.sequence >= start_seqnum_) {
                    saved_key_.SetInternalKey(ikey_);
                    valid_ = true;
                    return true;
                  } else {
                    // this key and all previous versions shouldn't be included,
                    // skipping
                    saved_key_.SetUserKey(ikey_.user_key,
                      !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
                    skipping = true;
                  }
                } else {
                  saved_key_.SetUserKey(
                      ikey_.user_key,
                      !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
                  if (range_del_agg_.ShouldDelete(
                          ikey_, RangeDelAggregator::RangePositioningMode::
                                     kForwardTraversal)) {
                    // Arrange to skip all upcoming entries for this key since
                    // they are hidden by this deletion.
                    ...
                }
                break;
              case kTypeMerge:
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
                if (range_del_agg_.ShouldDelete(
                        ikey_, RangeDelAggregator::RangePositioningMode::
                                   kForwardTraversal)) {
                  // Arrange to skip all upcoming entries for this key since
                  // they are hidden by this deletion.
                  skipping = true;
                  num_skipped = 0;
                  PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
                } else {
                  // By now, we are sure the current ikey is going to yield a
                  // value
                  current_entry_is_merged_ = true;
                  valid_ = true;
                  return MergeValuesNewToOld();  // Go to a different state machine
                }
                break;
              default:
                assert(false);
                break;
            }
          }
        } else {
          PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    
          // This key was inserted after our snapshot was taken.
          // If this happens too many times in a row for the same user key, we want
          // to seek to the target sequence number.
          int cmp =
              user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey());
          if (cmp == 0 || (skipping && cmp <= 0)) {
            num_skipped++;
          } else {
            saved_key_.SetUserKey(
                ikey_.user_key,
                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
            skipping = false;
            num_skipped = 0;
          }
        }
    
        // If we have sequentially iterated via numerous equal keys, then it's
        // better to seek so that we can avoid too many key comparisons.
        if (num_skipped > max_skip_) {
          num_skipped = 0;
          std::string last_key;
          if (skipping) {
            // We're looking for the next user-key but all we see are the same
            // user-key with decreasing sequence numbers. Fast forward to
            // sequence number 0 and type deletion (the smallest type).
            AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                           0, kTypeDeletion));
            // Don't set skipping = false because we may still see more user-keys
            // equal to saved_key_.
          } else {
            // We saw multiple entries with this user key and sequence numbers
            // higher than sequence_. Fast forward to sequence_.
            // Note that this only covers a case when a higher key was overwritten
            // many times since our snapshot was taken, not the case when a lot of
            // different keys were inserted after our snapshot was taken.
            AppendInternalKey(&last_key,
                              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                                kValueTypeForSeek));
          }
          iter_->Seek(last_key);
          RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
        } else {
          iter_->Next();
        }
      } while (iter_->Valid());
    
      valid_ = false;
      return iter_->status().ok();
    }