7. Flink-- Asynchronous IO

2024-05-30 Update From: SLTechnology News&Howtos

1. Flink Asynchronous IO Overview

1.1 Requirements for Asynchronous IO

Async I/O is a popular feature that Alibaba contributed to the community; it was introduced in Flink 1.2. Its main purpose is to keep communication latency with external systems (for example, waiting for a response from a database) from becoming the bottleneck of a streaming job. In real-time processing, whenever you need data from external storage, you must take care that the interaction with the external system does not hold up the whole stream-processing pipeline.

For example, when an operator such as a MapFunction accesses external storage, the interaction is normally synchronous: the function sends request A to the database and then waits for the response before it can send the next request. In many cases this waiting accounts for most of the function's time. Interacting with the database asynchronously means that a single function instance can have many requests in flight and receive their responses concurrently. While one request is waiting, other requests are sent and other responses are received, so the waiting time is overlapped, or at the very least amortized over multiple requests. In most cases this leads to much higher throughput.

Figure 1.1 Flink asynchronous IO

Note: throughput can also be improved by giving the MapFunction a higher degree of parallelism, but this comes at a much higher resource cost: more parallel MapFunction instances mean more tasks, threads, Flink-internal network connections, database connections, buffers, and more internal state overhead.
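To make the amortization argument in this section concrete, here is a minimal sketch that uses only the JDK, not Flink. The store is a hypothetical stand-in: `query` answers roughly 100 ms later from a single timer thread, and the class and method names are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class AsyncLatencyDemo {
    static final long LATENCY_MS = 100; // assumed round-trip time of the fake store

    // Hypothetical non-blocking client: the reply arrives later on the timer thread.
    static CompletableFuture<String> query(ScheduledExecutorService timer, String key) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        timer.schedule(() -> {
            reply.complete(key + "-value");
        }, LATENCY_MS, TimeUnit.MILLISECONDS);
        return reply;
    }

    // Synchronous interaction: wait for each reply before sending the next request.
    static List<String> oneAtATime(ScheduledExecutorService timer, List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.add(query(timer, key).join());
        }
        return out;
    }

    // Asynchronous interaction: send every request first, then collect the replies.
    static List<String> allInFlight(ScheduledExecutorService timer, List<String> keys) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String key : keys) {
            pending.add(query(timer, key));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> reply : pending) {
            out.add(reply.join());
        }
        return out;
    }

    // Runs both variants and returns {oneAtATimeMillis, allInFlightMillis}.
    static long[] compare(List<String> keys) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            long t0 = System.nanoTime();
            oneAtATime(timer, keys);
            long t1 = System.nanoTime();
            allInFlight(timer, keys);
            long t2 = System.nanoTime();
            return new long[] {(t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000};
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] ms = compare(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println("one at a time: " + ms[0] + " ms, all in flight: " + ms[1] + " ms");
    }
}
```

With five keys, the first variant pays the latency five times in a row, while the second pays it roughly once because the waits overlap. The fake store needs only one thread to serve all five requests, which is the resource argument made in the note above.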

1.2 Prerequisites for using asynchronous IO

To use Flink's asynchronous IO, you need a client for the database that supports asynchronous requests. Fortunately, many popular databases offer such a client. If no asynchronous client is available, you can create multiple synchronous clients and dispatch calls to them through a thread pool, which turns the synchronous calls into asynchronous ones. This approach is usually less efficient than a proper asynchronous client.
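The thread-pool fallback mentioned above can be sketched as follows. This is a plain-JDK illustration under assumptions: `PooledAsyncClient` is a hypothetical name, and the blocking client is modelled as a simple `Function<String, String>` rather than a real database driver.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical wrapper that makes a blocking lookup usable as an asynchronous one.
class PooledAsyncClient implements AutoCloseable {
    private final ExecutorService pool;
    private final Function<String, String> blockingQuery;

    PooledAsyncClient(int threads, Function<String, String> blockingQuery) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.blockingQuery = blockingQuery;
    }

    // Returns immediately; the blocking call itself runs on a pool thread.
    CompletableFuture<String> queryAsync(String key) {
        return CompletableFuture.supplyAsync(() -> blockingQuery.apply(key), pool);
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        // The lambda stands in for a synchronous database call.
        try (PooledAsyncClient client = new PooledAsyncClient(4, key -> "row-for-" + key)) {
            System.out.println(client.queryAsync("king").join());
        }
    }
}
```

Each in-flight request still occupies one pool thread while it blocks, which is why this is generally less efficient than a client that is asynchronous by design.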

2. Using Flink Asynchronous IO

2.1 How to use Asynchronous IO

The Flink asynchronous IO API lets users plug asynchronous request clients into data streams. The API itself handles the integration with data streams, as well as message order, event time, fault tolerance, and so on.

If you have an asynchronous client for the target database, using asynchronous IO takes three steps:

1. Implement AsyncFunction or RichAsyncFunction, which dispatches the asynchronous requests.

2. Provide a callback that takes the result of the operation and hands it to the ResultFuture.

3. Apply the asynchronous IO operation to a DataStream.

You can take a look at the source code of AsyncFunction:

    public interface AsyncFunction<IN, OUT> extends Function, Serializable {

        // Triggers the asynchronous operation for one input element.
        void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

        // Called when the request times out; by default it fails the result future.
        default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
            resultFuture.completeExceptionally(
                    new TimeoutException("Async function call has timed out."));
        }
    }

There are two main methods:

void asyncInvoke(IN input, ResultFuture<OUT> resultFuture): this is where the logic that talks to the external system goes. input is the input element, and resultFuture is the object through which the results are returned.

default void timeout(IN input, ResultFuture<OUT> resultFuture): this method is called when the asynchronous request times out. Its parameters have the same meaning as above. Because it has a default implementation, overriding it is optional.

RichAsyncFunction additionally provides the open and close methods, because it extends AbstractRichFunction. Typically, open is used to create the client connection to the external storage (such as a JDBC connection to MySQL), and close is used to shut that connection down. asyncInvoke and timeout work the same way as described above. In practice, RichAsyncFunction is the one that is normally used.

2.2 Template example from the official website

    class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

        /** The database specific client that can issue concurrent requests with callbacks */
        private transient DatabaseClient client;

        @Override
        public void open(Configuration parameters) throws Exception {
            client = new DatabaseClient(host, port, credentials);
        }

        @Override
        public void close() throws Exception {
            client.close();
        }

        @Override
        public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

            // issue the asynchronous request, receive a future for result
            final Future<String> result = client.query(key);

            // set the callback to be executed once the request by the client is complete
            // the callback simply forwards the result to the result future
            CompletableFuture.supplyAsync(new Supplier<String>() {

                @Override
                public String get() {
                    try {
                        return result.get();
                    } catch (InterruptedException | ExecutionException e) {
                        // Normally handled explicitly.
                        return null;
                    }
                }
            }).thenAccept((String dbResult) -> {
                resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
            });
        }
    }

    // create the original stream
    DataStream<String> stream = ...;

    // apply the asynchronous IO function to the data stream
    DataStream<Tuple2<String, String>> resultStream =
        AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

Note that the queried data must end up in resultFuture: results are handed to the framework by calling resultFuture.complete. The ResultFuture is completed by the first call to ResultFuture.complete; all subsequent complete calls are ignored.
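The first-call-wins behaviour described above can be tried out with the JDK's own CompletableFuture, which has comparable semantics. This sketch does not use Flink's ResultFuture; `CompleteOnceDemo` and the sample values are assumptions made for the illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

class CompleteOnceDemo {

    // Completes the same future twice and returns the value a consumer observes.
    static Collection<String> completeTwice() {
        CompletableFuture<Collection<String>> resultFuture = new CompletableFuture<>();

        // The first call sets the result and returns true.
        boolean first = resultFuture.complete(Collections.singleton("from-db"));
        // The second call has no effect and returns false.
        boolean second = resultFuture.complete(Collections.singleton("late-duplicate"));

        System.out.println("first accepted: " + first + ", second accepted: " + second);
        return resultFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(completeTwice());
    }
}
```

The practical consequence for an async function is that every code path should complete the future once, either with a result or exceptionally, and then stop.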

2.3 Notes on using Asynchronous IO

2.3.1 Parameters of AsyncDataStream.unorderedWait()

There are 5 parameters: in, asyncObject, timeout, timeUnit, capacity

in: the input data stream.

asyncObject: the object of the asynchronous IO function class.

timeout: how long an asynchronous request may take before it is considered failed. The main purpose of this parameter is to guard against dead or failed requests.

timeUnit: the unit of the timeout, for example TimeUnit.MILLISECONDS for milliseconds.

capacity: the maximum number of asynchronous requests that may be in progress at the same time. Even though asynchronous IO usually gives much better throughput, the operator can still become the bottleneck of a streaming application. Limiting the number of concurrent requests ensures that the operator does not accumulate an ever-growing backlog of pending requests; once the capacity is exhausted, it triggers back pressure instead.

2.3.2 Timeout handling

When an asynchronous IO request times out, an exception is thrown by default and the job is restarted. If you want to handle timeouts yourself, override the AsyncFunction.timeout method.
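The difference between the default behaviour and a custom timeout handler can be illustrated with plain JDK futures. This is an analogy rather than Flink code: `TimeoutHandlingDemo`, its two methods, and the fallback value are hypothetical, and a scheduled task plays the role of the framework's timeout timer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutHandlingDemo {

    // Default-style handling: a request still pending at the deadline fails.
    static CompletableFuture<String> failOnTimeout(
            CompletableFuture<String> request, ScheduledExecutorService timer, long timeoutMs) {
        timer.schedule(() -> {
            request.completeExceptionally(
                    new TimeoutException("Async function call has timed out."));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return request;
    }

    // Custom handling: a request still pending at the deadline yields a fallback value.
    static CompletableFuture<String> fallbackOnTimeout(
            CompletableFuture<String> request, ScheduledExecutorService timer,
            long timeoutMs, String fallback) {
        timer.schedule(() -> {
            request.complete(fallback);
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return request;
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // This request never receives a reply, so the fallback is used.
            CompletableFuture<String> neverAnswered = new CompletableFuture<>();
            System.out.println(fallbackOnTimeout(neverAnswered, timer, 50, "unknown").join());
        } finally {
            timer.shutdown();
        }
    }
}
```

If the reply arrives before the deadline, the later timer call has no effect, because a future keeps the first completion it receives. Emitting a fallback keeps the job running, at the price of passing a placeholder value downstream.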

2.3.3 Order of results

The order in which the concurrent requests issued by an AsyncFunction complete is undefined. To control the order in which result records are emitted, Flink provides two modes:

1) Unordered

Result records are emitted as soon as the asynchronous request finishes. The order of records in the stream after the asynchronous IO operator is therefore not the same as before; that is, the order of results is not guaranteed to match the order of requests. With processing time as the time characteristic, this mode has the lowest latency and the lowest overhead. It is used via AsyncDataStream.unorderedWait(...).

2) Ordered

In this mode the stream order is preserved: result records are emitted in the same order in which the asynchronous requests were triggered, which is the order of the elements in the original stream. To achieve this, the operator buffers a result record until all records before it have been emitted (or have timed out). This usually introduces some extra latency and some checkpoint overhead, because result records are kept in checkpointed state for longer than in unordered mode. It is used via AsyncDataStream.orderedWait(...).
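The two modes can be contrasted with a small simulation. This is a sketch of the emission rules only, not of Flink's operator: `EmitOrderDemo` is a hypothetical class, and `completionOrder` lists the input indexes in the order in which their requests are assumed to finish.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class EmitOrderDemo {

    // Unordered mode: each result is emitted as soon as its request completes.
    static List<String> unordered(List<String> inputs, List<Integer> completionOrder) {
        List<String> emitted = new ArrayList<>();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> request = new CompletableFuture<>();
            request.thenAccept(emitted::add); // runs at the moment the request completes
            pending.add(request);
        }
        for (int i : completionOrder) {
            pending.get(i).complete(inputs.get(i) + "!");
        }
        return emitted;
    }

    // Ordered mode: finished results are buffered and emitted only from the head.
    static List<String> ordered(List<String> inputs, List<Integer> completionOrder) {
        List<String> emitted = new ArrayList<>();
        String[] buffer = new String[inputs.size()];
        int head = 0; // index of the oldest result that has not been emitted yet
        for (int i : completionOrder) {
            buffer[i] = inputs.get(i) + "!";
            while (head < buffer.length && buffer[head] != null) {
                emitted.add(buffer[head++]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        List<Integer> completionOrder = Arrays.asList(2, 0, 1); // c finishes first
        System.out.println(unordered(inputs, completionOrder)); // [c!, a!, b!]
        System.out.println(ordered(inputs, completionOrder));   // [a!, b!, c!]
    }
}
```

In the ordered run, the result for c is ready first but stays in the buffer until a and b have been emitted, which is where the extra latency and the extra buffered state come from.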

2.3.4 Event time, watermarks, and order

When event time is used, the asynchronous IO operator also handles watermarks correctly. For the two order modes this means the following:

1) Unordered

Watermarks do not overtake records and records do not overtake watermarks, so watermarks establish an order boundary. Records are emitted out of order only between two watermarks. A record that arrives after a watermark is emitted only after that watermark has been emitted, and the watermark in turn is emitted only after the results of all records that arrived before it have been emitted. This means that, in the presence of watermarks, unordered mode introduces some of the same latency and management overhead as ordered mode. How much depends on the watermark frequency. In short: watermarks stay in order, while the results between two consecutive watermarks may be out of order.

2) Ordered

The order of watermarks is preserved just like the order of records. There is no significant change in overhead compared to working with processing time. Keep in mind that ingestion time is a special case of event time in which watermarks are generated automatically from the source's processing time.
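The watermark boundary rule for unordered mode can be illustrated with a simplified model. This is not Flink's implementation: `WatermarkBoundaryDemo` and `Element` are hypothetical, and each record carries an assumed `doneAt` tick that says when its request finishes. Records are reordered by completion time only within the segment between two watermarks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class WatermarkBoundaryDemo {

    // A stream element: a watermark, or a record whose request finishes at tick doneAt.
    static final class Element {
        final String name;
        final boolean isWatermark;
        final int doneAt;

        private Element(String name, boolean isWatermark, int doneAt) {
            this.name = name;
            this.isWatermark = isWatermark;
            this.doneAt = doneAt;
        }

        static Element record(String name, int doneAt) {
            return new Element(name, false, doneAt);
        }

        static Element watermark(String name) {
            return new Element(name, true, 0);
        }
    }

    // Emits records in completion order within a segment, never across a watermark.
    static List<String> emitUnordered(List<Element> input) {
        List<String> out = new ArrayList<>();
        List<Element> segment = new ArrayList<>();
        for (Element e : input) {
            if (e.isWatermark) {
                flush(segment, out); // all earlier records go out first
                out.add(e.name);     // then the watermark itself
            } else {
                segment.add(e);
            }
        }
        flush(segment, out);
        return out;
    }

    private static void flush(List<Element> segment, List<String> out) {
        segment.sort(Comparator.comparingInt((Element e) -> e.doneAt));
        for (Element e : segment) {
            out.add(e.name);
        }
        segment.clear();
    }

    public static void main(String[] args) {
        List<Element> input = Arrays.asList(
                Element.record("a", 5), Element.record("b", 1), Element.watermark("W1"),
                Element.record("c", 0), Element.record("d", 2));
        System.out.println(emitUnordered(input)); // [b, a, W1, c, d]
    }
}
```

Record c finishes before everything else, yet it is emitted after W1 because it arrived after that watermark. Record b overtakes a because both sit in the same segment.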

2.3.5 Fault tolerance

The asynchronous IO operator offers exactly-once fault-tolerance guarantees. It stores the records for in-flight asynchronous requests in checkpoints and restores and re-triggers those requests when recovering from a failure.

2.4 Querying data from MySQL with asynchronous IO

1. Maven pom dependencies

Project: modelVersion 4.0.0, groupId SparkDemo, artifactId SparkDemoTest, version 1.0-SNAPSHOT

Property values, in the order listed in the pom: UTF-8, 2.1.0, 2.11.8, 2.7.3, 2.11, 1.6.1

Dependencies (groupId : artifactId : version):

org.apache.logging.log4j : log4j-core : 2.9.0
io.netty : netty-all : 4.1.32.Final
org.apache.flink : flink-java : 1.6.1
org.apache.flink : flink-streaming-java_2.11 : 1.6.1
org.apache.flink : flink-streaming-scala_2.11 : 1.6.1
org.apache.flink : flink-scala_2.11 : 1.6.1
org.apache.flink : flink-clients_2.11 : 1.6.1
org.apache.flink : flink-table_2.11 : 1.6.1 (scope: provided)
org.apache.hadoop : hadoop-client : ${hadoop.version}
fastjson : 1.2.22
org.apache.flink : flink-connector-kafka-0.10_${scala.binary.version} : ${flink.version}
io.vertx : vertx-core : 3.7.0
mysql : mysql-connector-java : 8.0.12
io.vertx : vertx-jdbc-client : 3.7.0
io.vertx : vertx-web : 3.7.0
com.github.ben-manes.caffeine : caffeine : 2.6.2

Build plugins:

org.scala-tools : maven-scala-plugin : 2.15.2 (goals: compile, testCompile)
maven-compiler-plugin : 3.6.0 (source 1.8, target 1.8)
org.apache.maven.plugins : maven-surefire-plugin : 2.19 (configuration value: true)

2. Source code

The target MySQL table looks like this:

id  name
1   king
2   tao
3   ming

The goal is to look up the id for a given name.


    package flinktest;

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;
    import io.vertx.ext.jdbc.JDBCClient;
    import io.vertx.ext.sql.ResultSet;
    import io.vertx.ext.sql.SQLClient;
    import io.vertx.ext.sql.SQLConnection;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    /**
     * Flink asynchronous IO demo: use asynchronous IO to interact with MySQL.
     * Ordinary JDBC clients do not support asynchronous mode, so the asynchronous
     * JDBC client of Vert.x is used here (asynchronous IO requires a client that
     * supports asynchronous operations).
     *
     * Goal: for each element of the data source, query the matching row from MySQL
     * with asynchronous IO and print it.
     */
    public class AsyncToMysql {

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Build the data source; each element is later used as the value in the WHERE clause.
            List<String> sourceList = new ArrayList<>();
            sourceList.add("king");
            sourceList.add("tao");
            DataStreamSource<String> source = env.fromCollection(sourceList);

            // Apply the asynchronous IO function.
            DataStream<JsonObject> result = AsyncDataStream.unorderedWait(
                    source,
                    new MysqlAsyncFunc(),
                    10, // do not set the timeout too short when running locally in IDEA: local execution is slower
                    TimeUnit.SECONDS,
                    20).setParallelism(1);

            result.print();

            try {
                env.execute("TEST async");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Custom asynchronous IO function that extends RichAsyncFunction. */
        private static class MysqlAsyncFunc extends RichAsyncFunction<String, JsonObject> {
            private transient SQLClient mysqlClient;
            private transient Cache<String, String> cache;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);

                // Build the query cache with Caffeine, a high-performance cache.
                cache = Caffeine.newBuilder()
                        .maximumSize(1025)
                        .expireAfterAccess(10, TimeUnit.MINUTES) // cache expiration time
                        .build();

                // JDBC connection parameters.
                JsonObject mysqlClientConfig = new JsonObject();
                mysqlClientConfig
                        .put("url", "jdbc:mysql://")
                        .put("driver_class", "com.mysql.cj.jdbc.Driver")
                        .put("max_pool_size", 20)
                        .put("user", "root")
                        .put("password", "xxxxx");

                // Vert.x working parameters, such as thread pool sizes.
                VertxOptions vo = new VertxOptions();
                vo.setEventLoopPoolSize(10);
                vo.setWorkerPoolSize(20);
                Vertx vertx = Vertx.vertx(vo);

                mysqlClient = JDBCClient.createNonShared(vertx, mysqlClientConfig);
                if (mysqlClient != null) {
                    System.out.println("connect mysql successfully!!");
                }
            }

            // Clean up the environment.
            @Override
            public void close() throws Exception {
                super.close();
                // Close the MySQL client and clear the cache.
                if (mysqlClient != null) {
                    mysqlClient.close();
                }
                if (cache != null) {
                    cache.cleanUp();
                }
            }

            @Override
            public void asyncInvoke(String input, ResultFuture<JsonObject> resultFuture) throws Exception {
                System.out.println("key is:" + input);
                String key = input;
                JsonObject output = new JsonObject();

                // Look in the cache first and return immediately on a hit.
                String cacheIfPresent = cache.getIfPresent(key);
                if (cacheIfPresent != null) {
                    output.put("name", key);
                    output.put("id-name", cacheIfPresent);
                    resultFuture.complete(Collections.singleton(output));
                    return; // without this, a cache hit would still query MySQL
                }

                System.out.println("start query");
                mysqlClient.getConnection(conn -> {
                    if (conn.failed()) {
                        resultFuture.completeExceptionally(conn.cause());
                        return; // no connection available, nothing left to do
                    }
                    final SQLConnection sqlConnection = conn.result();

                    // Build the query statement. The demo concatenates the key into the SQL;
                    // for untrusted input, queryWithParams is the safer choice.
                    String querySql = "select id,name from customer where name='" + key + "'";
                    System.out.println("sql executed is:" + querySql);

                    // Execute the query and handle the result.
                    sqlConnection.query(querySql, res -> {
                        if (res.failed()) {
                            resultFuture.completeExceptionally(res.cause());
                            System.out.println("execution failed");
                        } else {
                            System.out.println("execute successfully, get the result");
                            ResultSet result = res.result();
                            List<JsonObject> rows = result.getRows();
                            System.out.println("number of results:" + rows.size());

                            // The source listing is truncated between the rows.size() check and
                            // the connection close handler. The rest of this branch is a
                            // reconstruction and should be read as an assumption.
                            if (rows.size() <= 0) {
                                resultFuture.complete(Collections.emptyList());
                            } else {
                                String id = String.valueOf(rows.get(0).getValue("id"));
                                cache.put(key, id);
                                output.put("name", key);
                                output.put("id-name", id);
                                resultFuture.complete(Collections.singleton(output));
                            }
                        }

                        // Close the connection once the query has finished.
                        sqlConnection.close(done -> {
                            if (done.failed()) {
                                throw new RuntimeException(done.cause());
                            }
                        });
                    });
                });
            }
        }
    }
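The cache-first lookup used in asyncInvoke above can be isolated into a small, dependency-free sketch. Everything here is an assumption made for illustration: `CacheFirstLookup` is a hypothetical class, a ConcurrentHashMap stands in for the Caffeine cache, and the remote query is passed in as a function that returns a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class CacheFirstLookup {
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, CompletableFuture<String>> remoteQuery;

    CacheFirstLookup(Function<String, CompletableFuture<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    // Answers from the cache when possible; otherwise queries remotely and caches the id.
    CompletableFuture<String> idForName(String name) {
        String cached = cache.get(name);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached); // hit: no remote call
        }
        return remoteQuery.apply(name).thenApply(id -> {
            if (id != null) {
                cache.put(name, id); // only successful lookups are cached
            }
            return id;
        });
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();
        // The lambda stands in for the asynchronous MySQL query.
        CacheFirstLookup lookup = new CacheFirstLookup(name -> {
            remoteCalls.incrementAndGet();
            return CompletableFuture.completedFuture("king".equals(name) ? "1" : null);
        });
        lookup.idForName("king").join();
        lookup.idForName("king").join();
        System.out.println("remote calls: " + remoteCalls.get());
    }
}
```

Returning early on a cache hit is the important part: the second lookup for the same name never reaches the remote query. Two lookups for the same uncached name that start at the same time may still both go remote in this sketch.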
