本地文件接入
概要:
本例主要演示
LuceneIndexTask接入本地文件数据的流程,整个过程会与csv文件接入的流程比较类似,但与csv文件接入中的lucene_index_realtime相比,本流程的LuceneIndexTask支持一些额外的优化设置:
- 通过
maxRowsPerSegment或numShards属性可以设定不同segment生成策略,二者区别如下:maxRowsPerSegment: 每个segment最大的存储行数numShards: 设置总的segment数- 通过
overwrite设置对datasource进行覆盖写入还是追加写入
基于以上的优化,接入本地文件数据时推荐使用本流程
以下以接入 csv 数据为例。
第一步:准备好目标csv文件数据
- 本例的部分
csv数据如下:1500520434191,1,jkcdashjfksa 1500520430411,2,fdfjdkhjfk 1500020434191,4,fkdsjfkskjfa 1420520434191,5,kljfdsaj 1500510434191,9,sagfkldjgf 1500520214191,8,fdjsklklf
第二步:创建、编辑、保存json文件
- 具体的json文件配置请参考后文的LuceneIndexTask_Json实例
第三步:上传csv、json文件到MiddleManager服务器上
- 使用
scp命令将csv、json文件上传到MiddleManagerscp {file_path} root@{MiddleManagerIP}:{storage_dir}file_path
csv或json文件在本地的绝对路径,要指定到文件名这一层
MiddleManagerIP: druid的MiddleManager节点ip地址
storage_dir 在MiddleManager的存放目录,使用绝对路径
第四步:执行命令,启动Task
- 通过
ssh远程登录到第三步中选择的MiddleManager中 - 执行
cd命令进入第三步中选择存放json文件的目录中 - 如果task在worker上的分配策略为均分策略,执行
curl命令取消之curl -X 'POST' -H 'Content-Type:application/json' -d '{"selectStrategy":{"type":"specialEqualDistribution"},"autoScaler":null}' http://{overlordIP}:8090/druid/indexer/v1/worker 执行
curl命令启动Task,具体命令格式如下:curl -X 'POST' -H 'Content-Type:application/json' -d @{file_name} http://{OverlordIP}:8090/druid/indexer/v1/taskOverlordIP: druid的overlord节点ip地址,如果有多个overlord,必须指定leader的ip.
file_name
json文件名
第五步:查看Task执行情况
查看日志 访问:
http://{OverlordIP}:8090/console.html,点击Task的日志,查看Task的执行情况OverlordIP: druid的overlord节点ip地址

