JaffarPlus
High-performance best-first search optimizer for tool-assisted speedruns
Loading...
Searching...
No Matches
stateDb.hpp
Go to the documentation of this file.
1#pragma once
2
10#include "numa.hpp"
11#include <cstdlib>
12#include <jaffarCommon/bitwise.hpp>
13#include <jaffarCommon/concurrent.hpp>
14#include <jaffarCommon/deserializers/contiguous.hpp>
15#include <jaffarCommon/json.hpp>
16#include <jaffarCommon/logger.hpp>
17#include <jaffarCommon/parallel.hpp>
18#include <jaffarCommon/serializers/contiguous.hpp>
19#include <memory>
20#include <utmpx.h>
21
23#define _JAFFAR_STATE_PADDING_BYTES 16
24
25namespace jaffarPlus
26{
27
43{
44public:
53 StateDb(Runner& r, const nlohmann::json& config) : _runner(&r)
54 {
55 // Initialization message
56 jaffarCommon::logger::log("[J+] Initializing State Database...\n");
57
59
60 // Checking maximum db sizes per each numa group
61 _maxSizeMb = jaffarCommon::json::getNumber<size_t>(config, "Max Size (Mb)");
62
63 // Overriding if provided
64 if (auto* value = std::getenv("JAFFAR_ENGINE_OVERRIDE_MAX_STATEDB_SIZE_MB")) _maxSizeMb = std::stoul(value);
65 }
66
68 __INLINE__ size_t getMaxBudgetBytes() const { return _maxSizeMb * 1024UL * 1024UL; }
69
77 {
78 // Free the per-NUMA queues allocated with `new` in initialize() (one per delegate domain; the
79 // other entries are nullptr, which delete safely ignores).
80 for (auto* queue : _numaCurrentStateQueues) delete queue;
81 for (auto* queue : _numaNextStateQueues) delete queue;
82 }
83
99 __INLINE__ void initialize()
100 {
101 // Getting game state size. The hot slot holds only the game+emulator state the search hashes and
102 // advances each step; the per-state path (input history + step counter) goes to a parallel cold slab.
103 _ih = _runner->getInputHistory(); // reference strategy for per-slot manager ops
106 // Standalone buffers hold the FULL self-contained serialize ([hot]+[full history]). In trie mode the
107 // cold slot's history (a node id) is smaller than the full history (the reconstructed bit-packed
108 // buffer), so the full size is the runner's own getStateSize(), not _stateSizeRaw + _histSize.
110
111 // States are stored raw (uncompressed). We align each state to a padding boundary to favor
112 // vectorized access and prevent false sharing.
114
115 // Padding state size to enforce alignment and prevent false sharing as much as possible
116 const size_t baseStateSize = _stateSize;
118
119 // Padding is the difference between the aligned state size and the raw one
120 _stateSizePadding = _stateSize - baseStateSize;
121
122 // Allocating/zeroing the per-thread statistics counters. These were previously global atomics
123 // bumped on every pop/get-free; at high thread count that single-cache-line ping-pong cost more
124 // than the work it measured. They are now plain per-thread counters (indexed by the dense OpenMP
125 // thread id) reduced only when printInfo() is called.
126 _statCounters.assign(_threadCount, statCounters_t{});
127
128 // Allocating/zeroing the per-thread free-slot caches (empty at start)
129 _freeStateCache.assign(_threadCount, freeStateCache_t{});
130
131 // Splitting state database equally among numa domains
132 size_t stateDbSizePerNUMA = std::ceil((_maxSizeMb * 1024 * 1024) / _numaCount);
133 for (int i = 0; i < _numaCount; i++) _maxSizePerNuma.push_back(stateDbSizePerNUMA);
134
135 // Getting maximum allocatable memory in each NUMA domain
136 std::vector<size_t> maxFreeMemoryPerNuma(_numaCount);
137 for (int i = 0; i < _numaCount; i++)
138 {
139 size_t freeMemory = 0;
140 numa_node_size64(i, (long long*)&freeMemory);
141 maxFreeMemoryPerNuma[i] = freeMemory;
142 }
143
144 // Checking if this is enough memory to satisfy requirement
145 for (int i = 0; i < _numaCount; i++)
146 if (_maxSizePerNuma[i] > (size_t)maxFreeMemoryPerNuma[i])
147 JAFFAR_THROW_RUNTIME("The requested memory (%lu) for NUMA domain %d exceeds its available free space (%lu)\n", _maxSizePerNuma[i], i, maxFreeMemoryPerNuma[i]);
148
149 // Getting maximum number of states for each NUMA domain
150 _maxStatesPerNuma.resize(_numaCount);
151 _currentStatesPerNuma.resize(_numaCount);
152 // The per-state memory footprint is the padded hot slot plus its cold history slot, so the budget
153 // covers both slabs and the state count stays ~unchanged versus storing them together.
154 for (int i = 0; i < _numaCount; i++) _maxStatesPerNuma[i] = std::max(_maxSizePerNuma[i] / (_stateSize + _histSize), 1ul);
155
156 // Getting totals for statistics
157 _maxSize = 0;
158 _maxStates = 0;
159 for (int i = 0; i < _numaCount; i++)
160 {
163 }
164
165 // Allocating state databases in the correponding NUMA domains
166 _numaCurrentStateQueues.resize(_numaCount);
167 _numaNextStateQueues.resize(_numaCount);
168 // The current-state queue is a DrainBuffer: it is filled single-threaded in advanceStep() and
169 // then drained concurrently from both ends by the workers, never both at once (the step barrier
170 // separates the phases). Its capacity is the NUMA domain's full state count -- it can never hold
171 // more current states than that. The claim counters are 32-bit, so guard the capacity.
172 for (int i = 0; i < _numaCount; i++)
173 if (_maxStatesPerNuma[i] > (size_t)UINT32_MAX)
174 JAFFAR_THROW_RUNTIME("State count per NUMA domain (%lu) exceeds the DrainBuffer capacity limit (%u)\n", _maxStatesPerNuma[i], UINT32_MAX);
175
176 JAFFAR_PARALLEL
177 {
178 if (_myThreadId == _numaDelegateThreadId[_preferredNumaDomain])
179 {
180 auto* currentQueue = new jaffarCommon::concurrent::DrainBuffer<void*>();
181 currentQueue->reserve(_maxStatesPerNuma[_preferredNumaDomain]);
182 _numaCurrentStateQueues[_preferredNumaDomain] = currentQueue;
183 _numaNextStateQueues[_preferredNumaDomain] = new jaffarCommon::concurrent::concurrentMultimap_t<float, void*>();
184 }
185 }
186
187 // Getting number of bytes to reserve for each NUMA domain
188 _allocableBytesPerNuma.resize(_numaCount);
189 for (int i = 0; i < _numaCount; i++) _allocableBytesPerNuma[i] = _maxStatesPerNuma[i] * _stateSize;
190
191 // Creating internal buffers, one per NUMA domain
192 _internalBuffersStart.resize(_numaCount);
193 _internalBuffersEnd.resize(_numaCount);
194 for (int i = 0; i < _numaCount; i++)
195 {
196 _internalBuffersStart[i] = (uint8_t*)numa_alloc_onnode(_allocableBytesPerNuma[i], i);
197 if (_internalBuffersStart[i] == NULL) JAFFAR_THROW_RUNTIME("Error trying to allocate memory for numa domain %d\n", i);
199 }
200
201 // Creating the parallel history (cold) slabs, one per NUMA domain, on the same node. Mirrors the
202 // state-slab grid: the cold slot for state index i is at _histBuffersStart[numa] + i*_histSize.
203 // _histSize is 0 for the "None" strategy (no path stored, and the step counter is now runner-owned, not
204 // per-state): there is no cold slab to allocate, and all cold-slot ops are no-ops over 0 bytes.
205 _histBuffersStart.resize(_numaCount, nullptr);
206 if (_histSize > 0)
207 for (int i = 0; i < _numaCount; i++)
208 {
209 _histBuffersStart[i] = (uint8_t*)numa_alloc_onnode(_maxStatesPerNuma[i] * _histSize, i);
210 if (_histBuffersStart[i] == NULL) JAFFAR_THROW_RUNTIME("Error trying to allocate history slab for numa domain %d\n", i);
211 }
212
213 // Getting system's page size (typically 4K but it may change in the future)
214 const size_t pageSize = sysconf(_SC_PAGESIZE);
215
216 // Initializing the internal buffers (touch one byte per page to fault them in on the owning node)
217 for (int numaNodeIdx = 0; numaNodeIdx < _numaCount; numaNodeIdx++) JAFFAR_PARALLEL_FOR
218 for (size_t i = 0; i < _allocableBytesPerNuma[numaNodeIdx]; i += pageSize) _internalBuffersStart[numaNodeIdx][i] = 1;
219
220 // Likewise fault in the history slabs (one byte per page).
221 for (int numaNodeIdx = 0; numaNodeIdx < _numaCount; numaNodeIdx++) JAFFAR_PARALLEL_FOR
222 for (size_t i = 0; i < _maxStatesPerNuma[numaNodeIdx] * _histSize; i += pageSize) _histBuffersStart[numaNodeIdx][i] = 1;
223
224 // Let the input-history strategy initialize each fresh cold slot (e.g. the trie marks its node id as
225 // empty so a recycled slot is never mistaken for holding a node). No-op for raw/none. Skipped entirely
226 // when there is no cold slab (_histSize == 0, the "None" strategy).
227 if (_histSize > 0)
228 for (int numaNodeIdx = 0; numaNodeIdx < _numaCount; numaNodeIdx++) JAFFAR_PARALLEL_FOR
229 for (size_t s = 0; s < _maxStatesPerNuma[numaNodeIdx]; s++) _ih->initColdSlot(&_histBuffersStart[numaNodeIdx][s * _histSize]);
230
231 // Adding the state pointers to the free state queues
232 _freeStateQueues.resize(_numaCount);
233 JAFFAR_PARALLEL
234 {
235 if (_myThreadId == _numaDelegateThreadId[_preferredNumaDomain])
236 {
237 _freeStateQueues[_preferredNumaDomain] = std::make_unique<jaffarCommon::concurrent::atomicQueue_t<void*>>(_maxStatesPerNuma[_preferredNumaDomain]);
238 for (size_t i = 0; i < _maxStatesPerNuma[_preferredNumaDomain]; i++)
239 _freeStateQueues[_preferredNumaDomain]->try_push((void*)&_internalBuffersStart[_preferredNumaDomain][i * _stateSize]);
240 }
241 }
242 }
243
250 __INLINE__ size_t saveStateFromRunner(Runner& r, void* statePtr)
251 {
252 // Full, self-contained serialization ([hot][history]) into a standalone buffer (NOT a slab slot):
253 // used for the best/worst/win/manual snapshots kept outside the NUMA slabs, which have no cold mirror.
254 jaffarCommon::serializer::Contiguous s(statePtr, _fullStateSizeBytes);
255 r.serializeState(s);
256 return s.getOutputSize();
257 }
258
263 __INLINE__ size_t saveStateToSlot(Runner& r, void* statePtr)
264 {
265 jaffarCommon::serializer::Contiguous s(statePtr, _stateSizeRaw);
267 jaffarCommon::serializer::Contiguous h(getHistoryPtr(statePtr), _histSize);
268 r.serializeHistory(h);
269 return s.getOutputSize();
270 }
271
277 __INLINE__ void captureSlotToBuffer(const void* slotPtr, void* buffer)
278 {
279 // Hot state copies verbatim; the input-history strategy converts its cold slot into the buffer's
280 // self-contained full history region (raw/none: copy; trie: reconstruct the path from its node id).
281 memcpy(buffer, slotPtr, _stateSizeRaw);
282 _ih->captureColdToFull(getHistoryPtr(const_cast<void*>(slotPtr)), (uint8_t*)buffer + _stateSizeRaw);
283 }
284
286 __INLINE__ size_t getFullStateSize() const { return _fullStateSizeBytes; }
287
299 __INLINE__ bool pushState(const float reward, Runner& r, void* statePtr)
300 {
301 // Check that we got a free state (we did not overflow state memory)
302 if (statePtr == nullptr) JAFFAR_THROW_RUNTIME("Provided a null state -- probably ran out of free states\n");
303
304 // Encoding internal runner state into the state pointer (slab slot: hot into slot, path into cold slab)
305 try
306 {
307 // Getting state from the runner
308 saveStateToSlot(r, statePtr);
309 }
310 catch (const std::runtime_error& x)
311 {
312 // If failed return false
313 return false;
314 }
315
316 // Inserting new state into the next state database
317 // Getting the corresponding numa domain for this state
318 int targetNumaIdx = getStateNumaDomain(statePtr);
319
320 // Inserting new state into the next state database
321 _numaNextStateQueues[targetNumaIdx]->insert({reward, statePtr});
322
323 // If succeeded, return true
324 return true;
325 }
326
332 __INLINE__ void loadStateIntoRunner(Runner& r, const void* statePtr)
333 {
334 // Full, self-contained deserialization ([hot][history]) from a standalone buffer (NOT a slab slot).
335 jaffarCommon::deserializer::Contiguous d(statePtr, _fullStateSizeBytes);
336 r.deserializeState(d);
337 }
338
343 __INLINE__ void loadStateFromSlot(Runner& r, const void* statePtr)
344 {
345 jaffarCommon::deserializer::Contiguous d(statePtr, _stateSizeRaw);
347 jaffarCommon::deserializer::Contiguous h(getHistoryPtr(statePtr), _histSize);
349 }
350
359 // Function to print relevant information
360 __INLINE__ void printInfo() const
361 {
362 const size_t currentStateCount = getStateCount();
363 const size_t currentStateBytes = currentStateCount * (_stateSize + _histSize); // hot slab + cold history slab
364
365 jaffarCommon::logger::log("[J+] + Current State Count: %lu (%f Mstates) / %lu (%f Mstates) Max / %5.2f%% Full\n", currentStateCount,
366 (double)currentStateCount * 1.0e-6, _maxStates, (double)_maxStates * 1.0e-6, 100.0 * (double)currentStateCount / (double)_maxStates);
367 jaffarCommon::logger::log("[J+] + Current State Size: %.3f Mb (%.6f Gb) / %.3f Mb (%.6f Gb) Max\n", (double)currentStateBytes / (1024.0 * 1024.0),
368 (double)currentStateBytes / (1024.0 * 1024.0 * 1024.0), (double)_maxSize / (1024.0 * 1024.0), (double)_maxSize / (1024.0 * 1024.0 * 1024.0));
369 jaffarCommon::logger::log("[J+] + State Size Raw: %lu bytes (hot %lu + cold/history %lu)\n", _stateSizeRaw + _histSize, _stateSizeRaw, _histSize);
370 jaffarCommon::logger::log("[J+] + State Size in DB: %lu bytes (hot %lu + %lu padding to %u, cold %lu)\n", _stateSize + _histSize, _stateSize,
372 const size_t historyMem = _ih->getApproxMemoryBytes(); // shared-structure memory (e.g. the trie); 0 for raw/none
373 if (historyMem > 0)
374 {
375 const double MB = 1024.0 * 1024.0;
376 const double sharedMb = historyMem / MB; // shared structure (trie nodes)
377 const double coldMb = currentStateCount * (double)_histSize / MB; // per-state cold slot (node id)
378 const double bitpackMb = currentStateCount * (double)(_fullStateSizeBytes - _stateSizeRaw) / MB; // raw bit-packed equivalent
379 jaffarCommon::logger::log("[J+] + Input History (shared): %.1f Mb shared + %.1f Mb cold slots = %.1f Mb total (raw would be %.1f Mb)\n", sharedMb, coldMb,
380 sharedMb + coldMb, bitpackMb);
381 }
382
383 // Only print the first and the last
384 for (int i = 0; i < _numaCount; i++)
385 if (i == 0 || i == _numaCount - 1)
386 jaffarCommon::logger::log("[J+] + NUMA Domain %3d States: %lu (Max: %lu), Size: %.3f Mb (%.6f Gb)\n", i, _currentStatesPerNuma[i], _maxStatesPerNuma[i],
387 (double)_maxSizePerNuma[i] / (1024.0 * 1024.0), (double)_maxSizePerNuma[i] / (1024.0 * 1024.0 * 1024.0));
388
389 // Reducing the per-thread statistics counters into totals (off the hot path, only when printing)
390 size_t localDatabaseState = 0, nonLocalDatabaseState = 0, databaseStateNotFound = 0;
391 size_t localFreeState = 0, nonLocalFreeState = 0, stealingFreeState = 0, freeStateNotFound = 0;
392 size_t distanceAccumulator = 0;
393 size_t freeStateCacheHit = 0, freeStateCacheReturn = 0;
394 for (const auto& sc : _statCounters)
395 {
396 localDatabaseState += sc.localDatabaseState;
397 nonLocalDatabaseState += sc.nonLocalDatabaseState;
398 databaseStateNotFound += sc.databaseStateNotFound;
399 localFreeState += sc.localFreeState;
400 nonLocalFreeState += sc.nonLocalFreeState;
401 stealingFreeState += sc.stealingFreeState;
402 freeStateNotFound += sc.freeStateNotFound;
403 distanceAccumulator += sc.distanceAccumulator;
404 freeStateCacheHit += sc.freeStateCacheHit;
405 freeStateCacheReturn += sc.freeStateCacheReturn;
406 }
407
408 size_t totalDatabaseStatesRequested = nonLocalDatabaseState + localDatabaseState + databaseStateNotFound;
409 jaffarCommon::logger::log("[J+] + Database Popping State Rates:\n");
410 jaffarCommon::logger::log("[J+] + Numa Locality Success Rate: %5.3f%%\n", 100.0 * (double)localDatabaseState / (double)totalDatabaseStatesRequested);
411 jaffarCommon::logger::log("[J+] + Numa Locality Fail Rate: %5.3f%%\n", 100.0 * (double)nonLocalDatabaseState / (double)totalDatabaseStatesRequested);
412 jaffarCommon::logger::log("[J+] + Numa No DB State Found Rate: %5.3f%%\n", 100.0 * (double)databaseStateNotFound / (double)totalDatabaseStatesRequested);
413
414 // Thread-local free-slot cache effectiveness: fraction of all getFreeState / returnFreeState
415 // calls served locally (and thus kept off the shared free-state queue).
416 const size_t totalGets = freeStateCacheHit + nonLocalFreeState + localFreeState + stealingFreeState + freeStateNotFound;
417 const size_t totalReturns = freeStateCacheReturn + /* shared-queue spills are not separately counted */ 0;
418 jaffarCommon::logger::log("[J+] + Free-Slot Cache:\n");
419 jaffarCommon::logger::log("[J+] + Get Cache Hit Rate: %5.3f%% (%lu hits)\n",
420 totalGets == 0 ? 0.0 : 100.0 * (double)freeStateCacheHit / (double)totalGets, freeStateCacheHit);
421 jaffarCommon::logger::log("[J+] + Return Cache Absorb Count: %lu\n", freeStateCacheReturn);
422 (void)totalReturns;
423
424 size_t totalFreeStatesRequested = nonLocalFreeState + localFreeState + freeStateNotFound + stealingFreeState;
425 jaffarCommon::logger::log("[J+] + Get Free State Rates (shared-queue only):\n");
426 jaffarCommon::logger::log("[J+] + Numa Locality Success Rate: %5.3f%%\n", 100.0 * (double)localFreeState / (double)totalFreeStatesRequested);
427 jaffarCommon::logger::log("[J+] + Numa Locality Fail Rate: %5.3f%%\n", 100.0 * (double)nonLocalFreeState / (double)totalFreeStatesRequested);
428 jaffarCommon::logger::log("[J+] + State DB Stealing Rate: %5.3f%%\n", 100.0 * (double)stealingFreeState / (double)totalFreeStatesRequested);
429 jaffarCommon::logger::log("[J+] + Numa No Free State Found Rate: %5.3f%%\n", 100.0 * (double)freeStateNotFound / (double)totalFreeStatesRequested);
430
431 size_t NUMAAccessCount = nonLocalDatabaseState + localDatabaseState + nonLocalFreeState + localFreeState + stealingFreeState;
432 jaffarCommon::logger::log("[J+] + Average NUMA Distance: %lu / %lu = %5.3f\n", distanceAccumulator, NUMAAccessCount,
433 (double)distanceAccumulator / (double)NUMAAccessCount);
434 }
435
447 __INLINE__ void* getFreeState(const size_t threadId)
448 {
449 auto& sc = _statCounters[threadId];
450
451 // First, try this thread's own free-slot cache (a "magazine"). A slot this worker freed earlier
452 // is served straight back to it with no shared-queue atomic and no NUMA lookup -- and it is the
453 // hottest, most NUMA-local memory available (LIFO). This recycling is where the win comes from:
454 // a worker frees ~1 slot and requests several per base state, so each free typically satisfies a
455 // subsequent request locally, removing it from both the get and the return shared-queue traffic.
456 auto& cache = _freeStateCache[threadId];
457 if (cache.count > 0)
458 {
459 sc.freeStateCacheHit++;
460 return cache.slots[--cache.count];
461 }
462
463 // Storage for the new free state space
464 void* stateSpace;
465
466 // Trying to get free space for a new state
467 for (size_t i = 0; i < (size_t)_numaCount; i++)
468 {
469 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
470 // Skip NUMA domains with no worker thread: their queues are never allocated (see initialize()).
471 if (_freeStateQueues[numaIdx] == nullptr) continue;
472 bool success = _freeStateQueues[numaIdx]->try_pop(stateSpace);
473
474 // If successful, return the pointer immediately
475 if (success == true)
476 {
477 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx];
478 if (numaIdx == (size_t)_preferredNumaDomain)
479 sc.localFreeState++;
480 else
481 sc.nonLocalFreeState++;
482 return stateSpace;
483 }
484 }
485
486 // If failed, then try to get it from the back of my numa-specific queues. Looking for the worst state possible
487
488 // Trying to get free space for a new state
489 for (size_t i = 0; i < (size_t)_numaCount; i++)
490 {
491 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
492 if (_numaCurrentStateQueues[numaIdx] == nullptr) continue;
493 bool success = _numaCurrentStateQueues[numaIdx]->pop_back_get(stateSpace);
494
495 // If successful, return the pointer immediately
496 if (success == true)
497 {
498 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx];
499 sc.stealingFreeState++;
500 return stateSpace;
501 }
502 }
503
504 // Otherwise, return a null pointer. The state will be discarded
505 sc.freeStateNotFound++;
506 return nullptr;
507 }
508
515 __INLINE__ int getStateNumaDomain(void* const statePtr)
516 {
517 for (int i = 0; i < _numaCount; i++)
518 if (isStateInNumaDomain(statePtr, i)) return i;
519
520 // Check for error
521 JAFFAR_THROW_RUNTIME("Did not find the corresponding numa domain for the provided state pointer. This must be a bug in Jaffar\n");
522 }
523
531 __INLINE__ void* getHistoryPtr(const void* const statePtr)
532 {
533 const int numa = getStateNumaDomain(const_cast<void*>(statePtr));
534 const size_t idx = ((const uint8_t*)statePtr - _internalBuffersStart[numa]) / _stateSize;
535 return _histBuffersStart[numa] + idx * _histSize;
536 }
537
544 __INLINE__ bool isStateInNumaDomain(void* const statePtr, const int numaDomainId)
545 {
546 return statePtr >= _internalBuffersStart[numaDomainId] && statePtr <= _internalBuffersEnd[numaDomainId];
547 }
548
559 __INLINE__ void returnFreeState(void* const statePtr, const size_t threadId)
560 {
561 // Let the input-history strategy release any shared resource this slot held (the trie frees its path
562 // node, recycling into this worker's own contention-free shard). No-op for raw/none.
563 _ih->releaseColdSlot(getHistoryPtr(statePtr), threadId);
564
565 // Return the slot to this thread's own free-slot cache when there is room, so the next
566 // getFreeState() on this thread can reuse it without touching the shared queue (no atomic, no
567 // NUMA scan). The cache is bounded so a worker can never hoard more than a negligible number of
568 // slots from the shared pool (FREE_STATE_CACHE_CAPACITY * threadCount total).
569 auto& cache = _freeStateCache[threadId];
570 if (cache.count < FREE_STATE_CACHE_CAPACITY)
571 {
572 _statCounters[threadId].freeStateCacheReturn++;
573 cache.slots[cache.count++] = statePtr;
574 return;
575 }
576
577 // Cache full: spill to the shared free-state queue of the slot's owning NUMA domain
578 const auto numaIdx = getStateNumaDomain(statePtr);
579
580 // Pushing the state in the corresponding queue
581 bool success = _freeStateQueues[numaIdx]->try_push(statePtr);
582
583 // Check for success
584 if (success == false) JAFFAR_THROW_RUNTIME("Failed on pushing free state back. This must be a bug in Jaffar\n");
585 }
586
598 __INLINE__ void advanceStep()
599 {
600 // // Resetting NUMA distance accumulator
601 // _numaDistanceAccumulator = 0;
602
603 // Calculation of best and worst states
604 float bestStateReward = std::numeric_limits<float>::lowest();
605 float worstStateReward = std::numeric_limits<float>::max();
606
607 // Copying state pointers into the numa-specific queues
608 JAFFAR_PARALLEL
609 {
610 // Only process if I am the delegate
611 if (_myThreadId == _numaDelegateThreadId[_preferredNumaDomain])
612 {
613 auto& nextMap = *_numaNextStateQueues[_preferredNumaDomain];
614 auto* currentBuf = _numaCurrentStateQueues[_preferredNumaDomain];
615
616 // Updating state count per numa
617 _currentStatesPerNuma[_preferredNumaDomain] = nextMap.size();
618
619 // Resetting the current-state buffer for this step's fill (it was drained by the workers in
620 // the previous step; the step barrier guarantees no drain is in flight here)
621 currentBuf->clear();
622
623 // Drain the (reward-ordered) next-state map into the current buffer in a single forward
624 // traversal, then free the whole map at once. This replaces the previous N-iteration loop of
625 // begin()+unsafe_extract (each of which unlinks and frees one node): a plain traversal is
626 // cheaper per element, and a single bulk clear() deallocates all nodes without the per-node
627 // extract bookkeeping. The map iterates in descending-reward order (std::greater), so the
628 // first element seen is the best and the last is the worst -- same ordering as before.
629 float firstReward = 0.0f, lastReward = 0.0f;
630 void *firstPtr = nullptr, *lastPtr = nullptr;
631 for (const auto& entry : nextMap)
632 {
633 if (firstPtr == nullptr)
634 {
635 firstReward = entry.first;
636 firstPtr = entry.second;
637 }
638 currentBuf->push_back_no_lock(entry.second);
639 lastReward = entry.first;
640 lastPtr = entry.second;
641 }
642
643 // Releasing all next-map nodes in one go
644 nextMap.clear();
645
646 // Updating the cross-NUMA best/worst states (shared, hence locked) once per domain
647 if (firstPtr != nullptr)
648 {
649 _workMutex.lock();
650 if (firstReward > bestStateReward)
651 {
652 bestStateReward = firstReward;
653 _bestState = firstPtr;
654 }
655 if (lastReward < worstStateReward)
656 {
657 worstStateReward = lastReward;
658 _worstState = lastPtr;
659 }
660 _workMutex.unlock();
661 }
662 }
663 }
664 }
665
674 __INLINE__ void* popState()
675 {
676 auto& sc = _statCounters[jaffarCommon::parallel::getThreadId()];
677
678 // Pointer to return
679 void* statePtr;
680
681 // Trying to next state
682 for (size_t i = 0; i < (size_t)_numaCount; i++)
683 {
684 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
685 // Skip NUMA domains with no worker thread: their queues are never allocated (see initialize()).
686 if (_numaCurrentStateQueues[numaIdx] == nullptr) continue;
687 bool success = _numaCurrentStateQueues[numaIdx]->pop_front_get(statePtr);
688
689 // If successful, return the pointer immediately
690 if (success == true)
691 {
692 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx];
693 if (numaIdx == (size_t)_preferredNumaDomain)
694 sc.localDatabaseState++;
695 else
696 sc.nonLocalDatabaseState++;
697 return statePtr;
698 }
699 }
700
701 // If no success at all, just return a nullptr
702 sc.databaseStateNotFound++;
703 return nullptr;
704 }
705
718 __INLINE__ size_t popStates(void** elements, const size_t maxCount, const size_t threadId)
719 {
720 auto& sc = _statCounters[threadId];
721
722 // Trying the NUMA domains in preference order; take the whole batch from the first non-empty one
723 for (size_t i = 0; i < (size_t)_numaCount; i++)
724 {
725 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
726 // Skip NUMA domains with no worker thread: their queues are never allocated (see initialize()).
727 if (_numaCurrentStateQueues[numaIdx] == nullptr) continue;
728
729 const size_t count = _numaCurrentStateQueues[numaIdx]->pop_front_get_batch(elements, maxCount);
730
731 // If we got anything, account for it once for the whole batch and return
732 if (count > 0)
733 {
734 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx] * count;
735 if (numaIdx == (size_t)_preferredNumaDomain)
736 sc.localDatabaseState += count;
737 else
738 sc.nonLocalDatabaseState += count;
739 return count;
740 }
741 }
742
743 // If no success at all, the databases are drained for this step
744 sc.databaseStateNotFound++;
745 return 0;
746 }
747
755 __INLINE__ size_t getStateCount() const
756 {
757 size_t stateCount = 0;
758 // Per-NUMA current-state queues are only allocated for domains that have a delegate thread.
759 // Skip any unallocated (null) queue so this (informational) count can't dereference a null
760 // pointer when threads don't cover every NUMA domain.
761 for (int i = 0; i < _numaCount; i++)
762 if (_numaCurrentStateQueues[i] != nullptr) stateCount += _numaCurrentStateQueues[i]->wasSize();
763 return stateCount;
764 }
765
767 __INLINE__ void* getBestState() const { return _bestState; }
768
770 __INLINE__ void* getWorstState() const { return _worstState; }
771
773 __INLINE__ size_t getStateSizeInDatabase() const { return _stateSize; }
774
775private:
776 void* _bestState = nullptr;
777 void* _worstState = nullptr;
778
780
783
786
789
791 size_t _maxSize;
792
795
802 struct alignas(64) statCounters_t
803 {
807 size_t localFreeState = 0;
808 size_t nonLocalFreeState = 0;
809 size_t stealingFreeState = 0;
810 size_t freeStateNotFound = 0;
812 size_t freeStateCacheHit = 0;
814 };
815 std::vector<statCounters_t> _statCounters;
816
823 static constexpr size_t FREE_STATE_CACHE_CAPACITY = 32;
824
830 struct alignas(64) freeStateCache_t
831 {
832 size_t count = 0;
834 };
835 std::vector<freeStateCache_t> _freeStateCache;
836
839
841 std::vector<size_t> _maxSizePerNuma;
842
844 std::vector<size_t> _maxStatesPerNuma;
845
847 std::vector<size_t> _currentStatesPerNuma;
848
850 std::mutex _workMutex;
851
853 std::vector<jaffarCommon::concurrent::DrainBuffer<void*>*> _numaCurrentStateQueues;
854
856 std::vector<jaffarCommon::concurrent::concurrentMultimap_t<float, void*>*> _numaNextStateQueues;
857
859 std::vector<std::unique_ptr<jaffarCommon::concurrent::atomicQueue_t<void*>>> _freeStateQueues;
860
862 std::vector<uint8_t*> _internalBuffersStart;
863
865 std::vector<uint8_t*> _internalBuffersEnd;
866
868 std::vector<size_t> _allocableBytesPerNuma;
869
874 size_t _histSize = 0;
875
878 std::vector<uint8_t*> _histBuffersStart;
879
882 InputHistory* _ih = nullptr;
883
887};
888
889} // namespace jaffarPlus
Abstract strategy for remembering the input path that produced each search state.
virtual void releaseColdSlot(void *cold, const size_t shard) const
Releases any shared resource a freed cold slot was holding (trie GC).
virtual size_t getApproxMemoryBytes() const
Approximate resident memory of any shared structure (e.g. the trie), in bytes. Default: 0.
virtual void initColdSlot(void *cold) const
Prepares a fresh/recycled cold slot (e.g. marks it as holding no trie node). Default: no-op.
virtual void captureColdToFull(const void *cold, void *full) const =0
Converts a stored cold path into a self-contained full one (best/worst snapshot).
Owns a Game instance and advances it according to configured inputs.
Definition runner.hpp:38
size_t getStateSize() const
Computes the size in bytes of the serialized runner state.
Definition runner.hpp:384
size_t getHistorySize() const
Returns the serialized size of the cold input-history "path", in bytes.
Definition runner.hpp:416
void serializeState(jaffarCommon::serializer::Base &serializer) const
Serializes the runner state: the game state, the input history, and the input counter.
Definition runner.hpp:360
void deserializeHotState(jaffarCommon::deserializer::Base &deserializer)
Restores only the hot game+emulator state from deserializer.
Definition runner.hpp:400
void deserializeHistory(jaffarCommon::deserializer::Base &deserializer)
Restores only the cold input-history "path" from deserializer.
Definition runner.hpp:406
size_t getHotStateSize() const
Returns the serialized size of the hot game+emulator state, in bytes.
Definition runner.hpp:409
void serializeHistory(jaffarCommon::serializer::Base &serializer) const
Serializes only the cold input-history "path" (written once at state creation, read at solution time)...
Definition runner.hpp:404
InputHistory * getInputHistory() const
The input-history strategy in use (for the StateDb's per-slot manager operations).
Definition runner.hpp:145
void serializeHotState(jaffarCommon::serializer::Base &serializer) const
Serializes only the hot game+emulator state (what the search reads every step) into serializer.
Definition runner.hpp:398
void deserializeState(jaffarCommon::deserializer::Base &deserializer)
Restores the runner state: the game state, the input history, and the input counter.
Definition runner.hpp:372
Stores serialized game states across the machine's NUMA domains and serves them to the search engine ...
Definition stateDb.hpp:43
void * getWorstState() const
Returns a pointer to the lowest-reward state recorded by the last advanceStep.
Definition stateDb.hpp:770
std::vector< uint8_t * > _internalBuffersEnd
End pointer of each NUMA domain's contiguous state slab.
Definition stateDb.hpp:865
std::vector< std::unique_ptr< jaffarCommon::concurrent::atomicQueue_t< void * > > > _freeStateQueues
Per-NUMA-domain queues holding pointers to all currently free state slots.
Definition stateDb.hpp:859
void printInfo() const
Logs database occupancy, sizing, per-NUMA-domain figures, and reduced statistics counters.
Definition stateDb.hpp:360
void * getHistoryPtr(const void *const statePtr)
Returns the cold history slot mirroring the given hot state slot (same NUMA domain + index).
Definition stateDb.hpp:531
void * _worstState
Pointer to the worst (lowest-reward) state from the last advanceStep().
Definition stateDb.hpp:777
size_t getMaxBudgetBytes() const
Configured maximum state-DB footprint in bytes (used by the engine's combined RAM guard).
Definition stateDb.hpp:68
void loadStateFromSlot(Runner &r, const void *statePtr)
Deserializes a state-database slab slot into the runner: hot state from the slot, path from the paral...
Definition stateDb.hpp:343
StateDb(Runner &r, const nlohmann::json &config)
Constructs the state database and reads its maximum size from configuration.
Definition stateDb.hpp:53
void * getBestState() const
Returns a pointer to the highest-reward state recorded by the last advanceStep.
Definition stateDb.hpp:767
std::vector< statCounters_t > _statCounters
Per-thread (OpenMP-thread-indexed) statistics counters.
Definition stateDb.hpp:815
size_t saveStateToSlot(Runner &r, void *statePtr)
Serializes the runner into a state-database slab slot: hot state into the slot, path (input history +...
Definition stateDb.hpp:263
void captureSlotToBuffer(const void *slotPtr, void *buffer)
Gathers a slab slot's hot state and its cold history mirror into a contiguous full-state buffer ([hot...
Definition stateDb.hpp:277
InputHistory * _ih
The reference runner's input-history strategy, used for the per-slot manager operations (initColdSlot...
Definition stateDb.hpp:882
size_t _stateSizePadding
Number of padding bytes added to the raw size to reach _stateSize.
Definition stateDb.hpp:788
void returnFreeState(void *const statePtr, const size_t threadId)
Returns a state slot to the free pool for later reuse.
Definition stateDb.hpp:559
std::vector< size_t > _maxStatesPerNuma
Calculated maximum number of states the state database can hold in each NUMA domain.
Definition stateDb.hpp:844
std::vector< size_t > _currentStatesPerNuma
Number of current states held in each NUMA domain (updated each advanceStep()).
Definition stateDb.hpp:847
void advanceStep()
Moves each NUMA domain's next-state queue into its current-state queue, best reward first,...
Definition stateDb.hpp:598
std::vector< jaffarCommon::concurrent::concurrentMultimap_t< float, void * > * > _numaNextStateQueues
Per-NUMA-domain reward-ordered queues collecting the next step's states (null for non-delegate domain...
Definition stateDb.hpp:856
Runner *const _runner
The runner used to serialize states into and deserialize states out of the database.
Definition stateDb.hpp:779
std::vector< uint8_t * > _histBuffersStart
Start pointer of each NUMA domain's parallel history (cold) slab.
Definition stateDb.hpp:878
std::vector< size_t > _allocableBytesPerNuma
Number of bytes allocated for the state slab in each NUMA domain.
Definition stateDb.hpp:868
~StateDb()
Frees the per-NUMA current- and next-state queues allocated in initialize().
Definition stateDb.hpp:76
std::vector< jaffarCommon::concurrent::DrainBuffer< void * > * > _numaCurrentStateQueues
Per-NUMA-domain current-state queues drained by the workers during a step (null for non-delegate doma...
Definition stateDb.hpp:853
size_t getFullStateSize() const
Full self-contained serialized state size ([hot][history]); for standalone state buffers.
Definition stateDb.hpp:286
size_t _fullStateSizeBytes
Full self-contained serialized state size ([hot]+[full bit-packed history]) for standalone snapshot b...
Definition stateDb.hpp:886
size_t getStateSizeInDatabase() const
Returns the per-state size (including padding) as stored in the database.
Definition stateDb.hpp:773
size_t _histSize
Unpadded size of one state's cold "path" data (bit-packed input history + step counter).
Definition stateDb.hpp:874
bool isStateInNumaDomain(void *const statePtr, const int numaDomainId)
Tests whether a state slot lies within a given NUMA domain's slab.
Definition stateDb.hpp:544
void * _bestState
Pointer to the best (highest-reward) state from the last advanceStep().
Definition stateDb.hpp:776
size_t popStates(void **elements, const size_t maxCount, const size_t threadId)
Pops a batch of base states from the current-state database, preferring NUMA-local domains.
Definition stateDb.hpp:718
static constexpr size_t FREE_STATE_CACHE_CAPACITY
Capacity of each worker's thread-local free-slot cache ("magazine").
Definition stateDb.hpp:823
size_t _maxStates
Total maximum number of states the database can hold across all NUMA domains.
Definition stateDb.hpp:794
size_t _maxSizeMb
User-provided maximum megabytes to use for the entire state database.
Definition stateDb.hpp:838
size_t saveStateFromRunner(Runner &r, void *statePtr)
Serializes the runner's state (raw, uncompressed) into the given slot.
Definition stateDb.hpp:250
void * popState()
Pops a single base state from the current-state database, preferring NUMA-local domains.
Definition stateDb.hpp:674
void * getFreeState(const size_t threadId)
Obtains a free state slot for the calling thread to write a new state into.
Definition stateDb.hpp:447
int getStateNumaDomain(void *const statePtr)
Finds the NUMA domain whose slab contains the given state slot.
Definition stateDb.hpp:515
bool pushState(const float reward, Runner &r, void *statePtr)
Serializes the runner's state into the given slot and inserts it into the next-state queue.
Definition stateDb.hpp:299
size_t _stateSize
Size occupied by each stored state, including alignment padding.
Definition stateDb.hpp:782
std::vector< freeStateCache_t > _freeStateCache
Per-thread free-slot caches, indexed by OpenMP thread id.
Definition stateDb.hpp:835
size_t getStateCount() const
Returns the total number of states currently held in the current-state database.
Definition stateDb.hpp:755
size_t _stateSizeRaw
Raw (unpadded) serialized size of a single state.
Definition stateDb.hpp:785
std::vector< uint8_t * > _internalBuffersStart
Start pointer of each NUMA domain's contiguous state slab.
Definition stateDb.hpp:862
size_t _maxSize
Total maximum size (bytes) the state database may grow to across all NUMA domains.
Definition stateDb.hpp:791
void loadStateIntoRunner(Runner &r, const void *statePtr)
Deserializes a stored state (raw, uncompressed) from a slot into the runner.
Definition stateDb.hpp:332
void initialize()
Allocates and initializes the per-NUMA state slabs, queues, and per-thread caches.
Definition stateDb.hpp:99
std::mutex _workMutex
Mutex guarding the cross-NUMA best/worst-state updates in advanceStep().
Definition stateDb.hpp:850
std::vector< size_t > _maxSizePerNuma
Maximum size (bytes) of the state database in each NUMA domain.
Definition stateDb.hpp:841
Abstract interface for how a search remembers the sequence of inputs ("path") that produced each stat...
NUMA topology detection: distance/preference matrices and per-domain delegate-thread selection,...
#define _JAFFAR_STATE_PADDING_BYTES
Alignment boundary (in bytes) each stored state is padded up to, for vectorized access and false-shar...
Definition stateDb.hpp:23
Per-thread (OpenMP-thread-indexed) cache of free state slots fronting the shared free-state queues.
Definition stateDb.hpp:831
size_t count
Number of slots currently held in slots.
Definition stateDb.hpp:832
void * slots[FREE_STATE_CACHE_CAPACITY]
Cached free-slot pointers (used as a LIFO stack).
Definition stateDb.hpp:833
Per-thread statistics counters for state-DB popping and free-state acquisition.
Definition stateDb.hpp:803
size_t freeStateCacheHit
getFreeState requests served from the thread-local free-slot cache.
Definition stateDb.hpp:812
size_t distanceAccumulator
Sum of NUMA distances over all accesses, for the average-distance metric.
Definition stateDb.hpp:811
size_t localDatabaseState
Base states popped from the thread's own (preferred) NUMA domain.
Definition stateDb.hpp:804
size_t nonLocalDatabaseState
Base states popped from a non-preferred NUMA domain.
Definition stateDb.hpp:805
size_t localFreeState
Free slots obtained from the preferred NUMA domain's free queue.
Definition stateDb.hpp:807
size_t nonLocalFreeState
Free slots obtained from a non-preferred domain's free queue.
Definition stateDb.hpp:808
size_t freeStateCacheReturn
returnFreeState calls absorbed by the thread-local free-slot cache.
Definition stateDb.hpp:813
size_t stealingFreeState
Free slots stolen from the back of a current-state queue.
Definition stateDb.hpp:809
size_t databaseStateNotFound
Pop attempts that found every current-state queue drained.
Definition stateDb.hpp:806
size_t freeStateNotFound
getFreeState attempts that found no slot anywhere.
Definition stateDb.hpp:810