From ef19fc14be59ddb5c6b68f536960f77ab855ba25 Mon Sep 17 00:00:00 2001 From: Michael Hamann Date: Fri, 9 May 2025 16:24:21 +0200 Subject: [PATCH 1/2] XCOMMONS-3330: Job groups with several threads may not return a current job * Store several current jobs. * Add a new possibility to return all running jobs of a job group. * Deprecate the old method that returns just a single running job. * Clean up empty entries in groupedJobs. * Some code cleanup. --- .../main/java/org/xwiki/job/JobExecutor.java | 19 ++++ .../job/internal/DefaultJobExecutor.java | 103 ++++++++++-------- 2 files changed, 79 insertions(+), 43 deletions(-) diff --git a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java index 247476a59f..081028e0f3 100644 --- a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java +++ b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java @@ -19,9 +19,12 @@ */ package org.xwiki.job; +import java.util.Collection; +import java.util.Collections; import java.util.List; import org.xwiki.component.annotation.Role; +import org.xwiki.stability.Unstable; /** * By default Jobs are either executed asynchronously whenever there is a free thread in the pool. Jobs can implement @@ -40,9 +43,25 @@ public interface JobExecutor * * @param groupPath the group path * @return the currently running job in the passed group + * @deprecated Use {@link #getCurrentJobs(JobGroupPath)} instead. */ + @Deprecated(since = "17.4.0RC1") Job getCurrentJob(JobGroupPath groupPath); + /** + * The set of jobs running within the specified job group. + * + * @param groupPath the path of the job group for which the current jobs should be retrieved + * @return a collection containing the currently running jobs in the provided group + * @since 17.4.0RC1 + */ + @Unstable + default Collection getCurrentJobs(JobGroupPath groupPath) + { + // Not really accurate, but better than nothing. + return Collections.singleton(getCurrentJob(groupPath)); + } + /** * Return job corresponding to the provided id from the current executed or waiting jobs. * diff --git a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java index bdaf0e2a93..3e9387e477 100644 --- a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java +++ b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java @@ -19,9 +19,15 @@ */ package org.xwiki.job.internal; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -72,11 +78,12 @@ private class JobGroupExecutor extends JobThreadExecutor implements ThreadFactor private final JobGroupPath path; - private Job currentJob; + private final Set currentJobs = + Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); - private String groupThreadName; + private final String groupThreadName; - private GroupedJobInitializer initializer; + private final GroupedJobInitializer initializer; JobGroupExecutor(JobGroupPath path, GroupedJobInitializer initializer) { @@ -96,7 +103,7 @@ private class JobGroupExecutor extends JobThreadExecutor implements ThreadFactor @Override protected String getThreadName(Runnable r) { - return this.groupThreadName + " - " + this.currentJob; + return this.groupThreadName + " - " + r; } @Override @@ -110,7 +117,7 @@ protected void beforeExecute(Thread t, Runnable r) { DefaultJobExecutor.this.lockTree.lock(this.path); - this.currentJob = (Job) r; + this.currentJobs.add((Job) r); super.beforeExecute(t, r); } @@ -120,22 +127,26 @@ protected void afterExecute(Runnable r, Throwable t) { DefaultJobExecutor.this.lockTree.unlock(this.path); - this.currentJob = null; + Job job = (Job) r; - super.afterExecute(r, t); + this.currentJobs.remove(job); - Job job = (Job) r; + super.afterExecute(r, t); List jobId = job.getRequest().getId(); if (jobId != null) { - synchronized (DefaultJobExecutor.this.groupedJobs) { - Queue jobQueue = DefaultJobExecutor.this.groupedJobs.get(jobId); - if (jobQueue != null) { - if (jobQueue.peek() == job) { - jobQueue.poll(); - } + // Use compute() to synchronize on the job ID to avoid concurrent insertion/deletion by other threads. + DefaultJobExecutor.this.groupedJobs.compute(jobId, (k, v) -> { + if (v != null && v.peek() == job) { + v.poll(); } - } + + if (v == null || v.isEmpty()) { + return null; + } + + return v; + }); } } @@ -187,12 +198,13 @@ protected void afterExecute(Runnable r, Throwable t) List jobId = job.getRequest().getId(); if (jobId != null) { - synchronized (DefaultJobExecutor.this.jobs) { - Job storedJob = DefaultJobExecutor.this.jobs.get(jobId); - if (storedJob == job) { - DefaultJobExecutor.this.jobs.remove(jobId); + DefaultJobExecutor.this.jobs.computeIfPresent(jobId, (k, v) -> { + if (v == job) { + return null; } - } + + return v; + }); } // Reset thread name since it's not used anymore @@ -262,11 +274,23 @@ public void dispose() throws ComponentLifecycleException // JobManager @Override - public Job getCurrentJob(JobGroupPath path) + public Job getCurrentJob(JobGroupPath groupPath) + { + // Provide a default implementation so implementors of this interface can stop implementing this method. + return getCurrentJobs(groupPath).stream().findFirst().orElse(null); + } + + @Override + public Collection getCurrentJobs(JobGroupPath path) { JobGroupExecutor executor = this.groupExecutors.get(path); - return executor != null ? executor.currentJob : null; + if (executor != null) { + // Return an unmodifiable copy of the set of currently running jobs. + return Collections.unmodifiableCollection(new ArrayList<>(executor.currentJobs)); + } + + return Collections.emptyList(); } @Override @@ -281,10 +305,7 @@ public Job getJob(List id) // Is it in a group Queue jobQueue = this.groupedJobs.get(id); if (jobQueue != null) { - job = jobQueue.peek(); - if (job != null) { - return job; - } + return jobQueue.peek(); } return null; @@ -324,8 +345,8 @@ public Job execute(String jobType, Request request) throws JobException public void execute(Job job) { if (!this.disposed) { - if (job instanceof GroupedJob) { - executeGroupedJob((GroupedJob) job); + if (job instanceof GroupedJob groupedJob) { + executeGroupedJob(groupedJob); } else { executeSingleJob(job); } @@ -348,6 +369,8 @@ private void executeSingleJob(Job job) private void executeGroupedJob(GroupedJob job) { + // While synchronization isn't necessary for the insertion in the group executors, this ensures that jobs in + // the "groupedJobs" queues have the same order as in the executor's queue. synchronized (this.groupExecutors) { JobGroupPath path = job.getGroupPath(); @@ -358,26 +381,20 @@ private void executeGroupedJob(GroupedJob job) return; } - JobGroupExecutor groupExecutor = this.groupExecutors.get(path); - - if (groupExecutor == null) { - groupExecutor = - new JobGroupExecutor(path, this.groupedJobInitializerManager.getGroupedJobInitializer(path)); - this.groupExecutors.put(path, groupExecutor); - } + JobGroupExecutor groupExecutor = this.groupExecutors.computeIfAbsent(path, + p -> new JobGroupExecutor(p, this.groupedJobInitializerManager.getGroupedJobInitializer(p))); groupExecutor.execute(job); List jobId = job.getRequest().getId(); if (jobId != null) { - synchronized (this.groupedJobs) { - Queue jobQueue = this.groupedJobs.get(jobId); - if (jobQueue == null) { - jobQueue = new ConcurrentLinkedQueue<>(); - this.groupedJobs.put(jobId, jobQueue); - } - jobQueue.offer(job); - } + // Use compute() to synchronize on the job ID to prevent that another thread would remove the empty + // queue again. + this.groupedJobs.compute(jobId, (k, v) -> { + Queue queue = Objects.requireNonNullElseGet(v, ConcurrentLinkedQueue::new); + queue.offer(job); + return queue; + }); } } } From a6ce3efec2398c7dde0db14b401bc2ad4bc5af6c Mon Sep 17 00:00:00 2001 From: Michael Hamann Date: Fri, 9 May 2025 17:12:56 +0200 Subject: [PATCH 2/2] XCOMMONS-3330: Job groups with several threads may not return a current job * Change the API to return a List for the current jobs. * Fix not returning a list with a null value in the default implementation. * Fix synchronization on the set for iterating over it. --- .../src/main/java/org/xwiki/job/JobExecutor.java | 10 +++++++--- .../org/xwiki/job/internal/DefaultJobExecutor.java | 9 +++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java index 081028e0f3..2613ccd1dc 100644 --- a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java +++ b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-api/src/main/java/org/xwiki/job/JobExecutor.java @@ -19,7 +19,6 @@ */ package org.xwiki.job; -import java.util.Collection; import java.util.Collections; import java.util.List; @@ -56,10 +55,15 @@ public interface JobExecutor * @since 17.4.0RC1 */ @Unstable - default Collection getCurrentJobs(JobGroupPath groupPath) + default List getCurrentJobs(JobGroupPath groupPath) { // Not really accurate, but better than nothing. - return Collections.singleton(getCurrentJob(groupPath)); + Job currentJob = getCurrentJob(groupPath); + if (currentJob == null) { + return Collections.emptyList(); + } + + return List.of(currentJob); } /** diff --git a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java index 3e9387e477..85b4a2de8d 100644 --- a/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java +++ b/xwiki-commons-core/xwiki-commons-job/xwiki-commons-job-default/src/main/java/org/xwiki/job/internal/DefaultJobExecutor.java @@ -19,8 +19,6 @@ */ package org.xwiki.job.internal; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; @@ -281,13 +279,16 @@ public Job getCurrentJob(JobGroupPath groupPath) } @Override - public Collection getCurrentJobs(JobGroupPath path) + public List getCurrentJobs(JobGroupPath path) { JobGroupExecutor executor = this.groupExecutors.get(path); if (executor != null) { // Return an unmodifiable copy of the set of currently running jobs. - return Collections.unmodifiableCollection(new ArrayList<>(executor.currentJobs)); + // As this is a synchronized set, we need to explicitly synchronize on it for the iteration. + synchronized (executor.currentJobs) { + return List.copyOf(executor.currentJobs); + } } return Collections.emptyList();