diff --git a/src/com/haogames/core/util/FileKit.java b/src/com/haogames/core/util/FileKit.java deleted file mode 100644 index 8307cef..0000000 --- a/src/com/haogames/core/util/FileKit.java +++ /dev/null @@ -1,163 +0,0 @@ -package com.haogames.core.util; - -import org.redkale.convert.json.JsonConvert; - -import java.io.*; -import java.lang.reflect.Type; -import java.net.URL; -import java.net.URLDecoder; -import java.nio.file.Files; -import java.util.Map; - -import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; -import static java.util.Arrays.asList; - -/** - * Created by liangxianyou at 2018/5/31 10:23. - */ -public final class FileKit { - - private FileKit() { - } - - public static void strToFile(String entityBody, File file) { - strToFile(entityBody, file, true); - } - - public static void strToFile(String entityBody, File file, boolean existDel) { - if (file.exists()) { - if (existDel) { - file.delete(); - } else { - throw new RuntimeException(file.getPath() + "已经存在"); - } - } - - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - try ( - FileOutputStream out = new FileOutputStream(file); - ) { - out.write(entityBody.getBytes("UTF-8")); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - public static void append(String str, File file) { - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - - try ( - FileOutputStream out = new FileOutputStream(file, true); - ) { - out.write(str.getBytes("UTF-8")); - if (!str.endsWith("\n")) { - out.write("\n".getBytes("UTF-8")); - } - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - /** - * 拷贝文件/文件目录 - * - * @param source 源文件目录 - * @param target 目标目录 - */ - private static void copyFiles(File source, File target) { - copyFiles(source, target, ""); - } - - /** - * 拷贝文件/文件目录 - * - * @param source - * @param target - * @param linkPath - */ - public static void copyFiles(File source, File target, String linkPath) { - if (source.isDirectory()) { - final String _linkPath = linkPath + File.separator + source.getName(); - asList(source.listFiles()).forEach(f -> { - copyFiles(f, target, _linkPath); - }); - } else if (source.isFile()) { - try { - String _linkPath = ""; - int index = linkPath.indexOf(File.separator, 1); - if (index > 0) { - _linkPath = linkPath.substring(index); - } - File targetFile = new File(target.toPath() + _linkPath + File.separator + source.getName()); - if (!targetFile.getParentFile().exists()) { - targetFile.getParentFile().mkdirs(); - } - - Files.copy(source.toPath(), targetFile.toPath(), REPLACE_EXISTING); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - /** - * 获取 clazz的路径,如果是jar里面的文件得到jar存放的目录,如:lib - * - * @param clazz - * @return - */ - public static String rootPath(Class clazz) { - //return clazz.getClassLoader().getResource("").getPath(); - URL url = clazz.getProtectionDomain().getCodeSource().getLocation(); - try { - String filePath = URLDecoder.decode(url.getPath(), "utf-8"); - if (filePath.endsWith(".jar")) { - return filePath.substring(0, filePath.lastIndexOf("/") + 1); - } - return filePath; - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - - return ""; - } - - public static String rootPath() { - return rootPath(FileKit.class); - } - - /** - * 读取流内的所有内容 - * - * @param inputStream - * @return - * @throws IOException - */ - public static String readAll(InputStream inputStream) { - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); - StringBuffer buf = new StringBuffer(); - String str; - try { - while ((str = reader.readLine()) != null) { - buf.append(str + "\n"); - } - } catch (IOException e) { - e.printStackTrace(); - } - return buf.toString(); - } - - public static T readAs(File file, Type typeToken) throws IOException { - try ( - FileInputStream inputStream = new FileInputStream(file) - ) { - return JsonConvert.root().convertFrom(typeToken, inputStream); - } - } -} diff --git a/src/com/haogames/core/util/IpKit.java b/src/com/haogames/core/util/IpKit.java deleted file mode 100644 index a5966de..0000000 --- a/src/com/haogames/core/util/IpKit.java +++ /dev/null @@ -1,391 +0,0 @@ -package com.haogames.core.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.function.Function; -import java.util.regex.Pattern; - -/** - * @author: liangxianyou - */ -public class IpKit { - - public static String toNum(String ip) { - if (ip == null || ip.isEmpty()) { - return ""; - } - if (isV4(ip)) { - return v4Num(ip) + ""; - } else { - return v6Num(ip); - } - } - - - /** - * IP区间包含IP数计算,含首尾 - * - * @param startIp 其实ip - * @param endIp 结束ip - * @return - */ - public static String ipCount(String startIp, String endIp) { - if (startIp == null || startIp.isEmpty() || endIp == null || endIp.isEmpty()) { - return ""; - } - if (isV4(startIp, endIp)) { - return v4Count(startIp, endIp) + ""; - } else { - return v6Count(startIp, endIp); - } - } - - public static boolean isV4(String... ips) { - for (String ip : ips) { - String pattern = "^(25[0-5]|2[0-4]\\d|[0-1]?\\d?\\d)(\\.(25[0-5]|2[0-4]\\d|[0-1]?\\d?\\d)){3}$"; - if (!Pattern.matches(pattern, ip)) { - return false; - } - - /*if (!ip.contains(".")) { - return false; - }*/ - } - return true; - } - - //------------------------------ ipv4 ---------------------------------- - - /** - * ipv4 区间包含Ip数,含首尾 - * - * @param startIp - * @param endIp - * @return - */ - private static String v4Count(String startIp, String endIp) { - String start = v4Num(startIp); - String end = v4Num(endIp); - - String v = subtract(end, start); - if (v.startsWith("-")) { - v = v.substring(1); - } - v = add(v, "1"); - return v; - } - - /** - * ipv4转数值 - * - * @param ip ipv4 - * @return - */ - private static String v4Num(String ip) { - String[] ipArr = ip.trim().split("[.]"); - int[] vs = {16777216, 65536, 256, 1}; - String v = "0"; - for (int i = 0; i < 4; i++) { - // v += vs[i] * Integer.parseInt(ipArr[i]); - v = add(v, ride(vs[i] + "", ipArr[i])); - } - return v; - } - - //------------------------------ ipv6 ---------------------------------- - - /** - * ipv6 区间包含Ip数,含首尾 - * - * @param startIp - * @param endIp - * @return - */ - private static String v6Count(String startIp, String endIp) { - String start = v6Num(startIp); - String end = v6Num(endIp); - String subtract = subtract(start, end); - if (subtract.startsWith("-")) { - subtract = subtract.substring(1); - } - subtract = add(subtract, "1"); - return subtract; - } - - /** - * 将ipv6转为数值 - * - * @param ip ipv6 - * @return ipv6 转换后的数值 - */ - private static String v6Num(String ip) { - ip = restoreV6(ip); - - int[] ipArr = new int[8]; - String[] ipSlice = ip.split(":"); - - for (int i = 0; i < ipSlice.length; i++) { - ipArr[i] = Integer.parseInt(ipSlice[i], 16); - } - - String[] baseNum = { - "5192296858534827628530496329220096", - "79228162514264337593543950336", - "1208925819614629174706176", - "18446744073709551616", - "281474976710656", - "4294967296", - "65536", - "1"}; - - - String v = "0"; - for (int i = 0; i < 8; i++) { - v = add(v, ride(baseNum[i], ipArr[i] + "")); - } - return v; - } - - - //================================================================= - - /** - * 两任意大小正整数相乘 - * - * @param x 正整数 x - * @param y 正整数 y - * @return 返回乘积数字 字符串 - */ - private static String ride(String x, String y) { - List> tmp = new ArrayList<>(); - - List xArr = new ArrayList<>(); - List yArr = new ArrayList<>(); - for (String s : String.valueOf(x).split("")) { - xArr.add(Integer.parseInt(s)); - } - for (String s : String.valueOf(y).split("")) { - yArr.add(Integer.parseInt(s)); - } - - //分步 相乘 - int z = 0; - for (int i = xArr.size() - 1; i >= 0; i--, z++) { - List list = new ArrayList<>(); - for (int j = 0; j < z; j++) { - list.add(0); - } - - int[] carry = {0}; - int a = xArr.get(i); - for (int j = yArr.size() - 1; j >= 0; j--) { - int b = yArr.get(j); - list.add(a * b % 10 + carry[0]); - carry[0] = a * b / 10; - } - if (carry[0] > 0) { - list.add(carry[0]); - } - - tmp.add(list); - } - - //合并 相加 - String v = ""; - int carry = 0; - boolean end = false; - for (int i = 0; ; i++) { - end = true; - int _v = 0; - for (int j = 0; j < tmp.size(); j++) { - List row = tmp.get(j); - if (row.size() > i) { - end = false; - _v += row.get(i); - } - } - if (end) { - break; - } - - _v = carry + _v; - v = _v % 10 + v; - carry = _v / 10; - } - if (carry > 0) { - v = carry + v; - } - return v; - } - - /** - * 两任意大小正整数 相加 - * - * @param x 正整数 x - * @param y 正整数 y - * @return 返回两数之和 字符串 - */ - private static String add(String x, String y) { - List xArr = toIntSlice.apply(x); - List yArr = toIntSlice.apply(y); - - String v = ""; - int carry = 0; - for (int i = 0; i < xArr.size() || i < yArr.size(); i++) { - int a = i < xArr.size() ? xArr.get(i) : 0; - int b = i < yArr.size() ? yArr.get(i) : 0; - - int _v = a + b + carry; - if (_v >= 10) { - carry = 1; - _v -= 10; - } else { - carry = 0; - } - v = _v + v; - } - if (carry > 0) { - v = carry + v; - } - return v; - } - - private static Function> toIntSlice = (str) -> { - String[] strArr = str.trim().split(""); - List arr = new ArrayList<>(); - for (int i = strArr.length - 1; i >= 0; i--) { - arr.add(Integer.parseInt(strArr[i])); - } - return arr; - }; - /*private static Function> _toIntSlice = (str) -> { - String[] strArr = str.trim().split(""); - List arr = new ArrayList<>(strArr.length); - for (int i = 0; i < strArr.length; i++) { - arr.add(Integer.parseInt(strArr[i])); - } - return arr; - };*/ - - /** - * 任意两个大小正整数相减 - * - * @param x 正整数 x - * @param y 正整数 y - * @return x-y 的差 - */ - private static String subtract(String x, String y) { - List xArr = toIntSlice.apply(x); - List yArr = toIntSlice.apply(y); - - // 值大小比较 - boolean yThanX = xArr.size() < yArr.size(); - if (xArr.size() == yArr.size()) { - for (int i = xArr.size() - 1; i >= 0; i--) { - if (xArr.get(i) > yArr.get(i)) { - yThanX = false; - break; - } else if (xArr.get(i) < yArr.get(i)) { - yThanX = true; - break; - } - } - } - if (yThanX) { - List tmp = xArr; - xArr = yArr; - yArr = tmp; - } - - // 计算 - String v = ""; - int subplus = 0; // 如:-1 - for (int i = 0; i < xArr.size(); i++) { - int a = xArr.get(i); - int b = yArr.size() > i ? yArr.get(i) : 0; - - int _v = a - b + subplus; - if (_v < 0) { - subplus = -1; - _v = _v + 10; - } else { - subplus = 0; - } - - v = _v + v; - } - // 去除首位0 - while (v.startsWith("0") && v.length() > 1) { - v = v.substring(1); - } - if (yThanX) { - v = "-" + v; - } - return v; - } - - /** - * ipv6还原,去掉压缩写法 - * - * @param ip 原始ipv6 - * @return 标准IPV6 - */ - private static String restoreV6(String ip) { - // 计算 :个数 - // 补全 省略的0 - String[] arr = ip.split(""); - int n = 0; - for (String s : arr) { - if (":".equals(s)) { - n++; - } - } - String _ip = ip; - if (n < 7) { - String b = ""; - for (int i = 0; i <= 7 - n; i++) { - b += ":0"; - } - b += ":"; - _ip = ip.replace("::", b); - } - if (_ip.startsWith(":")) { - _ip = "0" + _ip; - } - if (_ip.endsWith(":")) { - _ip = _ip + "0"; - } - return _ip; - } - - // 任意两个小数相加 x: 0.1213, y: 0.981 - /*private static String _add(String x, String y) { - List xArr = _toIntSlice.apply(x.substring(2)); - List yArr = _toIntSlice.apply(y.substring(2)); - - - String v = ""; - int len = xArr.size() > yArr.size() ? xArr.size() : yArr.size(); - int carry = 0; - for (int i = 0; i < len; i++) { - int a = xArr.size() > (len - i - 1) ? xArr.get(len - i - 1) : 0; - int b = yArr.size() > (len - i - 1) ? yArr.get(len - i - 1) : 0; - int _v = a + b + carry; - - carry = _v / 10; - v = _v % 10 + v; - } - v = carry + v; - StringBuffer buf = new StringBuffer(v).insert(v.length() - len, "."); - - return buf.toString(); - } - - // 任意两个数相加 - public static String addx(String x, String y) { - String str = _add(x, y); - System.out.printf("%s + %s = %s \n", x, y, str); - - return str; - }*/ -} diff --git a/src/com/haogames/core/util/Kv.java b/src/com/haogames/core/util/Kv.java deleted file mode 100644 index 7e13a36..0000000 --- a/src/com/haogames/core/util/Kv.java +++ /dev/null @@ -1,272 +0,0 @@ -package com.haogames.core.util; - -import org.redkale.convert.json.JsonConvert; - -import javax.persistence.Id; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Created by liangxianyou@eversec.cn at 2018/3/12 14:17. - */ -public class Kv extends LinkedHashMap { - public static Kv of() { - return new Kv(); - } - - public static Kv of(Object k, Object v) { - return new Kv().set(k, v); - } - - public Kv set(K k, V v) { - put(k, v); - return this; - } - - public Kv putAll(Kv kv) { - kv.forEach((k, v) -> put(k, v)); - return this; - } - - // 将obj 属性映射到Kv 中 - public static Kv toKv(Object m, String... fields) { - Kv kv = Kv.of(); - if (m == null) { - return kv; - } - Stream.of(fields).forEach(field -> { - String filedT = field; - String filedS = field; - - try { - if (field.contains("=")) { - String[] arr = field.split("="); - filedT = arr[0]; - filedS = arr[1]; - } - - Method method = m.getClass().getDeclaredMethod("get" + Utils.toUpperCaseFirst(filedS)); - if (method != null) { - kv.set(filedT, method.invoke(m)); - } - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - new IllegalArgumentException(String.format("Kv.toKv获取 获取参数[]失败", field), e); - } - }); - - return kv; - } - - public static List toKv(Collection datas, String... fields) { - return datas.stream().map(x -> toKv(x, fields)).collect(Collectors.toList()); - } - - public static Kv toKv(Object m) { - return toKv(m, Kv.of(), m.getClass()); - } - - private static Kv toKv(Object m, Kv kv, Class clazz) { - Method[] methods = clazz.getMethods(); - for (Method method : methods) { - if (!method.getName().startsWith("get") || method.getParameterCount() > 0 || "getClass".equals(method.getName())) - continue; - - String k = Utils.toLowerCaseFirst(method.getName().replace("get", "")); - if (!kv.containsKey(k) || Utils.isEmpty(kv.get(k))) { - try { - kv.set(k, method.invoke(m)); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (InvocationTargetException e) { - e.printStackTrace(); - } - } - } - - for (Field field : clazz.getDeclaredFields()) { - if (field.getAnnotation(Id.class) != null) { - try { - field.setAccessible(true); - kv.set("_id", field.get(m)); - break; - } catch (IllegalAccessException e) { - e.printStackTrace(); - } - } - } - - Class superclass = clazz.getSuperclass(); - if (superclass != null) { - kv = toKv(m, kv, superclass); - } - return kv; - } - - public T toBean(Class type) { - return toBean(this, type); - } - - // 首字母大写 - private static Function upFirst = (s) -> { - return s.substring(0, 1).toUpperCase() + s.substring(1); - }; - - private static Predicate isNumber = (t) -> { - return t == Integer.class || t == int.class - || t == Long.class || t == long.class - || t == float.class || t == Float.class - || t == Double.class || t == double.class - || t == Short.class || t == short.class - || t == Byte.class || t == byte.class - ; - }; - - public static T toAs(Object v, Class clazz) { - if (v == null) { - return null; - } else if (v.getClass() == clazz) { - return (T) v; - } else if (clazz == String.class) { - return (T) String.valueOf(v); - } - - Object v1 = v; - try { - - if (v.getClass() == Long.class) {//多种数值类型的处理: Long => x - switch (clazz.getSimpleName()) { - case "int", "Integer" -> v1 = (int) (long) v; - case "short", "Short" -> v1 = (short) (long) v; - case "float", "Float" -> v1 = (float) (long) v; - case "byte", "Byte" -> v1 = (byte) (long) v; - } - } else if (v.getClass() == Double.class) { - if (isNumber.test(clazz)) { - switch (clazz.getSimpleName()) { - case "long", "Long" -> v1 = (long) (double) v; - case "int", "Integer" -> v1 = (int) (double) v; - case "short", "Short" -> v1 = (short) (double) v; - case "float", "Float" -> v1 = (float) (double) v; - case "byte", "Byte" -> v1 = (byte) (double) v; - } - } else if (clazz == String.class) { - v1 = String.valueOf(v); - } - } else if (v.getClass() == String.class) { - switch (clazz.getSimpleName()) { - case "Date" -> v1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse((String) v); - case "short", "Short" -> v1 = (short) Double.parseDouble((String) v); - case "float", "Float" -> v1 = (float) Double.parseDouble((String) v); - case "int", "Integer" -> v1 = (int) Double.parseDouble((String) v); - case "long", "Long" -> v1 = (long) Double.parseDouble((String) v); - case "double", "Double" -> v1 = Double.parseDouble((String) v); - case "byte", "Byte" -> v1 = Byte.parseByte((String) v); - } - } else if (v.getClass() == Integer.class) { - switch (clazz.getSimpleName()) { - case "long", "Long" -> v1 = (long) (int) v; - case "short", "Short" -> v1 = (short) (int) v; - case "float", "Float" -> v1 = (float) (int) v; - case "byte", "Byte" -> v1 = (byte) (int) v; - } - } else if (v.getClass() == Float.class) { - switch (clazz.getSimpleName()) { - case "long", "Long" -> v1 = (long) (float) v; - case "int", "Integer" -> v1 = (int) (float) v; - case "short", "Short" -> v1 = (short) (float) v; - case "byte", "Byte" -> v1 = (byte) (float) v; - } - } else { - v1 = v; - } - } catch (ParseException e) { - e.printStackTrace(); - } - return (T) v1; - } - - public static T toBean(Map map, Class clazz) { - //按照方法名 + 类型寻找, - //按照方法名 寻找 - //+ - Object obj = null; - try { - obj = clazz.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - new IllegalArgumentException("创建对象实列失败", e); // 检查clazz是否有无参构造 - } - - for (String k : (Set) map.keySet()) { - Object v = map.get(k); - if (v == null) continue; - //寻找method - try { - String methodName = "set" + upFirst.apply(k); - Class tClazz = null; - Method method = null; - try { - method = clazz.getMethod(methodName, tClazz = v.getClass()); - } catch (NoSuchMethodException e) { - //e.printStackTrace(); - } - if (method == null) { - for (Method _method : clazz.getMethods()) { - if (methodName.equals(_method.getName()) && _method.getParameterCount() == 1) { - method = _method; - tClazz = _method.getParameterTypes()[0]; - } - } - } - - if (method == null) { - for (Method _method : clazz.getMethods()) { - if (methodName.equalsIgnoreCase(_method.getName()) && _method.getParameterCount() == 1) { - method = _method; - tClazz = _method.getParameterTypes()[0]; - } - } - } - - if (method != null) { - method.invoke(obj, toAs(v, tClazz)); - } - - //没有方法,找属性注解 - /*if (method == null) { - Field field = null; - Field[] fields = clazz.getDeclaredFields(); - for (Field _field : fields) { - To to = _field.getAnnotation(To.class); - if (to != null && k.equals(to.value())) { - field = _field; - tClazz = _field.getType(); - break; - } - } - - if (field != null) { - field.setAccessible(true); - field.set(obj, toAs(v, tClazz)); - } - }*/ - } catch (IllegalAccessException | InvocationTargetException e) { - e.printStackTrace(); - } - } - - return (T) obj; - } - - public String toString() { - return JsonConvert.root().convertTo(this); - } - -} \ No newline at end of file diff --git a/src/com/haogames/core/util/QueueTask.java b/src/com/haogames/core/util/QueueTask.java deleted file mode 100644 index aa0f974..0000000 --- a/src/com/haogames/core/util/QueueTask.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package com.haogames.core.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * @param 泛型 - * @author zhangjx - */ -public class QueueTask { - - private static final AtomicInteger counter = new AtomicInteger(); - - protected final BlockingQueue queue; - - protected final int threads; - - protected Consumer consumer; - - protected Logger logger; - - public QueueTask(int threads) { - this.threads = threads; - this.queue = new LinkedBlockingQueue<>(); - } - - public QueueTask(int threads, int queueSize) { - this.threads = threads; - this.queue = new LinkedBlockingQueue<>(queueSize); - } - - public T poll() { - return this.queue.poll(); - } - - public T task() throws InterruptedException { - return this.queue.take(); - } - - public int size() { - return this.queue.size(); - } - - public boolean add(T data) { - return this.queue.add(data); - } - - public boolean remove(T data) { - return this.queue.remove(data); - } - - public void put(T data) throws InterruptedException { - this.queue.put(data); - } - - public void init(Logger logger, Consumer consumer) { - this.logger = logger; - this.consumer = consumer; - Runnable task = () -> { - T data; - try { - while ((data = queue.take()) != null) { - try { - consumer.accept(data); - } catch (Throwable e) { - if (logger != null) logger.log(Level.SEVERE, "QueueTask Data[" - + (data == null ? null : data.getClass().getSimpleName()) + "](" + data + ") consume error", e); - } - } - } catch (InterruptedException ex) { - } - }; - for (int i = 0; i < threads; i++) { - Thread thread = new Thread(task); - thread.setName("QueueTask-" + i + "-Thread"); - thread.setDaemon(true); - thread.start(); - } - counter.addAndGet(threads); - } - - public void destroy() { - int count = 0; - while (count < 50) { - if (queue.size() > 0) { - try { - Thread.sleep(200); - } catch (Exception e) { - break; - } - count++; - } else { - count = Integer.MAX_VALUE; - } - } - counter.addAndGet(-threads); - } - - public static int runningThreads() { - return counter.get(); - } -} diff --git a/src/com/haogames/core/util/QueueTasks.java b/src/com/haogames/core/util/QueueTasks.java deleted file mode 100644 index 8ec2552..0000000 --- a/src/com/haogames/core/util/QueueTasks.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.haogames.core.util; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -/** - * 公共异步队列 - * - * @author: liangxy. - */ -public class QueueTasks { - - private static final QueueTask queueTask = new QueueTask<>(1); - - static { - queueTask.init(Logger.getLogger(QueueTasks.class.getSimpleName()), Runnable::run); - } - - public static void add(Runnable runnable) { - queueTask.queue.add(runnable); - } - - // -------------------------- 支持返回结果的任务队列 ----------------------------- - private static ExecutorService executor = Executors.newFixedThreadPool(1); - - public static CompletableFuture submit(Runnable task) { - return CompletableFuture.runAsync(() -> { - try { - executor.submit(task).get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - }); - } -} diff --git a/src/com/haogames/core/util/Utils.java b/src/com/haogames/core/util/Utils.java deleted file mode 100644 index 9776698..0000000 --- a/src/com/haogames/core/util/Utils.java +++ /dev/null @@ -1,659 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package com.haogames.core.util; - -import org.redkale.boot.LogFileHandler; -import org.redkale.source.DataSource; -import org.redkale.source.FilterFunc; -import org.redkale.source.FilterNode; -import org.redkale.source.Flipper; -import org.redkale.util.Comment; -import org.redkale.util.Reproduce; -import org.redkale.util.SelectColumn; -import org.redkale.util.Utility; - -import javax.crypto.Cipher; -import javax.crypto.Mac; -import javax.crypto.spec.SecretKeySpec; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.lang.reflect.Array; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.time.temporal.TemporalAccessor; -import java.util.*; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.logging.LogManager; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static java.util.Arrays.asList; - -/** - * @author zhangjx - */ -public abstract class Utils { - - public static final String HEADNAME_WS_SNCP_ADDRESS = "WS-SncpAddress"; - - private Utils() { - } - - public static void initLogConfig() { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - final PrintStream ps = new PrintStream(out); - ps.println("handlers = java.util.logging.ConsoleHandler"); - ps.println(".level = FINEST"); - ps.println("java.util.logging.ConsoleHandler.level = FINEST"); - ps.println("java.util.logging.ConsoleHandler.formatter = " + LogFileHandler.LoggingFormater.class.getName()); - LogManager.getLogManager().readConfiguration(new ByteArrayInputStream(out.toByteArray())); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * 获取当天yyyyMMddHHmmss格式的long值 - * - * @return yyyyMMddHHmmss格式的long值 - */ - public static long datetime14() { - LocalDateTime day = LocalDateTime.now(); - return day.getYear() * 10000_000000L + day.getMonthValue() * 100_000000 + day.getDayOfMonth() * 1000000 - + day.getHour() * 10000 + day.getMinute() * 100 + day.getSecond(); - } - - /** - * 获取当天yyMMddHHmmss格式的long值 - * - * @return yyMMddHHmmss格式的long值 - */ - public static long datetime12() { - LocalDateTime day = LocalDateTime.now(); - return day.getYear() % 100 * 10000_000000L + day.getMonthValue() * 100_000000 + day.getDayOfMonth() * 1000000 - + day.getHour() * 10000 + day.getMinute() * 100 + day.getSecond(); - } - - public static int[] calcIndexWeights(int[] weights) { - int size = 0; - for (int w : weights) { - size += w; - } - int[] newWeights = new int[size]; - int index = -1; - for (int i = 0; i < weights.length; i++) { - for (int j = 0; j < weights[i]; j++) { - newWeights[++index] = i; - } - } - return newWeights; - } - - /** - * 判断对象是否为空 - * - * @param obj 待判断的对象 - * @return - */ - public static boolean isEmpty(Object obj) { - if (obj == null) - return true; - if (obj instanceof String) - return ((String) obj).trim().isEmpty(); - if (obj instanceof Collection) - return ((Collection) obj).isEmpty(); - if (obj instanceof Map) - return ((Map) obj).isEmpty(); - - if (obj.getClass().isArray() && Array.getLength(obj) == 0) { - return true; - } - - /*if (obj instanceof Object[]) { - for (Object o : (Object[]) obj) { - if (o != null) return false; - } - return true; - }*/ - return false; - } - - public static byte[] encodeBySHA1(String key, String content) { - SecretKeySpec signKey = new SecretKeySpec(key.getBytes(), "HmacSHA1"); - try { - Mac mac = Mac.getInstance("HmacSHA1"); - mac.init(signKey); - return mac.doFinal(content.getBytes(StandardCharsets.UTF_8)); - } catch (Exception e) { - e.printStackTrace(); - } - - return null; - } - - public static String encodeByBase64WithUrlSafe(byte[] content) { - return Base64.getEncoder().encodeToString(content).replaceAll("\\+", "-").replace("/", "_"); - } - - - private static Map reproduceMap = new HashMap<>(); - - /** - * @param d 目标对象 - * @param s 源对象 - * @param 目标对象的数据类型 - * @param 源对象的数据类型 - * @return - */ - public static D copy(D d, S s) { - String reproductKey = d.getClass().getName() + "_" + s.getClass().getName(); - - Reproduce reproduce = reproduceMap.get(reproductKey); - if (reproduce == null) { - if (reproduce == null) { - reproduceMap.put(reproductKey, reproduce = (Reproduce) Reproduce.create(d.getClass(), s.getClass())); - } - } - - return reproduce.apply(d, s); - } - - /** - * 将字符串第一个字母转大写 - * - * @param str 待转换字符串 - * @return - */ - public static String toUpperCaseFirst(String str) { - Objects.requireNonNull(str); - return str.substring(0, 1).toUpperCase() + str.substring(1); - } - - /** - * 将字符串第一个字母转小写 - * - * @param str 待转换字符串 - * @return - */ - public static String toLowerCaseFirst(String str) { - Objects.requireNonNull(str); - return str.substring(0, 1).toLowerCase() + str.substring(1); - } - - /** - * 获取子集中最大序号 - * - * @param codes 参与比较的子集 - * @param parentCode 所属父节点 - * @return - */ - public static String buildMaxCode(List codes, String parentCode) { - String maxCode = ""; - //父级为几级 - int parentLevel = isEmpty(parentCode) ? 0 : parentCode.split("-").length; - //获取下一级编号最大code - for (int i = 0; i < codes.size(); i++) { - boolean flag = false; - if (i == 0) { - flag = true; - } else if (maxCode.split("-").length == parentLevel + 1) { - int endMaxVal = Integer.parseInt(maxCode.split("-")[parentLevel]); - int endThisVal = Integer.parseInt(codes.get(i).split("-")[parentLevel]); - flag = endThisVal > endMaxVal; - } - if (flag) { - maxCode = codes.get(i); - } - } - return maxCode; - } - - /** - * 获取下个序号[100-100 to 100-101] - * - * @param code - * @return - */ - public static String buildNextCode(String code) { - if (!Utility.contains(code, '-')) { - code = String.valueOf(Integer.parseInt(code) + 1); - } else { - String startCode = code.substring(0, code.lastIndexOf('-') + 1); - String endCode = String.valueOf(Integer.parseInt(code.substring(code.lastIndexOf('-') + 1)) + 1); - code = startCode + endCode; - } - return code; - } - - /** - * 判断字符串是否由数字组成 - * - * @param str - * @return - */ - public static boolean isNumeric(String str) { - if (isEmpty(str)) return false; - - for (int i = 0; i < str.length(); i++) { - if (!Character.isDigit(str.charAt(i))) { - return false; - } - } - return true; - } - - public static List strToArr(String str, Class clazz) { - if (isEmpty(str)) { - return new ArrayList<>(0); - } - List list = Arrays.stream(str.split(",")) - .filter(f -> !isEmpty(f)) - .map(x -> Kv.toAs(x.trim(), clazz)) - .collect(Collectors.toList()); - return list; - } - - public static String arrToStr(Object[] array) { - if (array == null) return ""; - return arrToStr(asList(array)); - } - - public static String arrToStr(Collection array) { - if (isEmpty(array)) { - return ""; - } - StringBuilder builder = new StringBuilder(); - array.stream().filter(f -> !isEmpty(f)).forEach(x -> builder.append(",").append(x instanceof String ? x : x.toString())); - return builder.append(",").toString(); - } - - public static List parseHtmlImage(String html) { - Pattern pattern = Pattern.compile("(?<=( ls = new ArrayList<>(); - while (match.find()) { - ls.add(match.group()); - } - return ls; - } - - /** - * 根据pattern格式化给定时间 - * - * @param accessor 指定时间 - * @param pattern 格式化pattern - * @return - */ - public static String formatByPattern(TemporalAccessor accessor, String pattern) { - if (isEmpty(pattern)) pattern = "yyyy-MM-dd"; - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern); - return formatter.format(accessor); - } - - /** - * 获取给定时间距离1970-1-1 00:00:00的毫秒数 - * - * @param time 时间 - * @return - */ - public static long getEpochMilliByTime(LocalDateTime time) { - return time.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - } - - /** - * @param rs - * @param type - * @param - * @return - */ - public static List queryList(ResultSet rs, Class type) { - try { - List list = new ArrayList(); - ResultSetMetaData metaData = rs.getMetaData(); - int count = metaData.getColumnCount(); - while (rs.next()) { - Map row = new HashMap(); - for (int i = 1; i <= count; i++) { - String columnTypeName = metaData.getColumnTypeName(i); - //String columnName = metaData.getColumnName(i); - String columnLabel = metaData.getColumnLabel(i); - row.put(columnLabel, null); - - if (rs.getObject(i) != null) { - switch (columnTypeName) { - case "DATETIME", "TIMESTAMP", "DATE" -> row.put(columnLabel, rs.getTimestamp(i).getTime()); - default -> row.put(columnLabel, rs.getObject(i)); - } - } - } - list.add(Map.class == type ? row : Kv.toBean(row, type)); - } - - return list; - } catch (SQLException e) { - e.printStackTrace(); - return null; - } - } - - /** - * 查询 第一条的 第一列数据 值 - * - * @param rs - * @param type - * @param - * @return - */ - public static T findColumn(ResultSet rs, Class type) { - try { - Object v = null; - while (rs.next()) { - ResultSetMetaData metaData = rs.getMetaData(); - int count = metaData.getColumnCount(); - - for (int i = 1; i <= count; i++) { - String columnTypeName = metaData.getColumnTypeName(i); - if (rs.getObject(i) != null) { - switch (columnTypeName) { - case "DATETIME", "TIMESTAMP", "DATE" -> v = rs.getTimestamp(i).getTime(); - default -> v = rs.getObject(i); - } - } - break; - } - } - - return Kv.toAs(v, type); - } catch (SQLException e) { - e.printStackTrace(); - return null; - } - } - - public static void batchQueryExecute(DataSource dataSource, Class clz, SelectColumn column, FilterNode node, Flipper flipper, Consumer> consumer) { - Number count = dataSource.getNumberResult(clz, FilterFunc.COUNT, null, node); - if (count == null) - return; - for (int offset = flipper.getOffset(); offset < count.intValue(); offset = offset + flipper.getLimit()) { - flipper.setOffset(offset); - consumer.accept(dataSource.queryList(clz, column, flipper, node)); - } - } - - public static void batchQueryExecute(DataSource dataSource, int limit, Class clz, SelectColumn column, FilterNode node, Consumer> consumer) { - batchQueryExecute(dataSource, clz, column, node, new Flipper(limit), consumer); - } - - public static void batchExecute(Collection data, int limit, Consumer> consumer) { - for (int offset = 0; offset < data.size(); offset = offset + limit) { - consumer.accept(data.stream().skip(offset).limit(limit).collect(Collectors.toCollection(HashSet::new))); - } - } - - /** - * 批量处理数据并返回数据流 - * - * @param data 总数据 - * @param limit 每批处理数据量 - * @param parallel 是否使用异步处理。大批量使用 - * @param executor 执行器 - * @param 传入对象类型 - * @param 传出对象类型 - * @return 对象流 - */ - public static Stream batchStream(Collection data, int limit, boolean parallel, Function, Stream> executor) { - Stream.Builder builder = Stream.builder(); - for (int offset = 0; offset < data.size(); offset = offset + limit) { - builder.accept(offset); - } - Stream offsets = builder.build(); - if (parallel) { - offsets = offsets.parallel(); - } - return offsets.flatMap(offset -> executor.apply(data.stream().skip(offset).limit(limit).collect(Collectors.toCollection(ArrayList::new)))); - } - - /** - * List 混排 - * - * @param list - * @return - */ - public static List mix(List list) { - int len = list.size(); - Random random = new Random(); - for (int i = 0; i < len; i++) { - int r = random.nextInt(len); - if (i == r) continue; - - T x = list.get(i); - list.set(i, list.get(r)); - list.set(r, x); - } - return list; - } - - @Comment("获取集合随机元素") - public static List randomItems(List list, int len) { - List randoms = getRandoms(list.size(), len); - List items = new ArrayList<>(randoms.size()); - - randoms.forEach(x -> items.add(list.get(x))); - return items; - } - - @Comment("获取随机数集合") - private static List getRandoms(int max, int len) { - Set randoms = new HashSet<>(); - Random random = new Random(); - while (randoms.size() < len && randoms.size() < max) { - randoms.add(random.nextInt(max)); - } - List list = randoms.stream().collect(Collectors.toList()); - return mix(list); - } - - /** - * unicode转中文 - * - * @param str - * @return - */ - public static String unicodeToCn(String str) { - Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))"); - Matcher matcher = pattern.matcher(str); - char ch; - while (matcher.find()) { - ch = (char) Integer.parseInt(matcher.group(2), 16); - str = str.replace(matcher.group(1), ch + ""); - } - return str; - } - - /** - * 计算字符串的字符长度 - * - * @param value - * @return - */ - public static int strLength(String value) { - int valueLength = 0; - String chinese = "[\u4e00-\u9fa5]"; - for (int i = 0; i < value.length(); i++) { - String temp = value.substring(i, i + 1); - if (temp.matches(chinese)) { - valueLength += 2; - } else { - valueLength += 1; - } - } - return valueLength; - } - - @Comment("解析文本得到 @[用户ID:用户名称] 的用户内容") - public static List parseNoticeUser(String content) { - if (isEmpty(content)) { - return new ArrayList<>(0); - } - - List ls = new ArrayList<>(); - Pattern compile = Pattern.compile("(?<=@\\[)\\d+:[A-Za-z0-9_\\u2E80-\\u9FFF]+(?=])"); - Matcher matcher = compile.matcher(content); - while (matcher.find()) { - ls.add(matcher.group()); - } - return ls; - } - - public static void main(String[] args) { - System.out.println(randomIP()); - } - - public static String randomIP() { - // aaa.aaa.aaa.aaa - StringBuilder buf = new StringBuilder(); - - Random r = new Random(); - buf.append("x").append("."); - buf.append(r.nextInt(255)).append("."); - buf.append(r.nextInt(255)).append("."); - buf.append(r.nextInt(255)); - - int n = r.nextInt(50);// - System.out.println(n / 10f); - - return buf.toString(); - } - - public static String fmt36(int n) { - return Integer.toString(n, 36); - } - - public static String fmt36(long n) { - return Long.toString(n, 36); - } - - public static Map toMap(Collection list, Function fun) { - Map map = new HashMap<>(list.size()); - for (V v : list) { - if (v == null) { - continue; - } - map.put(fun.apply(v), v); - } - return map; - } - - public static Map toMap(Collection list, Function fun, Function fun2) { - Map map = new HashMap<>(list.size()); - for (V v : list) { - if (v == null) { - continue; - } - map.put(fun.apply(v), fun2.apply(v)); - } - return map; - } - - public static List toList(Collection list, Function fun) { - List list1 = new ArrayList<>(list.size()); - list.forEach(x -> list1.add(fun.apply(x))); - return list1; - } - - public static String getHtmlBody(String html) { - String s = html.replaceAll("\n", ""); - int bodyIndex = s.indexOf(""); - if (bodyIndex > -1) { - bodyIndex = bodyIndex + 6; - int lastIndexOf = s.lastIndexOf(""); - if (lastIndexOf < bodyIndex) lastIndexOf = s.length(); - s = s.substring(bodyIndex, lastIndexOf); - } - return s; - } - - public static String getHtmlText(String html) { - return html.replaceAll("<([^ \\f\\n\\r\\t\\v<]| )+>", ""); - } - - // ----------------- - private static final MessageDigest sha1; - private static final MessageDigest md5; - private static final String AES_KEY = "HAOGAME_20200721"; - private static final Cipher aesEncrypter; //加密 - private static final Cipher aesDecrypter; //解密 - - static { - MessageDigest d = null; - try { - d = MessageDigest.getInstance("SHA-1"); - } catch (NoSuchAlgorithmException ex) { - throw new Error(ex); - } - sha1 = d; - try { - d = MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException ex) { - throw new Error(ex); - } - md5 = d; - - Cipher cipher = null; - final SecretKeySpec aesKey = new SecretKeySpec(AES_KEY.getBytes(), "AES"); - try { - cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.ENCRYPT_MODE, aesKey); - } catch (Exception e) { - throw new Error(e); - } - aesEncrypter = cipher; //加密 - try { - cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.DECRYPT_MODE, aesKey); - } catch (Exception e) { - throw new Error(e); - } - aesDecrypter = cipher; //解密 - } - - //AES加密 - public static String encryptAES(String value) { - if (value == null || value.isEmpty()) return value; - try { - synchronized (aesEncrypter) { - return Utility.binToHexString(aesEncrypter.doFinal(value.getBytes())); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - //AES解密 - public static String decryptAES(String value) { - if (value == null || value.isEmpty()) return value; - byte[] hex = Utility.hexToBin(value); - try { - synchronized (aesEncrypter) { - return new String(aesDecrypter.doFinal(hex)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/src/com/zdemo/IProducer.java b/src/com/zdemo/IProducer.java index a7a6cdc..017b70b 100644 --- a/src/com/zdemo/IProducer.java +++ b/src/com/zdemo/IProducer.java @@ -1,15 +1,10 @@ package com.zdemo; -import java.util.concurrent.CompletableFuture; import java.util.logging.Logger; public interface IProducer { Logger logger = Logger.getLogger(IProducer.class.getSimpleName()); - default CompletableFuture sendAsync(T... t) { - return CompletableFuture.runAsync(() -> send(t)); - } - - void send(T... t); + void send(T t); } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index ff04553..a8f6fc0 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -41,14 +41,12 @@ public class KafakProducer implements IProducer, Service { } @Override - public void send(T... t) { - for (T x : t) { - String v = JsonConvert.root().convertTo(x.value); - if (v.startsWith("\"") && v.endsWith("\"")) { - v = v.substring(1, v.length() - 1); - } - producer.send(new ProducerRecord(x.topic, v)); + public void send(T t) { + String v = JsonConvert.root().convertTo(t.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); } + producer.send(new ProducerRecord(t.topic, v)); } @Override diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 35f5679..0b25e8f 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -1,12 +1,14 @@ package com.zdemo.redis; import com.zdemo.AbstractConsumer; +import com.zdemo.EventType; import com.zdemo.IConsumer; import org.redkale.service.Service; import org.redkale.util.AnyValue; import javax.annotation.Resource; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; @@ -22,41 +24,50 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume @Resource(name = "property.redis.port") private int port = 6379; + private Socket client; + private OutputStreamWriter writer; + private BufferedReader reader; + @Override public void init(AnyValue config) { + try { + client = new Socket(); + client.connect(new InetSocketAddress(host, port)); + client.setKeepAlive(true); + + writer = new OutputStreamWriter(client.getOutputStream()); + writer.write("AUTH " + password + "\r\n"); + writer.flush(); + + StringBuffer buf = new StringBuffer("SUBSCRIBE"); + for (String topic : getSubscribes()) { + buf.append(" ").append(topic); + } + buf.append(" _\r\n"); + writer.write(buf.toString()); + writer.flush(); + + reader = new BufferedReader(new InputStreamReader(client.getInputStream())); + } catch (IOException e) { + logger.log(Level.WARNING, "Redis Consumer 初始化失败!", e); + } + new Thread(() -> { try { - Socket client = new Socket(); - client.connect(new InetSocketAddress(host, port)); - client.setKeepAlive(true); - - OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream()); - oswSub.write("AUTH " + password + "\r\n"); - oswSub.flush(); - - StringBuffer buf = new StringBuffer("SUBSCRIBE"); - for (String topic : getSubscribes()) { - buf.append(" ").append(topic); - } - buf.append(" _ping\r\n"); - oswSub.write(buf.toString()); - oswSub.flush(); - - BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream())); - String type = ""; - String readLine; - while ((readLine = br.readLine()) != null) { + while (true) { + String readLine = reader.readLine(); + String type = ""; if ("*3".equals(readLine)) { - br.readLine(); // $7 len() - type = br.readLine(); // message + readLine = reader.readLine(); // $7 len() + type = reader.readLine(); // message if (!"message".equals(type)) { continue; } - br.readLine(); //$n len(key) - String topic = br.readLine(); // topic + reader.readLine(); //$n len(key) + String topic = reader.readLine(); // topic - br.readLine(); //$n len(value) - String value = br.readLine(); // value + reader.readLine(); //$n len(value) + String value = reader.readLine(); // value try { accept(topic, value); } catch (Exception e) { @@ -64,9 +75,30 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume } } } - } catch (Exception e) { - logger.log(Level.WARNING, "Redis Consumer 初始化失败!", e); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); } }).start(); } + + @Override + public void addEventType(EventType... eventType) { + for (EventType type : eventType) { + String[] topics = type.topic.split(","); + for (String topic : topics) { + if (topic.isEmpty()) { + continue; + } + eventMap.put(topic, type); + + //新增订阅 + try { + writer.write("SUBSCRIBE " + topic + "\r\n"); + writer.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } + } + } + } } diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index f8db3ec..a8e45bb 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -22,7 +22,7 @@ public class RedisProducer implements IProducer, Service { @Resource(name = "property.redis.port") private int port = 6379; - private OutputStreamWriter oswPub; + private OutputStreamWriter osw; @Override public void init(AnyValue config) { @@ -31,23 +31,22 @@ public class RedisProducer implements IProducer, Service { client.connect(new InetSocketAddress(host, port)); client.setKeepAlive(true); - oswPub = new OutputStreamWriter(client.getOutputStream()); - oswPub.write("AUTH " + password + "\r\n"); - oswPub.flush(); + osw = new OutputStreamWriter(client.getOutputStream()); + osw.write("AUTH " + password + "\r\n"); + osw.flush(); } catch (IOException e) { logger.log(Level.WARNING, "", e); } } @Override - public void send(T... t) { - for (T x : t) { - try { - oswPub.write("PUBLISH " + x.topic + " '" + JsonConvert.root().convertTo(x.value) + "' \r\n"); - oswPub.flush(); - } catch (IOException e) { - logger.log(Level.WARNING, "", e); - } + public void send(T t) { + try { + osw.write("PUBLISH " + t.topic + " '" + JsonConvert.root().convertTo(t.value) + "' \r\n"); + osw.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 324fdd1..a2c52fd 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -22,7 +22,23 @@ public class AppTest { public void runConsumer() { try { //启动并开启消费监听 - Application.singleton(MyConsumer.class); + MyConsumer consumer = Application.singleton(MyConsumer.class); + + consumer.addEventType( + EventType.of("a1", new TypeToken() { + }, r -> { + System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r)); + }), + + EventType.of("bx", str -> { + System.out.println("我收到了消息 主题bx 事件:" + str); + }) + + , EventType.of("game-update", str -> { + System.out.println("我收到了消息 主题game-update 事件:" + str); + }) + ); + Thread.sleep(60_000 * 60); } catch (Exception e) { e.printStackTrace(); @@ -39,13 +55,14 @@ public class AppTest { Map v1 = Map.of("k", "v"); List v2 = asList(1, 2, 3); - producer.send(Event.of("a1", v0)); - producer.send(Event.of("b1", v1)); - producer.send(Event.of("c1", v2)); + //producer.send(Event.of("a1", v0)); + /*producer.send(Event.of("b1", v1)); + producer.send(Event.of("c1", v2));*/ + producer.send(Event.of("game-update", 23256)); try { - Thread.sleep(1_000); + Thread.sleep(10_000); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 2daf5b1..8a308e8 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,9 +1,6 @@ package com.zdemo.test; -import com.zdemo.EventType; import com.zdemo.kafak.KafakConsumer; -import org.redkale.convert.json.JsonConvert; -import org.redkale.util.TypeToken; public class MyConsumer extends KafakConsumer { @@ -13,17 +10,6 @@ public class MyConsumer extends KafakConsumer { @Override public boolean preInit() { - addEventType( - EventType.of("a1", new TypeToken() { - }, r -> { - System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r)); - }), - - EventType.of("bx", str -> { - System.out.println("我收到了消息 主题bx 事件:" + str); - }) - ); - return true; } }