Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
Expand Down Expand Up @@ -473,7 +474,7 @@ void testRequiresCompression(final CompressionType compressionType, final boolea

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTransformingIndexes(final boolean encryption) {
void testTransformingIndexes(final boolean encryption) throws IOException {
final var config = new HashMap<>(Map.of(
"chunk.size", "10",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
Expand All @@ -494,51 +495,63 @@ void testTransformingIndexes(final boolean encryption) {
rsm.configure(config);

final var segmentIndexBuilder = new SegmentIndexesV1Builder();
final var bytes = "test".getBytes();
final var is = rsm.transformIndex(
IndexType.OFFSET,
new ByteArrayInputStream(bytes),
bytes.length,
encryptionMetadata,
segmentIndexBuilder
);
assertThat(is).isNotEmpty();
assertThat(segmentIndexBuilder.indexes()).containsOnly(IndexType.OFFSET);


// adding required indexes to test builder
rsm.transformIndex(
IndexType.TIMESTAMP,
new ByteArrayInputStream(bytes),
bytes.length,
encryptionMetadata,
segmentIndexBuilder
);
rsm.transformIndex(
IndexType.LEADER_EPOCH,
new ByteArrayInputStream(bytes),
bytes.length,
encryptionMetadata,
segmentIndexBuilder
);
rsm.transformIndex(
IndexType.PRODUCER_SNAPSHOT,
new ByteArrayInputStream(bytes),
bytes.length,
encryptionMetadata,
segmentIndexBuilder
);
final var index = segmentIndexBuilder.build();
assertThat(index.offset().size()).isGreaterThan(0);
assertThat(index.timestamp().size()).isGreaterThan(0);
assertThat(index.leaderEpoch().size()).isGreaterThan(0);
assertThat(index.producerSnapshot().size()).isGreaterThan(0);
assertThat(index.transaction()).isNull();
try (final var os = new ByteBufferOutputStream(0)) {
final var offset = "offset".getBytes();
rsm.transformIndex(
IndexType.OFFSET,
new ByteArrayInputStream(offset),
offset.length,
encryptionMetadata,
segmentIndexBuilder,
os
);
assertThat(os.buffer().position()).isGreaterThan(0);
assertThat(segmentIndexBuilder.indexes()).containsOnly(IndexType.OFFSET);

// adding required indexes to test builder
final var timestamp = "timestamp".getBytes();
rsm.transformIndex(
IndexType.TIMESTAMP,
new ByteArrayInputStream(timestamp),
timestamp.length,
encryptionMetadata,
segmentIndexBuilder,
os
);
final var leaderEpoch = "leader-epoch".getBytes();
rsm.transformIndex(
IndexType.LEADER_EPOCH,
new ByteArrayInputStream(leaderEpoch),
leaderEpoch.length,
encryptionMetadata,
segmentIndexBuilder,
os
);
final var producerSnapshot = "producer-snapshot".getBytes();
rsm.transformIndex(
IndexType.PRODUCER_SNAPSHOT,
new ByteArrayInputStream(producerSnapshot),
producerSnapshot.length,
encryptionMetadata,
segmentIndexBuilder,
os
);
final var index = segmentIndexBuilder.build();
assertThat(index.offset().size()).isGreaterThan(0);
assertThat(index.timestamp().size()).isGreaterThan(0);
assertThat(index.leaderEpoch().size()).isGreaterThan(0);
assertThat(index.producerSnapshot().size()).isGreaterThan(0);
assertThat(index.transaction()).isNull();
if (!encryption) {
assertThat(os.buffer().array())
.isEqualTo(("offset" + "timestamp" + "leader-epoch" + "producer-snapshot").getBytes());
}
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTransformingEmptyIndexes(final boolean encryption) {
void testTransformingEmptyIndexes(final boolean encryption) throws IOException {
final var config = new HashMap<>(Map.of(
"chunk.size", "10",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
Expand All @@ -557,44 +570,50 @@ void testTransformingEmptyIndexes(final boolean encryption) {
rsm.configure(config);

final var segmentIndexBuilder = new SegmentIndexesV1Builder();
final var is = rsm.transformIndex(
IndexType.OFFSET,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder
);
assertThat(is).isEmpty();
assertThat(segmentIndexBuilder.indexes()).containsOnly(IndexType.OFFSET);

// adding required indexes to test builder
rsm.transformIndex(
IndexType.TIMESTAMP,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder
);
rsm.transformIndex(
IndexType.LEADER_EPOCH,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder
);
rsm.transformIndex(
IndexType.PRODUCER_SNAPSHOT,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder
);
final var index = segmentIndexBuilder.build();
assertThat(index.offset().size()).isEqualTo(0);
assertThat(index.timestamp().size()).isEqualTo(0);
assertThat(index.leaderEpoch().size()).isEqualTo(0);
assertThat(index.producerSnapshot().size()).isEqualTo(0);
assertThat(index.transaction()).isNull();
try (final var os = new ByteBufferOutputStream(0)) {
rsm.transformIndex(
IndexType.OFFSET,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder,
os
);
assertThat(os.position()).isEqualTo(0);
assertThat(segmentIndexBuilder.indexes()).containsOnly(IndexType.OFFSET);

// adding required indexes to test builder
rsm.transformIndex(
IndexType.TIMESTAMP,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder,
os
);
rsm.transformIndex(
IndexType.LEADER_EPOCH,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder,
os
);
rsm.transformIndex(
IndexType.PRODUCER_SNAPSHOT,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder,
os
);
final var index = segmentIndexBuilder.build();
assertThat(index.offset().size()).isEqualTo(0);
assertThat(index.timestamp().size()).isEqualTo(0);
assertThat(index.leaderEpoch().size()).isEqualTo(0);
assertThat(index.producerSnapshot().size()).isEqualTo(0);
assertThat(index.transaction()).isNull();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.OutputStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyPair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -42,6 +39,7 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
Expand Down Expand Up @@ -273,55 +271,56 @@ private SegmentIndexesV1 uploadIndexes(
final SegmentEncryptionMetadataV1 encryptionMeta,
final SegmentCustomMetadataBuilder customMetadataBuilder
) throws IOException, RemoteStorageException, StorageBackendException {
final List<InputStream> indexes = new ArrayList<>(IndexType.values().length);
final SegmentIndexesV1Builder segmentIndexBuilder = new SegmentIndexesV1Builder();

try (final ClosableInputStreamHolder closableInputStreamHolder = new ClosableInputStreamHolder()) {
final var offsetIndex = transformIndex(
final var outputStream = new ByteBufferOutputStream(0);
transformIndex(
IndexType.OFFSET,
closableInputStreamHolder.add(Files.newInputStream(segmentData.offsetIndex())),
indexSize(segmentData.offsetIndex()),
encryptionMeta,
segmentIndexBuilder
segmentIndexBuilder,
outputStream
);
indexes.add(offsetIndex);
final var timeIndex = transformIndex(
transformIndex(
IndexType.TIMESTAMP,
closableInputStreamHolder.add(Files.newInputStream(segmentData.timeIndex())),
indexSize(segmentData.timeIndex()),
encryptionMeta,
segmentIndexBuilder
segmentIndexBuilder,
outputStream
);
indexes.add(timeIndex);
final var producerSnapshotIndex = transformIndex(
transformIndex(
IndexType.PRODUCER_SNAPSHOT,
closableInputStreamHolder.add(Files.newInputStream(segmentData.producerSnapshotIndex())),
indexSize(segmentData.producerSnapshotIndex()),
encryptionMeta,
segmentIndexBuilder
segmentIndexBuilder,
outputStream
);
indexes.add(producerSnapshotIndex);
final var leaderEpoch = transformIndex(
transformIndex(
IndexType.LEADER_EPOCH,
closableInputStreamHolder.add(new ByteBufferInputStream(segmentData.leaderEpochIndex())),
segmentData.leaderEpochIndex().remaining(),
encryptionMeta,
segmentIndexBuilder
segmentIndexBuilder,
outputStream
);
indexes.add(leaderEpoch);
if (segmentData.transactionIndex().isPresent()) {
final var transactionIndex = transformIndex(
transformIndex(
IndexType.TRANSACTION,
closableInputStreamHolder.add(Files.newInputStream(segmentData.transactionIndex().get())),
indexSize(segmentData.transactionIndex().get()),
encryptionMeta,
segmentIndexBuilder
segmentIndexBuilder,
outputStream
);
indexes.add(transactionIndex);
}
final var suffix = ObjectKeyFactory.Suffix.INDEXES;
final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
try (final var in = new SequenceInputStream(Collections.enumeration(indexes))) {
final var buffer = outputStream.buffer().rewind();
try (final var in = new ByteBufferInputStream(buffer)) {
final var bytes = uploader.upload(in, key);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
Expand Down Expand Up @@ -398,11 +397,14 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
}
}

InputStream transformIndex(final IndexType indexType,
final InputStream index,
final int size,
final SegmentEncryptionMetadata encryptionMetadata,
final SegmentIndexesV1Builder segmentIndexBuilder) {
void transformIndex(
final IndexType indexType,
final InputStream index,
final int size,
final SegmentEncryptionMetadata encryptionMetadata,
final SegmentIndexesV1Builder segmentIndexBuilder,
final OutputStream outputStream
) throws IOException {
log.debug("Transforming index {} with size {}", indexType, size);
if (size > 0) {
TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(index, size);
Expand All @@ -413,12 +415,12 @@ InputStream transformIndex(final IndexType indexType,
() -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD));
}
final var transformFinisher = new TransformFinisher(transformEnum, size);
final var inputStream = transformFinisher.nextElement();
try (final var inputStream = transformFinisher.toInputStream()) {
inputStream.transferTo(outputStream);
}
segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size());
return inputStream;
} else {
segmentIndexBuilder.add(indexType, 0);
return InputStream.nullInputStream();
}
}

Expand Down