From 5b14cad343f55c995afc4cb34b33e96f8e0aa3be Mon Sep 17 00:00:00 2001 From: dags- Date: Mon, 17 Feb 2020 09:21:18 +0000 Subject: [PATCH] use completable futures --- .../com/terraforged/core/region/Region.java | 6 +- .../terraforged/core/region/RegionCache.java | 22 +++---- .../core/region/RegionGenerator.java | 18 ++--- .../core/util/concurrent/ThreadPool.java | 65 ++++++------------- .../util/concurrent/batcher/AsyncBatcher.java | 42 ++++++++++++ .../core/util/concurrent/batcher/Batcher.java | 13 ++++ .../util/concurrent/batcher/SyncBatcher.java | 25 +++++++ .../core/world/heightmap/RegionExtent.java | 19 ++---- 8 files changed, 127 insertions(+), 83 deletions(-) create mode 100644 TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/AsyncBatcher.java create mode 100644 TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/Batcher.java create mode 100644 TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/SyncBatcher.java diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/region/Region.java b/TerraForgedCore/src/main/java/com/terraforged/core/region/Region.java index 69db60e..8801cf9 100644 --- a/TerraForgedCore/src/main/java/com/terraforged/core/region/Region.java +++ b/TerraForgedCore/src/main/java/com/terraforged/core/region/Region.java @@ -8,7 +8,7 @@ import com.terraforged.core.region.chunk.ChunkGenTask; import com.terraforged.core.region.chunk.ChunkReader; import com.terraforged.core.region.chunk.ChunkWriter; import com.terraforged.core.region.chunk.ChunkZoomTask; -import com.terraforged.core.util.concurrent.ThreadPool; +import com.terraforged.core.util.concurrent.batcher.Batcher; import com.terraforged.core.world.heightmap.Heightmap; import com.terraforged.core.world.terrain.Terrain; @@ -106,7 +106,7 @@ public class Region implements Extent { } } - public void generate(Heightmap heightmap, ThreadPool.Batcher batcher) { + public void generate(Heightmap heightmap, Batcher batcher) { for (int cz = 0; cz < chunkSize.total; cz++) { for (int cx = 0; cx < chunkSize.total; cx++) { int index = chunkSize.indexOf(cx, cz); @@ -117,7 +117,7 @@ public class Region implements Extent { } } - public void generateZoom(Heightmap heightmap, float offsetX, float offsetZ, float zoom, ThreadPool.Batcher batcher) { + public void generateZoom(Heightmap heightmap, float offsetX, float offsetZ, float zoom, Batcher batcher) { float translateX = offsetX - ((blockSize.total * zoom) / 2F); float translateZ = offsetZ - ((blockSize.total * zoom) / 2F); for (int cz = 0; cz < chunkSize.total; cz++) { diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionCache.java b/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionCache.java index c6bc3db..7ef89d4 100644 --- a/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionCache.java +++ b/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionCache.java @@ -2,26 +2,24 @@ package com.terraforged.core.region; import com.terraforged.core.region.chunk.ChunkReader; import com.terraforged.core.util.Cache; -import com.terraforged.core.util.FutureValue; import com.terraforged.core.world.heightmap.RegionExtent; import me.dags.noise.util.NoiseUtil; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class RegionCache implements RegionExtent { private final boolean queuing; private final RegionGenerator renderer; - private final Cache> cache; + private final Cache> cache; private Region cachedRegion = null; public RegionCache(boolean queueNeighbours, RegionGenerator renderer) { this.renderer = renderer; this.queuing = queueNeighbours; - this.cache = new com.terraforged.core.util.Cache<>(180, 60, TimeUnit.SECONDS); + this.cache = new Cache<>(180, 60, TimeUnit.SECONDS); } @Override @@ -30,9 +28,9 @@ public class RegionCache implements RegionExtent { } @Override - public Future getRegionAsync(int regionX, int regionZ) { + public CompletableFuture getRegionAsync(int regionX, int regionZ) { long id = NoiseUtil.seed(regionX, regionZ); - Future future = cache.get(id); + CompletableFuture future = cache.get(id); if (future == null) { future = renderer.getRegionAsync(regionX, regionZ); cache.put(id, future); @@ -55,17 +53,13 @@ public class RegionCache implements RegionExtent { } long id = NoiseUtil.seed(regionX, regionZ); - Future futureRegion = cache.get(id); + CompletableFuture futureRegion = cache.get(id); if (futureRegion == null) { cachedRegion = renderer.generateRegion(regionX, regionZ); - cache.put(id, new FutureValue<>(cachedRegion)); + cache.put(id, CompletableFuture.completedFuture(cachedRegion)); } else { - try { - cachedRegion = futureRegion.get(); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } + cachedRegion = futureRegion.join(); } if (queuing) { diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionGenerator.java b/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionGenerator.java index 4113fd5..e60b12f 100644 --- a/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionGenerator.java +++ b/TerraForgedCore/src/main/java/com/terraforged/core/region/RegionGenerator.java @@ -2,12 +2,12 @@ package com.terraforged.core.region; import com.terraforged.core.util.concurrent.ObjectPool; import com.terraforged.core.util.concurrent.ThreadPool; +import com.terraforged.core.util.concurrent.batcher.Batcher; import com.terraforged.core.world.WorldGenerator; import com.terraforged.core.world.WorldGeneratorFactory; import com.terraforged.core.world.heightmap.RegionExtent; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; public class RegionGenerator implements RegionExtent { @@ -42,23 +42,23 @@ public class RegionGenerator implements RegionExtent { } @Override - public Future getRegionAsync(int regionX, int regionZ) { + public CompletableFuture getRegionAsync(int regionX, int regionZ) { return generate(regionX, regionZ); } - public Future generate(int regionX, int regionZ) { - return ForkJoinPool.commonPool().submit(() -> generateRegion(regionX, regionZ)); + public CompletableFuture generate(int regionX, int regionZ) { + return CompletableFuture.supplyAsync(() -> generateRegion(regionX, regionZ)); } - public Future generate(float centerX, float centerZ, float zoom, boolean filter) { - return ForkJoinPool.commonPool().submit(() -> generateRegion(centerX, centerZ, zoom, filter)); + public CompletableFuture generate(float centerX, float centerZ, float zoom, boolean filter) { + return CompletableFuture.supplyAsync(() -> generateRegion(centerX, centerZ, zoom, filter)); } public Region generateRegion(int regionX, int regionZ) { try (ObjectPool.Item item = genPool.get()) { WorldGenerator generator = item.getValue(); Region region = new Region(regionX, regionZ, factor, border); - try (ThreadPool.Batcher batcher = threadPool.batcher(region.getChunkCount())) { + try (Batcher batcher = threadPool.batcher(region.getChunkCount())) { region.generate(generator.getHeightmap(), batcher); } postProcess(region, generator); @@ -75,7 +75,7 @@ public class RegionGenerator implements RegionExtent { try (ObjectPool.Item item = genPool.get()) { WorldGenerator generator = item.getValue(); Region region = new Region(0, 0, factor, border); - try (ThreadPool.Batcher batcher = threadPool.batcher(region.getChunkCount())) { + try (Batcher batcher = threadPool.batcher(region.getChunkCount())) { region.generateZoom(generator.getHeightmap(), centerX, centerZ, zoom, batcher); } region.check(); diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/ThreadPool.java b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/ThreadPool.java index 0e4aef5..83f3333 100644 --- a/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/ThreadPool.java +++ b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/ThreadPool.java @@ -1,7 +1,9 @@ package com.terraforged.core.util.concurrent; -import java.util.ArrayList; -import java.util.List; +import com.terraforged.core.util.concurrent.batcher.AsyncBatcher; +import com.terraforged.core.util.concurrent.batcher.Batcher; +import com.terraforged.core.util.concurrent.batcher.SyncBatcher; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -10,27 +12,27 @@ import java.util.concurrent.Future; public class ThreadPool { - public static final int DEFAULT_POOL_SIZE = Math.max(2, (Runtime.getRuntime().availableProcessors() / 2) + 1); + public static final int DEFAULT_POOL_SIZE = defaultPoolSize(); private static final Object lock = new Object(); private static ThreadPool instance = new ThreadPool(); - private final int size; + private final int poolSize; private final ExecutorService service; private ThreadPool() { this.service = ForkJoinPool.commonPool(); - this.size = -1; + this.poolSize = -1; } public ThreadPool(int size) { this.service = Executors.newFixedThreadPool(size); - this.size = size; + this.poolSize = size; } public void shutdown() { - if (size > 0) { + if (poolSize > 0) { service.shutdown(); } } @@ -44,47 +46,15 @@ public class ThreadPool { } public Batcher batcher(int size) { - return new Batcher(size); - } - - public class Batcher implements AutoCloseable { - - private final List> tasks; - - public Batcher(int size) { - tasks = new ArrayList<>(size); - } - - public void submit(Runnable task) { - tasks.add(ThreadPool.this.submit(task)); - } - - public void submit(Callable task) { - tasks.add(ThreadPool.this.submit(task)); - } - - public void await() { - boolean hasMore = true; - while (hasMore) { - hasMore = false; - for (Future future : tasks) { - if (!future.isDone()) { - hasMore = true; - } - } - } - tasks.clear(); - } - - @Override - public void close() { - await(); + if (this.poolSize != -1 && this.poolSize < 3) { + return new SyncBatcher(); } + return new AsyncBatcher(service, size); } public static ThreadPool getFixed(int size) { synchronized (lock) { - if (instance.size != size) { + if (instance.poolSize != size) { instance.shutdown(); instance = new ThreadPool(size); } @@ -94,7 +64,7 @@ public class ThreadPool { public static ThreadPool getFixed() { synchronized (lock) { - if (instance.size == -1) { + if (instance.poolSize == -1) { instance = new ThreadPool(ThreadPool.DEFAULT_POOL_SIZE); } return instance; @@ -103,7 +73,7 @@ public class ThreadPool { public static ThreadPool getCommon() { synchronized (lock) { - if (instance.size != -1) { + if (instance.poolSize != -1) { instance.shutdown(); instance = new ThreadPool(); } @@ -118,4 +88,9 @@ public class ThreadPool { instance = new ThreadPool(); } } + + private static int defaultPoolSize() { + int threads = Runtime.getRuntime().availableProcessors(); + return Math.max(1, (threads / 3) * 2); + } } diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/AsyncBatcher.java b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/AsyncBatcher.java new file mode 100644 index 0000000..802f2e0 --- /dev/null +++ b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/AsyncBatcher.java @@ -0,0 +1,42 @@ +package com.terraforged.core.util.concurrent.batcher; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public class AsyncBatcher implements Batcher { + + private final List> tasks; + private final ExecutorService executor; + + public AsyncBatcher(ExecutorService executor, int size) { + this.executor = executor; + this.tasks = new ArrayList<>(size); + } + + @Override + public void submit(Runnable task) { + tasks.add(executor.submit(task)); + } + + @Override + public void submit(Callable task) { + tasks.add(executor.submit(task)); + } + + @Override + public void close() { + boolean hasMore = true; + while (hasMore) { + hasMore = false; + for (Future future : tasks) { + if (!future.isDone()) { + hasMore = true; + } + } + } + tasks.clear(); + } +} diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/Batcher.java b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/Batcher.java new file mode 100644 index 0000000..4be09f1 --- /dev/null +++ b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/Batcher.java @@ -0,0 +1,13 @@ +package com.terraforged.core.util.concurrent.batcher; + +import java.util.concurrent.Callable; + +public interface Batcher extends AutoCloseable { + + void submit(Runnable task); + + void submit(Callable task); + + @Override + void close(); +} diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/SyncBatcher.java b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/SyncBatcher.java new file mode 100644 index 0000000..7a41cd9 --- /dev/null +++ b/TerraForgedCore/src/main/java/com/terraforged/core/util/concurrent/batcher/SyncBatcher.java @@ -0,0 +1,25 @@ +package com.terraforged.core.util.concurrent.batcher; + +import java.util.concurrent.Callable; + +public class SyncBatcher implements Batcher { + + @Override + public void submit(Runnable task) { + task.run(); + } + + @Override + public void submit(Callable task) { + try { + task.call(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close() { + + } +} diff --git a/TerraForgedCore/src/main/java/com/terraforged/core/world/heightmap/RegionExtent.java b/TerraForgedCore/src/main/java/com/terraforged/core/world/heightmap/RegionExtent.java index dab72a0..22b641c 100644 --- a/TerraForgedCore/src/main/java/com/terraforged/core/world/heightmap/RegionExtent.java +++ b/TerraForgedCore/src/main/java/com/terraforged/core/world/heightmap/RegionExtent.java @@ -9,8 +9,7 @@ import com.terraforged.core.world.terrain.Terrain; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; public interface RegionExtent extends Extent { @@ -18,7 +17,7 @@ public interface RegionExtent extends Extent { Region getRegion(int regionX, int regionZ); - Future getRegionAsync(int regionX, int regionZ); + CompletableFuture getRegionAsync(int regionX, int regionZ); default ChunkReader getChunk(int chunkX, int chunkZ) { int regionX = chunkToRegion(chunkX); @@ -27,8 +26,8 @@ public interface RegionExtent extends Extent { return region.getChunk(chunkX, chunkZ); } - default List> getRegions(int minRegionX, int minRegionZ, int maxRegionX, int maxRegionZ) { - List> regions = new LinkedList<>(); + default List> getRegions(int minRegionX, int minRegionZ, int maxRegionX, int maxRegionZ) { + List> regions = new LinkedList<>(); for (int rz = minRegionZ; rz <= maxRegionZ; rz++) { for (int rx = minRegionX; rx <= maxRegionX; rx++) { regions.add(getRegionAsync(rx, rz)); @@ -43,18 +42,14 @@ public interface RegionExtent extends Extent { int minRegionZ = chunkToRegion(Size.blockToChunk(minZ)); int maxRegionX = chunkToRegion(Size.blockToChunk(maxX)); int maxRegionZ = chunkToRegion(Size.blockToChunk(maxZ)); - List> regions = getRegions(minRegionX, minRegionZ, maxRegionX, maxRegionZ); + List> regions = getRegions(minRegionX, minRegionZ, maxRegionX, maxRegionZ); while (!regions.isEmpty()) { regions.removeIf(future -> { if (!future.isDone()) { return false; } - try { - Region region = future.get(); - region.visit(minX, minZ, maxX, maxZ, visitor); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } + Region region = future.join(); + region.visit(minX, minZ, maxX, maxZ, visitor); return true; }); }