Конвейер корпоративного машинного обучения — обработка данных журнала

алгоритм
Конвейер корпоративного машинного обучения — обработка данных журнала

Обработка исходных данных полного набора инженерных решений машинного обучения, реально используемых в промышленности


Как мы все знаем, платформы машинного обучения, которые мы обычно используем, теперь включают два модуля: автономное обучение и онлайн-оценка. Среди них автономная часть обычно отвечает засопоставление данных журнала,Обработка образцов,Обработка функцийиобучение моделиЖдать. Онлайн-часть включает в себя онлайн в режиме реального времениpredictПроцесс (онлайн-прогноз, также известный как вывод онлайн-моделей). Как показано ниже:

Для относительно зрелой системы мы бытитульная страница(Html, приложение и т. д.) собирают журнал поведения пользователя посредством встраивания SDK, обычно включая различные действия пользователя, такие как открытие экрана пользователя, запрос страницы, экспозиция, щелчок, загрузка, воспроизведение, перезарядка и другие действия. , а также записывать различное поведение.СерверЖурналы возвращаемых данных, например, для рекламной системы, это журналы запросов пользователей и журналы доставки рекламы, среди которых один журнал запросов может соответствовать нескольким записям журнала доставки.

После того, как этот системный журнал будет обработан платформами обработки больших данных, такими как flume, kafka, storm и т. д., он будет сохранен на платформе больших данных в виде таблицы куста или текстового файла hdfs. Как правило, каждый журнал сохраняется в виде строки в файле журнала, охватывающей несколько полей, которые могут однозначно определять поведение пользователя, например временная метка, androidID, imei, userId, requestid, идентификатор страницы доступа пользователя, поведение пользователя и т. д. По сути, это логика, чтобы различать, кто что делает на определенном устройстве в определенное время.

Как правило, для обработки этих журналов мы будем использовать Hive SQL, Spark или Flink. Для данных, хранящихся в таблице куста, мы можем читать данные различными способами для обработки. Вот в основном 3 метода обработки:

  1. hive sql
  2. sparksession sql
  3. spark Rdd

Способ 1, обработка данных журнала — shell + hive SQL

Используйте сценарий оболочки, чтобы управлять hive sql для выполнения инструкции sql, чтобы найти несколько полей исходных данных и записать в фиксированную таблицу hive. Пример кода выглядит следующим образом:


@欢迎关注微信公众号:算法全栈之路
@ filename format_log.sh

#!/bin/bash
source ~/.bashrc
set -x

cur_day=$1
source_table_name=user_xxx_rcv_log
des_table_name=user_xxx_rcv_data
des_table_location="hdfs:/user/base_table/${des_table_name}"

