Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proof of Concept] Very simple prototype for an MPI implementation #698

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions examples/fluid/hydrostatic_water_column_2d.jl
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
using TrixiParticles
using OrdinaryDiffEq
using P4estTypes, PointNeighbors
using MPI; MPI.Init()

# ==========================================================================================
# ==== Resolution
fluid_particle_spacing = 0.05
fluid_particle_spacing = 0.3

# Make sure that the kernel support of fluid particles at a boundary is always fully sampled
boundary_layers = 3
boundary_layers = 1

# ==========================================================================================
# ==== Experiment Setup
gravity = 9.81
tspan = (0.0, 1.0)

# Boundary geometry and initial fluid particle positions
initial_fluid_size = (1.0, 0.9)
initial_fluid_size = (1.0, 1.0)
tank_size = (1.0, 1.0)

fluid_density = 1000.0
Expand All @@ -41,31 +43,47 @@ system_acceleration = (0.0, -gravity)
fluid_system = WeaklyCompressibleSPHSystem(tank.fluid, fluid_density_calculator,
state_equation, smoothing_kernel,
smoothing_length, viscosity=viscosity,
acceleration=system_acceleration,
# acceleration=system_acceleration,
source_terms=nothing)

fluid_system2 = WeaklyCompressibleSPHSystem(tank.fluid, fluid_density_calculator,
state_equation, smoothing_kernel,
smoothing_length, viscosity=viscosity,
# acceleration=system_acceleration,
source_terms=nothing)
ghost_system = TrixiParticles.GhostSystem(fluid_system2)

# ==========================================================================================
# ==== Boundary

# This is to set another boundary density calculation with `trixi_include`
boundary_density_calculator = AdamiPressureExtrapolation()

# This is to set wall viscosity with `trixi_include`
viscosity_wall = nothing
boundary_model = BoundaryModelDummyParticles(tank.boundary.density, tank.boundary.mass,
state_equation=state_equation,
boundary_density_calculator,
smoothing_kernel, smoothing_length,
viscosity=viscosity_wall)
# viscosity_wall = nothing
# boundary_model = BoundaryModelDummyParticles(tank.boundary.density, tank.boundary.mass,
# state_equation=state_equation,
# boundary_density_calculator,
# smoothing_kernel, smoothing_length,
# viscosity=viscosity_wall)
k_solid = 0.2
beta_solid = 1
boundary_model = BoundaryModelMonaghanKajtar(k_solid, beta_solid,
fluid_particle_spacing,
tank.boundary.mass)
boundary_system = BoundarySPHSystem(tank.boundary, boundary_model, movement=nothing)

# ==========================================================================================
# ==== Simulation
semi = Semidiscretization(fluid_system, boundary_system)
min_corner = minimum(tank.fluid.coordinates, dims = 2) .- 2 * smoothing_length .- 0.2
max_corner = maximum(tank.fluid.coordinates, dims = 2) .+ 2 * smoothing_length .- 0.2

semi = Semidiscretization(fluid_system, ghost_system,
neighborhood_search=GridNeighborhoodSearch{2}(cell_list=PointNeighbors.P4estCellList(; min_corner, max_corner)))
ode = semidiscretize(semi, tspan)

info_callback = InfoCallback(interval=50)
saving_callback = SolutionSavingCallback(dt=0.02, prefix="")
saving_callback = SolutionSavingCallback(dt=0.02, prefix="$(MPI.Comm_rank(MPI.COMM_WORLD))")

# This is to easily add a new callback with `trixi_include`
extra_callback = nothing
Expand Down
1 change: 1 addition & 0 deletions src/TrixiParticles.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ using GPUArraysCore: AbstractGPUArray
using JSON: JSON
using KernelAbstractions: KernelAbstractions, @kernel, @index
using LinearAlgebra: norm, dot, I, tr, inv, pinv, det
using MPI: MPI
using MuladdMacro: @muladd
using Polyester: Polyester, @batch
using Printf: @printf, @sprintf
Expand Down
1 change: 1 addition & 0 deletions src/general/general.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ include("buffer.jl")
include("interpolation.jl")
include("custom_quantities.jl")
include("neighborhood_search.jl")
include("ghost_system.jl")
173 changes: 173 additions & 0 deletions src/general/ghost_system.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
struct GhostSystem{S, NDIMS, IC, U, V} <: System{NDIMS, IC}
system::S
v::V
u::U
buffer::Nothing

