/** * Load the aggregate with the given unique identifier. No version checks are done when loading an aggregate, * meaning that concurrent access will not be checked for. * * @param aggregateIdentifier The identifier of the aggregate to load * @return The aggregate root with the given identifier. * @throws AggregateNotFoundException if aggregate with given id cannot be found */ Aggregate<T> load(String aggregateIdentifier);
/** * Load the aggregate with the given unique identifier. * * @param aggregateIdentifier The identifier of the aggregate to load * @param expectedVersion The expected version of the loaded aggregate * @return The aggregate root with the given identifier. * @throws AggregateNotFoundException if aggregate with given id cannot be found */ Aggregate<T> load(String aggregateIdentifier, Long expectedVersion);
/** * Creates a new managed instance for the aggregate, using the given {@code factoryMethod} * to instantiate the aggregate's root. * * @param factoryMethod The method to create the aggregate's root instance * @return an Aggregate instance describing the aggregate's state * @throws Exception when the factoryMethod throws an exception */ Aggregate<T> newInstance(Callable<T> factoryMethod)throws Exception; }
try { // multiply 100 on the price to avoid float number CreateProductCommand command = new CreateProductCommand(id,name,price*100,stock); commandGateway.sendAndWait(command); response.setStatus(HttpServletResponse.SC_CREATED);// Set up the 201 CREATED response return; } catch (CommandExecutionException cex) { LOGGER.warn("Add Command FAILED with Message: {}", cex.getMessage()); response.setStatus(HttpServletResponse.SC_BAD_REQUEST); if (null != cex.getCause()) { LOGGER.warn("Caused by: {} {}", cex.getCause().getClass().getName(), cex.getCause().getMessage()); if (cex.getCause() instanceof ConcurrencyException) { LOGGER.warn("A duplicate product with the same ID [{}] already exists.", id); response.setStatus(HttpServletResponse.SC_CONFLICT); } } } } }
@Bean public Serializer axonJsonSerializer(){ returnnew JacksonSerializer(); }
@Bean public EventStorageEngine eventStorageEngine(){ returnnew MongoEventStorageEngine( axonJsonSerializer(),null, axonMongoTemplate(), new DocumentPerEventStorageStrategy()); }
@Bean(name = "axonMongoTemplate") public MongoTemplate axonMongoTemplate(){ MongoTemplate template = new DefaultMongoTemplate(mongoClient(), mongoDbName, eventsCollectionName, snapshotCollectionName); return template; }
@Bean public MongoClient mongoClient(){ MongoFactory mongoFactory = new MongoFactory(); mongoFactory.setMongoAddresses(Arrays.asList(new ServerAddress(mongoUrl))); return mongoFactory.createMongo(); } }
用Jacson做序列化器,MongoClient提供了具体连接实现,MongoTemplate指定了db名称、存放event的collection名称、存放snapshot的collection名称。(snapshot的概念以后再解释) 中间一个参数是做不同版本Event间兼容的,我们先留null。 EventStorageEngine指定MongoEventStorageEngine,spring-boot-autoconfigure中的AxonAutoConfiguration就会帮你把它注入到Axon的配置器中。 这里指的注意的是,使用Jackson做序列化器时,对应的entity的所有需要持久化的field必须都有public getter方法,因为Jackson在反射时默认只读public修饰符的field,否则就会报 com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class com.edi.learn.axon.common.domain.OrderId and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: com.edi.learn.axon.common.events.OrderCreatedEvent[“orderId”]) 错误。如果确实不想写,那么在Entity的class声明前加上@JsonAutoDetect(fieldVisibility=JsonAutoDetect.Visibility.ANY) 到此,Command端的实现已基本完成(Event我没写,因为与前文类似),那么我们来看看Query端。
公司正打算采用Axon框架重构之前的项目,你这篇教程写得很实用,感谢分享。
如果使用ES,那么服务启动的时候回加载所有的Aggregation对象吗?感觉不可能是这样啊
这个可以自定义,你可以选择在启动时就回溯,加载到内存,也可以在调用时再去回溯,也是很快的。
有一个疑问点,就是你在Query端定义了一个EventHandler来接受创建事件,以便在query端保存一份数据供查询使用,但是聚合在每次触发命令的时候都会把历史事件进行回朔重放一边,那query端不是每次在聚合有命令过来时也会收到一大堆命令?但实际上query端只希望获取当前聚合最后的状态就够了,不需要对历史的事件都重演一遍。如果只是创建的话还可以在创建时判断是否是否已经创建过,但如果是状态变化命令的话,就基本上等于query端也跟着跑了一遍。
回溯已保存的历史事件获得Aggregate最新状态时,并不会重新触发EventHandler。
试了一下,确实不会触发外部的EventHandler。
聚合内部自己的EventHandler方法会触发。
是不是说回朔事件的时候只会执行自己聚合内部的EventHandler,外部的不会执行?
还是说axon有其他方式来区分啊?
徐老师您好, 请问
中为何没用@EventSourcingHandler? 谢谢!