Hermes  0.9.5-beta
Hierarchical Distributed I/O Buffering System
buffer_pool.h
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
2  * Distributed under BSD 3-Clause license. *
3  * Copyright by The HDF Group. *
4  * Copyright by the Illinois Institute of Technology. *
5  * All rights reserved. *
6  * *
7  * This file is part of Hermes. The full Hermes copyright notice, including *
8  * terms governing use, modification, and redistribution, is contained in *
9  * the COPYING file, which can be found at the top directory. If you do not *
10  * have access to the file, you may request a copy from help@hdfgroup.org. *
11  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12 
13 #ifndef HERMES_BUFFER_POOL_H_
14 #define HERMES_BUFFER_POOL_H_
15 
16 #include <assert.h>
17 #include <stddef.h>
18 #include <stdio.h>
19 
20 #include <atomic>
21 #include <string>
22 #include <utility>
23 #include <vector>
24 
25 #include "communication.h"
26 #include "hermes_status.h"
27 #include "hermes_types.h"
28 #include "memory_management.h"
29 
38 namespace hermes {
39 
40 struct RpcContext;
41 
/**
 * Per-device record.
 *
 * Only is_shared is visible in this listing. The cross-reference index for
 * this header additionally lists the members bandwidth_mbps (f32),
 * latency_ns (f32), id (DeviceID), is_byte_addressable (bool),
 * has_fallocate (bool) and mount_point (char[kMaxPathLength]).
 */
49 struct Device {
  /** NOTE(review): presumably true when the device is shared between nodes
   * rather than node-local -- confirm against the full header. */
67  bool is_shared;
70 };
71 
/**
 * Per-target record.
 *
 * Both members visible here are atomics. The cross-reference index for this
 * header additionally lists id (TargetID), capacity (u64),
 * effective_blobs (ChunkedIdList) and effective_blobs_lock (TicketMutex).
 */
75 struct Target {
  /** Atomic 64-bit counter. NOTE(review): presumably the space still
   * available on this target; units are not shown here -- confirm. */
79  std::atomic<u64> remaining_space;
  /** Atomic 64-bit value. NOTE(review): the meaning and units of "speed" are
   * not visible in this listing -- confirm. */
80  std::atomic<u64> speed;
83 };
84 
/**
 * Buffer identifier that can be viewed either field-wise through `bits` or as
 * a single integer.
 *
 * The fields are elided from this listing. Per the cross-reference index,
 * `bits` contains header_index (u32) and node_id (u32), and the union also
 * has a u64 member as_int, which BufferIdHash reads when hashing.
 */
93 union BufferID {
  /** Field-wise view of the identifier. */
95  struct {
104  } bits;
105 
108 };
109 
113 struct BufferIdHash {
115  size_t operator()(const BufferID &id) const {
116  return std::hash<u64>()(id.as_int);
117  }
118 };
119 
/** Equality comparison for two BufferIDs. Declaration only; the definition
 * is not part of this header. */
120 bool operator==(const BufferID &lhs, const BufferID &rhs);
121 
126  ptrdiff_t mdm_offset;
127  ptrdiff_t bpm_offset;
128 };
129 
/**
 * Metadata record for one buffer.
 *
 * Several members are elided from this listing. Per the cross-reference
 * index they are id (BufferID), next_free (BufferID), used (u32),
 * capacity (u32) and device_id (DeviceID).
 */
139 struct BufferHeader {
  /** Offset locating this buffer's data. NOTE(review): the base address the
   * offset is relative to is not shown here -- confirm. */
147  ptrdiff_t data_offset;
  /** Flag recording whether the buffer is currently in use. */
159  bool in_use;
  /** Atomic flag. NOTE(review): presumably a per-header lock bit -- confirm
   * how buffer_pool.cc acquires and releases it. */
163  std::atomic<bool> locked;
164 };
165 
/**
 * Top-level buffer pool descriptor. It stores ptrdiff_t offsets rather than
 * pointers for the regions it refers to.
 *
 * Most members are elided from this listing. Per the cross-reference index
 * they include the per-device arrays free_list_offsets,
 * slab_unit_sizes_offsets, slab_buffer_sizes_offsets,
 * buffers_available_offsets, capacity_adjustments, block_sizes, num_slabs and
 * num_headers (each sized kMaxDevices), plus ticket_mutex, num_devices,
 * num_targets, total_headers, min_device_bw_mbps and max_device_bw_mbps.
 */
