geotrellis使用(二十一)自动导入数据

目录

一、前言

       之前Geotrellis数据导入集群采用的是命令行的方式,即通过命令行提交spark任务来ingest数据,待数据导入完毕再启动主程序进行数据的调用。这样造成的一个问题就是数据导入与数据处理不能无缝对接,并且只能由管理员导入数据导入数据流程也很麻烦,用户想要导入自己的数据几乎不可能。本文为大家介绍一种自动数据导入方式——通过浏览器前端界面实现交互式数据导入。

二、整体介绍

       通过浏览器方式导入,摆脱了SHELL的限制并且可交互式,大大方便了普通用户的操作;并且也能将数据的导入与数据管理、用户控制、权限控制等结合起来,可以说是优点非常多,也是一个很重要的环节;同时相当于直接实现了数据导入、处理、展示的流程化作业,将数据服务一体化,等同于在分布式集群中实现了Arcgis等传统软件的发布数据服务。

       本文主要从前台和后台两个方面来介绍数据的自动导入,前台主要实现了数据位置的选择,单波段多波段数据的选择等,后台接收到用户的请求后将选择的数据导入到相应的位置,导入完毕后即可在前台进行显示。

三、前台界面

       前台界面比较简单,由于不是美工出身,所以比较简陋,主要看功能。整体界面如下。

交互式数据导入

       主要就是一个地址输入框,目前还是手工输入,后续可以与hdfs的管理结合起来,实现从hdfs中选择数据;一个是否多波段得选择框,如果数据为多波段需要勾选此框,这样后台会将数据直接切成MultibandTile否则会将波段合并切割成只有一个波段的Tile;一个导入按钮,无需多言。浏览器将文件位置以及是否多波段通过ajax的方式发送到后台,后台接收到之后进行导入处理。

四、后台控制

       上一篇文章中简单介绍了1.0版Geotrellis在导入数据的时候配置信息发生了变化,主要信息基本都写在了json文件中(见geotrellis使用(二十)geotrellis1.0版本新功能及变化介绍),这也为我们实现自动导入提供了便利,只需要将json文件做成模板,读出模板字符串将相应信息替换成用户输入值,然后将信息提交到spark完成作业即可。

4.1 生成导入数据的EtlConf

       EtlConf是Geotrellis中导入数据的配置类,要实现导入数据,首先就要创建EtlConf的实例,然后将此实例交给Etl类即可完成数据导入。所以我们首先要实现根据用户输入创建EtlConf实例。由于原始的EtlConf类直接根据在SHELL中提交作业时配置的input.json、output.json与backend-profiles.json文件中读取信息完成自身的实例化,所以我们需要创建一个自己的EtlConf类根据前台传入的数据封装配置信息,并生成一个EtlConf实例。我们可以直接拼接json数据进行传入,我在这里偷了个懒,将上述三json文件做成了模板,自定义的EtlConf类先读取模板然后根据前台传入数据修改模板配置信息,但是由于output.json与backend-profiles.json文件内容基本不需要变化,所以不用做成模板,直接读取即可,当然如果你有需要更改的配置也可以进行同样操作,input.json模板文件如下所示:

[
  {
    "name": "{name}",
    "format": "{format}",
    "backend": {
      "type": "hadoop",
      "path": "{path}"
    },
    "cache": "NONE"
  }
]

       由于没有考虑从S3读取数据,所以backend.type项并未配置成模板,同样如果需要自行更改即可。其中{name}可以表示数据导入存放的层,当然此处可以根据用户信息或时间等信息进行配置,只要能够与当前用户相关联即可;{format}表示输入文件信息,如果是单波段文件此处为geotiff,如果为多波段文件此处为multiband-geotiff;{path}表示文件位置,根据前台数据修改即可。配置好这些信息之后即可创建EtlConf实例,方法与原始EtlConf类相同,这里不做介绍,将自定义的EtlConf类整体代码放在下面,仅供参考。

import com.github.fge.jackson.JsonLoader
import geotrellis.helper.ConfigHelper
import geotrellis.spark.etl.config.{BackendProfile, EtlConf, Input, Output}
import geotrellis.spark.etl.config.json._
import org.apache.spark.SparkContext
import spray.json.DefaultJsonProtocol._
import spray.json._

/**
  * Created by wsf on 2016/9/8.
  */
object UserEtlConf {

