[OpenMP][AMDGPU] Single eager resource init + HSA queue utilization tracking

This patch lazily initializes queues/streams/events since their initialization
might come at a cost even if we do not use them.

To further benefit from this, AMDGPU/HSA queue management is moved into the
AMDGPUStreamManager of an AMDGPUDevice. Streams may now use different HSA queues
during their lifetime and identify busy queues.

When a Stream is requested from the resource manager, it will search for and
try to assign an idle queue. During the search for an idle queue the manager
may initialize more queues, up to the set maximum (default: 4).
When no idle queue could be found: resort to round robin selection.

With contributions from Johannes Doerfert <johannes@jdoerfert.de>

Depends on D156245

Reviewed By: kevinsala

Differential Revision: https://reviews.llvm.org/D154523
This commit is contained in:
Michael Halkenhaeuser 2023-08-02 06:56:44 -04:00
parent 53881490c2
commit 5b19f42b63
5 changed files with 147 additions and 45 deletions

View File

@ -1193,7 +1193,7 @@ throughout the execution if needed. A stream is a queue of asynchronous
operations (e.g., kernel launches and memory copies) that are executed
sequentially. Parallelism is achieved by featuring multiple streams. The
``libomptarget`` leverages streams to exploit parallelism between plugin
operations. The default value is ``32``.
operations. The default value is ``1``, more streams are created as needed.
LIBOMPTARGET_NUM_INITIAL_EVENTS
"""""""""""""""""""""""""""""""
@ -1201,7 +1201,8 @@ LIBOMPTARGET_NUM_INITIAL_EVENTS
This environment variable sets the number of pre-created events in the
plugin (if supported) at initialization. More events will be created
dynamically throughout the execution if needed. An event is used to synchronize
a stream with another efficiently. The default value is ``32``.
a stream with another efficiently. The default value is ``1``, more events are
created as needed.
LIBOMPTARGET_LOCK_MAPPED_HOST_BUFFERS
"""""""""""""""""""""""""""""""""""""

View File

