From 638d6a612da228d39a208be2e12bc9a450842765 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 28 Jul 2025 08:34:56 -0400 Subject: [PATCH 1/5] Initial sketch of adding metrics to fs2.io.net --- io/js/src/main/scala/fs2/io/ioplatform.scala | 12 +++++- .../scala/fs2/io/net/SocketPlatform.scala | 25 ++++++++++-- .../fs2/io/net/tls/TLSSocketPlatform.scala | 1 + .../scala/fs2/io/net/SocketPlatform.scala | 30 +++++++++++--- .../fs2/io/net/AsyncUnixSocketsProvider.scala | 21 ++++++++-- .../scala/fs2/io/net/SelectingSocket.scala | 16 +++++++- .../fs2/io/net/tls/TLSSocketPlatform.scala | 3 ++ .../scala/fs2/io/net/FdPollingSocket.scala | 26 +++++++++--- .../fs2/io/net/tls/TLSSocketPlatform.scala | 2 + .../src/main/scala/fs2/io/net/Socket.scala | 2 + .../main/scala/fs2/io/net/SocketMetrics.scala | 40 +++++++++++++++++++ 11 files changed, 158 insertions(+), 20 deletions(-) create mode 100644 io/shared/src/main/scala/fs2/io/net/SocketMetrics.scala diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index 1770e4dc5e..8ccf1b2cd9 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -191,6 +191,13 @@ private[fs2] trait ioplatform { def writeWritable[F[_]]( writable: F[Writable], endAfterUse: Boolean = true + )(implicit F: Async[F]): Pipe[F, Byte, Nothing] = + writeWritableInstrumented(writable, endAfterUse, _ => ()) + + private[io] def writeWritableInstrumented[F[_]]( + writable: F[Writable], + endAfterUse: Boolean = true, + onWrite: Chunk[Byte] => Unit )(implicit F: Async[F]): Pipe[F, Byte, Nothing] = in => Stream @@ -201,7 +208,10 @@ private[fs2] trait ioplatform { F.delay { writable.write( chunk.toUint8Array, - e => cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException)) + e => { + onWrite(chunk) + cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException)) + } ) Some(F.delay(writable.destroy())) } diff --git a/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala index 52d7424d18..12c14b7434 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala @@ -27,6 +27,7 @@ import cats.data.{Kleisli, OptionT} import cats.effect.{Async, Resource} import com.comcast.ip4s.{GenSocketAddress, IpAddress, SocketAddress} import fs2.io.internal.{facade, SuspendedStream} +import java.util.concurrent.atomic.AtomicLong private[net] trait SocketCompanionPlatform { @@ -55,15 +56,27 @@ private[net] trait SocketCompanionPlatform { val address: GenSocketAddress, val peerAddress: GenSocketAddress )(implicit F: Async[F]) - extends Socket[F] { + extends Socket[F] { outer => + + private val totalBytesRead: AtomicLong = new AtomicLong(0L) + private val totalBytesWritten: AtomicLong = new AtomicLong(0L) + private val incompleteWriteCount: AtomicLong = new AtomicLong(0L) + + def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics { + def totalBytesRead(): Long = outer.totalBytesRead.get + def totalBytesWritten(): Long = outer.totalBytesWritten.get + def incompleteWriteCount(): Long = outer.incompleteWriteCount.get + } private def read( f: Stream[F, Byte] => Pull[F, Chunk[Byte], Option[(Chunk[Byte], Stream[F, Byte])]] ): F[Option[Chunk[Byte]]] = readStream .getAndUpdate(Kleisli(f).flatMapF { - case Some((chunk, tail)) => Pull.output1(chunk).as(tail) - case None => Pull.pure(Stream.empty) + case Some((chunk, tail)) => + totalBytesRead.addAndGet(chunk.size.toLong) + Pull.output1(chunk).as(tail) + case None => Pull.pure(Stream.empty) }.run) .compile .last @@ -113,7 +126,11 @@ private[net] trait SocketCompanionPlatform { Stream.chunk(bytes).through(writes).compile.drain override def writes: Pipe[F, Byte, Nothing] = - writeWritable(F.pure(sock), endAfterUse = false) + writeWritableInstrumented( + F.pure(sock), + endAfterUse = false, + c => { totalBytesWritten.addAndGet(c.size.toLong); () } + ) } } diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala index 9fb45cf2f0..7b53cf647d 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala @@ -78,5 +78,6 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type => override def getOption[A](key: SocketOption.Key[A]) = underlying.getOption(key) override def setOption[A](key: SocketOption.Key[A], value: A) = underlying.setOption(key, value) override def supportedOptions = underlying.supportedOptions + override def metrics = underlying.metrics } } diff --git a/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala index f42cfd7a2d..25a0b22940 100644 --- a/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala @@ -30,6 +30,7 @@ import cats.syntax.all._ import java.nio.channels.{AsynchronousSocketChannel, CompletionHandler} import java.nio.{Buffer, ByteBuffer} +import java.util.concurrent.atomic.AtomicLong private[net] trait SocketCompanionPlatform { private[net] def forAsync[F[_]: Async]( @@ -50,6 +51,7 @@ private[net] trait SocketCompanionPlatform { extends Socket[F] { private[this] final val defaultReadSize = 8192 private[this] var readBuffer: ByteBuffer = ByteBuffer.allocate(defaultReadSize) + protected val totalBytesRead: AtomicLong = new AtomicLong(0L) private def withReadBuffer[A](size: Int)(f: ByteBuffer => F[A]): F[A] = readMutex.lock.surround { @@ -65,6 +67,13 @@ private[net] trait SocketCompanionPlatform { /** Performs a single channel read operation in to the supplied buffer. */ protected def readChunk(buffer: ByteBuffer): F[Int] + /** Instrumented version of `readChunk`. */ + protected def readChunk0(buffer: ByteBuffer): F[Int] = + readChunk(buffer).map { bytesRead => + totalBytesRead.addAndGet(bytesRead.toLong) + bytesRead + } + /** Copies the contents of the supplied buffer to a `Chunk[Byte]` and clears the buffer contents. */ private def releaseBuffer(buffer: ByteBuffer): F[Chunk[Byte]] = F.delay { @@ -83,7 +92,7 @@ private[net] trait SocketCompanionPlatform { def read(max: Int): F[Option[Chunk[Byte]]] = withReadBuffer(max) { buffer => - readChunk(buffer).flatMap { read => + readChunk0(buffer).flatMap { read => if (read < 0) F.pure(None) else releaseBuffer(buffer).map(Some(_)) } @@ -92,7 +101,7 @@ private[net] trait SocketCompanionPlatform { def readN(max: Int): F[Chunk[Byte]] = withReadBuffer(max) { buffer => def go: F[Chunk[Byte]] = - readChunk(buffer).flatMap { readBytes => + readChunk0(buffer).flatMap { readBytes => if (readBytes < 0 || buffer.position() >= max) releaseBuffer(buffer) else go @@ -114,11 +123,20 @@ private[net] trait SocketCompanionPlatform { val peerAddress: GenSocketAddress )(implicit F: Async[F]) extends BufferedReads[F](readMutex) - with SocketInfo.AsyncSocketInfo[F] { + with SocketInfo.AsyncSocketInfo[F] { outer => protected def asyncInstance = F protected def channel = ch + private val totalBytesWritten: AtomicLong = new AtomicLong(0L) + private val incompleteWriteCount: AtomicLong = new AtomicLong(0L) + + def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics { + def totalBytesRead(): Long = outer.totalBytesRead.get + def totalBytesWritten(): Long = outer.totalBytesWritten.get + def incompleteWriteCount(): Long = outer.incompleteWriteCount.get + } + protected def readChunk(buffer: ByteBuffer): F[Int] = F.async[Int] { cb => ch.read( @@ -139,9 +157,11 @@ private[net] trait SocketCompanionPlatform { ) F.delay(Some(endOfOutput.voidError)) }.flatMap { written => - if (written >= 0 && buff.remaining() > 0) + totalBytesWritten.addAndGet(written.toLong) + if (written >= 0 && buff.remaining() > 0) { + incompleteWriteCount.incrementAndGet() go(buff) - else F.unit + } else F.unit } writeMutex.lock.surround { F.delay(bytes.toByteBuffer).flatMap(go) diff --git a/io/jvm/src/main/scala/fs2/io/net/AsyncUnixSocketsProvider.scala b/io/jvm/src/main/scala/fs2/io/net/AsyncUnixSocketsProvider.scala index 073bc7fa2d..f82b268e7a 100644 --- a/io/jvm/src/main/scala/fs2/io/net/AsyncUnixSocketsProvider.scala +++ b/io/jvm/src/main/scala/fs2/io/net/AsyncUnixSocketsProvider.scala @@ -34,6 +34,7 @@ import fs2.io.file.{Files, FileHandle, SyncFileHandle} import java.nio.ByteBuffer import java.nio.channels.SocketChannel +import java.util.concurrent.atomic.AtomicLong private[net] abstract class AsyncUnixSocketsProvider[F[_]: Files](implicit F: Async[F]) extends UnixSocketsProvider[F] { @@ -98,19 +99,33 @@ private[net] object AsyncUnixSocketsProvider { val peerAddress: UnixSocketAddress )(implicit F: Async[F]) extends Socket.BufferedReads[F](readMutex) - with SocketInfo.OptionsSupport[F] { + with SocketInfo.OptionsSupport[F] { outer => protected def asyncInstance = F protected def channel = ch + private val totalBytesWritten: AtomicLong = new AtomicLong(0L) + private val incompleteWriteCount: AtomicLong = new AtomicLong(0L) + + def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics { + def totalBytesRead(): Long = outer.totalBytesRead.get + def totalBytesWritten(): Long = outer.totalBytesWritten.get + def incompleteWriteCount(): Long = outer.incompleteWriteCount.get + } + def readChunk(buff: ByteBuffer): F[Int] = evalOnVirtualThreadIfAvailable(F.blocking(ch.read(buff))) .cancelable(close) def write(bytes: Chunk[Byte]): F[Unit] = { def go(buff: ByteBuffer): F[Unit] = - F.blocking(ch.write(buff)).cancelable(close) *> - F.delay(buff.remaining <= 0).ifM(F.unit, go(buff)) + F.blocking(ch.write(buff)).cancelable(close).flatMap { written => + totalBytesWritten.addAndGet(written.toLong) + if (written >= 0 && buff.remaining() > 0) { + incompleteWriteCount.incrementAndGet() + go(buff) + } else F.unit + } writeMutex.lock.surround { F.delay(bytes.toByteBuffer).flatMap(buffer => evalOnVirtualThreadIfAvailable(go(buffer))) diff --git a/io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala b/io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala index 2b7cf81031..979f19b40f 100644 --- a/io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala +++ b/io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala @@ -35,6 +35,7 @@ import java.nio.ByteBuffer import java.nio.channels.SelectionKey.OP_READ import java.nio.channels.SelectionKey.OP_WRITE import java.nio.channels.SocketChannel +import java.util.concurrent.atomic.AtomicLong private final class SelectingSocket[F[_]: LiftIO] private ( selector: Selector, @@ -45,11 +46,20 @@ private final class SelectingSocket[F[_]: LiftIO] private ( val peerAddress: SocketAddress[IpAddress] )(implicit F: Async[F]) extends Socket.BufferedReads(readMutex) - with SocketInfo.AsyncSocketInfo[F] { + with SocketInfo.AsyncSocketInfo[F] { outer => protected def asyncInstance = F protected def channel = ch + private val totalBytesWritten: AtomicLong = new AtomicLong(0L) + private val incompleteWriteCount: AtomicLong = new AtomicLong(0L) + + def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics { + def totalBytesRead(): Long = outer.totalBytesRead.get + def totalBytesWritten(): Long = outer.totalBytesWritten.get + def incompleteWriteCount(): Long = outer.incompleteWriteCount.get + } + override def localAddress: F[SocketAddress[IpAddress]] = asyncInstance.pure(address) @@ -65,10 +75,12 @@ private final class SelectingSocket[F[_]: LiftIO] private ( def write(bytes: Chunk[Byte]): F[Unit] = { def go(buf: ByteBuffer): F[Unit] = F.delay { - ch.write(buf) + val written = ch.write(buf) + totalBytesWritten.addAndGet(written.toLong) buf.remaining() }.flatMap { remaining => if (remaining > 0) { + incompleteWriteCount.incrementAndGet() selector.select(ch, OP_WRITE).to *> go(buf) } else F.unit } diff --git a/io/jvm/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala b/io/jvm/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala index ae31cce0db..bd9a5e5ce7 100644 --- a/io/jvm/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala +++ b/io/jvm/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala @@ -120,6 +120,9 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type => def applicationProtocol: F[String] = engine.applicationProtocol + def metrics: SocketMetrics = + socket.metrics + @deprecated("3.13.0", "No replacement; sockets are open until they are finalized") def isOpen: F[Boolean] = socket.isOpen } diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index f0f9c4bf57..da363d9433 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -35,6 +35,8 @@ import scala.scalanative.posix.unistd import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ +import java.util.concurrent.atomic.AtomicLong + import FdPollingSocket._ private final class FdPollingSocket[F[_]: LiftIO] private ( @@ -45,7 +47,17 @@ private final class FdPollingSocket[F[_]: LiftIO] private ( val address: GenSocketAddress, val peerAddress: GenSocketAddress )(implicit F: Async[F]) - extends Socket[F] { + extends Socket[F] { outer => + + private val totalBytesRead: AtomicLong = new AtomicLong(0L) + private val totalBytesWritten: AtomicLong = new AtomicLong(0L) + private val incompleteWriteCount: AtomicLong = new AtomicLong(0L) + + def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics { + def totalBytesRead() = outer.totalBytesRead.get + def totalBytesWritten() = outer.totalBytesWritten.get + def incompleteWriteCount() = outer.incompleteWriteCount.get + } def localAddress = F.pure(address.asIpUnsafe) def remoteAddress = F.pure(peerAddress.asIpUnsafe) @@ -60,9 +72,10 @@ private final class FdPollingSocket[F[_]: LiftIO] private ( handle .pollReadRec(()) { _ => IO(guard(unistd.read(fd, buf, maxBytes.toULong))).flatMap { readed => - if (readed > 0) + if (readed > 0) { + totalBytesRead.addAndGet(readed.toLong) IO(Right(Some(Chunk.fromBytePtr(buf, readed)))) - else if (readed == 0) + } else if (readed == 0) IO.pure(Right(None)) else IO.pure(Left(())) @@ -76,6 +89,7 @@ private final class FdPollingSocket[F[_]: LiftIO] private ( def go(pos: Int): IO[Either[Int, Chunk[Byte]]] = IO(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toULong))).flatMap { readed => if (readed > 0) { + totalBytesRead.addAndGet(readed.toLong) val newPos = pos + readed if (newPos < numBytes) go(newPos) else IO(Right(Chunk.fromBytePtr(buf, newPos))) @@ -103,10 +117,12 @@ private final class FdPollingSocket[F[_]: LiftIO] private ( guard(unistd.write(fd, buf.atUnsafe(offset + pos), (length - pos).toULong)) }.flatMap { wrote => if (wrote >= 0) { + totalBytesWritten.addAndGet(wrote.toLong) val newPos = pos + wrote - if (newPos < length) + if (newPos < length) { + incompleteWriteCount.incrementAndGet() go(newPos) - else + } else IO.pure(Either.unit) } else IO.pure(Left(pos)) diff --git a/io/native/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala b/io/native/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala index 43ddc7daee..d19dfb15d2 100644 --- a/io/native/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala +++ b/io/native/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala @@ -112,6 +112,8 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type => def applicationProtocol: F[String] = connection.applicationProtocol + def metrics: SocketMetrics = socket.metrics + @deprecated("3.13.0", "No replacement; sockets are open until they are finalized") def isOpen: F[Boolean] = socket.isOpen } diff --git a/io/shared/src/main/scala/fs2/io/net/Socket.scala b/io/shared/src/main/scala/fs2/io/net/Socket.scala index 50b540f7b4..fe3433fc31 100644 --- a/io/shared/src/main/scala/fs2/io/net/Socket.scala +++ b/io/shared/src/main/scala/fs2/io/net/Socket.scala @@ -94,6 +94,8 @@ trait Socket[F[_]] extends SocketInfo[F] { go(offset, count).through(writes) } + def metrics: SocketMetrics + // Deprecated members @deprecated("3.13.0", "No replacement; sockets are open until they are finalized") diff --git a/io/shared/src/main/scala/fs2/io/net/SocketMetrics.scala b/io/shared/src/main/scala/fs2/io/net/SocketMetrics.scala new file mode 100644 index 0000000000..bf52a4d6f8 --- /dev/null +++ b/io/shared/src/main/scala/fs2/io/net/SocketMetrics.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package net + +sealed trait SocketMetrics { + + /** Total number of bytes read from the network through this socket. */ + def totalBytesRead(): Long + + /** Total number of bytes written to the network through this socket. */ + def totalBytesWritten(): Long + + /** Number of times a write request consumed only part of the write buffer. */ + def incompleteWriteCount(): Long +} + +object SocketMetrics { + private[net] trait UnsealedSocketMetrics extends SocketMetrics +} From f4a40db6a8f424cdc9adb778696f07f7932b2416 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 28 Jul 2025 08:52:16 -0400 Subject: [PATCH 2/5] Mima exclusions --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 37bbf25caf..1ed7f5b968 100644 --- a/build.sbt +++ b/build.sbt @@ -353,7 +353,8 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( "fs2.io.net.DatagramSocketOption.multicastInterface" ), ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.dns"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.interfaces") + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.interfaces"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.metrics") ) lazy val root = tlCrossRootProject From 1930d098ff3476727bc95e13b460ff3317fa46fd Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 4 Aug 2025 08:23:49 -0400 Subject: [PATCH 3/5] Guard increment of totalBytesRead --- io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala index 25a0b22940..36b63a9504 100644 --- a/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala @@ -70,7 +70,8 @@ private[net] trait SocketCompanionPlatform { /** Instrumented version of `readChunk`. */ protected def readChunk0(buffer: ByteBuffer): F[Int] = readChunk(buffer).map { bytesRead => - totalBytesRead.addAndGet(bytesRead.toLong) + if (bytesRead >= 0) + totalBytesRead.addAndGet(bytesRead.toLong) bytesRead } From 33135666010b906d99941f7abefd230f838bb972 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 4 Aug 2025 09:03:19 -0400 Subject: [PATCH 4/5] Fix warnings --- io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala index 36b63a9504..0619e0ec44 100644 --- a/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala @@ -71,7 +71,7 @@ private[net] trait SocketCompanionPlatform { protected def readChunk0(buffer: ByteBuffer): F[Int] = readChunk(buffer).map { bytesRead => if (bytesRead >= 0) - totalBytesRead.addAndGet(bytesRead.toLong) + totalBytesRead.addAndGet(bytesRead.toLong): Unit bytesRead } From 55b55dbe3199abecd79e739048d5c5046cd65b7a Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 3 Sep 2025 22:10:59 -0400 Subject: [PATCH 5/5] Fix compilation due to merge issue --- io/jvm/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/io/jvm/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala b/io/jvm/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala index 459f2d0adf..536ae9dce0 100644 --- a/io/jvm/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala +++ b/io/jvm/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala @@ -237,6 +237,7 @@ class TLSSocketSuite extends TLSSuite { @deprecated("", "") def remoteAddress = raw.remoteAddress def writes = raw.writes + def metrics = raw.metrics def address = raw.address def getOption[A](key: SocketOption.Key[A]) = raw.getOption(key)