Skip to content

Commit 8ebda98

Browse files
committed
Journal state tracking prototype
Improvements: clean up RecordPointer; cache last segment. Implement mutation journal replay. Persist clean/dirty state in metadata component. Add mutation journal replay test, fix several issues, wire the journal into the memtable. Add bounce dtest, fix some more tests.
1 parent 4bbb14c commit 8ebda98

27 files changed

+1381
-100
lines changed

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
import org.apache.cassandra.repair.consistent.admin.PendingStat;
135135
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
136136
import org.apache.cassandra.replication.MutationId;
137+
import org.apache.cassandra.replication.MutationJournal;
137138
import org.apache.cassandra.schema.ColumnMetadata;
138139
import org.apache.cassandra.schema.CompactionParams;
139140
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
@@ -498,9 +499,16 @@ public ColumnFamilyStore(Keyspace keyspace,
498499

499500
logger.info("Initializing {}.{}", getKeyspaceName(), name);
500501

501-
Memtable initialMemtable = DatabaseDescriptor.isDaemonInitialized() ?
502-
createMemtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition())) :
503-
null;
502+
Memtable initialMemtable = null;
503+
if (DatabaseDescriptor.isDaemonInitialized())
504+
{
505+
CommitLogPosition commitLogPosition;
506+
if (metadata().replicationType().isTracked())
507+
commitLogPosition = MutationJournal.instance.getCurrentPosition();
508+
else
509+
commitLogPosition = CommitLog.instance.getCurrentPosition();
510+
initialMemtable = createMemtable(new AtomicReference<>(commitLogPosition));
511+
}
504512
memtableMetricsReleaser = memtableFactory.createMemtableMetricsReleaser(metadata);
505513

506514
data = new Tracker(this, initialMemtable, loadSSTables);
@@ -1147,8 +1155,13 @@ public CommitLogPosition call()
11471155
// If a flush errored out but the error was ignored, make sure we don't discard the commit log.
11481156
if (flushFailure == null && mainMemtable != null)
11491157
{
1158+
CommitLogPosition commitLogLowerBound = mainMemtable.getCommitLogLowerBound();
11501159
commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound();
1151-
CommitLog.instance.discardCompletedSegments(metadata.id, mainMemtable.getCommitLogLowerBound(), commitLogUpperBound);
1160+
TableMetadata metadata = metadata();
1161+
if (metadata.replicationType().isTracked())
1162+
MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
1163+
else
1164+
CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound);
11521165
}
11531166

11541167
metric.pendingFlushes.dec();
@@ -1215,7 +1228,7 @@ private Flush(boolean truncate)
12151228

12161229
// we then ensure an atomic decision is made about the upper bound of the continuous range of commit log
12171230
// records owned by this memtable
1218-
setCommitLogUpperBound(commitLogUpperBound);
1231+
setCommitLogUpperBound(commitLogUpperBound, metadata().replicationType().isTracked());
12191232

12201233
// we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
12211234
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
@@ -1416,15 +1429,21 @@ public Memtable createMemtable(AtomicReference<CommitLogPosition> commitLogUpper
14161429
}
14171430

14181431
// atomically set the upper bound for the commit log
1419-
private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound)
1432+
private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound, boolean useMutationJournal)
14201433
{
14211434
// we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are
14221435
// also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
14231436
// so that we know all operations prior to the position have not reached it yet
14241437
CommitLogPosition lastReplayPosition;
14251438
while (true)
14261439
{
1427-
lastReplayPosition = new Memtable.LastCommitLogPosition((CommitLog.instance.getCurrentPosition()));
1440+
CommitLogPosition commitLogPosition;
1441+
if (useMutationJournal)
1442+
commitLogPosition = MutationJournal.instance.getCurrentPosition();
1443+
else
1444+
commitLogPosition = CommitLog.instance.getCurrentPosition();
1445+
1446+
lastReplayPosition = new Memtable.LastCommitLogPosition(commitLogPosition);
14281447
CommitLogPosition currentLast = commitLogUpperBound.get();
14291448
if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
14301449
&& commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition))
@@ -3234,7 +3253,11 @@ void onTableDropped()
32343253

32353254
data.notifyDropped(DatabaseDescriptor.getAutoSnapshotTtl());
32363255

3237-
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
3256+
// TODO (required): mutation tracking
3257+
if (metadata().replicationType().isTracked())
3258+
throw new IllegalStateException("not implemented");
3259+
else
3260+
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
32383261

32393262
compactionStrategyManager.shutdown();
32403263

src/java/org/apache/cassandra/db/Mutation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ public DecoratedKey key()
201201
return key;
202202
}
203203

