Kafka Streams 使用实例

上为,基于 Kafka Streams 的清算消息分发流程,这个流程中涉及到一下的5个对象:

  • KafkaStreamTunnel:KafkaStreams 的通用逻辑实现,用于创建 Topology 和 KafkaStreams 实例
  • MessageAdapter:为 StreamStarter 提供的通用消息分发处理接口,提供 getKey(Message msg)、withHeaders(Message msg) 两个方法,以便外部进行实现,可根据消息对象进行不同的处理
  • StreamStateListener:Kafka Message 状态变化监听器,这里做了成功后的通知
  • StreamUncaughtExceptionHandler:Kafka Message 消息处理失败的异常处理 Handler,这里做了失败后的通知

代码实例

KafkaStreamTunnel

public class KafkaStreamTunnel {
    private static final Logger log = LoggerFactory.getLogger(StreamStarter.class);
    private final KafkaStreams streams;

    public KafkaStreamTunnel(String bootStrapServers, String inTopic, String outTopic, String component, MessageAdapter messageAdapter) {
        this(bootStrapServers, inTopic, outTopic, component, messageAdapter, 1);
    }

    public void setStateListener(StreamStateListener listener) {
        this.streams.setStateListener(listener);
    }

    public void setUncaughtExceptionHandler(StreamUncaughtExceptionHandler handler) {
        this.streams.setUncaughtExceptionHandler(handler);
    }

    public KafkaStreamTunnel(String bootStrapServers, String inTopic, String outTopic, String component, MessageAdapter messageAdapter, int threads) {
        Topology topology = this.buildTopology(inTopic, outTopic, messageAdapter);
        System.out.println(topology.describe());
        this.streams = new KafkaStreams(topology, this.buildStreamsProperties(bootStrapServers, component, threads));
    }

    private Properties buildStreamsProperties(String bootStrapServers, String applicationId, int threads) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootStrapServers);
        props.put("processing.guarantee", "exactly_once_beta");
        props.put("default.key.serde", Serdes.Long().getClass());
        props.put("default.value.serde", Serdes.ByteArray().getClass());
        props.put("commit.interval.ms", "100");
        props.put("replication.factor", -1);
        props.put("num.stream.threads", threads);
        props.put("default.production.exception.handler", ContinueProductionExceptionHandler.class.getName());
        props.put(StreamsConfig.producerPrefix("linger.ms"), "10");
        props.put(StreamsConfig.producerPrefix("batch.size"), 524288);
        props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
        props.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 30000);
        props.put(StreamsConfig.consumerPrefix("max.poll.records"), 10000);
        props.put(StreamsConfig.consumerPrefix("max.partition.fetch.bytes"), 10485760);
        props.put(StreamsConfig.consumerPrefix("fetch.max.wait.ms"), 10);
        props.put(StreamsConfig.consumerPrefix("fetch.min.bytes"), 100000);
        return props;
    }

    private Topology buildTopology(String inTopic, String outTopic, final MessageAdapter messageAdapter) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, ZipMessage> input = builder.stream(inTopic, Consumed.with(Serdes.Long(), Serdes.ByteArray())).transformValues(new ValueTransformerWithKeySupplier<Long, byte[], ZipMessage>() {
            public ValueTransformerWithKey<Long, byte[], ZipMessage> get() {
                return new ValueTransformerWithKey<Long, byte[], ZipMessage>() {
                    private ProcessorContext context;

                    public void init(ProcessorContext context) {
                        this.context = context;
                    }

                    public ZipMessage transform(Long readOnlyKey, byte[] value) {
                        try {
                            if (Ints.fromByteArray(((Header)this.context.headers().iterator().next()).value()) != 0) {
                                return null;
                            } else {
                                Stream var10000 = Arrays.stream(this.context.headers().toArray()).map(Header::key);
                                Headers var10001 = this.context.headers();
                                Objects.requireNonNull(var10001);
                                var10000.forEach(var10001::remove);
                                return new ZipMessage(value);
                            }
                        } catch (Exception var4) {
                            Exception e = var4;
                            StreamStarter.log.error("invalid zip message, key = {}, partition = {}, offset = {}", new Object[]{readOnlyKey, this.context.partition(), this.context.offset(), e});
                            return null;
                        }
                    }

                    public void close() {
                    }
                };
            }
        }, new String[0]).filter((key, value) -> {
            return value != null;
        });
        KStream<Long, byte[]> output = input.flatMap((key, value) -> {
            return (Iterable)value.getMessages().stream().map((m) -> {
                try {
                    return new KeyValue(messageAdapter.getKey(m), m);
                } catch (Exception var3) {
                    Exception e = var3;
                    log.error("invalid key convert, message = {} ", m, e);
                    return new KeyValue((Long)null, m);
                }
            }).collect(Collectors.toList());
        }).filter((key, value) -> {
            return key != null;
        }).transform(new TransformerSupplier<Long, Message, KeyValue<Long, byte[]>>() {
            public Transformer<Long, Message, KeyValue<Long, byte[]>> get() {
                return new Transformer<Long, Message, KeyValue<Long, byte[]>>() {
                    private ProcessorContext context;

                    public void init(ProcessorContext context) {
                        this.context = context;
                    }

                    public KeyValue<Long, byte[]> transform(Long key, Message value) {
                        try {
                            this.context.headers().add("type", Ints.toByteArray(value.getMessageType()));
                            Headers headers = messageAdapter.withHeaders(value);
                            if (headers != null) {
                                Headers var10001 = this.context.headers();
                                Objects.requireNonNull(var10001);
                                headers.forEach(var10001::add);
                            }

                            StreamStarter.log.info("routing message {} ", value);
                            return new KeyValue(key, value.toBytes());
                        } catch (Exception var4) {
                            Exception e = var4;
                            StreamStarter.log.error("invalid message, key = {}, message = {}, partition = {}, offset = {}", new Object[]{key, value, this.context.partition(), this.context.offset(), e});
                            return new KeyValue(key, (Object)null);
                        }
                    }

                    public void close() {
                    }
                };
            }
        }, new String[0]).filter((key, value) -> {
            return value != null;
        });
        output.to(outTopic, Produced.with(Serdes.Long(), Serdes.ByteArray(), (topic, key, value, numPartitions) -> {
            return (int)(key % (long)numPartitions);
        }));
        return builder.build();
    }

    public void start() {
        this.streams.start();
    }

    public void stop() {
        this.streams.close();
        this.streams.cleanUp();
    }
}

