Skip to content

Commit 1ccec48

Browse files
committed
- Support mutable lazy vtables and port remaining accord tables
- Integrate txn_blocked_by deadline and depth filter with execution logic, to ensure we terminate promptly and get a best effort reply
1 parent add004b commit 1ccec48

File tree

11 files changed

+982
-972
lines changed

11 files changed

+982
-972
lines changed

src/java/org/apache/cassandra/db/marshal/TxnIdUtf8Type.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ public <V> void validate(V value, ValueAccessor<V> accessor) throws MarshalExcep
4848

4949
String describe() { return "TxnId"; }
5050

51+
@Override
52+
public boolean isEmptyValueMeaningless()
53+
{
54+
return true;
55+
}
56+
5157
@Override
5258
public TypeSerializer<String> getSerializer()
5359
{

src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java

Lines changed: 184 additions & 84 deletions
Large diffs are not rendered by default.
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.db.virtual;
19+
20+
import java.util.Iterator;
21+
22+
import javax.annotation.Nullable;
23+
24+
import org.apache.cassandra.db.Clustering;
25+
import org.apache.cassandra.db.ClusteringPrefix;
26+
import org.apache.cassandra.db.DeletionInfo;
27+
import org.apache.cassandra.db.RangeTombstone;
28+
import org.apache.cassandra.db.Slice;
29+
import org.apache.cassandra.db.partitions.PartitionUpdate;
30+
import org.apache.cassandra.db.rows.Cell;
31+
import org.apache.cassandra.db.rows.ColumnData;
32+
import org.apache.cassandra.db.rows.Row;
33+
import org.apache.cassandra.exceptions.InvalidRequestException;
34+
import org.apache.cassandra.schema.ColumnMetadata;
35+
import org.apache.cassandra.schema.TableMetadata;
36+
37+
import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
38+
import static org.apache.cassandra.db.ClusteringPrefix.Kind.STATIC_CLUSTERING;
39+
40+
/**
41+
* An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source
42+
* modification via INSERT/UPDATE, DELETE and TRUNCATE operations.
43+
*
44+
* Virtual table implementation need to be thread-safe has they can be called from different threads.
45+
*/
46+
public abstract class AbstractMutableLazyVirtualTable extends AbstractLazyVirtualTable
47+
{
48+
protected AbstractMutableLazyVirtualTable(TableMetadata metadata, OnTimeout onTimeout, Sorted sorted)
49+
{
50+
super(metadata, onTimeout, sorted);
51+
}
52+
53+
protected AbstractMutableLazyVirtualTable(TableMetadata metadata, OnTimeout onTimeout, Sorted sorted, Sorted sortedByPartitionKey)
54+
{
55+
super(metadata, onTimeout, sorted, sortedByPartitionKey);
56+
}
57+
58+
protected void applyPartitionDeletion(Object[] partitionKeys)
59+
{
60+
throw invalidRequest("Partition deletion is not supported by table %s", metadata());
61+
}
62+
63+
protected void applyRangeTombstone(Object[] partitionKey, Object[] start, boolean startInclusive, Object[] end, boolean endInclusive)
64+
{
65+
throw invalidRequest("Range deletion is not supported by table %s", metadata());
66+
}
67+
68+
protected void applyRowDeletion(Object[] partitionKey, @Nullable Object[] clusteringKeys)
69+
{
70+
throw invalidRequest("Row deletion is not supported by table %s", metadata());
71+
}
72+
73+
protected void applyRowUpdate(Object[] partitionKey, @Nullable Object[] clusteringColumns, ColumnMetadata[] columns, Object[] values)
74+
{
75+
throw invalidRequest("Column modification is not supported by table %s", metadata());
76+
}
77+
78+
private void applyRangeTombstone(Object[] pks, RangeTombstone rt)
79+
{
80+
Slice slice = rt.deletedSlice();
81+
Object[] starts = composeClusterings(slice.start(), metadata());
82+
Object[] ends = composeClusterings(slice.end(), metadata());
83+
applyRangeTombstone(pks, starts, slice.start().isInclusive(), ends, slice.end().isInclusive());
84+
}
85+
86+
private void applyRow(Object[] pks, Row row)
87+
{
88+
Object[] cks = row.clustering().kind() == STATIC_CLUSTERING ? null : composeClusterings(row.clustering(), metadata());
89+
if (!row.deletion().isLive())
90+
{
91+
applyRowDeletion(pks, cks);
92+
}
93+
else
94+
{
95+
ColumnMetadata[] columns = new ColumnMetadata[row.columnCount()];
96+
Object[] values = new Object[row.columnCount()];
97+
int i = 0;
98+
for (ColumnData cd : row)
99+
{
100+
ColumnMetadata cm = cd.column();
101+
if (cm.isComplex())
102+
throw new InvalidRequestException(metadata() + " does not support complex column updates");
103+
Cell cell = (Cell)cd;
104+
columns[i] = cm;
105+
if (!cell.isTombstone())
106+
values[i] = cm.type.compose(cell.value(), cell.accessor());
107+
++i;
108+
}
109+
applyRowUpdate(pks, cks, columns, values);
110+
}
111+
}
112+
113+
public void apply(PartitionUpdate update)
114+
{
115+
TableMetadata metadata = metadata();
116+
Object[] pks = composePartitionKeys(update.partitionKey(), metadata);
117+
118+
DeletionInfo deletionInfo = update.deletionInfo();
119+
if (!deletionInfo.getPartitionDeletion().isLive())
120+
{
121+
applyPartitionDeletion(pks);
122+
}
123+
else if (deletionInfo.hasRanges())
124+
{
125+
Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
126+
while (iter.hasNext())
127+
applyRangeTombstone(pks, iter.next());
128+
}
129+
else
130+
{
131+
for (Row row : update)
132+
applyRow(pks, row);
133+
if (!update.staticRow().isEmpty())
134+
applyRow(pks, update.staticRow());
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)