Spring動的データソースと実行時動的追加データソース


1、マルチデータソースと動的データソース
プロジェクトが1つのデータベースだけを使用する必要がない場合は、複数のデータソースを使用する必要があります.たとえば、検索するデータが同じデータベース・ライブラリにないか、データベースの読み書き分離を行うなど、シーンが多いです.上記の問題に対処するには主に2つの解決方法がある.
1つ目は、Springに複数のDataSourceを注入し、異なるDataSourceに基づいて異なるSqlSessionFactoryを生成し、異なるデータベースを操作するmapperインタフェースを異なるパケットの下に置き、MyBatisの@MapperScan注釈を使用して、異なるSqlSessionFactoryを使用して異なるパケットの下のmapperインタフェースをスキャンすることです.
例:
   : Spring        
@Bean("db1-DataSource")
@Primary//    @Primary      ,               ,  SpringBoot    
@ConfigurationProperties(prefix = "datasource.db1")
fun db1DataSource(): DataSource {
    return DataSourceBuilder.create().build()
}

@Bean("db2-DataSource")
@ConfigurationProperties(prefix = "datasource.db2")
fun db2DataSource(): DataSource {
    return DataSourceBuilder.create().build()
}

   :    SqlSessionFactory   Spring
@Bean("db1-SqlSessionFactory")
fun db1SqlSessionFactory(@Qualifier("db1-DataSource") ds: DataSource): SqlSessionFactory {
    val fb = SqlSessionFactoryBean()
    fb.setDataSource(ds)
    fb.setMapperLocations("  mapper xml    ")
    return fb.getObject()
}

@Bean("db2-SqlSessionFactory")
fun db2SqlSessionFactory(@Qualifier("db2-DataSource") ds: DataSource): SqlSessionFactory {
    val fb = SqlSessionFactoryBean()
    fb.setDataSource(ds)
    fb.setMapperLocations("  mapper xml    ")
    return fb.getObject()
}

   :@MapperScan     SqlSessionFactory       mapper        ,     db1    mapper    app.mapper.db1  ,   db2    mapper    app.mapper.db2  

@MapperScan(value = ["app.mapper.db1"], sqlSessionFactoryRef = "db1-SqlSessionFactory")

@MapperScan(value = ["app.mapper.db2"], sqlSessionFactoryRef = "db2-SqlSessionFactory")

       mapper              。

次に、2つ目の方法について詳しく説明します.
2つ目:Springを使用して提供される動的データソースAbstractRoutingDataSource
AbstractRoutingDataSourceは抽象クラスで、DataSourceインタフェースを実現し、内部に複数のDataSourceを格納することができ、必要に応じて異なるDataSourceを返すことができます.
次に、AbstractRoutingDataSourceのソースの一部を解析します.
//AbstractRoutingDataSource        map       ,key         (      ,       ),value    DataSource

private Map targetDataSources;
//            
private Object defaultTargetDataSource;
//               ,    DataSource     ,          DataSource
protected abstract Object determineCurrentLookupKey();
//               ,  determineCurrentLookupKey       
protected DataSource determineTargetDataSource() {
    Object lookupKey = determineCurrentLookupKey();
	DataSource dataSource = this.resolvedDataSources.get(lookupKey);
	if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
		dataSource = this.resolvedDefaultDataSource;
	}

	if (dataSource == null) {
		throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
	}

	return dataSource;
}
//            map
public void setTargetDataSources(Map targetDataSources) {
   this.targetDataSources = targetDataSources;
}

//        
public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
	this.defaultTargetDataSource = defaultTargetDataSource;
}

