VOL-1691 Fix openolt adapter getting stuck while registartion with core

Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/github.com/DataDog/zstd/zstdmt_compress.c b/vendor/github.com/DataDog/zstd/zstdmt_compress.c
old mode 100755
new mode 100644
index c7a205d..38fbb90
--- a/vendor/github.com/DataDog/zstd/zstdmt_compress.c
+++ b/vendor/github.com/DataDog/zstd/zstdmt_compress.c
@@ -9,21 +9,20 @@
  */
 
 
-/* ======   Tuning parameters   ====== */
-#define ZSTDMT_NBWORKERS_MAX 200
-#define ZSTDMT_JOBSIZE_MAX  (MEM_32bits() ? (512 MB) : (2 GB))  /* note : limited by `jobSize` type, which is `unsigned` */
-#define ZSTDMT_OVERLAPLOG_DEFAULT 6
-
-
 /* ======   Compiler specifics   ====== */
 #if defined(_MSC_VER)
 #  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
 #endif
 
 
+/* ======   Constants   ====== */
+#define ZSTDMT_OVERLAPLOG_DEFAULT 0
+
+
 /* ======   Dependencies   ====== */
 #include <string.h>      /* memcpy, memset */
-#include <limits.h>      /* INT_MAX */
+#include <limits.h>      /* INT_MAX, UINT_MAX */
+#include "mem.h"         /* MEM_STATIC */
 #include "pool.h"        /* threadpool */
 #include "threading.h"   /* mutex */
 #include "zstd_compress_internal.h"  /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
@@ -37,18 +36,19 @@
 #define ZSTD_RESIZE_SEQPOOL 0
 
 /* ======   Debug   ====== */
-#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
+#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \
+    && !defined(_MSC_VER) \
+    && !defined(__MINGW32__)
 
 #  include <stdio.h>
 #  include <unistd.h>
 #  include <sys/times.h>
-#  define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); }
 
 #  define DEBUG_PRINTHEX(l,p,n) {            \
     unsigned debug_u;                        \
     for (debug_u=0; debug_u<(n); debug_u++)  \
-        DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
-    DEBUGLOGRAW(l, " \n");                   \
+        RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
+    RAWLOG(l, " \n");                        \
 }
 
 static unsigned long long GetCurrentClockTimeMicroseconds(void)
@@ -56,13 +56,13 @@
    static clock_t _ticksPerSecond = 0;
    if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);
 
-   { struct tms junk; clock_t newTicks = (clock_t) times(&junk);
-     return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); }
-}
+   {   struct tms junk; clock_t newTicks = (clock_t) times(&junk);
+       return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
+}  }
 
 #define MUTEX_WAIT_TIME_DLEVEL 6
 #define ZSTD_PTHREAD_MUTEX_LOCK(mutex) {          \
-    if (ZSTD_DEBUG >= MUTEX_WAIT_TIME_DLEVEL) {   \
+    if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) {   \
         unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
         ZSTD_pthread_mutex_lock(mutex);           \
         {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
@@ -160,6 +160,25 @@
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
 }
 
+
+static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
+{
+    unsigned const maxNbBuffers = 2*nbWorkers + 3;
+    if (srcBufPool==NULL) return NULL;
+    if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
+        return srcBufPool;
+    /* need a larger buffer pool */
+    {   ZSTD_customMem const cMem = srcBufPool->cMem;
+        size_t const bSize = srcBufPool->bufferSize;   /* forward parameters */
+        ZSTDMT_bufferPool* newBufPool;
+        ZSTDMT_freeBufferPool(srcBufPool);
+        newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+        if (newBufPool==NULL) return newBufPool;
+        ZSTDMT_setBufferSize(newBufPool, bSize);
+        return newBufPool;
+    }
+}
+
 /** ZSTDMT_getBuffer() :
  *  assumption : bufPool must be valid
  * @return : a buffer, with start pointer and size
@@ -229,8 +248,8 @@
 /* store buffer for later re-use, up to pool capacity */
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
 {
-    if (buf.start == NULL) return;   /* compatible with release on NULL */
     DEBUGLOG(5, "ZSTDMT_releaseBuffer");
+    if (buf.start == NULL) return;   /* compatible with release on NULL */
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;  /* stored for later use */
@@ -300,7 +319,8 @@
 
 static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
 {
-    ZSTDMT_seqPool* seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    if (seqPool == NULL) return NULL;
     ZSTDMT_setNbSeq(seqPool, 0);
     return seqPool;
 }
@@ -310,6 +330,10 @@
     ZSTDMT_freeBufferPool(seqPool);
 }
 
+static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
+{
+    return ZSTDMT_expandBufferPool(pool, nbWorkers);
+}
 
 
 /* =====   CCtx Pool   ===== */
@@ -317,8 +341,8 @@
 
 typedef struct {
     ZSTD_pthread_mutex_t poolMutex;
-    unsigned totalCCtx;
-    unsigned availCCtx;
+    int totalCCtx;
+    int availCCtx;
     ZSTD_customMem cMem;
     ZSTD_CCtx* cctx[1];   /* variable size */
 } ZSTDMT_CCtxPool;
@@ -326,16 +350,16 @@
 /* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */
 static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
 {
-    unsigned u;
-    for (u=0; u<pool->totalCCtx; u++)
-        ZSTD_freeCCtx(pool->cctx[u]);  /* note : compatible with free on NULL */
+    int cid;
+    for (cid=0; cid<pool->totalCCtx; cid++)
+        ZSTD_freeCCtx(pool->cctx[cid]);  /* note : compatible with free on NULL */
     ZSTD_pthread_mutex_destroy(&pool->poolMutex);
     ZSTD_free(pool, pool->cMem);
 }
 
 /* ZSTDMT_createCCtxPool() :
  * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */
-static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbWorkers,
+static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers,
                                               ZSTD_customMem cMem)
 {
     ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
@@ -355,6 +379,18 @@
     return cctxPool;
 }
 
