.git-blame-ignore-revs: 3 additions & 0 deletions
@@ -6,3 +6,6 @@ c2d5c9dd48ce9221f1c53bfc8c7cb15213fe7c89
 
 # Scala Steward: Reformat with scalafmt 3.9.4
 9a84b87564feab8a4a75150a27dd9223f2b87366
+
+# Scala Steward: Reformat with scalafmt 3.9.10
+0497fa4c62602d99cb61164b28f268829ebe3812
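Listing the reformat commit in .git-blame-ignore-revs keeps git blame readable: configured via "git config blame.ignoreRevsFile .git-blame-ignore-revs" (or passed with "git blame --ignore-revs-file"), blame skips the listed bulk-format commits and attributes each line to its real author.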
.scalafmt.conf: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-version = "3.9.5"
+version = "3.9.10"
 runner.dialect = scala212
 maxColumn = 100
 
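The version bump above is the whole substance of this PR: every hunk below is a formatting-only rewrite produced by the new scalafmt release, pairing each removed line with a re-indented or re-aligned replacement. The touched code spans the Ratatool CLI dispatcher, BigDiffy, the BigSampler field hashers (Avro, BigQuery, protobuf), and the Avro/TableRow test-data generators.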
@@ -37,8 +37,8 @@ object Ratatool {
       sys.exit(1)
     } else {
       args.head match {
-        case BigDiffy.command => BigDiffy.run(args.tail)
-        case BigSampler.command => BigSampler.run(args.tail)
+        case BigDiffy.command => BigDiffy.run(args.tail)
+        case BigSampler.command => BigSampler.run(args.tail)
         case DirectSamplerParser.command =>
           val opts = DirectSamplerParser.parse(args.tail)
           if (opts.isEmpty) {
@@ -236,21 +236,23 @@ object BigDiffy extends Command with Serializable {
     val accMissingRhs = ScioMetrics.counter[Long]("MISSING_RHS")
 
     (lKeyed ++ rKeyed).groupByKey
-      .map { case (key, values) => // values is a list of tuples: "l" -> record or "r" -> record
-        if (values.size > 2) {
-          throw new RuntimeException(s"""More than two values found for key: $key.
+      .map {
+        case (key, values) => // values is a list of tuples: "l" -> record or "r" -> record
+          if (values.size > 2) {
+            throw new RuntimeException(s"""More than two values found for key: $key.
               | Your key must be unique in both SCollections""".stripMargin)
-        }
+          }
 
-        val valuesMap = values.toMap // L/R -> record
-        if (valuesMap.size == 2) {
-          val deltas: Seq[Delta] = diffy(valuesMap("l"), valuesMap("r"))
-          val diffType = if (deltas.isEmpty) DiffType.SAME else DiffType.DIFFERENT
-          (key, (deltas, diffType))
-        } else {
-          val diffType = if (valuesMap.contains("l")) DiffType.MISSING_RHS else DiffType.MISSING_LHS
-          (key, (Nil, diffType))
-        }
+          val valuesMap = values.toMap // L/R -> record
+          if (valuesMap.size == 2) {
+            val deltas: Seq[Delta] = diffy(valuesMap("l"), valuesMap("r"))
+            val diffType = if (deltas.isEmpty) DiffType.SAME else DiffType.DIFFERENT
+            (key, (deltas, diffType))
+          } else {
+            val diffType =
+              if (valuesMap.contains("l")) DiffType.MISSING_RHS else DiffType.MISSING_LHS
+            (key, (Nil, diffType))
+          }
       }
       .tap {
         case (_, (_, DiffType.SAME)) => accSame.inc()
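For orientation on the reformatted block: each key carries at most one record per side, so two grouped values mean a field-by-field comparison and one value means a missing side. A minimal self-contained sketch of that classification, using plain Scala collections in place of Scio's SCollection and string equality in place of the Diffy field comparison; DiffType and the record type here are simplified stand-ins:

object DiffTypeSketch {
  sealed trait DiffType
  object DiffType {
    case object SAME extends DiffType
    case object DIFFERENT extends DiffType
    case object MISSING_LHS extends DiffType
    case object MISSING_RHS extends DiffType
  }

  // Tag each side ("l" / "r"), group by key, then classify like the hunk above.
  def classify(lhs: Map[String, String], rhs: Map[String, String]): Map[String, DiffType] = {
    val lKeyed = lhs.toSeq.map { case (k, v) => k -> ("l" -> v) }
    val rKeyed = rhs.toSeq.map { case (k, v) => k -> ("r" -> v) }
    (lKeyed ++ rKeyed)
      .groupBy(_._1) // stand-in for SCollection#groupByKey
      .map { case (key, tagged) =>
        val valuesMap = tagged.map(_._2).toMap // "l"/"r" -> record
        val diffType =
          if (valuesMap.size == 2) {
            // Both sides present: compare records (Diffy would emit field deltas).
            if (valuesMap("l") == valuesMap("r")) DiffType.SAME else DiffType.DIFFERENT
          } else if (valuesMap.contains("l")) DiffType.MISSING_RHS
          else DiffType.MISSING_LHS
        key -> diffType
      }
  }

  def main(args: Array[String]): Unit = {
    val lhs = Map("a" -> "1", "b" -> "2", "c" -> "3")
    val rhs = Map("a" -> "1", "b" -> "9", "d" -> "4")
    // Expected: a -> SAME, b -> DIFFERENT, c -> MISSING_RHS, d -> MISSING_LHS
    classify(lhs, rhs).toSeq.sortBy(_._1).foreach(println)
  }
}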
@@ -280,7 +282,7 @@ object BigDiffy extends Command with Serializable {
       val optD = d.delta match {
         case UnknownDelta => None
         case TypedDelta(t, v) if ignoreNan && v.isNaN => None
-        case TypedDelta(t, v) =>
+        case TypedDelta(t, v) =>
           Some((t, Min(v), Max(v), Moments.aggregator.prepare(v)))
       }
       // Map of field -> (count, delta statistics)
@@ -557,8 +559,8 @@ object BigDiffy extends Command with Serializable {
     yMap.foreach(kv => names.add(kv._1))
     names.map { n =>
       (xMap.get(n), yMap.get(n)) match {
-        case (Some(f), None) => f
-        case (None, Some(f)) => f
+        case (Some(f), None) => f
+        case (None, Some(f)) => f
         case (Some(fx), Some(fy)) =>
           val fxMode = getFieldModeWithDefault(fx.getMode)
           val fyMode = getFieldModeWithDefault(fy.getMode)
@@ -165,7 +165,7 @@ private[samplers] object BigSamplerAvro {
       hasher: Hasher
   ): Hasher = {
     fieldSchema.getType match {
-      case Type.ENUM => hashEnum(fieldName, fieldSchema, fieldValue, hasher)
+      case Type.ENUM => hashEnum(fieldName, fieldSchema, fieldValue, hasher)
       case Type.STRING =>
         hasher.putString(fieldValue.asInstanceOf[CharSequence], BigSampler.utf8Charset)
       case Type.BYTES => hashBytes(fieldName, fieldSchema, fieldValue, hasher)
@@ -177,7 +177,7 @@ private[samplers] object BigSamplerAvro {
       case Type.BOOLEAN => hasher.putBoolean(fieldValue.asInstanceOf[Boolean])
       case Type.FIXED => hashBytes(fieldName, fieldSchema, fieldValue, hasher)
       case Type.NULL => hasher // Ignore nulls
-      case t =>
+      case t =>
        throw new UnsupportedOperationException(
          s"Type `${fieldSchema.getType}` of `${fieldName}` is not supported as sampling key!"
        )
@@ -194,7 +194,7 @@ private[samplers] object BigSamplerAvro {
     fieldValue match {
       case sv: Enum[_] => hasher.putString(sv.name, BigSampler.utf8Charset)
       case gv: GenericData.EnumSymbol => hasher.putString(gv.toString, BigSampler.utf8Charset)
-      case _ =>
+      case _ =>
        throw new UnsupportedOperationException(
          s"Internal type of `${fieldName}` not consistent with `${fieldSchema.getType}`!"
        )
@@ -212,7 +212,7 @@ private[samplers] object BigSamplerAvro {
       case sv: Array[Byte] => hasher.putBytes(sv)
       case gv: ByteBuffer => hasher.putBytes(gv.array())
       case fv: GenericFixed => hasher.putBytes(fv.bytes())
-      case _ =>
+      case _ =>
        throw new UnsupportedOperationException(
          s"Internal type of `${fieldName}` not consistent with `${fieldSchema.getType}`!"
        )
@@ -64,7 +64,7 @@ private[samplers] object BigSamplerBigQuery {
         vs.foldLeft(hasher)((hasher, v) => hasher.putBoolean(v.toString.toBoolean))
       case "INTEGER" => vs.foldLeft(hasher)((hasher, v) => hasher.putLong(v.toString.toLong))
       case "FLOAT" => vs.foldLeft(hasher)((hasher, v) => hasher.putFloat(v.toString.toFloat))
-      case "STRING" =>
+      case "STRING" =>
         vs.foldLeft(hasher)((hasher, v) => hasher.putString(v.toString, BigSampler.utf8Charset))
       case "BYTES" =>
         vs.foldLeft(hasher)((hasher, v) => hasher.putBytes(v.asInstanceOf[Array[Byte]]))
@@ -74,7 +74,7 @@ private[samplers] object BigSamplerProto {
       case JavaType.FLOAT => hasher.putFloat(v.asInstanceOf[Float])
       case JavaType.DOUBLE => hasher.putDouble(v.asInstanceOf[Double])
       case JavaType.BOOLEAN => hasher.putBoolean(v.asInstanceOf[Boolean])
-      case JavaType.STRING =>
+      case JavaType.STRING =>
         hasher.putString(v.asInstanceOf[CharSequence], BigSampler.utf8Charset)
       case JavaType.BYTE_STRING => hasher.putBytes(v.asInstanceOf[ByteString].toByteArray)
       case JavaType.ENUM => hasher.putString(v.asInstanceOf[Enum[_]].name, BigSampler.utf8Charset)
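All three samplers above fold record fields into a Guava Hasher so that the keep/drop decision is a deterministic function of the record's key fields. A minimal sketch of that idea, assuming Guava on the classpath; the Record type, the seed, and the hash-to-[0, 1) mapping are illustrative choices, not ratatool's API:

import java.nio.charset.StandardCharsets
import com.google.common.hash.Hashing

object HashSampleSketch {
  final case class Record(id: String, count: Long)

  // Deterministic per-record verdict: the same record hashes the same way on
  // every run, so reruns sample the same subset.
  def keep(r: Record, fraction: Double, seed: Int): Boolean = {
    val hash = Hashing
      .murmur3_128(seed)
      .newHasher()
      .putString(r.id, StandardCharsets.UTF_8) // like the STRING branches above
      .putLong(r.count)                        // like the INTEGER/LONG branches
      .hash()
      .asLong()
    // Map the 64-bit hash onto roughly [0, 1) and keep records under the fraction.
    math.abs(hash.toDouble / Long.MaxValue.toDouble) < fraction
  }

  def main(args: Array[String]): Unit = {
    val records = (1 to 10).map(i => Record(s"id-$i", i.toLong))
    records.filter(r => keep(r, fraction = 0.5, seed = 42)).foreach(println)
  }
}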
@@ -108,7 +108,7 @@ trait AvroGeneratorOps {
     val clazz = Class.forName(javaClass)
     stringableGens.get(clazz) match {
       case Some(gen) => gen
-      case None =>
+      case None =>
         val ctor = clazz.getDeclaredConstructor(classOf[String])
         ctor.setAccessible(true)
         genString.map { str =>
@@ -223,7 +223,7 @@ trait AvroGeneratorOps {
       case Schema.Type.STRING =>
         Option(schema.getProp(SpecificData.CLASS_PROP)) match {
           case Some(cls) => genStringable(cls)
-          case None =>
+          case None =>
             val str = genAvroString(schema)
             conversion match {
               case Some(c) => str.map(cs => c.fromCharSequence(cs, schema, schema.getLogicalType))
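The None branch above backs Avro's "stringable" java-class types with a generator that reflectively invokes the class's single-String constructor. A compact sketch of the same trick, assuming ScalaCheck on the classpath; java.net.URI is just a convenient class with a public String constructor:

import org.scalacheck.Gen

object StringableSketch {
  // Build a Gen for any class constructible from a single String.
  def genStringable(javaClass: String, genString: Gen[String]): Gen[AnyRef] = {
    val clazz = Class.forName(javaClass)
    val ctor = clazz.getDeclaredConstructor(classOf[String])
    ctor.setAccessible(true) // mirror the accessibility override in the hunk
    genString.map(str => ctor.newInstance(str).asInstanceOf[AnyRef])
  }

  def main(args: Array[String]): Unit = {
    val gen = genStringable("java.net.URI", Gen.oneOf("a://host/x", "b://host/y"))
    println(gen.sample) // e.g. Some(a://host/x)
  }
}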
@@ -126,16 +126,16 @@ trait TableRowGeneratorOps {
   private def tableFieldValueOf(fieldSchema: TableFieldSchema): Gen[TableFieldValue] = {
     val n = fieldSchema.getName
     def genV(): Gen[TableFieldValue] = fieldSchema.getType match {
-      case "INTEGER" => Arbitrary.arbInt.arbitrary.map(TableFieldValue(n, _))
-      case "FLOAT" => Arbitrary.arbFloat.arbitrary.map(TableFieldValue(n, _))
-      case "BOOLEAN" => Arbitrary.arbBool.arbitrary.map(TableFieldValue(n, _))
-      case "STRING" => Arbitrary.arbString.arbitrary.map(TableFieldValue(n, _))
+      case "INTEGER" => Arbitrary.arbInt.arbitrary.map(TableFieldValue(n, _))
+      case "FLOAT" => Arbitrary.arbFloat.arbitrary.map(TableFieldValue(n, _))
+      case "BOOLEAN" => Arbitrary.arbBool.arbitrary.map(TableFieldValue(n, _))
+      case "STRING" => Arbitrary.arbString.arbitrary.map(TableFieldValue(n, _))
       case "TIMESTAMP" =>
         instantGen.map(i => TableFieldValue(n, timeStampFormatter.print(i) + " UTC"))
       case "DATE" => instantGen.map(i => TableFieldValue(n, dateFormatter.print(i)))
       case "TIME" => instantGen.map(i => TableFieldValue(n, timeFormatter.print(i)))
       case "DATETIME" => instantGen.map(i => TableFieldValue(n, dateTimeFormatter.print(i)))
-      case "BYTES" =>
+      case "BYTES" =>
         Gen
           .listOf(Arbitrary.arbByte.arbitrary)
           .map(i => ByteBuffer.wrap(i.toArray))
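For reference, a small sketch of the ScalaCheck combinators this last hunk reformats: primitive Arbitrary instances mapped into a field wrapper, and Gen.listOf for variable-length byte payloads. TableFieldValue here is a stand-in case class, not the BigQuery helper from the diff:

import java.nio.ByteBuffer
import org.scalacheck.{Arbitrary, Gen}

object TableFieldValueSketch {
  final case class TableFieldValue(name: String, value: Any)

  // Pick a primitive generator by BigQuery type name and wrap it in the field.
  def genV(name: String, fieldType: String): Gen[TableFieldValue] = fieldType match {
    case "INTEGER" => Arbitrary.arbInt.arbitrary.map(TableFieldValue(name, _))
    case "BOOLEAN" => Arbitrary.arbBool.arbitrary.map(TableFieldValue(name, _))
    case "BYTES" =>
      Gen
        .listOf(Arbitrary.arbByte.arbitrary)
        .map(bytes => TableFieldValue(name, ByteBuffer.wrap(bytes.toArray)))
    case _ => Gen.fail // other types omitted in this sketch
  }

  def main(args: Array[String]): Unit =
    println(genV("payload", "BYTES").sample)
}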