188 struct BufferPool {
  /** Offset of the buffer headers. NOTE(review): presumably relative to the
   * start of the shared memory segment -- confirm. */
192  ptrdiff_t headers_offset;
  /** Offset of the Device records (same base as headers_offset, presumably). */
195  ptrdiff_t devices_offset;
  /** Offset of the Target records (same base as headers_offset, presumably). */
197  ptrdiff_t targets_offset;
225 
228 
245 };
246 
268 struct BufferOrganizer;
269 
284 
285  // File buffering context
287  std::vector<std::vector<std::string>> buffering_filenames;
291  FILE *swap_file;
292 };
293 
294 struct BufferIdArray;
295 
/** Returns a size_t for the blob described by @p buffer_ids.
 * NOTE(review): presumably the combined size of the listed buffers -- confirm
 * against the definition in buffer_pool.cc. */
299 size_t GetBlobSize(SharedMemoryContext *context, RpcContext *rpc,
300  BufferIdArray *buffer_ids);
301 
/** Variant of GetBlobSize that identifies the blob by @p blob_id and takes an
 * Arena. NOTE(review): what the arena is used for is not visible here. */
305 size_t GetBlobSizeById(SharedMemoryContext *context, RpcContext *rpc,
306  Arena *arena, BlobID blob_id);
307 
309  BlobID blob_id);
/** Writes a shared-memory name derived from @p base into @p dest.
 * NOTE(review): no destination size is passed, so the caller must supply a
 * large enough buffer; the required size is not visible here -- confirm. */
320 void MakeFullShmemName(char *dest, const char *base);
321 
335  CommunicationContext &comm);
336 
350 
362 
367 
/** Returns the BufferIDs obtained for @p schema. Per the cross-reference
 * index, PlacementSchema is std::vector<std::pair<size_t, TargetID>>.
 * NOTE(review): behaviour when the request cannot be satisfied is not visible
 * here -- confirm. */
382 std::vector<BufferID> GetBuffers(SharedMemoryContext *context,
383  const PlacementSchema &schema);
/** Releases every buffer named in @p buffer_ids. */
392 void ReleaseBuffers(SharedMemoryContext *context, RpcContext *rpc,
393  const std::vector<BufferID> &buffer_ids);
/** Starts the buffer pool RPC server at @p addr with @p num_rpc_threads
 * threads. NOTE(review): whether this call blocks is not visible here. */
404 void StartBufferPoolRpcServer(SharedMemoryContext *context, const char *addr,
405  i32 num_rpc_threads);
406 
419  RpcContext *rpc, const char *shmem_name, Arena *trans_arena,
420  bool is_application_core, bool force_rpc_shutdown);
421 
422 // I/O Clients
423 
/**
 * Data descriptor used by the I/O client functions below.
 *
 * Members are elided from this listing. Per the cross-reference index they
 * are data (u8 *) and size (u64). NOTE(review): presumably the struct does
 * not own the memory behind data -- confirm.
 */
427 struct Blob {
432 };
433 
/**
 * Descriptor returned by PutToSwap / WriteToSwap and consumed by ReadFromSwap.
 *
 * Members are elided from this listing. Per the cross-reference index they
 * are node_id (u32), offset (u64), size (u64) and bucket_id (BucketID).
 */
440 struct SwapBlob {
445 };
446 
452  SwapBlobMembers_NodeId,
453  SwapBlobMembers_Offset,
454  SwapBlobMembers_Size,
455  SwapBlobMembers_BucketId,
456 
457  SwapBlobMembers_Count
458 };
459 
/** Writes @p blob into the buffers named by @p buffer_ids.
 * NOTE(review): presumably the data is split across the buffers in the order
 * given -- confirm against buffer_pool.cc. */
472 void WriteBlobToBuffers(SharedMemoryContext *context, RpcContext *rpc,
473  const Blob &blob,
474  const std::vector<BufferID> &buffer_ids);
475 
/** Reads the buffers listed in @p buffer_ids into @p blob and returns a
 * size_t. NOTE(review): presumably the number of bytes read, with
 * @p buffer_sizes holding one size per buffer ID -- confirm. */
492 size_t ReadBlobFromBuffers(SharedMemoryContext *context, RpcContext *rpc,
493  Blob *blob, BufferIdArray *buffer_ids,
494  u32 *buffer_sizes);
495 
/** Reads the blob identified by @p blob_id. This overload takes the
 * destination as a Blob passed by value. */