このAbstractRoutingDataSource抽象クラスを継承し、determineCurrentLookupKey()という方法を実装し、値を返すことで使用するデータソースを動的に変更します.
次に、@Transactional注記をブロックする方法として、「add」、「update」、「delete」を接頭辞として使用する場合はdb 1-DataSource、「get」を接頭辞として使用する場合はdb 2-DataSourceを使用する適用シーンを想定します.これは単純な読み書き分離に相当する.
すなわち,@Transactional注釈を付けたトランザクションメソッドの実行前に,メソッド署名に従って使用するDataSourceを動的に変更する必要がある.
ここでは、実行するトランザクションをブロックするためにフェースを作成し、フェースで実行するメソッド名を判断し、取得した結果情報をThreadLocalに保存することで、AbstractRoutingDataSourceのdetermineCurrentLookupKeyメソッドでThreadLocalから取得し、対応するデータソースの名前を返すことができます.ThreadLocalを使用する主な理由は、トランザクション・メソッドが常に同時実行されるため、相互の干渉を防止するためです.
具体的なサンプルコードは次のとおりです.
D y n amicDataSourceを作成してAbstractRoutingDataSourceを継承し、determineCurrentLookupKeyメソッドを実現しました.
class DynamicDataSource : AbstractRoutingDataSource() {
    override fun determineCurrentLookupKey(): Any {
        //  MultipleDataSourceConfig         、sqlSessionFactory       
        val key = MultipleDataSourceConfig.threadLocal.get()

        println(" DynamicDataSource           : ${key} ")

        if (key == null) {
            println("error dataSource key ")
            return "db1"//         key
        }

        return key
    }
}

これは構成クラスです.
EnableTransactionManagement
@Configuration
open class MultipleDataSourceConfig {

    companion object {
        val threadLocal: ThreadLocal = ThreadLocal()
    }

    @Bean("db1-DataSource")
    @Primary
    open fun masterDataSource(): DataSource? {
                     
    }

    @Bean("db2-DataSource")
    open fun masterDataSource(): DataSource? {
                     
    }


    @Bean("dataSource")
    open fun dataSource(@Qualifier("db1-DataSource") db1: DataSource, @Qualifier("db2-DataSource") db2: DataSource): DynamicDataSource {
        val map =  db1 db2    map  ,key db1 db2,value       
        return DynamicDataSource().apply {
            this.setTargetDataSources(map)
            this.setDefaultTargetDataSource(db1)
        }
    }

    @Bean("sqlSessionFactory")
    open fun sqlSessionFactory(@Qualifier("dataSource") ds: DataSource): SqlSessionFactory {
        val bean = SqlSessionFactoryBean()
        bean.setDataSource(ds)
        return bean.getObject()
    }

    @Bean
    open fun transactionManager(@Qualifier("dataSource") ds: DataSource): DataSourceTransactionManager {
        return DataSourceTransactionManager(ds)
    }
}

これはトランザクションメソッドをブロックする断面です.
@Component
@Aspect
@Order(1)//              
class SelectDataSourceAspect {

   @Before("@annotation(org.springframework.transaction.annotation.Transactional)")
    fun before(jp: JoinPoint) {
        println(" SelectDataSourceAspect         ${jp.signature.name} ")

        try {
            println(" SelectDataSourceAspect   ${jp.signature.name}   :        -> " + TransactionInterceptor.currentTransactionStatus().toString())
            //println(TransactionInterceptor.currentTransactionStatus().toString() + " " + txm)
        } catch (e: Exception) {
            println(" SelectDataSourceAspect   ${jp.signature.name}   :        ")
        }

        if (   jp.signature.name   add、update、delete    ) {
            MultipleDataSourceConfig.threadLocal.set("db1")
        } else {
            MultipleDataSourceConfig.threadLocal.set("db2")
        }
    }

}