+static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
+                                              int nbWorkers)
+{
+    if (srcPool==NULL) return NULL;
+    if (nbWorkers <= srcPool->totalCCtx) return srcPool;   /* good enough */
+    /* need a larger cctx pool */
+    {   ZSTD_customMem const cMem = srcPool->cMem;
+        ZSTDMT_freeCCtxPool(srcPool);
+        return ZSTDMT_createCCtxPool(nbWorkers, cMem);
+    }
+}
+
 /* only works during initialization phase, not during compression */
 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 {
@@ -421,21 +457,20 @@
      * Must be acquired after the main mutex when acquiring both.
      */
     ZSTD_pthread_mutex_t ldmWindowMutex;
-    ZSTD_pthread_cond_t ldmWindowCond;  /* Signaled when ldmWindow is udpated */
+    ZSTD_pthread_cond_t ldmWindowCond;  /* Signaled when ldmWindow is updated */
     ZSTD_window_t ldmWindow;  /* A thread-safe copy of ldmState.window */
 } serialState_t;
 
-static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
+static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize)
 {
     /* Adjust parameters */
     if (params.ldmParams.enableLdm) {
         DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
-        params.ldmParams.windowLog = params.cParams.windowLog;
         ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
         assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
-        assert(params.ldmParams.hashEveryLog < 32);
+        assert(params.ldmParams.hashRateLog < 32);
         serialState->ldmState.hashPower =
-                ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
+                ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength);
     } else {
         memset(&params.ldmParams, 0, sizeof(params.ldmParams));
     }
@@ -453,7 +488,7 @@
             serialState->params.ldmParams.hashLog -
             serialState->params.ldmParams.bucketSizeLog;
         /* Size the seq pool tables */
-        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
+        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));
         /* Reset the window */
         ZSTD_window_clear(&serialState->ldmState.window);
         serialState->ldmWindow = serialState->ldmState.window;
@@ -473,6 +508,7 @@
         memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
     }
     serialState->params = params;
+    serialState->params.jobSize = (U32)jobSize;
     return 0;
 }
 
@@ -505,6 +541,7 @@
     /* Wait for our turn */
     ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
     while (serialState->nextJobID < jobID) {
+        DEBUGLOG(5, "wait for serialState->cond");
         ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
     }
     /* A future job may error and skip our job */
@@ -514,6 +551,7 @@
             size_t error;
             assert(seqStore.seq != NULL && seqStore.pos == 0 &&
                    seqStore.size == 0 && seqStore.capacity > 0);
+            assert(src.size <= serialState->params.jobSize);
             ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
             error = ZSTD_ldm_generateSequences(
                 &serialState->ldmState, &seqStore,
@@ -593,14 +631,32 @@
     unsigned frameChecksumNeeded;        /* used only by mtctx */
 } ZSTDMT_jobDescription;
 
+#define JOB_ERROR(e) {                          \
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
+    job->cSize = e;                             \
+    ZSTD_pthread_mutex_unlock(&job->job_mutex); \
+    goto _endJob;                               \
+}
+
 /* ZSTDMT_compressionJob() is a POOL_function type */
