This commit is contained in:
Redkale
2017-04-20 12:21:47 +08:00
parent 7463a8f6b5
commit ffa80c9212
6 changed files with 100 additions and 15 deletions

View File

@@ -328,7 +328,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param future 输出对象的句柄
*/
public void finishJson(final JsonConvert convert, final CompletableFuture future) {
future.whenCompleteAsync((v, e) -> {
future.whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, e);
finish(500, null);
@@ -341,7 +341,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else {
finishJson(convert, v);
}
}, this.context.getExecutor());
});
}
/**
@@ -352,7 +352,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
* @param future 输出对象的句柄
*/
public void finishJson(final JsonConvert convert, final Type type, final CompletableFuture future) {
future.whenCompleteAsync((v, e) -> {
future.whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + request, e);
finish(500, null);
@@ -365,7 +365,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else {
finishJson(convert, type, v);
}
}, this.context.getExecutor());
});
}
/**

View File

@@ -24,7 +24,7 @@ public abstract class RestHttpServlet<T> extends HttpBaseServlet {
protected abstract T currentUser(HttpRequest req) throws IOException;
protected void finishJson(final HttpResponse response, CompletableFuture<RestOutput> future) throws IOException {
future.whenCompleteAsync((output, e) -> {
future.whenComplete((output, e) -> {
if (e != null) {
response.getContext().getLogger().log(Level.WARNING, "Servlet occur, forece to close channel. request = " + response.getRequest(), e);
response.finish(500, null);
@@ -36,7 +36,7 @@ public abstract class RestHttpServlet<T> extends HttpBaseServlet {
response.getContext().getLogger().log(Level.WARNING, "Servlet finish RestOutput occur, forece to close channel. request = " + response.getRequest(), ioe);
response.finish(500, null);
}
}, response.getContext().getExecutor());
});
}
protected void finishJson(final HttpResponse response, RestOutput output) throws IOException {

View File

@@ -305,7 +305,7 @@ public final class SncpClient {
CompletableFuture<byte[]> future = remote0(handlerFunc, bsonConvert, jsonConvert, transport, null, action, params);
if (action.boolReturnTypeFuture) {
CompletableFuture result = action.futureCreator.create();
future.whenCompleteAsync((v, e) -> {
future.whenComplete((v, e) -> {
try {
if (e != null) {
result.completeExceptionally(e);

View File

@@ -151,7 +151,7 @@ public final class SncpDynServlet extends SncpServlet {
action.convert.convertTo(out, Object.class, null);
} else {
Object[] sncpParams = handler.sncp_getParams();
future.whenCompleteAsync((v, e) -> {
future.whenComplete((v, e) -> {
if (e != null) {
response.getContext().getLogger().log(Level.INFO, "sncp CompleteAsync error(" + request + ")", e);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
@@ -162,7 +162,7 @@ public final class SncpDynServlet extends SncpServlet {
response.finish(0, out);
action.convert.offerBsonReader(in);
action.convert.offerBsonWriter(out);
}, getExecutor());
});
}
}
} catch (Throwable t) {

View File

@@ -195,7 +195,7 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return !entry.isExpired();
}
//@Override
@Override
public CompletableFuture<Boolean> existsAsync(final K key) {
CompletableFuture<Boolean> future = new CompletableFuture();
future.complete(exists(key));
@@ -218,7 +218,7 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return (V) entry.getValue();
}
//@Override
@Override
public CompletableFuture<V> getAsync(final K key) {
CompletableFuture<V> future = new CompletableFuture();
future.complete(get(key));
@@ -244,7 +244,7 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return (V) entry.getValue();
}
//@Override
@Override
public CompletableFuture<V> getAndRefreshAsync(final K key, final int expireSeconds) {
CompletableFuture<V> future = new CompletableFuture();
future.complete(getAndRefresh(key, expireSeconds));
@@ -267,6 +267,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
entry.expireSeconds = expireSeconds;
}
@Override
public CompletableFuture<Void> refreshAsync(final K key, final int expireSeconds) {
CompletableFuture<Void> future = new CompletableFuture();
refresh(key, expireSeconds);
future.complete(null);
return future;
}
@Override
public void refreshAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key, final int expireSeconds) {
refresh(key, expireSeconds);
@@ -288,6 +296,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
}
}
@Override
public CompletableFuture<Void> setAsync(K key, V value) {
CompletableFuture<Void> future = new CompletableFuture();
set(key, value);
future.complete(null);
return future;
}
@Override
public void setAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key, final V value) {
set(key, value);
@@ -309,6 +325,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
}
}
@Override
public CompletableFuture<Void> setAsync(int expireSeconds, K key, V value) {
CompletableFuture<Void> future = new CompletableFuture();
set(expireSeconds, key, value);
future.complete(null);
return future;
}
@Override
public void setAsync(final AsyncHandler<Void, K> handler, final int expireSeconds, @RpcAttachment final K key, final V value) {
set(expireSeconds, key, value);
@@ -324,6 +348,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
entry.expireSeconds = expireSeconds;
}
@Override
public CompletableFuture<Void> setExpireSecondsAsync(final K key, final int expireSeconds) {
CompletableFuture<Void> future = new CompletableFuture();
setExpireSeconds(key, expireSeconds);
future.complete(null);
return future;
}
@Override
public void setExpireSecondsAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key, final int expireSeconds) {
setExpireSeconds(key, expireSeconds);
@@ -337,6 +369,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
container.remove(key);
}
@Override
public CompletableFuture<Void> removeAsync(final K key) {
CompletableFuture<Void> future = new CompletableFuture();
remove(key);
future.complete(null);
return future;
}
@Override
public void removeAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key) {
remove(key);
@@ -348,6 +388,13 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return (Collection<V>) get(key);
}
@Override
public CompletableFuture<Collection<V>> getCollectionAsync(final K key) {
CompletableFuture<Collection<V>> future = new CompletableFuture();
future.complete((Collection<V>) get(key));
return future;
}
@Override
public void getCollectionAsync(final AsyncHandler<Collection<V>, K> handler, @RpcAttachment final K key) {
Collection<V> rs = getCollection(key);
@@ -359,6 +406,13 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return (Collection<V>) getAndRefresh(key, expireSeconds);
}
@Override
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final K key, final int expireSeconds) {
CompletableFuture<Collection<V>> future = new CompletableFuture();
future.complete((Collection<V>) getAndRefresh(key, expireSeconds));
return future;
}
@Override
public void getCollectionAndRefreshAsync(final AsyncHandler<Collection<V>, K> handler, @RpcAttachment final K key, final int expireSeconds) {
Collection<V> rs = getCollectionAndRefresh(key, expireSeconds);
@@ -381,6 +435,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
}
}
@Override
public CompletableFuture<Void> appendListItemAsync(final K key, final V value) {
CompletableFuture<Void> future = new CompletableFuture();
appendListItem(key, value);
future.complete(null);
return future;
}
@Override
public void appendListItemAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key, final V value) {
appendListItem(key, value);
@@ -396,6 +458,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
((Collection) entry.getValue()).remove(value);
}
@Override
public CompletableFuture<Void> removeListItemAsync(final K key, final V value) {
CompletableFuture<Void> future = new CompletableFuture();
removeListItem(key, value);
future.complete(null);
return future;
}
@Override
public void removeListItemAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key, final V value) {
removeListItem(key, value);
@@ -418,6 +488,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
}
}
@Override
public CompletableFuture<Void> appendSetItemAsync(final K key, final V value) {
CompletableFuture<Void> future = new CompletableFuture();
appendSetItem(key, value);
future.complete(null);
return future;
}
@Override
public void appendSetItemAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key, final V value) {
appendSetItem(key, value);
@@ -433,6 +511,14 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
((Set) entry.getValue()).remove(value);
}
@Override
public CompletableFuture<Void> removeSetItemAsync(final K key, final V value) {
CompletableFuture<Void> future = new CompletableFuture();
removeSetItem(key, value);
future.complete(null);
return future;
}
@Override
public void removeSetItemAsync(final AsyncHandler<Void, K> handler, @RpcAttachment final K key, final V value) {
removeSetItem(key, value);

View File

@@ -7,6 +7,7 @@ package org.redkale.source;
import java.io.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import org.redkale.util.*;
/**
@@ -53,7 +54,6 @@ public interface CacheSource<K extends Serializable, V extends Object> {
public void removeSetItem(final K key, final V value);
//---------------------- CompletableFuture 异步版 ---------------------------------
/**
public CompletableFuture<Boolean> existsAsync(final K key);
public CompletableFuture<V> getAsync(final K key);
@@ -87,9 +87,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
future.complete(true);
return future;
}
*/
//---------------------- AsyncHandler 异步版 ---------------------------------
public void existsAsync(final AsyncHandler<Boolean, K> handler, final K key);
public void getAsync(final AsyncHandler<V, K> handler, final K key);