Kafka Streams 使用实例
上图为基于 Kafka Streams 的清算消息分发流程,这个流程中涉及以下几个对象:
- KafkaStreamTunnel:KafkaStreams 的通用逻辑实现,用于创建 Topology 和 KafkaStreams 实例
- MessageAdapter:为 KafkaStreamTunnel 提供的通用消息分发处理接口,提供 getKey(Message msg)、withHeaders(Message msg) 两个方法,以便外部进行实现,可根据消息对象进行不同的处理
- StreamStateListener:Kafka Message 状态变化监听器,这里做了成功后的通知
- StreamUncaughtExceptionHandler:Kafka Message 消息处理失败的异常处理 Handler,这里做了失败后的通知
代码实例
KafkaStreamTunnel
public class KafkaStreamTunnel {
private static final Logger log = LoggerFactory.getLogger(StreamStarter.class);
private final KafkaStreams streams;
public KafkaStreamTunnel(String bootStrapServers, String inTopic, String outTopic, String component, MessageAdapter messageAdapter) {
this(bootStrapServers, inTopic, outTopic, component, messageAdapter, 1);
}
public void setStateListener(StreamStateListener listener) {
this.streams.setStateListener(listener);
}
public void setUncaughtExceptionHandler(StreamUncaughtExceptionHandler handler) {
this.streams.setUncaughtExceptionHandler(handler);
}
public KafkaStreamTunnel(String bootStrapServers, String inTopic, String outTopic, String component, MessageAdapter messageAdapter, int threads) {
Topology topology = this.buildTopology(inTopic, outTopic, messageAdapter);
System.out.println(topology.describe());
this.streams = new KafkaStreams(topology, this.buildStreamsProperties(bootStrapServers, component, threads));
}
private Properties buildStreamsProperties(String bootStrapServers, String applicationId, int threads) {
Properties props = new Properties();
props.put("application.id", applicationId);
props.put("bootstrap.servers", bootStrapServers);
props.put("processing.guarantee", "exactly_once_beta");
props.put("default.key.serde", Serdes.Long().getClass());
props.put("default.value.serde", Serdes.ByteArray().getClass());
props.put("commit.interval.ms", "100");
props.put("replication.factor", -1);
props.put("num.stream.threads", threads);
props.put("default.production.exception.handler", ContinueProductionExceptionHandler.class.getName());
props.put(StreamsConfig.producerPrefix("linger.ms"), "10");
props.put(StreamsConfig.producerPrefix("batch.size"), 524288);
props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
props.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 30000);
props.put(StreamsConfig.consumerPrefix("max.poll.records"), 10000);
props.put(StreamsConfig.consumerPrefix("max.partition.fetch.bytes"), 10485760);
props.put(StreamsConfig.consumerPrefix("fetch.max.wait.ms"), 10);
props.put(StreamsConfig.consumerPrefix("fetch.min.bytes"), 100000);
return props;
}
private Topology buildTopology(String inTopic, String outTopic, final MessageAdapter messageAdapter) {
StreamsBuilder builder = new StreamsBuilder();
KStream<Long, ZipMessage> input = builder.stream(inTopic, Consumed.with(Serdes.Long(), Serdes.ByteArray())).transformValues(new ValueTransformerWithKeySupplier<Long, byte[], ZipMessage>() {
public ValueTransformerWithKey<Long, byte[], ZipMessage> get() {
return new ValueTransformerWithKey<Long, byte[], ZipMessage>() {
private ProcessorContext context;
public void init(ProcessorContext context) {
this.context = context;
}
public ZipMessage transform(Long readOnlyKey, byte[] value) {
try {
if (Ints.fromByteArray(((Header)this.context.headers().iterator().next()).value()) != 0) {
return null;
} else {
Stream var10000 = Arrays.stream(this.context.headers().toArray()).map(Header::key);
Headers var10001 = this.context.headers();
Objects.requireNonNull(var10001);
var10000.forEach(var10001::remove);
return new ZipMessage(value);
}
} catch (Exception var4) {
Exception e = var4;
StreamStarter.log.error("invalid zip message, key = {}, partition = {}, offset = {}", new Object[]{readOnlyKey, this.context.partition(), this.context.offset(), e});
return null;
}
}
public void close() {
}
};
}
}, new String[0]).filter((key, value) -> {
return value != null;
});
KStream<Long, byte[]> output = input.flatMap((key, value) -> {
return (Iterable)value.getMessages().stream().map((m) -> {
try {
return new KeyValue(messageAdapter.getKey(m), m);
} catch (Exception var3) {
Exception e = var3;
log.error("invalid key convert, message = {} ", m, e);
return new KeyValue((Long)null, m);
}
}).collect(Collectors.toList());
}).filter((key, value) -> {
return key != null;
}).transform(new TransformerSupplier<Long, Message, KeyValue<Long, byte[]>>() {
public Transformer<Long, Message, KeyValue<Long, byte[]>> get() {
return new Transformer<Long, Message, KeyValue<Long, byte[]>>() {
private ProcessorContext context;
public void init(ProcessorContext context) {
this.context = context;
}
public KeyValue<Long, byte[]> transform(Long key, Message value) {
try {
this.context.headers().add("type", Ints.toByteArray(value.getMessageType()));
Headers headers = messageAdapter.withHeaders(value);
if (headers != null) {
Headers var10001 = this.context.headers();
Objects.requireNonNull(var10001);
headers.forEach(var10001::add);
}
StreamStarter.log.info("routing message {} ", value);
return new KeyValue(key, value.toBytes());
} catch (Exception var4) {
Exception e = var4;
StreamStarter.log.error("invalid message, key = {}, message = {}, partition = {}, offset = {}", new Object[]{key, value, this.context.partition(), this.context.offset(), e});
return new KeyValue(key, (Object)null);
}
}
public void close() {
}
};
}
}, new String[0]).filter((key, value) -> {
return value != null;
});
output.to(outTopic, Produced.with(Serdes.Long(), Serdes.ByteArray(), (topic, key, value, numPartitions) -> {
return (int)(key % (long)numPartitions);
}));
return builder.build();
}
public void start() {
this.streams.start();
}
public void stop() {
this.streams.close();
this.streams.cleanUp();
}
}
MessageAdapter
/**
 * Routing hooks consumed by KafkaStreamTunnel: maps each Message to a
 * partition key and supplies extra headers for the outgoing record.
 */