496 size_t ReadBlobById(SharedMemoryContext *context, RpcContext *rpc, Arena *arena,
497  Blob blob, BlobID blob_id);
498 
/** Overload of ReadBlobById whose destination is an api::Blob passed by
 * non-const reference. */
499 size_t ReadBlobById(SharedMemoryContext *context, RpcContext *rpc, Arena *arena,
500  api::Blob &dest, BlobID blob_id);
501 
/** Writes @p blob to the buffer @p id; returns a size_t.
 * NOTE(review): whether @p offset indexes the blob or the buffer is not
 * visible here -- confirm. */
502 size_t LocalWriteBufferById(SharedMemoryContext *context, BufferID id,
503  const Blob &blob, size_t offset);
/** Reads from the buffer @p id into @p blob; returns a size_t.
 * NOTE(review): the cross-reference index shows the definition naming the
 * last parameter read_offset, while this declaration calls it offset. */
504 size_t LocalReadBufferById(SharedMemoryContext *context, BufferID id,
505  Blob *blob, size_t offset);
506 
/** Single-blob overload: stores @p size bytes starting at @p data under
 * @p name / @p bucket_id and returns the resulting SwapBlob. The templated
 * PutToSwap overload calls this once per blob. */
507 SwapBlob PutToSwap(SharedMemoryContext *context, RpcContext *rpc,
508  const std::string &name, BucketID bucket_id, const u8 *data,
509  size_t size);
513 template <typename T>
514 std::vector<SwapBlob> PutToSwap(SharedMemoryContext *context, RpcContext *rpc,
515  BucketID id, std::vector<std::vector<T>> &blobs,
516  std::vector<std::string> &names) {
517  size_t num_blobs = blobs.size();
518  std::vector<SwapBlob> result(num_blobs);
519 
520  for (size_t i = 0; i < num_blobs; ++i) {
521  SwapBlob swap_blob =
522  PutToSwap(context, rpc, names[i], id, (const u8 *)blobs[i].data(),
523  blobs[i].size() * sizeof(T));
524  result.push_back(swap_blob);
525  }
526 
527  return result;
528 }
529 
534  BucketID bucket_id);
/** Reads the data described by @p swap_blob into @p blob (passed by value)
 * and returns a size_t. NOTE(review): presumably the number of bytes read --
 * confirm against buffer_pool.cc. */
535 size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,
536  SwapBlob swap_blob);
537 
/** Returns a vector of f32 bandwidth values for @p targets.
 * NOTE(review): presumably one entry per target, in the same order; units are
 * not visible here -- confirm. */