function GhostSystem(system)
v = zeros(eltype(system), v_nvariables(system), nparticles(system))
u = zeros(eltype(system), u_nvariables(system), nparticles(system))
return new{typeof(system), ndims(system),
typeof(system.initial_condition),
typeof(v), typeof(u)}(system, v, u, nothing)
end
end

vtkname(system::GhostSystem) = "ghost"
timer_name(::GhostSystem) = "ghost"

@inline nparticles(system::GhostSystem) = nparticles(system.system)
@inline Base.eltype(system::GhostSystem) = eltype(system.system)

@inline u_nvariables(system::GhostSystem) = 0
@inline v_nvariables(system::GhostSystem) = 0

@inline compact_support(system, neighbor::GhostSystem) = compact_support(system, neighbor.system)
@inline compact_support(system::GhostSystem, neighbor) = compact_support(system.system, neighbor)
@inline compact_support(system::GhostSystem, neighbor::GhostSystem) = compact_support(system.system, neighbor.system)

@inline initial_coordinates(system::GhostSystem) = initial_coordinates(system.system)
@inline current_coordinates(u, system::GhostSystem) = current_coordinates(system.u, system.system)

function write_u0!(u0, system::GhostSystem)
return u0
end

function write_v0!(v0, system::GhostSystem)
return v0
end

function partition(system::GhostSystem, neighborhood_search)
# TODO variable size
keep = trues(nparticles(system))
for particle in eachparticle(system)
cell = PointNeighbors.cell_coords(initial_coords(system, particle), neighborhood_search)
mpi_neighbors = neighborhood_search.cell_list.neighbor_cells[2]
# length of mpi_neighbors is the number of mirror cells. Everything after that is ghost.
if neighborhood_search.cell_list.cell_indices[cell...] <= length(mpi_neighbors)
# Remove particle from this rank
keep[particle] = false
end
end

return GhostSystem(keep_particles(system.system, keep))
end

function copy_neighborhood_search_ghost(nhs, search_radius, n_points)
(; periodic_box) = nhs

cell_list = copy_cell_list_ghost(nhs.cell_list, search_radius, periodic_box)

return GridNeighborhoodSearch{ndims(nhs)}(; search_radius, n_points, periodic_box,
cell_list,
update_strategy = nhs.update_strategy)
end

function copy_cell_list_ghost(cell_list, search_radius, periodic_box)
(; min_corner, max_corner) = cell_list

return PointNeighbors.P4estCellList(; min_corner, max_corner, search_radius,
backend = typeof(cell_list.cells), ghost=true)
end

function create_neighborhood_search(neighborhood_search, system, neighbor::GhostSystem)
return copy_neighborhood_search_ghost(neighborhood_search, compact_support(system, neighbor),
nparticles(neighbor))
end

function update_nhs!(neighborhood_search,
system, neighbor::GhostSystem,
u_system, u_neighbor)
end

function update_nhs!(neighborhood_search,
system::GhostSystem, neighbor,
u_system, u_neighbor)
end

function update_nhs!(neighborhood_search,
system::GhostSystem, neighbor::GhostSystem,
u_system, u_neighbor)
end

# function mpi_communication1!(system::GhostSystem, v, u, v_ode, u_ode, semi, t)
# # Receive data from other ranks
# return system
# end

function mpi_communication3!(system::GhostSystem, v, u, v_ode, u_ode, semi, t)
# TODO Receive only density and pressure from other ranks
# and move the rest to `mpi_communication1!`.
n_particles_from_rank = zeros(Int, MPI.Comm_size(MPI.COMM_WORLD))
for source in 0:MPI.Comm_size(MPI.COMM_WORLD) - 1
buffer = view(n_particles_from_rank, source + 1)
MPI.Recv!(buffer, MPI.COMM_WORLD; source=source, tag=0)
end