204+
/**
 * Exposes the per-table updates carried by this mutation.
 *
 * @return the {@code modifications} map itself (no copy is made), keyed by {@link TableId}
 */
public ImmutableMap<TableId, PartitionUpdate> modifications()
205+
{
206+
return modifications;
207+
}
208+
204209
public ImmutableCollection<PartitionUpdate> getPartitionUpdates()
205210
{
206211
return modifications.values();

src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public static CommitLogReplayer construct(CommitLog commitLog, UUID localHostId)
121121

122122
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
123123
{
124-
// but, if we've truncated the cf in question, then we need to need to start replay after the truncation
124+
// but, if we've truncated the cf in question, then we need to start replay after the truncation
125125
CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
126126
if (truncatedAt != null)
127127
{
@@ -323,7 +323,7 @@ public void runMayThrow()
323323
{
324324
assert !newPUCollector.isEmpty();
325325

326-
Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false);
326+
keyspace.apply(newPUCollector.build(), false, true, false);
327327
commitLogReplayer.keyspacesReplayed.add(keyspace);
328328
}
329329
}
@@ -439,7 +439,7 @@ public static ReplayFilter create()
439439
if (toReplay.isEmpty())
440440
logger.info("All tables will be included in commit log replay.");
441441
else
442-
logger.info("Tables to be replayed: {}", toReplay.asMap().toString());
442+
logger.info("Tables to be replayed: {}", toReplay.asMap());
443443

444444
return new CustomReplayFilter(toReplay);
445445
}

src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.cassandra.db.WriteContext;
2525
import org.apache.cassandra.db.commitlog.CommitLogPosition;
2626
import org.apache.cassandra.exceptions.RequestExecutionException;
27-
import org.apache.cassandra.journal.RecordPointer;
2827
import org.apache.cassandra.replication.MutationJournal;
2928
import org.apache.cassandra.tracing.Tracing;
3029
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -40,10 +39,9 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re
4039
group = Keyspace.writeOrder.start();
4140

4241
Tracing.trace("Appending to mutation journal");
43-
RecordPointer pointer = MutationJournal.instance.write(mutation.id(), mutation);
42+
CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation);
4443

45-
// TODO (preferred): update journal to return CommitLogPosition or otherwise remove requirement to allocate second object here
46-
return new CassandraWriteContext(group, new CommitLogPosition(pointer.segment, pointer.position));
44+
return new CassandraWriteContext(group, pointer);
4745
}
4846
catch (Throwable t)
4947
{

src/java/org/apache/cassandra/journal/ActiveSegment.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
import accord.utils.Invariants;
3131
import com.codahale.metrics.Timer;
3232
import com.google.common.annotations.VisibleForTesting;
33+
import org.apache.cassandra.db.commitlog.CommitLogPosition;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

36-
import org.apache.cassandra.io.util.*;
3737
import org.apache.cassandra.utils.*;
3838
import org.apache.cassandra.utils.concurrent.OpOrder;
3939
import org.apache.cassandra.utils.concurrent.Ref;
@@ -98,6 +98,11 @@ private ActiveSegment(
9898
}
9999
}
100100

101+
/**
 * Builds a {@link CommitLogPosition} from this segment's {@code id()} and its current
 * {@code allocateOffset}. A new object is allocated on every call.
 *
 * NOTE(review): {@code allocateOffset} is narrowed with an {@code (int)} cast; confirm it can
 * never exceed {@code Integer.MAX_VALUE}, otherwise the reported position would be truncated.
 *
 * @return a position made of (segment id, allocate offset)
 */
public CommitLogPosition currentPosition()
102+
{
103+
return new CommitLogPosition(id(), (int) allocateOffset);
104+
}
105+
101106
static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
102107
{
103108
InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
@@ -134,6 +139,12 @@ StaticSegment<K, V> asStatic()
134139
throw new UnsupportedOperationException();
135140
}
136141

142+
/**
 * Not supported on this segment type: the call always fails.
 *
 * @throws UnsupportedOperationException unconditionally; per the message, metadata of an active
 *         segment is expected to be written on flush rather than through this method
 */