-void ZSTDMT_compressionJob(void* jobDescription)
+static void ZSTDMT_compressionJob(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
     ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */
     ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
     rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
     buffer_t dstBuff = job->dstBuff;
+    size_t lastCBlockSize = 0;
+
+    /* resources */
+    if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
+    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
+        dstBuff = ZSTDMT_getBuffer(job->bufPool);
+        if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
+        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
+    }
+    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL)
+        JOB_ERROR(ERROR(memory_allocation));
 
     /* Don't compute the checksum for chunks, since we compute it externally,
      * but write it in the header.
@@ -609,47 +665,31 @@
     /* Don't run LDM for the chunks, since we handle it externally */
     jobParams.ldmParams.enableLdm = 0;
 
-    /* ressources */
-    if (cctx==NULL) {
-        job->cSize = ERROR(memory_allocation);
-        goto _endJob;
-    }
-    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
-        dstBuff = ZSTDMT_getBuffer(job->bufPool);
-        if (dstBuff.start==NULL) {
-            job->cSize = ERROR(memory_allocation);
-            goto _endJob;
-        }
-        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
-    }
 
     /* init */
     if (job->cdict) {
-        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, job->cdict, jobParams, job->fullFrameSize);
+        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize);
         assert(job->firstJob);  /* only allowed for first job */
-        if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+        if (ZSTD_isError(initError)) JOB_ERROR(initError);
     } else {  /* srcStart points at reloaded section */
         U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
-        {   size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob);
-            if (ZSTD_isError(forceWindowError)) {
-                job->cSize = forceWindowError;
-                goto _endJob;
-        }   }
+        {   size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
+            if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
+        }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                                         job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
+                                        ZSTD_dtlm_fast,
                                         NULL, /*cdict*/
                                         jobParams, pledgedSrcSize);
-            if (ZSTD_isError(initError)) {
-                job->cSize = initError;
-                goto _endJob;
-    }   }   }
+            if (ZSTD_isError(initError)) JOB_ERROR(initError);
+    }   }
 
     /* Perform serial step as early as possible, but after CCtx initialization */
     ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
 
     if (!job->firstJob) {  /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
-        if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
+        if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
         DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
         ZSTD_invalidateRepCodes(cctx);
     }
@@ -667,7 +707,7 @@
         assert(job->cSize == 0);
         for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
             ip += chunkSize;
             op += cSize; assert(op < oend);
             /* stats */
@@ -680,18 +720,16 @@
             ZSTD_pthread_mutex_unlock(&job->job_mutex);
         }
         /* last block */
-        assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
+        assert(chunkSize > 0);
+        assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
         if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
             size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
             size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
             size_t const cSize = (job->lastJob) ?
                  ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
-            /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-            job->cSize += cSize;
-            ZSTD_pthread_mutex_unlock(&job->job_mutex);
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
+            lastCBlockSize = cSize;
     }   }
 
 _endJob:
@@ -704,7 +742,9 @@
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
     /* report */
     ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-    job->consumed = job->src.size;
+    if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
+    job->cSize += lastCBlockSize;
+    job->consumed = job->src.size;  /* when job->consumed == job->src.size , compression job is presumed completed */
     ZSTD_pthread_cond_signal(&job->job_cond);
     ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
@@ -736,6 +776,14 @@
 
 static const roundBuff_t kNullRoundBuff = {NULL, 0, 0};
 
+#define RSYNC_LENGTH 32
+
+typedef struct {
+  U64 hash;
+  U64 hitMask;
+  U64 primePower;
+} rsyncState_t;
+
 struct ZSTDMT_CCtx_s {
     POOL_ctx* factory;
     ZSTDMT_jobDescription* jobs;
@@ -745,10 +793,11 @@
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t targetPrefixSize;
-    roundBuff_t roundBuff;
+    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
     inBuff_t inBuff;
-    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
+    roundBuff_t roundBuff;
     serialState_t serial;
+    rsyncState_t rsync;
     unsigned singleBlockingThread;
     unsigned jobIDMask;
     unsigned doneJobID;
@@ -798,18 +847,28 @@
     return jobTable;
 }
 
+static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
+    U32 nbJobs = nbWorkers + 2;
+    if (nbJobs > mtctx->jobIDMask+1) {  /* need more job capacity */
+        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
+        mtctx->jobIDMask = 0;
+        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
+        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
+        assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));  /* ensure nbJobs is a power of 2 */
+        mtctx->jobIDMask = nbJobs - 1;
+    }
+    return 0;
+}
+
+
 /* ZSTDMT_CCtxParam_setNbWorkers():
  * Internal use only */
 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
 {
-    if (nbWorkers > ZSTDMT_NBWORKERS_MAX) nbWorkers = ZSTDMT_NBWORKERS_MAX;
-    params->nbWorkers = nbWorkers;
-    params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
-    params->jobSize = 0;
-    return nbWorkers;
+    return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
 }
 
-ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
+MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem)
 {
     ZSTDMT_CCtx* mtctx;
     U32 nbJobs = nbWorkers + 2;
@@ -844,6 +903,17 @@
     return mtctx;
 }
 
+ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
+{
+#ifdef ZSTD_MULTITHREAD
+    return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem);
+#else
+    (void)nbWorkers;
+    (void)cMem;
+    return NULL;
+#endif
+}
+
 ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
 {
     return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
@@ -875,7 +945,7 @@
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
         ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
-            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
+            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
             ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
         }
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
@@ -914,38 +984,44 @@
 }
 
 /* Internal only */
-size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
-                                ZSTDMT_parameter parameter, unsigned value) {
+size_t
+ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
+                                   ZSTDMT_parameter parameter,
+                                   int value)
+{
     DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter");
     switch(parameter)
     {
     case ZSTDMT_p_jobSize :
-        DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %u", value);
-        if ( (value > 0)  /* value==0 => automatic job size */
-           & (value < ZSTDMT_JOBSIZE_MIN) )
-            value = ZSTDMT_JOBSIZE_MIN;
-        params->jobSize = value;
-        return value;
-    case ZSTDMT_p_overlapSectionLog :
-        if (value > 9) value = 9;
-        DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value);
-        params->overlapSizeLog = (value >= 9) ? 9 : value;
-        return value;
+        DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value);
+        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value);
+    case ZSTDMT_p_overlapLog :
+        DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value);
+        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value);
+    case ZSTDMT_p_rsyncable :
+        DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value);
+        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value);
     default :
         return ERROR(parameter_unsupported);
     }
 }
 
-size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value)
+size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value)
 {
     DEBUGLOG(4, "ZSTDMT_setMTCtxParameter");
-    switch(parameter)
-    {
-    case ZSTDMT_p_jobSize :
-        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
-    case ZSTDMT_p_overlapSectionLog :
-        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
-    default :
+    return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
+}
+
+size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value)
+{
+    switch (parameter) {
+    case ZSTDMT_p_jobSize:
+        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value);
+    case ZSTDMT_p_overlapLog:
+        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value);
+    case ZSTDMT_p_rsyncable:
+        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value);
+    default:
         return ERROR(parameter_unsupported);
     }
 }
@@ -954,19 +1030,38 @@
  * initializing others to default values. */
 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
 {
-    ZSTD_CCtx_params jobParams;
-    memset(&jobParams, 0, sizeof(jobParams));
-
-    jobParams.cParams = params.cParams;
-    jobParams.fParams = params.fParams;
-    jobParams.compressionLevel = params.compressionLevel;
-    jobParams.disableLiteralCompression = params.disableLiteralCompression;
-
+    ZSTD_CCtx_params jobParams = params;
+    /* Clear parameters related to multithreading */
+    jobParams.forceWindow = 0;
+    jobParams.nbWorkers = 0;
+    jobParams.jobSize = 0;
+    jobParams.overlapLog = 0;
+    jobParams.rsyncable = 0;
+    memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t));
+    memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem));
     return jobParams;
 }
 
+
+/* ZSTDMT_resize() :
+ * @return : error code if fails, 0 on success */
+static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
+{
+    if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
+    FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
+    mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
+    if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
+    mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
+    if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
+    mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);
+    if (mtctx->seqPool == NULL) return ERROR(memory_allocation);
+    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
+    return 0;
+}
+
+
 /*! ZSTDMT_updateCParams_whileCompressing() :
- *  Updates only a selected set of compression parameters, to remain compatible with current frame.
+ *  Updates a selected set of compression parameters, remaining compatible with currently active frame.
  *  New parameters will be applied to next compression job. */
 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
 {
@@ -981,38 +1076,36 @@
     }
 }
 
-/* ZSTDMT_getNbWorkers():
- * @return nb threads currently active in mtctx.
- * mtctx must be valid */
-unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx)
-{
-    assert(mtctx != NULL);
-    return mtctx->params.nbWorkers;
-}
-
 /* ZSTDMT_getFrameProgression():
  * tells how much data has been consumed (input) and produced (output) for current frame.
  * able to count progression inside worker threads.
- * Note : mutex will be acquired during statistics collection. */
+ * Note : mutex will be acquired during statistics collection inside workers. */
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
-    DEBUGLOG(6, "ZSTDMT_getFrameProgression");
-    fps.consumed = mtctx->consumed;
-    fps.produced = mtctx->produced;
+    DEBUGLOG(5, "ZSTDMT_getFrameProgression");
     fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
+    fps.consumed = mtctx->consumed;
+    fps.produced = fps.flushed = mtctx->produced;
+    fps.currentJobID = mtctx->nextJobID;
+    fps.nbActiveWorkers = 0;
     {   unsigned jobNb;
         unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
         DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
                     mtctx->doneJobID, lastJobNb, mtctx->jobReady)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