@info "" n_particles_from_rank
total_particles = sum(n_particles_from_rank)

requests = Vector{MPI.Request}(undef, 3 * MPI.Comm_size(MPI.COMM_WORLD))

coordinates = zeros(ndims(system), total_particles)
for source in 0:MPI.Comm_size(MPI.COMM_WORLD) - 1
buffer = view(coordinates, :, (1 + sum(n_particles_from_rank[1:source])):total_particles)
# requests[source + 1] = MPI.Irecv!(buffer, MPI.COMM_WORLD; source=source, tag=1)
MPI.Recv!(buffer, MPI.COMM_WORLD; source=source, tag=1)
end

# Receive density and pressure from other ranks
density = zeros(total_particles)
for source in 0:MPI.Comm_size(MPI.COMM_WORLD) - 1
buffer = view(density, (1 + sum(n_particles_from_rank[1:source])):total_particles)
# requests[source + MPI.Comm_size(MPI.COMM_WORLD) + 1] = MPI.Irecv!(buffer, MPI.COMM_WORLD; source=source, tag=2)
MPI.Recv!(buffer, MPI.COMM_WORLD; source=source, tag=2)
end

pressure = zeros(total_particles)
for source in 0:MPI.Comm_size(MPI.COMM_WORLD) - 1
buffer = view(pressure, (1 + sum(n_particles_from_rank[1:source])):total_particles)
# requests[source + 2 * MPI.Comm_size(MPI.COMM_WORLD) + 1] = MPI.Irecv!(buffer, MPI.COMM_WORLD; source=source, tag=3)
MPI.Recv!(buffer, MPI.COMM_WORLD; source=source, tag=3)
end

# MPI.Waitall(requests)

return system
end

function update_quantities!(system::GhostSystem, v, u,
v_ode, u_ode, semi, t)
return system
end

function update_pressure!(system::GhostSystem, v, u, v_ode, u_ode, semi, t)
return system
end

function interact!(dv, v_particle_system, u_particle_system,
v_neighbor_system, u_neighbor_system, neighborhood_search,
particle_system::GhostSystem,
neighbor_system)
return dv
end

function interact!(dv, v_particle_system, u_particle_system,
v_neighbor_system, u_neighbor_system, neighborhood_search,
particle_system::GhostSystem,
neighbor_system::GhostSystem)
return dv
end

function interact!(dv, v_particle_system, u_particle_system,
v_neighbor_system, u_neighbor_system, neighborhood_search,
particle_system,
neighbor_system::GhostSystem)
interact!(dv, v_particle_system, u_particle_system,
v_neighbor_system, u_neighbor_system, neighborhood_search,
particle_system, neighbor_system.system)

return dv
end

@inline add_velocity!(du, v, particle, system::GhostSystem) = du
12 changes: 12 additions & 0 deletions src/general/initial_condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,15 @@ function find_too_close_particles(coords1, coords2, max_distance)

return result
end

function keep_particles(initial_condition, keep)
NDIMS = ndims(initial_condition)
coordinates = initial_condition.coordinates[:, keep]
velocity = initial_condition.velocity[:, keep]
mass = initial_condition.mass[keep]
density = initial_condition.density[keep]
pressure = initial_condition.pressure[keep]

