/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.management.mbean;

import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import javax.management.AttributeValueExp;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.Query;
import javax.management.QueryExp;
import javax.management.StringValueExp;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;

import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ManagementStatisticsLevel;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ServiceStatus;
import org.apache.camel.TimerListener;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
import org.apache.camel.api.management.mbean.ManagedProcessorMBean;
import org.apache.camel.api.management.mbean.ManagedRouteMBean;
import org.apache.camel.api.management.mbean.ManagedStepMBean;
import org.apache.camel.api.management.mbean.RouteError;
import org.apache.camel.model.Model;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RoutesDefinition;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.xml.LwModelHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JMX managed view of a single {@link Route}.
 * <p>
 * Exposes the route's attributes (id, group, state, uptime, load, ...), its lifecycle operations (start, stop,
 * restart, remove) and XML/YAML dumps of the route model and of the route/processor/step statistics.
 */
@ManagedResource(description = "Managed Route")
public class ManagedRoute extends ManagedPerformanceCounter implements TimerListener, ManagedRouteMBean {

    public static final String VALUE_UNKNOWN = "Unknown";

    private static final Logger LOG = LoggerFactory.getLogger(ManagedRoute.class);

    protected final Route route;
    protected final String description;
    protected final String configurationId;
    protected final String sourceLocation;
    protected final String sourceLocationShort;
    protected final CamelContext context;
    private final LoadTriplet load = new LoadTriplet();
    private final LoadThroughput thp = new LoadThroughput();
    private final String jmxDomain;

    /**
     * Creates the managed route and captures the route's description, configuration id and source location as well as
     * the JMX domain name of the management agent.
     *
     * @param context the camel context the route belongs to
     * @param route   the route being managed
     */
    public ManagedRoute(CamelContext context, Route route) {
        this.route = route;
        this.context = context;
        this.description = route.getDescription();
        this.configurationId = route.getConfigurationId();
        this.sourceLocation = route.getSourceLocation();
        this.sourceLocationShort = route.getSourceLocationShort();
        this.jmxDomain = context.getManagementStrategy().getManagementAgent().getMBeanObjectDomainName();
    }

    @Override
    public void init(ManagementStrategy strategy) {
        super.init(strategy);
        // statistics are enabled unless the management agent is configured with statistics level Off
        boolean enabled
                = context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off;
        setStatisticsEnabled(enabled);
    }

    public Route getRoute() {
        return route;
    }

    public CamelContext getContext() {
        return context;
    }

    /**
     * @return the id of the route, or {@link #VALUE_UNKNOWN} if the route has no id
     */
    @Override
    public String getRouteId() {
        String id = route.getId();
        if (id == null) {
            id = VALUE_UNKNOWN;
        }
        return id;
    }

    @Override
    public String getNodePrefixId() {
        return route.getNodePrefixId();
    }

    @Override
    public String getRouteGroup() {
        return route.getGroup();
    }

    @Override
    public boolean isCreatedByRouteTemplate() {
        return "true".equals(route.getProperties().getOrDefault(Route.TEMPLATE_PROPERTY, "false"));
    }

    @Override
    public boolean isCreatedByKamelet() {
        return "true".equals(route.getProperties().getOrDefault(Route.KAMELET_PROPERTY, "false"));
    }