550 std::vector<f32> GetBandwidths(SharedMemoryContext *context,
551  const std::vector<TargetID> &targets);
552 
557  SwapBlob swap_blob, const std::string &blob_name,
558  const api::Context &ctx);
560  PlacementSchema &schema, Blob blob,
561  const std::string &name, BucketID bucket_id,
562  const api::Context &ctx,
563  bool called_from_buffer_organizer = false);
565  Arena *arena, BucketID bucket_id,
566  const std::string &file_name,
567  const std::string &open_mode);
568 
570  Arena *arena, BlobID blob_id, int fd,
571  const i32 &offset);
572 
574 } // namespace hermes
575 
576 #endif // HERMES_BUFFER_POOL_H_
Definition: hermes_status.h:80
Definition: adapter_utils.cc:35
std::vector< f32 > GetBandwidths(SharedMemoryContext *context, const std::vector< TargetID > &targets)
Definition: buffer_pool.cc:173
size_t LocalReadBufferById(SharedMemoryContext *context, BufferID id, Blob *blob, size_t read_offset)
Definition: buffer_pool.cc:1696
constexpr int kMaxDevices
Definition: hermes_types.h:184
bool BufferIsByteAddressable(SharedMemoryContext *context, BufferID id)
Definition: buffer_pool.cc:423
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, PlacementSchema &schema, Blob blob, const std::string &name, BucketID bucket_id, const api::Context &ctx, bool called_from_buffer_organizer)
Definition: buffer_pool.cc:1898
static constexpr int kMaxBufferPoolSlabs
Definition: hermes_types.h:180
SwapBlobMembers
Definition: buffer_pool.h:451
constexpr int kMaxPathLength
Definition: hermes_types.h:181
float f32
Definition: hermes_types.h:48
size_t LocalWriteBufferById(SharedMemoryContext *context, BufferID id, const Blob &blob, size_t offset)
Definition: buffer_pool.cc:1624
size_t ReadBlobById(SharedMemoryContext *context, RpcContext *rpc, Arena *arena, Blob blob, BlobID blob_id)
Definition: buffer_pool.cc:1788
Device * GetDeviceFromHeader(SharedMemoryContext *context, BufferHeader *header)
Definition: buffer_pool.cc:148
void ReleaseBuffers(SharedMemoryContext *context, RpcContext *rpc, const std::vector< BufferID > &buffer_ids)
Definition: buffer_pool.cc:616
SwapBlob PutToSwap(SharedMemoryContext *context, RpcContext *rpc, const std::string &name, BucketID bucket_id, const u8 *data, size_t size)
Definition: buffer_pool.cc:1864
SharedMemoryContext GetSharedMemoryContext(char *shmem_name)
Definition: buffer_pool.cc:1553
uint8_t u8
Definition: hermes_types.h:40
api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc, Arena *arena, BucketID bucket_id, const std::string &file_name, const std::string &open_mode)
Definition: buffer_pool.cc:1954
void StartBufferPoolRpcServer(SharedMemoryContext *context, const char *addr, i32 num_rpc_threads)
void ReleaseSharedMemoryContext(SharedMemoryContext *context)
Definition: buffer_pool.cc:1616
void UnmapSharedMemory(SharedMemoryContext *context)
Definition: buffer_pool.cc:1590
u16 DeviceID
Definition: hermes_types.h:51
f32 GetBlobImportanceScore(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id)
Definition: metadata_storage_stb_ds.cc:1096
size_t ReadFromSwap(SharedMemoryContext *context, Blob blob, SwapBlob swap_blob)
Definition: buffer_pool.cc:1881
void MakeFullShmemName(char *dest, const char *base)
Definition: buffer_pool.cc:1396
api::Status StdIoPersistBlob(SharedMemoryContext *context, RpcContext *rpc, Arena *arena, BlobID blob_id, int fd, const i32 &offset)
Definition: buffer_pool.cc:2003
std::vector< std::pair< size_t, TargetID > > PlacementSchema
Definition: hermes_types.h:226
Status PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc, SwapBlob swap_blob, const std::string &name, const api::Context &ctx)
Definition: buffer_organizer.cc:750
static u32 GetNumBuffersAvailable(SharedMemoryContext *context, DeviceID device_id, int slab_index)
Definition: buffer_pool.cc:479
bool operator==(const BufferInfo &lhs, const BufferInfo &rhs)
Definition: buffer_organizer.cc:27
size_t GetBlobSize(SharedMemoryContext *context, RpcContext *rpc, BufferIdArray *buffer_ids)
Definition: buffer_pool.cc:746
void Finalize(SharedMemoryContext *context, CommunicationContext *comm, RpcContext *rpc, const char *shmem_name, Arena *trans_arena, bool is_application_core, bool force_rpc_shutdown)
Definition: buffer_pool.cc:89
uint32_t u32
Definition: hermes_types.h:42
void InitFilesForBuffering(SharedMemoryContext *context, CommunicationContext &comm)
Definition: buffer_pool.cc:1442
int32_t i32
Definition: hermes_types.h:46
size_t ReadBlobFromBuffers(SharedMemoryContext *context, RpcContext *rpc, Blob *blob, BufferIdArray *buffer_ids, u32 *buffer_sizes)
Definition: buffer_pool.cc:1743
void WriteBlobToBuffers(SharedMemoryContext *context, RpcContext *rpc, const Blob &blob, const std::vector< BufferID > &buffer_ids)
Definition: buffer_pool.cc:1666
std::vector< BufferID > GetBuffers(SharedMemoryContext *context, const PlacementSchema &schema)
Definition: buffer_pool.cc:660
uint64_t u64
Definition: hermes_types.h:43
SwapBlob WriteToSwap(SharedMemoryContext *context, Blob blob, u32 node_id, BucketID bucket_id)
Definition: buffer_pool.cc:1833
size_t GetBlobSizeById(SharedMemoryContext *context, RpcContext *rpc, Arena *arena, BlobID blob_id)
Definition: buffer_pool.cc:762
u32 GetBufferSize(SharedMemoryContext *context, RpcContext *rpc, BufferID id)
Definition: buffer_pool.cc:731
Definition: memory_management.h:84
Definition: buffer_pool.h:427
u64 size
Definition: buffer_pool.h:431
u8 * data
Definition: buffer_pool.h:429
Definition: buffer_pool.h:139
u32 capacity
Definition: buffer_pool.h:151
std::atomic< bool > locked
Definition: buffer_pool.h:163
bool in_use
Definition: buffer_pool.h:159
DeviceID device_id
Definition: buffer_pool.h:155
ptrdiff_t data_offset
Definition: buffer_pool.h:147
u32 used
Definition: buffer_pool.h:149
BufferID id
Definition: buffer_pool.h:141
BufferID next_free
Definition: buffer_pool.h:143
Definition: metadata_management.h:98
Definition: buffer_pool.h:113
size_t operator()(const BufferID &id) const
Definition: buffer_pool.h:115
Definition: buffer_organizer.h:83
Definition: buffer_pool.h:188
ptrdiff_t headers_offset
Definition: buffer_pool.h:192
ptrdiff_t slab_unit_sizes_offsets[kMaxDevices]
Definition: buffer_pool.h:208
ptrdiff_t free_list_offsets[kMaxDevices]
Definition: buffer_pool.h:202
i32 num_devices
Definition: buffer_pool.h:236
ptrdiff_t slab_buffer_sizes_offsets[kMaxDevices]
Definition: buffer_pool.h:215
f32 max_device_bw_mbps
Definition: buffer_pool.h:244
i32 num_targets
Definition: buffer_pool.h:238
ptrdiff_t devices_offset
Definition: buffer_pool.h:195
TicketMutex ticket_mutex
Definition: buffer_pool.h:224
u32 total_headers
Definition: buffer_pool.h:240
u32 num_headers[kMaxDevices]
Definition: buffer_pool.h:234
ptrdiff_t targets_offset
Definition: buffer_pool.h:197
ptrdiff_t buffers_available_offsets[kMaxDevices]
Definition: buffer_pool.h:220
i32 num_slabs[kMaxDevices]
Definition: buffer_pool.h:232
i32 block_sizes[kMaxDevices]
Definition: buffer_pool.h:230
std::atomic< i64 > capacity_adjustments[kMaxDevices]
Definition: buffer_pool.h:227
f32 min_device_bw_mbps
Definition: buffer_pool.h:242
Definition: hermes_types.h:56
Definition: communication.h:35
Definition: buffer_pool.h:49
bool is_shared
Definition: buffer_pool.h:67
f32 latency_ns
Definition: buffer_pool.h:53
bool is_byte_addressable
Definition: buffer_pool.h:61
f32 bandwidth_mbps
Definition: buffer_pool.h:51
bool has_fallocate
Definition: buffer_pool.h:65
char mount_point[kMaxPathLength]
Definition: buffer_pool.h:69
DeviceID id
Definition: buffer_pool.h:57
Definition: rpc.h:43
Definition: buffer_pool.h:273
int open_files[kMaxDevices][kMaxBufferPoolSlabs]
Definition: buffer_pool.h:289
BufferOrganizer * bo
Definition: buffer_pool.h:283
ptrdiff_t buffer_pool_offset
Definition: buffer_pool.h:277
u8 * shm_base
Definition: buffer_pool.h:275
ptrdiff_t metadata_manager_offset
Definition: buffer_pool.h:279
FILE * swap_file
Definition: buffer_pool.h:291
std::vector< std::vector< std::string > > buffering_filenames
Definition: buffer_pool.h:287
u64 shm_size
Definition: buffer_pool.h:281
Definition: buffer_pool.h:125
ptrdiff_t bpm_offset
Definition: buffer_pool.h:127
ptrdiff_t mdm_offset
Definition: buffer_pool.h:126
Definition: buffer_pool.h:440
u64 size
Definition: buffer_pool.h:443
u32 node_id
Definition: buffer_pool.h:441
u64 offset
Definition: buffer_pool.h:442
BucketID bucket_id
Definition: buffer_pool.h:444
Definition: buffer_pool.h:75
ChunkedIdList effective_blobs
Definition: buffer_pool.h:81
TargetID id
Definition: buffer_pool.h:76
std::atomic< u64 > speed
Definition: buffer_pool.h:80
std::atomic< u64 > remaining_space
Definition: buffer_pool.h:79
u64 capacity
Definition: buffer_pool.h:78
TicketMutex effective_blobs_lock
Definition: buffer_pool.h:82
Definition: memory_management.h:36
Definition: hermes_types.h:137
Definition: hermes_types.h:405
Definition: hermes_types.h:358
Definition: buffer_pool.h:93
u64 as_int
Definition: buffer_pool.h:107
u32 header_index
Definition: buffer_pool.h:99
u32 node_id
Definition: buffer_pool.h:103
struct hermes::BufferID::@3 bits
Definition: hermes_types.h:198