查看执行结果 使用
sugo-plyql查询Task的执行结果,具体的命令格式为:./plyql -h {OverlordIP} -q 'select count(*) from {datasource}'OverlordIP: druid的overlord节点ip地址
datasource:json配置文件中定义的datasource名称如果查询的结果是"No Such Datasorce",则说明数据接入没有成功。
如果数据接入成功,那么查询到的结果如下:
该数字为数据条数。关于
sugo-plyql的安装和使用,详见 sugo-plyql 使用文档
注意事项
- 如果task在worker上的分配策略为均分策略,执行
curl命令取消之curl -X 'POST' -H 'Content-Type:application/json' -d '{"selectStrategy":{"type":"specialEqualDistribution"},"autoScaler":null}' http://{overlordIP}:8090/druid/indexer/v1/worker worker:指定具体的worker的address,格式为hostname:port,如dev224.sugo.net:8091spec.dataSchema.granularitySpec.intervals:是数据时间戳范围,不能为空maxRowsPerSegment和概要中提到的numShards是两种不同的segment生成策略,所以不可同时指定,只能二选一spec.tuningConfig.overwrite:设置对datasource进行覆盖写入还是追加写入,默认为falsespec.tuningConfig.reportParseExceptions:是否汇报数据解析错误,默认为falsecontext:任务上下文环境设置,可以不设置context.debug:开启debug模式,调试时开启,生成环境不开启
本流程使用的json配置实例如下:
{
"type": "lucene_index",
"worker": "dev224.sugo.net:8091",
"spec": {
"dataSchema": {
"dataSource": "index-224-cyz113",
"metricsSpec": [],
"parser": {
"parseSpec": {
"format": "csv",
"timestampSpec": {"column": "da","format": "millis"},
"dimensionsSpec": {
"dimensionExclusions": [],
"spatialDimensions": [],
"dimensions": [
{"name": "num","type": "int"},
{"name": "strvalue","type": "string"}
]
},
"listDelimiter": ",",
"columns": ["da","num","strvalue"]
}
},
"granularitySpec": {
"intervals": ["2010-01-01/2018-01-01"],
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"type": "uniform"
}
},
"ioConfig": {
"type":"lucene_index",
"firehose":{
"type": "local",
"filter": "data1.csv",
"baseDir": "/data1/csv/"
}
},
"tuningConfig": {
"type": "lucene_index",
"maxRowsPerSegment": 5000000,
"overwrite": false,
"reportParseExceptions":true
}
},
"context":{
"debug":true
}
}
type:指定数据接入的类型,固定为lucene_indexworker:指定具体的worker的address,一般的形式为hostname:portspec:一些参数说明spec.dataSchema:关于接入数据的概要说明spec.dataSchema.datasource:数据源的名称,类似关系数据库中的表名spec.dataSchema.metricsSpec:所有的指标列和所使用的聚合函数,可以不设置spec.dataSchema.parser:数据转换器spec.dataSchema.parser.parseSpec:数据转换的参数说明spec.dataSchema.parser.parseSpec.format:单条数据的格式spec.dataSchema.parser.parseSpec.timestampSpec:时间戳的说明spec.dataSchema.parser.parseSpec.timestampSpec.column:时间戳的列名spec.dataSchema.parser.parseSpec.timestampSpec.format:时间格式类型, 推荐millis- [ ]
yy-MM-dd HH:mm:ss.SSS:自定义的时间格式 - [ ]
auto:自动识别时间,支持iso和millis格式 - [ ]
iso:iso标准时间格式,如2016-08-03T12:53:51.999Z - [ ]
posix:从1970年1月1日开始所经过的秒数,10位的数字 - [ ]
millis:从1970年1月1日开始所经过的毫秒数,13位数字
- [ ]
spec.dataSchema.parser.parseSpec.dimensionsSpec:维度说明spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensionExclusions:排除在外的维度spec.dataSchema.parser.parseSpec.dimensionsSpec.spatialDimensions:维度的空间说明spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensions:维度定义列表,每个维度的格式为:{“name”: “age”, “type”:”string”}。Type支持的类型:string、int、float、long、datespec.dataSchema.parser.parseSpec.dimensionsSpec.listDelimiter:csv列分隔符spec.dataSchema.parser.parseSpec.dimensionsSpec.multiValueDelimiter:csv多值列分隔符spec.dataSchema.parser.parseSpec.dimensionsSpec.columns:维度列表,包含时间戳列,eg:["da","ProductID"]spec.dataSchema.granularitySpec:数据粒度说明spec.dataSchema.granularitySpec.intervals:数据时间戳范围,不能为空,可以指定多个范围spec.dataSchema.granularitySpec.segmentGranularity:段粒度,根据每天的数据量进行设置。 小数据量建议DAY,大数据量(每天百亿)可以选择HOUR。可选项:SECOND、MINUTE、FIVE_MINUTE、TEN_MINUTE、FIFTEEN_MINUTE、HOUR、SIX_HOUR、DAY、MONTH、YEAR。spec.dataSchema.granularitySpec.queryGranularity:查询粒度spec.dataSchema.granularitySpec.type:粒度说明的类型,默认使用uniformspec.ioConfig:数据的IO说明spec.ioConfig.type:固定为lucene_indexspec.ioConfig.firehose:数据源适配器spec.ioConfig.firehose.type:数据源适配器的类型,一般用localspec.ioConfig.firehose.filter:文件名匹配符,如*.csv匹配所有的csv文件spec.ioConfig.firehose.parser:数据转换器,与上面配置的spec.dataSchema.parser一样spec.ioConfig.firehose.baseDir:数据文件所在目录spec.tuningConfig:优化说明spec.tuningConfig.type:固定为lucene_indexspec.tuningConfig.maxRowsPerSegment:每个segment最大的存储行数,默认为5000000