@@ -49,29 +49,20 @@ std::string to_string(const ReadBufferResult& read_buffer_result);
49
49
50
50
namespace org ::apache::nifi::minifi::core {
51
51
52
- // ProcessSession Class
53
52
class ProcessSessionImpl : public ReferenceContainerImpl , public virtual ProcessSession {
54
53
public:
55
- // Constructor
56
- /* !
57
- * Create a new process session
58
- */
59
54
explicit ProcessSessionImpl (std::shared_ptr<ProcessContext> processContext);
60
55
61
- // Destructor
62
56
~ProcessSessionImpl () override ;
63
-
64
- // Commit the session
65
57
void commit () override ;
66
- // Roll Back the session
67
58
void rollback () override ;
68
59
69
60
nonstd::expected<void , std::exception_ptr> rollbackNoThrow () noexcept override ;
70
- // Get Provenance Report
61
+
71
62
std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter () override {
72
63
return provenance_report_;
73
64
}
74
- // writes the created contents to the underlying repository
65
+
75
66
void flushContent () override ;
76
67
77
68
std::shared_ptr<core::FlowFile> get () override ;
@@ -80,65 +71,52 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
80
71
void add (const std::shared_ptr<core::FlowFile> &record) override ;
81
72
std::shared_ptr<core::FlowFile> clone (const core::FlowFile& parent) override ;
82
73
std::shared_ptr<core::FlowFile> clone (const core::FlowFile& parent, int64_t offset, int64_t size) override ;
83
- // Transfer the FlowFile to the relationship
74
+
84
75
void transfer (const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship) override ;
85
76
void transferToCustomRelationship (const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name) override ;
86
77
87
78
void putAttribute (core::FlowFile& flow, std::string_view key, const std::string& value) override ;
88
79
void removeAttribute (core::FlowFile& flow, std::string_view key) override ;
89
80
90
81
void remove (const std::shared_ptr<core::FlowFile> &flow) override ;
91
- // Access the contents of the flow file as an input stream; returns null if the flow file has no content claim
82
+
92
83
std::shared_ptr<io::InputStream> getFlowFileContentStream (const core::FlowFile& flow_file) override ;
93
- // Execute the given read callback against the content
84
+
94
85
int64_t read (const std::shared_ptr<core::FlowFile>& flow_file, const io::InputStreamCallback& callback) override ;
95
86
96
87
int64_t read (const core::FlowFile& flow_file, const io::InputStreamCallback& callback) override ;
97
- // Read content into buffer
88
+
98
89
detail::ReadBufferResult readBuffer (const std::shared_ptr<core::FlowFile>& flow) override ;
99
- // Execute the given write callback against the content
90
+
100
91
void write (const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) override ;
101
92
102
93
void write (core::FlowFile& flow, const io::OutputStreamCallback& callback) override ;
103
94
// Read and write the flow file at the same time (eg. for processing it line by line)
104
95
int64_t readWrite (const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback) override ;
105
- // Replace content with buffer
96
+
106
97
void writeBuffer (const std::shared_ptr<core::FlowFile>& flow_file, std::span<const char > buffer) override ;
107
98
void writeBuffer (const std::shared_ptr<core::FlowFile>& flow_file, std::span<const std::byte> buffer) override ;
108
- // Execute the given write/append callback against the content
99
+
109
100
void append (const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) override ;
110
- // Append buffer to content
101
+
111
102
void appendBuffer (const std::shared_ptr<core::FlowFile>& flow, std::span<const char > buffer) override ;
112
103
void appendBuffer (const std::shared_ptr<core::FlowFile>& flow, std::span<const std::byte> buffer) override ;
113
- // Penalize the flow
104
+
114
105
void penalize (const std::shared_ptr<core::FlowFile> &flow) override ;
115
106
116
107
bool outgoingConnectionsFull (const std::string& relationship) override ;
117
108
118
- /* *
119
- * Imports a file from the data stream
120
- * @param stream incoming data stream that contains the data to store into a file
121
- * @param flow flow file
122
- */
123
109
void importFrom (io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) override ;
124
110
void importFrom (io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) override ;
125
111
126
- // import from the data source.
127
112
void import (const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true , uint64_t offset = 0 ) override ;
128
113
129
- /* *
130
- * Exports the data stream to a file
131
- * @param string file to export stream to
132
- * @param flow flow file
133
- * @param bool whether or not to keep the content in the flow file
134
- */
135
114
bool exportContent (const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) override ;
136
115
137
116
bool exportContent (const std::string &destination, const std::string &tmpFileName, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) override ;
138
117
139
- // Stash the content to a key
140
118
void stash (const std::string &key, const std::shared_ptr<core::FlowFile> &flow) override ;
141
- // Restore content previously stashed to a key
119
+
142
120
void restore (const std::string &key, const std::shared_ptr<core::FlowFile> &flow) override ;
143
121
144
122
bool existsFlowFileInRelationship (const Relationship &relationship) override ;
@@ -149,8 +127,6 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
149
127
150
128
bool hasBeenTransferred (const core::FlowFile &flow) const override ;
151
129
152
- // Prevent default copy constructor and assignment operation
153
- // Only support pass by reference or pointer
154
130
ProcessSessionImpl (const ProcessSessionImpl &parent) = delete ;
155
131
ProcessSessionImpl &operator =(const ProcessSessionImpl &parent) = delete ;
156
132
@@ -202,17 +178,11 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
202
178
void ensureNonNullResourceClaim (
203
179
const std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap);
204
180
205
- // Clone the flow file during transfer to multiple connections for a relationship
206
181
std::shared_ptr<core::FlowFile> cloneDuringTransfer (const core::FlowFile& parent);
207
- // ProcessContext
208
182
std::shared_ptr<ProcessContext> process_context_;
209
- // Logger
210
183
std::shared_ptr<logging::Logger> logger_;
211
- // Provenance Report
212
184
std::shared_ptr<provenance::ProvenanceReporter> provenance_report_;
213
-
214
185
std::shared_ptr<ContentSession> content_session_;
215
-
216
186
StateManager* stateManager_;
217
187
218
188
static std::shared_ptr<utils::IdGenerator> id_generator_;
0 commit comments