@ -83,6 +83,12 @@ public:
}
}
Envar<Ty> &operator=(const Ty &V) {
Data = V;
Initialized = true;
return *this;
}
/// Get the definitive value.
const Ty &get() const {
// Throw a runtime error in case this envar is not initialized.

View File

@ -583,10 +583,12 @@ using AMDGPUSignalManagerTy = GenericDeviceResourceManagerTy<AMDGPUSignalRef>;
/// Class holding an HSA queue to submit kernel and barrier packets.
struct AMDGPUQueueTy {
/// Create an empty queue.
AMDGPUQueueTy() : Queue(nullptr), Mutex() {}
AMDGPUQueueTy() : Queue(nullptr), Mutex(), NumUsers(0) {}
/// Initialize a new queue belonging to a specific agent.
/// Lazily initialize a new queue belonging to a specific agent.
Error init(hsa_agent_t Agent, int32_t QueueSize) {
if (Queue)
return Plugin::success();
hsa_status_t Status =
hsa_queue_create(Agent, QueueSize, HSA_QUEUE_TYPE_MULTI, callbackError,
nullptr, UINT32_MAX, UINT32_MAX, &Queue);
@ -595,10 +597,22 @@ struct AMDGPUQueueTy {
/// Deinitialize the queue and destroy its resources.
Error deinit() {
std::lock_guard<std::mutex> Lock(Mutex);
if (!Queue)
return Plugin::success();
hsa_status_t Status = hsa_queue_destroy(Queue);
return Plugin::check(Status, "Error in hsa_queue_destroy: %s");
}
/// Returns if this queue is considered busy
bool isBusy() const { return NumUsers > 0; }
/// Decrement user count of the queue object
void removeUser() { --NumUsers; }
/// Increase user count of the queue object
void addUser() { ++NumUsers; }
/// Push a kernel launch to the queue. The kernel launch requires an output
/// signal and can define an optional input signal (nullptr if none).
Error pushKernelLaunch(const AMDGPUKernelTy &Kernel, void *KernelArgs,
@ -611,6 +625,7 @@ struct AMDGPUQueueTy {
// the addition of other packets to the queue. The following piece of code
// should be lightweight; do not block the thread, allocate memory, etc.
std::lock_guard<std::mutex> Lock(Mutex);
assert(Queue && "Interacted with a non-initialized queue!");
// Avoid defining the input dependency if already satisfied.
if (InputSignal && !InputSignal->load())
@ -659,6 +674,7 @@ struct AMDGPUQueueTy {
const AMDGPUSignalTy *InputSignal2) {
// Lock the queue during the packet publishing process.
std::lock_guard<std::mutex> Lock(Mutex);
assert(Queue && "Interacted with a non-initialized queue!");
// Push the barrier with the lock acquired.
return pushBarrierImpl(OutputSignal, InputSignal1, InputSignal2);
@ -777,6 +793,9 @@ private:
/// TODO: There are other more advanced approaches to avoid this mutex using
/// atomic operations. We can further investigate it if this is a bottleneck.
std::mutex Mutex;
/// Indicates that the queue is busy when > 0
int NumUsers;
};
/// Struct that implements a stream of asynchronous operations for AMDGPU
@ -886,7 +905,7 @@ private:
hsa_agent_t Agent;
/// The queue that the stream uses to launch kernels.
AMDGPUQueueTy &Queue;
AMDGPUQueueTy *Queue;
/// The manager of signals to reuse signals.
AMDGPUSignalManagerTy &SignalManager;
@ -978,6 +997,9 @@ private:
/// signal of the current stream, and 2) the last signal of the other stream.
/// Use a barrier packet with two input signals.
Error waitOnStreamOperation(AMDGPUStreamTy &OtherStream, uint32_t Slot) {
if (Queue == nullptr)
return Plugin::error("Target queue was nullptr");
/// The signal that we must wait from the other stream.
AMDGPUSignalTy *OtherSignal = OtherStream.Slots[Slot].Signal;
@ -999,7 +1021,7 @@ private:
return Err;
// Push a barrier into the queue with both input signals.
return Queue.pushBarrier(OutputSignal, InputSignal, OtherSignal);
return Queue->pushBarrier(OutputSignal, InputSignal, OtherSignal);
}
/// Callback for running a specific asynchronous operation. This callback is
@ -1085,6 +1107,9 @@ public:
uint32_t NumThreads, uint64_t NumBlocks,
uint32_t GroupSize,
AMDGPUMemoryManagerTy &MemoryManager) {
if (Queue == nullptr)
return Plugin::error("Target queue was nullptr");
// Retrieve an available signal for the operation's output.
AMDGPUSignalTy *OutputSignal = nullptr;
if (auto Err = SignalManager.getResource(OutputSignal))
@ -1102,8 +1127,8 @@ public:
return Err;
// Push the kernel with the output signal and an input signal (optional)
return Queue.pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks,
GroupSize, OutputSignal, InputSignal);
return Queue->pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks,
GroupSize, OutputSignal, InputSignal);
}
/// Push an asynchronous memory copy between pinned memory buffers.
@ -1331,6 +1356,8 @@ public:
/// Make the stream wait on an event.
Error waitEvent(const AMDGPUEventTy &Event);
friend struct AMDGPUStreamManagerTy;
};
/// Class representing an event on AMDGPU. The event basically stores some
@ -1428,6 +1455,99 @@ Error AMDGPUStreamTy::waitEvent(const AMDGPUEventTy &Event) {
return waitOnStreamOperation(RecordedStream, Event.RecordedSlot);
}
struct AMDGPUStreamManagerTy final
: GenericDeviceResourceManagerTy<AMDGPUResourceRef<AMDGPUStreamTy>> {
using ResourceRef = AMDGPUResourceRef<AMDGPUStreamTy>;
using ResourcePoolTy = GenericDeviceResourceManagerTy<ResourceRef>;
AMDGPUStreamManagerTy(GenericDeviceTy &Device, hsa_agent_t HSAAgent)
: GenericDeviceResourceManagerTy(Device), NextQueue(0), Agent(HSAAgent) {}
Error init(uint32_t InitialSize, int NumHSAQueues, int HSAQueueSize) {
Queues = std::vector<AMDGPUQueueTy>(NumHSAQueues);
QueueSize = HSAQueueSize;
MaxNumQueues = NumHSAQueues;
// Initialize one queue eagerly
if (auto Err = Queues.front().init(Agent, QueueSize))
return Err;
return GenericDeviceResourceManagerTy::init(InitialSize);
}
/// Deinitialize the resource pool and delete all resources. This function
/// must be called before the destructor.
Error deinit() override {
// De-init all queues
for (AMDGPUQueueTy &Queue : Queues) {
if (auto Err = Queue.deinit())
return Err;
}
return GenericDeviceResourceManagerTy::deinit();
}
/// Get a single stream from the pool or create new resources.
virtual Error getResource(AMDGPUStreamTy *&StreamHandle) override {
return getResourcesImpl(1, &StreamHandle, [this](AMDGPUStreamTy *&Handle) {
return assignNextQueue(Handle);
});
}
/// Return stream to the pool.
virtual Error returnResource(AMDGPUStreamTy *StreamHandle) override {
return returnResourceImpl(StreamHandle, [](AMDGPUStreamTy *Handle) {
Handle->Queue->removeUser();
return Plugin::success();
});
}
private:
/// Search for and assign an prefereably idle queue to the given Stream. If
/// there is no queue without current users, resort to round robin selection.
inline Error assignNextQueue(AMDGPUStreamTy *Stream) {
uint32_t StartIndex = NextQueue % MaxNumQueues;
AMDGPUQueueTy *Q = nullptr;
for (int i = 0; i < MaxNumQueues; ++i) {
Q = &Queues[StartIndex++];
if (StartIndex == MaxNumQueues)
StartIndex = 0;
if (Q->isBusy())
continue;
else {
if (auto Err = Q->init(Agent, QueueSize))
return Err;
Q->addUser();
Stream->Queue = Q;
return Plugin::success();
}
}
// All queues busy: Round robin (StartIndex has the initial value again)
Queues[StartIndex].addUser();
Stream->Queue = &Queues[StartIndex];
++NextQueue;
return Plugin::success();
}
/// The next queue index to use for round robin selection.
uint32_t NextQueue;
/// The queues which are assigned to requested streams.
std::vector<AMDGPUQueueTy> Queues;
/// The corresponding device as HSA agent.
hsa_agent_t Agent;
/// The maximum number of queues.
int MaxNumQueues;
/// The size of created queues.
int QueueSize;
};
/// Abstract class that holds the common members of the actual kernel devices
/// and the host device. Both types should inherit from this class.
struct AMDGenericDeviceTy {
@ -1607,9 +1727,8 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
OMPX_InitialNumSignals("LIBOMPTARGET_AMDGPU_NUM_INITIAL_HSA_SIGNALS",
64),
OMPX_StreamBusyWait("LIBOMPTARGET_AMDGPU_STREAM_BUSYWAIT", 2000000),
AMDGPUStreamManager(*this), AMDGPUEventManager(*this),
AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice),
Queues() {}
AMDGPUStreamManager(*this, Agent), AMDGPUEventManager(*this),
AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice) {}
~AMDGPUDeviceTy() {}
@ -1676,17 +1795,12 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
return Err;
// Compute the number of queues and their size.
const uint32_t NumQueues = std::min(OMPX_NumQueues.get(), MaxQueues);
const uint32_t QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize);
// Construct and initialize each device queue.
Queues = std::vector<AMDGPUQueueTy>(NumQueues);
for (AMDGPUQueueTy &Queue : Queues)
if (auto Err = Queue.init(Agent, QueueSize))
return Err;
OMPX_NumQueues = std::max(1U, std::min(OMPX_NumQueues.get(), MaxQueues));
OMPX_QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize);
// Initialize stream pool.
if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams))
if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams,
OMPX_NumQueues, OMPX_QueueSize))
return Err;
// Initialize event pool.
@ -1725,11 +1839,6 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
}
}
for (AMDGPUQueueTy &Queue : Queues) {
if (auto Err = Queue.deinit())
return Err;
}
// Invalidate agent reference.
Agent = {0};
@ -2416,19 +2525,8 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
});
}
/// Get the next queue in a round-robin fashion.
AMDGPUQueueTy &getNextQueue() {
static std::atomic<uint32_t> NextQueue(0);
uint32_t Current = NextQueue.fetch_add(1, std::memory_order_relaxed);
return Queues[Current % Queues.size()];
}
private:
using AMDGPUStreamRef = AMDGPUResourceRef<AMDGPUStreamTy>;
using AMDGPUEventRef = AMDGPUResourceRef<AMDGPUEventTy>;
using AMDGPUStreamManagerTy = GenericDeviceResourceManagerTy<AMDGPUStreamRef>;
using AMDGPUEventManagerTy = GenericDeviceResourceManagerTy<AMDGPUEventRef>;
/// Envar for controlling the number of HSA queues per device. High number of
@ -2484,9 +2582,6 @@ private:
/// Reference to the host device.
AMDHostDeviceTy &HostDevice;
/// List of device packet queues.
std::vector<AMDGPUQueueTy> Queues;
};
Error AMDGPUDeviceImageTy::loadExecutable(const AMDGPUDeviceTy &Device) {
@ -2558,7 +2653,7 @@ Error AMDGPUResourceRef<ResourceTy>::create(GenericDeviceTy &Device) {
}
AMDGPUStreamTy::AMDGPUStreamTy(AMDGPUDeviceTy &Device)
: Agent(Device.getAgent()), Queue(Device.getNextQueue()),
: Agent(Device.getAgent()), Queue(nullptr),
SignalManager(Device.getSignalManager()), Device(Device),
// Initialize the std::deque with some empty positions.
Slots(32), NextSlot(0), SyncCycle(0), RPCServer(nullptr),

View File

@ -396,9 +396,9 @@ GenericDeviceTy::GenericDeviceTy(int32_t DeviceId, int32_t NumDevices,
// device initialization. These cannot be consulted until the device is
// initialized correctly. We intialize them in GenericDeviceTy::init().
OMPX_TargetStackSize(), OMPX_TargetHeapSize(),
// By default, the initial number of streams and events are 32.
OMPX_InitialNumStreams("LIBOMPTARGET_NUM_INITIAL_STREAMS", 32),
OMPX_InitialNumEvents("LIBOMPTARGET_NUM_INITIAL_EVENTS", 32),
// By default, the initial number of streams and events is 1.
OMPX_InitialNumStreams("LIBOMPTARGET_NUM_INITIAL_STREAMS", 1),
OMPX_InitialNumEvents("LIBOMPTARGET_NUM_INITIAL_EVENTS", 1),
DeviceId(DeviceId), GridValues(OMPGridValues),
PeerAccesses(NumDevices, PeerAccessState::PENDING), PeerAccessesLock(),
PinnedAllocs(*this), RPCServer(nullptr) {

View File

@ -1168,7 +1168,7 @@ public:
/// Deinitialize the resource pool and delete all resources. This function
/// must be called before the destructor.
Error deinit() {
virtual Error deinit() {
if (NextAvailable)
DP("Missing %d resources to be returned\n", NextAvailable);
@ -1252,7 +1252,7 @@ protected:
return Plugin::success();
}
private:
protected:
/// The resources between \p OldSize and \p NewSize need to be created or
/// destroyed. The mutex is locked when this function is called.
Error resizeResourcePoolImpl(uint32_t OldSize, uint32_t NewSize) {