-            ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
-            {   size_t const cResult = mtctx->jobs[wJobID].cSize;
+            ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
+            ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+            {   size_t const cResult = jobPtr->cSize;
                 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-                fps.consumed += mtctx->jobs[wJobID].consumed;
-                fps.ingested += mtctx->jobs[wJobID].src.size;
+                size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+                assert(flushed <= produced);
+                fps.ingested += jobPtr->src.size;
+                fps.consumed += jobPtr->consumed;
                 fps.produced += produced;
+                fps.flushed  += flushed;
+                fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
             }
             ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
@@ -1021,26 +1114,98 @@
 }
 
 
+size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
+{
+    size_t toFlush;
+    unsigned const jobID = mtctx->doneJobID;
+    assert(jobID <= mtctx->nextJobID);
+    if (jobID == mtctx->nextJobID) return 0;   /* no active job => nothing to flush */
+
+    /* look into oldest non-fully-flushed job */
+    {   unsigned const wJobID = jobID & mtctx->jobIDMask;
+        ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
+        ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+        {   size_t const cResult = jobPtr->cSize;
+            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+            size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+            assert(flushed <= produced);
+            toFlush = produced - flushed;
+            if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) {
+                /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */
+                assert(jobPtr->consumed < jobPtr->src.size);
+            }
+        }
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+    }
+
+    return toFlush;
+}
+
+
 /* ------------------------------------------ */
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */
 
-static size_t ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
+static unsigned ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
 {
     if (params.ldmParams.enableLdm)
+        /* In Long Range Mode, the windowLog is typically oversized.
+         * In which case, it's preferable to determine the jobSize
+         * based on chainLog instead. */
         return MAX(21, params.cParams.chainLog + 4);
     return MAX(20, params.cParams.windowLog + 2);
 }
 
-static size_t ZSTDMT_computeOverlapLog(ZSTD_CCtx_params const params)
+static int ZSTDMT_overlapLog_default(ZSTD_strategy strat)
 {
-    unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
-    if (params.ldmParams.enableLdm)
-        return (MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) - overlapRLog);
-    return overlapRLog >= 9 ? 0 : (params.cParams.windowLog - overlapRLog);
+    switch(strat)
+    {
+        case ZSTD_btultra2:
+            return 9;
+        case ZSTD_btultra:
+        case ZSTD_btopt:
+            return 8;
+        case ZSTD_btlazy2:
+        case ZSTD_lazy2:
+            return 7;
+        case ZSTD_lazy:
+        case ZSTD_greedy:
+        case ZSTD_dfast:
+        case ZSTD_fast:
+        default:;
+    }
+    return 6;
 }
 
