@@ -751,9 +751,8 @@ def __init__(
751
751
self ,
752
752
file_object : FakeFile ,
753
753
file_path : AnyStr ,
754
- update : bool ,
755
- read : bool ,
756
- append : bool ,
754
+ open_modes : _OpenModes ,
755
+ allow_update : bool ,
757
756
delete_on_close : bool ,
758
757
filesystem : "FakeFilesystem" ,
759
758
newline : Optional [str ],
@@ -768,9 +767,8 @@ def __init__(
768
767
):
769
768
self .file_object = file_object
770
769
self .file_path = file_path # type: ignore[var-annotated]
771
- self ._append = append
772
- self ._read = read
773
- self .allow_update = update
770
+ self .open_modes = open_modes
771
+ self .allow_update = allow_update
774
772
self ._closefd = closefd
775
773
self ._file_epoch = file_object .epoch
776
774
self .raw_io = raw_io
@@ -801,8 +799,8 @@ def __init__(
801
799
self ._flush_pos = 0
802
800
if contents :
803
801
self ._flush_pos = len (contents )
804
- if update :
805
- if not append :
802
+ if self . allow_update :
803
+ if not self . open_modes . append :
806
804
self ._io .seek (0 )
807
805
else :
808
806
self ._io .seek (self ._flush_pos )
@@ -890,6 +888,16 @@ def closed(self) -> bool:
890
888
"""Simulate the `closed` attribute on file."""
891
889
return not self ._is_open ()
892
890
891
+ @property
892
+ def mode (self ) -> str :
893
+ if self .open_modes .append :
894
+ return "a+" if self .open_modes .can_read else "a"
895
+ if self .open_modes .truncate :
896
+ return "w+" if self .open_modes .can_read else "w"
897
+ if self .open_modes .must_not_exist :
898
+ return "x+" if self .open_modes .can_read else "x"
899
+ return "r+" if self .open_modes .can_write else "r"
900
+
893
901
def _try_flush (self , old_pos : int ) -> None :
894
902
"""Try to flush and reset the position if it fails."""
895
903
flush_pos = self ._flush_pos
@@ -910,7 +918,7 @@ def flush(self) -> None:
910
918
self ._check_open_file ()
911
919
912
920
if self .allow_update :
913
- if self ._append :
921
+ if self .open_modes . append :
914
922
contents = self ._io .getvalue ()
915
923
self ._sync_io ()
916
924
old_contents = self .file_object .byte_contents
@@ -951,14 +959,14 @@ def _flush_related_files(self) -> None:
951
959
open_file is not self
952
960
and isinstance (open_file , FakeFileWrapper )
953
961
and self .file_object == open_file .file_object
954
- and not open_file ._append
962
+ and not open_file .open_modes . append
955
963
):
956
964
open_file ._sync_io ()
957
965
958
966
def seek (self , offset : int , whence : int = 0 ) -> None :
959
967
"""Move read/write pointer in 'file'."""
960
968
self ._check_open_file ()
961
- if not self ._append :
969
+ if not self .open_modes . append :
962
970
self ._io .seek (offset , whence )
963
971
else :
964
972
self ._read_seek = offset
@@ -976,7 +984,7 @@ def tell(self) -> int:
976
984
if not self .is_stream :
977
985
self .flush ()
978
986
979
- if not self ._append :
987
+ if not self .open_modes . append :
980
988
return self ._io .tell ()
981
989
if self ._read_whence :
982
990
write_seek = self ._io .tell ()
@@ -1001,7 +1009,7 @@ def _set_stream_contents(self, contents: bytes) -> None:
1001
1009
self ._io .seek (0 )
1002
1010
self ._io .truncate ()
1003
1011
self ._io .putvalue (contents )
1004
- if not self ._append :
1012
+ if not self .open_modes . append :
1005
1013
self ._io .seek (whence )
1006
1014
1007
1015
def _read_wrappers (self , name : str ) -> Callable :
@@ -1117,7 +1125,7 @@ def write_wrapper(*args, **kwargs):
1117
1125
self ._try_flush (old_pos )
1118
1126
if not flush_all :
1119
1127
ret_value = io_attr (* args , ** kwargs )
1120
- if self ._append :
1128
+ if self .open_modes . append :
1121
1129
self ._read_seek = self ._io .tell ()
1122
1130
self ._read_whence = 0
1123
1131
return ret_value
@@ -1132,7 +1140,7 @@ def _adapt_size_for_related_files(self, size: int) -> None:
1132
1140
open_file is not self
1133
1141
and isinstance (open_file , FakeFileWrapper )
1134
1142
and self .file_object == open_file .file_object
1135
- and cast (FakeFileWrapper , open_file )._append
1143
+ and cast (FakeFileWrapper , open_file ).open_modes . append
1136
1144
):
1137
1145
open_file ._read_seek += size
1138
1146
@@ -1146,7 +1154,7 @@ def _truncate_wrapper(self) -> Callable:
1146
1154
1147
1155
def truncate_wrapper (* args , ** kwargs ):
1148
1156
"""Wrap truncate call to call flush after truncate."""
1149
- if self ._append :
1157
+ if self .open_modes . append :
1150
1158
self ._io .seek (self ._read_seek , self ._read_whence )
1151
1159
size = io_attr (* args , ** kwargs )
1152
1160
self .flush ()
@@ -1179,7 +1187,7 @@ def __getattr__(self, name: str) -> Any:
1179
1187
1180
1188
if reading or writing :
1181
1189
self ._check_open_file ()
1182
- if not self ._read and reading :
1190
+ if not self .open_modes . can_read and reading :
1183
1191
return self ._read_error ()
1184
1192
if not self .opened_as_fd and not self .allow_update and writing :
1185
1193
return self ._write_error ()
@@ -1192,7 +1200,7 @@ def __getattr__(self, name: str) -> Any:
1192
1200
self .file_object .st_atime = helpers .now ()
1193
1201
if truncate :
1194
1202
return self ._truncate_wrapper ()
1195
- if self ._append :
1203
+ if self .open_modes . append :
1196
1204
if reading :
1197
1205
return self ._read_wrappers (name )
1198
1206
elif not writing :
@@ -1234,12 +1242,12 @@ def _check_open_file(self) -> None:
1234
1242
raise ValueError ("I/O operation on closed file" )
1235
1243
1236
1244
def __iter__ (self ) -> Union [Iterator [str ], Iterator [bytes ]]:
1237
- if not self ._read :
1245
+ if not self .open_modes . can_read :
1238
1246
self ._raise ("File is not open for reading" )
1239
1247
return self ._io .__iter__ ()
1240
1248
1241
1249
def __next__ (self ):
1242
- if not self ._read :
1250
+ if not self .open_modes . can_read :
1243
1251
self ._raise ("File is not open for reading" )
1244
1252
return next (self ._io )
1245
1253
0 commit comments