[仅 Java 编程语言]

编写具有副本预装入控制器的装入器

[仅 Java 编程语言]具有副本预装入控制器的装入器是除装入器接口外实现 ReplicaPreloadController 接口的装入器。

ReplicaPreloadController 接口旨在为成为主分片的副本提供一种方式,以了解先前主分片是否已完成预装入过程。 如果预装入已部分完成,那么将提供信息以获取先前主项停止的位置。 通过 ReplicaPreloadController 接口实现,成为主数据库的副本将继续执行预装入过程,在此过程中,先前的主数据库停止并继续完成整体预装入。

在分布式 WebSphere® eXtreme Scale 环境中,映射可以具有副本,并且可以在初始化期间预装入大量数据。 预装入是装入器活动,在初始化期间仅在主映射中发生。 如果预装入大量数据,那么可能要很长时间才能完成预装入。 如果主映射已完成大部分预装入数据,但是在初始化期间由于未知原因而停止,那么副本会成为主项。 在此情况下,通常由于新主项执行无条件预装入,因此先前主项完成的预装入数据会丢失。 对于无条件预装入,新主项一开始会启动预装入过程,且忽略先前预装入的数据。 如果您希望新主项在预装入过程中获取先前主项停止的位置,请提供实现 ReplicaPreloadController 接口的装入程序。 有关更多信息,请参阅 API 文档

有关装入程序的信息,请参阅 装入程序。 如果您有兴趣编写常规装入器插件,请参阅 编写装入器

ReplicaPreloadController 接口具有以下定义:
public interface ReplicaPreloadController
{
    public static final class Status
    {
        static public final Status PRELOADED_ALREADY =
							new Status(K_PRELOADED_ALREADY);
        static public final Status FULL_PRELOAD_NEEDED =
							new Status(K_FULL_PRELOAD_NEEDED);
        static public final Status PARTIAL_PRELOAD_NEEDED =
							new Status(K_PARTIAL_PRELOAD_NEEDED);
    }

    Status checkPreloadStatus(Session session,
				BackingMap bmap);
}

以下部分讨论 LoaderReplicaPreloadController 接口的一些方法。

checkPreloadStatus 方法

当装入器实现 ReplicaPreloadController 接口时,会在映射初始化期间先调用 checkPreloadStatus 方法,然后调用 preloadMap 方法。 此方法的返回状态确定是否调用 preloadMap 方法。 如果此方法返回 Status#PRELOADED_ALREADY,那么不会调用 preload 方法。 否则,preload 方法运行。 由于此行为,此方法应充当装入器 initialization 方法。 您必须在此方法中初始化装入器属性。 此方法必须返回正确的状态,或预装入可能不会按预期运行。

public Status checkPreloadStatus(Session session,
							BackingMap backingMap) {
        // When a loader implements ReplicaPreloadController interface,
			 //	this method will be called before preloadMap method during
			 // map initialization. Whether the preloadMap method will be
			 //	called depends on teh returned status of this method. So, this
			 // method also serve as Loader's initialization method. This method
			 // has to return the right staus, otherwise the preload may not
			 // work as expected.

        // Note: must initialize this loader instance here.
        ivOptimisticCallback = backingMap.getOptimisticCallback();
        ivBackingMapName = backingMap.getName();
        ivPartitionId = backingMap.getPartitionId();
        ivPartitionManager = backingMap.getPartitionManager();
        ivTransformer = backingMap.getObjectTransformer();
        preloadStatusKey = ivBackingMapName + "_" + ivPartitionId;

        try {
            // get the preloadStatusMap to retrieve preload status that
					 // could be set by other JVMs.
            ObjectMap preloadStatusMap = session.getMap(ivPreloadStatusMapName);

            // retrieve last recorded preload data chunk index.
            Integer lastPreloadedDataChunk = (Integer) preloadStatusMap
									.get(preloadStatusKey);

            if (lastPreloadedDataChunk == null) {
                preloadStatus = Status.FULL_PRELOAD_NEEDED;
            } else {
                preloadedLastDataChunkIndex = lastPreloadedDataChunk.intValue();
                if (preloadedLastDataChunkIndex == preloadCompleteMark) {
                    preloadStatus = Status.PRELOADED_ALREADY;
                } else {
                    preloadStatus = Status.PARTIAL_PRELOAD_NEEDED;
                }
            }

            System.out.println("TupleHeapCacheWithReplicaPreloadControllerLoader.
								checkPreloadStatus() 
					-> map = " + ivBackingMapName + ", preloadStatusKey = " + preloadStatusKey
                    + ", retrieved lastPreloadedDataChunk =" + lastPreloadedDataChunk + ", 
									determined preloadStatus = "
                    + getStatusString(preloadStatus));

        } catch (Throwable t) {
            t.printStackTrace();
        }

        return preloadStatus;
    }

