Updated on 2022-06-01 GMT+08:00

Colocation

Function Description

Colocation stores associated data, or data on which associated operations are performed, on the same storage node. HDFS Colocation stores the files to be associated on the same data node so that associated operations can read the data from that node. This greatly reduces network bandwidth consumption.

Before using the Colocation function, familiarize yourself with its internal mechanisms, including:

  • Colocation node allocation principles

    Colocation allocates data nodes to locators evenly, according to the number of nodes to be allocated.

    The allocation algorithm works as follows: Colocation queries all locators, reads the data nodes allocated to them, and records how many times each data node is used. Colocation then sorts the data nodes by use count, so the least-used data nodes are placed at the beginning and selected first. Each time a node is selected, its count increases by 1. The nodes are then sorted again and the next node is selected.

  • Capacity expansion and Colocation allocation
    After cluster capacity expansion, you can select one of the two policies in Table 1 to balance the usage of data nodes and ensure that the newly added nodes are allocated as frequently as the old data nodes.

    Table 1 Allocation policies

    No. | Policy | Description
    1 | Delete the original locators and create new locators for all data nodes in the cluster. | 1. Before the capacity expansion, the original locators evenly use all data nodes. After the capacity expansion, the newly added nodes are not allocated to existing locators, so Colocation stores data only on the old data nodes. 2. Data nodes are allocated to specific locators. Therefore, after capacity expansion, Colocation needs to reallocate data nodes to locators.
    2 | Create new locators and plan the data storage mode again. | The old locators use the old data nodes, while the newly created locators mainly use the new data nodes. Therefore, locators need to be planned again based on the actual service requirements on data.

    Generally, you are advised to use policy 1, which reallocates data nodes to locators after capacity expansion, to prevent data from being stored only to the new data nodes.

  • Colocation and data node capacity

    When Colocation is used to store data, the data is stored on the data nodes of the specified locator. If no locator planning is performed, data node capacity usage will be uneven. Table 2 summarizes two usage principles that keep data node capacity even.

    Table 2 Usage principles

    No. | Usage Principle | Description
    1 | All data nodes are used with the same frequency in locators. | If there are N data nodes, the number of locators must be an integral multiple of N (N, 2N, ...).
    2 | A proper data storage plan must be made for all locators to ensure that data is evenly stored in the locators. | -
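
The allocation principle and usage principle 1 above can be illustrated with a small standalone simulation. This is only a sketch of the described behavior, not the actual Colocation implementation: the class name, the method names, and the nodesPerLocator parameter (standing in for the allocation node quantity) are hypothetical, and the simulation assumes that one locator never receives the same data node twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative simulation only; this is not the Colocation source code. */
class ColocationAllocationSketch {

    /**
     * Gives each locator nodesPerLocator distinct data nodes, always choosing
     * the node with the lowest use count, and returns the final use counts.
     * nodesPerLocator is a hypothetical stand-in for the allocation node quantity.
     */
    static Map<String, Integer> allocate(List<String> dataNodes, int locatorCount, int nodesPerLocator) {
        Map<String, Integer> useCount = new LinkedHashMap<>();
        for (String node : dataNodes) {
            useCount.put(node, 0);
        }
        for (int locator = 0; locator < locatorCount; locator++) {
            List<String> chosen = new ArrayList<>();
            for (int i = 0; i < nodesPerLocator; i++) {
                // Equivalent to re-sorting by use count and taking the first
                // node that this locator does not already hold.
                String next = useCount.keySet().stream()
                        .filter(n -> !chosen.contains(n))
                        .min((a, b) -> Integer.compare(useCount.get(a), useCount.get(b)))
                        .orElseThrow(() -> new IllegalArgumentException(
                                "nodesPerLocator exceeds the number of data nodes"));
                chosen.add(next);
                // The count increases by 1 each time a node is selected.
                useCount.merge(next, 1, Integer::sum);
            }
        }
        return useCount;
    }

    /** Returns true if every data node is used the same number of times (map must be non-empty). */
    static boolean isEven(Map<String, Integer> useCount) {
        return Collections.max(useCount.values()).equals(Collections.min(useCount.values()));
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3", "dn4");
        // Locator count is a multiple of N = 4.
        System.out.println(allocate(nodes, 4, 3));
        // Locator count is not a multiple of N = 4.
        System.out.println(allocate(nodes, 5, 3));
    }
}
```

With four data nodes and three nodes per locator, four locators leave every node with a use count of 3, while five locators leave one node used less often than the others. This matches the rule that the number of locators should be an integral multiple of N.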

During HDFS secondary development, you can obtain DFSColocationAdmin and DFSColocationClient instances to create and delete groups, and to write files to or delete files from the location.

  • When the Colocation function is enabled and users specify DataNodes, some nodes will hold a large volume of data. Serious data skew results in HDFS data write failures.
  • Because of data skew, MapReduce accesses only a few nodes. The load on these nodes is heavy while the other nodes are idle.
  • For a single application task, the DFSColocationAdmin and DFSColocationClient instances can be used only once. If the instances are used many times, excessive HDFS links are created and HDFS resources are used up.
  • If you need to perform the balance operation for a file uploaded through Colocation, set the oi.dfs.colocation.file.pattern parameter on MRS Manager to the file path to avoid invalid colocation. If there are multiple files, separate the file paths with commas (,), for example, /test1,/test2.
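
The data skew mentioned in the first two notes can be estimated before any data is written. The following is a minimal sketch under a simplifying assumption: every byte written through a locator is stored on each data node of that locator. The class, method, and variable names are hypothetical and are not part of the Colocation API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative planning helper only; not part of the Colocation API. */
class ColocationSkewSketch {

    /** Sums, for each data node, the bytes written through every locator mapped to it. */
    static Map<String, Long> bytesPerNode(List<String> allNodes,
                                          Map<String, List<String>> locatorNodes,
                                          Map<String, Long> bytesPerLocator) {
        Map<String, Long> result = new TreeMap<>();
        for (String node : allNodes) {
            result.put(node, 0L); // nodes without any locator stay at zero
        }
        for (Map.Entry<String, List<String>> entry : locatorNodes.entrySet()) {
            long bytes = bytesPerLocator.getOrDefault(entry.getKey(), 0L);
            for (String node : entry.getValue()) {
                result.merge(node, bytes, Long::sum);
            }
        }
        return result;
    }

    /** Ratio of the busiest node to the average node; 1.0 means no skew. */
    static double skew(Map<String, Long> perNode) {
        long max = 0;
        long total = 0;
        for (long bytes : perNode.values()) {
            max = Math.max(max, bytes);
            total += bytes;
        }
        if (total == 0) {
            return 1.0;
        }
        return max / ((double) total / perNode.size());
    }

    public static void main(String[] args) {
        Map<String, List<String>> locatorNodes = new HashMap<>();
        locatorNodes.put("lid01", Arrays.asList("dn1", "dn2", "dn3"));
        locatorNodes.put("lid02", Arrays.asList("dn2", "dn3", "dn4"));
        Map<String, Long> bytesPerLocator = new HashMap<>();
        bytesPerLocator.put("lid01", 300L);
        bytesPerLocator.put("lid02", 100L);

        Map<String, Long> perNode = bytesPerNode(
                Arrays.asList("dn1", "dn2", "dn3", "dn4"), locatorNodes, bytesPerLocator);
        System.out.println(perNode + " skew=" + skew(perNode));
    }
}
```

A skew value well above 1.0 suggests that the locator plan concentrates data on a few nodes and should be revised before data is written.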

Sample Code

For the complete sample code, see com.huawei.bigdata.hdfs.examples.ColocationExample.

To run the Colocation project, the HDFS user must be bound to the supergroup user group.

  1. Initialization

    Kerberos security authentication is required before using Colocation.

    private static void init() throws IOException {
        // Log in with the keytab before making any HDFS or Colocation call.
        LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf);
    }
  2. Obtain instances.

    Colocation operations require the DFSColocationAdmin and DFSColocationClient instances, so obtain them before performing any operation such as creating a group.

    public static void main(String[] args) throws IOException {
        init();
        dfsAdmin = new DFSColocationAdmin(conf);
        dfs = new DFSColocationClient();
        dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf);
        try {
            createGroup();
            put();
            delete();
            deleteGroup();
        } finally {
            // Release the HDFS links even if an operation fails.
            dfs.close();
            dfsAdmin.close();
        }
    }
  3. Create a group.

    Example: Create the gid01 group, which contains three locators.

    private static void createGroup() throws IOException {
        dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01,
            Arrays.asList("lid01", "lid02", "lid03"));
    }
  4. Write data into a file. The related group must be created before writing data into the file.
    Example: Write data into the testfile.txt file.
    private static void put() throws IOException {
        // try-with-resources closes the stream even if the write fails.
        try (FSDataOutputStream out = dfs.create(new Path("/testfile.txt"), true,
                COLOCATION_GROUP_GROUP01, "lid01")) {
            // Data to be written to HDFS
            byte[] readBuf = "Hello World".getBytes("UTF-8");
            out.write(readBuf, 0, readBuf.length);
        }
    }
  5. Delete a file.
    Example: Delete the testfile.txt file.
    public static void delete() throws IOException {
        dfs.delete(new Path("/testfile.txt"));
    }
  6. Delete a group.
    Example: Delete gid01.
    private static void deleteGroup() throws IOException {
        dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01);
    }