# 如果表不存在则新建表
${HIVE_HOME}/bin/hive  -e "
CREATE EXTERNAL TABLE IF NOT EXISTS ${des_table_name}(${column_name}) PARTITIONED BY (day STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE  LOCATION '${des_table_location}';
ALTER TABLE ${des_table_name} SET SERDEPROPERTIES('serialization.null.format' = '');
ALTER TABLE ${des_table_name} drop partition(day = ${cur_day});
"

# 删除目的地表已有分区
${HADOOP_HOME}/bin/hadoop fs -rm -r -skipTrash ${des_table_location}/day=${cur_day}

$HIVE_HOME/bin/hive  -e " ALTER TABLE ${table_name} drop partition(day = ${cur_day});
ALTER TABLE ${table_name} add partition(day = ${cur_day});
"


# 执行hive sql 写入数据到目的表
${HIVE_HOME}/bin/hive  -e "

set hive.exec.reducers.max = 100;
set hive.execution.engine=mr;
set mapreduce.map.java.opts=-Xmx4096M;
set mapreduce.map.memory.mb=4096;
set mapred.reduce.child.java.opts=-Xmx6g
set mapred.child.java.opts=-Xmx4g
set hive.exec.reducers.max = 100;

insert overwrite table ${des_table_name} partition(day = ${cur_day})
select timestamp,imei,userid,event_tyep from ${source_table_name} where date = ${cur_day}
"

RES=$?
if [ $RES -eq 0 ]
then
	echo "hive job finished!"
	${HADOOP_HOME}/bin/hadoop fs -touchz ${table_location}/day=${cur_day}/_SUCCESS
	exit 0
else
        echo "hive job Error !!!"
	exit -1
fi

Чтобы выполнить приведенный выше сценарий оболочки, вы можете использовать

nohup sh -x format_log.sh 20210701 > rcv.log 2>&1 & 

Способ 2, обработка данных журнала — sparksession sql

Так называемый saprk sql использует сеанс spark для выполнения инструкции sql для завершения обработки данных и сохранения данных в текстовый файл hdfs.

**talk is cheap, show the code !!! **

Здесь сценарий оболочки используется для отправки задачи scala spark на обработку. Спарк-код scala выглядит следующим образом:

@ 欢迎关注微信公众号:算法全栈之路
@ filename format_log.scala

package Data

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object LogMiddleDataGenerate {
  def main(args: Array[String]) {
    val Array(event_day,all_logdata_path) = args
    val sparkConf = new SparkConf()
    val sparkSession = SparkSession.builder()
    .appName("LogMiddleDataGenerate")
    .config(sparkConf)
    .config("spark.kryoserializer.buffer.max", "1024m")
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")      .getOrCreate()
   val sc = sparkSession.sparkContext
   val day = event_day
    // sql 语句
    val all_log_sql = " select timestamp,imei,userid,event_tyep from ${source_table_name} where date  '"+ day +"'"
    val all_log_df = sparkSession.sql(all_log_sql).distinct()
      .rdd
      .map(e=>{
        e.mkString("\t")
      }).persist(StorageLevel.MEMORY_AND_DISK)
 val outputPartNum = math.ceil(all_log_df.count() / 400000).toInt
    all_log_df.repartition(outputPartNum).saveAsTextFile(all_logdata_path)
all_log_df.unpersist()
  }
}

Чтобы выполнить задачу искры апелляции scala, вам необходимо ввести приведенный выше код в пакет jar и использовать следующий код, чтобы отправить задачу искры для задачи.

@ 欢迎关注微信公众号:算法全栈之路
@ filename spark_log_format.sh

#!/bin/sh
source ~/.bash_profile

set -x

mvn clean package -U || exit
echo "current working dir: $(pwd)"

day=`date -d "2 day ago" +%Y%m%d`
[ $# -ge 1 ] && day=$1

all_logdata_path=hdfs://dependy_data/all_logdata/${day}

JAR_PATH=./target/fclpc-1.0-SNAPSHOT.jar
class=Data.LogMiddleDataGenerate

${SPARK23_HOME}/bin/spark-submit \
    --master yarn  \
    --deploy-mode cluster \
    --class ${class}  \
    --driver-memory 10G \
    --executor-memory 6G  \
    --conf spark.driver.maxResultSize=8G \
    --conf spark.yarn.priority=VERY_HIGH \
    --conf spark.sql.hive.convertMetastoreParquet=false\
    --conf spark.sql.hive.convertMetastoreOrc=false\
    --conf spark.sql.hive.metastorePartitionPruning=false \
    ${JAR_PATH} \
    ${day}\
    ${all_logdata_path}\

Чтобы выполнить приведенный выше сценарий оболочки, вы можете использовать

nohup sh -x spark_log_format.sh 20210701 > spark.log 2>&1 & 

Способ 3, обработка данных лога - spark Rdd

Разница между использованием spark rdd и использованием sparksession sql выше не очень велика, но интерфейс sparkcontext используется для непосредственного чтения файла, хранящегося в кластере Hdfs, а интерфейс sc.textFile() используется для чтения файла. Он мало чем отличается от описанного выше и не будет здесь подробно описываться.


На этом этапе был представлен весь исходный процесс обработки данных. Вы можете использовать описанный выше метод, чтобы переписать код, который подходит вашему бизнесу. Если вы найдете его полезным, пожалуйста, поставьте лайк и поделитесь им ~

Добро пожаловать, чтобы отсканировать код и подписаться на официальный аккаунт автора: Путь к полному стеку алгоритмов.