use completable futures

This commit is contained in:
dags- 2020-02-17 09:21:18 +00:00
parent eaf0be6ef2
commit 5b14cad343
8 changed files with 127 additions and 83 deletions

View File

@ -8,7 +8,7 @@ import com.terraforged.core.region.chunk.ChunkGenTask;
import com.terraforged.core.region.chunk.ChunkReader; import com.terraforged.core.region.chunk.ChunkReader;
import com.terraforged.core.region.chunk.ChunkWriter; import com.terraforged.core.region.chunk.ChunkWriter;
import com.terraforged.core.region.chunk.ChunkZoomTask; import com.terraforged.core.region.chunk.ChunkZoomTask;
import com.terraforged.core.util.concurrent.ThreadPool; import com.terraforged.core.util.concurrent.batcher.Batcher;
import com.terraforged.core.world.heightmap.Heightmap; import com.terraforged.core.world.heightmap.Heightmap;
import com.terraforged.core.world.terrain.Terrain; import com.terraforged.core.world.terrain.Terrain;
@ -106,7 +106,7 @@ public class Region implements Extent {
} }
} }
public void generate(Heightmap heightmap, ThreadPool.Batcher batcher) { public void generate(Heightmap heightmap, Batcher batcher) {
for (int cz = 0; cz < chunkSize.total; cz++) { for (int cz = 0; cz < chunkSize.total; cz++) {
for (int cx = 0; cx < chunkSize.total; cx++) { for (int cx = 0; cx < chunkSize.total; cx++) {
int index = chunkSize.indexOf(cx, cz); int index = chunkSize.indexOf(cx, cz);
@ -117,7 +117,7 @@ public class Region implements Extent {
} }
} }
public void generateZoom(Heightmap heightmap, float offsetX, float offsetZ, float zoom, ThreadPool.Batcher batcher) { public void generateZoom(Heightmap heightmap, float offsetX, float offsetZ, float zoom, Batcher batcher) {
float translateX = offsetX - ((blockSize.total * zoom) / 2F); float translateX = offsetX - ((blockSize.total * zoom) / 2F);
float translateZ = offsetZ - ((blockSize.total * zoom) / 2F); float translateZ = offsetZ - ((blockSize.total * zoom) / 2F);
for (int cz = 0; cz < chunkSize.total; cz++) { for (int cz = 0; cz < chunkSize.total; cz++) {

View File

@ -2,26 +2,24 @@ package com.terraforged.core.region;
import com.terraforged.core.region.chunk.ChunkReader; import com.terraforged.core.region.chunk.ChunkReader;
import com.terraforged.core.util.Cache; import com.terraforged.core.util.Cache;
import com.terraforged.core.util.FutureValue;
import com.terraforged.core.world.heightmap.RegionExtent; import com.terraforged.core.world.heightmap.RegionExtent;
import me.dags.noise.util.NoiseUtil; import me.dags.noise.util.NoiseUtil;
import java.util.concurrent.ExecutionException; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class RegionCache implements RegionExtent { public class RegionCache implements RegionExtent {
private final boolean queuing; private final boolean queuing;
private final RegionGenerator renderer; private final RegionGenerator renderer;
private final Cache<Long, Future<Region>> cache; private final Cache<Long, CompletableFuture<Region>> cache;
private Region cachedRegion = null; private Region cachedRegion = null;
public RegionCache(boolean queueNeighbours, RegionGenerator renderer) { public RegionCache(boolean queueNeighbours, RegionGenerator renderer) {
this.renderer = renderer; this.renderer = renderer;
this.queuing = queueNeighbours; this.queuing = queueNeighbours;
this.cache = new com.terraforged.core.util.Cache<>(180, 60, TimeUnit.SECONDS); this.cache = new Cache<>(180, 60, TimeUnit.SECONDS);
} }
@Override @Override
@ -30,9 +28,9 @@ public class RegionCache implements RegionExtent {
} }
@Override @Override
public Future<Region> getRegionAsync(int regionX, int regionZ) { public CompletableFuture<Region> getRegionAsync(int regionX, int regionZ) {
long id = NoiseUtil.seed(regionX, regionZ); long id = NoiseUtil.seed(regionX, regionZ);
Future<Region> future = cache.get(id); CompletableFuture<Region> future = cache.get(id);
if (future == null) { if (future == null) {
future = renderer.getRegionAsync(regionX, regionZ); future = renderer.getRegionAsync(regionX, regionZ);
cache.put(id, future); cache.put(id, future);
@ -55,17 +53,13 @@ public class RegionCache implements RegionExtent {
} }
long id = NoiseUtil.seed(regionX, regionZ); long id = NoiseUtil.seed(regionX, regionZ);
Future<Region> futureRegion = cache.get(id); CompletableFuture<Region> futureRegion = cache.get(id);
if (futureRegion == null) { if (futureRegion == null) {
cachedRegion = renderer.generateRegion(regionX, regionZ); cachedRegion = renderer.generateRegion(regionX, regionZ);
cache.put(id, new FutureValue<>(cachedRegion)); cache.put(id, CompletableFuture.completedFuture(cachedRegion));
} else { } else {
try { cachedRegion = futureRegion.join();
cachedRegion = futureRegion.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
} }
if (queuing) { if (queuing) {

View File

@ -2,12 +2,12 @@ package com.terraforged.core.region;
import com.terraforged.core.util.concurrent.ObjectPool; import com.terraforged.core.util.concurrent.ObjectPool;
import com.terraforged.core.util.concurrent.ThreadPool; import com.terraforged.core.util.concurrent.ThreadPool;
import com.terraforged.core.util.concurrent.batcher.Batcher;
import com.terraforged.core.world.WorldGenerator; import com.terraforged.core.world.WorldGenerator;
import com.terraforged.core.world.WorldGeneratorFactory; import com.terraforged.core.world.WorldGeneratorFactory;
import com.terraforged.core.world.heightmap.RegionExtent; import com.terraforged.core.world.heightmap.RegionExtent;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class RegionGenerator implements RegionExtent { public class RegionGenerator implements RegionExtent {
@ -42,23 +42,23 @@ public class RegionGenerator implements RegionExtent {
} }
@Override @Override
public Future<Region> getRegionAsync(int regionX, int regionZ) { public CompletableFuture<Region> getRegionAsync(int regionX, int regionZ) {
return generate(regionX, regionZ); return generate(regionX, regionZ);
} }
public Future<Region> generate(int regionX, int regionZ) { public CompletableFuture<Region> generate(int regionX, int regionZ) {
return ForkJoinPool.commonPool().submit(() -> generateRegion(regionX, regionZ)); return CompletableFuture.supplyAsync(() -> generateRegion(regionX, regionZ));
} }
public Future<Region> generate(float centerX, float centerZ, float zoom, boolean filter) { public CompletableFuture<Region> generate(float centerX, float centerZ, float zoom, boolean filter) {
return ForkJoinPool.commonPool().submit(() -> generateRegion(centerX, centerZ, zoom, filter)); return CompletableFuture.supplyAsync(() -> generateRegion(centerX, centerZ, zoom, filter));
} }
public Region generateRegion(int regionX, int regionZ) { public Region generateRegion(int regionX, int regionZ) {
try (ObjectPool.Item<WorldGenerator> item = genPool.get()) { try (ObjectPool.Item<WorldGenerator> item = genPool.get()) {
WorldGenerator generator = item.getValue(); WorldGenerator generator = item.getValue();
Region region = new Region(regionX, regionZ, factor, border); Region region = new Region(regionX, regionZ, factor, border);
try (ThreadPool.Batcher batcher = threadPool.batcher(region.getChunkCount())) { try (Batcher batcher = threadPool.batcher(region.getChunkCount())) {
region.generate(generator.getHeightmap(), batcher); region.generate(generator.getHeightmap(), batcher);
} }
postProcess(region, generator); postProcess(region, generator);
@ -75,7 +75,7 @@ public class RegionGenerator implements RegionExtent {
try (ObjectPool.Item<WorldGenerator> item = genPool.get()) { try (ObjectPool.Item<WorldGenerator> item = genPool.get()) {
WorldGenerator generator = item.getValue(); WorldGenerator generator = item.getValue();
Region region = new Region(0, 0, factor, border); Region region = new Region(0, 0, factor, border);
try (ThreadPool.Batcher batcher = threadPool.batcher(region.getChunkCount())) { try (Batcher batcher = threadPool.batcher(region.getChunkCount())) {
region.generateZoom(generator.getHeightmap(), centerX, centerZ, zoom, batcher); region.generateZoom(generator.getHeightmap(), centerX, centerZ, zoom, batcher);
} }
region.check(); region.check();

View File

@ -1,7 +1,9 @@
package com.terraforged.core.util.concurrent; package com.terraforged.core.util.concurrent;
import java.util.ArrayList; import com.terraforged.core.util.concurrent.batcher.AsyncBatcher;
import java.util.List; import com.terraforged.core.util.concurrent.batcher.Batcher;
import com.terraforged.core.util.concurrent.batcher.SyncBatcher;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -10,27 +12,27 @@ import java.util.concurrent.Future;
public class ThreadPool { public class ThreadPool {
public static final int DEFAULT_POOL_SIZE = Math.max(2, (Runtime.getRuntime().availableProcessors() / 2) + 1); public static final int DEFAULT_POOL_SIZE = defaultPoolSize();
private static final Object lock = new Object(); private static final Object lock = new Object();
private static ThreadPool instance = new ThreadPool(); private static ThreadPool instance = new ThreadPool();
private final int size; private final int poolSize;
private final ExecutorService service; private final ExecutorService service;
private ThreadPool() { private ThreadPool() {
this.service = ForkJoinPool.commonPool(); this.service = ForkJoinPool.commonPool();
this.size = -1; this.poolSize = -1;
} }
public ThreadPool(int size) { public ThreadPool(int size) {
this.service = Executors.newFixedThreadPool(size); this.service = Executors.newFixedThreadPool(size);
this.size = size; this.poolSize = size;
} }
public void shutdown() { public void shutdown() {
if (size > 0) { if (poolSize > 0) {
service.shutdown(); service.shutdown();
} }
} }
@ -44,47 +46,15 @@ public class ThreadPool {
} }
public Batcher batcher(int size) { public Batcher batcher(int size) {
return new Batcher(size); if (this.poolSize != -1 && this.poolSize < 3) {
} return new SyncBatcher();
public class Batcher implements AutoCloseable {
private final List<Future<?>> tasks;
public Batcher(int size) {
tasks = new ArrayList<>(size);
}
public void submit(Runnable task) {
tasks.add(ThreadPool.this.submit(task));
}
public void submit(Callable<?> task) {
tasks.add(ThreadPool.this.submit(task));
}
public void await() {
boolean hasMore = true;
while (hasMore) {
hasMore = false;
for (Future<?> future : tasks) {
if (!future.isDone()) {
hasMore = true;
}
}
}
tasks.clear();
}
@Override
public void close() {
await();
} }
return new AsyncBatcher(service, size);
} }
public static ThreadPool getFixed(int size) { public static ThreadPool getFixed(int size) {
synchronized (lock) { synchronized (lock) {
if (instance.size != size) { if (instance.poolSize != size) {
instance.shutdown(); instance.shutdown();
instance = new ThreadPool(size); instance = new ThreadPool(size);
} }
@ -94,7 +64,7 @@ public class ThreadPool {
public static ThreadPool getFixed() { public static ThreadPool getFixed() {
synchronized (lock) { synchronized (lock) {
if (instance.size == -1) { if (instance.poolSize == -1) {
instance = new ThreadPool(ThreadPool.DEFAULT_POOL_SIZE); instance = new ThreadPool(ThreadPool.DEFAULT_POOL_SIZE);
} }
return instance; return instance;
@ -103,7 +73,7 @@ public class ThreadPool {
public static ThreadPool getCommon() { public static ThreadPool getCommon() {
synchronized (lock) { synchronized (lock) {
if (instance.size != -1) { if (instance.poolSize != -1) {
instance.shutdown(); instance.shutdown();
instance = new ThreadPool(); instance = new ThreadPool();
} }
@ -118,4 +88,9 @@ public class ThreadPool {
instance = new ThreadPool(); instance = new ThreadPool();
} }
} }
private static int defaultPoolSize() {
int threads = Runtime.getRuntime().availableProcessors();
return Math.max(1, (threads / 3) * 2);
}
} }

View File

@ -0,0 +1,42 @@
package com.terraforged.core.util.concurrent.batcher;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class AsyncBatcher implements Batcher {
private final List<Future<?>> tasks;
private final ExecutorService executor;
public AsyncBatcher(ExecutorService executor, int size) {
this.executor = executor;
this.tasks = new ArrayList<>(size);
}
@Override
public void submit(Runnable task) {
tasks.add(executor.submit(task));
}
@Override
public void submit(Callable<?> task) {
tasks.add(executor.submit(task));
}
@Override
public void close() {
boolean hasMore = true;
while (hasMore) {
hasMore = false;
for (Future<?> future : tasks) {
if (!future.isDone()) {
hasMore = true;
}
}
}
tasks.clear();
}
}

View File

@ -0,0 +1,13 @@
package com.terraforged.core.util.concurrent.batcher;
import java.util.concurrent.Callable;
public interface Batcher extends AutoCloseable {
void submit(Runnable task);
void submit(Callable<?> task);
@Override
void close();
}

View File

@ -0,0 +1,25 @@
package com.terraforged.core.util.concurrent.batcher;
import java.util.concurrent.Callable;
public class SyncBatcher implements Batcher {
@Override
public void submit(Runnable task) {
task.run();
}
@Override
public void submit(Callable<?> task) {
try {
task.call();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() {
}
}

View File

@ -9,8 +9,7 @@ import com.terraforged.core.world.terrain.Terrain;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public interface RegionExtent extends Extent { public interface RegionExtent extends Extent {
@ -18,7 +17,7 @@ public interface RegionExtent extends Extent {
Region getRegion(int regionX, int regionZ); Region getRegion(int regionX, int regionZ);
Future<Region> getRegionAsync(int regionX, int regionZ); CompletableFuture<Region> getRegionAsync(int regionX, int regionZ);
default ChunkReader getChunk(int chunkX, int chunkZ) { default ChunkReader getChunk(int chunkX, int chunkZ) {
int regionX = chunkToRegion(chunkX); int regionX = chunkToRegion(chunkX);
@ -27,8 +26,8 @@ public interface RegionExtent extends Extent {
return region.getChunk(chunkX, chunkZ); return region.getChunk(chunkX, chunkZ);
} }
default List<Future<Region>> getRegions(int minRegionX, int minRegionZ, int maxRegionX, int maxRegionZ) { default List<CompletableFuture<Region>> getRegions(int minRegionX, int minRegionZ, int maxRegionX, int maxRegionZ) {
List<Future<Region>> regions = new LinkedList<>(); List<CompletableFuture<Region>> regions = new LinkedList<>();
for (int rz = minRegionZ; rz <= maxRegionZ; rz++) { for (int rz = minRegionZ; rz <= maxRegionZ; rz++) {
for (int rx = minRegionX; rx <= maxRegionX; rx++) { for (int rx = minRegionX; rx <= maxRegionX; rx++) {
regions.add(getRegionAsync(rx, rz)); regions.add(getRegionAsync(rx, rz));
@ -43,18 +42,14 @@ public interface RegionExtent extends Extent {
int minRegionZ = chunkToRegion(Size.blockToChunk(minZ)); int minRegionZ = chunkToRegion(Size.blockToChunk(minZ));
int maxRegionX = chunkToRegion(Size.blockToChunk(maxX)); int maxRegionX = chunkToRegion(Size.blockToChunk(maxX));
int maxRegionZ = chunkToRegion(Size.blockToChunk(maxZ)); int maxRegionZ = chunkToRegion(Size.blockToChunk(maxZ));
List<Future<Region>> regions = getRegions(minRegionX, minRegionZ, maxRegionX, maxRegionZ); List<CompletableFuture<Region>> regions = getRegions(minRegionX, minRegionZ, maxRegionX, maxRegionZ);
while (!regions.isEmpty()) { while (!regions.isEmpty()) {
regions.removeIf(future -> { regions.removeIf(future -> {
if (!future.isDone()) { if (!future.isDone()) {
return false; return false;
} }
try { Region region = future.join();
Region region = future.get();
region.visit(minX, minZ, maxX, maxZ, visitor); region.visit(minX, minZ, maxX, maxZ, visitor);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return true; return true;
}); });
} }