Spark Q&A : Kryo serialization failed: Buffer overflow
15595 ワード
Q1 . Spark運転Jobエラー
A 1:エラーメッセージに基づいて逆コード解析
エラーを報告する位置は、
ここでは詳細なログがないので、どの新聞の間違いを見ることができます.そこで、この方法に関連する方法をフォローして見ました.私の理解は
同様に、
最終呼び出し
問題の鍵は、
org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 2, required: 4.
To avoid this, increase spark.kryoserializer.buffer.max value
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:299)
...
A 1:エラーメッセージに基づいて逆コード解析
org.apache.spark.serializer.KryoSerializerInstance
でserialize
メソッドソースコードを表示override def serialize[T: ClassTag](t: T): ByteBuffer = {
output.clear()
val kryo = borrowKryo()
try {
kryo.writeClassAndObject(output, t)
} catch {
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
"increase spark.kryoserializer.buffer.max value.")
} finally {
releaseKryo(kryo)
}
ByteBuffer.wrap(output.toBytes)
}
エラーを報告する位置は、
try-catch
ブロックに関連するwriteClassAndObject
メソッドであり、フォローを続けます.public void writeClassAndObject (Output output, Object object) {
if (output == null) throw new IllegalArgumentException("output cannot be null.");
beginObject();
try {
if (object == null) {
writeClass(output, null);
return;
}
Registration registration = writeClass(output, object.getClass());
if (references && writeReferenceOrNull(output, object, false)) return;
if (TRACE || (DEBUG && depth == 1)) log("Write", object);
registration.getSerializer().write(this, output, object);
} finally {
if (--depth == 0 && autoReset) reset();
}
}
ここでは詳細なログがないので、どの新聞の間違いを見ることができます.そこで、この方法に関連する方法をフォローして見ました.私の理解は
writeReferenceOrNull
新聞の間違いで、この方法にフォローしました.boolean writeReferenceOrNull (Output output, Object object, boolean mayBeNull) {
if (object == null) {
if (TRACE || (DEBUG && depth == 1)) log("Write", null);
output.writeByte(Kryo.NULL);
return true;
}
if (!referenceResolver.useReferences(object.getClass())) {
if (mayBeNull) output.writeByte(Kryo.NOT_NULL);
return false;
}
// Determine if this object has already been seen in this object graph.
int id = referenceResolver.getWrittenId(object);
// If not the first time encountered, only write reference ID.
if (id != -1) {
if (DEBUG) debug("kryo", "Write object reference " + id + ": " + string(object));
output.writeInt(id + 2, true); // + 2 because 0 and 1 are used for NULL and NOT_NULL. // Q!
return true;
}
// Otherwise write NOT_NULL and then the object bytes.
id = referenceResolver.addWrittenObject(object);
output.writeByte(NOT_NULL);
if (TRACE) trace("kryo", "Write initial object reference " + id + ": " + string(object));
return false;
}
同様に、
writeInt
メソッドにフォローします.public int writeInt (int value, boolean optimizePositive) throws KryoException {
if (!optimizePositive) value = (value << 1) ^ (value >> 31);
if (value >>> 7 == 0) {
require(1);
buffer[position++] = (byte)value;
return 1;
}
if (value >>> 14 == 0) {
require(2);
buffer[position++] = (byte)((value & 0x7F) | 0x80);
buffer[position++] = (byte)(value >>> 7);
return 2;
}
if (value >>> 21 == 0) {
require(3);
buffer[position++] = (byte)((value & 0x7F) | 0x80);
buffer[position++] = (byte)(value >>> 7 | 0x80);
buffer[position++] = (byte)(value >>> 14);
return 3;
}
if (value >>> 28 == 0) {
require(4);
buffer[position++] = (byte)((value & 0x7F) | 0x80);
buffer[position++] = (byte)(value >>> 7 | 0x80);
buffer[position++] = (byte)(value >>> 14 | 0x80);
buffer[position++] = (byte)(value >>> 21);
return 4;
}
require(5);
buffer[position++] = (byte)((value & 0x7F) | 0x80);
buffer[position++] = (byte)(value >>> 7 | 0x80);
buffer[position++] = (byte)(value >>> 14 | 0x80);
buffer[position++] = (byte)(value >>> 21 | 0x80);
buffer[position++] = (byte)(value >>> 28);
return 5;
}
最終呼び出し
com.esotericsoftware.kryo.io
のrequire
メソッド:private boolean require(int required) throws KryoException {
if(this.capacity - this.position >= required) {
return false;
} else if(required > this.maxCapacity) {
throw new KryoException("Buffer overflow. Max capacity: " + this.maxCapacity + ", required: " + required);
} else {
this.flush();
while(this.capacity - this.position < required) {
if(this.capacity == this.maxCapacity) {
throw new KryoException("Buffer overflow. Available: " + (this.capacity - this.position) + ", required: " + required);
}
this.capacity = Math.min(this.capacity * 2, this.maxCapacity);
if(this.capacity < 0) {
this.capacity = this.maxCapacity;
}
byte[] newBuffer = new byte[this.capacity];
System.arraycopy(this.buffer, 0, newBuffer, 0, this.position);
this.buffer = newBuffer;
}
return true;
}
}
問題の鍵は、
output
がid
に書き込む場合、id+2
の値が大きい(value >>> 28 == 0
)ため、4つのbyte
コードを申請する必要があることから分かるように、maxCapacity
の値はid
の上限値であり、それを超えるとエラーが報告される.maxCapacity
は、以下の論理(以下、上に説明する)によって設定される.private lazy val output = ks.newKryoOutput() \\ KryoOutput
def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) \\ maxBufferSize
private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt \\ buffer
private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt \\ buffer
val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
spark.kryoserializer.buffer.max
は対を設けていないと結論する.最大2048mb
とすることができる.