Class Topology
A logical representation of a ProcessorTopology.
A topology is a graph of sources, processors, and sinks.
A SourceNode is a node in the graph that consumes one or more Kafka topics and forwards them to its
successor nodes.
A Processor is a node in the graph that receives input records from upstream nodes, processes the
records, and optionally forwards new records to one, multiple, or all of its downstream nodes.
Finally, a SinkNode is a node in the graph that receives records from upstream nodes and writes them to
a Kafka topic.
A Topology allows you to construct a graph of these nodes, which can then be passed into a new
KafkaStreams instance that will then begin consuming, processing, and producing records.
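For example, a minimal runnable sketch that wires a source, a pass-through processor, and a sink, and hands the topology to a KafkaStreams instance; the topic names "source-topic" and "sink-topic", the application id, and the broker address are placeholder assumptions:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    public class TopologyExample {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            topology.addSource("Source", "source-topic");
            topology.addProcessor(
                "Process",
                // pass-through processor: forwards every record unchanged
                () -> new ContextualProcessor<String, String, String, String>() {
                    @Override
                    public void process(final Record<String, String> record) {
                        context().forward(record);
                    }
                },
                "Source");
            topology.addSink("Sink", "sink-topic", "Process");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }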
Method Summary

<K,V,S extends StateStore> Topology addGlobalStore(StoreBuilder<S> storeBuilder, String sourceName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier)
    Adds a global state store to the topology.
<K,V,S extends StateStore> Topology addGlobalStore(StoreBuilder<S> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier)
<KIn,VIn,KOut,VOut> Topology addProcessor(String name, ProcessorSupplier<KIn,VIn,KOut,VOut> processorSupplier, String... parentNames)
    Add a processor that receives and processes records from one or more parent processors or sources.
<K,V,S extends StateStore> Topology addReadOnlyStateStore(StoreBuilder<S> storeBuilder, String sourceName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier)
    Adds a read-only state store to the topology.
<K,V,S extends StateStore> Topology addReadOnlyStateStore(StoreBuilder<S> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier)
Topology addSink(String name, String topic, String... parentNames)
    Add a sink that sends records from upstream processors or sources to the named Kafka topic.
<K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
<K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
<K,V> Topology addSink(String name, String topic, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
<K,V> Topology addSink(String name, TopicNameExtractor<? super K,? super V> topicExtractor, String... parentNames)
<K,V> Topology addSink(String name, TopicNameExtractor<? super K,? super V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
<K,V> Topology addSink(String name, TopicNameExtractor<? super K,? super V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
<K,V> Topology addSink(String name, TopicNameExtractor<? super K,? super V> topicExtractor, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
Topology addSource(String name, String... topics)
    Add a source that consumes the named topics and forwards the records to child processors and sinks.
Topology addSource(String name, Pattern topicPattern)
<K,V> Topology addSource(String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics)
<K,V> Topology addSource(String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern)
Topology addSource(AutoOffsetReset offsetReset, String name, String... topics)
Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern)
<K,V> Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics)
<K,V> Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern)
<K,V> Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics)
<K,V> Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern)
Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
Topology addSource(TimestampExtractor timestampExtractor, String name, String... topics)
Topology addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics)
    Deprecated. Since 4.0.
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
    Deprecated. Since 4.0.
<K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics)
    Deprecated. Since 4.0.
<K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern)
    Deprecated. Since 4.0.
<K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics)
    Deprecated. Since 4.0.
<K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern)
    Deprecated. Since 4.0.
Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
    Deprecated. Since 4.0.
Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
    Deprecated. Since 4.0.
<S extends StateStore> Topology addStateStore(StoreBuilder<S> storeBuilder, String... processorNames)
    Add a state store to the topology, and optionally connect it to one or more processors.
Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames)
    Connect a processor to one or more state stores.
TopologyDescription describe()
    Returns a description of the specified Topology.
-
Constructor Details
-
Topology
public Topology()
-
Topology
public Topology(TopologyConfig topologyConfigs)
-
Method Details
-
addSource
public Topology addSource(String name, String... topics)
Add a source that consumes the named topics and forwards the records to child processors and sinks. The source will use the default values from StreamsConfig for the auto.offset.reset strategy, TimestampExtractor, and key/value Deserializer.
If you want to specify a source-specific auto.offset.reset strategy, TimestampExtractor, or key/value Deserializer, use the corresponding overloaded addSource(...) method.
- Parameters:
name - the unique name of the source used to reference this node when adding processor or sink children
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
- itself
- Throws:
TopologyException - if the provided source name is not unique, no topics are specified, or a topic has already been registered by another source, read-only state store, or global state store
NullPointerException - if name or topics is null, or topics contains a null topic
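A short sketch of the two common variants, reusing the topology object from the class example (the topic names are placeholders):

    // consume a fixed list of topics, using the default key/value deserializers
    topology.addSource("Orders", "orders", "returns");

    // or consume every topic matching a regex (java.util.regex.Pattern)
    topology.addSource("Logs", Pattern.compile("logs-.*"));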
-
addSource
public Topology addSource(String name, Pattern topicPattern)
See addSource(String, String...). Takes a Pattern (cannot be null) to match topics to consume from, instead of a list of topic names.
-
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, String... topics)
-
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern)
-
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, String... topics)
-
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
-
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, String...) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, Pattern) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) -
addSource
public <K,V> Topology addSource(String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics) -
addSource
public <K,V> Topology addSource(String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern) -
addSource
@Deprecated public <K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, String...) instead.
-
addSource
public <K,V> Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics) -
addSource
@Deprecated public <K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, Pattern) instead.
-
addSource
public <K,V> Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern) -
addSource
@Deprecated public <K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, String...) instead.
-
addSource
public <K,V> Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String... topics) -
addSource
@Deprecated public <K,V> Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, Pattern) instead.
-
addSource
public <K,V> Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Pattern topicPattern) -
addSink
public Topology addSink(String name, String topic, String... parentNames)
Add a sink that sends records from upstream processors or sources to the named Kafka topic. The specified topic should be created before the KafkaStreams instance is started. The sink will use the default values from StreamsConfig for the key and value Serializer. Furthermore, the producer's configured partitioner is used to write into the topic.
If you want to specify a sink-specific key or value Serializer, or use a different partitioner, use the corresponding overloaded addSink(...) method.
- Parameters:
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its records
parentNames - the name of one or more processors or sources, whose output records this sink should consume and write to the specified output topic
- Returns:
- itself
- Throws:
TopologyException - if the provided sink name is not unique, or if a parent processor/source name is unknown or specifies a sink
NullPointerException - if name, topic, or parentNames is null, or parentNames contains a null parent name
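A sketch of typical sink wiring, assuming a parent processor named "Process" (as in the class example) and pre-created output topics:

    // write with the default serializers from StreamsConfig
    topology.addSink("Sink", "sink-topic", "Process");

    // or override the serializers for this sink only
    // (org.apache.kafka.common.serialization.StringSerializer)
    topology.addSink("StringSink", "string-topic",
        new StringSerializer(), new StringSerializer(), "Process");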
-
addSink
public <K,V> Topology addSink(String name, String topic, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<? super K, ? super V> topicExtractor, String... parentNames)
See addSink(String, String, String...). Takes a TopicNameExtractor (cannot be null) that computes topic names to send records to, instead of a single topic name. The topic name extractor is called for every result record and may compute a different topic name each time. All topics that the topic name extractor may compute should be created before the KafkaStreams instance is started. Returning null as a topic name is invalid and will result in a runtime exception.
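Since TopicNameExtractor is a functional interface, a dynamic-routing sink can be written as a lambda; the "out-" prefix below is a made-up routing scheme and "Process" is the hypothetical parent processor from the earlier examples:

    topology.<String, String>addSink(
        "DynamicSink",
        (key, value, recordContext) -> "out-" + key,   // must never return null
        "Process");
-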
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<? super K, ? super V> topicExtractor, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<? super K, ? super V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<? super K, ? super V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addProcessor
public <KIn,VIn,KOut,VOut> Topology addProcessor(String name, ProcessorSupplier<KIn,VIn,KOut,VOut> processorSupplier, String... parentNames)
Add a processor that receives and processes records from one or more parent processors or sources. The Processor can emit any number of result records via ProcessorContext.forward(Record). Any record output by this processor will be forwarded to its child processors and sinks.
By default, the processor is stateless. There are two different state stores, which can be added to the Topology and directly connected to a processor, making the processor stateful: state stores for processing (i.e., read/write access), and read-only state stores. State stores can also be connected to the processor later via connectProcessorAndStateStores(String, String...). If the supplier provides state stores via ConnectedStoreProvider.stores(), the corresponding StoreBuilders will be added to the topology and connected to this processor automatically. Additionally, even if a processor is stateless, it can still access all global state stores (read-only). There is no need to connect global stores to processors.
All state stores which are connected to a processor, and all global stores, can be accessed via context.getStateStore(String) using the context provided via Processor#init():

    public class MyProcessor implements Processor<String, Integer, String, Integer> {
        private ProcessorContext<String, Integer> context;
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<String, Integer> context) {
            this.context = context;
            this.store = context.getStateStore("myStore");
        }

        @Override
        public void process(final Record<String, Integer> record) {
            // can access this.context and this.store
        }
    }

Furthermore, the provided ProcessorContext gives access to topology, runtime, and record metadata, and allows scheduling punctuations and requesting offset commits.
- Parameters:
name - the unique name of the processor used to reference this node when adding other processor or sink children
processorSupplier - the supplier used to obtain Processor instances
parentNames - the name of one or more processors or sources, whose output records this processor should receive and process
- Returns:
- itself
- Throws:
TopologyException - if the provided processor name is not unique, or if a parent processor/source name is unknown or specifies a sink
NullPointerException - if name, processorSupplier, or parentNames is null, or parentNames contains a null parent name
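A sketch of a stateful supplier that co-registers its store via ConnectedStoreProvider.stores(); the store name "counts" and the counting logic are illustrative, not part of the API:

    import java.util.Set;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class CountingSupplier implements ProcessorSupplier<String, String, String, Long> {

        @Override
        public Set<StoreBuilder<?>> stores() {
            // this builder is added to the topology and connected automatically
            return Set.<StoreBuilder<?>>of(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("counts"), Serdes.String(), Serdes.Long()));
        }

        @Override
        public Processor<String, String, String, Long> get() {
            return new Processor<>() {
                private ProcessorContext<String, Long> context;
                private KeyValueStore<String, Long> store;

                @Override
                public void init(final ProcessorContext<String, Long> context) {
                    this.context = context;
                    this.store = context.getStateStore("counts");
                }

                @Override
                public void process(final Record<String, String> record) {
                    final Long oldCount = store.get(record.key());
                    final long newCount = (oldCount == null ? 0L : oldCount) + 1L;
                    store.put(record.key(), newCount);               // update this shard
                    context.forward(record.withValue(newCount));     // emit the new count
                }
            };
        }
    }

Registering it then requires no explicit store wiring: topology.addProcessor("Counter", new CountingSupplier(), "Source");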
-
addStateStore
public <S extends StateStore> Topology addStateStore(StoreBuilder<S> storeBuilder, String... processorNames)
Add a state store to the topology, and optionally connect it to one or more processors. State stores are sharded and the number of shards is determined at runtime by the number of input topic partitions and the structure of the topology. Each connected Processor instance in the topology has access to a single shard of the state store. Additionally, the state store can be accessed from "outside" using the Interactive Queries (IQ) API (cf. KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)). If you need access to all data in a state store inside a Processor, you can use a (read-only) global state store.
If no processorNames are specified, the state store can be connected to one or more processors later.
Note that if a state store is never connected to any processor, the state store is "dangling" and will not be added to the created ProcessorTopology when KafkaStreams is started. In that case, the state store is not available for Interactive Queries. If you want to add a state store only for Interactive Queries, you can use a read-only state store.
For failure and recovery, a state store may be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is provided by the store builder, and "-changelog" is a fixed suffix.
You can verify the created ProcessorTopology and added state stores, and retrieve all generated internal topic names, via describe().
- Parameters:
storeBuilder - the StoreBuilder used to obtain state store instances (one per shard)
processorNames - the names of the processors that should be able to access the provided state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if a processor name is unknown or specifies a source or sink
NullPointerException - if storeBuilder or processorNames is null, or processorNames contains a null processor name
-
addReadOnlyStateStore
public <K,V,S extends StateStore> Topology addReadOnlyStateStore(StoreBuilder<S> storeBuilder, String sourceName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier)
Adds a read-only state store to the topology. The state store will be populated with data from the named source topic. State stores are sharded and the number of shards is determined at runtime by the number of input topic partitions for the source topic and the connected processors (if any). Read-only state stores can be accessed from "outside" using the Interactive Queries (IQ) API (cf. KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)).
The auto.offset.reset property will be set to "earliest" for the source topic. If you want to specify a source-specific TimestampExtractor, you can use addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier).
Connecting a read-only state store to processors is optional. If not connected to any processor, the state store will still be created and can be queried via KafkaStreams.store(StoreQueryParameters) or KafkaStreams.query(StateQueryRequest). If the state store is connected to another processor, each corresponding Processor instance in the topology has read-only access to a single shard of the state store. If you need write access to a state store, you can use a "regular" state store instead. If you need access to all data in a state store inside a Processor, you can use a (read-only) global state store.
The provided ProcessorSupplier will be used to create Processor instances which will be used to process the records from the source topic. These processors are the only ones with write access to the state store, and should contain logic to keep the StateStore up-to-date.
Read-only state stores are always enabled for fault-tolerance and recovery. In contrast to "regular" state stores, no dedicated changelog topic will be created in Kafka though; instead, the source topic is used for recovery. Thus, the source topic should be configured with log compaction.
- Parameters:
storeBuilder - the StoreBuilder used to obtain state store instances (one per shard)
sourceName - the unique name of the internally added source
keyDeserializer - the Deserializer for record keys (can be null to use the default key deserializer from StreamsConfig)
valueDeserializer - the Deserializer for record values (can be null to use the default value deserializer from StreamsConfig)
topic - the source topic to read the data from
processorName - the unique name of the internally added processor which maintains the state store
stateUpdateSupplier - the supplier used to obtain Processor instances, which maintain the state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if the source or processor names are not unique, or if the source topic has already been registered by another source, read-only state store, or global state store
NullPointerException - if storeBuilder, sourceName, topic, processorName, or stateUpdateSupplier is null
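A sketch that materializes an assumed compacted topic "config-topic" into a queryable store; all names are placeholders, and the imports match the earlier examples plus org.apache.kafka.common.serialization.StringDeserializer:

    topology.addReadOnlyStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("config-store"), Serdes.String(), Serdes.String()),
        "config-source",
        new StringDeserializer(),
        new StringDeserializer(),
        "config-topic",                  // should be log-compacted
        "config-maintainer",
        () -> new Processor<String, String, Void, Void>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("config-store");
            }

            @Override
            public void process(final Record<String, String> record) {
                store.put(record.key(), record.value());   // keep the store up-to-date
            }
        });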
-
addReadOnlyStateStore
public <K,V,S extends StateStore> Topology addReadOnlyStateStore(StoreBuilder<S> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier) -
addGlobalStore
public <K,V,S extends StateStore> Topology addGlobalStore(StoreBuilder<S> storeBuilder, String sourceName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier)
Adds a global state store to the topology. The state store will be populated with data from the named source topic. Global state stores are read-only, and contain data from all partitions of the specified source topic. Thus, each KafkaStreams instance has a full copy of the data; the source topic records are effectively broadcast to all instances. In contrast to read-only state stores, global state stores are "bootstrapped" on startup, and are maintained by a separate thread. Thus, updates to a global store are not "stream-time synchronized", which may lead to non-deterministic results. Like all other stores, global state stores can be accessed from "outside" using the Interactive Queries (IQ) API (cf. KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)).
The auto.offset.reset property will be set to "earliest" for the source topic. If you want to specify a source-specific TimestampExtractor, you can use addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier).
All processors of the topology automatically have read-only access to the global store; it is not necessary to connect them. If you need write access to a state store, you can use a "regular" state store instead.
The provided ProcessorSupplier will be used to create Processor instances which will be used to process the records from the source topic. These processors are the only ones with write access to the state store, and should contain logic to keep the StateStore up-to-date.
Global state stores are always enabled for fault-tolerance and recovery. In contrast to "regular" state stores, no dedicated changelog topic will be created in Kafka though; instead, the source topic is used for recovery. Thus, the source topic should be configured with log compaction.
- Parameters:
storeBuilder - the StoreBuilder used to obtain the state store (one per KafkaStreams instance)
sourceName - the unique name of the internally added source
keyDeserializer - the Deserializer for record keys (can be null to use the default key deserializer from StreamsConfig)
valueDeserializer - the Deserializer for record values (can be null to use the default value deserializer from StreamsConfig)
topic - the source topic to read the data from
processorName - the unique name of the internally added processor which maintains the state store
stateUpdateSupplier - the supplier used to obtain Processor instances, which maintain the state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if the source or processor names are not unique, or if the source topic has already been registered by another source, read-only state store, or global state store
NullPointerException - if storeBuilder, sourceName, topic, processorName, or stateUpdateSupplier is null
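The wiring mirrors addReadOnlyStateStore; a condensed sketch with placeholder names, where configMaintainerSupplier is a hypothetical ProcessorSupplier<String, String, Void, Void> with the same put-through logic as the read-only store example above:

    topology.addGlobalStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("global-config"), Serdes.String(), Serdes.String()),
        "global-config-source",
        new StringDeserializer(),
        new StringDeserializer(),
        "config-topic",                  // should be log-compacted
        "global-config-maintainer",
        configMaintainerSupplier);       // hypothetical ProcessorSupplier<String, String, Void, Void>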
-
addGlobalStore
public <K,V,S extends StateStore> Topology addGlobalStore(StoreBuilder<S> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V,Void,Void> stateUpdateSupplier) -
connectProcessorAndStateStores
public Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames)
Connect a processor to one or more state stores. The state stores must have been previously added to the topology via addStateStore(StoreBuilder, String...) or addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier).
- Parameters:
processorName - the name of the processor
stateStoreNames - the names of state stores that the processor should be able to access
- Returns:
- itself
- Throws:
TopologyException - if the processor name or a state store name is unknown, or if the processor name specifies a source or sink
NullPointerException - if processorName or stateStoreNames is null, or if stateStoreNames contains a null state store name
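A sketch of late binding, reusing the hypothetical countsBuilder from the addStateStore example and a processor "Process" that was added without store names:

    topology.addStateStore(countsBuilder);                         // store not yet connected
    topology.connectProcessorAndStateStores("Process", "counts");  // connect it afterwards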
-
describe
public TopologyDescription describe()
Returns a description of the specified Topology.
- Returns:
- A description of the topology.
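Printing the description is a cheap way to inspect sub-topologies and generated internal topic names before starting the application:

    final TopologyDescription description = topology.describe();
    System.out.println(description);   // lists sub-topologies, global stores, nodes, and topics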
-