easyconfigs-it4i/m/MRCC/mrcc_files/mrcc_manager.f90
2024-07-25 10:27:17 +02:00

297 lines
8.2 KiB
Fortran
Executable File

#ifdef MPI
module manager
! use mpi
implicit none
private
include "mpif.h"
logical :: manager_running = .false.
integer :: num_create_called = 0
integer :: manager_communicator
integer :: owner_communicator
interface send_file
module procedure send_file_byname
module procedure send_file_byunit
end interface
public :: create_manager, get_and_increment_counter, reset_counter, free_manager
public :: allocate_counter, deallocate_counter
public :: send_file
public :: send_file_byunit, send_file_byname
public :: REQ_EXIT, REQ_SENDCOUNTER, REQ_RESETCOUNTER
public :: REQ_ALLOCATECOUNTER, REQ_DEALLOCATECOUNTER
public :: REQ_SENDFILE
integer, parameter :: buffer_length = 1024
public :: buffer_length
integer, parameter :: REQ_EXIT = 0
integer, parameter :: REQ_ALLOCATECOUNTER = 11, REQ_DEALLOCATECOUNTER = 12, REQ_RESETCOUNTER = 13, REQ_SENDCOUNTER = 14
integer, parameter :: REQ_SENDFILE = 21
contains
subroutine create_manager(owner_comm, master_rank)
implicit none
integer :: owner_comm
integer :: master_rank
integer :: own_rank, info, error
! character(len = mpi_max_processor_name) :: name
! integer :: len
#ifdef DEBUG
write(*,*) 'DEBUG: Creating manager process on communicator ', owner_comm, ' with master rank ', master_rank
#endif
num_create_called = num_create_called + 1
! if(manager_running) return
owner_communicator = owner_comm
call MPI_Comm_rank(owner_comm, own_rank, error)
! spawn hangs with intel mpi 2021.4 Build 20210831 if host is set to sg other than MPI_INFO_NULL
! if(own_rank == master_rank) then
! call MPI_Get_processor_name(name, len, error)
! call MPI_Info_create(info, error)
! call MPI_Info_set(info, 'host', name, error)
! else
info = MPI_INFO_NULL
! end if
call MPI_Comm_spawn('mrcc_manager', MPI_ARGV_NULL, 1, info, master_rank, &
owner_comm, manager_communicator, MPI_ERRCODES_IGNORE, error)
manager_running = .true.
! call MPI_Barrier(manager_comm, error)
#ifdef DEBUG
write(*,*) 'DEBUG: Created manager process on communicator ', owner_comm, ' with master rank ', master_rank
#endif
end subroutine
subroutine allocate_counter(owner_comm, ncounter)
implicit none
integer :: owner_comm, ncounter
integer :: error, rank
integer :: ncount
#ifdef DEBUG
write(*,*) 'DEBUG: Allocating counter on ', owner_comm
#endif
call MPI_Comm_rank(owner_comm, rank, error)
ncount = ncounter
if(rank == 0) call MPI_Send(REQ_ALLOCATECOUNTER, 1, MPI_INTEGER_MRCC, 0, REQ_ALLOCATECOUNTER, manager_communicator, error)
if(rank == 0) call MPI_Recv(ncount, 1, MPI_INTEGER_MRCC, 0, REQ_ALLOCATECOUNTER, manager_communicator, MPI_STATUS_IGNORE, error)
call MPI_Bcast(ncount, 1, MPI_INTEGER_MRCC, 0, owner_comm, error)
ncounter = ncount
#ifdef DEBUG
write(*,*) 'DEBUG: Allocated counter ', ncounter, ' on ', owner_comm
#endif
end subroutine
subroutine deallocate_counter(owner_comm, ncount)
implicit none
integer :: owner_comm, ncount
integer :: error, rank
integer :: ncounter
#ifdef DEBUG
write(*,*) 'DEBUG: Deallocating counter ', ncounter, ' on ', owner_comm
#endif
ncounter = ncount
call MPI_Barrier(owner_comm, error)
call MPI_Comm_rank(owner_comm, rank, error)
if(rank == 0) call MPI_Send(ncounter, 1, MPI_INTEGER_MRCC, 0, REQ_DEALLOCATECOUNTER, manager_communicator, error)
#ifdef DEBUG
write(*,*) 'DEBUG: Deallocated counter ', ncounter, ' on ', owner_comm
#endif
end subroutine
subroutine get_and_increment_counter(count, n_in, ncount)
implicit none
integer :: count
integer :: n_in
integer :: ncount
integer :: error
integer :: ncounter
integer :: counter
integer :: n
#ifdef DEBUG
write(*,*) 'DEBUG: Getting counter ', ncount, ' increment ', n_in
#endif
ncounter = ncount
n = n_in
!$OMP CRITICAL
call MPI_Send(ncounter, 1, MPI_INTEGER_MRCC, 0, REQ_SENDCOUNTER, manager_communicator, error)
call MPI_Send(n, 1, MPI_INTEGER_MRCC, 0, REQ_SENDCOUNTER, manager_communicator, error)
call MPI_Recv(counter, 1, MPI_INTEGER_MRCC, 0, REQ_SENDCOUNTER, manager_communicator, MPI_STATUS_IGNORE, error)
!$OMP END CRITICAL
count = counter
#ifdef DEBUG
write(*,*) 'DEBUG: Got counter ', ncounter, ', value ', count
#endif
end subroutine
subroutine reset_counter(owner_comm, ncount, communicator)
implicit none
integer :: owner_comm, ncount, communicator
integer :: error, mpi_rank
integer :: ncounter
#ifdef DEBUG
write(*,*) 'DEBUG: Resetting counter ', ncount, ' on ', owner_comm
#endif
ncounter = ncount
call MPI_Comm_rank(owner_comm, mpi_rank, error)
call MPI_Barrier(owner_comm, error)
if(mpi_rank == 0) then
call MPI_Send(ncounter, 1, MPI_INTEGER_MRCC, 0, REQ_RESETCOUNTER, manager_communicator, error)
call MPI_Recv(ncounter, 1, MPI_INTEGER_MRCC, 0, REQ_RESETCOUNTER, manager_communicator, MPI_STATUS_IGNORE, error)
end if
call MPI_Barrier(communicator, error)
#ifdef DEBUG
write(*,*) 'DEBUG: Reset counter ', ncount, ' on ', owner_comm
#endif
end subroutine
subroutine free_manager(owner_comm)
implicit none
integer :: owner_comm
integer :: error, mpi_rank
integer :: message
#ifdef DEBUG
write(*,*) 'DEBUG: Freeing manager on ', owner_comm
#endif
num_create_called = num_create_called - 1
if(num_create_called /= 0) return
if(owner_comm /= owner_communicator) return
message = 0
call MPI_Comm_rank(owner_comm, mpi_rank, error)
call MPI_Barrier(owner_comm, error)
if(mpi_rank == 0) call MPI_Send(message, 1, MPI_INTEGER_MRCC, 0, REQ_EXIT, manager_communicator, error)
! call MPI_Comm_free(manager_communicator, error)
call MPI_Comm_disconnect(manager_communicator, error)
manager_running = .false.
#ifdef DEBUG
write(*,*) 'DEBUG: Freed manager on ', owner_comm
#endif
end subroutine
subroutine send_file_byname(filename, saved_filename)
implicit none
character(len = *) :: filename
character(len = *), optional :: saved_filename
integer, parameter :: sunit = 100
open(unit = sunit, file = filename, access = "stream")
if(present(saved_filename)) then
call send_file_byunit(sunit, saved_filename)
else
call send_file_byunit(sunit)
end if
close(sunit)
end subroutine
subroutine send_file_byunit(sunit, saved_filename)
implicit none
integer :: sunit
character(len = *), optional :: saved_filename
integer :: error, mpi_size, iostatus, len_sfname
integer :: length
character(len = :), allocatable :: buffer
! character(len = 7) :: cstream
! logical :: file_opened
! inquire(file = filename, opened = file_opened)
! if(.not.file_opened) then
! open(unit = usend, file = filename, access = "stream")
! else
! inquire(file = filename, stream = cstream)
! if(cstream /= "YES ") then
! write(*, '("ERROR: stream access not allowed")')
! call mrccend(1)
! end if
! end if
allocate(character(len = buffer_length) :: buffer)
len_sfname = 0
if(present(saved_filename)) len_sfname = len_trim(saved_filename)
call MPI_Send(len_sfname, 1, MPI_INTEGER_MRCC, 0, REQ_SENDFILE, manager_communicator, error)
if(len_sfname /= 0) call MPI_Send(saved_filename, len_sfname, MPI_CHARACTER, 0, REQ_SENDFILE, manager_communicator, error)
do
buffer = repeat(' ', buffer_length)
read(sunit, iostat = iostatus) buffer
length = len_trim(buffer)
call MPI_Send(length, 1, MPI_INTEGER_MRCC, 0, REQ_SENDFILE, manager_communicator, error)
call MPI_Send(buffer, buffer_length, MPI_CHARACTER, 0, REQ_SENDFILE, manager_communicator, error)
if(iostatus .ne. 0) then
! end of file
call MPI_Send(-1, 1, MPI_INTEGER_MRCC, 0, REQ_SENDFILE, manager_communicator, error)
exit
end if
end do
deallocate(buffer)
end subroutine
end module manager
#endif