-static unsigned ZSTDMT_computeNbJobs(ZSTD_CCtx_params params, size_t srcSize, unsigned nbWorkers) {
+static int ZSTDMT_overlapLog(int ovlog, ZSTD_strategy strat)
+{
+    assert(0 <= ovlog && ovlog <= 9);
+    if (ovlog == 0) return ZSTDMT_overlapLog_default(strat);
+    return ovlog;
+}
+
+static size_t ZSTDMT_computeOverlapSize(ZSTD_CCtx_params const params)
+{
+    int const overlapRLog = 9 - ZSTDMT_overlapLog(params.overlapLog, params.cParams.strategy);
+    int ovLog = (overlapRLog >= 8) ? 0 : (params.cParams.windowLog - overlapRLog);
+    assert(0 <= overlapRLog && overlapRLog <= 8);
+    if (params.ldmParams.enableLdm) {
+        /* In Long Range Mode, the windowLog is typically oversized.
+         * In which case, it's preferable to determine the jobSize
+         * based on chainLog instead.
+         * Then, ovLog becomes a fraction of the jobSize, rather than windowSize */
+        ovLog = MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
+                - overlapRLog;
+    }
+    assert(0 <= ovLog && ovLog <= 30);
+    DEBUGLOG(4, "overlapLog : %i", params.overlapLog);
+    DEBUGLOG(4, "overlap size : %i", 1 << ovLog);
+    return (ovLog==0) ? 0 : (size_t)1 << ovLog;
+}
+
+static unsigned
+ZSTDMT_computeNbJobs(ZSTD_CCtx_params params, size_t srcSize, unsigned nbWorkers)
+{
     assert(nbWorkers>0);
     {   size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params);
         size_t const jobMaxSize = jobSizeTarget << 2;
@@ -1063,7 +1228,7 @@
                 ZSTD_CCtx_params params)
 {
     ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
-    size_t const overlapSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
+    size_t const overlapSize = ZSTDMT_computeOverlapSize(params);
     unsigned const nbJobs = ZSTDMT_computeNbJobs(params, srcSize, params.nbWorkers);
     size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
     size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize;   /* avoid too small last block */
@@ -1087,18 +1252,10 @@
 
     assert(avgJobSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
     ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
         return ERROR(memory_allocation);
 
-    if (nbJobs > mtctx->jobIDMask+1) {  /* enlarge job table */
-        U32 jobsTableSize = nbJobs;
-        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
-        mtctx->jobIDMask = 0;
-        mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem);
-        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
-        assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0));  /* ensure jobsTableSize is a power of 2 */
-        mtctx->jobIDMask = jobsTableSize - 1;
-    }
+    FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) );  /* only expands if necessary */
 
     {   unsigned u;
         for (u=0; u<nbJobs; u++) {
@@ -1182,16 +1339,17 @@
 }
 
 size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
-                               void* dst, size_t dstCapacity,
-                         const void* src, size_t srcSize,
-                         const ZSTD_CDict* cdict,
-                               ZSTD_parameters params,
-                               unsigned overlapLog)
+                                void* dst, size_t dstCapacity,
+                          const void* src, size_t srcSize,
+                          const ZSTD_CDict* cdict,
+                                ZSTD_parameters params,
+                                int overlapLog)
 {
     ZSTD_CCtx_params cctxParams = mtctx->params;
     cctxParams.cParams = params.cParams;
     cctxParams.fParams = params.fParams;
-    cctxParams.overlapSizeLog = overlapLog;
+    assert(ZSTD_OVERLAPLOG_MIN <= overlapLog && overlapLog <= ZSTD_OVERLAPLOG_MAX);
+    cctxParams.overlapLog = overlapLog;
     return ZSTDMT_compress_advanced_internal(mtctx,
                                              dst, dstCapacity,
                                              src, srcSize,
@@ -1204,8 +1362,8 @@
                      const void* src, size_t srcSize,
                            int compressionLevel)
 {
-    U32 const overlapLog = (compressionLevel >= ZSTD_maxCLevel()) ? 9 : ZSTDMT_OVERLAPLOG_DEFAULT;
     ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
+    int const overlapLog = ZSTDMT_overlapLog_default(params.cParams.strategy);
     params.fParams.contentSizeFlag = 1;
     return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog);
 }
@@ -1221,18 +1379,19 @@
         const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
         unsigned long long pledgedSrcSize)
 {
-    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u, disableLiteralCompression=%i)",
-                (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx, params.disableLiteralCompression);
-    /* params are supposed to be fully validated at this point */
+    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
+                (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);
+
+    /* params supposed partially fully validated at this point */
     assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
-    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
 
     /* init */
-    if (params.jobSize == 0) {
-        params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
-    }
-    if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
+    if (params.nbWorkers != mtctx->params.nbWorkers)
+        FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) );
+
+    if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
+    if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
 
     mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
     if (mtctx->singleBlockingThread) {
@@ -1267,12 +1426,24 @@
         mtctx->cdict = cdict;
     }
 
-    mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
-    DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
+    mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(params);
+    DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
     mtctx->targetSectionSize = params.jobSize;
-    if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
+    if (mtctx->targetSectionSize == 0) {
+        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
+    }
+    if (params.rsyncable) {
+        /* Aim for the targetsectionSize as the average job size. */
+        U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20);
+        U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20;
+        assert(jobSizeMB >= 1);
+        DEBUGLOG(4, "rsyncLog = %u", rsyncBits);
+        mtctx->rsync.hash = 0;
+        mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
+        mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
+    }
     if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
-    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
+    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);
     DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
     ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
     {
@@ -1312,7 +1483,7 @@
     mtctx->allJobsCompleted = 0;
     mtctx->consumed = 0;
     mtctx->produced = 0;
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize))
         return ERROR(memory_allocation);
     return 0;
 }
