From dcfe23a0381da232eb427f8616abd8949fb9693e Mon Sep 17 00:00:00 2001 From: ningmingxiao Date: Thu, 30 Dec 2021 23:10:48 +0800 Subject: [PATCH] fix blockThreshold full bug Signed-off-by: ningmingxiao --- components/engine/pkg/ioutils/bytespipe.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/components/engine/pkg/ioutils/bytespipe.go b/components/engine/pkg/ioutils/bytespipe.go index d4bbf3c9..e04a5bf5 100644 --- a/components/engine/pkg/ioutils/bytespipe.go +++ b/components/engine/pkg/ioutils/bytespipe.go @@ -34,6 +34,7 @@ type BytesPipe struct { buf []*fixedBuffer bufLen int closeErr error // error to return from next Read. set to nil if not closed. + readBlock bool // check read BytesPipe is Wait() or not } // NewBytesPipe creates new BytesPipe, initialized by specified slice. @@ -86,6 +87,9 @@ loop0: // make sure the buffer doesn't grow too big from this write for bp.bufLen >= blockThreshold { + if bp.readBlock { + bp.wait.Broadcast() + } bp.wait.Wait() if bp.closeErr != nil { continue loop0 @@ -131,7 +135,9 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) { bp.mu.Unlock() return 0, bp.closeErr } + bp.readBlock = true bp.wait.Wait() + bp.readBlock = false if bp.bufLen == 0 && bp.closeErr != nil { err := bp.closeErr bp.mu.Unlock() -- 2.23.0