@Override
143+
public void persistMetadata()
144+
{
145+
throw new UnsupportedOperationException("Can not mutate active segment's metadata. It should be done on flush.");
146+
}
147+
137148
/**
138149
* Read the entry at the specified offset into the entry holder.
139150
* Expects the caller to acquire the ref to the segment and the record to exist.
@@ -425,19 +436,21 @@ private int allocateBytes(int size)
425436
}
426437
}
427438

428-
final class Allocation
439+
final class Allocation extends RecordPointer
429440
{
430441
private final OpOrder.Group appendOp;
431442
private final ByteBuffer buffer;
432-
private final int start;
433-
private final int length;
434443

435444
Allocation(OpOrder.Group appendOp, ByteBuffer buffer, int length)
436445
{
446+
super(descriptor.timestamp, buffer.position(), length);
437447
this.appendOp = appendOp;
438448
this.buffer = buffer;
439-
this.start = buffer.position();
440-
this.length = length;
449+
}
450+
451+
Segment<K, V> segment()
452+
{
453+
return ActiveSegment.this;
441454
}
442455

443456
void write(K id, ByteBuffer record)
@@ -446,7 +459,7 @@ void write(K id, ByteBuffer record)
446459
{
447460
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
448461
metadata.update();
449-
index.update(id, start, length);
462+
index.update(id, position, length);
450463
}
451464
catch (IOException e)
452465
{
@@ -472,14 +485,12 @@ void consumeBufferUnsafe(Consumer<ByteBuffer> fn)
472485
}
473486
}
474487

475-
476-
// Variant of write that does not allocate/return a record pointer
477488
void writeInternal(K id, ByteBuffer record)
478489
{
479490
try
480491
{
481492
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
482-
index.update(id, start, length);
493+
index.update(id, position, length);
483494
metadata.update();
484495
}
485496
catch (IOException e)
@@ -496,13 +507,13 @@ void awaitDurable(Timer waitingOnFlush)
496507
{
497508
try (Timer.Context ignored = waitingOnFlush.time())
498509
{
499-
waitForFlush(start);
510+
waitForFlush(position);
500511
}
501512
}
502513

503514
boolean isFsynced()
504515
{
505-
return fsyncedTo >= start + length;
516+
return fsyncedTo >= position + length;
506517
}
507518

508519
Descriptor descriptor()
@@ -512,12 +523,12 @@ Descriptor descriptor()
512523

513524
int start()
514525
{
515-
return start;
526+
return position;
516527
}
517528

518529
RecordPointer recordPointer()
519530
{
520-
return new RecordPointer(descriptor.timestamp, start, length);
531+
return this;
521532
}
522533
}
523534

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.journal;
20+
21+
import org.apache.cassandra.io.util.DataInputBuffer;
22+
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
26+
/**
 * A {@link RecordConsumer} adapter that turns the raw record bytes into a typed value before
 * handing the record on.
 *
 * The raw {@link #accept(long, int, Object, ByteBuffer, int)} callback deserializes the buffer with
 * the supplied {@link ValueSerializer} and forwards the result to the abstract
 * {@link #accept(long, int, Object, Object)} overload, which subclasses implement.
 *
 * @param <K> record key type
 * @param <V> deserialized record value type
 */
public abstract class DeserializedRecordConsumer<K, V> implements RecordConsumer<K>
27+
{
28+
// serializer used to decode each record's bytes into a V
final ValueSerializer<K, V> valueSerializer;
29+
30+
/**
 * @param valueSerializer serializer used to deserialize every record passed to this consumer
 */
public DeserializedRecordConsumer(ValueSerializer<K, V> valueSerializer)
31+
{
32+
this.valueSerializer = valueSerializer;
33+
}
34+
35+
/**
 * Deserializes {@code buffer} and delegates to {@link #accept(long, int, Object, Object)}.
 *
 * @param segment     passed through unchanged to the typed overload
 * @param position    passed through unchanged to the typed overload
 * @param key         record key; also given to the serializer
 * @param buffer      serialized record value
 * @param userVersion version passed to the serializer
 * @throws RuntimeException wrapping any {@link IOException} raised during deserialization
 */
@Override
36+
public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
37+
{
38+
// NOTE(review): the meaning of the second constructor argument (false) is not visible here;
// presumably it controls whether the buffer is duplicated before reading - confirm.
try (DataInputBuffer in = new DataInputBuffer(buffer, false))
39+
{
40+
V value = valueSerializer.deserialize(key, in, userVersion);
41+
accept(segment, position, key, value);
42+
}
43+
catch (IOException e)
44+
{
45+
// can only throw if serializer is buggy
46+
throw new RuntimeException(e);
47+
}
48+
}
49+
50+
/**
 * Receives a record after its value has been deserialized.
 *
 * @param segment  value received by the raw callback
 * @param position value received by the raw callback
 * @param key      record key
 * @param buffer   the deserialized value (despite the parameter name, this is not a byte buffer)
 */
protected abstract void accept(long segment, int position, K key, V buffer);
51+
}

0 commit comments

Comments
 (0)