  def updateInputJson(path: String, isMulti: Boolean, m: Map[Symbol, String]) = {
    val input = m('input)
    val inputPath = s"file://${path}"
    val format = if(isMulti) "multiband-geotiff" else "geotiff"
    val name = "userlayer"
    val realInput = input.replace("{path}", inputPath).replace("{format}", format).replace("{name}", name)
    m.updated('input, realInput)
  }

  //todo: update some output information
  def updateOutputJson(m: Map[Symbol, String]) = {
    m
  }

  //todo: update some backend-profiles information
  def updateBackendProfilesJson(m: Map[Symbol, String]) = {
    m
  }

  def getSet(path: String, isMulti: Boolean)(implicit sc: SparkContext) = {
    val confPath = ConfigHelper.confPath
    val args = Array("--input", s"file://${confPath}inputTemplete.json", "--output", s"file://${confPath}output.json", "--backend-profiles", s"file://${confPath}backend-profiles.json")
    val m: Map[Symbol, String] = EtlConf.parse(args)
    val inputM = updateInputJson(path, isMulti, m)
    val outputM = updateOutputJson(inputM)
    updateBackendProfilesJson(outputM)
  }

  def apply(path: String, isMulti: Boolean)(implicit sc: SparkContext): List[EtlConf] = {
    val m = getSet(path, isMulti)

    val (backendProfiles, input, output) = (m('backendProfiles), m('input), m('output))

    val inputValidation = EtlConf.inputSchema.validate(JsonLoader.fromString(input), true)
    val backendProfilesValidation = EtlConf.backendProfilesSchema.validate(JsonLoader.fromString(backendProfiles), true)
    val outputValidation = EtlConf.outputSchema.validate(JsonLoader.fromString(output), true)

    if (!inputValidation.isSuccess || !backendProfilesValidation.isSuccess || !outputValidation.isSuccess) {
      if (!inputValidation.isSuccess) {
        println("input validation error:")
        println(inputValidation)
      }
      if (!backendProfilesValidation.isSuccess) {
        println("backendProfiles validation error:")
        println(backendProfilesValidation)
      }
      if (!outputValidation.isSuccess) {
        println("output validation error:")
        println(outputValidation)
      }
      sys.exit(1)
    }

    val backendProfilesParsed = backendProfiles.parseJson.convertTo[Map[String, BackendProfile]]
    val inputsParsed = InputsFormat(backendProfilesParsed).read(input.parseJson)
    val outputParsed = OutputFormat(backendProfilesParsed).read(output.parseJson)

    inputsParsed.map { inputParsed =>
      new EtlConf(
        input = inputParsed,
        output = outputParsed,
        inputProfile = inputParsed.backend.profile,
        outputProfile = outputParsed.backend.profile
      )
    }
  }
}

4.2 完成数据导入

       有了EtlConf实例,只需要将其传入Etl类即可完成数据导入,此处要注意的是需要根据是否多波段传入不同的类型,具体代码如下所示:

  def ingest(path: String, isMulti: Boolean)(implicit sc: SparkContext): Unit = {
    if (!isMulti)
      ingestTile[ProjectedExtent, SpatialKey, Tile](path, isMulti)
    else
      ingestTile[ProjectedExtent, SpatialKey, MultibandTile](path, isMulti)
  }

  def ingestTile[
  I: Component[?, ProjectedExtent] : TypeTag : ? => TilerKeyMethods[I, K],
  K: SpatialComponent : TypeTag : AvroRecordCodec : Boundable : JsonFormat,
  V <: CellGrid : TypeTag : Stitcher : (? => TileReprojectMethods[V]) : (? => CropMethods[V]) : (? => TileMergeMethods[V]) : (? => TilePrototypeMethods[V]) : AvroRecordCodec
  ](
     path: String, isMulti: Boolean, modules: Seq[TypedModule] = Etl.defaultModules
   )(implicit sc: SparkContext) = {
    implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
    implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]

    val etlConfs = UserEtlConf(path, isMulti)
    etlConfs foreach { conf =>
      val etl = Etl(conf, Etl.defaultModules)
      val sourceTiles = etl.load[I, V]
      val (zoom: Int, tiled) = etl.tile[I, V, K](sourceTiles)
      etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
    }
  }

       主要就是在ingest函数中调用ingestTile函数的时候根据是否多波段为泛型赋不同的类型,单波段为Tile,多波段为MultibandTile。ingestTile中的代码与原始Etl类中的代码基本相同,首先使用自定义的UserEtlConf类创建EtlConf实例,然后提交到Etl完成数据导入,自此便完成了交互式数据导入。

4.3 前台浏览导入结果

       如果前台能够在后台导入完毕后既浏览到自己的数据,这在用户体验以及查看数据完整性等方面都有很好的作用。实现的方式有很多,如通过WebSocket在后台导入完毕后通知前台刷新页面,或者前台定时循环请求后台等。无论采用什么方式只需要能够将导入的数据以TMS的方式发送到前台即可实现该功能,这样就打通了数据发布的整个流程。

五、总结

       本文为大家简单介绍了如何实现交互式的数据导入。洋洋洒洒关于Geotrellis的使用已经写了二十多篇,总体来说经历了一个从“无知”到稍微“有知”的这么一个过程。回首走过的这段Geotrellis岁月,从中无论是编程技术还是思维方式还是遥感影像处理以及地理信息系统甚至文字功底等多方面知识都有了明显的提高,自我感觉博客中总结的技术点以及博客行文也都相较越来越好,这条路我会一直走下去。

转载自:http://www.cnblogs.com/shoufengwei/p/5856323.html

You may also like...