Hermes  0.9.5-beta
Hierarchical Distributed I/O Buffering System
bucket.h
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
2  * Distributed under BSD 3-Clause license. *
3  * Copyright by The HDF Group. *
4  * Copyright by the Illinois Institute of Technology. *
5  * All rights reserved. *
6  * *
7  * This file is part of Hermes. The full Hermes copyright notice, including *
8  * terms governing use, modification, and redistribution, is contained in *
9  * the COPYING file, which can be found at the top directory. If you do not *
10  * have access to the file, you may request a copy from help@hdfgroup.org. *
11  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12 
13 #ifndef BUCKET_H_
14 #define BUCKET_H_
15 
16 #include <glog/logging.h>
17 
18 #include <memory>
19 #include <string>
20 #include <unordered_map>
21 #include <vector>
22 
23 #include "dpe/round_robin.h"
24 #include "hermes.h"
25 #include "metadata_management.h"
26 #include "utils.h"
27 
30 namespace hermes {
31 
32 namespace api {
33 
37 class Bucket {
38  private:
40  std::string name_;
43 
44  public:
46  std::shared_ptr<Hermes> hermes_;
49 
50  // TODO(chogan): Think about copy/move constructor/assignment operators
51 
56  Bucket() : name_(""), id_{0, 0}, hermes_(nullptr) {
57  LOG(INFO) << "Create NULL Bucket " << std::endl;
58  }
59 
69  Bucket(const std::string &initial_name, std::shared_ptr<Hermes> const &h,
70  Context ctx = Context());
71 
77  ~Bucket();
78 
83  std::string GetName() const;
84 
91  u64 GetId() const;
92 
101  bool IsValid() const;
102 
107  size_t GetTotalBlobSize();
108 
118  template <typename T>
119  Status Put(const std::string &name, const std::vector<T> &data);
120 
127  template <typename T>
128  Status Put(const std::string &name, const std::vector<T> &data, Context &ctx);
129 
146  Status Put(const std::string &name, const u8 *data, size_t size);
147 
155  Status Put(const std::string &name, const u8 *data, size_t size,
156  const Context &ctx);
157 
168  template <typename T>
169  Status Put(const std::vector<std::string> &names,
170  const std::vector<std::vector<T>> &blobs);
171 
178  template <typename T>
179  Status Put(const std::vector<std::string> &names,
180  const std::vector<std::vector<T>> &blobs, const Context &ctx);
181 
187  size_t GetBlobSize(const std::string &name, const Context &ctx);
188 
195  size_t GetBlobSize(Arena *arena, const std::string &name, const Context &ctx);
196 
208  size_t Get(const std::string &name, Blob &user_blob);
209 
216  size_t Get(const std::string &name, Blob &user_blob, const Context &ctx);
217 
227  std::vector<size_t> Get(const std::vector<std::string> &names,
228  std::vector<Blob> &blobs, const Context &ctx);
229 
233  size_t Get(const std::string &name, void *user_blob, size_t blob_size,
234  const Context &ctx);
235 
249  size_t GetNext(u64 blob_index, Blob &user_blob);
250 
257  size_t GetNext(u64 blob_index, Blob &user_blob, const Context &ctx);
258 
266  size_t GetNext(u64 blob_index, void *user_blob, size_t blob_size,
267  const Context &ctx);
268 
269  // TODO(chogan):
274  std::vector<size_t> GetNext(u64 blob_index, u64 count,
275  std::vector<Blob> &blobs, const Context &ctx);
276 
283  template <class Predicate>
284  Status GetV(void *user_blob, Predicate pred, Context &ctx);
285 
292  Status DeleteBlob(const std::string &name);
293 
299  Status DeleteBlob(const std::string &name, const Context &ctx);
300 
310  Status RenameBlob(const std::string &old_name, const std::string &new_name);
311 
318  Status RenameBlob(const std::string &old_name, const std::string &new_name,
319  const Context &ctx);
320 
327  bool ContainsBlob(const std::string &name);
328 
335  bool BlobIsInSwap(const std::string &name);
336 
341  template <class Predicate>
342  std::vector<std::string> GetBlobNames(Predicate pred, Context &ctx);
343 
353  Status Rename(const std::string &new_name);
354 
360  Status Rename(const std::string &new_name, const Context &ctx);
361 
370  Status Persist(const std::string &file_name);
371 
376  Status Persist(const std::string &file_name, const Context &ctx);
377 
393  void OrganizeBlob(const std::string &blob_name, f32 epsilon,
394  f32 custom_importance = -1.f);
395 
404  Status Release();
405 
410  Status Release(const Context &ctx);
411 
421  Status Destroy();
422 
427  Status Destroy(const Context &ctx);
428 
429  private:
434  template <typename T>
435  Status PutInternal(const std::vector<std::string> &names,
436  const std::vector<size_t> &sizes,
437  const std::vector<std::vector<T>> &blobs,
438  const Context &ctx);
443  template <typename T>
444  Status PlaceBlobs(std::vector<PlacementSchema> &schemas,
445  const std::vector<std::vector<T>> &blobs,
446  const std::vector<std::string> &names, const Context &ctx);
447 };
448 
449 template <typename T>
450 Status Bucket::Put(const std::string &name, const std::vector<T> &data,
451  Context &ctx) {
452  Status result = Put(name, (u8 *)data.data(), data.size() * sizeof(T), ctx);
453 
454  return result;
455 }
456 
457 template <typename T>
458 Status Bucket::Put(const std::string &name, const std::vector<T> &data) {
459  Status result = Put(name, data, ctx_);
460 
461  return result;
462 }
463 
464 template <typename T>
465 Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
466  const std::vector<std::vector<T>> &blobs,
467  const std::vector<std::string> &names,
468  const Context &ctx) {
469  Status result;
470 
471  for (size_t i = 0; i < schemas.size(); ++i) {
472  PlacementSchema &schema = schemas[i];
473  hermes::Blob blob = {};
474  blob.data = (u8 *)blobs[i].data();
475  blob.size = blobs[i].size() * sizeof(T);
476  LOG(INFO) << "Attaching blob '" << names[i] << "' to Bucket '" << name_
477  << "'" << std::endl;
478  result = PlaceBlob(&hermes_->context_, &hermes_->rpc_, schema, blob,
479  names[i], id_, ctx);
480  if (result.Failed()) {
481  // TODO(chogan): Need to return a std::vector<Status>
482  break;
483  }
484  }
485 
486  return result;
487 }
488 
489 template <typename T>
490 Status Bucket::Put(const std::vector<std::string> &names,
491  const std::vector<std::vector<T>> &blobs) {
492  Status result = Put(names, blobs, ctx_);
493 
494  return result;
495 }
496 
497 template <typename T>
498 Status Bucket::PutInternal(const std::vector<std::string> &names,
499  const std::vector<size_t> &sizes,
500  const std::vector<std::vector<T>> &blobs,
501  const Context &ctx) {
502  std::vector<PlacementSchema> schemas;
503  HERMES_BEGIN_TIMED_BLOCK("CalculatePlacement");
504  Status result = CalculatePlacement(&hermes_->context_, &hermes_->rpc_, sizes,
505  schemas, ctx);
507 
508  if (result.Succeeded()) {
509  result = PlaceBlobs(schemas, blobs, names, ctx);
510  } else {
511  LOG(ERROR) << result.Msg();
512  }
513 
514  return result;
515 }
516 
517 template <typename T>
518 Status Bucket::Put(const std::vector<std::string> &names,
519  const std::vector<std::vector<T>> &blobs,
520  const Context &ctx) {
521  Status ret;
522 
523  for (auto &name : names) {
524  if (IsBlobNameTooLong(name)) {
525  ret = BLOB_NAME_TOO_LONG;
526  LOG(ERROR) << ret.Msg();
527  return ret;
528  }
529  }
530 
531  if (blobs.size() == 0) {
532  ret = INVALID_BLOB;
533  LOG(ERROR) << ret.Msg();
534  return ret;
535  }
536 
537  if (IsValid()) {
538  size_t num_blobs = blobs.size();
539  std::vector<size_t> sizes_in_bytes(num_blobs);
540  for (size_t i = 0; i < num_blobs; ++i) {
541  sizes_in_bytes[i] = blobs[i].size() * sizeof(T);
542  }
543 
544  if (ctx.rr_retry) {
545  int num_devices =
547 
548  for (int i = 0; i < num_devices; ++i) {
549  ret = PutInternal(names, sizes_in_bytes, blobs, ctx);
550 
551  if (ret.Failed()) {
552  RoundRobin rr_state;
553  int current = rr_state.GetCurrentDeviceIndex();
554  rr_state.SetCurrentDeviceIndex((current + 1) % num_devices);
555  } else {
556  break;
557  }
558  }
559  } else {
560  ret = PutInternal(names, sizes_in_bytes, blobs, ctx);
561  }
562  } else {
563  ret = INVALID_BUCKET;
564  LOG(ERROR) << ret.Msg();
565  return ret;
566  }
567 
568  return ret;
569 }
570 
571 } // namespace api
572 } // namespace hermes
573 
574 #endif // BUCKET_H_
Definition: hermes_status.h:80
bool Succeeded() const
Definition: hermes_status.h:86
bool Failed() const
Definition: hermes_status.h:94
std::string Msg() const
Definition: hermes_status.h:108
Definition: round_robin.h:25
int GetCurrentDeviceIndex() const
Definition: round_robin.cc:51
void SetCurrentDeviceIndex(int new_device_index)
Definition: round_robin.cc:55
A container for Blobs.
Definition: bucket.h:37
Status Put(const std::string &name, const std::vector< T > &data)
Put a Blob in this Bucket.
Definition: bucket.h:458
size_t GetBlobSize(const std::string &name, const Context &ctx)
Get the size in bytes of the Blob referred to by name.
Definition: bucket.cc:100
Status GetV(void *user_blob, Predicate pred, Context &ctx)
Get Blob(s) from this Bucket according to a predicate.
Definition: bucket.cc:251
bool BlobIsInSwap(const std::string &name)
Return true if the Blob name is in swap space.
Definition: bucket.cc:312
void OrganizeBlob(const std::string &blob_name, f32 epsilon, f32 custom_importance=-1.f)
Align blob_name's access speed to its importance.
Definition: bucket.cc:375
u64 GetId() const
Get the internal ID of the bucket.
Definition: bucket.cc:52
bool ContainsBlob(const std::string &name)
Returns true if the Bucket contains a Blob called name.
Definition: bucket.cc:305
Status Rename(const std::string &new_name)
Rename this Bucket.
Definition: bucket.cc:332
~Bucket()
Releases the Bucket, decrementing its reference count.
Definition: bucket.cc:41
size_t GetTotalBlobSize()
Return the total size in bytes of all Blobs in this Bucket.
Definition: bucket.cc:86
Status PutInternal(const std::vector< std::string > &names, const std::vector< size_t > &sizes, const std::vector< std::vector< T >> &blobs, const Context &ctx)
Internal version of Put, called by all overloads.
Definition: bucket.h:498
std::shared_ptr< Hermes > hermes_
Definition: bucket.h:46
std::string GetName() const
Get the user-facing name of the Bucket.
Definition: bucket.cc:48
std::string name_
Definition: bucket.h:40
Status Release()
Release this Bucket.
Definition: bucket.cc:381
Status Persist(const std::string &file_name)
Save this Bucket's Blobs to persistent storage.
Definition: bucket.cc:354
Status PlaceBlobs(std::vector< PlacementSchema > &schemas, const std::vector< std::vector< T >> &blobs, const std::vector< std::string > &names, const Context &ctx)
Low-level version of Put.
Definition: bucket.h:465
hermes::BucketID id_
Definition: bucket.h:42
bool IsValid() const
Return true if the Bucket is valid.
Definition: bucket.cc:56
Bucket()
Default constructor.
Definition: bucket.h:56
Context ctx_
Definition: bucket.h:48
std::vector< std::string > GetBlobNames(Predicate pred, Context &ctx)
Get a list of blob names filtered by pred.
Definition: bucket.cc:321
Status Destroy()
Destroy this Bucket.
Definition: bucket.cc:400
Status RenameBlob(const std::string &old_name, const std::string &new_name)
Rename a Blob in this Bucket.
Definition: bucket.cc:279
Status DeleteBlob(const std::string &name)
Delete a Blob from this Bucket.
Definition: bucket.cc:263
size_t Get(const std::string &name, Blob &user_blob)
Get a blob from this Bucket.
Definition: bucket.cc:133
size_t GetNext(u64 blob_index, Blob &user_blob)
Given an ordering of Blobs, retrieves the Blob at index blob_index + 1.
Definition: bucket.cc:189
std::vector< unsigned char > Blob
Definition: hermes_types.h:70
Definition: adapter_utils.cc:35
SystemViewState * GetLocalSystemViewState(MetadataManager *mdm)
Definition: metadata_management.cc:1018
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, PlacementSchema &schema, Blob blob, const std::string &name, BucketID bucket_id, const api::Context &ctx, bool called_from_buffer_organizer)
Definition: buffer_pool.cc:1898
float f32
Definition: hermes_types.h:48
uint8_t u8
Definition: hermes_types.h:40
bool IsBlobNameTooLong(const std::string &name)
Definition: metadata_management.cc:48
std::vector< std::pair< size_t, TargetID > > PlacementSchema
Definition: hermes_types.h:226
Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc, const std::vector< size_t > &blob_sizes, std::vector< PlacementSchema > &output, const api::Context &api_context)
Definition: data_placement_engine.cc:111
uint64_t u64
Definition: hermes_types.h:43
Definition: memory_management.h:84
Definition: buffer_pool.h:427
u64 size
Definition: buffer_pool.h:431
u8 * data
Definition: buffer_pool.h:429
int num_devices
Definition: metadata_management.h:172
Definition: hermes_types.h:137
bool rr_retry
Definition: hermes_types.h:162
Definition: hermes_types.h:358
#define HERMES_BEGIN_TIMED_BLOCK(func_name)
Definition: utils.h:46
#define HERMES_END_TIMED_BLOCK()
Definition: utils.h:47