public interface MessageAdapter {

    /** Returns the routing key for the given message, or null if it cannot be routed. */
    Long getKey(Message message);

    /** Returns extra headers to attach to the outgoing record, or null for none. */
    Headers withHeaders(Message message);
}
ClearingMessageAdapter
/**
 * MessageAdapter for clearing traffic: routes TAKE_OVER_RESULT records by risk
 * user id and TRANSFER_POSITION_STEP_TWO records by user id, and tags each
 * with the corresponding user-group id header.
 */
public class ClearingMessageAdapter implements MessageAdapter {

    @Override
    public Long getKey(Message message) {
        int type = message.getMessageType();
        if (type == MessageType.TAKE_OVER_RESULT.getValue()) {
            return ((TakeOverResult) message).getRiskUserId();
        }
        if (type == MessageType.TRANSFER_POSITION_STEP_TWO.getValue()) {
            return ((TransferPositionStepTwo) message).getUserId();
        }
        // Unrecognized types get a null key and are filtered out by the tunnel.
        return null;
    }

    @Override
    public Headers withHeaders(Message message) {
        Headers headers = new RecordHeaders();
        int type = message.getMessageType();
        if (type == MessageType.TAKE_OVER_RESULT.getValue()) {
            headers.add(KafkaHeaderType.USER_GROUP_ID,
                    Longs.toByteArray(((TakeOverResult) message).getRiskUserGroupId()));
        } else if (type == MessageType.TRANSFER_POSITION_STEP_TWO.getValue()) {
            headers.add(KafkaHeaderType.USER_GROUP_ID,
                    Longs.toByteArray(((TransferPositionStepTwo) message).getUserGroupId()));
        }
        return headers;
    }
}
StreamStateListener
import java.util.Objects;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamStateListener implements KafkaStreams.StateListener {
private static final Logger log = LoggerFactory.getLogger(StreamStateListener.class);
private final String steamName;
private final Producer producer;
private final String NOTIFY_TOPIC = Objects.nonNull(System.getenv("PRODUCT_NAME")) ? System.getenv("PRODUCT_NAME") + "sys_notify" : "sys_notify";
public StreamStateListener(String steamName, Producer producer) {
this.steamName = steamName;
this.producer = producer;
}
public void onChange(KafkaStreams.State oldState, KafkaStreams.State newState) {
log.info("onChange state change from {} to {}", oldState, newState);
SystemMessage message = SystemMessage.builder().componentName(this.steamName).type(0).msg("state change" + newState).build();
this.producer.send(this.NOTIFY_TOPIC, 0L, message, (r, e) -> {
if (e != null) {
log.error("send SystemMessage failed", e);
}
});
if (newState == State.NOT_RUNNING || newState == State.ERROR) {
log.error("state change from {} to {}", oldState, newState);
System.exit(-1);
}
}
}
StreamUncaughtExceptionHandler
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Uncaught-exception handler for stream threads: logs the failure and publishes
 * a SystemMessage notification (best-effort) to the notify topic.
 */