preloadMap 方法

运行 preloadMap 方法取决于从 checkPreloadStatus 方法返回的结果。 如果调用 preloadMap 方法,那么通常它必须从指定预装入状态映射检索预装入状态信息,并确定如何继续。 理想情况下,preloadMap 方法应知道预装入是否已部分完成以及从何处开始。 将数据预装入期间,preloadMap 方法应先检查预装入状态(存储在预装入状态映射中),然后每次将数据推送至高速缓存时更新预装入状态。 预装入状态(存储在预装入状态映射中)由 checkPreloadStatus 方法在需要检查预装入状态时检索。

public void preloadMap(Session session, BackingMap backingMap)
					throws LoaderException {
        EntityMetadata emd = backingMap.getEntityMetadata();
        if (emd != null && tupleHeapPreloadData != null) {
            // The getPreLoadData method is similar to fetching data
					 // from database. These data will be push into cache as
					 // preload process.
            ivPreloadData = tupleHeapPreloadData.getPreLoadData(emd);

            ivOptimisticCallback = backingMap.getOptimisticCallback();
            ivBackingMapName = backingMap.getName();
            ivPartitionId = backingMap.getPartitionId();
            ivPartitionManager = backingMap.getPartitionManager();
            ivTransformer = backingMap.getObjectTransformer();
            Map preloadMap;

            if (ivPreloadData != null) {
                try {
                    ObjectMap map = session.getMap(ivBackingMapName);

                    // get the preloadStatusMap to record preload status.
                    ObjectMap preloadStatusMap = session.
											getMap(ivPreloadStatusMapName);

                    // Note: when this preloadMap method is invoked, the
									// checkPreloadStatus has been called, Both preloadStatus
									// and preloadedLastDataChunkIndex have been set. And the
									// preloadStatus must be either PARTIAL_PRELOAD_NEEDED 
									// or FULL_PRELOAD_NEEDED that will require a preload again.

                    // If large amount of data will be preloaded, the data usually
									// is divided into few chunks and the preload process will
									// process each chunk within its own tran. This sample only
									// preload few entries and assuming each entry represent a chunk.
                    // so that the preload process an entry in a tran to simulate
									// chunk preloading.

                    Set entrySet = ivPreloadData.entrySet();
                    preloadMap = new HashMap();
                    ivMap = preloadMap;

                    // The dataChunkIndex represent the data chunk that is in
									// processing
                    int dataChunkIndex = -1;
                    boolean shouldRecordPreloadStatus = false;
                    int numberOfDataChunk = entrySet.size();
                    System.out.println("    numberOfDataChunk to be preloaded = " 
											+ numberOfDataChunk);

                    Iterator it = entrySet.iterator();
                    int whileCounter = 0;
                    while (it.hasNext()) {
                        whileCounter++;
                        System.out.println("preloadStatusKey = " + preloadStatusKey
														+ " , 
												whileCounter = " + whileCounter);

                        dataChunkIndex++;

                        // if the current dataChunkIndex <= preloadedLastDataChunkIndex
                        // no need to process, becasue it has been preloaded by
											// other JVM before. only need to process dataChunkIndex
											// > preloadedLastDataChunkIndex
                        if (dataChunkIndex <= preloadedLastDataChunkIndex) {
                            System.out.println("ignore current dataChunkIndex =
															" + dataChunkIndex + " that has been previously
															preloaded.");
                            continue;
                        }

                        // Note: This sample simulate data chunk as an entry.
                        // each key represent a data chunk for simplicity.
                        // If the primary server or shard stopped for unknown
										  // reason, the preload status that indicates the progress
										  // of preload should be available in preloadStatusMap. A
											// replica that become a primary can get the preload status
											// and determine how to preload again.
                        // Note: recording preload status should be in the same
										  // tran as putting data into cache; so that if tran
											// rollback or error, the recorded preload status is the
											// actual status.

                        Map.Entry entry = (Entry) it.next();
                        Object key = entry.getKey();
                        Object value = entry.getValue();
                        boolean tranActive = false;

                        System.out.println("processing data chunk. map = " +
													this.ivBackingMapName + ", current dataChunkIndex = " +
													dataChunkIndex + ", key = " + key);

                        try {
                            shouldRecordPreloadStatus = false; // re-set to false
                            session.beginNoWriteThrough();
                            tranActive = true;

                            if (ivPartitionManager.getNumOfPartitions() == 1) {
                                // if just only 1 partition, no need to deal with
														 // partition.
                                // just push data into cache
                                map.put(key, value);
                                preloadMap.put(key, value);
                                shouldRecordPreloadStatus = true;
                            } else if (ivPartitionManager.getPartition(key) ==
														 ivPartitionId) {
                                // if map is partitioned, need to consider the
														 // partition key only preload data that belongs
														 // to this partition.
                                map.put(key, value);
                                preloadMap.put(key, value);
                                shouldRecordPreloadStatus = true;
                            } else {
                                // ignore this entry, because it does not belong to
														 // this partition.
                            }

                            if (shouldRecordPreloadStatus) {
                                System.out.println("record preload status. map = " +
																		this.ivBackingMapName + ", preloadStatusKey = " +
																		preloadStatusKey + ", current dataChunkIndex ="
																		+ dataChunkIndex);
                                if (dataChunkIndex == numberOfDataChunk) {
                                    System.out.println("record preload status. map = " +
																			  this.ivBackingMapName + ", preloadStatusKey = " +
																				preloadStatusKey + ", mark complete =" +
																				preloadCompleteMark);
                                    // means we are at the lastest data chunk, if commit
																// successfully, record preload complete.
																// at this point, the preload is considered to be done
                                    // use -99 as special mark for preload complete status.

                                    preloadStatusMap.get(preloadStatusKey);

                                    // a put follow a get will become update if the get
																 // return an object, otherwise, it will be insert.
                                    preloadStatusMap.put(preloadStatusKey, new
																			Integer(preloadCompleteMark));

                                } else {
                                    // record preloaded current dataChunkIndex into
																 // preloadStatusMap a put follow a get will become
																 // update if teh get return an object, otherwise, it
																 // will be insert.
                                    preloadStatusMap.get(preloadStatusKey);
                                    preloadStatusMap.put(preloadStatusKey, new
																		Integer(dataChunkIndex));
                                }
                            }

                            session.commit();
                            tranActive = false;

                            // to simulate preloading large amount of data
                            // put this thread into sleep for 30 secs.
                            // The real app should NOT put this thread to sleep
                            Thread.sleep(10000);

                        } catch (Throwable e) {
                            e.printStackTrace();
                            throw new LoaderException("preload failed with
												  exception: " + e, e);
                        } finally {
                            if (tranActive && session != null) {
                                try {
                                    session.rollback();
                                } catch (Throwable e1) {
                                    // preload ignoring exception from rollback
                                }
                            }
                        }
                    }

                    // at this point, the preload is considered to be done for sure
                    // use -99 as special mark for preload complete status.
                    // this is a insurance to make sure the complete mark is set.
                    // besides, when partitioning, each partition does not know when
									// is its last data chunk. so the following block serves as the
									// overall preload status complete reporting.
                    System.out.println("Overall preload status complete -> record
												  preload status. map = " + this.ivBackingMapName + ",
												  preloadStatusKey = " + preloadStatusKey + ", mark complete =" +
													preloadCompleteMark);
                    session.begin();
                    preloadStatusMap.get(preloadStatusKey);
                    // a put follow a get will become update if teh get return an object,
                    // otherwise, it will be insert.
                    preloadStatusMap.put(preloadStatusKey, new Integer(preloadCompleteMark));
                    session.commit();

                    ivMap = preloadMap;
                } catch (Throwable e) {
                    e.printStackTrace();
                    throw new LoaderException("preload failed with exception: " + e, e);
                }
            }
        }
    }

预装入状态映射

您必须使用预装入状态映射来支持 ReplicaPreloadController 接口实现。 preloadMap 方法应始终先检查存储在预装入状态映射中的预装入状态,无论何时它将数据推送到高速缓存,均会更新预装入状态映射中的预装入状态。 checkPreloadStatus 方法从预装入状态映射检索预装入状态,确定预装入状态,并将状态返回给其调用程序。 预装入状态映射应与其他具有副本预装入控制器装入器的映射位于相同 mapSet 中。