12#include <jaffarCommon/bitwise.hpp>
13#include <jaffarCommon/concurrent.hpp>
14#include <jaffarCommon/deserializers/contiguous.hpp>
15#include <jaffarCommon/json.hpp>
16#include <jaffarCommon/logger.hpp>
17#include <jaffarCommon/parallel.hpp>
18#include <jaffarCommon/serializers/contiguous.hpp>
23#define _JAFFAR_STATE_PADDING_BYTES 16
56 jaffarCommon::logger::log(
"[J+] Initializing State Database...\n");
61 _maxSizeMb = jaffarCommon::json::getNumber<size_t>(config,
"Max Size (Mb)");
64 if (
auto* value = std::getenv(
"JAFFAR_ENGINE_OVERRIDE_MAX_STATEDB_SIZE_MB"))
_maxSizeMb = std::stoul(value);
132 size_t stateDbSizePerNUMA = std::ceil((
_maxSizeMb * 1024 * 1024) / _numaCount);
133 for (
int i = 0; i < _numaCount; i++)
_maxSizePerNuma.push_back(stateDbSizePerNUMA);
136 std::vector<size_t> maxFreeMemoryPerNuma(_numaCount);
137 for (
int i = 0; i < _numaCount; i++)
139 size_t freeMemory = 0;
140 numa_node_size64(i, (
long long*)&freeMemory);
141 maxFreeMemoryPerNuma[i] = freeMemory;
145 for (
int i = 0; i < _numaCount; i++)
147 JAFFAR_THROW_RUNTIME(
"The requested memory (%lu) for NUMA domain %d exceeds its available free space (%lu)\n",
_maxSizePerNuma[i], i, maxFreeMemoryPerNuma[i]);
159 for (
int i = 0; i < _numaCount; i++)
172 for (
int i = 0; i < _numaCount; i++)
174 JAFFAR_THROW_RUNTIME(
"State count per NUMA domain (%lu) exceeds the DrainBuffer capacity limit (%u)\n",
_maxStatesPerNuma[i], UINT32_MAX);
178 if (_myThreadId == _numaDelegateThreadId[_preferredNumaDomain])
180 auto* currentQueue =
new jaffarCommon::concurrent::DrainBuffer<void*>();
183 _numaNextStateQueues[_preferredNumaDomain] =
new jaffarCommon::concurrent::concurrentMultimap_t<float, void*>();
194 for (
int i = 0; i < _numaCount; i++)
197 if (
_internalBuffersStart[i] == NULL) JAFFAR_THROW_RUNTIME(
"Error trying to allocate memory for numa domain %d\n", i);
207 for (
int i = 0; i < _numaCount; i++)
210 if (
_histBuffersStart[i] == NULL) JAFFAR_THROW_RUNTIME(
"Error trying to allocate history slab for numa domain %d\n", i);
214 const size_t pageSize = sysconf(_SC_PAGESIZE);
217 for (
int numaNodeIdx = 0; numaNodeIdx < _numaCount; numaNodeIdx++) JAFFAR_PARALLEL_FOR
221 for (
int numaNodeIdx = 0; numaNodeIdx < _numaCount; numaNodeIdx++) JAFFAR_PARALLEL_FOR
228 for (
int numaNodeIdx = 0; numaNodeIdx < _numaCount; numaNodeIdx++) JAFFAR_PARALLEL_FOR
235 if (_myThreadId == _numaDelegateThreadId[_preferredNumaDomain])
256 return s.getOutputSize();
265 jaffarCommon::serializer::Contiguous s(statePtr,
_stateSizeRaw);
269 return s.getOutputSize();
302 if (statePtr ==
nullptr) JAFFAR_THROW_RUNTIME(
"Provided a null state -- probably ran out of free states\n");
310 catch (
const std::runtime_error& x)
345 jaffarCommon::deserializer::Contiguous d(statePtr,
_stateSizeRaw);
365 jaffarCommon::logger::log(
"[J+] + Current State Count: %lu (%f Mstates) / %lu (%f Mstates) Max / %5.2f%% Full\n", currentStateCount,
367 jaffarCommon::logger::log(
"[J+] + Current State Size: %.3f Mb (%.6f Gb) / %.3f Mb (%.6f Gb) Max\n", (
double)currentStateBytes / (1024.0 * 1024.0),
368 (
double)currentStateBytes / (1024.0 * 1024.0 * 1024.0), (
double)
_maxSize / (1024.0 * 1024.0), (
double)
_maxSize / (1024.0 * 1024.0 * 1024.0));
370 jaffarCommon::logger::log(
"[J+] + State Size in DB: %lu bytes (hot %lu + %lu padding to %u, cold %lu)\n",
_stateSize +
_histSize,
_stateSize,
375 const double MB = 1024.0 * 1024.0;
376 const double sharedMb = historyMem / MB;
377 const double coldMb = currentStateCount * (double)
_histSize / MB;
379 jaffarCommon::logger::log(
"[J+] + Input History (shared): %.1f Mb shared + %.1f Mb cold slots = %.1f Mb total (raw would be %.1f Mb)\n", sharedMb, coldMb,
380 sharedMb + coldMb, bitpackMb);
384 for (
int i = 0; i < _numaCount; i++)
385 if (i == 0 || i == _numaCount - 1)
390 size_t localDatabaseState = 0, nonLocalDatabaseState = 0, databaseStateNotFound = 0;
391 size_t localFreeState = 0, nonLocalFreeState = 0, stealingFreeState = 0, freeStateNotFound = 0;
392 size_t distanceAccumulator = 0;
393 size_t freeStateCacheHit = 0, freeStateCacheReturn = 0;
396 localDatabaseState += sc.localDatabaseState;
397 nonLocalDatabaseState += sc.nonLocalDatabaseState;
398 databaseStateNotFound += sc.databaseStateNotFound;
399 localFreeState += sc.localFreeState;
400 nonLocalFreeState += sc.nonLocalFreeState;
401 stealingFreeState += sc.stealingFreeState;
402 freeStateNotFound += sc.freeStateNotFound;
403 distanceAccumulator += sc.distanceAccumulator;
404 freeStateCacheHit += sc.freeStateCacheHit;
405 freeStateCacheReturn += sc.freeStateCacheReturn;
408 size_t totalDatabaseStatesRequested = nonLocalDatabaseState + localDatabaseState + databaseStateNotFound;
409 jaffarCommon::logger::log(
"[J+] + Database Popping State Rates:\n");
410 jaffarCommon::logger::log(
"[J+] + Numa Locality Success Rate: %5.3f%%\n", 100.0 * (
double)localDatabaseState / (
double)totalDatabaseStatesRequested);
411 jaffarCommon::logger::log(
"[J+] + Numa Locality Fail Rate: %5.3f%%\n", 100.0 * (
double)nonLocalDatabaseState / (
double)totalDatabaseStatesRequested);
412 jaffarCommon::logger::log(
"[J+] + Numa No DB State Found Rate: %5.3f%%\n", 100.0 * (
double)databaseStateNotFound / (
double)totalDatabaseStatesRequested);
416 const size_t totalGets = freeStateCacheHit + nonLocalFreeState + localFreeState + stealingFreeState + freeStateNotFound;
417 const size_t totalReturns = freeStateCacheReturn + 0;
418 jaffarCommon::logger::log(
"[J+] + Free-Slot Cache:\n");
419 jaffarCommon::logger::log(
"[J+] + Get Cache Hit Rate: %5.3f%% (%lu hits)\n",
420 totalGets == 0 ? 0.0 : 100.0 * (double)freeStateCacheHit / (
double)totalGets, freeStateCacheHit);
421 jaffarCommon::logger::log(
"[J+] + Return Cache Absorb Count: %lu\n", freeStateCacheReturn);
424 size_t totalFreeStatesRequested = nonLocalFreeState + localFreeState + freeStateNotFound + stealingFreeState;
425 jaffarCommon::logger::log(
"[J+] + Get Free State Rates (shared-queue only):\n");
426 jaffarCommon::logger::log(
"[J+] + Numa Locality Success Rate: %5.3f%%\n", 100.0 * (
double)localFreeState / (
double)totalFreeStatesRequested);
427 jaffarCommon::logger::log(
"[J+] + Numa Locality Fail Rate: %5.3f%%\n", 100.0 * (
double)nonLocalFreeState / (
double)totalFreeStatesRequested);
428 jaffarCommon::logger::log(
"[J+] + State DB Stealing Rate: %5.3f%%\n", 100.0 * (
double)stealingFreeState / (
double)totalFreeStatesRequested);
429 jaffarCommon::logger::log(
"[J+] + Numa No Free State Found Rate: %5.3f%%\n", 100.0 * (
double)freeStateNotFound / (
double)totalFreeStatesRequested);
431 size_t NUMAAccessCount = nonLocalDatabaseState + localDatabaseState + nonLocalFreeState + localFreeState + stealingFreeState;
432 jaffarCommon::logger::log(
"[J+] + Average NUMA Distance: %lu / %lu = %5.3f\n", distanceAccumulator, NUMAAccessCount,
433 (
double)distanceAccumulator / (
double)NUMAAccessCount);
459 sc.freeStateCacheHit++;
460 return cache.slots[--cache.count];
467 for (
size_t i = 0; i < (size_t)_numaCount; i++)
469 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
477 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx];
478 if (numaIdx == (
size_t)_preferredNumaDomain)
481 sc.nonLocalFreeState++;
489 for (
size_t i = 0; i < (size_t)_numaCount; i++)
491 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
498 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx];
499 sc.stealingFreeState++;
505 sc.freeStateNotFound++;
517 for (
int i = 0; i < _numaCount; i++)
521 JAFFAR_THROW_RUNTIME(
"Did not find the corresponding numa domain for the provided state pointer. This must be a bug in Jaffar\n");
573 cache.slots[cache.count++] = statePtr;
584 if (success ==
false) JAFFAR_THROW_RUNTIME(
"Failed on pushing free state back. This must be a bug in Jaffar\n");
604 float bestStateReward = std::numeric_limits<float>::lowest();
605 float worstStateReward = std::numeric_limits<float>::max();
611 if (_myThreadId == _numaDelegateThreadId[_preferredNumaDomain])
629 float firstReward = 0.0f, lastReward = 0.0f;
630 void *firstPtr =
nullptr, *lastPtr =
nullptr;
631 for (
const auto& entry : nextMap)
633 if (firstPtr ==
nullptr)
635 firstReward = entry.first;
636 firstPtr = entry.second;
638 currentBuf->push_back_no_lock(entry.second);
639 lastReward = entry.first;
640 lastPtr = entry.second;
647 if (firstPtr !=
nullptr)
650 if (firstReward > bestStateReward)
652 bestStateReward = firstReward;
655 if (lastReward < worstStateReward)
657 worstStateReward = lastReward;
676 auto& sc =
_statCounters[jaffarCommon::parallel::getThreadId()];
682 for (
size_t i = 0; i < (size_t)_numaCount; i++)
684 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
692 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx];
693 if (numaIdx == (
size_t)_preferredNumaDomain)
694 sc.localDatabaseState++;
696 sc.nonLocalDatabaseState++;
702 sc.databaseStateNotFound++;
718 __INLINE__
size_t popStates(
void** elements,
const size_t maxCount,
const size_t threadId)
723 for (
size_t i = 0; i < (size_t)_numaCount; i++)
725 const auto numaIdx = _numaPreferenceMatrix[_preferredNumaDomain][i];
734 sc.distanceAccumulator += _numaDistanceMatrix[_preferredNumaDomain][numaIdx] * count;
735 if (numaIdx == (
size_t)_preferredNumaDomain)
736 sc.localDatabaseState += count;
738 sc.nonLocalDatabaseState += count;
744 sc.databaseStateNotFound++;
757 size_t stateCount = 0;
761 for (
int i = 0; i < _numaCount; i++)
859 std::vector<std::unique_ptr<jaffarCommon::concurrent::atomicQueue_t<void*>>>
_freeStateQueues;
Abstract strategy for remembering the input path that produced each search state.
virtual void releaseColdSlot(void *cold, const size_t shard) const
Releases any shared resource a freed cold slot was holding (trie GC).
virtual size_t getApproxMemoryBytes() const
Approximate resident memory of any shared structure (e.g. the trie), in bytes. Default: 0.
virtual void initColdSlot(void *cold) const
Prepares a fresh/recycled cold slot (e.g. marks it as holding no trie node). Default: no-op.
virtual void captureColdToFull(const void *cold, void *full) const =0
Converts a stored cold path into a self-contained full one (best/worst snapshot).
Owns a Game instance and advances it according to configured inputs.
size_t getStateSize() const
Computes the size in bytes of the serialized runner state.
size_t getHistorySize() const
Returns the serialized size of the cold input-history "path", in bytes.
void serializeState(jaffarCommon::serializer::Base &serializer) const
Serializes the runner state: the game state, the input history, and the input counter.
void deserializeHotState(jaffarCommon::deserializer::Base &deserializer)
Restores only the hot game+emulator state from deserializer.
void deserializeHistory(jaffarCommon::deserializer::Base &deserializer)
Restores only the cold input-history "path" from deserializer.
size_t getHotStateSize() const
Returns the serialized size of the hot game+emulator state, in bytes.
void serializeHistory(jaffarCommon::serializer::Base &serializer) const
Serializes only the cold input-history "path" (written once at state creation, read at solution time) into serializer.
InputHistory * getInputHistory() const
The input-history strategy in use (for the StateDb's per-slot manager operations).
void serializeHotState(jaffarCommon::serializer::Base &serializer) const
Serializes only the hot game+emulator state (what the search reads every step) into serializer.
void deserializeState(jaffarCommon::deserializer::Base &deserializer)
Restores the runner state: the game state, the input history, and the input counter.
Stores serialized game states across the machine's NUMA domains and serves them to the search engine ...
void * getWorstState() const
Returns a pointer to the lowest-reward state recorded by the last advanceStep.
std::vector< uint8_t * > _internalBuffersEnd
End pointer of each NUMA domain's contiguous state slab.
std::vector< std::unique_ptr< jaffarCommon::concurrent::atomicQueue_t< void * > > > _freeStateQueues
Per-NUMA-domain queues holding pointers to all currently free state slots.
void printInfo() const
Logs database occupancy, sizing, per-NUMA-domain figures, and reduced statistics counters.
void * getHistoryPtr(const void *const statePtr)
Returns the cold history slot mirroring the given hot state slot (same NUMA domain + index).
void * _worstState
Pointer to the worst (lowest-reward) state from the last advanceStep().
size_t getMaxBudgetBytes() const
Configured maximum state-DB footprint in bytes (used by the engine's combined RAM guard).
void loadStateFromSlot(Runner &r, const void *statePtr)
Deserializes a state-database slab slot into the runner: hot state from the slot, path from the parallel history (cold) slab.
StateDb(Runner &r, const nlohmann::json &config)
Constructs the state database and reads its maximum size from configuration.
void * getBestState() const
Returns a pointer to the highest-reward state recorded by the last advanceStep.
std::vector< statCounters_t > _statCounters
Per-thread (OpenMP-thread-indexed) statistics counters.
size_t saveStateToSlot(Runner &r, void *statePtr)
Serializes the runner into a state-database slab slot: hot state into the slot, path (input history + step counter) into the parallel history (cold) slab.
void captureSlotToBuffer(const void *slotPtr, void *buffer)
Gathers a slab slot's hot state and its cold history mirror into a contiguous full-state buffer ([hot][history]).
InputHistory * _ih
The reference runner's input-history strategy, used for the per-slot manager operations (initColdSlot...
size_t _stateSizePadding
Number of padding bytes added to the raw size to reach _stateSize.
void returnFreeState(void *const statePtr, const size_t threadId)
Returns a state slot to the free pool for later reuse.
std::vector< size_t > _maxStatesPerNuma
Calculated maximum number of states the state database can hold in each NUMA domain.
std::vector< size_t > _currentStatesPerNuma
Number of current states held in each NUMA domain (updated each advanceStep()).
void advanceStep()
Moves each NUMA domain's next-state queue into its current-state queue, best reward first, and records the best and worst states.
std::vector< jaffarCommon::concurrent::concurrentMultimap_t< float, void * > * > _numaNextStateQueues
Per-NUMA-domain reward-ordered queues collecting the next step's states (null for non-delegate domains).
Runner *const _runner
The runner used to serialize states into and deserialize states out of the database.
std::vector< uint8_t * > _histBuffersStart
Start pointer of each NUMA domain's parallel history (cold) slab.
std::vector< size_t > _allocableBytesPerNuma
Number of bytes allocated for the state slab in each NUMA domain.
~StateDb()
Frees the per-NUMA current- and next-state queues allocated in initialize().
std::vector< jaffarCommon::concurrent::DrainBuffer< void * > * > _numaCurrentStateQueues
Per-NUMA-domain current-state queues drained by the workers during a step (null for non-delegate domains).
size_t getFullStateSize() const
Full self-contained serialized state size ([hot][history]); for standalone state buffers.
size_t _fullStateSizeBytes
Full self-contained serialized state size ([hot]+[full bit-packed history]) for standalone snapshot buffers.
size_t getStateSizeInDatabase() const
Returns the per-state size (including padding) as stored in the database.
size_t _histSize
Unpadded size of one state's cold "path" data (bit-packed input history + step counter).
bool isStateInNumaDomain(void *const statePtr, const int numaDomainId)
Tests whether a state slot lies within a given NUMA domain's slab.
void * _bestState
Pointer to the best (highest-reward) state from the last advanceStep().
size_t popStates(void **elements, const size_t maxCount, const size_t threadId)
Pops a batch of base states from the current-state database, preferring NUMA-local domains.
static constexpr size_t FREE_STATE_CACHE_CAPACITY
Capacity of each worker's thread-local free-slot cache ("magazine").
size_t _maxStates
Total maximum number of states the database can hold across all NUMA domains.
size_t _maxSizeMb
User-provided maximum megabytes to use for the entire state database.
size_t saveStateFromRunner(Runner &r, void *statePtr)
Serializes the runner's state (raw, uncompressed) into the given slot.
void * popState()
Pops a single base state from the current-state database, preferring NUMA-local domains.
void * getFreeState(const size_t threadId)
Obtains a free state slot for the calling thread to write a new state into.
int getStateNumaDomain(void *const statePtr)
Finds the NUMA domain whose slab contains the given state slot.
bool pushState(const float reward, Runner &r, void *statePtr)
Serializes the runner's state into the given slot and inserts it into the next-state queue.
size_t _stateSize
Size occupied by each stored state, including alignment padding.
std::vector< freeStateCache_t > _freeStateCache
Per-thread free-slot caches, indexed by OpenMP thread id.
size_t getStateCount() const
Returns the total number of states currently held in the current-state database.
size_t _stateSizeRaw
Raw (unpadded) serialized size of a single state.
std::vector< uint8_t * > _internalBuffersStart
Start pointer of each NUMA domain's contiguous state slab.
size_t _maxSize
Total maximum size (bytes) the state database may grow to across all NUMA domains.
void loadStateIntoRunner(Runner &r, const void *statePtr)
Deserializes a stored state (raw, uncompressed) from a slot into the runner.
void initialize()
Allocates and initializes the per-NUMA state slabs, queues, and per-thread caches.
std::mutex _workMutex
Mutex guarding the cross-NUMA best/worst-state updates in advanceStep().
std::vector< size_t > _maxSizePerNuma
Maximum size (bytes) of the state database in each NUMA domain.
Abstract interface for how a search remembers the sequence of inputs ("path") that produced each state.
NUMA topology detection: distance/preference matrices and per-domain delegate-thread selection,...
#define _JAFFAR_STATE_PADDING_BYTES
Alignment boundary (in bytes) each stored state is padded up to, for vectorized access and false-sharing avoidance.
Per-thread (OpenMP-thread-indexed) cache of free state slots fronting the shared free-state queues.
size_t count
Number of slots currently held in slots.
void * slots[FREE_STATE_CACHE_CAPACITY]
Cached free-slot pointers (used as a LIFO stack).
Per-thread statistics counters for state-DB popping and free-state acquisition.
size_t freeStateCacheHit
getFreeState requests served from the thread-local free-slot cache.
size_t distanceAccumulator
Sum of NUMA distances over all accesses, for the average-distance metric.
size_t localDatabaseState
Base states popped from the thread's own (preferred) NUMA domain.
size_t nonLocalDatabaseState
Base states popped from a non-preferred NUMA domain.
size_t localFreeState
Free slots obtained from the preferred NUMA domain's free queue.
size_t nonLocalFreeState
Free slots obtained from a non-preferred domain's free queue.
size_t freeStateCacheReturn
returnFreeState calls absorbed by the thread-local free-slot cache.
size_t stealingFreeState
Free slots stolen from the back of a current-state queue.
size_t databaseStateNotFound
Pop attempts that found every current-state queue drained.
size_t freeStateNotFound
getFreeState attempts that found no slot anywhere.