6
6
use Amp \ByteStream \StreamException ;
7
7
use Amp \Cancellation ;
8
8
use Amp \DeferredFuture ;
9
- use Amp \File \File ;
10
9
use Amp \File \Internal ;
11
10
use Amp \File \PendingOperationError ;
12
11
use Amp \Future ;
13
- use function Amp \async ;
14
12
15
- final class EioFile implements File
13
+ final class EioFile extends Internal \QueuedWritesFile
16
14
{
17
15
private readonly Internal \EioPoll $ poll ;
18
16
19
17
/** @var resource eio file handle. */
20
18
private $ fh ;
21
19
22
- private string $ path ;
23
-
24
- private string $ mode ;
25
-
26
- private int $ size ;
27
-
28
- private int $ position ;
29
-
30
- private \SplQueue $ queue ;
31
-
32
- private bool $ isReading = false ;
33
-
34
- private bool $ writable = true ;
35
-
36
20
private ?Future $ closing = null ;
37
21
38
22
private readonly DeferredFuture $ onClose ;
@@ -42,22 +26,14 @@ final class EioFile implements File
42
26
*/
43
27
public function __construct (Internal \EioPoll $ poll , $ fh , string $ path , string $ mode , int $ size )
44
28
{
29
+ parent ::__construct ($ path , $ mode , $ size );
30
+
45
31
$ this ->poll = $ poll ;
46
32
$ this ->fh = $ fh ;
47
- $ this ->path = $ path ;
48
- $ this ->mode = $ mode ;
49
- $ this ->size = $ size ;
50
- $ this ->position = ($ mode [0 ] === "a " ) ? $ size : 0 ;
51
33
52
- $ this ->queue = new \SplQueue ;
53
34
$ this ->onClose = new DeferredFuture ;
54
35
}
55
36
56
- public function __destruct ()
57
- {
58
- async ($ this ->close (...));
59
- }
60
-
61
37
public function read (?Cancellation $ cancellation = null , int $ length = self ::DEFAULT_READ_LENGTH ): ?string
62
38
{
63
39
if ($ this ->isReading || !$ this ->queue ->isEmpty ()) {
@@ -116,35 +92,6 @@ public function read(?Cancellation $cancellation = null, int $length = self::DEF
116
92
}
117
93
}
118
94
119
- public function write (string $ bytes ): void
120
- {
121
- if ($ this ->isReading ) {
122
- throw new PendingOperationError ;
123
- }
124
-
125
- if (!$ this ->writable ) {
126
- throw new ClosedException ("The file is no longer writable " );
127
- }
128
-
129
- if ($ this ->queue ->isEmpty ()) {
130
- $ future = $ this ->push ($ bytes , $ this ->position );
131
- } else {
132
- $ position = $ this ->position ;
133
- /** @var Future $future */
134
- $ future = $ this ->queue ->top ()->map (fn () => $ this ->push ($ bytes , $ position )->await ());
135
- }
136
-
137
- $ this ->queue ->push ($ future );
138
-
139
- $ future ->await ();
140
- }
141
-
142
- public function end (): void
143
- {
144
- $ this ->writable = false ;
145
- $ this ->close ();
146
- }
147
-
148
95
public function close (): void
149
96
{
150
97
if ($ this ->closing ) {
@@ -177,71 +124,7 @@ public function onClose(\Closure $onClose): void
177
124
$ this ->onClose ->getFuture ()->finally ($ onClose );
178
125
}
179
126
180
- public function truncate (int $ size ): void
181
- {
182
- if ($ this ->isReading ) {
183
- throw new PendingOperationError ;
184
- }
185
-
186
- if (!$ this ->writable ) {
187
- throw new ClosedException ("The file is no longer writable " );
188
- }
189
-
190
- if ($ this ->queue ->isEmpty ()) {
191
- $ future = $ this ->trim ($ size );
192
- } else {
193
- $ future = $ this ->queue ->top ()->map (fn () => $ this ->trim ($ size )->await ());
194
- }
195
-
196
- $ this ->queue ->push ($ future );
197
-
198
- $ future ->await ();
199
- }
200
-
201
- public function seek (int $ position , int $ whence = \SEEK_SET ): int
202
- {
203
- if ($ this ->isReading ) {
204
- throw new PendingOperationError ;
205
- }
206
-
207
- switch ($ whence ) {
208
- case self ::SEEK_SET :
209
- $ this ->position = $ position ;
210
- break ;
211
- case self ::SEEK_CUR :
212
- $ this ->position += $ position ;
213
- break ;
214
- case self ::SEEK_END :
215
- $ this ->position = $ this ->size + $ position ;
216
- break ;
217
- default :
218
- throw new \Error ("Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected " );
219
- }
220
-
221
- return $ this ->position ;
222
- }
223
-
224
- public function tell (): int
225
- {
226
- return $ this ->position ;
227
- }
228
-
229
- public function eof (): bool
230
- {
231
- return $ this ->queue ->isEmpty () && $ this ->size <= $ this ->position ;
232
- }
233
-
234
- public function getPath (): string
235
- {
236
- return $ this ->path ;
237
- }
238
-
239
- public function getMode (): string
240
- {
241
- return $ this ->mode ;
242
- }
243
-
244
- private function push (string $ data , int $ position ): Future
127
+ protected function push (string $ data , int $ position ): Future
245
128
{
246
129
$ length = \strlen ($ data );
247
130
@@ -292,7 +175,7 @@ private function push(string $data, int $position): Future
292
175
return $ deferred ->getFuture ();
293
176
}
294
177
295
- private function trim (int $ size ): Future
178
+ protected function trim (int $ size ): Future
296
179
{
297
180
$ deferred = new DeferredFuture ;
298
181
$ this ->poll ->listen ();
@@ -331,19 +214,4 @@ private function trim(int $size): Future
331
214
332
215
return $ deferred ->getFuture ();
333
216
}
334
-
335
- public function isReadable (): bool
336
- {
337
- return !$ this ->isClosed ();
338
- }
339
-
340
- public function isSeekable (): bool
341
- {
342
- return !$ this ->isClosed ();
343
- }
344
-
345
- public function isWritable (): bool
346
- {
347
- return $ this ->writable ;
348
- }
349
217
}
0 commit comments