Kafka Spring[2]Connect利用


🔨Maria DB設定

 create table orders(
     id int auto_increment primary key,
     user_id varchar(50) not null,
     product_id varchar(20) not null,
     order_id varchar(50) not null,
     qty int default 0,
     unit_price int default 0,
     total_price int default 0,
     created_at datetime default now()
);
クエリー文を使用してmaria dbに受注テーブルを追加します.
  datasource:
#    driver-class-name: org.h2.Driver
#    url: jdbc:h2:mem:testdb
    url: jdbc:mariadb://localhost:3306/mydb
    driver-class-name: org.mariadb.jdbc.Driver
    username: root
    password: 비밀번호
次のように設定をymlファイルに変更します.
依存性にmariadb依存性が追加されている場合は、可能です.

その後、これらのサービスを実行し、次のような注文を生成します.

maria dbは、正常に転送されたデータも確認できます.
本来論理的に正常に追加されていたデータですが、今はカフカで行いましょう.

🔨Orderサービスの変更

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      ...
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    ...
  }
}
以前kafkaでdbinsertが発生した場合、データが次のように伝達されていることが確認されました.schemaはdbのテーブル情報をフィールドに入れて伝達し、payloadはデータの一部を伝達し、javaで実現します.

👉現場実施

@Data
@Builder
public class Field {
    private String type;
    private boolean optional;
    private String field;
}

👉Payload実装

@Data
@Builder
public class Payload {
    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}

👉Schema実施

@Data
@Builder
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}

👉KafkaOrderDto実施

@Data
@Builder
public class KafkaOrderDto implements Serializable {
    private Schema schema;
    private Payload payload;
}
上記のオブジェクトの情報をKafkaOrderDtoに読み込んで伝達すると,以前にKafkaでdbinsertが発生したときに伝達された内容がそのまま使用されていたことがわかる.

👉KafkaOrder Producer実装

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaOrderProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
            Field.builder().type("string").optional(true).field("order_id").build(),
            Field.builder().type("string").optional(true).field("user_id").build(),
            Field.builder().type("string").optional(true).field("product_id").build(),
            Field.builder().type("int32").optional(true).field("qty").build(),
            Field.builder().type("int32").optional(true).field("unit_price").build(),
            Field.builder().type("int32").optional(true).field("total_price").build()
    );

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    public OrderDto orderSend(String topic, OrderDto orderDto){

        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = KafkaOrderDto.builder()
                .schema(schema)
                .payload(payload)
                .build();

        ObjectMapper mapper = new ObjectMapper();
        //json format으로 변경
        String json = "";
        try {
            json = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        //kafka 메세지 전송
        kafkaTemplate.send(topic, json);
        log.info("Kafka Producer send data from the order service = {}",kafkaOrderDto);

        return orderDto;
    }
}
以前はProducerでKafka sendを実行しており,コードを記述することで論理がうまくデータに入るようにしている.

👉Order Controlの変更

@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
    private final Environment env;
    private final OrderService orderService;
    private final KafkaProducer kafkaProducer;  //kafka producer 주입
    private final KafkaOrderProducer kafkaOrderProducer;    //주문 전송 producer 주입

	...

    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        //기존의 jpa 로직
//        OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
//        orderDto.setUserId(userId);
//        OrderDto createOrder = orderService.createOrder(orderDto);
//        ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
        OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
        orderDto.setUserId(userId);
        //kafka 주문 로직 추가
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());

        //kafka 메세지 전송
        kafkaProducer.orderSend("example-catalog-topic", orderDto);

        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
        kafkaOrderProducer.orderSend("orders", orderDto);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }
}
ここで重要なのは,注釈処理が既存のjpaを用いてdbに直接データを格納する論理である.OrderControllerにも、jpaロジックではなく、カフカにデータを転送するロジックのみが追加されています.

👉Kafka Sink Connect登録

{
    "name":"my-order-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"비밀번호",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"orders"
    }
}

次のようにsink connectを登録します.

👏テスト


👉Consumerの実行

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
コマンドを使用してKafka Consumerをサスペンションし、メッセージが送信されたことを確認できます.

👉2つのOrderサービスをオフセット



下図のように、2つ以上order-serviceを置きます.

エリカでも確認できます

こうして、データの転送と確認のために、いくつかの値を変えて、何度も転送してみました.

👊dbのチェック



以前に転送したデータがよく入力されていることを確認できます.
転送に成功したがdbに何の反応もない場合は、kafkaサーバ上のすべてのサーバを再実行します.メッセージ自体は消えず、データが失われることはありませんので、サーバが再起動すると、すべてのデータが有効になります.