ここまでは大丈夫そうですが、大きなバグがあります.
Springには、主にトランザクション・メソッドの実行前にデータ・ソースからconnectionを取り出し、オープン・トランザクションを設定し、正常に実行されるとコミットし、例外を投げ出すとロールバックします(Springのトランザクション・マネージャDataSourceTransactionManagerの具体的なソースコード).
我々のスライスは、トランザクション・メソッドが実行される前にデータ・ソースを使用して判断されます.つまり、この2つのスライスには実行の順序があります.
たとえば、Springのトランザクション・スライスが最初に実行され、データ・ソースからconnectionを取得したとします.ThreadLocalには値がないため、取得したconnectionはデフォルトのデータ・ソースdb 1であり、connectionを取得してから使用するデータ・ソースを変更しました.これは明らかに間違っています.
では、切断面の実行順序をどのように変更しますか?
公式文書を参照すると、次のような説明があります.
点我跳转:Spring概要-接面の実行順序の説明
大まかな内容は,接面に@Order注記を付けることができ,@Order注記の内部にintタイプの値が優先度を表し,この値が小さいほど接面が優先的に実行される.
したがって,我々が配置した切面に@Order(1)注釈を加えることで,我々の優先実行を保証できる.
これが第2の方法のすべての内容といくつかの注意事項です.
説明:
1、AOPを使用して使用するデータソースを判断する必要はありません.filter、controller、controllerのブロッカーなどで判断できます.
2、例ではメソッド名で使用するデータソースを判断するのは明らかに優美ではありません.@Read、@Writeなどの独自の注釈を作成し、AOPでMethodを取得し、反射を利用して注釈の内容を出して判断すると、コードもずっときれいになります.注意:Methodはオービットの通知(@Around)のみ取得でき、前置きの通知@Beforeのようにメソッド署名のみ取得できます.
2、運転時にデータソースを動的に追加する
上記のspringダイナミックデータソースに対して、プロジェクトの実行時にデータソースを追加できないかという突発的なアイデアがあります.すなわち、Webアプリケーションを停止する必要がない場合に、ネットワークを介していくつかのデータソースの構成情報をWebアプリケーションに転送し、Webアプリケーションはこの新しいデータソース操作に対応するデータベースを使用することができる.
前述したAbstractRoutingDataSourceでは[Map targetDataSource]を使用して複数のデータソースを保存しており、実行時にこのmapに新しいデータソースをputすればよいと考えています.
具体的な実装では、データソース構成情報を送信するapolloを使用することにしました.apolloに追加された構成はwebアプリケーションに同期します.もちろん、controllerでデータソース構成を受け入れるかdubboのようなrpcフレームワークを通じて、listenerリスニングポートを書いてオリジナルのsocketで情報伝送を行うなど、他の方法を使用することができます.それが好きならそれを使います.私がapolloを使ったのは、主に私のプロジェクトの構成情報がすべてapolloに置かれて統一管理されているからです.
これらのデータソースを保存するにはConcureentHashMapを使用し、Springのコンテナに登録し、setTargetDataSourceメソッドでAbstractRoutingDataSourceにセットします.ConcureentHashMapを使用してHashMapを使用しないのは、主にデータ・ソース・オペレーションとトランザクション・スライス取得データ・ソース・オペレーションの間で同時実行される可能性があるため、スレッドのセキュリティを考慮しています.
コードの例は次のとおりです.
SpringにConcureentHashMapを注入
@Bean("slaves")
open fun runtimeDataSource(): ConcurrentHashMap {
    val map = ConcurrentHashMap()
    val cfg = ConfigService.getAppConfig()
    val set = cfg.propertyNames
    for (key in set) {
        //      db.slave.,              
        if (key.startsWith("db.slave.")) {
            val value = cfg.getProperty(key, "")
            //    getDataSource          ,kv key        ,value     。
            val kv = getDataSource(key, value)
            if (kv != null) {
                map.put(kv.key, kv.value)
            }
        }
    }

    //                     
    cfg.addChangeListener(ConfigChangeListener { changeEvent ->
        //            ,                。
        //    synchronized  ,                    ,               。(        jvm    )
        //                           。
        synchronized(this) {
            for (key in changeEvent.changedKeys()) {
                val change = changeEvent.getChange(key)
                if (change.changeType.equals(PropertyChangeType.ADDED) && change.propertyName.startsWith("db.slave.")) {
                    //    getDataSource          ,kv key        ,value     。
                    val kv = getDataSource(change.propertyName, change.newValue)
                    if (kv != null) {
                        //    put  map  
                        map.put(kv.key, kv.value)
                    }
                }
            }
        }
    })

    return map
}

