How Java Is Used for Asynchronous Non-blocking Programming

By Chending

1) Synchronous HTTP Call

Reverse geographic interface: You can obtain the county, district, city, and province as well as the response code that the longitude and latitude represent through the longitude and latitude:

curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"{"adcode":"510722"}

Execution at the server with the simplest synchronous call method:

Before the server responds, the Input/Output (IO) is blocked on the native method of java.net.SocketInputStream#socketRead0:

Through the jstack log, you can find that the Thread remains in the runnable state:

"main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000]   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207)
.......

Thread model:

The biggest problem of synchronization is that thread resources are not utilized fully when waiting for IO responses, resulting in limited business throughput in a large number of IO scenarios.

2) JDK NIO and Future

Generally, not all Futures are implemented in this way. For example, io.netty.util.concurrent. AbstractFuture is implemented through thread polling.

An advantage of this implementation is that the main thread can do other tasks without waiting for IO responses, such as sending another IO request which can be returned together with the IO responses:

"main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000]   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162)
at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201)
.....
"AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000] java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet)
- locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)

However, the main thread still needs to wait until the results are returned. This problem is not fundamentally resolved.

3) Use the Callback Method

As shown in the above figure, the main thread no longer needs to pay attention to the business logic after initiating IO. After sending the request, the main thread can complete other tasks or return to the thread pool for scheduling. If it is HttpServer, it is necessary to combine the asynchronous Servlet of Servlet 3.1.

For more information about asynchronous Servlet, refer https://www.cnblogs.com/davenkin/p/async-servlet.html

With the Callback method, you can see in the thread model that the thread resources have been utilized fully, and there is no thread blocking during the whole process.

4) Callback Hell

Take a typical scenario as an example. The reverse geographical interface of the administrative region — adcode — is obtained through the longitude and latitude. Then, you can obtain the local weather information through the adcode.

Such problems basically don’t arise in the synchronous programming model.

Fig: Core defects of the Callback method

5) JDK 1.8 — CompletableFuture

The reverse geographical Callback logic is encapsulated into an independent CompletableFuture. When the asynchronous thread is called back, future.complete(T) is called to encapsulate the result.

The Call logic executed by weather is encapsulated into an independent CompletableFuture. After the procedure, the logic is the same as above.

Compose indicates join and whenComplete displays output:

You can encapsulate each IO operation as an independent CompletableFuture to avoid the Callback hell problem.

CompletableFuture has only two attributes:

  • result: The result of Future execution (either the result or boxed AltResult)
  • stack: The operation stack that defines the next operation action of the Future (top of Treiber stack of dependent actions)

How is the weatherFuture method called?

The stack shows that the obtained adcode is used as a parameter to execute the following logic when reverseCOdeFuture.complete(result) is executed.

This solution solves the Callback hell problem perfectly. It looks like the synchronous coding in the main logic.

6) Vert.x Future

The core execution logic is similar:

Of course, this is not what the Vert.x is all about. However, we don’t provide more details here.

7) Reactive Streams

Core abstraction: In the entire package, there are only four interfaces — Publisher, Subscriber, Processor, and Subscription — without one for implementation.

In JDK 9, they have been encapsulated as a specification in java.util.concurrent.Flow:

The following is a simple example:

8) Reactor, Spring 5, and Spring WebFlux

References

Original Source:

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.