    /**
     * Returns the route properties as tabular data with one {@code key}/{@code value} row per property, where each
     * value is converted to a String by the type converter.
     */
    @Override
    public TabularData getRouteProperties() {
        try {
            final Map<String, Object> properties = route.getProperties();
            final TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.camelRoutePropertiesTabularType());
            final CompositeType ct = CamelOpenMBeanTypes.camelRoutePropertiesCompositeType();

            // gather route properties
            for (Map.Entry<String, Object> entry : properties.entrySet()) {
                final String key = entry.getKey();
                final String val = context.getTypeConverter().convertTo(String.class, entry.getValue());

                CompositeData data = new CompositeDataSupport(
                        ct,
                        new String[] { "key", "value" },
                        new Object[] { key, val });

                answer.put(data);
            }
            return answer;
        } catch (Exception e) {
            throw RuntimeCamelException.wrapRuntimeCamelException(e);
        }
    }

    @Override
    public String getDescription() {
        return description;
    }

    @Override
    public Boolean getAutoStartup() {
        return route.isAutoStartup();
    }

    @Override
    public String getSourceLocation() {
        return sourceLocation;
    }

    @Override
    public String getSourceLocationShort() {
        return sourceLocationShort;
    }

    @Override
    public String getRouteConfigurationId() {
        return configurationId;
    }

    /**
     * @return the uri of the route's endpoint, or {@link #VALUE_UNKNOWN} if the route has no endpoint
     */
    @Override
    public String getEndpointUri() {
        if (route.getEndpoint() != null) {
            return route.getEndpoint().getEndpointUri();
        }
        return VALUE_UNKNOWN;
    }

    @Override
    public String getState() {
        // must use String type to be sure remote JMX can read the attribute without requiring Camel classes.
        ServiceStatus status = context.getRouteController().getRouteStatus(route.getId());
        // if no status exists then its stopped
        if (status == null) {
            status = ServiceStatus.Stopped;
        }
        return status.name();
    }

    @Override
    public String getUptime() {
        return route.getUptime();
    }

    @Override
    public long getUptimeMillis() {
        return route.getUptimeMillis();
    }

    @Override
    public String getCamelId() {
        return context.getName();
    }

    @Override
    public String getCamelManagementName() {
        return context.getManagementName();
    }

    @Override
    public Boolean getTracing() {
        return route.isTracing();
    }

    @Override
    public void setTracing(Boolean tracing) {
        route.setTracing(tracing);
    }

    @Override
    public Boolean getMessageHistory() {
        return route.isMessageHistory();
    }

    @Override
    public Boolean getLogMask() {
        return route.isLogMask();
    }

    /**
     * Returns the route policies as a comma separated list of {@code SimpleClassName(identityHashCode)} entries.
     */
    @Override
    public String getRoutePolicyList() {
        List<RoutePolicy> policyList = route.getRoutePolicyList();

        if (policyList == null || policyList.isEmpty()) {
            // return an empty string to have it displayed nicely in JMX consoles
            return "";
        }

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < policyList.size(); i++) {
            RoutePolicy policy = policyList.get(i);
            sb.append(policy.getClass().getSimpleName());
            sb.append("(").append(ObjectHelper.getIdentityHashCode(policy)).append(")");
            if (i < policyList.size() - 1) {
                sb.append(", ");
            }
        }
        return sb.toString();
    }

    @Override
    public String getLoad01() {
        return formatStatistic(load.getLoad1());
    }

    @Override
    public String getLoad05() {
        return formatStatistic(load.getLoad5());
    }

    @Override
    public String getLoad15() {
        return formatStatistic(load.getLoad15());
    }

    @Override
    public String getThroughput() {
        return formatStatistic(thp.getThroughput());
    }

    @Override
    public void onTimer() {
        load.update(getInflightExchanges());
        thp.update(getExchangesTotal());
    }

    /**
     * Starts the route via the route controller.
     *
     * @throws IllegalArgumentException if the camel context is not started
     * @throws Exception                the failure from the route controller, which is logged at WARN and rethrown
     */
    @Override
    public void start() throws Exception {
        ensureContextStarted();
        try {
            context.getRouteController().startRoute(getRouteId());
        } catch (Exception e) {
            // the exception is rethrown below, so the log message must not claim that it is ignored
            LOG.warn("Error starting route: {} due to: {}", getRouteId(), e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Stops the route via the route controller.
     *
     * @throws IllegalArgumentException if the camel context is not started
     * @throws Exception                the failure from the route controller, which is logged at WARN and rethrown
     */
    @Override
    public void stop() throws Exception {
        ensureContextStarted();
        try {
            context.getRouteController().stopRoute(getRouteId());
        } catch (Exception e) {
            // the exception is rethrown below, so the log message must not claim that it is ignored
            LOG.warn("Error stopping route: {} due to: {}", getRouteId(), e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Stops the route, passing a {@link RejectedExecutionException} as the cause to the route controller.
     *
     * @throws IllegalArgumentException if the camel context is not started
     */
    @Override
    public void stopAndFail() throws Exception {
        ensureContextStarted();
        Throwable cause = new RejectedExecutionException("Route " + getRouteId() + " is forced stopped and marked as failed");
        context.getRouteController().stopRoute(getRouteId(), cause);
    }

    /**
     * Stops the route using the given timeout.
     *
     * @param  timeout                  the timeout in seconds
     * @throws IllegalArgumentException if the camel context is not started
     */
    @Override
    public void stop(long timeout) throws Exception {
        ensureContextStarted();
        context.getRouteController().stopRoute(getRouteId(), timeout, TimeUnit.SECONDS);
    }

    /**
     * Stops the route using the given timeout and abort flag.
     *
     * @param  timeout                  the timeout in seconds
     * @param  abortAfterTimeout        passed as-is to the route controller
     * @return                          the result reported by the route controller
     * @throws IllegalArgumentException if the camel context is not started
     */
    @Override
    public boolean stop(Long timeout, Boolean abortAfterTimeout) throws Exception {
        ensureContextStarted();
        return context.getRouteController().stopRoute(getRouteId(), timeout, TimeUnit.SECONDS, abortAfterTimeout);
    }

    /**
     * @deprecated not in use
     */
    @Deprecated(since = "4.8.0")
    public void shutdown() throws Exception {
        ensureContextStarted();
        String routeId = getRouteId();
        context.getRouteController().stopRoute(routeId);
        context.removeRoute(routeId);
    }

    /**
     * @deprecated not in use
     */
    @Deprecated(since = "4.8.0")
    public void shutdown(long timeout) throws Exception {
        ensureContextStarted();
        String routeId = getRouteId();
        context.getRouteController().stopRoute(routeId, timeout, TimeUnit.SECONDS);
        context.removeRoute(routeId);
    }

    @Override
    public boolean remove() throws Exception {
        ensureContextStarted();
        return context.removeRoute(getRouteId());
    }

    @Override
    public void restart() throws Exception {
        restart(1);
    }

    /**
     * Stops the route, waits for the given delay and then starts the route again.
     * <p>
     * If the waiting thread is interrupted, the interrupt flag is restored and the route is still started.
     *
     * @param delay delay in seconds between stopping and starting the route; no waiting if zero or negative
     */
    @Override
    public void restart(long delay) throws Exception {
        stop();
        if (delay > 0) {
            try {
                LOG.debug("Sleeping {} seconds before starting route: {}", delay, getRouteId());
                // TimeUnit converts seconds to millis with saturation, so a huge delay cannot overflow
                // into a negative sleep time
                TimeUnit.SECONDS.sleep(delay);
            } catch (InterruptedException e) {
                LOG.info("Interrupted while waiting before starting the route");
                Thread.currentThread().interrupt();
            }
        }
        start();
    }

    @Override
    public String dumpRouteAsXml() throws Exception {
        return dumpRouteAsXml(false);
    }

    @Override
    public String dumpRouteAsXml(boolean resolvePlaceholders) throws Exception {
        return dumpRouteAsXml(resolvePlaceholders, true);
    }

    /**
     * Dumps the route model as XML.
     *
     * @return the XML, or {@code null} if no route definition exists for this route
     */
    @Override
    public String dumpRouteAsXml(boolean resolvePlaceholders, boolean generatedIds) throws Exception {
        String id = route.getId();
        RouteDefinition def = context.getCamelContextExtension().getContextPlugin(Model.class).getRouteDefinition(id);
        if (def != null) {
            // if we are debugging then ids is needed for the debugger
            if (context.isDebugging()) {
                generatedIds = true;
            }
            return PluginHelper.getModelToXMLDumper(context).dumpModelAsXml(context, def, resolvePlaceholders, generatedIds);
        }

        return null;
    }

    @Override
    public String dumpRouteAsYaml() throws Exception {
        return dumpRouteAsYaml(false, false);
    }

    @Override
    public String dumpRouteAsYaml(boolean resolvePlaceholders) throws Exception {
        return dumpRouteAsYaml(resolvePlaceholders, false, true);
    }

    @Override
    public String dumpRouteAsYaml(boolean resolvePlaceholders, boolean uriAsParameters) throws Exception {
        return dumpRouteAsYaml(resolvePlaceholders, uriAsParameters, true);
    }

    /**
     * Dumps the route model as YAML.
     *
     * @return the YAML, or {@code null} if no route definition exists for this route
     */
    @Override
    public String dumpRouteAsYaml(boolean resolvePlaceholders, boolean uriAsParameters, boolean generatedIds)
            throws Exception {
        String id = route.getId();
        RouteDefinition def = context.getCamelContextExtension().getContextPlugin(Model.class).getRouteDefinition(id);
        if (def != null) {
            return PluginHelper.getModelToYAMLDumper(context).dumpModelAsYaml(context, def, resolvePlaceholders,
                    uriAsParameters, generatedIds);
        }

        return null;
    }

    /**
     * Dumps the route statistics as XML, optionally including the statistics of each processor of the route.
     * <p>
     * Ids and source locations are XML-escaped before being written as attribute values.
     *
     * @param fullStats         passed as-is to {@code dumpStatsAsXml}
     * @param includeProcessors whether to include a {@code processorStats} element with the processors of this route
     */
    @Override
    public String dumpRouteStatsAsXml(boolean fullStats, boolean includeProcessors) throws Exception {
        // in this logic we need to calculate the accumulated processing time for the processor in the route
        // and hence why the logic is a bit more complicated to do this, as we need to calculate that from
        // the bottom -> top of the route but this information is valuable for profiling routes
        StringBuilder sb = new StringBuilder();

        // need to calculate this value first, as we need that value for the route stat
        long processorAccumulatedTime = 0L;

        // gather all the processors for this route, which requires JMX
        if (includeProcessors) {
            sb.append(" <processorStats>\n");
            MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer();
            if (server != null) {
                // get all the processor mbeans and sort them accordingly to their index
                List<ManagedProcessorMBean> mps = findRouteMBeans(server, "processors", ManagedProcessorMBean.class);
                mps.sort(new OrderProcessorMBeans());

                // walk the processors in reverse order, and calculate the accumulated total time
                Map<String, Long> accumulatedTimes = new HashMap<>();
                Collections.reverse(mps);
                for (ManagedProcessorMBean processor : mps) {
                    processorAccumulatedTime += processor.getTotalProcessingTime();
                    accumulatedTimes.put(processor.getProcessorId(), processorAccumulatedTime);
                }
                // and reverse back again
                Collections.reverse(mps);

                // and now add the sorted list of processors to the xml output
                for (ManagedProcessorMBean processor : mps) {
                    int line = processor.getSourceLineNumber() != null ? processor.getSourceLineNumber() : -1;
                    sb.append(" <processorStat")
                            .append(String.format(" id=\"%s\" index=\"%s\" state=\"%s\" sourceLineNumber=\"%s\"",
                                    xmlEscape(processor.getProcessorId()), processor.getIndex(), processor.getState(),
                                    line));
                    // do we have an accumulated time then append that
                    Long accTime = accumulatedTimes.get(processor.getProcessorId());
                    if (accTime != null) {
                        sb.append(" accumulatedProcessingTime=\"").append(accTime).append("\"");
                    }
                    // use substring as we only want the attributes
                    sb.append(" ").append(processor.dumpStatsAsXml(fullStats).substring(7)).append("\n");
                }
            }
            sb.append(" </processorStats>\n");
        }

        // route self time is route total - processor accumulated total
        long routeSelfTime = getTotalProcessingTime() - processorAccumulatedTime;
        if (routeSelfTime < 0) {
            // ensure we don't calculate that as negative
            routeSelfTime = 0;
        }

        StringBuilder answer = openRouteStat(fullStats, routeSelfTime);

        if (includeProcessors) {
            answer.append(sb);
        }

        answer.append("</routeStat>");
        return answer.toString();
    }

    /**
     * Dumps the route statistics as XML including a {@code stepStats} element with the steps of this route.
     * <p>
     * Ids and source locations are XML-escaped before being written as attribute values.
     *
     * @param fullStats passed as-is to {@code dumpStatsAsXml}
     */
    @Override
    public String dumpStepStatsAsXml(boolean fullStats) throws Exception {
        StringBuilder sb = new StringBuilder();

        // gather all the steps for this route, which requires JMX
        sb.append(" <stepStats>\n");
        MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer();
        if (server != null) {
            // get all the step mbeans and sort them accordingly to their index
            List<ManagedStepMBean> mps = findRouteMBeans(server, "steps", ManagedStepMBean.class);
            mps.sort(new OrderProcessorMBeans());

            // and now add the sorted list of steps to the xml output
            for (ManagedStepMBean step : mps) {
                int line = step.getSourceLineNumber() != null ? step.getSourceLineNumber() : -1;
                sb.append(" <stepStat")
                        .append(String.format(" id=\"%s\" index=\"%s\" state=\"%s\" sourceLineNumber=\"%s\"",
                                xmlEscape(step.getProcessorId()),
                                step.getIndex(), step.getState(), line));
                // use substring as we only want the attributes
                sb.append(" ").append(step.dumpStatsAsXml(fullStats).substring(7)).append("\n");
            }
        }
        sb.append(" </stepStats>\n");

        StringBuilder answer = openRouteStat(fullStats, null);

        answer.append(sb);

        answer.append("</routeStat>");
        return answer.toString();
    }

    /**
     * Dumps the source locations of the route input and of each processor of this route as XML.
     * <p>
     * Ids and source locations are XML-escaped before being written as attribute values.
     */
    @Override
    public String dumpRouteSourceLocationsAsXml() throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("<routeLocations>");

        MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer();
        if (server != null) {
            // gather all the processors for this route, which requires JMX
            List<ManagedProcessorMBean> processors = findRouteMBeans(server, "processors", ManagedProcessorMBean.class);
            processors.sort(new OrderProcessorMBeans());

            // grab route consumer
            RouteDefinition rd = ((ModelCamelContext) context).getRouteDefinition(route.getRouteId());
            if (rd != null) {
                String id = rd.getRouteId();
                int line = rd.getInput().getLineNumber();
                String location = getSourceLocation() != null ? getSourceLocation() : "";
                sb.append("\n <routeLocation")
                        .append(String.format(
                                " routeId=\"%s\" id=\"%s\" index=\"%s\" sourceLocation=\"%s\" sourceLineNumber=\"%s\"/>",
                                xmlEscape(route.getRouteId()), xmlEscape(id), 0, xmlEscape(location), line));
            }
            // the list only contains processors of this route, so no second route id check is needed
            for (ManagedProcessorMBean processor : processors) {
                int line = processor.getSourceLineNumber() != null ? processor.getSourceLineNumber() : -1;
                String location = processor.getSourceLocation() != null ? processor.getSourceLocation() : "";
                sb.append("\n <routeLocation")
                        .append(String.format(
                                " routeId=\"%s\" id=\"%s\" index=\"%s\" sourceLocation=\"%s\" sourceLineNumber=\"%s\"/>",
                                xmlEscape(route.getRouteId()), xmlEscape(processor.getProcessorId()),
                                processor.getIndex(), xmlEscape(location), line));
            }
        }
        sb.append("\n</routeLocations>");
        return sb.toString();
    }

    /**
     * Resets the statistics of this route, including the load and throughput statistics.
     *
     * @param includeProcessors whether to also invoke {@code reset} on each processor MBean of this route
     */
    @Override
    public void reset(boolean includeProcessors) throws Exception {
        reset();
        load.reset();
        thp.reset();

        // and now reset all processors for this route
        if (includeProcessors) {
            MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer();
            if (server != null) {
                // let the mbean server select the processor mbeans whose RouteId attribute matches this route
                ObjectName query = newTypeQuery("processors");
                QueryExp queryExp = Query.match(new AttributeValueExp("RouteId"), new StringValueExp(getRouteId()));
                Set<ObjectName> names = server.queryNames(query, queryExp);
                for (ObjectName name : names) {
                    server.invoke(name, "reset", null, null);
                }
            }
        }
    }

    /**
     * Updates this route from the given XML, using the first route definition found in the XML.
     * <p>
     * Does nothing if the XML contains no routes. If the route definition has no id then the id of this route is
     * assigned to it.
     *
     * @param  xml                      the route(s) as XML
     * @throws IllegalAccessException   if updating routes is not enabled
     * @throws IllegalArgumentException if the route id from the XML does not match the id of this route
     * @throws Exception                the failure from adding the route definition, which is logged at WARN and
     *                                  rethrown
     */
    @Override
    public void updateRouteFromXml(String xml) throws Exception {
        // check whether this is allowed
        if (!isUpdateRouteEnabled()) {
            throw new IllegalAccessException("Updating route is not enabled");
        }

        // convert to model from xml
        ExtendedCamelContext ecc = context.getCamelContextExtension();
        RoutesDefinition routes;
        // close the stream once the model has been loaded
        try (InputStream is = context.getTypeConverter().convertTo(InputStream.class, xml)) {
            routes = LwModelHelper.loadRoutesDefinition(is);
        }
        if (routes == null || routes.getRoutes().isEmpty()) {
            return;
        }
        RouteDefinition def = routes.getRoutes().get(0);

        // if the xml does not contain the route-id then we fix this by adding the actual route id
        // this may be needed if the route-id was auto-generated, as the intent is to update this route
        // and not add a new route, adding a new route, use the MBean operation on ManagedCamelContext instead.
        if (ObjectHelper.isEmpty(def.getId())) {
            def.setId(getRouteId());
        } else if (!def.getId().equals(getRouteId())) {
            throw new IllegalArgumentException(
                    "Cannot update route from XML as routeIds does not match. routeId: "
                                               + getRouteId() + ", routeId from XML: " + def.getId());
        }

        LOG.debug("Updating route: {} from xml: {}", def.getId(), xml);
        try {
            // add will remove existing route first
            ecc.getContextPlugin(Model.class).addRouteDefinition(def);
        } catch (Exception e) {
            // log the error as warn as the management api may be invoked remotely over JMX which does not propagate such exception
            String msg = "Error updating route: " + def.getId() + " from xml: " + xml + " due: " + e.getMessage();
            LOG.warn(msg, e);
            throw e;
        }
    }

    /**
     * @return {@code true} only if the management agent explicitly enables updating routes
     */
    @Override
    public boolean isUpdateRouteEnabled() {
        // check whether this is allowed
        Boolean enabled = context.getManagementStrategy().getManagementAgent().getUpdateRouteEnabled();
        return Boolean.TRUE.equals(enabled);
    }

    @Override
    public boolean isRemoteEndpoint() {
        if (route.getEndpoint() != null) {
            return route.getEndpoint().isRemote();
        }
        return false;
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o != null && getClass() == o.getClass() && route.equals(((ManagedRoute) o).route);
    }

    @Override
    public int hashCode() {
        return route.hashCode();
    }

    private InflightRepository.InflightExchange getOldestInflightEntry() {
        return getContext().getInflightRepository().oldest(getRouteId());
    }

    /**
     * @return the duration of the oldest inflight exchange of this route, or {@code null} if there is none
     */
    @Override
    public Long getOldestInflightDuration() {
        InflightRepository.InflightExchange oldest = getOldestInflightEntry();
        if (oldest == null) {
            return null;
        } else {
            return oldest.getDuration();
        }
    }

    /**
     * @return the id of the oldest inflight exchange of this route, or {@code null} if there is none
     */
    @Override
    public String getOldestInflightExchangeId() {
        InflightRepository.InflightExchange oldest = getOldestInflightEntry();
        if (oldest == null) {
            return null;
        } else {
            return oldest.getExchange().getExchangeId();
        }
    }

    @Override
    public Boolean getHasRouteController() {
        return route.getRouteController() != null;
    }

    /**
     * Returns the last error of the route adapted to the management API {@link RouteError} type.
     *
     * @return the adapted error, or {@code null} if the route has no last error
     */
    @Override
    public RouteError getLastError() {
        org.apache.camel.spi.RouteError error = route.getLastError();
        if (error == null) {
            return null;
        } else {
            return new RouteError() {
                @Override
                public Phase getPhase() {
                    if (error.getPhase() != null) {
                        // map the spi phase to the management api phase of the same name
                        switch (error.getPhase()) {
                            case START:
                                return Phase.START;
                            case STOP:
                                return Phase.STOP;
                            case SUSPEND:
                                return Phase.SUSPEND;
                            case RESUME:
                                return Phase.RESUME;
                            case SHUTDOWN:
                                return Phase.SHUTDOWN;
                            case REMOVE:
                                return Phase.REMOVE;
                            default:
                                throw new IllegalStateException();
                        }
                    }
                    return null;
                }

                @Override
                public Throwable getException() {
                    return error.getException();
                }

                @Override
                public Date getDate() {
                    return error.getDate();
                }
            };
        }
    }

    /**
     * @return the ids of the processors of this route, in the order returned by the MBean server query; empty if
     *         there is no MBean server
     */
    @Override
    public Collection<String> processorIds() throws Exception {
        List<String> ids = new ArrayList<>();

        MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer();
        if (server != null) {
            // gather all the processors for this route, which requires JMX
            for (ManagedProcessorMBean processor : findRouteMBeans(server, "processors", ManagedProcessorMBean.class)) {
                ids.add(processor.getProcessorId());
            }
        }

        return ids;
    }

    private int getInflightExchanges() {
        return (int) super.getExchangesInflight();
    }

    /**
     * Fails if the camel context is not started.
     *
     * @throws IllegalArgumentException if the camel context is not started
     */
    private void ensureContextStarted() {
        if (!context.getStatus().isStarted()) {
            throw new IllegalArgumentException("CamelContext is not started");
        }
    }

    /**
     * Formats a load/throughput value with two decimals.
     *
     * @return the formatted value, or an empty string if the value is NaN (statistics is disabled)
     */
    private static String formatStatistic(double value) {
        if (Double.isNaN(value)) {
            // empty string if load statistics is disabled
            return "";
        }
        return String.format("%.2f", value);
    }

    /**
     * Escapes the XML special characters {@code & < > "} so the value can be written inside a double quoted XML
     * attribute. A {@code null} value is rendered as the text {@code null}, as {@code String.format("%s", null)} does.
     */
    private static String xmlEscape(Object value) {
        // the ampersand must be replaced first, otherwise the entities created below would be escaped again
        return String.valueOf(value)
                .replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;")
                .replace("\"", "&quot;");
    }

    /**
     * Creates the JMX object name pattern that matches all MBeans of the given type in this camel context.
     *
     * @param type the value of the {@code type} key, such as {@code processors} or {@code steps}
     */
    private ObjectName newTypeQuery(String type) throws Exception {
        // the context key is prefixed with a wildcard when the host name is included in the object names
        String prefix = getContext().getManagementStrategy().getManagementAgent().getIncludeHostName() ? "*/" : "";
        return ObjectName.getInstance(
                jmxDomain + ":context=" + prefix + getContext().getManagementName() + ",type=" + type + ",*");
    }

    /**
     * Finds the MBeans of the given type that belong to this route, as proxies of the given MBean interface.
     *
     * @param  server    the MBean server to query
     * @param  type      the value of the {@code type} key, such as {@code processors} or {@code steps}
     * @param  mbeanType the MBean interface to create the proxies for
     * @return           the MBeans in the order returned by the MBean server query (not sorted), as a mutable list
     */
    private <T extends ManagedProcessorMBean> List<T> findRouteMBeans(MBeanServer server, String type, Class<T> mbeanType)
            throws Exception {
        List<T> answer = new ArrayList<>();
        Set<ObjectName> names = server.queryNames(newTypeQuery(type), null);
        for (ObjectName on : names) {
            T mbean = context.getManagementStrategy().getManagementAgent().newProxyClient(on, mbeanType);
            // the mbean must belong to this route
            if (getRouteId().equals(mbean.getRouteId())) {
                answer.add(mbean);
            }
        }
        return answer;
    }

    /**
     * Builds the opening {@code routeStat} tag with the route attributes, ending with {@code ">\n"}.
     *
     * @param fullStats     passed as-is to {@code dumpStatsAsXml}
     * @param routeSelfTime the value of the {@code selfProcessingTime} attribute, or {@code null} to omit the attribute
     */
    private StringBuilder openRouteStat(boolean fullStats, Long routeSelfTime) throws Exception {
        StringBuilder answer = new StringBuilder();
        answer.append("<routeStat").append(String.format(" id=\"%s\"", xmlEscape(route.getId())))
                .append(String.format(" state=\"%s\"", getState()));
        if (sourceLocation != null) {
            answer.append(String.format(" sourceLocation=\"%s\"", xmlEscape(getSourceLocation())));
        }
        String stat = dumpStatsAsXml(fullStats);
        answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
        if (routeSelfTime != null) {
            answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\"");
        }
        InflightRepository.InflightExchange oldest = getOldestInflightEntry();
        if (oldest == null) {
            answer.append(" oldestInflightExchangeId=\"\"");
            answer.append(" oldestInflightDuration=\"\"");
        } else {
            answer.append(" oldestInflightExchangeId=\"").append(xmlEscape(oldest.getExchange().getExchangeId()))
                    .append("\"");
            answer.append(" oldestInflightDuration=\"").append(oldest.getDuration()).append("\"");
        }
        // use a sub sequence as we only want the attributes
        // NOTE(review): presumably drops a leading "<stats " and a trailing "/>" - confirm against dumpStatsAsXml
        answer.append(" ").append(stat, 7, stat.length() - 2).append(">\n");
        return answer;
    }

    /**
     * Used for sorting the processor mbeans accordingly to their index.
     */
    private static final class OrderProcessorMBeans implements Comparator<ManagedProcessorMBean>, Serializable {

        @Override
        public int compare(ManagedProcessorMBean o1, ManagedProcessorMBean o2) {
            return o1.getIndex().compareTo(o2.getIndex());
        }
    }
}