public class StreamUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(StreamUncaughtExceptionHandler.class);

    // Fix: "steamName" typo -> streamName (internal rename; the constructor
    // signature is unchanged for callers).
    private final String streamName;
    private final KafkaManager kafkaManager;
    // Notification topic, optionally prefixed by the PRODUCT_NAME env variable.
    // Renamed from NOTIFY_TOPIC: instance fields are not constants.
    private final String notifyTopic = Objects.nonNull(System.getenv("PRODUCT_NAME"))
            ? System.getenv("PRODUCT_NAME") + "sys_notify"
            : "sys_notify";

    public StreamUncaughtExceptionHandler(String streamName, KafkaManager kafkaManager) {
        this.streamName = streamName;
        this.kafkaManager = kafkaManager;
    }

    @Override
    public void uncaughtException(Thread t, Throwable throwable) {
        // Fix: pass the throwable as the last argument so SLF4J records the full
        // stack trace; the original logged only getMessage() and lost it.
        log.error("uncaughtException treadName {} message {}", t.getName(), throwable.getMessage(), throwable);
        SystemMessage message = SystemMessage.builder()
                .componentName(this.streamName)
                .type(1)
                .msg("uncaughtException:" + throwable.getMessage())
                .build();
        // Best-effort notification; delivery failures are only logged.
        this.kafkaManager.getProducer().send(this.notifyTopic, 0L, message, (r, e) -> {
            if (e != null) {
                log.error("send SystemMessage failed", e);
            }
        });
    }
}
SystemMessage
// Notification payload sent to the sys_notify topic by StreamStateListener
// (type 0 = state change) and StreamUncaughtExceptionHandler (type 1 = error).
// NOTE(review): callers use SystemMessage.builder().componentName(..).type(..)
// .msg(..).build(), so this class needs builder/accessor support (e.g. Lombok
// @Builder/@SuperBuilder and @Getter) that is not shown in this snippet -
// confirm the annotations exist in the real source.
public class SystemMessage extends Message {
private String componentName;
private Integer type;
private String msg;
}
StreamClearingStarter
/**
 * Spring entry point: wires a KafkaStreamTunnel routing FUND_POSITION ->
 * CLEARING on startup, registers the state listener and exception handler,
 * and shuts the tunnel down on context close.
 */
@Service
@RequiredArgsConstructor
public class StreamClearingStarter {
    private final KafkaConfig kafkaConfig;
    private final Producer producer;
    // Fix: kafkaManager was used in init() but never declared; inject it like
    // the other final dependencies via @RequiredArgsConstructor.
    private final KafkaManager kafkaManager;
    private KafkaStreamTunnel streamTunnel;

    @Value("${router.clearing.threads:4}")
    private int threads;

    @PostConstruct
    public void init() {
        streamTunnel = new KafkaStreamTunnel(kafkaConfig.getBootstrapServers(),
                KafkaTopic.FUND_POSITION,
                KafkaTopic.CLEARING,
                KafkaTopic.PRODUCT_NAME + "msg_router",
                new ClearingMessageAdapter(),
                threads);
        StreamStateListener listener = new StreamStateListener("msg-router", producer);
        StreamUncaughtExceptionHandler handler = new StreamUncaughtExceptionHandler("msg-router", kafkaManager);
        // Register the state listener and exception handler, then start the stream.
        // Fix: the original referenced an undeclared "streamClearingStarter"
        // variable; the field is streamTunnel.
        streamTunnel.setStateListener(listener);
        streamTunnel.setUncaughtExceptionHandler(handler);
        streamTunnel.start();
    }

    @PreDestroy
    public void close() {
        // Fix: same undeclared-variable bug as in init().
        if (streamTunnel != null) {
            streamTunnel.stop();
        }
    }
}
作者:Jeebiz 创建时间:2024-10-23 16:53
最后编辑:Jeebiz 更新时间:2024-11-01 10:06
最后编辑:Jeebiz 更新时间:2024-11-01 10:06