Apache DolphinScheduler has already been deployed on a virtual machine. Next, we want to try creating a Flink task node in DolphinScheduler and use Flink to consume data from Kafka. Apache DolphinScheduler is deployed in standalone mode. standalone mode For detailed installation steps, please refer to the official documentation: DolphinScheduler | Documentation Center https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/installation/standalone https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/installation/standalone Prerequisites The following components have already been installed: Java 11Apache DolphinScheduler 3.3.2Apache Flink 1.18.1Apache Kafka 3.6.0ZooKeeper (using the built-in Kafka version) Java 11 Apache DolphinScheduler 3.3.2 Apache Flink 1.18.1 Apache Kafka 3.6.0 ZooKeeper (using the built-in Kafka version) It is recommended to download and install binary packagesdirectly on the virtual machine. Installing via system package managers may introduce uncontrollable dependencies. binary packages The downloaded binary packages are shown below: Configure Flink Environment Variables 1. Edit environment variables sudo vim ~/.bashrc sudo vim ~/.bashrc Add the Flink installation path: 2. Make the configuration effective # Reload environment variables source ~/.bashrc # Verify Flink environment variable echo $Flink_HOME # Reload environment variables source ~/.bashrc # Verify Flink environment variable echo $Flink_HOME Modify Kafka, Flink, and DolphinScheduler Configuration Files Since this setup runs inside a virtual machine, configuration changes are required so that services can be accessed from the host machine. 1. Modify Kafka configuration Navigate to the config directory under the Kafka installation path and edit server.properties. config server.properties The listeners and advertised.listeners settings must be modified so that Kafka can be accessed externally. Otherwise, Kafka will default to localhost, which may cause connection failures. listeners advertised.listeners localhost broker.id=0 listeners=PLAINTEXT://0.0.0.0:9092 # Replace with your VM IP address advertised.listeners=PLAINTEXT://192.168.146.132:9092 broker.id=0 listeners=PLAINTEXT://0.0.0.0:9092 # Replace with your VM IP address advertised.listeners=PLAINTEXT://192.168.146.132:9092 2. Modify Flink configuration Go to the conf directory under the Flink installation path and edit flink-conf.yaml. conf flink-conf.yaml Replace all localhost addresses with 0.0.0.0 so the Flink Web UI can be accessed externally. localhost 0.0.0.0 Additionally, adjust JobManager and TaskManager memory settings. jobmanager.rpc.address: 0.0.0.0 jobmanager.bind-host: 0.0.0.0 jobmanager.cpu.cores: 1 jobmanager.memory.process.size: 1600m taskmanager.bind-host: 0.0.0.0 taskmanager.host: 0.0.0.0 taskmanager.memory.process.size: 2048m taskmanager.cpu.cores: 1 jobmanager.rpc.address: 0.0.0.0 jobmanager.bind-host: 0.0.0.0 jobmanager.cpu.cores: 1 jobmanager.memory.process.size: 1600m taskmanager.bind-host: 0.0.0.0 taskmanager.host: 0.0.0.0 taskmanager.memory.process.size: 2048m taskmanager.cpu.cores: 1 3. Modify Apache DolphinScheduler configuration From the dolphinscheduler-daemon.sh startup script, we can see that DolphinScheduler loads environment variables from: dolphinscheduler-daemon.sh bin/env/dolphinscheduler_env.sh bin/env/dolphinscheduler_env.sh View dolphinscheduler-daemon.sh: dolphinscheduler-daemon.sh Edit dolphinscheduler_env.sh and add Java and Flink paths: dolphinscheduler_env.sh # Replace with your actual Java and Flink paths export JAVA_HOME=/data/jdk-11.0.29 export Flink_HOME=/data/Flink-1.18.1 # Replace with your actual Java and Flink paths export JAVA_HOME=/data/jdk-11.0.29 export Flink_HOME=/data/Flink-1.18.1 Disable Firewall and Start Services Start all required services, including ZooKeeper, Kafka, Flink, and Apache DolphinScheduler. # Disable firewall sudo systemctl stop firewalld # Start Flink cluster bin/start-cluster.sh # Start ZooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties & # Start Kafka broker bin/kafka-server-start.sh config/server.properties & # Create Kafka topic bin/kafka-topics.sh --create --topic test \ --bootstrap-server localhost:9092 \ --partitions 1 --replication-factor 1 # Produce messages bin/kafka-console-producer.sh --topic test \ --bootstrap-server localhost:9092 # Consume messages bin/kafka-console-consumer.sh --topic test \ --from-beginning --bootstrap-server localhost:9092 # Start DolphinScheduler Standalone Server bash ./bin/dolphinscheduler-daemon.sh start standalone-server # Disable firewall sudo systemctl stop firewalld # Start Flink cluster bin/start-cluster.sh # Start ZooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties & # Start Kafka broker bin/kafka-server-start.sh config/server.properties & # Create Kafka topic bin/kafka-topics.sh --create --topic test \ --bootstrap-server localhost:9092 \ --partitions 1 --replication-factor 1 # Produce messages bin/kafka-console-producer.sh --topic test \ --bootstrap-server localhost:9092 # Consume messages bin/kafka-console-consumer.sh --topic test \ --from-beginning --bootstrap-server localhost:9092 # Start DolphinScheduler Standalone Server bash ./bin/dolphinscheduler-daemon.sh start standalone-server Verification 1. Verify Flink Web UI Access the Flink dashboard at: http://<VM-IP>:8081 http://<VM-IP>:8081 2. Verify DolphinScheduler Web UI Access DolphinScheduler at: http://<VM-IP>:12345/dolphinscheduler/ui http://<VM-IP>:12345/dolphinscheduler/ui Default credentials: Username: adminPassword: dolphinscheduler123 Username: admin admin Password: dolphinscheduler123 dolphinscheduler123 Sample Implementation This example demonstrates how Flink consumes data from Kafka, packages the job, uploads it to DolphinScheduler, and executes it as a Flink task. 1. Sample Code pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>Flink-Kafka-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <Flink.version>1.18.1</Flink.version> <scala.binary.version>2.12</scala.binary.version> <Kafka.version>3.6.0</Kafka.version> </properties> <dependencies> <!-- Flink core dependency --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-java</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-streaming-java</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-clients</artifactId> <version>${Flink.version}</version> </dependency> <!-- Connector Base Dependency --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-connector-base</artifactId> <version>${Flink.version}</version> </dependency> <!-- Kafka Connector (Key Change) --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-connector-Kafka</artifactId> <version>3.1.0-1.18</version> </dependency> <dependency> <groupId>org.apache.Kafka</groupId> <artifactId>Kafka-clients</artifactId> <version>${Kafka.version}</version> </dependency> <!-- Logging Dependency --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> <scope>runtime</scope> </dependency> </dependencies> <repositories> <repository> <id>aliyun</id> <url>https://maven.aliyun.com/repository/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <id>apache-releases</id> <url>https://repository.apache.org/content/repositories/releases/</url> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>org.apache.Flink:force-shading</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>Flink-Kafka-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <Flink.version>1.18.1</Flink.version> <scala.binary.version>2.12</scala.binary.version> <Kafka.version>3.6.0</Kafka.version> </properties> <dependencies> <!-- Flink core dependency --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-java</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-streaming-java</artifactId> <version>${Flink.version}</version> </dependency> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-clients</artifactId> <version>${Flink.version}</version> </dependency> <!-- Connector Base Dependency --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-connector-base</artifactId> <version>${Flink.version}</version> </dependency> <!-- Kafka Connector (Key Change) --> <dependency> <groupId>org.apache.Flink</groupId> <artifactId>Flink-connector-Kafka</artifactId> <version>3.1.0-1.18</version> </dependency> <dependency> <groupId>org.apache.Kafka</groupId> <artifactId>Kafka-clients</artifactId> <version>${Kafka.version}</version> </dependency> <!-- Logging Dependency --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> <scope>runtime</scope> </dependency> </dependencies> <repositories> <repository> <id>aliyun</id> <url>https://maven.aliyun.com/repository/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <id>apache-releases</id> <url>https://repository.apache.org/content/repositories/releases/</url> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>org.apache.Flink:force-shading</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> FlinkKafkaConsumerExample.java import org.apache.Flink.api.common.functions.FlatMapFunction; import org.apache.Flink.api.java.tuple.Tuple2; import org.apache.Flink.api.java.utils.ParameterTool; import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.Flink.streaming.api.datastream.DataStream; import org.apache.Flink.streaming.api.functions.ProcessFunction; import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.Flink.util.Collector; import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer; import org.apache.Flink.api.common.serialization.SimpleStringSchema; import org.apache.Kafka.clients.consumer.ConsumerConfig; import org.apache.Kafka.common.serialization.StringDeserializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; public class FlinkKafkaConsumerExample { private static volatile int messageCount = 0; private static volatile boolean shouldStop = false; public static void main(String[] args) throws Exception { // Set the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Kafka configuration Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // Consumer group properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Create Kafka Consumer FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); KafkaConsumer.setStartFromEarliest(); // Consume from the earliest messages DataStream<String> stream = env.addSource(KafkaConsumer); // Process data: tokenization and counting DataStream<Tuple2<String, Integer>> counts = stream .flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() { @Override public void invoke(Tuple2<String, Integer> value, Context context) { System.out.println(value); messageCount++; // Check whether the stop condition is met if (messageCount >= 2 && !shouldStop) { System.out.println("Processed 2 messages, stopping job."); shouldStop = true; // Set a flag to indicate that the job should stop } } }); // Execute the job and obtain JobClient CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { // Start the job and obtain JobClient org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount"); System.out.println("Job ID: " + jobClient.getJobID()); // Monitor the condition and cancel the job while (!shouldStop) { Thread.sleep(100); // Check every 100 milliseconds } // Cancel the job when the stop condition is met if (shouldStop) { System.out.println("Cancelling the job..."); jobClient.cancel().get(); // Cancel the job } } catch (Exception e) { e.printStackTrace(); } }); // Wait for the job to finish in the main thread future.join(); // Wait for the job to finish } // Tokenizer Class for converting input strings into words public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } } import org.apache.Flink.api.common.functions.FlatMapFunction; import org.apache.Flink.api.java.tuple.Tuple2; import org.apache.Flink.api.java.utils.ParameterTool; import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.Flink.streaming.api.datastream.DataStream; import org.apache.Flink.streaming.api.functions.ProcessFunction; import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.Flink.util.Collector; import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer; import org.apache.Flink.api.common.serialization.SimpleStringSchema; import org.apache.Kafka.clients.consumer.ConsumerConfig; import org.apache.Kafka.common.serialization.StringDeserializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; public class FlinkKafkaConsumerExample { private static volatile int messageCount = 0; private static volatile boolean shouldStop = false; public static void main(String[] args) throws Exception { // Set the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Kafka configuration Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // Consumer group properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Create Kafka Consumer FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); KafkaConsumer.setStartFromEarliest(); // Consume from the earliest messages DataStream<String> stream = env.addSource(KafkaConsumer); // Process data: tokenization and counting DataStream<Tuple2<String, Integer>> counts = stream .flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() { @Override public void invoke(Tuple2<String, Integer> value, Context context) { System.out.println(value); messageCount++; // Check whether the stop condition is met if (messageCount >= 2 && !shouldStop) { System.out.println("Processed 2 messages, stopping job."); shouldStop = true; // Set a flag to indicate that the job should stop } } }); // Execute the job and obtain JobClient CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { // Start the job and obtain JobClient org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount"); System.out.println("Job ID: " + jobClient.getJobID()); // Monitor the condition and cancel the job while (!shouldStop) { Thread.sleep(100); // Check every 100 milliseconds } // Cancel the job when the stop condition is met if (shouldStop) { System.out.println("Cancelling the job..."); jobClient.cancel().get(); // Cancel the job } } catch (Exception e) { e.printStackTrace(); } }); // Wait for the job to finish in the main thread future.join(); // Wait for the job to finish } // Tokenizer Class for converting input strings into words public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } } 2. Package and Upload to DolphinScheduler 3. Create and Run a Flink Task Node Kafka Producer and Flink Output Start the Kafka producer in the virtual machine and send messages. Flink successfully consumes and processes the Kafka data.