私たちのうどんにこのmapを注入します.
@Component
@Aspect
@Order(1)
class SelectDataSourceAspect {
    @Autowired
    @Qualifier("slaves")
    lateinit var dbs: ConcurrentHashMap

    @Before("@annotation(org.springframework.transaction.annotation.Transactional)")
    fun before(jp: JoinPoint) {
        println(" SelectDataSourceAspect         ${jp.signature.name} ")

        try {
            println(" SelectDataSourceAspect   ${jp.signature.name}   :        -> " + TransactionInterceptor.currentTransactionStatus().toString())
        } catch (e: Exception) {
            println(" SelectDataSourceAspect   ${jp.signature.name}   :        ")
        }

        val methodName = jp.signature.name

        if (methodName    add、updat、delete   || dbs.size == 1) {
            MultipleDataSourceConfig.threadLocal.set("db1")
        } else {
            val li = mutableListOf()
            for (k in dbs.keys) {
                if (!"db1".equals(k.toString())) {
                    li.add(k.toString())
                }
            }

            //                       。
            //    ,        ,          。
            //                            。
            val idx = Math.abs(random.nextInt()) % li.size
            MultipleDataSourceConfig.threadLocal.set(li[idx])
        }


    }

}

DynamicDataSourceのコードは変わらず、ThreadLocalから取得して返すだけでよい.
でも!テストの発見は動的に追加されたデータソースには選択されず、事前に構成されたデータソースのみが可能ですが、DynamicDataSourceでは正しいデータソースのkeyが返されています.
AbstractRoutingDataSourceのソースコードを開いて、データソースを選択するプロセスを表示します.
//    ,  connection    determineTargetDataSource         
@Override
public Connection getConnection() throws SQLException {
	return determineTargetDataSource().getConnection();
}

//  determineTargetDataSource   determineCurrentLookupKey   key      
//  !   resolvedDataSources    ,    resolvedDataSources        ?
protected DataSource determineTargetDataSource() {
	Object lookupKey = determineCurrentLookupKey();
	DataSource dataSource = this.resolvedDataSources.get(lookupKey);
	if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
		dataSource = this.resolvedDefaultDataSource;
	}
	if (dataSource == null) {
		throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
	}
	return dataSource;
}

//  resolvedDataSources   targetDataSources copy       
@Override
public void afterPropertiesSet() {
	if (this.targetDataSources == null) {
		throw new IllegalArgumentException("Property 'targetDataSources' is required");
	}
	this.resolvedDataSources = new HashMap(this.targetDataSources.size());
	for (Map.Entry entry : this.targetDataSources.entrySet()) {
		Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
		DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
		this.resolvedDataSources.put(lookupKey, dataSource);
	}
	if (this.defaultTargetDataSource != null) {
		this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
	}
}

理由を知ってから修正できるようになったので、いっそDynamicDataSourceでdetermineTargetDataSourceメソッドも書き換えて、注入したmapからデータソースを選択し、resolvedDataSourcesで取得する必要はありません.
変更後のDynamicDataSourceコードは次のとおりです.
class DynamicDataSource : AbstractRoutingDataSource() {

    @Autowired
    @Qualifier("slaves")
    lateinit var dbs: ConcurrentHashMap

    override fun determineCurrentLookupKey(): Any {
        val key = MultipleDataSourceConfig.threadLocal.get()

        if (key == null) {
            println("error dataSource key ")
            return "db1"
        }

        return key
    }

    //      ,         protect,          
    override fun determineTargetDataSource(): DataSource? {
        val lookupKey = determineCurrentLookupKey()
        var dataSource = dbs[lookupKey]

        if (dataSource is DataSource) {
            return dataSource
        }

        return null
    }

}

このプロセス全体を共有し、いくつかのシーンのテストを行い、正しい結果を得ました.
3、工事のソースコード
コード管理github:エンジニアリングソース