return InitialCondition{NDIMS}(coordinates, velocity, mass, density, pressure,
initial_condition.particle_spacing)
end
56 changes: 46 additions & 10 deletions src/general/semidiscretization.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ function Semidiscretization(systems...;
# Other checks might be added here later.
check_configuration(systems)

# Create a tuple of n neighborhood searches for each of the n systems.
# We will need one neighborhood search for each pair of systems.
searches = Tuple(Tuple(create_neighborhood_search(neighborhood_search,
system, neighbor)
for neighbor in systems)
for system in systems)

systems = partition(systems, searches)

searches = Tuple(Tuple(create_neighborhood_search(neighborhood_search,
system, neighbor)
for neighbor in systems)
for system in systems)

sizes_u = [u_nvariables(system) * n_moving_particles(system)
for system in systems]
ranges_u = Tuple((sum(sizes_u[1:(i - 1)]) + 1):sum(sizes_u[1:i])
Expand All @@ -82,13 +96,6 @@ function Semidiscretization(systems...;
ranges_v = Tuple((sum(sizes_v[1:(i - 1)]) + 1):sum(sizes_v[1:i])
for i in eachindex(sizes_v))

# Create a tuple of n neighborhood searches for each of the n systems.
# We will need one neighborhood search for each pair of systems.
searches = Tuple(Tuple(create_neighborhood_search(neighborhood_search,
system, neighbor)
for neighbor in systems)
for system in systems)

return Semidiscretization(systems, ranges_u, ranges_v, searches)
end

Expand Down Expand Up @@ -178,7 +185,7 @@ end
@inline function compact_support(system, model::BoundaryModelMonaghanKajtar,
neighbor::BoundarySPHSystem)
# This NHS is never used
return 0.0
return 1.0
end

@inline function compact_support(system, model::BoundaryModelDummyParticles, neighbor)
Expand Down Expand Up @@ -206,10 +213,14 @@ end
return neighborhood_searches[system_index][neighbor_index]
end

@inline function system_indices(system, semi)
@inline function system_indices(system, semi::Semidiscretization)
system_indices(system, semi.systems)
end

@inline function system_indices(system, systems)
# Note that this takes only about 5 ns, while mapping systems to indices with a `Dict`
# is ~30x slower because `hash(::System)` is very slow.
index = findfirst(==(system), semi.systems)
index = findfirst(==(system), systems)

if isnothing(index)
throw(ArgumentError("system is not in the semidiscretization"))
Expand Down Expand Up @@ -472,6 +483,16 @@ function update_systems_and_nhs(v_ode, u_ode, semi, t; update_from_callback=fals
# Update NHS
@trixi_timeit timer() "update nhs" update_nhs!(semi, u_ode)

# foreach_system(semi) do system
# v = wrap_v(v_ode, system, semi)
# u = wrap_u(u_ode, system, semi)

# mpi_communication1!(system, v, u, v_ode, u_ode, semi, t)
# end

# TODO update NHS for ghost systems
# @trixi_timeit timer() "update nhs" update_nhs!(semi, u_ode)

# Second update step.
# This is used to calculate density and pressure of the fluid systems
# before updating the boundary systems,
Expand All @@ -483,6 +504,8 @@ function update_systems_and_nhs(v_ode, u_ode, semi, t; update_from_callback=fals
update_quantities!(system, v, u, v_ode, u_ode, semi, t)
end

# TODO MPI communication 2 if correction is used

# Perform correction and pressure calculation
foreach_system(semi) do system
v = wrap_v(v_ode, system, semi)
Expand All @@ -491,6 +514,13 @@ function update_systems_and_nhs(v_ode, u_ode, semi, t; update_from_callback=fals
update_pressure!(system, v, u, v_ode, u_ode, semi, t)
end

foreach_system(semi) do system
v = wrap_v(v_ode, system, semi)
u = wrap_u(u_ode, system, semi)

mpi_communication3!(system, v, u, v_ode, u_ode, semi, t)
end

# Final update step for all remaining systems
foreach_system(semi) do system
v = wrap_v(v_ode, system, semi)
Expand Down Expand Up @@ -817,6 +847,12 @@ function update!(neighborhood_search, system::GPUSystem, x, y; points_moving=(tr
parallelization_backend=KernelAbstractions.get_backend(system))
end

function partition(systems, searches)
return map(systems) do system
return partition(system, searches[system_indices(system, systems)][system_indices(system, systems)])
end
end

function check_configuration(systems)
foreach_system(systems) do system
check_configuration(system, systems)
Expand Down
Loading
Loading