@@ -59,15 +59,13 @@ defmodule Keila.Contacts.Import do
59
59
lines = read_file_line_count! ( filename )
60
60
send ( notify_pid , { :contacts_import_progress , 0 , lines } )
61
61
62
- insert_opts = insert_opts ( on_conflict )
63
-
64
62
File . stream! ( filename )
65
63
|> parser . parse_stream ( )
66
64
|> Stream . map ( row_function )
67
65
|> Stream . reject ( & is_nil / 1 )
68
66
|> Stream . with_index ( )
69
67
|> Stream . map ( fn { changeset , n } ->
70
- case Repo . insert ( changeset , insert_opts ) do
68
+ case insert ( changeset , n , project_id , on_conflict ) do
71
69
{ :ok , % { id: id } } ->
72
70
Keila.Tracking . log_event ( "import" , id , % { } )
73
71
n
@@ -107,6 +105,7 @@ defmodule Keila.Contacts.Import do
107
105
columns =
108
106
[
109
107
email: find_header_column ( headers , ~r{ email.?(address)?} i ) ,
108
+ external_id: find_header_column ( headers , ~r{ external.?id} i ) ,
110
109
first_name: find_header_column ( headers , ~r{ first.?name} i ) ,
111
110
last_name: find_header_column ( headers , ~r{ last.?name} i ) ,
112
111
data: find_header_column ( headers , ~r{ data} i ) ,
@@ -162,20 +161,65 @@ defmodule Keila.Contacts.Import do
162
161
|> then ( fn lines -> max ( lines - 1 , 0 ) end )
163
162
end
164
163
165
- defp insert_opts ( on_conflict ) do
166
- conflict_opts =
167
- case on_conflict do
168
- :replace -> [ on_conflict: { :replace_all_except , [ :id , :email , :project_id ] } ]
169
- :ignore -> [ on_conflict: :nothing ]
170
- end
164
+ defp insert ( changeset , _n , _project_id , :ignore ) do
165
+ Repo . insert ( changeset , on_conflict: :nothing )
166
+ end
167
+
168
+ defp insert ( changeset , n , project_id , :replace ) do
169
+ external_id = get_change ( changeset , :external_id )
171
170
172
- [ returning: false , conflict_target: [ :email , :project_id ] ] ++ conflict_opts
171
+ if not is_nil ( external_id ) do
172
+ maybe_pre_set_external_id ( changeset , project_id , external_id )
173
+ end
174
+
175
+ insert_opts = replace_insert_opts ( changeset , external_id )
176
+ Repo . insert ( changeset , insert_opts )
177
+ rescue
178
+ e in Postgrex.Error ->
179
+ raise_import_error! ( changeset , e , n + 1 )
173
180
end
174
181
175
- defp raise_import_error! ( changeset , line ) do
182
+ @ replace_fields [ :email , :external_id , :first_name , :last_name , :data , :updated_at , :status ]
183
+ defp replace_insert_opts ( changeset , external_id ) do
184
+ external_id? = not is_nil ( external_id )
185
+
186
+ replace_fields =
187
+ @ replace_fields
188
+ |> Enum . filter ( & ( not is_nil ( get_change ( changeset , & 1 ) ) ) )
189
+
190
+ conflict_target =
191
+ if external_id? , do: [ :external_id , :project_id ] , else: [ :email , :project_id ]
192
+
193
+ [
194
+ conflict_target: conflict_target ,
195
+ on_conflict: { :replace , replace_fields } ,
196
+ returning: false
197
+ ]
198
+ end
199
+
200
+ # This is necessary because Postgres doesn't allow using both email and
201
+ # external_id as conflict targets. Because of this and to allow updating
202
+ # existing Contacts that don't have an external ID yet, this function
203
+ # sets the external ID for such contacts before they are updated.
204
+ defp maybe_pre_set_external_id ( changeset , project_id , external_id ) do
205
+ email = get_change ( changeset , :email )
206
+
207
+ if not is_nil ( email ) do
208
+ from ( c in Contact ,
209
+ where: c . project_id == ^ project_id and c . email == ^ email and is_nil ( c . external_id ) ,
210
+ update: [ set: [ external_id: ^ external_id ] ]
211
+ )
212
+ |> Repo . update_all ( [ ] )
213
+ end
214
+ end
215
+
216
+ defp raise_import_error! ( changeset , exception \\ nil , line ) do
176
217
message =
177
- case changeset . errors do
178
- [ { field , { message , _ } } | _ ] ->
218
+ case { changeset , exception } do
219
+ { _ , % Postgrex.Error { postgres: % { code: :unique_violation } } } ->
220
+ gettext ( "duplicate entry" )
221
+
222
+ { % { errors: [ { field , { message , _ } } | _ ] } , _ } ->
179
223
gettext ( "Field %{field}: %{message}" , field: field , message: message )
180
224
181
225
_other ->
0 commit comments