@@ -1368,7 +1539,7 @@
 /* ZSTDMT_writeLastEmptyBlock()
  * Write a single empty block with an end-of-frame to finish a frame.
  * Job must be created from streaming variant.
- * This function is always successfull if expected conditions are fulfilled.
+ * This function is always successful if expected conditions are fulfilled.
  */
 static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
 {
@@ -1420,7 +1591,7 @@
         mtctx->jobs[jobID].jobID = mtctx->nextJobID;
         mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
         mtctx->jobs[jobID].lastJob = endFrame;
-        mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
+        mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
         mtctx->jobs[jobID].dstFlushed = 0;
 
         /* Update the round buffer pos and clear the input buffer to be reset */
@@ -1468,6 +1639,8 @@
 
 
 /*! ZSTDMT_flushProduced() :
+ *  flush whatever data has been produced but not yet flushed in current job.
+ *  move to next job if current one is fully flushed.
  * `output` : `pos` will be updated with amount of data flushed .
  * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
  * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
@@ -1496,7 +1669,7 @@
     /* try to flush something */
     {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
-        size_t const srcSize = mtctx->jobs[wJobID].src.size;        /* read-only, could be done after mutex lock, but no-declaration-after-statement */
+        size_t const srcSize = mtctx->jobs[wJobID].src.size;       /* read-only, could be done after mutex lock, but no-declaration-after-statement */
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
@@ -1516,6 +1689,7 @@
             mtctx->jobs[wJobID].cSize += 4;  /* can write this shared value, as worker is no longer active */
             mtctx->jobs[wJobID].frameChecksumNeeded = 0;
         }
+
         if (cSize > 0) {   /* compression is ongoing or completed */
             size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
             DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
@@ -1529,11 +1703,12 @@
             output->pos += toFlush;
             mtctx->jobs[wJobID].dstFlushed += toFlush;  /* can write : this value is only used by mtctx */
 
-            if ( (srcConsumed == srcSize)    /* job completed */
+            if ( (srcConsumed == srcSize)    /* job is completed */
               && (mtctx->jobs[wJobID].dstFlushed == cSize) ) {   /* output buffer fully flushed => free this job position */
                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
                 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
+                DEBUGLOG(5, "dstBuffer released");
                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
                 mtctx->consumed += srcSize;
@@ -1610,6 +1785,7 @@
     range_t extDict;
     range_t prefix;
 
+    DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");
     extDict.start = window.dictBase + window.lowLimit;
     extDict.size = window.dictLimit - window.lowLimit;
 
@@ -1630,12 +1806,13 @@
 {
     if (mtctx->params.ldmParams.enableLdm) {
         ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
+        DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
         DEBUGLOG(5, "source  [0x%zx, 0x%zx)",
                     (size_t)buffer.start,
                     (size_t)buffer.start + buffer.capacity);
         ZSTD_PTHREAD_MUTEX_LOCK(mutex);
         while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
-            DEBUGLOG(6, "Waiting for LDM to finish...");
+            DEBUGLOG(5, "Waiting for LDM to finish...");
             ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
         }
         DEBUGLOG(6, "Done waiting for LDM to finish");
@@ -1655,6 +1832,7 @@
     size_t const target = mtctx->targetSectionSize;
     buffer_t buffer;
 
+    DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
     assert(mtctx->inBuff.buffer.start == NULL);
     assert(mtctx->roundBuff.capacity >= target);
 
@@ -1668,7 +1846,7 @@
         buffer.start = start;
         buffer.capacity = prefixSize;
         if (ZSTDMT_isOverlapped(buffer, inUse)) {
-            DEBUGLOG(6, "Waiting for buffer...");
+            DEBUGLOG(5, "Waiting for buffer...");
             return 0;
         }
         ZSTDMT_waitForLdmComplete(mtctx, buffer);
@@ -1680,7 +1858,7 @@
     buffer.capacity = target;
 
     if (ZSTDMT_isOverlapped(buffer, inUse)) {
-        DEBUGLOG(6, "Waiting for buffer...");
+        DEBUGLOG(5, "Waiting for buffer...");
         return 0;
     }
     assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
@@ -1701,6 +1879,89 @@
     return 1;
 }
 
+typedef struct {
+  size_t toLoad;  /* The number of bytes to load from the input. */
+  int flush;      /* Boolean declaring if we must flush because we found a synchronization point. */
+} syncPoint_t;
+
+/**
+ * Searches through the input for a synchronization point. If one is found, we
+ * will instruct the caller to flush, and return the number of bytes to load.
+ * Otherwise, we will load as many bytes as possible and instruct the caller
+ * to continue as normal.
+ */
+static syncPoint_t
+findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
+{
+    BYTE const* const istart = (BYTE const*)input.src + input.pos;
+    U64 const primePower = mtctx->rsync.primePower;
+    U64 const hitMask = mtctx->rsync.hitMask;
+
+    syncPoint_t syncPoint;
+    U64 hash;
+    BYTE const* prev;
+    size_t pos;
+
+    syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
+    syncPoint.flush = 0;
+    if (!mtctx->params.rsyncable)
+        /* Rsync is disabled. */
+        return syncPoint;
+    if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH)
+        /* Not enough to compute the hash.
+         * We will miss any synchronization points in this RSYNC_LENGTH byte
+         * window. However, since it depends only in the internal buffers, if the
+         * state is already synchronized, we will remain synchronized.
+         * Additionally, the probability that we miss a synchronization point is
+         * low: RSYNC_LENGTH / targetSectionSize.
+         */
+        return syncPoint;
+    /* Initialize the loop variables. */
+    if (mtctx->inBuff.filled >= RSYNC_LENGTH) {
+        /* We have enough bytes buffered to initialize the hash.
+         * Start scanning at the beginning of the input.
+         */
+        pos = 0;
+        prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;
+        hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);
+    } else {
+        /* We don't have enough bytes buffered to initialize the hash, but
+         * we know we have at least RSYNC_LENGTH bytes total.
+         * Start scanning after the first RSYNC_LENGTH bytes less the bytes
+         * already buffered.
+         */
+        pos = RSYNC_LENGTH - mtctx->inBuff.filled;
+        prev = (BYTE const*)mtctx->inBuff.buffer.start - pos;
+        hash = ZSTD_rollingHash_compute(mtctx->inBuff.buffer.start, mtctx->inBuff.filled);
+        hash = ZSTD_rollingHash_append(hash, istart, pos);
+    }
+    /* Starting with the hash of the previous RSYNC_LENGTH bytes, roll
+     * through the input. If we hit a synchronization point, then cut the
+     * job off, and tell the compressor to flush the job. Otherwise, load
+     * all the bytes and continue as normal.
+     * If we go too long without a synchronization point (targetSectionSize)
+     * then a block will be emitted anyways, but this is okay, since if we
+     * are already synchronized we will remain synchronized.
+     */
+    for (; pos < syncPoint.toLoad; ++pos) {
+        BYTE const toRemove = pos < RSYNC_LENGTH ? prev[pos] : istart[pos - RSYNC_LENGTH];
+        /* if (pos >= RSYNC_LENGTH) assert(ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash); */
+        hash = ZSTD_rollingHash_rotate(hash, toRemove, istart[pos], primePower);
+        if ((hash & hitMask) == hitMask) {
+            syncPoint.toLoad = pos + 1;
+            syncPoint.flush = 1;
+            break;
+        }
+    }
+    return syncPoint;
+}
+
+size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
+{
+    size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
+    if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
+    return hintInSize;
+}
 
 /** ZSTDMT_compressStream_generic() :
  *  internal use only - exposed to be invoked from zstd_compress.c
@@ -1718,7 +1979,7 @@
     assert(input->pos  <= input->size);
 
     if (mtctx->singleBlockingThread) {  /* delegate to single-thread (synchronous) */
-        return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
+        return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp);
     }
 
     if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
