Hermes  0.9.5-beta
Hierarchical Distributed I/O Buffering System
rpc_thallium.h
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
2  * Distributed under BSD 3-Clause license. *
3  * Copyright by The HDF Group. *
4  * Copyright by the Illinois Institute of Technology. *
5  * All rights reserved. *
6  * *
7  * This file is part of Hermes. The full Hermes copyright notice, including *
8  * terms governing use, modification, and redistribution, is contained in *
9  * the COPYING file, which can be found at the top directory. If you do not *
10  * have access to the file, you may request a copy from help@hdfgroup.org. *
11  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12 
13 #ifndef HERMES_RPC_THALLIUM_H_
14 #define HERMES_RPC_THALLIUM_H_
15 
16 #include <arpa/inet.h>
17 #include <netdb.h>
18 #include <netinet/in.h>
19 #include <sys/socket.h>
20 
21 #include <thallium.hpp>
22 #include <thallium/serialization/stl/pair.hpp>
23 #include <thallium/serialization/stl/string.hpp>
24 #include <thallium/serialization/stl/vector.hpp>
25 
26 #include "buffer_organizer.h"
27 
28 namespace tl = thallium;
29 
30 namespace hermes {
31 
/** Capacity, in chars, reserved for a server name prefix. */
constexpr int kMaxServerNamePrefix = 32;
/** Capacity, in chars, reserved for a server name postfix. */
constexpr int kMaxServerNamePostfix = 8;
/** Marker that RPC function names start with when they are "BO" functions
 * (see IsBoFunction). */
constexpr char kBoPrefix[] = "BO::";
/** Number of characters in kBoPrefix, not counting the terminating NUL. */
constexpr int kBoPrefixLength = static_cast<int>(sizeof(kBoPrefix) - 1);
37 
41 struct ThalliumState {
45  std::atomic<bool> kill_requested;
46  tl::engine *engine;
47  tl::engine *bo_engine;
48  ABT_xstream execution_stream;
49 };
50 
55  tl::engine *engine;
56 };
57 
66 template <typename A>
67 void serialize(A &ar, BufferID &buffer_id) {
68  ar &buffer_id.as_int;
69 }
70 
79 template <typename A>
80 void serialize(A &ar, BucketID &bucket_id) {
81  ar &bucket_id.as_int;
82 }
83 
92 template <typename A>
93 void serialize(A &ar, VBucketID &vbucket_id) {
94  ar &vbucket_id.as_int;
95 }
96 
105 template <typename A>
106 void serialize(A &ar, BlobID &blob_id) {
107  ar &blob_id.as_int;
108 }
109 
118 template <typename A>
119 void serialize(A &ar, TargetID &target_id) {
120  ar &target_id.as_int;
121 }
122 
124 template <typename A>
125 void serialize(A &ar, SwapBlob &swap_blob) {
126  ar &swap_blob.node_id;
127  ar &swap_blob.offset;
128  ar &swap_blob.size;
129  ar &swap_blob.bucket_id;
130 }
131 
133 template <typename A>
134 void serialize(A &ar, BufferInfo &info) {
135  ar &info.id;
136  ar &info.bandwidth_mbps;
137  ar &info.size;
138 }
139 
140 #ifndef THALLIUM_USE_CEREAL
141 
// NOTE(chogan): Thallium's default serialization doesn't handle enums, so we
// must write serialization code for all enums when we're not using cereal.
145 
154 template <typename A>
155 void save(A &ar, MapType &map_type) {
156  int val = (int)map_type;
157  ar.write(&val, 1);
158 }
159 
168 template <typename A>
169 void load(A &ar, MapType &map_type) {
170  int val = 0;
171  ar.read(&val, 1);
172  map_type = (MapType)val;
173 }
174 
176 template <typename A>
177 void save(A &ar, BoPriority &priority) {
178  int val = (int)priority;
179  ar.write(&val, 1);
180 }
181 
183 template <typename A>
184 void load(A &ar, BoPriority &priority) {
185  int val = 0;
186  ar.read(&val, 1);
187  priority = (BoPriority)val;
188 }
189 
191 template <typename A>
192 void save(A &ar, ThresholdViolation &violation) {
193  int val = (int)violation;
194  ar.write(&val, 1);
195 }
196 
198 template <typename A>
199 void load(A &ar, ThresholdViolation &violation) {
200  int val = 0;
201  ar.read(&val, 1);
202  violation = (ThresholdViolation)val;
203 }
204 #endif // #ifndef THALLIUM_USE_CEREAL
205 
207 template <typename A>
208 void save(A &ar, BoOperation &op) {
209  int val = (int)op;
210  ar.write(&val, 1);
211 }
212 
214 template <typename A>
215 void load(A &ar, BoOperation &op) {
216  int val = 0;
217  ar.read(&val, 1);
218  op = (BoOperation)val;
219 }
220 
222 template <typename A>
223 void serialize(A &ar, BoArgs &bo_args) {
224  ar &bo_args.move_args.src;
225  ar &bo_args.move_args.dest;
226 }
227 
229 template <typename A>
230 void serialize(A &ar, BoTask &bo_task) {
231  ar &bo_task.op;
232  ar &bo_task.args;
233 }
234 
236 template <typename A>
237 void serialize(A &ar, ViolationInfo &info) {
238  ar &info.target_id;
239  ar &info.violation;
240  ar &info.violation_size;
241 }
242 
243 namespace api {
244 template <typename A>
245 #ifndef THALLIUM_USE_CEREAL
246 void save(A &ar, api::Context &ctx) {
247 #else
248 void save(A &ar, const api::Context &ctx) {
249 #endif // #ifndef THALLIUM_USE_CEREAL
250  ar.write(&ctx.buffer_organizer_retries, 1);
251  int val = (int)ctx.policy;
252  ar.write(&val, 1);
253 }
254 template <typename A>
255 void load(A &ar, api::Context &ctx) {
256  int val = 0;
257  ar.read(&ctx.buffer_organizer_retries, 1);
258  ar.read(&val, 1);
259  ctx.policy = (PlacementPolicy)val;
260 }
261 } // namespace api
262 
263 std::string GetRpcAddress(RpcContext *rpc, Config *config, u32 node_id,
264  int port);
267  ThalliumState *result = (ThalliumState *)rpc->state;
268 
269  return result;
270 }
271 
275 
276  return result;
277 }
278 
280 static bool IsBoFunction(const char *func_name) {
281  bool result = false;
282  int i = 0;
283 
284  while (func_name && *func_name != '\0' && i < kBoPrefixLength) {
285  if (func_name[i] != kBoPrefix[i]) {
286  break;
287  }
288  ++i;
289  }
290 
291  if (i == kBoPrefixLength) {
292  result = true;
293  }
294 
295  return result;
296 }
297 
299 template <typename ReturnType, typename... Ts>
300 ReturnType RpcCall(RpcContext *rpc, u32 node_id, const char *func_name,
301  Ts... args) {
302  VLOG(1) << "Calling " << func_name << " on node " << node_id << " from node "
303  << rpc->node_id << std::endl;
305  bool is_bo_func = IsBoFunction(func_name);
306  std::string server_name = GetServerName(rpc, node_id, is_bo_func);
307 
308  if (is_bo_func) {
309  func_name += kBoPrefixLength;
310  }
311 
312  tl::remote_procedure remote_proc = state->engine->define(func_name);
313  // TODO(chogan): @optimization We can save a little work by storing the
314  // endpoint instead of looking it up on every call
315  tl::endpoint server = state->engine->lookup(server_name);
316 
317  if constexpr (std::is_same<ReturnType, void>::value) {
318  remote_proc.disable_response();
319  remote_proc.on(server)(std::forward<Ts>(args)...);
320  } else {
321  ReturnType result = remote_proc.on(server)(std::forward<Ts>(args)...);
322 
323  return result;
324  }
325 }
326 
327 } // namespace hermes
328 
329 #endif // HERMES_RPC_THALLIUM_H_
PlacementPolicy
Definition: hermes_types.h:73
Definition: adapter_utils.cc:35
BoPriority
Definition: buffer_organizer.h:33
MapType
Definition: metadata_management.h:58
void serialize(A &ar, BufferID &buffer_id)
Definition: rpc_thallium.h:67
const int kMaxServerNamePostfix
Definition: rpc_thallium.h:33
ThresholdViolation
Definition: metadata_management.h:67
const char kBoPrefix[]
Definition: rpc_thallium.h:34
const int kMaxServerNamePrefix
Definition: rpc_thallium.h:32
void load(A &ar, MapType &map_type)
Definition: rpc_thallium.h:169
void save(A &ar, MapType &map_type)
Definition: rpc_thallium.h:155
static bool IsBoFunction(const char *func_name)
Definition: rpc_thallium.h:280
static ThalliumState * GetThalliumState(RpcContext *rpc)
Definition: rpc_thallium.h:266
std::string GetRpcAddress(RpcContext *rpc, Config *config, u32 node_id, int port)
Definition: rpc_thallium.cc:788
const int kBoPrefixLength
Definition: rpc_thallium.h:36
uint32_t u32
Definition: hermes_types.h:42
ReturnType RpcCall(RpcContext *rpc, u32 node_id, const char *func_name, Ts... args)
Definition: rpc_thallium.h:300
std::string GetServerName(RpcContext *rpc, u32 node_id, bool is_buffer_organizer=false)
Definition: rpc_thallium.cc:802
static ClientThalliumState * GetClientThalliumState(RpcContext *rpc)
Definition: rpc_thallium.h:273
BoOperation
Definition: buffer_organizer.h:24
Definition: buffer_organizer.h:63
BoOperation op
Definition: buffer_organizer.h:64
BoArgs args
Definition: buffer_organizer.h:65
Definition: buffer_organizer.h:71
void * state
Definition: rpc.h:36
Definition: rpc_thallium.h:54
tl::engine * engine
Definition: rpc_thallium.h:55
Definition: rpc.h:43
ClientRpcContext client_rpc
Definition: rpc.h:44
u32 node_id
Definition: rpc.h:51
void * state
Definition: rpc.h:45
Definition: buffer_pool.h:440
u64 size
Definition: buffer_pool.h:443
u32 node_id
Definition: buffer_pool.h:441
u64 offset
Definition: buffer_pool.h:442
BucketID bucket_id
Definition: buffer_pool.h:444
Definition: rpc_thallium.h:41
char bo_server_name_postfix[kMaxServerNamePostfix]
Definition: rpc_thallium.h:44
ABT_xstream execution_stream
Definition: rpc_thallium.h:48
tl::engine * engine
Definition: rpc_thallium.h:46
char server_name_postfix[kMaxServerNamePostfix]
Definition: rpc_thallium.h:43
std::atomic< bool > kill_requested
Definition: rpc_thallium.h:45
char server_name_prefix[kMaxServerNamePrefix]
Definition: rpc_thallium.h:42
tl::engine * bo_engine
Definition: rpc_thallium.h:47
Definition: metadata_management.h:72
Definition: hermes_types.h:405
u64 as_int
Definition: hermes_types.h:417
Definition: buffer_organizer.h:43
BufferID src
Definition: buffer_organizer.h:46
struct hermes::BoArgs::@0 move_args
TargetID dest
Definition: buffer_organizer.h:47
Definition: hermes_types.h:358
u64 as_int
Definition: hermes_types.h:369
Definition: buffer_pool.h:93
u64 as_int
Definition: buffer_pool.h:107
Definition: hermes_types.h:198
u64 as_int
Definition: hermes_types.h:213
Definition: hermes_types.h:388
u64 as_int
Definition: hermes_types.h:399