Explanation of the cluster environment for big data tools

When using big data tools in automated processing models, you can choose to run the algorithm functions in a distributed cluster environment, which greatly improves computing efficiency for large volumes of data.

Deployment of cluster environment

The big data tools provided by SuperMap support managing spatial data in distributed cluster storage and performing spatial analysis on distributed clusters. The distributed storage framework supports HDFS, and the distributed computing framework supports Spark. The supported versions are:

  1. Apache Hadoop 3.3.2, for cluster deployment please refer to Apache Hadoop.

  2. Apache Spark 3.3.4, for cluster deployment please refer to Apache Spark.

Use of cluster environment

When using big data tools on the iServer automated service modeling page or in the iDesktopX automated modeling interface, you will find a Spark environment settings tab in the parameter settings of each tool, which is used to set the parameters of the distributed cluster environment.

SuperMap is compatible with the following two cluster managers:

  • Standalone: The cluster manager built into Spark, suitable for running a Spark cluster on its own or with relatively few other components.
  • YARN: The cluster manager in Hadoop, suitable for running Spark within the Hadoop ecosystem and integrating seamlessly with other Hadoop components. In scenarios where multiple tasks run concurrently, it is recommended to use a YARN cluster and enable dynamic resource scheduling to improve resource utilization and the efficiency of concurrent task execution (see the example below).
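
For reference, dynamic resource scheduling is usually enabled through Spark's dynamic allocation settings, which can be passed in the settings parameter described later in this document. The following is a minimal sketch using standard Spark configuration keys; the executor limits are illustrative, and the external shuffle service (or shuffle tracking in newer Spark versions) must be available on the YARN NodeManagers:

spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true,spark.dynamicAllocation.minExecutors=1,spark.dynamicAllocation.maxExecutors=10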

1. Deployment of Spark Standalone cluster environment

The relevant parameters for connecting to the cluster are:

  • master: Required parameter, the address of the cluster master. For single-machine use, fill in local[K] (K represents the number of threads; * represents the number of cores on the local machine). For cluster use, fill in the address of the cluster master, such as spark://127.0.0.1:7077.

  • appName: Required parameter, a custom application name, such as processing.

  • settings: When using the cluster, Spark property parameters need to be set. The format is key1=value1,key2=value2. Commonly used parameters include (a combined example follows this list):

    • spark.executor.cores=4: The number of CPU cores allocated to each executor. The more cores allocated, the higher the executor's concurrency. The default is 4.
    • spark.executor.memory=8g: The amount of memory used by each executor. The default is 8G.
    • spark.driver.cores=4: The number of cores allocated to the driver process. Please note that this parameter does not take effect in Client mode.
    • spark.driver.memory=4g: The amount of memory used by the driver. The default is 4G. If the driver-side log reports java.lang.OutOfMemoryError, try increasing this value. Please note that this parameter does not take effect in Client mode.
    • spark.driver.port=4040: The port used by the driver process. Please note that this parameter does not take effect in Client mode.
    • spark.driver.maxResultSize=1g: The maximum total size of the serialized results of all partitions. It is recommended to set it to 0, which means no limit.
    • spark.kryoserializer.buffer.max=2000m: The maximum size of the Kryo serialization buffer; the valid range is 1~2047MB. A larger buffer can improve serialization and deserialization performance but also consumes more memory. The default is 2000MB.
    • spark.default.parallelism=2000: Sets the default degree of parallelism. The default is 2000.
    • spark.memory.fraction=0.6: The fraction of (JVM heap minus 300MB) used for execution and storage.
    • spark.network.timeout=3600s: The default timeout for all network interactions.
    • spark.local.dir=F://Temp: Specifies the path where Spark stores temporary files on the local disk. By default, temporary files are stored in the system's temporary folder; this parameter can be used to specify a different directory. To ensure normal operation of the cluster, make sure the specified path exists and is usable on every cluster node (a shared directory is recommended).

    For more Spark cluster parameter settings, please refer to Spark Configuration.
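
    For example, a combined settings value for a Standalone cluster might look like the following minimal sketch, which reuses the example values above; adjust them to your hardware:

    spark.executor.cores=4,spark.executor.memory=8g,spark.driver.maxResultSize=0,spark.kryoserializer.buffer.max=2000m,spark.default.parallelism=2000,spark.network.timeout=3600s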

    • Index partition parameters

    When performing distributed calculations on millions of spatial records, properly controlling the number of data partitions can improve the computational performance of distributed overlay analysis. In Spark, each partition of an RDD is processed by one Task, which means the number of partitions determines the number of Tasks started in the cluster. The more partitions there are, the less data each partition processes and the less total memory the cluster consumes while the program runs. For the spatial feature types FeatureRDD and DSFFeatureRDD, it is recommended to keep the number of records in each partition between 20000 and 50000; the specific value can be adjusted to the actual situation. The calculation formula is: number of records per partition = total number of records in the dataset / number of partitions.
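
    For example, applying this formula to a dataset of 10,000,000 records divided into 400 partitions gives 10,000,000 / 400 = 25,000 records per partition, which falls within the recommended range of 20000~50000.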

    There are two methods to control the data partition:

    (1) Grid index partition: When the data is distributed relatively evenly, the grid index partition method is recommended. By re-partitioning the data that falls within the same index cell, it ensures that objects in the same spatial range end up on the same node. The advantage of the grid index partition method is that it can handle large-scale data effectively.

    The parameters for grid index partition are as follows:

    spark.bdt.1st.index=grid: Specifies that the index type used is the grid index.

    spark.bdt.1st.index.grid.cols=20: The number of columns in the grid index.

    spark.bdt.1st.index.grid.rows=20: The number of rows in the grid index.

    The actual number of partitions in the grid index is equal to the product of the number of rows and columns, and the number of rows and columns of the grid index can be set reasonably based on the total number of records in the dataset.
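
    For example, to partition a dataset of about 4,000,000 records so that each partition holds roughly 25,000 records, about 160 partitions are needed; a 13 × 13 grid (169 partitions) satisfies this, giving roughly 23,700 records per partition. The corresponding settings value (the values are illustrative and should be adjusted to your data) would be:

    spark.bdt.1st.index=grid,spark.bdt.1st.index.grid.cols=13,spark.bdt.1st.index.grid.rows=13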

Explanation of Spark Standalone Application Deployment Modes

The automated modeling module supports four Spark application deployment modes (deployMode), which can be selected from a dropdown list: local (single-process), local (multi-process), client, and cluster.

Local mode specifically includes the following two types:

local (single-process): Simulates Spark distributed computing with multiple threads on a single machine and runs directly on the local machine for model debugging; it is often used to test models with small-scale data.
Enter local[*] (where * represents the number of cores on the local machine) as the cluster master address.

local (multi-process): Simulates a distributed environment by starting multiple processes on the local machine. It distributes data to multiple processes for computation, making better use of computing resources and improving computational efficiency. Since data is read and written by multiple processes, it is recommended to use a database-type data source to store the computation data.
The default cluster master address is spark://127.0.0.1:7077, where 127.0.0.1 is the local loopback IP of the local machine. localhost can also be used as the IP; it resolves to 127.0.0.1 by default.

It should be noted that when using the local (multi-process) mode on a single machine for distributed overlay analysis of data at the tens-of-millions scale, the index partition parameters described above must be added and the number of data partitions controlled reasonably to ensure that the massive data is processed effectively.

Comparison of cluster mode and client mode:

Cluster mode: The driver runs on a node in the cluster and does not occupy the computing resources of the iServer node, but it cannot directly access files on the iServer node. Typically a database-type data source or a shared directory is used so that data can be accessed consistently from multiple nodes, making the most of the cluster.

Client mode: The driver runs on the iServer server and can access local files on the iServer server, including UDB, UDBX, etc. Since only one application runs on Spark, multiple tasks need to queue for execution, which may cause delays.

Environment parameter settings in cluster mode:

Parameter name: gp.hdfs.url
Description: Provides an HDFS directory as a temporary location for storing jar files and models.
Example: gp.hdfs.url=hdfs://127.0.0.1:9000/up

Parameter name: gp.appResource (required)
Description: In the absence of an HDFS environment, filling in this parameter allows the bdt-*-all.jar file to be read directly from a shared directory, so the required jar files do not have to be distributed at startup; this resolves the issue of excessive disk space consumption caused by repeatedly distributing the bdt-all package. The specific steps are as follows:
1. Place the sps-startup-11.X.X-SNAPSHOT.jar file from the iServer product directory/support/geoprocessing/lib and the bdt-all-runtime-11.X.X.jar file from the iServer product directory/support/iObjectsForSpark into the same folder on each node of the cluster (a shared directory is recommended).
2. Fill in the shared directory paths of these files in this environment parameter, making sure that each path starts with file: and the two paths are separated by an English comma.
Example: file:/ssdData/share/sps-startup.jar,file:/ssdData/share/bdt-all-runtime.jar

Parameter name: spark.web.ui.port
Description: The port for Spark's web UI page; the default port is 8080.
Example: spark.web.ui.port=8080
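
Taken together, a typical set of cluster mode environment parameters might look like the following sketch, which simply combines the example values above (the HDFS address and shared directory paths are illustrative):

gp.hdfs.url=hdfs://127.0.0.1:9000/up
gp.appResource=file:/ssdData/share/sps-startup.jar,file:/ssdData/share/bdt-all-runtime.jar
spark.web.ui.port=8080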

Environment parameter settings in client mode:

Parameter name: spark.driver.host
Description: In a dual-network environment, or when the local IP and the cluster IP are inconsistent, this parameter needs to be set to avoid the driver not being found; it is generally filled with the address of the iServer server.
Example: spark.driver.host=127.0.0.1

Parameter name: gp.appResource
Description: Filling in this parameter allows the bdt-*-all.jar file to be read directly from a shared directory, so the required jar files do not have to be distributed at startup; this resolves the issue of excessive disk space consumption caused by repeatedly distributing the bdt-all package. The specific steps are as follows:
1. Place the sps-startup-11.X.X-SNAPSHOT.jar file from the iServer product directory/support/geoprocessing/lib and the bdt-all-runtime-11.X.X.jar file from the iServer product directory/support/iObjectsForSpark into the same folder on each node of the cluster (a shared directory is recommended).
2. Fill in the shared directory paths of these files in this environment parameter, making sure that each path starts with file: and the two paths are separated by an English comma.
Example: file:/ssdData/share/sps-startup.jar,file:/ssdData/share/bdt-all-runtime.jar
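
A typical set of client mode environment parameters might therefore look like the following sketch, again combining the example values above (the address and paths are illustrative):

spark.driver.host=127.0.0.1
gp.appResource=file:/ssdData/share/sps-startup.jar,file:/ssdData/share/bdt-all-runtime.jar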

When modifying environment parameters after model execution, please note:

1. Any modifications to parameters under Client mode require restarting the Spark task for the changes to take effect.

Restarting steps:

(1) View and close tasks through Spark's WebUI (http://{ip}:8080).

(2) Return to the automated modeling page to execute the model and the modifications will take effect.

2. Any modifications to parameters under local mode require restarting the GPA service for the changes to take effect.

Restarting steps:

(1) Return to the iServer homepage and enter the "Service Management" page.

(2) Select "Automated Processing Service", stop the service, and then start it again.

Spark Configuration Items

  1. To clean up dependency files and logs and free up server disk space, modify the Spark directory/conf/spark-env.sh configuration file and add the following parameters:
  • spark.worker.cleanup.enabled: Whether to enable periodic cleanup.
  • spark.worker.cleanup.interval: The cleanup interval, i.e., how often to check and clean up, in seconds.
  • spark.worker.cleanup.appDataTtl: The retention period, i.e., how long files are kept after a job ends before they may be deleted, in seconds.

For example, to check the worker directory every 60 seconds and delete the directories of tasks that finished execution more than 1 hour ago, add:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=60 -Dspark.worker.cleanup.appDataTtl=3600"
  2. When the language environment of the client and the cluster is inconsistent, modify the Spark directory/conf/spark-env.sh configuration file and add the parameter setting export LANG=zh_CN.UTF-8 to keep the cluster's language environment consistent with the client.

2. Deployment of Spark on YARN cluster environment

The relevant parameters for connecting to the cluster are:

  • master: Required parameter; set it to yarn. Unlike the Spark Standalone cluster, the master address of the YARN cluster is not set in the master parameter but in the Hadoop configuration files.

  • appName: Required parameter, a custom application name, such as processing.

  • settings: When using the cluster, Spark property parameters need to be set. The format is key1=value1,key2=value2. Common parameters include (a combined example follows this list):

    • spark.driver.cores=4: The number of cores allocated to the driver process.
    • spark.driver.memory=4g: The amount of memory used by the driver. The default is 4G. If the driver-side log reports java.lang.OutOfMemoryError, try increasing this value.
    • spark.executor.instances=2: The number of executors.
    • spark.executor.cores=4: The number of CPU cores allocated to each executor. The more cores allocated, the higher the executor's concurrency. The default is 4.
    • spark.executor.memory=8g: The amount of memory used by each executor. The default is 8G.
    • spark.yarn.am.memory=1g: The memory size of the Spark Application Master, which manages the resources required to run Spark applications.
    • yarn.nodemanager.resource.memory-mb=10g: The total amount of memory that YARN can use on a NodeManager node (make sure this value is greater than or equal to spark.executor.memory); this avoids executors being terminated due to memory limits.
    • yarn.scheduler.maximum-allocation-mb=10g: The maximum memory that the resource scheduler (Scheduler) can allocate to a single application container (it is recommended to keep it consistent with yarn.nodemanager.resource.memory-mb).
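
    For example, a combined settings value for a YARN cluster might look like the following minimal sketch, reusing the example values above (adjust them to your cluster's resources):

    spark.driver.cores=4,spark.driver.memory=4g,spark.executor.instances=2,spark.executor.cores=4,spark.executor.memory=8g,spark.yarn.am.memory=1g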

Parameters related to security authentication:

  • spark.kerberos.principal: When Kerberos authentication is enabled, fill in the Kerberos principal used to authenticate against the Kerberos authentication center of the cluster.
  • spark.kerberos.keytab: The keytab file corresponding to the Kerberos principal, which contains the keys used for authentication.
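
For example (the principal name and keytab path below are hypothetical placeholders):

spark.kerberos.principal=sparkuser@EXAMPLE.COM,spark.kerberos.keytab=/etc/security/keytabs/sparkuser.keytab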

Explanation of Spark on YARN Application Deployment Modes

Currently, the YARN cluster only supports the cluster deployment mode. The environment parameter settings under the cluster mode are as follows:

Parameter name: HADOOP_CONF_DIR or YARN_CONF_DIR
Description: The directory of the Hadoop cluster client configuration files. The location of the YARN cluster and other information are read from these configuration files.

Parameter name: spark.yarn.jars
Description: The location of the Spark dependencies. If not specified, the spark/jars directory under the iServer product directory is used by default.
Example: hdfs://bigdata0:8020/user/spark/jars

Parameter name: spark.executor.extraLibraryPath (required)
Description: Specifies the path of the component libraries that each executor loads at startup. The objectsjava/bin folder under the iServer product directory must be placed at the same path on every node of the cluster. Note that the components differ between the Linux and Windows versions, so copy the components corresponding to the operating system of each cluster node machine.
Example: /opt/SuperMap/iobjects/trunk/Bin
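
Put together, the Spark on YARN cluster mode environment parameters might look like the following sketch. The HADOOP_CONF_DIR path is only a typical Hadoop client configuration location, and the other values reuse the examples above; all of them are illustrative:

HADOOP_CONF_DIR=/etc/hadoop/conf
spark.yarn.jars=hdfs://bigdata0:8020/user/spark/jars
spark.executor.extraLibraryPath=/opt/SuperMap/iobjects/trunk/Bin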

Spark Configuration Items

  1. When the language environment of the client and the cluster is inconsistent, modify the yarn-env.sh configuration file under the Hadoop directory and add the parameter setting export LANG=zh_CN.UTF-8 to keep the cluster's language environment consistent with the client.

iServer Configuration Items

To use the cluster and Python tools more flexibly, you can modify the configuration of the automated processing service in the iserver-geoprocessing.xml file located in the \webapps\iserver\WEB-INF directory under the iServer product directory. The configurable items are listed below (a hypothetical sketch of the file follows the list):

  • clusterRPCServerIP: In cluster mode, the IP address to which the automated processing service is bound; the remote Spark driver process communicates with this IP. By default, the IP of the first network card on the machine where the service is located is bound. When the machine has multiple network cards, bind the IP of the physical machine where the service is located to ensure that the remote Spark driver process can reach it.
  • (xmx): The maximum memory available to the automated processing service. The default is 1/4 of the physical memory.
  • port: The port used when starting the automated service (WebUI). The default port number is 8097.
  • clusterRPCPort: Used for communication with the remote Spark driver process when the automated service runs in Spark cluster mode, so that the execution status of Spark tasks is updated in real time and the tool's progress can be tracked. The default port number is 18098.
  • maxConnection: The maximum number of connections allowed when using the cluster in cluster mode. The default is 50.
  • connectionRequestTimeout: The timeout for obtaining a connection from the connection pool. The default is 3000 milliseconds.
  • pythonSocketPort: When the automated service calls a Python tool, a separate process is started for Python; this port is used for communication between the automated service process and the Python process. The default port number is 18099.
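
The exact element names and structure of iserver-geoprocessing.xml are determined by the file shipped with iServer; the following is only a hypothetical sketch that uses the parameter names above as element names, to illustrate the kind of entries you would edit. Refer to the actual file in your installation for the authoritative structure.

<!-- Hypothetical sketch: element names mirror the parameters listed above; values are illustrative. -->
<geoprocessing>
  <clusterRPCServerIP>192.168.1.10</clusterRPCServerIP>
  <xmx>4g</xmx>
  <port>8097</port>
  <clusterRPCPort>18098</clusterRPCPort>
  <maxConnection>50</maxConnection>
  <connectionRequestTimeout>3000</connectionRequestTimeout>
  <pythonSocketPort>18099</pythonSocketPort>
</geoprocessing>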