@@ -1727,7 +1988,8 @@
     }
 
     /* single-pass shortcut (note : synchronous-mode) */
-    if ( (mtctx->nextJobID == 0)      /* just started */
+    if ( (!mtctx->params.rsyncable)   /* rsyncable mode is disabled */
+      && (mtctx->nextJobID == 0)      /* just started */
       && (mtctx->inBuff.filled == 0)  /* nothing buffered */
       && (!mtctx->jobReady)           /* no job already created */
       && (endOp == ZSTD_e_end)        /* end order */
@@ -1753,18 +2015,23 @@
                 /* It is only possible for this operation to fail if there are
                  * still compression jobs ongoing.
                  */
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");
                 assert(mtctx->doneJobID != mtctx->nextJobID);
-            }
+            } else
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
         }
         if (mtctx->inBuff.buffer.start != NULL) {
-            size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
+            syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input);
+            if (syncPoint.flush && endOp == ZSTD_e_continue) {
+                endOp = ZSTD_e_flush;
+            }
             assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
             DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
-                        (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
-            memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
-            input->pos += toLoad;
-            mtctx->inBuff.filled += toLoad;
-            forwardInputProgress = toLoad>0;
+                        (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
+            memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);
+            input->pos += syncPoint.toLoad;
+            mtctx->inBuff.filled += syncPoint.toLoad;
+            forwardInputProgress = syncPoint.toLoad>0;
         }
         if ((input->pos < input->size) && (endOp == ZSTD_e_end))
             endOp = ZSTD_e_flush;   /* can't end now : not all input consumed */
@@ -1776,12 +2043,13 @@
       || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) {   /* must finish the frame with a zero-size block */
         size_t const jobSize = mtctx->inBuff.filled;
         assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
-        CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
+        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
     }
 
     /* check for potential compressed data ready to be flushed */
     {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
+        DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
         return remainingToFlush;
     }
 }
@@ -1789,7 +2057,7 @@
 
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
-    CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
+    FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
 
     /* recommended next input size : fill current input buffer */
     return mtctx->targetSectionSize - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
@@ -1806,7 +2074,7 @@
       || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) {  /* need a last 0-size block to end frame */
            DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
                         (U32)srcSize, (U32)endFrame);
-        CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
+        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
     }
 
     /* check if there is any data available to flush */