MongoDB ソースコード解析

6219 ワード

GitHub からコードをローカルにダウンロードし、まずプログラムのエントリポイントである main を探します。
エントリポイントは mongod.cpp にあります。
// Process entry point. Both variants pass the command-line arguments to mongo::mongod_main() and
// hand its return value to mongo::quickExit().
#if defined(_WIN32)
// In Windows, wmain() is an alternate entry point for main(), and receives the same parameters
// as main() but encoded in Windows Unicode (UTF-16); "wide" 16-bit wchar_t characters.  The
// WindowsCommandLine object converts these wide character strings to a UTF-8 coded equivalent
// and makes them available through the argv() and envp() members.  This enables mongod_main()
// to process UTF-8 encoded arguments and environment variables without regard to platform.
int wmain(int argc, wchar_t* argvW[]) {
    // The WindowsCommandLine temporary lives until the end of this full expression, i.e. for the
    // whole duration of the mongod_main() call that uses its argv().
    mongo::quickExit(mongo::mongod_main(argc, mongo::WindowsCommandLine(argc, argvW).argv()));
}
#else
// On other platforms argv is forwarded to mongod_main() unchanged.
int main(int argc, char* argv[]) {
    mongo::quickExit(mongo::mongod_main(argc, argv));
}
#endif

実際の実行環境は Linux なので、Linux 向けのコード(#else 側の main)だけに注目すれば十分です。
まずは mongod_main.cpp にある mongo::mongod_main です。この関数は起動時のコマンドライン引数リストを引数として受け取ります。
int mongod_main(int argc, char* argv[]) {
    //       
    ThreadSafetyContext::getThreadSafetyContext()->forbidMultiThreading();

    //        (    )
    registerShutdownTask(shutdownTask);

    //          
    setupSignalHandlers();

    //       
    srand(static_cast(curTimeMicros64()));

    Status status = mongo::runGlobalInitializers(std::vector<:string>(argv, argv + argc));
    if (!status.isOK()) {
        LOGV2_FATAL_OPTIONS(
            20574,
            logv2::LogOptions(logv2::LogComponent::kControl, logv2::FatalMode::kContinue),
            "Error during global initialization: {error}",
            "Error during global initialization",
            "error"_attr = status);
        quickExit(EXIT_FAILURE);
    }

    //      
    auto* service = [] {
        try {
            auto serviceContextHolder = ServiceContext::make();
            auto* serviceContext = serviceContextHolder.get();
            setGlobalServiceContext(std::move(serviceContextHolder));

            return serviceContext;
        } catch (...) {
            auto cause = exceptionToStatus();
            LOGV2_FATAL_OPTIONS(
                20575,
                logv2::LogOptions(logv2::LogComponent::kControl, logv2::FatalMode::kContinue),
                "Error creating service context: {error}",
                "Error creating service context",
                "error"_attr = redact(cause));
            quickExit(EXIT_FAILURE);
        }
    }();

    setUpCollectionShardingState(service);
    setUpCatalog(service);
    setUpReplication(service);
    setUpObservers(service);
    service->setServiceEntryPoint(std::make_unique(service));

    ErrorExtraInfo::invariantHaveAllParsers();

    startupConfigActions(std::vector<:string>(argv, argv + argc));
    cmdline_utils::censorArgvArray(argc, argv);

    if (!initializeServerGlobalState(service))
        quickExit(EXIT_FAILURE);

    if (!initializeServerSecurityGlobalState(service))
        quickExit(EXIT_FAILURE);

    // There is no single-threaded guarantee beyond this point.
    ThreadSafetyContext::getThreadSafetyContext()->allowMultiThreading();

    // Per SERVER-7434, startSignalProcessingThread must run after any forks (i.e.
    // initializeServerGlobalState) and before the creation of any other threads
    startSignalProcessingThread();

    ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD);

#if defined(_WIN32)
    if (ntservice::shouldStartService()) {
        ntservice::startService();
        // exits directly and so never reaches here either.
    }
#endif

    //     
    ExitCode exitCode = initAndListen(service, serverGlobalParams.port);
    exitCleanly(exitCode);
    return 0;
}

次に、ネットワークの初期化方法を見てみましょう。主に TransportLayerManager が使われています。
 
    // Transport-layer setup; skipped entirely when storageGlobalParams.repair is set.
    if (!storageGlobalParams.repair) {
        // Build the transport layer from the global server parameters and run its setup().
        auto tl =
            transport::TransportLayerManager::createWithConfig(&serverGlobalParams, serviceContext);
        auto res = tl->setup();
        if (!res.isOK()) {
            // A setup failure is logged and reported to the caller as EXIT_NET_ERROR.
            LOGV2_ERROR(20568,
                        "Error setting up listener: {error}",
                        "Error setting up listener",
                        "error"_attr = res);
            return EXIT_NET_ERROR;
        }
        // Ownership of the transport layer moves to the service context.
        serviceContext->setTransportLayer(std::move(tl));
    }

//            asio
std::unique_ptr TransportLayerManager::createWithConfig(
    const ServerGlobalParams* config, ServiceContext* ctx) {
    auto sep = ctx->getServiceEntryPoint();

    transport::TransportLayerASIO::Options opts(config);
    opts.transportMode = transport::Mode::kSynchronous;

    ctx->setServiceExecutor(std::make_unique(ctx));

    std::vector<:unique_ptr>> retVector;
    retVector.emplace_back(std::make_unique<:transportlayerasio>(opts, sep));
    return std::make_unique(std::move(retVector));
}

// Start the transport layer stored on the service context; skipped when
// storageGlobalParams.repair is set.
    if (!storageGlobalParams.repair) {
        start = serviceContext->getTransportLayer()->start();
        if (!start.isOK()) {
            // A start failure is logged and reported to the caller as EXIT_NET_ERROR.
            LOGV2_ERROR(20572,
                        "Error starting listener: {error}",
                        "Error starting listener",
                        "error"_attr = start);
            return EXIT_NET_ERROR;
        }
    }

次に、リスナースレッドを起動します。
// Starts the transport layer. For an ingress configuration this spawns the listener thread and
// blocks until the listener is active or the layer has been shut down. Otherwise it only checks
// that no acceptors exist. Returns Status::OK() on both paths.
Status TransportLayerASIO::start() {
    // Held for the whole function; also the lock used by the condition-variable wait below.
    stdx::unique_lock lk(_mutex);

    // Make sure we haven't shutdown already
    invariant(!_isShutdown);

    if (_listenerOptions.isIngress()) {
        // Run _runListener() on a dedicated thread, then wait until either _listener.active or
        // _isShutdown becomes true.
        _listener.thread = stdx::thread([this] { _runListener(); });
        _listener.cv.wait(lk, [&] { return _isShutdown || _listener.active; });
        return Status::OK();
    }

    // Non-ingress configuration: there must be no acceptors.
    invariant(_acceptors.empty());
    return Status::OK();
}

新しい接続を受け入れ、セッションを処理するのは ServiceEntryPointMongod です。