xrootd
XrdTpcStream.hh
Go to the documentation of this file.
1 
10 #include <memory>
11 #include <vector>
12 #include <string>
13 
14 #include <cstring>
15 
16 struct stat;
17 
18 class XrdSfsFile;
19 class XrdSysError;
20 
21 namespace TPC {
22 class Stream {
23 public:
24  Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
25  : m_open_for_write(false),
26  m_avail_count(max_blocks),
27  m_fh(std::move(fh)),
28  m_offset(0),
29  m_log(log)
30  {
31  m_buffers.reserve(max_blocks);
32  for (size_t idx=0; idx < max_blocks; idx++) {
33  m_buffers.push_back(new Entry(buffer_size));
34  }
35  m_open_for_write = true;
36  }
37 
38  ~Stream();
39 
40  int Stat(struct stat *);
41 
42  int Read(off_t offset, char *buffer, size_t size);
43 
44  int Write(off_t offset, const char *buffer, size_t size);
45 
46  size_t AvailableBuffers() const {return m_avail_count;}
47 
48  void DumpBuffers() const;
49 
50  // Flush and finalize the stream. If all data has been sent to the underlying
51  // file handle, close() will be invoked on the file handle.
52  //
53  // Further write operations on this stream will result in an error.
54  // If any memory buffers remain, an error occurs.
55  //
56  // Returns true on success; false otherwise.
57  bool Finalize();
58 
59  std::string GetErrorMessage() const {return m_error_buf;}
60 
61 private:
62 
63  class Entry {
64  public:
65  Entry(size_t capacity) :
66  m_offset(-1),
67  m_capacity(capacity),
68  m_size(0)
69  {}
70 
71  bool Available() const {return m_offset == -1;}
72 
73  int Write(Stream &stream) {
74  if (Available() || !CanWrite(stream)) {return 0;}
75  // Currently, only full writes are accepted.
76  int size_desired = m_size;
77  int retval = stream.Write(m_offset, &m_buffer[0], size_desired);
78  m_size = 0;
79  m_offset = -1;
80  if (retval != size_desired) {
81  return -1;
82  }
83  return retval;
84  }
85 
86  bool Accept(off_t offset, const char *buf, size_t size) {
87  // Validate acceptance criteria.
88  if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
89  return false;
90  }
91  if (size > m_capacity - m_size) {
92  return false;
93  }
94 
95  // Inflate the underlying buffer if needed.
96  ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity();
97  if (new_bytes_needed > 0) {
98  m_buffer.reserve(m_capacity);
99  }
100 
101  // Finally, do the copy.
102  memcpy(&m_buffer[0] + m_size, buf, size);
103  m_size += size;
104  if (m_offset == -1) {
105  m_offset = offset;
106  }
107  return true;
108  }
109 
110  void ShrinkIfUnused() {
111  if (!Available()) {return;}
112 #if __cplusplus > 199711L
113  m_buffer.shrink_to_fit();
114 #endif
115  }
116 
117  void Move(Entry &other) {
118  m_buffer.swap(other.m_buffer);
119  m_offset = other.m_offset;
120  m_size = other.m_size;
121  }
122 
123  off_t GetOffset() const {return m_offset;}
124  size_t GetCapacity() const {return m_capacity;}
125  size_t GetSize() const {return m_size;}
126 
127  private:
128 
129  Entry(const Entry&) = delete;
130 
131  bool CanWrite(Stream &stream) const {
132  return (m_size > 0) && (m_offset == stream.m_offset);
133  }
134 
135  off_t m_offset; // Offset within file that m_buffer[0] represents.
136  size_t m_capacity;
137  size_t m_size; // Number of bytes held in buffer.
138  std::vector<char> m_buffer;
139  };
140 
143  std::unique_ptr<XrdSfsFile> m_fh;
144  off_t m_offset;
145  std::vector<Entry*> m_buffers;
147  std::string m_error_buf;
148 };
149 }
Definition: XrdTpcStream.hh:22
bool Available() const
Definition: XrdTpcStream.hh:71
XrdSysError & m_log
Definition: XrdTpcStream.hh:146
size_t AvailableBuffers() const
Definition: XrdTpcStream.hh:46
size_t m_capacity
Definition: XrdTpcStream.hh:136
size_t m_avail_count
Definition: XrdTpcStream.hh:142
bool CanWrite(Stream &stream) const
Definition: XrdTpcStream.hh:131
void DumpBuffers() const
int Stat(struct stat *)
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
Definition: XrdTpcStream.hh:24
off_t m_offset
Definition: XrdTpcStream.hh:135
Definition: XrdSysError.hh:89
std::string GetErrorMessage() const
Definition: XrdTpcStream.hh:59
int Read(off_t offset, char *buffer, size_t size)
size_t GetCapacity() const
Definition: XrdTpcStream.hh:124
void ShrinkIfUnused()
Definition: XrdTpcStream.hh:110
off_t m_offset
Definition: XrdTpcStream.hh:144
int Write(Stream &stream)
Definition: XrdTpcStream.hh:73
bool Finalize()
Definition: XrdTpcStream.hh:63
size_t GetSize() const
Definition: XrdTpcStream.hh:125
int Write(off_t offset, const char *buffer, size_t size)
bool Accept(off_t offset, const char *buf, size_t size)
Definition: XrdTpcStream.hh:86
Entry(size_t capacity)
Definition: XrdTpcStream.hh:65
Definition: XrdTpcState.hh:16
std::vector< Entry * > m_buffers
Definition: XrdTpcStream.hh:145
off_t GetOffset() const
Definition: XrdTpcStream.hh:123
std::unique_ptr< XrdSfsFile > m_fh
Definition: XrdTpcStream.hh:143
#define stat(a, b)
Definition: XrdPosix.hh:96
std::string m_error_buf
Definition: XrdTpcStream.hh:147
bool m_open_for_write
Definition: XrdTpcStream.hh:141
size_t m_size
Definition: XrdTpcStream.hh:137
void Move(Entry &other)
Definition: XrdTpcStream.hh:117
std::vector< char > m_buffer
Definition: XrdTpcStream.hh:138