MessageAdapter

public interface MessageAdapter {
    Long getKey(Message var1);

    Headers withHeaders(Message var1);
}

ClearingMessageAdapter

public class ClearingMessageAdapter implements MessageAdapter {
    @Override
    public Long getKey(Message message) {
        if (message.getMessageType() == MessageType.TAKE_OVER_RESULT.getValue()) {
            return ((TakeOverResult)message).getRiskUserId();
        } else if (message.getMessageType() == MessageType.TRANSFER_POSITION_STEP_TWO.getValue()) {
            return ((TransferPositionStepTwo)message).getUserId();
        }
        return null;
    }

    @Override
    public Headers withHeaders(Message message) {
        Headers headers = new RecordHeaders();
        if (message.getMessageType() == MessageType.TAKE_OVER_RESULT.getValue()) {
            headers.add(KafkaHeaderType.USER_GROUP_ID, Longs.toByteArray(((TakeOverResult)message).getRiskUserGroupId()));
        } else if (message.getMessageType() == MessageType.TRANSFER_POSITION_STEP_TWO.getValue()) {
            headers.add(KafkaHeaderType.USER_GROUP_ID, Longs.toByteArray(((TransferPositionStepTwo)message).getUserGroupId()));
        }
        return headers;
    }
}

StreamStateListener

import java.util.Objects;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamStateListener implements KafkaStreams.StateListener {
    private static final Logger log = LoggerFactory.getLogger(StreamStateListener.class);
    private final String steamName;
    private final Producer producer;
    private final String NOTIFY_TOPIC = Objects.nonNull(System.getenv("PRODUCT_NAME")) ? System.getenv("PRODUCT_NAME") + "sys_notify" : "sys_notify";

    public StreamStateListener(String steamName, Producer producer) {
        this.steamName = steamName;
        this.producer = producer;
    }

    public void onChange(KafkaStreams.State oldState, KafkaStreams.State newState) {
        log.info("onChange state change from {} to {}", oldState, newState);
        SystemMessage message = SystemMessage.builder().componentName(this.steamName).type(0).msg("state change" + newState).build();
        this.producer.send(this.NOTIFY_TOPIC, 0L, message, (r, e) -> {
            if (e != null) {
                log.error("send SystemMessage failed", e);
            }

        });
        if (newState == State.NOT_RUNNING || newState == State.ERROR) {
            log.error("state change from {} to {}", oldState, newState);
            System.exit(-1);
        }

    }
}

StreamUncaughtExceptionHandler

import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(StreamUncaughtExceptionHandler.class);
    private final String steamName;
    private final KafkaManager kafkaManager;
    private final String NOTIFY_TOPIC = Objects.nonNull(System.getenv("PRODUCT_NAME")) ? System.getenv("PRODUCT_NAME") + "sys_notify" : "sys_notify";

    public StreamUncaughtExceptionHandler(String steamName, KafkaManager kafkaManager) {
        this.steamName = steamName;
        this.kafkaManager = kafkaManager;
    }

    public void uncaughtException(Thread t, Throwable throwable) {
        log.error("uncaughtException treadName {} message {}", t.getName(), throwable.getMessage());
        SystemMessage message = SystemMessage.builder().componentName(this.steamName).type(1).msg("uncaughtException:" + throwable.getMessage()).build();
        this.kafkaManager.getProducer().send(this.NOTIFY_TOPIC, 0L, message, (r, e) -> {
            if (e != null) {
                log.error("send SystemMessage failed", e);
            }

        });
    }
}

SystemMessage

public class SystemMessage extends Message {
    private String componentName;
    private Integer type;
    private String msg;
}

StreamClearingStarter

@Service
@RequiredArgsConstructor
public class StreamClearingStarter {
    private final KafkaConfig kafkaConfig;
    private final Producer producer;

    private KafkaStreamTunnel streamTunnel;

    @Value("${router.clearing.threads:4}")
    private int threads;

    @PostConstruct
    public void init() {
        streamTunnel = new KafkaStreamTunnel(kafkaConfig.getBootstrapServers(),
                KafkaTopic.FUND_POSITION,
                KafkaTopic.CLEARING,
                KafkaTopic.PRODUCT_NAME + "msg_router",
                new ClearingMessageAdapter(),
                threads);
        StreamStateListener listener = new StreamStateListener("msg-router", producer);
        StreamUncaughtExceptionHandler handler = new StreamUncaughtExceptionHandler("msg-router", kafkaManager);
        //注册状态变化监听器和异常处理器
        streamClearingStarter.setStateListener(listener);
        streamClearingStarter.setUncaughtExceptionHandler(handler);
        streamClearingStarter.start();
    }

    @PreDestroy
    public void close() {
        if (streamClearingStarter != null) {
            streamClearingStarter.stop();
        }
    }
}
作者:Jeebiz  创建时间:2024-10-23 16:53
最后编辑:Jeebiz  更新时间:2024-11-01 10:06