001// License: GPL. For details, see LICENSE file. 002package org.openstreetmap.josm.data.cache; 003 004import java.io.IOException; 005import java.net.URL; 006import java.util.Iterator; 007import java.util.Map; 008import java.util.concurrent.ConcurrentHashMap; 009import java.util.concurrent.LinkedBlockingDeque; 010import java.util.concurrent.Semaphore; 011import java.util.concurrent.ThreadPoolExecutor; 012import java.util.concurrent.TimeUnit; 013 014import org.openstreetmap.josm.tools.Logging; 015 016/** 017 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task 018 * and it will set a runnable task with semaphore release, when job has finished. 019 * <p> 020 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't 021 * guarantee that all threads will be busy, when there is work for them[2]. <br> 022 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus 023 * tasks do not go through the Queue <br> 024 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread 025 * take the first available job and wait for semaphore. It might be the case, that semaphore was released 026 * for some task further in queue, but this implementation doesn't try to detect such situation 027 * 028 * @author Wiktor Niesiobędzki 029 */ 030public class HostLimitQueue extends LinkedBlockingDeque<Runnable> { 031 private static final long serialVersionUID = 1L; 032 033 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>(); 034 private final int hostLimit; 035 036 private ThreadPoolExecutor executor; 037 038 private int corePoolSize; 039 040 private int maximumPoolSize; 041 042 /** 043 * Creates an unbounded queue 044 * @param hostLimit how many parallel calls to host to allow 045 */ 046 public HostLimitQueue(int hostLimit) { 047 super(); // create unbounded queue 048 this.hostLimit = hostLimit; 049 } 050 051 private JCSCachedTileLoaderJob<?, ?> findJob() { 052 for (Iterator<Runnable> it = iterator(); it.hasNext();) { 053 Runnable r = it.next(); 054 if (r instanceof JCSCachedTileLoaderJob) { 055 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r; 056 if (tryAcquireSemaphore(job)) { 057 if (remove(job)) { 058 return job; 059 } else { 060 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did 061 // release the semaphore and look for another candidate 062 releaseSemaphore(job); 063 } 064 } else { 065 URL url = null; 066 try { 067 url = job.getUrl(); 068 } catch (IOException e) { 069 Logging.debug(e); 070 } 071 Logging.debug("TMS - Skipping job {0} because host limit reached", url); 072 } 073 } 074 } 075 return null; 076 } 077 078 @Override 079 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 080 Runnable job = findJob(); 081 if (job != null) { 082 return job; 083 } 084 job = pollFirst(timeout, unit); 085 if (job != null) { 086 try { 087 boolean gotLock = tryAcquireSemaphore(job, timeout, unit); 088 return gotLock ? job : null; 089 } catch (InterruptedException e) { 090 // acquire my got interrupted, first offer back what was taken 091 if (!offer(job)) { 092 Logging.warn("Unable to offer back " + job); 093 } 094 throw e; 095 } 096 } 097 return job; 098 } 099 100 @Override 101 public Runnable take() throws InterruptedException { 102 Runnable job = findJob(); 103 if (job != null) { 104 return job; 105 } 106 job = takeFirst(); 107 try { 108 acquireSemaphore(job); 109 } catch (InterruptedException e) { 110 // acquire my got interrupted, first offer back what was taken 111 if (!offer(job)) { 112 Logging.warn("Unable to offer back " + job); 113 } 114 throw e; 115 } 116 return job; 117 } 118 119 /** 120 * Set the executor for which this queue works. It's needed to spawn new threads. 121 * See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue# 122 * 123 * @param executor executor for which this queue works 124 */ 125 public void setExecutor(ThreadPoolExecutor executor) { 126 this.executor = executor; 127 this.maximumPoolSize = executor.getMaximumPoolSize(); 128 this.corePoolSize = executor.getCorePoolSize(); 129 } 130 131 @Override 132 public boolean offer(Runnable e) { 133 if (!super.offer(e)) { 134 return false; 135 } 136 137 if (executor != null) { 138 // See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue# 139 // force spawn of a thread if not reached maximum 140 int currentPoolSize = executor.getPoolSize(); 141 if (currentPoolSize < maximumPoolSize 142 && currentPoolSize >= corePoolSize) { 143 executor.setCorePoolSize(currentPoolSize + 1); 144 executor.setCorePoolSize(corePoolSize); 145 } 146 } 147 return true; 148 } 149 150 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 151 String host; 152 try { 153 host = job.getUrl().getHost(); 154 } catch (IOException e) { 155 // do not pass me illegal URL's 156 throw new IllegalArgumentException(e); 157 } 158 Semaphore limit = hostSemaphores.get(host); 159 if (limit == null) { 160 synchronized (hostSemaphores) { 161 limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit)); 162 } 163 } 164 return limit; 165 } 166 167 private void acquireSemaphore(Runnable job) throws InterruptedException { 168 if (job instanceof JCSCachedTileLoaderJob) { 169 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 170 getSemaphore(jcsJob).acquire(); 171 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob)); 172 } 173 } 174 175 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) { 176 boolean ret = true; 177 Semaphore limit = getSemaphore(job); 178 if (limit != null) { 179 ret = limit.tryAcquire(); 180 if (ret) { 181 job.setFinishedTask(() -> releaseSemaphore(job)); 182 } 183 } 184 return ret; 185 } 186 187 private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException { 188 boolean ret = true; 189 if (job instanceof JCSCachedTileLoaderJob) { 190 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 191 Semaphore limit = getSemaphore(jcsJob); 192 if (limit != null) { 193 ret = limit.tryAcquire(timeout, unit); 194 if (ret) { 195 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob)); 196 } 197 } 198 } 199 return ret; 200 } 201 202 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 203 Semaphore limit = getSemaphore(job); 204 if (limit != null) { 205 limit.release(); 206 if (limit.availablePermits() > hostLimit) { 207 Logging.warn("More permits than it should be"); 208 } 209 } 210 } 211}