DataBuffer doesn't write to file
Matthew Harrington
So I'm trying to send video chunks (Blobs) from a client to my server through a reactive WebSocket in WebFlux and save each chunk into the same file, for now. The blobs are about 100 KB each, and the server side seems to be receiving them, because the terminal prints cryptic output every time the client sends a blob.
Unfortunately, the file is created, but the received data (videoDataFlux) is not written to it. The file is always 0 bytes.
It makes no difference if I don't use DataBufferUtils::release.
If someone could give me a hint, that would be great.
ReactiveWebSocketHandler.java
[...]
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    String filename = "Streamname_" + webSocketSession.getId();
    Path path = FileSystems.getDefault().getPath("C:\\Program Files (x86)\\Apache Software Foundation\\Tomcat 9.0\\webapps\\stream\\videos");
    //File filelocation = new File(path + filename);
    System.out.println("The path is: " + path);
    Flux<DataBuffer> videoDataFlux = webSocketSession.receive()
            .map(WebSocketMessage::getPayload);
    try {
        //AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        //DataBufferUtils.write(videoDataFlux, channel, 0).subscribe();
        Path file = Files.createTempFile(path, filename, ".webm");
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        DataBufferUtils.write(videoDataFlux, channel)
                .map(DataBufferUtils::release)
                .then(Mono.just(file));
    } catch (IOException e) {
    }
    return webSocketSession.send(intervalFlux
            .map(webSocketSession::textMessage))
            .and(webSocketSession.receive());
            //.map(WebSocketMessage::getPayloadAsText).log());
}
UPDATE:
I tried it the way Thomas Andolf suggested, but now there is some weird behaviour. It receives data that the client shouldn't even be sending yet and writes it to a file once. So that is partially working. But when the client starts sending data, it takes the first blob, doesn't write it to the file, and immediately closes the WebSocket connection.
UPDATE 2:
I changed the return a little bit. I deleted .then(Mono.just(file)), because it probably caused the WebSocket to terminate immediately. I also deleted .map(DataBufferUtils::release), because it terminates the socket as well. Or, more likely, it signals completion for the Flux, after which the socket closes. It still doesn't work the way I want it to, but at least it takes a bunch of blobs and writes them to the file. Unfortunately, it writes to the file only after I stop the client/code.
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    try {
        String filename = "Streamname_" + webSocketSession.getId();
        Path path = FileSystems.getDefault().getPath("C:\\Program Files (x86)\\Apache Software Foundation\\Tomcat 9.0\\webapps\\stream\\videos");
        //File filelocation = new File(path + filename);
        System.out.println("The path is: " + path);
        Flux<DataBuffer> videoDataFlux = webSocketSession.receive()
                .map(WebSocketMessage::getPayload);
        //AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        //DataBufferUtils.write(videoDataFlux, channel, 0).subscribe();
        Path file = Files.createTempFile(path, filename, ".webm");
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(videoDataFlux, channel)
                //.map(DataBufferUtils::release)
                .then(/*Mono.just(file)*/)
                .flatMap(out -> {
                    return webSocketSession.send(intervalFlux
                            .map(webSocketSession::textMessage))
                            .and(webSocketSession.receive());
                });
    } catch (IOException e) {
    }
    return webSocketSession.send(intervalFlux
            .map(webSocketSession::textMessage))
            .and(webSocketSession.receive());
            //.map(WebSocketMessage::getPayloadAsText).log());
}
Exception thrown after updated code
This is before the changes from update 2
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s): |_ checkpoint ? [ReactorNettyRequestUpgradeStrategy]
Stack trace: at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:115) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at
[...]
[...] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at (Thread.java:832) ~[na:na]
2020-05-23 15:06:20.908 WARN 7888 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil : Failed to release a message: BinaryWebSocketFrame(data: PooledUnsafeDirectByteBuf(freed))
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:115) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at
[...]
[...] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at (Thread.java:832) ~[na:na]
2020-05-23 15:06:20.970 ERROR 7888 --- [ctor-http-nio-4] r.n.channel.ChannelOperationsHandler : [id: 0x156dc7a0, L:/[0:0:0:0:0:0:0:1]:8081 ! R:/[0:0:0:0:0:0:0:1]:53149] Error was received while reading the incoming data. The connection will be closed.
reactor.core.Exceptions$BubblingException: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at reactor.core.Exceptions.bubble(Exceptions.java:173) ~[reactor-core-3.3.4.RELEASE.jar:3.3.4.RELEASE] at reactor.core.publisher.Operators.onErrorDropped(Operators.java:590) ~[reactor-core-3.3.4.RELEASE.jar:3.3.4.RELEASE] at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:147) ~[reactor-core-3.3.4.RELEASE.jar:3.3.4.RELEASE] at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:418) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:243) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:353) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:352) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:493) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:154) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) 
~[netty-codec-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.48.Final.jar:4.1.48.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) 
~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at (Thread.java:832) ~[na:na]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s): |_ checkpoint ? [ReactorNettyRequestUpgradeStrategy]
Stack trace: at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:115) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:224) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:353) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:352) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:493) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at
[...]
[...] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at (Thread.java:832) ~[na:na]
2020-05-23 15:06:21.022 ERROR 7888 --- [ctor-http-nio-4] reactor.netty.tcp.TcpServer : [id: 0x156dc7a0, L:/[0:0:0:0:0:0:0:1]:8081 ! R:/[0:0:0:0:0:0:0:1]:53149] onUncaughtException(SimpleConnection{channel=[id: 0x156dc7a0, L:/[0:0:0:0:0:0:0:1]:8081 ! R:/[0:0:0:0:0:0:0:1]:53149]})
reactor.core.Exceptions$BubblingException: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at reactor.core.Exceptions.bubble(Exceptions.java:173) ~[reactor-core-3.3.4.RELEASE.jar:3.3.4.RELEASE] at reactor.core.publisher.Operators.onErrorDropped(Operators.java:590) ~[reactor-core-3.3.4.RELEASE.jar:3.3.4.RELEASE] at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:147) ~[reactor-core-3.3.4.RELEASE.jar:3.3.4.RELEASE] at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:418) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:243) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:353) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:352) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:493) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at
[...]
[...] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at (Thread.java:832) ~[na:na]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s): |_ checkpoint ? [ReactorNettyRequestUpgradeStrategy]
Stack trace: at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:115) ~[netty-buffer-4.1.48.Final.jar:4.1.48.Final] at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.48.Final.jar:4.1.48.Final] at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:224) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:353) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:352) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:493) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:154) ~[reactor-netty-0.9.6.RELEASE.jar:0.9.6.RELEASE] at
[...]
[...]
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
at (Thread.java:832) ~[na:na]
1 Answer
It's because here:
DataBufferUtils.write(videoDataFlux, channel)
        .map(DataBufferUtils::release)
        .then(Mono.just(file));
you are breaking the chain: you are ignoring the return value. Nothing subscribes to the write pipeline, so it never runs.
You need to keep chaining and return the result instead of ignoring it. This code will probably not work as-is, but you get the point:
return DataBufferUtils.write(videoDataFlux, channel)
        .map(DataBufferUtils::release)
        .then(Mono.just(file))
        // the lambda parameter cannot reuse the name of the local variable 'file'
        .flatMap(savedFile -> {
            return webSocketSession.send(intervalFlux
                    .map(webSocketSession::textMessage))
                    .and(webSocketSession.receive());
        });
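To see why ignoring the return value matters, here is a small sketch that uses only the JDK. It is an analogy, not Reactor code: java.util.stream pipelines are lazy in a similar way, so a pipeline that is assembled and then dropped performs no work. The class and method names (LazyChainDemo, ignoredChain, consumedChain) are made up for illustration, and the list stands in for the file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class; an analogy built on java.util.stream, not on Reactor.
final class LazyChainDemo {

    // Assembles a pipeline and then drops it, similar to calling
    // DataBufferUtils.write(...) without returning or subscribing to the result.
    static List<String> ignoredChain() {
        List<String> written = new ArrayList<>();
        Stream.of("chunk-1", "chunk-2", "chunk-3")
                .peek(written::add); // no terminal operation, so peek never runs
        return written;
    }

    // Same pipeline, but something consumes it, so every stage executes.
    static List<String> consumedChain() {
        List<String> written = new ArrayList<>();
        Stream.of("chunk-1", "chunk-2", "chunk-3")
                .peek(written::add)
                .forEach(chunk -> { }); // terminal operation drives the pipeline
        return written;
    }

    public static void main(String[] args) {
        System.out.println("ignored chain wrote:  " + ignoredChain().size());
        System.out.println("consumed chain wrote: " + consumedChain().size());
    }
}
```

In the handler, returning the chained Mono from handle(...) plays the role of the terminal operation: the framework subscribes to whatever you return, and only then does the write stage run.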