1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
/**
 * Creates the broadcaster and starts a background dispatcher thread that
 * announces queued broadcast events to every REST server listed in the config.
 *
 * <p>The dispatcher loops forever: it takes the next event from
 * {@code broadcastEvents}, increments its retry counter, and drops the event
 * (with an info log) once the counter exceeds
 * {@code config.getCacheSyncRetrys()}. Otherwise it re-reads the server list,
 * lazily creates one {@code RestClient} per server, and submits one
 * {@code wipeCache} task per server to a worker pool. A task that fails with
 * an {@link IOException} puts the event back at the tail of the queue, so an
 * event is re-queued once per failing server.
 *
 * @param config configuration supplying the REST server list and the retry limit
 */
private Broadcaster(final KylinConfig config) {
    this.config = config;
    final int retryLimitTimes = config.getCacheSyncRetrys();

    final String[] nodes = config.getRestServers();
    if (nodes == null || nodes.length < 1) {
        logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
    }
    // Null-safe count: the server list may be null (see the check above), and
    // dereferencing it here would abort construction with an NPE.
    final int nodeCount = (nodes == null) ? 0 : nodes.length;
    logger.debug(nodeCount + " nodes in the cluster: " + Arrays.toString(nodes));

    Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
        @Override
        public void run() {
            // Only ever touched by this dispatcher thread; worker tasks receive
            // their RestClient directly instead of reading the map concurrently.
            final Map<String, RestClient> restClientMap = Maps.newHashMap();
            // NOTE(review): with an unbounded LinkedBlockingQueue a ThreadPoolExecutor
            // never grows beyond its core size, so the maximum of 10 is not reached
            // and wipe tasks run on a single worker. Left unchanged to preserve behavior.
            final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());

            while (true) {
                try {
                    final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
                    broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1);
                    if (broadcastEvent.getRetryTime() > retryLimitTimes) {
                        logger.info("broadcastEvent retry up to limit times, broadcastEvent:{}", broadcastEvent);
                        continue;
                    }

                    final String[] restServers = config.getRestServers();
                    logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
                    if (restServers == null) {
                        // No servers to notify; report it explicitly rather than
                        // failing with an NPE in the loops below.
                        logger.warn("No rest servers configured, dropping broadcastEvent:{}", broadcastEvent);
                        continue;
                    }

                    // Create all missing clients before submitting anything, so a
                    // failure to build a client prevents a partial announcement.
                    for (final String node : restServers) {
                        if (!restClientMap.containsKey(node)) {
                            restClientMap.put(node, new RestClient(node));
                        }
                    }

                    logger.debug("Announcing new broadcast event: " + broadcastEvent);
                    for (final String node : restServers) {
                        // Resolve the client here, on the dispatcher thread.
                        final RestClient restClient = restClientMap.get(node);
                        wipingCachePool.execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(),
                                            broadcastEvent.getCacheKey());
                                } catch (IOException e) {
                                    logger.warn("Thread failed during wipe cache at {}, error msg: {}",
                                            broadcastEvent, e);
                                    try {
                                        // Re-queue for another attempt; the retry counter
                                        // is checked when the event is taken again.
                                        broadcastEvents.putLast(broadcastEvent);
                                    } catch (InterruptedException ex) {
                                        logger.warn(
                                                "error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ",
                                                broadcastEvent, ex);
                                        // Restore the interrupt flag so the pool can observe it.
                                        Thread.currentThread().interrupt();
                                    }
                                }
                            }
                        });
                    }
                } catch (Exception e) {
                    // Keep the dispatcher alive no matter what a single event does.
                    logger.error("error running wiping", e);
                }
            }
        }
    });
}
|