diff --git a/src/core/exception.d b/src/core/exception.d index 35321183a0..c90a457df9 100644 --- a/src/core/exception.d +++ b/src/core/exception.d @@ -19,6 +19,19 @@ void __switch_errorT()(string file = __FILE__, size_t line = __LINE__) @trusted assert(0, "No appropriate switch clause found"); } + +/** + * Thrown on a fork() failure. + */ +class ForkError : Error +{ + this( string file = __FILE__, size_t line = __LINE__, Throwable next = null ) @nogc nothrow pure @safe + { + super( "fork() failed", file, line, next ); + } +} + + version (D_BetterC) { // When compiling with -betterC we use template functions so if they are @@ -539,6 +552,22 @@ extern (C) void onInvalidMemoryOperationError(void* pretend_sideffect = null) @t throw staticError!InvalidMemoryOperationError(); } + +/** + * A callback for errors in the case of a failed fork in D. A $(LREF ForkError) will be thrown. + * + * Params: + * file = The name of the file that signaled this error. + * line = The line number on which this error occurred. + * + * Throws: + * $(LREF ForkError). + */ +extern (C) void onForkError( string file = __FILE__, size_t line = __LINE__ ) @trusted pure nothrow @nogc +{ + throw staticError!ForkError( file, line, null ); +} + /** * A callback for unicode errors in D. A $(LREF UnicodeException) will be thrown. 
* diff --git a/src/core/gc/config.d b/src/core/gc/config.d index cf6ec189a4..429bd76442 100644 --- a/src/core/gc/config.d +++ b/src/core/gc/config.d @@ -15,6 +15,7 @@ __gshared Config config; struct Config { bool disable; // start disabled + bool fork = true; // optional concurrent behaviour ubyte profile; // enable profiling with summary when terminating program string gc = "conservative"; // select gc implementation conservative|precise|manual @@ -39,8 +40,9 @@ struct Config printf("GC options are specified as white space separated assignments: disable:0|1 - start disabled (%d) + fork:0|1 - set fork behaviour (%d) profile:0|1|2 - enable profiling with summary when terminating program (%d) - gc:".ptr, disable, profile); + gc:".ptr, disable, profile, fork); foreach (i, entry; registeredGCFactories) { if (i) printf("|"); diff --git a/src/gc/bits.d b/src/gc/bits.d index caf8416a32..ce6b200b52 100644 --- a/src/gc/bits.d +++ b/src/gc/bits.d @@ -13,6 +13,7 @@ */ module gc.bits; +import gc.os : os_mem_map, os_mem_unmap, HaveFork; import core.bitop; import core.stdc.string; @@ -38,24 +39,34 @@ struct GCBits wordtype* data; size_t nbits; - void Dtor() nothrow + void Dtor(bool share = false) nothrow @nogc { if (data) { - free(data); + static if (!HaveFork) + free(data); + else if (share) + os_mem_unmap(data, nwords * data[0].sizeof); + else + free(data); data = null; } } - void alloc(size_t nbits) nothrow + void alloc(size_t nbits, bool share = false) nothrow { this.nbits = nbits; - data = cast(typeof(data[0])*)calloc(nwords, data[0].sizeof); + static if (!HaveFork) + data = cast(typeof(data[0])*)calloc(nwords, data[0].sizeof); + else if (share) + data = cast(typeof(data[0])*)os_mem_map(nwords * data[0].sizeof, true); // Allocate as MAP_SHARED + else + data = cast(typeof(data[0])*)calloc(nwords, data[0].sizeof); if (!data) onOutOfMemoryError(); } - wordtype test(size_t i) const nothrow + wordtype test(size_t i) const nothrow @nogc in { assert(i < nbits); @@ -65,7 +76,7 @@ 
struct GCBits return core.bitop.bt(data, i); } - int set(size_t i) nothrow + int set(size_t i) nothrow @nogc in { assert(i < nbits); @@ -75,7 +86,7 @@ struct GCBits return core.bitop.bts(data, i); } - int clear(size_t i) nothrow + int clear(size_t i) nothrow @nogc in { assert(i <= nbits); @@ -432,12 +443,17 @@ struct GCBits testCopyRange(2, 3, 166); // failed with assert } - void zero() nothrow + void zero() nothrow @nogc { memset(data, 0, nwords * wordtype.sizeof); } - void copy(GCBits *f) nothrow + void setAll() nothrow @nogc + { + memset(data, 0xFF, nwords * wordtype.sizeof); + } + + void copy(GCBits *f) nothrow @nogc in { assert(nwords == f.nwords); @@ -447,7 +463,7 @@ struct GCBits memcpy(data, f.data, nwords * wordtype.sizeof); } - @property size_t nwords() const pure nothrow + @property size_t nwords() const pure nothrow @nogc { return (nbits + (BITS_PER_WORD - 1)) >> BITS_SHIFT; } diff --git a/src/gc/impl/conservative/gc.d b/src/gc/impl/conservative/gc.d index 19aa70de14..aab0948d15 100644 --- a/src/gc/impl/conservative/gc.d +++ b/src/gc/impl/conservative/gc.d @@ -34,6 +34,8 @@ module gc.impl.conservative.gc; /***************************************************/ version = COLLECT_PARALLEL; // parallel scanning +version (Posix) + version = COLLECT_FORK; import gc.bits; import gc.os; @@ -1000,6 +1002,7 @@ class ConservativeGC : GC /** * Do full garbage collection. + * The collection is done concurrently only if block is false. * Return number of pages free'd. */ size_t fullCollect() nothrow @@ -1010,7 +1013,7 @@ class ConservativeGC : GC // when collecting. static size_t go(Gcx* gcx) nothrow { - return gcx.fullcollect(); + return gcx.fullcollect(false, true); // standard stop the world } immutable result = runLocked!go(gcx); @@ -1037,7 +1040,7 @@ class ConservativeGC : GC // when collecting. 
static size_t go(Gcx* gcx) nothrow { - return gcx.fullcollect(true); + return gcx.fullcollect(true, true, true); // standard stop the world } runLocked!go(gcx); } @@ -1228,6 +1231,12 @@ struct Gcx auto rangesLock = shared(AlignedSpinLock)(SpinLock.Contention.brief); Treap!Root roots; Treap!Range ranges; + bool minimizeAfterNextCollection = false; + version (COLLECT_FORK) + { + private pid_t markProcPid = 0; + bool shouldFork; + } debug(INVARIANT) bool initialized; debug(INVARIANT) bool inCollection; @@ -1263,6 +1272,9 @@ struct Gcx mappedPages = 0; //printf("gcx = %p, self = %x\n", &this, self); debug(INVARIANT) initialized = true; + version (COLLECT_FORK) + shouldFork = config.fork; + } @@ -1372,6 +1384,14 @@ struct Gcx } } + @property bool collectInProgress() const nothrow + { + version (COLLECT_FORK) + return markProcPid != 0; + else + return false; + } + /** * @@ -1646,7 +1666,7 @@ struct Gcx if (!newPool(1, false)) { // out of memory => try to free some memory - fullcollect(); + fullcollect(false, true); // stop the world if (lowMem) minimize(); recoverNextPage(bin); @@ -1669,8 +1689,11 @@ struct Gcx // Return next item from free list bucket[bin] = (cast(List*)p).next; auto pool = (cast(List*)p).pool; + auto biti = (p - pool.baseAddr) >> pool.shiftBy; assert(pool.freebits.test(biti)); + if (collectInProgress) + pool.mark.set(biti); // be sure that the child is aware of the page being used pool.freebits.clear(biti); if (bits) pool.setBits(biti, bits); @@ -1733,14 +1756,14 @@ struct Gcx if (!tryAllocNewPool()) { // disabled but out of memory => try to free some memory - fullcollect(); - minimize(); + minimizeAfterNextCollection = true; + fullcollect(false, true); } } else { + minimizeAfterNextCollection = true; fullcollect(); - minimize(); } // If alloc didn't yet succeed retry now that we collected/minimized if (!pool && !tryAlloc() && !tryAllocNewPool()) @@ -1750,6 +1773,8 @@ struct Gcx assert(pool); debug(PRINTF) printFreeInfo(&pool.base); + if 
(collectInProgress) + pool.mark.set(pn); usedLargePages += npages; debug(PRINTF) printFreeInfo(&pool.base); @@ -1818,6 +1843,8 @@ struct Gcx if (pool) { pool.initialize(npages, isLargeObject); + if (collectInProgress) + pool.mark.setAll(); if (!pool.baseAddr || !pooltable.insert(pool)) { pool.Dtor(); @@ -2561,17 +2588,127 @@ struct Gcx return recoverPool[bin] = poolIndex < npools ? cast(SmallObjectPool*)pool : null; } + version (COLLECT_FORK) + void disableFork() nothrow + { + markProcPid = 0; + shouldFork = false; + } + + version (COLLECT_FORK) + ChildStatus collectFork(bool block) nothrow + { + typeof(return) rc = wait_pid(markProcPid, block); + final switch (rc) + { + case ChildStatus.done: + debug(COLLECT_PRINTF) printf("\t\tmark proc DONE (block=%d)\n", + cast(int) block); + markProcPid = 0; + // process GC marks then sweep + thread_suspendAll(); + thread_processGCMarks(&isMarked); + thread_resumeAll(); + break; + case ChildStatus.running: + debug(COLLECT_PRINTF) printf("\t\tmark proc RUNNING\n"); + if (!block) + break; + // Something went wrong, if block is true, wait() should never + // return RUNNING. + goto case ChildStatus.error; + case ChildStatus.error: + debug(COLLECT_PRINTF) printf("\t\tmark proc ERROR\n"); + // Try to keep going without forking + // and do the marking in this thread + break; + } + return rc; + } + + version (COLLECT_FORK) + ChildStatus markFork(bool nostack, bool block, bool doParallel) nothrow + { + // Forking is enabled, so we fork() and start a new concurrent mark phase + // in the child. If the collection should not block, the parent process + // tells the caller no memory could be recycled immediately (if this collection + // was triggered by an allocation, the caller should allocate more memory + // to fulfill the request). 
+ // If the collection should block, the parent will wait for the mark phase + // to finish before returning control to the mutator, + // but other threads are restarted and may run in parallel with the mark phase + // (unless they allocate or use the GC themselves, in which case + // the global GC lock will stop them). + // fork now and sweep later + import core.stdc.stdlib : _Exit; + debug (PRINTF_TO_FILE){ + import core.stdc.stdio : fflush; + fflush(null); // avoid duplicated FILE* output + } + auto pid = fork(); + assert(pid != -1); + switch (pid) + { + case -1: // fork() failed, retry without forking + return ChildStatus.error; + case 0: // child process + if (doParallel) + markParallel(nostack); + else if (ConservativeGC.isPrecise) + markAll!markPrecise(nostack); + else + markAll!markConservative(nostack); + _Exit(0); + return ChildStatus.done; // bogus + default: // the parent + thread_resumeAll(); + if (!block) + { + markProcPid = pid; + return ChildStatus.running; + } + ChildStatus r = wait_pid(pid); // block until marking is done + if (r == ChildStatus.error) + { + thread_suspendAll(); + // there was an error + // do the marking in this thread + disableFork(); + if (doParallel) + markParallel(nostack); + else if (ConservativeGC.isPrecise) + markAll!markPrecise(nostack); + else + markAll!markConservative(nostack); + } else { + assert(r == ChildStatus.done); + assert(r != ChildStatus.running); + } + } + return ChildStatus.done; // waited for the child + } /** * Return number of full pages free'd. */ - size_t fullcollect(bool nostack = false) nothrow + size_t fullcollect(bool nostack = false, bool block = false, bool isFinal = false) nothrow { // It is possible that `fullcollect` will be called from a thread which // is not yet registered in runtime (because allocating `new Thread` is // part of `thread_attachThis` implementation). 
In that case it is // better not to try actually collecting anything + version (COLLECT_FORK) + bool doFork = shouldFork; + else + enum doFork = false; + + version (COLLECT_PARALLEL) + bool doParallel = config.parallel > 0; + else + enum doParallel = false; + + if (Thread.getThis() is null) return 0; @@ -2581,7 +2718,30 @@ struct Gcx debug(COLLECT_PRINTF) printf("Gcx.fullcollect()\n"); //printf("\tpool address range = %p .. %p\n", minAddr, maxAddr); + // If there is a mark process running, check if it already finished. + // If that is the case, we move to the sweep phase. + // If it's still running, either we block until the mark phase is + // done (and then sweep to finish the collection), or in case of error + // we redo the mark phase without forking. + if (doFork && collectInProgress) + { + version (COLLECT_FORK){ + ChildStatus rc = collectFork(block); + final switch (rc) + { + case ChildStatus.done: + break; + case ChildStatus.running: + return 0; + case ChildStatus.error: + disableFork(); + goto Lmark; + } + } + } + else { +Lmark: // lock roots and ranges around suspending threads b/c they're not reentrant safe rangesLock.lock(); rootsLock.lock(); @@ -2600,12 +2760,32 @@ struct Gcx prepTime += (stop - start); start = stop; - version (COLLECT_PARALLEL) - bool doParallel = config.parallel > 0; - else - enum doParallel = false; - - if (doParallel) + if (doFork) + { + version (COLLECT_FORK) + { + auto forkResult = markFork(nostack, block, doParallel); + final switch (forkResult) + { + case ChildStatus.error: + disableFork(); + goto Lmark; + case ChildStatus.running: + // update profiling informations + stop = currTime; + markTime += (stop - start); + Duration pause = stop - begin; + if (pause > maxPauseTime) + maxPauseTime = pause; + pauseTime += pause; + + return 0; + case ChildStatus.done: + break; + } + } + } + else if (doParallel) { version (COLLECT_PARALLEL) markParallel(nostack); @@ -2618,10 +2798,19 @@ struct Gcx markAll!markConservative(nostack); } + // 
if we get here, forking failed and a standard STW collection got issued + // threads were suspended again, restart them + if (doFork) + thread_suspendAll(); + thread_processGCMarks(&isMarked); thread_resumeAll(); } + // If we reach here, the child process has finished the marking phase + // or block == true and we are using standard stop the world collection. + // It is time to sweep + stop = currTime; markTime += (stop - start); Duration pause = stop - begin; @@ -2638,6 +2827,14 @@ struct Gcx ConservativeGC._inFinalizer = false; } + // minimize() should be called only after a call to fullcollect + // terminates with a sweep + if (minimizeAfterNextCollection || lowMem) + { + minimizeAfterNextCollection = false; + minimize(); + } + // init bucket lists bucket[] = null; foreach (Bins bin; 0..B_NUMSMALL) @@ -2653,7 +2850,8 @@ struct Gcx ++numCollections; updateCollectThresholds(); - + if (isFinal) + return fullcollect(true, true, false); return freedPages; } @@ -2989,7 +3187,10 @@ struct Pool topAddr = baseAddr + poolsize; auto nbits = cast(size_t)poolsize >> shiftBy; - mark.alloc(nbits); + version (COLLECT_FORK) + mark.alloc(nbits, config.fork); + else + mark.alloc(nbits); if (ConservativeGC.isPrecise) { if (isLargeObject) @@ -3078,7 +3279,7 @@ struct Pool bPageOffsets = null; } - mark.Dtor(); + mark.Dtor(config.fork); if (ConservativeGC.isPrecise) { if (isLargeObject) @@ -3823,6 +4024,7 @@ struct SmallObjectPool // Convert page to free list size_t size = binsize[bin]; void* p = baseAddr + pn * PAGESIZE; + auto first = cast(List*) p; // ensure 2 bytes blocks are available below ptop, one diff --git a/src/gc/os.d b/src/gc/os.d index baef98c17f..014ddf9d3f 100644 --- a/src/gc/os.d +++ b/src/gc/os.d @@ -49,6 +49,51 @@ else version (Posix) version (CRuntime_UClibc) import core.sys.linux.sys.mman : MAP_ANON; import core.stdc.stdlib; + + /// Possible results for the wait_pid() function. 
+ enum ChildStatus + { + done, /// The process has finished successfully + running, /// The process is still running + error /// There was an error waiting for the process + } + + /** + * Wait for a process with PID pid to finish. + * + * If block is false, this function will not block, and return ChildStatus.running if + * the process is still running. Otherwise it will always return ChildStatus.done + * (unless there is an error, in which case ChildStatus.error is returned). + */ + ChildStatus wait_pid(pid_t pid, bool block = true) nothrow @nogc + { + import core.exception : onForkError; + + int status = void; + pid_t waited_pid = void; + // In the case where we are blocking, we need to consider signals + // arriving while we wait, and resume the waiting if EINTR is returned + do { + errno = 0; + waited_pid = waitpid(pid, &status, block ? 0 : WNOHANG); + } + while (waited_pid == -1 && errno == EINTR); + if (waited_pid == 0) + return ChildStatus.running; + else if (errno == ECHILD) + return ChildStatus.done; // someone called posix.syswait + else if (waited_pid != pid || status != 0) + { + onForkError(); + return ChildStatus.error; + } + return ChildStatus.done; + } + + public import core.sys.posix.unistd: pid_t, fork; + import core.sys.posix.sys.wait: waitpid, WNOHANG; + import core.stdc.errno: errno, EINTR, ECHILD; + //version = GC_Use_Alloc_MMap; } else @@ -72,10 +117,19 @@ else static assert(false, "No supported allocation methods available."); static if (is(typeof(VirtualAlloc))) // version (GC_Use_Alloc_Win32) { + /** + * Indicates if an implementation supports fork(). + * + * The value shown here is just demonstrative, the real value is defined based + * on the OS it's being compiled in. + * enum HaveFork = true; + */ + enum HaveFork = false; + + /** + * Map memory. 
*/ - void *os_mem_map(size_t nbytes) nothrow + void *os_mem_map(size_t nbytes) nothrow @nogc { return VirtualAlloc(null, nbytes, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE); @@ -88,35 +142,40 @@ static if (is(typeof(VirtualAlloc))) // version (GC_Use_Alloc_Win32) * 0 success * !=0 failure */ - int os_mem_unmap(void *base, size_t nbytes) nothrow + int os_mem_unmap(void *base, size_t nbytes) nothrow @nogc { return cast(int)(VirtualFree(base, 0, MEM_RELEASE) == 0); } } else static if (is(typeof(mmap))) // else version (GC_Use_Alloc_MMap) { - void *os_mem_map(size_t nbytes) nothrow + enum HaveFork = true; + + void *os_mem_map(size_t nbytes, bool share = false) nothrow @nogc { void *p; - p = mmap(null, nbytes, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); + auto map_f = share ? MAP_SHARED : MAP_PRIVATE; + p = mmap(null, nbytes, PROT_READ | PROT_WRITE, map_f | MAP_ANON, -1, 0); return (p == MAP_FAILED) ? null : p; } - int os_mem_unmap(void *base, size_t nbytes) nothrow + int os_mem_unmap(void *base, size_t nbytes) nothrow @nogc { return munmap(base, nbytes); } } else static if (is(typeof(valloc))) // else version (GC_Use_Alloc_Valloc) { - void *os_mem_map(size_t nbytes) nothrow + enum HaveFork = false; + + void *os_mem_map(size_t nbytes) nothrow @nogc { return valloc(nbytes); } - int os_mem_unmap(void *base, size_t nbytes) nothrow + int os_mem_unmap(void *base, size_t nbytes) nothrow @nogc { free(base); return 0; @@ -129,6 +188,7 @@ else static if (is(typeof(malloc))) // else version (GC_Use_Alloc_Malloc) // to PAGESIZE alignment, there will be space for a void* at the end // after PAGESIZE bytes used by the GC. 
+ enum HaveFork = false; import gc.gc; @@ -136,7 +196,7 @@ else static if (is(typeof(malloc))) // else version (GC_Use_Alloc_Malloc) const size_t PAGE_MASK = PAGESIZE - 1; - void *os_mem_map(size_t nbytes) nothrow + void *os_mem_map(size_t nbytes) nothrow @nogc { byte *p, q; p = cast(byte *) malloc(nbytes + PAGESIZE); q = p + ((PAGESIZE - ((cast(size_t) p & PAGE_MASK))) & PAGE_MASK); @@ -145,7 +205,7 @@ else static if (is(typeof(malloc))) // else version (GC_Use_Alloc_Malloc) } - int os_mem_unmap(void *base, size_t nbytes) nothrow + int os_mem_unmap(void *base, size_t nbytes) nothrow @nogc { free( *cast(void**)( cast(byte*) base + nbytes ) ); return 0;